Java中の同時工具類

8672 ワード

CountDownLatchマルチスレッドの完了を待つCountDownLatchは、他のスレッドの動作が完了するのを待つ1つまたは複数のスレッドを許可する。
例えば、excelを解析し、一つのスレッドがsheetページを解析し、全てのスレッドが解析された後、解析完了を提示する。ジョインを使用して実装することもでき、CountDownLatchを使用することもできる。join joinを使用して、現在の実行スレッドがjoinスレッドの実行終了を待つようにする。
package com.thread;

public class JoinCountDownLatchTest {
    public static void main(String[] args) {
        Thread parser1 = new Thread(new Runnable(){
            @Override
            public void run() {
                
            }
        });
        Thread parser2 = new Thread(new Runnable(){
            @Override
            public void run() {
                try {
                    Thread.sleep(1000*2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("parser2 finished!");
            }
        });
        parser1.start();
        parser2.start();
        try {
            parser1.join();
            parser2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("All parser finished!");
    }
}
CountDownLatch CountDownLatchを使用して、カウンタとしてNが入ってきます。count Downを実行するたびに、Nは1を減らします。CountDownLatchのawait方法は、Nがゼロになるまで現在のスレッドをブロックします。count Downは、スレッド内のN個のステップまたはN個のスレッドであってもよい。
スレッドがcountDownメソッドを呼び出し、スレッドがawaitメソッドを呼び出す。
package com.thread;

import java.util.concurrent.CountDownLatch;

public class CountDownLatchTest {
    public static CountDownLatch cdl = new CountDownLatch(2);
    
    public static void main(String[] args) {
        new Thread(new Runnable(){
            @Override
            public void run() {
                System.out.println(1);
                cdl.countDown();
                System.out.println(2);
                cdl.countDown();
            }
        }).start();
        try {
            cdl.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(3);
    }
}
CyclicBarrier同期バリア
一つのスレッドを一つの障壁(または同期ポイント)に到達させるとブロックされ、最後のスレッドが障壁に到達するまで障壁が開かれ、すべてのスレッドが下に進みます。
package com.thread;

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {
    static CyclicBarrier c = new CyclicBarrier(2);
    
    public static void main(String[] args) throws Exception {
        new Thread(new Runnable(){
            @Override
            public void run() {
                try {
                    Thread.sleep(3000);
                    System.out.println(2);
                    c.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
        System.out.println(1);
        c.await();
        System.out.println(3);
    }
}
/*
1
2
3
*/
CyclicBarrierアップグレード版
高級な構成方法CyclicBarrier(int parties, Runnable barrierAction):全スレッドが同期ポイントに到達した後、優先的にbarrierActionを実行し、このスレッドが実行されるのを待って、awaitの後の方法を実行し続ける。
package com.thread;

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {
    static CyclicBarrier c = new CyclicBarrier(2, new A());
    
    public static void main(String[] args) throws Exception {
        new Thread(new Runnable(){
            @Override
            public void run() {
                try {
                    Thread.sleep(3000);
                    System.out.println(2);
                    c.await();
                    System.out.println(2.1);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
        System.out.println(1);
        c.await();
        System.out.println(1.1);
    }
    
    static class A implements Runnable{
        @Override
        public void run() {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(3);
        }
    }
}
/*
1
2
3
2.1
1.1
*/
CyclicBarrierアプリケーションシーンCyclicBarrierCountDownLatchの違いCountDownBatchのカウンタは一回しか使えません。CyclicBarrierのカウンタはreset()の方法でリセットできます。したがって、CyclicBarrierは、より複雑な機能を実現することができる。例えば、計算エラーを処理して、カウンタをリセットして、スレッドをもう一度実行させることができます。CyclicBarrierの他の方法:
  • getNumberWaiting:閉塞スレッド数を取得する。
  • isBroken()は、ブロックされたスレッドが中断されているかどうかを調べるために使用される。
  • package com.thread;
    
    import java.util.concurrent.CyclicBarrier;
    
    public class CyclicBarrierTest2 {
        static CyclicBarrier c = new CyclicBarrier(2);
        
        public static void main(String[] args) {
            Thread t1 = new Thread(new Runnable(){
                @Override
                public void run() {
                    try {
                        c.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }       
            });
            t1.start();
            t1.interrupt();
            try {
                c.await();
            } catch (Exception e) {//           ???   
                System.out.println(c.isBroken());
                e.printStackTrace();
            }
        }
    }
    
    併発スレッド数を制御するSemaphoreSemaphore(信号量)は、特定のリソースに同時にアクセスするスレッドの数を制御するために使用され、各スレッドを調整することによって、共通リソースの適切な使用を保証する。
  • int availablePermits:この信号量の中で現在利用可能な許可証の数を返します。
  • int getQueueLength():ライセンス取得待ちスレッド数を返す。
  • boolean hasQueueThreads:スレッドがあるかどうかは、取得許可証を待っています。
  • void reducePermits(int reduction):reductionの許可を減らす。
  • Collection getQueuedThreads():ライセンス取得待ちスレッドセット
  • をすべて返す。
    以下のスレッドは同時に30個のスレッドを開いて、全部runメソッド内に入っていますが、同時にs.acquire();***s.release();間を実行するスレッドは10個しかありません。
    package com.thread;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    public class SemaphoreTest {
        private static final int THREAD_COUNT = 30;
        private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
        private static Semaphore s = new Semaphore(10);
        
        public static void main(String[] args) {
            for(int i=0; i
    スレッド間交換データExchangerExchangerは、スレッド間連携のためのツールクラスであり、スレッド間のデータ交換に用いられる。これは同期ポイントを提供しており、この同期ポイントでは、2つのスレッドが互いのデータを交換することができます。第1のスレッドは、まずexchange()方法を実行し、第2のスレッドもexchange()方法を実行し、2つのスレッドが同時に同期ポイントに到達すると、これらの2つのスレッドはデータを交換することができる。
    一つのスレッドがexchange()方法を実行していない場合、ずっと待ち続けます。特別な状況が心配なら、exchange(V v,longtimeout, TimeUnit unit)を使って最大待ち時間を設定してもいいです。
    package com.thread;
    
    import java.util.concurrent.Exchanger;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class ExchangerTest {
        private static final Exchanger exgr = new Exchanger();
        private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
        
        public static void main(String[] args) {
            threadPool.execute(new Runnable(){
                @Override
                public void run() {
                    String a = "    A";
                    try {
                        String b = exgr.exchange(a);
                        System.out.println("a       .a=" + a+";b="+b);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            threadPool.execute(new Runnable(){
                @Override
                public void run() {
                    String b = "    B";
                    try {
                        Thread.sleep(3000);
                        String a = exgr.exchange(b);//  b     a   
                        System.out.println("b       .a=" + a+";b="+b);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
    /*
    b       .a=    A;b=    B
    a       .a=    A;b=    B
    */