SynchronousQueue使用例

4730 ワード

シーケンス
本文は主にSynchronousQueueについて述べる.
定義#テイギ#
SynchronousQueueは、キュー内の要素のストレージスペースを維持しないため、実際には真のキューではありません.他のキューとは異なり、要素をキューに追加または削除するのを待っているスレッドのセットが維持されます.
皿を洗うたとえを例にとると、これは皿棚がなく、洗った皿を次の空き乾燥機に直接入れることに相当する.このようなキューの実装方法は奇妙に見えますが、作業を直接提供できるため、生産者から消費者へのデータ移動の遅延が低減されます.(従来のキューでは、1つのワークユニットが納品できる前に、シリアル方式で最初にエンキュー(Enqueue)やデキュー(Dequeue)などの操作を完了する必要があります.)
ダイレクト・デリバリ方式では、タスク・ステータスに関する詳細な情報も生産者にフィードバックされます.納品が受け入れられると、消費者は簡単にタスクをキューに入れるのではなく、タスクを手に入れたことを知っています.この違いは、ファイルを同僚に直接渡すか、ファイルをメールボックスに入れて、できるだけ早くファイルを手に入れることを望んでいます.
SynchronousQueueにはストレージ機能がないため、putとtakeは別のスレッドが納品プロセスに参加する準備ができているまでブロックされます.同期キューは、十分な消費者があり、常に1人の消費者が納品の準備ができている場合にのみ使用できます.
≪インスタンス|Instance|emdw≫
public class SynchronousQueueExample {

    static class SynchronousQueueProducer implements Runnable {

        protected BlockingQueue blockingQueue;
        final Random random = new Random();

        public SynchronousQueueProducer(BlockingQueue queue) {
            this.blockingQueue = queue;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    String data = UUID.randomUUID().toString();
                    System.out.println("Put: " + data);
                    blockingQueue.put(data);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    static class SynchronousQueueConsumer implements Runnable {

        protected BlockingQueue blockingQueue;

        public SynchronousQueueConsumer(BlockingQueue queue) {
            this.blockingQueue = queue;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    String data = blockingQueue.take();
                    System.out.println(Thread.currentThread().getName()
                            + " take(): " + data);
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    public static void main(String[] args) {
        final BlockingQueue synchronousQueue = new SynchronousQueue();

        SynchronousQueueProducer queueProducer = new SynchronousQueueProducer(
                synchronousQueue);
        new Thread(queueProducer).start();

        SynchronousQueueConsumer queueConsumer1 = new SynchronousQueueConsumer(
                synchronousQueue);
        new Thread(queueConsumer1).start();

        SynchronousQueueConsumer queueConsumer2 = new SynchronousQueueConsumer(
                synchronousQueue);
        new Thread(queueConsumer2).start();

    }
}

データを挿入するスレッドと、データを取得するスレッドを交互に実行します.
シーンの適用
Executors.newCachedThreadPool()
 /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available, and uses the provided
     * ThreadFactory to create new threads when needed.
     * @param threadFactory the factory to use when creating new threads
     * @return the newly created thread pool
     * @throws NullPointerException if threadFactory is null
     */
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue(),
                                      threadFactory);
    }

ThreadPoolExecutor内部実装タスクのコミット時に呼び出されるのは、ワークキュー(BlockingQueueインタフェースの実装クラス)の非ブロック入キューメソッドである(offerメソッド)なので、SynchronousQueueをワークキューとして使用する前提で、クライアントコードがスレッドプールにタスクをコミットする場合、スレッドプールに空きスレッドがなく、SynchronousQueueインスタンスからタスクを取り出すことができ、対応するofferメソッド呼び出しが失敗する(つまり、タスクはワークキューに格納されていません).この場合、ThreadPoolExecutorは、このキューに失敗したタスクを処理するために新しいワーカースレッドを新規作成します(スレッドプールのサイズが最大スレッドプールのサイズに達していないと仮定します).
したがって、SynchronousQueueをワークキューとして使用すると、ワークキュー自体は実行するタスクの数を制限しません.ただし、Integerではなく、スレッドプールの最大サイズを合理的な有限値とする必要がある.MAX_VALUE、そうでないと、スレッドプール内のワーカースレッドの数がシステムリソースに耐えられないまで増加する可能性があります.
アプリケーションが実際に大きなワークキュー容量を必要とし、無制限なワークキューによる問題を回避するには、SynchronousQueueを考慮してください.SynchronousQueue実装ではキャッシュスペースは使用されません.
SynchronousQueueを使用する目的は、「コミットされたタスクに対して、空きスレッドがある場合は空きスレッドを使用して処理し、そうでない場合は新しいスレッドを作成してタスクを処理する」ことを保証することです.
doc
  • A Guide to Java SynchronousQueue
  • SynchronousQueue Example in Java - Produer Consumer Solution
  • Java SynchronousQueue