Java同時発注スレッドプールと原子カウント
5784 ワード
ビッグデータ量関連の業務処理ロジックについては,JDKが提供し,マルチスレッドの場合の業務データ処理を一括して解決することが直接的な考え方である.スレッドプールはスレッドを管理する方法を提供し、スレッドの利用率を高めることができ、発注中の原子カウントはマルチスレッドの場合、同期コードを書くことを避けることができます.
ここではまずjdkと発注中のスレッドプールプロセッサThreadPoolExecutorを原子カウントクラスAomicIntegerとカウントダウンロックCountDownLatchのいくつかの一般的なapiで紹介します.
一、ThreadPoolExecutor
ここではまずjdkと発注中のスレッドプールプロセッサThreadPoolExecutorを原子カウントクラスAomicIntegerとカウントダウンロックCountDownLatchのいくつかの一般的なapiで紹介します.
一、ThreadPoolExecutor
JDK 。Executors
1
、Executors.newCachedThreadPool
2
、Executors.newFixedThreadPool
3
、Executors.newSingleThreadExecutor
4
、Executors.newScheduledThreadPool()
。 :
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(ThreadPoolUtil.CORE_POOL_SIZE, ThreadPoolUtil.MAX_POOL_SIZE, ThreadPoolUtil.KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(ThreadPoolUtil.WORK_QUEUE_SIZE), new ThreadPoolExecutor.CallerRunsPolicy());
ここではexecuteの と びつけてパラメータの を に する.
1、corePoolSize:コアスレッドの .つまり、スレッドプールが する スレッド です.Idle timeがkeepAliveTimeを えると、スレッドインスタンスは し、スレッドプールから します.コアスレッド の きさは、 なスレッド を するために、アプリケーションシーンやCPUの に じて に する がある.
2、maximumPoolSize:スレッドプールメンテナンスの スレッド . キューの 、これは です. キューでは、この で なhandlerを してスレッドを できます.
3、keepAliveTimeおよびunitはidleTimeを するために される.すなわち、スレッドプールは、スレッドが するアイドル および を する.4、
workQueueタスクキュー: な のキュー、SynchronousQueue
: 。 。LinkedBlockingQueue: , corePoolSize , , 。ArrayBlockingQueue: 。 size 。
5、policy:
。
maximumPoolSizeの の ポリシー. のhandlerを したり、スレッドプールで したりすることができます.
AbortPolicy:javaを げ す.util.concurrent.RejectedExecutionException
CallerRunsPolicy: のタスクの を します.execute()メソッドを に り し び します.
DiscardOldestPolicy: いタスクを てる
DiscardPolicy: のタスクを
の からexecuteメソッドの を ることができます. の は にcorePoolSizeが に たされ、タスクがキューに され、 きスレッドがあるとキュー のタスク「offer」が され、キューもいっぱいになるとhandlerに されてタスクが されます.
public void execute(Runnable command) {
//
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
// size>core
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
// maxmum , reject
reject(command); // is shutdown or saturated
}
}
、AomicIntegerカウンタ
マルチスレッドの 、カウンタを1つ くのは で くのが です. されたAomicIntegerは、プログラマーをいくつかの コードを くことから します.AomicIntegerで も な はincrementAndGet()です.ソースコードを てください.
/**
* Atomically increments by one the current value.
*
* @return the updated value
*/
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
}
}
ここでcompareAndSetメソッドはカウントを り さないことを する です.ここでCAS の が び され、 と が されますが、nextを むときに のスレッドが の を した でも が しますが、この の は くなく、synchronizedやnotifyを するよりも です.incrementAndGetを び すたびにAomicIntegerの が され、 び すのも です.
、CountDownLatch
AomicIntegerと わせてカウントダウンできます. された のスレッドがすべて する を した に を うことを し、 が0であることを った 、 のスレッドを して を します.このアプリケーションでは、すべてのAomicIntegerカウントが した 、つまりすべてのサブスレッドの が した 、 スレッドを して を します.CountDownLatchでよく われる2つの はawait()とcountDown()です. の りawaitメソッドは、スレッドの の をブロックし、サブスレッドの を します.countDown()も です. び すたびにlatchの が1つ し、その は で できます. にはスレッドのsizeを する があります. び し にcountDownができるようにfinallyコードブロックに れることをお めします.カウントを するかどうかはcountDownで、 スレッドは にそこにブロックされます.countDown び しの クラスSyncが するAbstractQueuedSynchronizerのreleaseShared()メソッド.tryReleaseShared(1)がtrueを すとdoReleaseShared()メソッドがさらに されます. を てみましょう.
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
Nodeは をマークするために されます.ヘッドであれば がキャンセルされないことを します.tailが キューの である .CAS によりオペレーティングシステムにおけるwaitStatusを する.