JAVA并发编程使用ArrayBlockingQueue实现生产者消费者模型
在学习多线程时,有一个模型大家应该非常熟悉,就是生产者和消费者模型,可以对可享变量进行保护,从而实现多线程的同步与互斥。今天用并发集合框架来实现这个模型。在介绍之前,先来看一个并发框架类ArrayBlockingQueue,它可以看作是一个容器,需要指定它的大小,把它看作是一个公共存储的地方,应该在生产者与消费者的模型中,它的大致意思如下:
生产者:如果空间够,就存放,否则等待;
消费者:如果有物品,就拿走,否则等待。
对应的,它有两个常用的方法,put()和take()方法,对应放对象和取走对象。
现在我们做一个示例,大家就明白了,具体的实现过程,我在并发框架的源代码解析中去分析它是如何实现多线程的同步与互斥的。
我们假设有一个篮子,最多可以放3个苹果,有多个人可以放苹果,也有多个人可以拿走苹果。
public class Apple { private String appName; public Apple(String appName){ this .appName =appName ; } public String getAppName() { return appName ; } public void setAppName(String appName) { this .appName = appName ; } public String toString(){ return this .appName ; } } public class Bascket { //共享存储 ArrayBlockingQueue bascket= new ArrayBlockingQueue(3); public void add(Apple apple) throws InterruptedException{ if (bascket .size()>=3){ System. out .println("add block,please waiting..." ); } bascket.put( apple); } public Apple get() throws InterruptedException{ if (bascket .size()==0){ System. out .println("get block,there is no food..." ); } return (Apple)bascket .take(); } } //生产者 public class Producer implements Runnable{ private Bascket bascket ; private String name ; public Producer(Bascket bascket ,String name ){ this .bascket =bascket ; this .name =name ; } public void run(){ while (true ){ try { System. out .println(name +"produce.." ); bascket.add( new Apple("name" +Math.random()*100)); } catch (InterruptedException e ) { // TODO Auto-generated catch block e.printStackTrace(); } try { Thread. sleep(1000); } catch (InterruptedException e ) { // TODO Auto-generated catch block e.printStackTrace(); } } } } //消费者 public class Consumer implements Runnable{ private Bascket bascket ; private String name ; public Consumer(Bascket bascket ,String name ){ this .bascket =bascket ; this .name =name ; } public void run(){ while (true ){ try { System. out .println(name +":consumer" +bascket .get()); } catch (InterruptedException e1 ) { // TODO Auto-generated catch block e1.printStackTrace(); } try { Thread. sleep(1000); } catch (InterruptedException e ) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
测试代码:
public class TestDemo { public static void main(String args[]){ Bascket bascket= new Bascket(); Consumer c1= new Consumer(bascket ,"c1" ); Producer p1= new Producer(bascket ,"p1" ); Producer p2= new Producer(bascket ,"p2" ); //线程池管理 ExecutorService service = Executors. newCachedThreadPool(); service.execute( c1); service.execute( p1); service.execute( p2); } }
我们使用同步代码块或者同步关键字来实现多线程的同步问题,并发框架其实在内部帮助我们已经做了,我们只需要把它当作一个工具来使用,具体的实现过程, 我会在并发框架中它一一对它们的实现原理进行分析。