Java同時実行(基礎知識):ブロッキングキューと生産者消費者モデル

4545 ワード

1、ブロックキュー
BlockingQueueはスレッドセキュリティのQueueバージョンであり、その名前からブロックをサポートするQueue実装であることがわかります.空のBlockingQueueにデータを要求すると、BlockingQueueに非空にブロックされます.フルBlockingQueueにデータを挿入すると、スレッドはBlockingQueue挿入可能にブロックされます.
BlockingQueueの方法は4つの形式で現れ、すぐには満たされないが将来のある時点で満たされる可能性のある操作に対して、この4つの形式の処理方式は異なる.1つ目は異常を投げ出すことであり、2つ目は特殊な値(nullまたはfalse、具体的には操作に依存する)を返すことであり、3つ目は操作が成功する前に、現在のスレッドは無期限にブロックされ、4つ目は、放棄する前に所定の最大時間制限内にのみブロックされる.次の表に、これらの方法をまとめます.
 
異常を投げ出す
特殊値
ブロッキング
タイムアウト
挿入
add(e)
offer(e)
put(e)
offer(e, time, unit)
削除
remove()
poll()
take()
take(time, unit)
けんさ
element()
peek()
 
 
BlockingQueueはnull要素を受け入れません.null要素をadd、put、offerで試みた場合、いくつかのインプリメンテーションはNull PointerExceptionを放出します.nullはpoll操作の失敗を示す警戒値として使用される.
BlockingQueueは、容量を限定してもよい.任意の所定の時間にremainingCapacityを持つことができ、この容量を超えると、ブロックなしにputに要素を追加することはできません.内部容量制約のないBlockingQueueは常にIntegerに報告する.MAX_VALUEの残容量.
JDKでは、以下のブロックキュー実装が提供されています.
  • ArrayBlockingQueue:配列構造からなる境界ブロックキュー.
  • LinkedBlockingQueue:チェーンテーブル構造からなる境界ブロックキュー.
  • PriorityBlockingQueue:優先順位ソートをサポートする無境界ブロックキュー.
  • DelayQueue:遅延期間が満了してこそデータを取り出すことができる無境界ブロックキュー.
  • SynchronousQueue:要素を格納しないブロックキュー.

  • 2、生産者消費者モデル
    ブロッキング・キューは、「完了する必要がある作業を特定する」と「作業を実行する」の2つのプロセスを分離し、作業項目を「完了する」リストに挿入して、発見後すぐに処理するのではなく、後で処理する生産者-消費者モードをサポートします.生産者-消費者モデルは、生産者クラスと消費者クラスの間のコード依存性を排除するため、開発プロセスを簡略化することができ、また、このモデルは、データを処理する速度が異なるため、データを使用するプロセスとデータを使用するプロセスをデカップリングしてワークロード管理を簡素化することができる.
    ブロックキューに基づいて構築された生産者−消費者設計では、データ生成時に生産者はデータをキューに入れ、消費者がデータを処理する準備をすると、キューからデータを取得する.生産者は、消費者のIDや数、またはそれらが唯一の生産者であるかどうかを知る必要はありません.データをキューに入れるだけです.同様に、消費者も生産者が誰なのか、あるいは仕事がどこから来たのかを知る必要はありません.BlockingQueueは、任意の数の生産者と消費者をサポートする生産者-消費者設計の実現プロセスを簡素化します.一般的な生産者-消費者設計モデルの1つは、スレッドプールとワークキューの組み合わせであり、Executorタスク実行フレームワークでこのモデルが体現されています.
    ブロックキューは、take操作が利用可能なデータがあるまでブロックされるため、生産者ができるだけ早く作業項目を生成して消費者を忙しくさせることができなければ、消費者は仕事があるまで待つしかないため、生産者-消費者モードの符号化を簡略化する.同様に、put操作も符号化を簡略化することができ、ブロックキューを使用すると、キューが満たされると、生産者はブロックされ、作業を継続することができず、消費者は生産者の速度に追いつく時間がある.
    3、デスクトップ検索例
    一部のデスクトップ検索プログラムと同様に、ローカルドライブ上のファイルと履歴書インデックスをスキャンして検索するエージェントなど、生産者と消費者に分解するのに適したタイプのプログラムがあります.次のコードでは、CrawlerThreadでは、ファイル階層内でインデックス基準を満たすファイルを検索し、その名前をワークキューに配置する生産者タスクが与えられます.IndexerThreadでは、キューからファイル名を取り出し、インデックスを作成する消費者タスクが与えられています.
    例は「Java同時プログラミング実戦」から来ている.
    class CrawlerThread extends Thread {
    	private final BlockingQueue<File> fileQueue;
    	private final File root;
    
    	public CrawlerThread(BlockingQueue<File> fileQueue, File root) {
    		super();
    		this.fileQueue = fileQueue;
    		this.root = root;
    	}
    
    	public void run() {
    		try {
    			crawl(root);
    		} catch (InterruptedException e) {
    			Thread.currentThread().interrupt();
    		}
    	}
    
    	private void crawl(File root) throws InterruptedException {
    		File[] entries = root.listFiles();
    		if (entries != null) {
    			for (File entry : entries) {
    				if (entry.isDirectory())
    					crawl(entry);
    				else if (!fileQueue.contains(entry))
    					fileQueue.put(entry);
    			}
    		}
    	}
    }
    
    class IndexerThread extends Thread {
    	private final BlockingQueue<File> queue;
    
    	public IndexerThread(BlockingQueue<File> queue) {
    		super();
    		this.queue = queue;
    	}
    
    	public void run() {
    		try {
    			while (true) {
    				indexFile(queue.take());
    			}
    		} catch (InterruptedException consumed) {
    			Thread.currentThread().interrupt();
    		}
    	}
    
    	public void indexFile(File file) {
    		/* ... */
    	};
    }

    生産者-消費者モデルはスレッドに適した方法でデスクトップ検索問題をより簡単なコンポーネントに分解し、ファイル遍歴やインデックスの作成などの機能を独立した操作に分解し、すべての機能を1つの操作に配置するよりもコードの可読性と再利用性が高くなります.各操作は1つのタスクを完了するだけで、また、ブロックキューはすべての制御フローを担当するため、各機能のコードはより簡単で明確になります.
    生産者-消費者モデルは、多くのパフォーマンスメリットをもたらします.生産者と消費者は同時に実行することができ、I/O密集型、CPU密集型の場合、並列実行のスループットはシリアル実行のスループットより高い.生産者と消費者の並列度が異なる場合、それらを結合すると、全体の並列度が両者の中でより小さな並列度に低下します.
    次のコードは、複数の検索スレッドとインデックススレッドを起動するために使用されます.
    public static void main(String[] args) {
    		final int N_CONCUMERS = 10;
    		File[] roots = ...;
    		
    		BlockingQueue<File> queue = new LinkedBlockingQueue<File>(1000);
    		for(File root : roots)
    			new CrawlerThread(queue, root).start();
    		
    		for(int i = 0; i < N_CONCUMERS; i++)
    			new IndexerThread(queue).start();
    	}