Javaマルチスレッド12同期ツールクラスExchanger

6158 ワード

Javaマルチスレッドディレクトリ

1 Exchangerの紹介


先にCyclicBarrier、CountDownLatch、Semaphoreについて説明しましたが、コンカレントツールクラスの最後のExchangeについて説明します.Exchangerはスレッド間のコラボレーションに使用されるツールクラスで、Exchangerはスレッド間のデータ交換に使用され、同期ポイントを提供します.この同期ポイントでは、2つのスレッドが互いのデータを交換することができます.この2つのスレッドはexchangeメソッドでデータを交換します.最初のスレッドが先にexchangeメソッドを実行すると、2番目のスレッドもexchangeメソッドを実行するのをずっと待っています.2つのスレッドが同期ポイントに達すると、この2つのスレッドはデータを交換できます.
A synchronization point at which threads can pair and swap elements within pairs. Each thread presents some object on entry to the exchange method, matches with a partner thread, and receives its partner's object on return. An Exchanger may be viewed as a bidirectional form of a SynchronousQueue. Exchangers may be useful in applications such as genetic algorithms and pipeline designs.
以上の説明では、いくつかのポイントを示します.
  • のような対外的な動作は同期されている.
  • は、出現したスレッド間でデータを交換するために使用される.
  • は、双方向の同期キューと見なすことができる.
  • は、遺伝子アルゴリズム、流水線設計などのシーンに適用することができる.次にapiドキュメントを見て、このクラスは対外的なインタフェースを提供して非常に簡潔で、1つのパラメトリック構造関数、2つのリロードのモデルexchange方法:
  • public V exchange(V x) throws InterruptedException
    public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
    

    2 Exchangerインスタンス

    public class ExchangerTest {
        public static void main(String[] args) {
            ExecutorService executor = Executors.newCachedThreadPool();
            final Exchanger exchanger = new Exchanger();
            executor.execute(new Runnable() {
                String data = "data1";
    
                @Override
                public void run() {
                    doExchangeWork(data, exchanger);
                }
            });
    
            executor.execute(new Runnable() {
                String data = "data2";
    
                @Override
                public void run() {
                    doExchangeWork(data, exchanger);
                }
            });
            executor.shutdown();
        }
    
        private static void doExchangeWork(String data, Exchanger exchanger) {
            try {
                System.out.println(Thread.currentThread().getName() + "  " + data + "  ");
                Thread.sleep((long) (Math.random() * 1000));
    
                String exchangeData = (String) exchanger.exchange(data);
                System.out.println(Thread.currentThread().getName() + "   " + exchangeData);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    pool-1-thread-1  data1  
    pool-1-thread-2  data2  
    pool-1-thread-2   data1
    pool-1-thread-1   data2
    

    スレッドAがExchangeオブジェクトのexchange()メソッドを呼び出すと、スレッドBもexchange()メソッドを呼び出すまでブロックされ、スレッドが安全にデータを交換した後、スレッドAとBが実行されます.
    Exchange待機タイムアウト
    public class ExchangerTest {
        public static void main(String[] args) {
            ExecutorService executor = Executors.newCachedThreadPool();
            final Exchanger exchanger = new Exchanger();
            executor.execute(new Runnable() {
                String data = "data1";
    
                @Override
                public void run() {
                    doExchangeWork(data, exchanger);
                }
            });
    
            executor.execute(new Runnable() {
                String data = "data2";
    
                @Override
                public void run() {
                    try {
                        Thread.sleep((long) (3000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    doExchangeWork(data, exchanger);
                }
            });
            executor.shutdown();
        }
    
        private static void doExchangeWork(String data, Exchanger exchanger) {
            try {
                System.out.println(Thread.currentThread().getName() + "  " + data + "  ");
    
                // 3 
                String exchangeData = (String) exchanger.exchange(data,1, TimeUnit.SECONDS);
                System.out.println(Thread.currentThread().getName() + "   " + exchangeData);
            } catch ( TimeoutException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    pool-1-thread-1  data1  
    java.util.concurrent.TimeoutException
        at java.util.concurrent.Exchanger.exchange(Exchanger.java:626)
        at ExchangerTest.doExchangeWork(ExchangerTest.java:37)
        at ExchangerTest.access$000(ExchangerTest.java:3)
        at ExchangerTest$1.run(ExchangerTest.java:12)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    pool-1-thread-2  data2  
    java.util.concurrent.TimeoutException
        at java.util.concurrent.Exchanger.exchange(Exchanger.java:626)
        at ExchangerTest.doExchangeWork(ExchangerTest.java:37)
        at ExchangerTest.access$000(ExchangerTest.java:3)
        at ExchangerTest$2.run(ExchangerTest.java:26)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    

    実戦シーンは定時タスクを設計し、毎日未明に実行します.タイミングタスクで2つのスレッドを開始します.1つのスレッドは、ビジネス集計表(xxx_info)のクエリー統計を担当し、統計の結果をメモリバッファに配置し、もう1つのスレッドは、バッファ内の統計結果を読み取り、ビジネス統計表(xxx_statistics)に挿入します.親、このようなシーンは感じがしますか?間違いない!2つのスレッドはメモリの中で大量にデータを交換して、この事は私達はExchangerを使ってすることができます!

    3実現原理


    Exchanger(エクスチェンジ)は、スレッド間のコラボレーションのためのツールクラスです.Exchangerはスレッド間のデータ交換に使用します.2つのスレッドが互いにデータを交換できる同期ポイントを提供します.この2つのスレッドはexchangeメソッドでデータを交換し、最初のスレッドが先にexchangeメソッドを実行すると、2番目のスレッドもexchangeを実行するのをずっと待っています.2つのスレッドが同期点に達すると、この2つのスレッドはデータを交換し、このスレッドで生成されたデータを相手に渡すことができます.そこでExchangerを使用するポイントは,ペアのスレッドに対してexchange()メソッドを用い,1対のスレッドが同期点に達するとデータを交換することである.したがって、このツールクラスのスレッドオブジェクトはペアになります.Exchangerクラスには、String exchange(Vx):交換、交換の開始、別のスレッド呼び出しexchangeの待機の2つの方法があります.String exchange(V x,long timeout,TimeUnit unit):交換、交換の開始、別のスレッド呼び出しexchangeの待機に使用し、最大待機時間を設定し、待機時間がtimeoutを超えると待機を停止します.

    クローズアップ感謝


    java.util.concurrent.Exchangerの応用例と原理javaの高級---->ThreadのExchangerの使用を浅く分析する