Java中の同時工具類
8672 ワード
CountDownLatch
マルチスレッドの完了を待つCountDownLatch
は、他のスレッドの動作が完了するのを待つ1つまたは複数のスレッドを許可する。例えば、excelを解析し、一つのスレッドがsheetページを解析し、全てのスレッドが解析された後、解析完了を提示する。ジョインを使用して実装することもでき、
CountDownLatch
を使用することもできる。join
joinを使用して、現在の実行スレッドがjoinスレッドの実行終了を待つようにする。package com.thread;
public class JoinCountDownLatchTest {
public static void main(String[] args) {
Thread parser1 = new Thread(new Runnable(){
@Override
public void run() {
}
});
Thread parser2 = new Thread(new Runnable(){
@Override
public void run() {
try {
Thread.sleep(1000*2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("parser2 finished!");
}
});
parser1.start();
parser2.start();
try {
parser1.join();
parser2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("All parser finished!");
}
}
CountDownLatch
CountDownLatch
を使用して、カウンタとしてNが入ってきます。count Downを実行するたびに、Nは1を減らします。CountDownLatch
のawait方法は、Nがゼロになるまで現在のスレッドをブロックします。count Downは、スレッド内のN個のステップまたはN個のスレッドであってもよい。スレッドが
countDown
メソッドを呼び出し、スレッドがawait
メソッドを呼び出す。package com.thread;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchTest {
public static CountDownLatch cdl = new CountDownLatch(2);
public static void main(String[] args) {
new Thread(new Runnable(){
@Override
public void run() {
System.out.println(1);
cdl.countDown();
System.out.println(2);
cdl.countDown();
}
}).start();
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(3);
}
}
CyclicBarrier
同期バリア一つのスレッドを一つの障壁(または同期ポイント)に到達させるとブロックされ、最後のスレッドが障壁に到達するまで障壁が開かれ、すべてのスレッドが下に進みます。
package com.thread;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest {
static CyclicBarrier c = new CyclicBarrier(2);
public static void main(String[] args) throws Exception {
new Thread(new Runnable(){
@Override
public void run() {
try {
Thread.sleep(3000);
System.out.println(2);
c.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
System.out.println(1);
c.await();
System.out.println(3);
}
}
/*
1
2
3
*/
*CyclicBarrier
アップグレード版高級な構成方法
CyclicBarrier(int parties, Runnable barrierAction)
:全スレッドが同期ポイントに到達した後、優先的にbarrierAction
を実行し、このスレッドが実行されるのを待って、await
の後の方法を実行し続ける。package com.thread;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest {
static CyclicBarrier c = new CyclicBarrier(2, new A());
public static void main(String[] args) throws Exception {
new Thread(new Runnable(){
@Override
public void run() {
try {
Thread.sleep(3000);
System.out.println(2);
c.await();
System.out.println(2.1);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
System.out.println(1);
c.await();
System.out.println(1.1);
}
static class A implements Runnable{
@Override
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(3);
}
}
}
/*
1
2
3
2.1
1.1
*/
CyclicBarrier
アプリケーションシーンCyclicBarrier
とCountDownLatch
の違いCountDownBatch
のカウンタは一回しか使えません。CyclicBarrier
のカウンタはreset()
の方法でリセットできます。したがって、CyclicBarrier
は、より複雑な機能を実現することができる。例えば、計算エラーを処理して、カウンタをリセットして、スレッドをもう一度実行させることができます。CyclicBarrier
の他の方法:getNumberWaiting
:閉塞スレッド数を取得する。isBroken()
は、ブロックされたスレッドが中断されているかどうかを調べるために使用される。package com.thread;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest2 {
static CyclicBarrier c = new CyclicBarrier(2);
public static void main(String[] args) {
Thread t1 = new Thread(new Runnable(){
@Override
public void run() {
try {
c.await();
} catch (Exception e) {
e.printStackTrace();
}
}
});
t1.start();
t1.interrupt();
try {
c.await();
} catch (Exception e) {// ???
System.out.println(c.isBroken());
e.printStackTrace();
}
}
}
併発スレッド数を制御するSemaphore
Semaphore
(信号量)は、特定のリソースに同時にアクセスするスレッドの数を制御するために使用され、各スレッドを調整することによって、共通リソースの適切な使用を保証する。int availablePermits
:この信号量の中で現在利用可能な許可証の数を返します。int getQueueLength()
:ライセンス取得待ちスレッド数を返す。boolean hasQueueThreads
:スレッドがあるかどうかは、取得許可証を待っています。void reducePermits(int reduction)
:reductionの許可を減らす。Collection getQueuedThreads()
:ライセンス取得待ちスレッドセット以下のスレッドは同時に30個のスレッドを開いて、全部runメソッド内に入っていますが、同時に
s.acquire();***s.release();
間を実行するスレッドは10個しかありません。package com.thread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
private static Semaphore s = new Semaphore(10);
public static void main(String[] args) {
for(int i=0; i
スレッド間交換データExchanger
Exchanger
は、スレッド間連携のためのツールクラスであり、スレッド間のデータ交換に用いられる。これは同期ポイントを提供しており、この同期ポイントでは、2つのスレッドが互いのデータを交換することができます。第1のスレッドは、まずexchange()
方法を実行し、第2のスレッドもexchange()
方法を実行し、2つのスレッドが同時に同期ポイントに到達すると、これらの2つのスレッドはデータを交換することができる。一つのスレッドが
exchange()
方法を実行していない場合、ずっと待ち続けます。特別な状況が心配なら、exchange(V v,longtimeout, TimeUnit unit)
を使って最大待ち時間を設定してもいいです。package com.thread;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExchangerTest {
private static final Exchanger exgr = new Exchanger();
private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(new Runnable(){
@Override
public void run() {
String a = " A";
try {
String b = exgr.exchange(a);
System.out.println("a .a=" + a+";b="+b);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
threadPool.execute(new Runnable(){
@Override
public void run() {
String b = " B";
try {
Thread.sleep(3000);
String a = exgr.exchange(b);// b a
System.out.println("b .a=" + a+";b="+b);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
/*
b .a= A;b= B
a .a= A;b= B
*/