public class Data {
private int id;
//生产量
private int num;
public Data(int id,int num){
this.id=id;
this.num=num;
}
public Data(){
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
}
生产者
import java.util.Random;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class Producer implements Runnable {
//共享阻塞队列
private BlockingDeque queue;
//是否还在运行
private volatile boolean isRunning = true;
//id生成器原子操作
private static AtomicInteger count = new AtomicInteger();
// 生成随机数
private static Random random = new Random();
public Producer(BlockingDeque queue){
this.queue=queue;
}
@Override
public void run() {
try{
while(isRunning){
// 模拟生产耗时
Thread.sleep(random.nextInt(1000));
int num=count.incrementAndGet();
Data data=new Data(num,num);
System.out.println("当前>>生产者:"+Thread.currentThread().getName()+"生产量"+num);
if(!queue.offer(data,2, TimeUnit.SECONDS)){
System.out.println("生产失败...");
}
}
}catch (Exception e){
e.printStackTrace();
}
}
public void stop(){
isRunning=false;
}
}
消费者
import java.util.Random;
import java.util.concurrent.BlockingDeque;
public class Consumer implements Runnable {
//双端队列,加入或者取出元素都是线程安全的
private BlockingDeque queue;
private static Random random=new Random();
public Consumer(BlockingDeque queue){
this.queue=queue;
}
@Override
public void run(){
while (true){
try{
// 检索并删除,如果需要等待、直到元素可用。
Data data= queue.take();
//模拟消费耗时
Thread.sleep(random.nextInt(1000));
if(data!=null){
System.out.println("当前<<消费者:"+Thread.currentThread().getName()+",消费量"+data.getNum());
}
}catch (Exception e){
e.printStackTrace();
}
}
}
}
测试
import java.util.concurrent.*;
public class TestPro {
public static void main(String[] args) throws InterruptedException{
BlockingDeque queue = new LinkedBlockingDeque(10);
Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
Consumer consumer1 = new Consumer(queue);
Consumer consumer2 = new Consumer(queue);
Consumer consumer3 = new Consumer(queue);
ExecutorService service= Executors.newCachedThreadPool();
service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
service.execute(consumer1);
service.execute(consumer2);
service.execute(consumer3);
Thread.sleep(3000);
producer1.stop();
producer2.stop();
producer3.stop();
Thread.sleep(1000);
service.shutdown();
}
}
结果
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hBNwcyGG-1582708080156)(https://raw.githubusercontent.com/iszhonghu/Picture-bed/master/img/20200226105053.png)]
最后一次生产20此时所有的生产者都停止生产了,但是此时产品池还没空,于是消费者继续消费,直到把产品池中的数据消耗完
方法二:使用Object的wait()/notify()方法wait()/ nofity()方法是基类Object的两个方法,也就意味着所有Java类都会拥有这两个方法,这样,我们就可以为任何对象实现同步机制。
wait():当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等待状态,让其他线程执行。 notify():当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。 生产者import java.util.Queue;
import java.util.Random;
public class Producer extends Thread {
private Queue queue;
String name;
int maxSize;
int i=0;
public Producer(String name,Queue queue,int maxSize){
super(name);
this.name=name;
this.queue=queue;
this.maxSize=maxSize;
}
@Override
public void run(){
while (true){
synchronized (queue){
while (queue.size()==maxSize){
try{
System.out.println("队列已经满了,生产者["+name+"]线程等待"+"消费者从队列中消费产品。");
queue.wait();
}catch (Exception e){
e.printStackTrace();
}
}
System.out.println("[" + name + "] 生产产品 : +" + i);
queue.offer(i++);
queue.notifyAll();
try{
Thread.sleep(new Random().nextInt(1000));
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
}
}
消费者
import java.util.Queue;
import java.util.Random;
public class Consumer extends Thread {
private Queue queue;
String name;
int maxSize;
public Consumer(String name,Queuequeue,int maxSize){
super(name);
this.name=name;
this.queue=queue;
this.maxSize=maxSize;
}
@Override
public void run(){
while (true){
synchronized (queue){
while (queue.isEmpty()){
try{
System.out.println("队列是空的 消费者[" + name + "] 等待生产者生产");
queue.wait();
}catch (Exception e){
e.printStackTrace();
}
}
int x=queue.poll();
System.out.println("[" + name + "] 消费产品 : " + x);
queue.notifyAll();
try{
Thread.sleep(new Random().nextInt(1000));
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
}
}
生产者消费者模式
import java.util.LinkedList;
import java.util.Queue;
/**
* 生产者消费者模式:使用Object.wait()/notify()方法实现
*/
public class ProdicerConsumer {
private static final int CAPACITY = 500;
public static void main(String[] args) {
Queue queue = new LinkedList();
Thread producer1 = new Producer("P-1", queue, CAPACITY);
Thread producer2 = new Producer("P-2", queue, CAPACITY);
Thread consumer1 = new Consumer("C1", queue, CAPACITY);
Thread consumer2 = new Consumer("C2", queue, CAPACITY);
Thread consumer3 = new Consumer("C3", queue, CAPACITY);
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
consumer3.start();
}
}
结果
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2s2ckWoo-1582708080157)(https://raw.githubusercontent.com/iszhonghu/Picture-bed/master/img/20200226160351.png)]
注意要点 判断Queue大小为0或者大于queueSize时须使用while (condition) {}
,不能使用 if(condition) {}
。其中 while(condition)
循环,它又被叫做**“自旋锁”。为防止该线程没有收到notify()
调用也从wait()
中返回(也称作虚假唤醒**),这个线程会重新去检查condition条件以决定当前是否可以安全地继续执行还是需要重新保持等待,而不是认为线程被唤醒了就可以安全地继续执行了。
方法三:使用Lock和Condition的await() / signal()方法
在JDK5.0之后,Java提供了更加健壮的线程处理机制,包括同步、锁定、线程池等,它们可以实现更细粒度的线程控制。Condition接口的await()
和signal()
就是其中用来做同步的两种方法,它们的功能基本上和Object的wait()
/ nofity()
相同,完全可以取代它们,但是它们和新引入的锁定机制Lock
直接挂钩,具有更大的灵活性。通过在Lock
对象上调用newCondition()
方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ProducerConsumerByLock {
private static final int CAPACITY = 5;
private static final Lock lock = new ReentrantLock();
private static final Condition fullCondition = lock.newCondition(); //队列满的条件
private static final Condition emptyCondition = lock.newCondition(); //队列空的条件
public static void main(String args[]){
Queue queue = new LinkedList();
Thread producer1 = new Producer("P-1", queue, CAPACITY);
Thread producer2 = new Producer("P-2", queue, CAPACITY);
Thread consumer1 = new Consumer("C1", queue, CAPACITY);
Thread consumer2 = new Consumer("C2", queue, CAPACITY);
Thread consumer3 = new Consumer("C3", queue, CAPACITY);
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
consumer3.start();
}
/**
* 生产者
*/
public static class Producer extends Thread{
private Queue queue;
String name;
int maxSize;
int i = 0;
public Producer(String name, Queue queue, int maxSize){
super(name);
this.name = name;
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run(){
while(true){
//获得锁
lock.lock();
while(queue.size() == maxSize){
try {
System.out .println("队列已经满了,生产者["+name+"]线程等待"+"消费者从队列中消费产品。");
//条件不满足,生产阻塞
fullCondition.await();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
System.out.println("[" + name + "] 生产产品 : +" + i);
queue.offer(i++);
//唤醒其他所有生产者、消费者
fullCondition.signalAll();
emptyCondition.signalAll();
//释放锁
lock.unlock();
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 消费者
*/
public static class Consumer extends Thread{
private Queue queue;
String name;
int maxSize;
public Consumer(String name, Queue queue, int maxSize){
super(name);
this.name = name;
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run(){
while(true){
//获得锁
lock.lock();
while(queue.isEmpty()){
try {
System.out.println("队列是空的 消费者[" + name + "] 等待生产者生产");
//条件不满足,消费阻塞
emptyCondition.await();
} catch (Exception ex) {
ex.printStackTrace();
}
}
int x = queue.poll();
System.out.println("[" + name + "] 消费产品 : " + x);
//唤醒其他所有生产者、消费者
fullCondition.signalAll();
emptyCondition.signalAll();
//释放锁
lock.unlock();
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
结果
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QNVflgMO-1582708080158)(https://raw.githubusercontent.com/iszhonghu/Picture-bed/master/img/20200226164915.png)]