生产者消费者模式java的三种实现

Thea ·
更新时间:2024-11-10
· 821 次阅读

生产者消费者 介绍 优点 可以解耦生产者和消费者,互相之间不会产生影响 支持并发操作,生产者只管生产数据,生产的数据放到缓冲区中,而不需要等消费者消费完再生产下一个数据,不会造成阻塞 支持忙闲不均 数据单元 特性 关联到业务对象 完整性 独立性 颗粒度 设计 缓冲区一般使用阻塞队列,当队列满时会阻塞生产者继续生产数据,直到有消费者来消费了数据。当队列为空时也会阻塞消费者继续消费 生产消费者问题是个非常典型的多线程问题,涉及到的对象包括“生产者”,“消费者”,“仓库”,“产品” 关系 生产者只有在仓库未满时生产,仓满就停止生产 消费者仅仅在仓库有产品的时候才能消费,仓空就等待 当消费者发现仓库没产品消费的时候通知生产者生产 生产者在生成出产品后通知等待的消费者去消费 wait/notify方法 sleep()是Thread类的方法,而wait(),notify(),notifyAll()是Object类中定义的方法,尽管这两个方法都会影响线程的执行行为,但是本质上是有区别的 Thread.sleep()不会导致锁行为的改变,可以简单的认为,与锁相关的方法都定义在Object类中,因此调用Thread.sleep()是不会影响锁的相关行为 Thread.sleep和object.wait都会暂停当前的线程,对于CPU资源来说,不管是哪种方式暂停线程,都表示它暂时不需要CPU的执行时间,OS会将执行时间分配给其他线程。区别是调用wait后,需要别的线程执行notify/notifyAll才能重新获得CPU执行时间 实现生产者消费者模型 BlockingQueue阻塞队列方法 Object的wait()/notify()方法 Lock和Condition的await()/signal()方法 java实现 方法一:使用BlockingQueue阻塞队列方法 数据类Data public class Data { private int id; //生产量 private int num; public Data(int id,int num){ this.id=id; this.num=num; } public Data(){ } public int getId() { return id; } public void setId(int id) { this.id = id; } public int getNum() { return num; } public void setNum(int num) { this.num = num; } } 生产者 import java.util.Random; import java.util.concurrent.BlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class Producer implements Runnable { //共享阻塞队列 private BlockingDeque queue; //是否还在运行 private volatile boolean isRunning = true; //id生成器原子操作 private static AtomicInteger count = new AtomicInteger(); // 生成随机数 private static Random random = new Random(); public Producer(BlockingDeque queue){ this.queue=queue; } @Override public void run() { try{ while(isRunning){ // 模拟生产耗时 Thread.sleep(random.nextInt(1000)); int num=count.incrementAndGet(); Data data=new Data(num,num); System.out.println("当前>>生产者:"+Thread.currentThread().getName()+"生产量"+num); if(!queue.offer(data,2, TimeUnit.SECONDS)){ System.out.println("生产失败..."); } } }catch (Exception e){ e.printStackTrace(); } } public void stop(){ isRunning=false; } } 消费者 import java.util.Random; import java.util.concurrent.BlockingDeque; public class Consumer implements Runnable { //双端队列,加入或者取出元素都是线程安全的 private BlockingDeque queue; private static Random random=new Random(); public Consumer(BlockingDeque queue){ this.queue=queue; } @Override public void run(){ while (true){ try{ // 检索并删除,如果需要等待、直到元素可用。 Data data= queue.take(); //模拟消费耗时 Thread.sleep(random.nextInt(1000)); if(data!=null){ System.out.println("当前<<消费者:"+Thread.currentThread().getName()+",消费量"+data.getNum()); } }catch (Exception e){ e.printStackTrace(); } } } } 测试 import java.util.concurrent.*; public class TestPro { public static void main(String[] args) throws InterruptedException{ BlockingDeque queue = new LinkedBlockingDeque(10); Producer producer1 = new Producer(queue); Producer producer2 = new Producer(queue); Producer producer3 = new Producer(queue); Consumer consumer1 = new Consumer(queue); Consumer consumer2 = new Consumer(queue); Consumer consumer3 = new Consumer(queue); ExecutorService service= Executors.newCachedThreadPool(); service.execute(producer1); service.execute(producer2); service.execute(producer3); service.execute(consumer1); service.execute(consumer2); service.execute(consumer3); Thread.sleep(3000); producer1.stop(); producer2.stop(); producer3.stop(); Thread.sleep(1000); service.shutdown(); } } 结果

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hBNwcyGG-1582708080156)(https://raw.githubusercontent.com/iszhonghu/Picture-bed/master/img/20200226105053.png)]

最后一次生产20此时所有的生产者都停止生产了,但是此时产品池还没空,于是消费者继续消费,直到把产品池中的数据消耗完

方法二:使用Object的wait()/notify()方法

wait()/ nofity()方法是基类Object的两个方法,也就意味着所有Java类都会拥有这两个方法,这样,我们就可以为任何对象实现同步机制。

wait():当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等待状态,让其他线程执行。 notify():当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。 生产者 import java.util.Queue; import java.util.Random; public class Producer extends Thread { private Queue queue; String name; int maxSize; int i=0; public Producer(String name,Queue queue,int maxSize){ super(name); this.name=name; this.queue=queue; this.maxSize=maxSize; } @Override public void run(){ while (true){ synchronized (queue){ while (queue.size()==maxSize){ try{ System.out.println("队列已经满了,生产者["+name+"]线程等待"+"消费者从队列中消费产品。"); queue.wait(); }catch (Exception e){ e.printStackTrace(); } } System.out.println("[" + name + "] 生产产品 : +" + i); queue.offer(i++); queue.notifyAll(); try{ Thread.sleep(new Random().nextInt(1000)); }catch (InterruptedException e){ e.printStackTrace(); } } } } } 消费者 import java.util.Queue; import java.util.Random; public class Consumer extends Thread { private Queue queue; String name; int maxSize; public Consumer(String name,Queuequeue,int maxSize){ super(name); this.name=name; this.queue=queue; this.maxSize=maxSize; } @Override public void run(){ while (true){ synchronized (queue){ while (queue.isEmpty()){ try{ System.out.println("队列是空的 消费者[" + name + "] 等待生产者生产"); queue.wait(); }catch (Exception e){ e.printStackTrace(); } } int x=queue.poll(); System.out.println("[" + name + "] 消费产品 : " + x); queue.notifyAll(); try{ Thread.sleep(new Random().nextInt(1000)); }catch (InterruptedException e){ e.printStackTrace(); } } } } } 生产者消费者模式 import java.util.LinkedList; import java.util.Queue; /** * 生产者消费者模式:使用Object.wait()/notify()方法实现 */ public class ProdicerConsumer { private static final int CAPACITY = 500; public static void main(String[] args) { Queue queue = new LinkedList(); Thread producer1 = new Producer("P-1", queue, CAPACITY); Thread producer2 = new Producer("P-2", queue, CAPACITY); Thread consumer1 = new Consumer("C1", queue, CAPACITY); Thread consumer2 = new Consumer("C2", queue, CAPACITY); Thread consumer3 = new Consumer("C3", queue, CAPACITY); producer1.start(); producer2.start(); consumer1.start(); consumer2.start(); consumer3.start(); } } 结果

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2s2ckWoo-1582708080157)(https://raw.githubusercontent.com/iszhonghu/Picture-bed/master/img/20200226160351.png)]

注意要点 判断Queue大小为0或者大于queueSize时须使用 while (condition) {},不能使用 if(condition) {}。其中 while(condition)循环,它又被叫做**“自旋锁”。为防止该线程没有收到notify()调用也从wait()中返回(也称作虚假唤醒**),这个线程会重新去检查condition条件以决定当前是否可以安全地继续执行还是需要重新保持等待,而不是认为线程被唤醒了就可以安全地继续执行了。 方法三:使用Lock和Condition的await() / signal()方法

在JDK5.0之后,Java提供了更加健壮的线程处理机制,包括同步、锁定、线程池等,它们可以实现更细粒度的线程控制。Condition接口的await()signal()就是其中用来做同步的两种方法,它们的功能基本上和Object的wait()/ nofity()相同,完全可以取代它们,但是它们和新引入的锁定机制Lock直接挂钩,具有更大的灵活性。通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。

生产者消费者模式:使用Lock和Condition实现 import java.util.LinkedList; import java.util.Queue; import java.util.Random; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ProducerConsumerByLock { private static final int CAPACITY = 5; private static final Lock lock = new ReentrantLock(); private static final Condition fullCondition = lock.newCondition(); //队列满的条件 private static final Condition emptyCondition = lock.newCondition(); //队列空的条件 public static void main(String args[]){ Queue queue = new LinkedList(); Thread producer1 = new Producer("P-1", queue, CAPACITY); Thread producer2 = new Producer("P-2", queue, CAPACITY); Thread consumer1 = new Consumer("C1", queue, CAPACITY); Thread consumer2 = new Consumer("C2", queue, CAPACITY); Thread consumer3 = new Consumer("C3", queue, CAPACITY); producer1.start(); producer2.start(); consumer1.start(); consumer2.start(); consumer3.start(); } /** * 生产者 */ public static class Producer extends Thread{ private Queue queue; String name; int maxSize; int i = 0; public Producer(String name, Queue queue, int maxSize){ super(name); this.name = name; this.queue = queue; this.maxSize = maxSize; } @Override public void run(){ while(true){ //获得锁 lock.lock(); while(queue.size() == maxSize){ try { System.out .println("队列已经满了,生产者["+name+"]线程等待"+"消费者从队列中消费产品。"); //条件不满足,生产阻塞 fullCondition.await(); } catch (InterruptedException ex) { ex.printStackTrace(); } } System.out.println("[" + name + "] 生产产品 : +" + i); queue.offer(i++); //唤醒其他所有生产者、消费者 fullCondition.signalAll(); emptyCondition.signalAll(); //释放锁 lock.unlock(); try { Thread.sleep(new Random().nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } } } } /** * 消费者 */ public static class Consumer extends Thread{ private Queue queue; String name; int maxSize; public Consumer(String name, Queue queue, int maxSize){ super(name); this.name = name; this.queue = queue; this.maxSize = maxSize; } @Override public void run(){ while(true){ //获得锁 lock.lock(); while(queue.isEmpty()){ try { System.out.println("队列是空的 消费者[" + name + "] 等待生产者生产"); //条件不满足,消费阻塞 emptyCondition.await(); } catch (Exception ex) { ex.printStackTrace(); } } int x = queue.poll(); System.out.println("[" + name + "] 消费产品 : " + x); //唤醒其他所有生产者、消费者 fullCondition.signalAll(); emptyCondition.signalAll(); //释放锁 lock.unlock(); try { Thread.sleep(new Random().nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } } } } } 结果

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QNVflgMO-1582708080158)(https://raw.githubusercontent.com/iszhonghu/Picture-bed/master/img/20200226164915.png)]


作者:冢狐



java的 JAVA

需要 登录 后方可回复, 如果你还没有账号请 注册新账号