Javaの生産者-消費者モデル(Producer and Consumer Pattern in Java)

27078 ワード

生産者-消費者モデルはマルチスレッド問題の古典的な問題であり、面接のよくある問題でもある.いくつかの一般的な実装方法があります.
1. wait()/notify()
2. lock & condition
3. BlockingQueue
 
以下、逐一分析します.
 
1. wait()/notify()
第1の実装は、ルートクラスObjectの2つの方法wait()/notify()を用いて、スレッドの実行を停止または起動する.これも最も原始的な実現です.
 1 public class WaitNotifyBroker<T> implements Broker<T> {

 2 

 3     private final Object[] items;

 4 

 5     private int takeIndex;

 6     private int putIndex;

 7     private int count;

 8 

 9     public WaitNotifyBroker(int capacity) {

10         this.items = new Object[capacity];

11     }

12 

13     @SuppressWarnings("unchecked")

14     @Override

15     public T take() {

16         T tmpObj = null;

17         try {

18             synchronized (items) {

19                 while (0 == count) {

20                     items.wait();

21                 }

22                 tmpObj = (T) items[takeIndex];

23                 if (++takeIndex == items.length) {

24                     takeIndex = 0;

25                 }

26                 count--;

27                 items.notify();

28             }

29         } catch (InterruptedException e) {

30             e.printStackTrace();

31         }

32 

33         return tmpObj;

34     }

35 

36     @Override

37     public void put(T obj) {

38         try {

39             synchronized (items) {

40                 while (items.length == count) {

41                     items.wait();

42                 }

43 

44                 items[putIndex] = obj;

45                 if (++putIndex == items.length) {

46                     putIndex = 0;

47                 }

48                 count++;

49                 items.notify();

50             }

51         } catch (InterruptedException e) {

52             e.printStackTrace();

53         }

54 

55     }

56 

57 }

ここではArrayを用いてBufferを構築してデータにアクセスし,count,putIndex,takeIndexを用いてFirst−In−Firsst−Outを保証した.
Arrayの代わりにLinkedListを利用すれば、比較的簡単です.
LinkedListの実装は、『Java 7 Concurrency Cookbook』第2章wait/notifyを参照してください.
 
 
2. lock & condition
lock&conditionは、synchronizedやwait()/notify()のような機能も実際に実現されていますが、ロックの追加やロックの解除、一時停止、目覚ましの面では、より繊細で制御可能です.
JDKのBlockingQueueのデフォルト実装でもlock&conditionを利用しています.この文書もlock&conditionを利用してBlockingQueueを書く方法を詳しく紹介しています.ここでLinkedListを変えてもう一度実現します.
 1 public class LockConditionBroker<T> implements Broker<T> {

 2 

 3     private final ReentrantLock lock;

 4     private final Condition notFull;

 5     private final Condition notEmpty;

 6     private final int capacity;

 7     private LinkedList<T> items;

 8 

 9     public LockConditionBroker(int capacity) {

10         this.lock = new ReentrantLock();

11         this.notFull = lock.newCondition();

12         this.notEmpty = lock.newCondition();

13         this.capacity = capacity;

14 

15         items = new LinkedList<T>();

16     }

17 

18     @Override

19     public T take() {

20         T tmpObj = null;

21         lock.lock();

22         try {

23             while (items.size() == 0) {

24                 notEmpty.await();

25             }

26 

27             tmpObj = items.poll();

28             notFull.signalAll();

29 

30         } catch (InterruptedException e) {

31             e.printStackTrace();

32         } finally {

33             lock.unlock();

34         }

35         return tmpObj;

36     }

37 

38     @Override

39     public void put(T obj) {

40         lock.lock();

41         try {

42             while (items.size() == capacity) {

43                 notFull.await();

44             }

45 

46             items.offer(obj);

47             notEmpty.signalAll();

48 

49         } catch (InterruptedException e) {

50             e.printStackTrace();

51         } finally {

52             lock.unlock();

53         }

54 

55     }

56 }

 
 
3. BlockingQueue
最後にこの方法も、最も簡単で最もお勧めです.提供するツールを利用して発注する:キューをブロックし、ブロックの論理をBlockingQueueに渡す.
実際,上記1と2の方法で実現されるBrokerクラスは,単純なブロックキューと見なすこともできるが,標準パッケージほど完備していない.
 1 public class BlockingQueueBroker<T> implements Broker<T> {

 2 

 3     private final BlockingQueue<T> queue;

 4 

 5     public BlockingQueueBroker() {

 6         this.queue = new LinkedBlockingQueue<T>();

 7     }

 8 

 9     @Override

10     public T take() {

11         try {

12             return queue.take();

13         } catch (InterruptedException e) {

14             e.printStackTrace();

15         }

16 

17         return null;

18     }

19 

20     @Override

21     public void put(T obj) {

22         try {

23             queue.put(obj);

24         } catch (InterruptedException e) {

25             e.printStackTrace();

26         }

27     }

28 

29 }

私たちのキューはマークアップパッケージのLinkedBlockingQueueをカプセル化しており、非常に簡単で効率的です.
 
次に、1 P 2 Cの例を示します.
 1 public interface Broker<T> {

 2 

 3     T take();

 4 

 5     void put(T obj);

 6 

 7 }

 8 

 9 

10 public class Producer implements Runnable {

11 

12     private final Broker<Integer> broker;

13     private final String name;

14 

15     public Producer(Broker<Integer> broker, String name) {

16         this.broker = broker;

17         this.name = name;

18     }

19 

20     @Override

21     public void run() {

22         try {

23             for (int i = 0; i < 5; i++) {

24                 broker.put(i);

25                 System.out.format("%s produced: %s%n", name, i);

26                 Thread.sleep(1000);

27             }

28             broker.put(-1);

29             System.out.println("produced termination signal");

30         } catch (InterruptedException e) {

31             e.printStackTrace();

32             return;

33         }

34 

35     }

36 

37 }

38 

39 

40 public class Consumer implements Runnable {

41 

42     private final Broker<Integer> broker;

43     private final String name;

44 

45     public Consumer(Broker<Integer> broker, String name) {

46         this.broker = broker;

47         this.name = name;

48     }

49 

50     @Override

51     public void run() {

52         try {

53             for (Integer message = broker.take(); message != -1; message = broker.take()) {

54                 System.out.format("%s consumed: %s%n", name, message);

55                 Thread.sleep(1000);

56             }

57             System.out.println("received termination signal");

58         } catch (InterruptedException e) {

59             e.printStackTrace();

60             return;

61         }

62 

63     }

64 

65 }

66 

67 

68 public class Main {

69 

70     public static void main(String[] args) {

71         Broker<Integer> broker = new WaitNotifyBroker<Integer>(5);

72 //         Broker<Integer> broker = new LockConditionBroker<Integer>(5);

73 //         Broker<Integer> broker = new BlockingQueueBroker<Integer>();

74 

75         new Thread(new Producer(broker, "prod 1")).start();

76         new Thread(new Consumer(broker, "cons 1")).start();

77         new Thread(new Consumer(broker, "cons 2")).start();

78 

79     }

80 

81 }

 
上記の方法以外にも、実は多くの第三者の共同発注がこの問題を解決することができます.例えばLMAX DisruptorやChronicleなど
 
本文が終わる.
 
参照先:
《Java 7 Concurrency Cookbook》