1.概念
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
2.321原则
三种角色:生产者、消费者、仓库
两种关系:生产者与生产者之间是互斥关系,消费者与消费者之间是互斥关系,生产者与消费者之间是同步与互斥关系。
一个交易场所:仓库
3.优点
- 解耦–生产者。消费者之间不直接通信,降低了耦合度。
- 支持并发
- 支持忙闲不均
4.PV原语描述
s1初始值为缓冲区大小、s2初始值为0
生产者:
生产一个产品;
P(s1);
送产品到缓冲区;
V(s2);
消费者:
P(s2);
从缓冲区取出产品;
V(s2);
消费水平;
5.代码实现
5.1 synchronized + wait() + notify() 方式
package ProducerAndConsumer;
import java.util.ArrayList;
import java.util.List;
class Produce extends Thread {
List<Object> productBuffer; // 产品缓冲区
int bufferCapacity; // 缓冲区容量
public Produce(int bufferCapacity,List<Object> productBuffer){
this.productBuffer = productBuffer;
this.bufferCapacity = bufferCapacity;
}
@Override
public void run() {
while (true){ // 生产行为
synchronized (productBuffer){ // 如果某一个生产者能执行进来,说明此线程具有productBuffer对象的控制权,其它线程(生产者&消费者)都必须等待
if(productBuffer.size()==bufferCapacity){ // 缓冲区满了
try {
productBuffer.wait(1); // 释放控制权并等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}else { // 缓冲区没满可以继续生产
productBuffer.add(new Object());
System.out.println("生产者生产了1件物品,当前缓冲区里还有" + productBuffer.size() + "件物品");
productBuffer.notifyAll(); // 唤醒等待队列中所有线程
}
}
// 模拟生产缓冲时间
try {
Thread.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer extends Thread{
List<Object> productBuffer; // 产品缓冲区
int bufferCapacity; // 缓冲区容量
public Consumer(int bufferCapacity,List<Object> productBuffer){
this.productBuffer = productBuffer;
this.bufferCapacity = bufferCapacity;
}
@Override
public void run(){
while (true){//消费行为
synchronized (productBuffer){
if(productBuffer.isEmpty()){ //产品缓冲区为空,不能消费,只能等待
try {
productBuffer.wait(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}else { // 缓冲区没空可以继续消费
productBuffer.remove(0);
System.out.println("消费者消费了1个物品,当前缓冲区里还有" + productBuffer.size() + "件物品");
productBuffer.notifyAll();
}
}
// 模拟消费缓冲时间
try {
Thread.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class ProducerAndConsumer {
public static void main(String[] args) {
List<Object> productBuffer = new ArrayList<>(); // 产品缓冲区
int bufferCapacity = 3; // 缓冲区容量
for (int i = 0; i < 3; i++) {
new Produce(bufferCapacity,productBuffer).start();
}
for (int i = 0; i < 3; i++) {
new Consumer(bufferCapacity,productBuffer).start();
}
}
}
5.2 可重入锁ReentrantLock (配合Condition)方式
package ProducerAndConsumer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
class Produce extends Thread {
List<Object> productBuffer; // 产品缓冲区
int bufferCapacity; // 缓冲区容量
ReentrantLock lock; // 可重入锁
Condition producerCondition; // 生产者condition
Condition consumerCondition; // 消费者condition
public Produce(int bufferCapacity,List<Object> productBuffer,ReentrantLock lock,Condition producerCondition,Condition consumerCondition){
this.productBuffer = productBuffer;
this.bufferCapacity = bufferCapacity;
this.lock = lock;
this.producerCondition = producerCondition;
this.consumerCondition = consumerCondition;
}
@Override
public void run() {
while (true) { // 生产行为
lock.lock(); //加锁
if (productBuffer.size() == bufferCapacity) { // 缓冲区满了
try {
producerCondition.await(); // 释放控制权并等待
} catch (InterruptedException e) {
e.printStackTrace();
}
} else { // 缓冲区没满可以继续生产
productBuffer.add(new Object());
System.out.println("生产者生产了1件物品,当前缓冲区里还有" + productBuffer.size() + "件物品");
consumerCondition.signal(); // 唤醒消费者线程
}
lock.unlock(); //解锁
// 模拟生产缓冲时间
try {
Thread.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer extends Thread{
List<Object> productBuffer; // 产品缓冲区
int bufferCapacity; // 缓冲区容量
ReentrantLock lock; // 可重入锁
Condition producerCondition; // 生产者condition
Condition consumerCondition; // 消费者condition
public Consumer(int bufferCapacity,List<Object> productBuffer,ReentrantLock lock,Condition producerCondition,Condition consumerCondition){
this.productBuffer = productBuffer;
this.bufferCapacity = bufferCapacity;
this.lock = lock;
this.producerCondition = producerCondition;
this.consumerCondition = consumerCondition;
}
@Override
public void run(){
while (true){//消费行为
lock.lock();//加锁
if(productBuffer.isEmpty()){ //产品缓冲区为空,不能消费,只能等待
try {
consumerCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}else { // 缓冲区没空可以继续消费
productBuffer.remove(0);
System.out.println("消费者消费了1个物品,当前缓冲区里还有" + productBuffer.size() + "件物品");
producerCondition.signal(); // 唤醒生产者线程继续生产
}
lock.unlock(); //解锁
// 模拟消费缓冲时间
try {
Thread.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class ProducerAndConsumer {
public static void main(String[] args) {
List<Object> productBuffer = new ArrayList<>(); // 产品缓冲区
int bufferCapacity = 3; // 缓冲区容量
ReentrantLock lock = new ReentrantLock(); // 可重入锁
Condition producerCondition = lock.newCondition(); // 生产者condition
Condition consumerCondition = lock.newCondition(); // 消费者condition
for (int i = 0; i < 3; i++) {
new Produce(bufferCapacity,productBuffer,lock,producerCondition,consumerCondition).start();
}
for (int i = 0; i < 3; i++) {
new Consumer(bufferCapacity,productBuffer,lock,producerCondition,consumerCondition).start();
}
}
}
评论 (0)