Java同時発注スレッドプールと原子カウント


ビッグデータ量関連の業務処理ロジックについては,JDKが提供し,マルチスレッドの場合の業務データ処理を一括して解決することが直接的な考え方である.スレッドプールはスレッドを管理する方法を提供し、スレッドの利用率を高めることができ、発注中の原子カウントはマルチスレッドの場合、同期コードを書くことを避けることができます.
ここではまずjdkと発注中のスレッドプールプロセッサThreadPoolExecutorを原子カウントクラスAomicIntegerとカウントダウンロックCountDownLatchのいくつかの一般的なapiで紹介します.
一、ThreadPoolExecutorJDK 。Executors 1 Executors.newCachedThreadPool 2 Executors.newFixedThreadPool

3 Executors.newSingleThreadExecutor 4 、Executors.newScheduledThreadPool()
       。 :

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(ThreadPoolUtil.CORE_POOL_SIZE, ThreadPoolUtil.MAX_POOL_SIZE, ThreadPoolUtil.KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(ThreadPoolUtil.WORK_QUEUE_SIZE), new ThreadPoolExecutor.CallerRunsPolicy());

 
ここではexecuteの と びつけてパラメータの を に する.
1、corePoolSize:コアスレッドの .つまり、スレッドプールが する スレッド です.Idle timeがkeepAliveTimeを えると、スレッドインスタンスは し、スレッドプールから します.コアスレッド の きさは、 なスレッド を するために、アプリケーションシーンやCPUの に じて に する がある.
2、maximumPoolSize:スレッドプールメンテナンスの スレッド . キューの 、これは です. キューでは、この で なhandlerを してスレッドを できます.
3、keepAliveTimeおよびunitはidleTimeを するために される.すなわち、スレッドプールは、スレッドが するアイドル および を する.4、 workQueueタスクキュー: な のキュー、SynchronousQueue : 。 。LinkedBlockingQueue: , corePoolSize , , 。ArrayBlockingQueue: 。 size 。5、policy: maximumPoolSizeの の ポリシー. のhandlerを したり、スレッドプールで したりすることができます.
AbortPolicy:javaを げ す.util.concurrent.RejectedExecutionException
CallerRunsPolicy: のタスクの を します.execute()メソッドを に り し び します.
DiscardOldestPolicy: いタスクを てる
DiscardPolicy: のタスクを
 
の からexecuteメソッドの を ることができます. の は にcorePoolSizeが に たされ、タスクがキューに され、 きスレッドがあるとキュー のタスク「offer」が され、キューもいっぱいになるとhandlerに されてタスクが されます.
 
public void execute(Runnable command) {
//          
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
//               size>core     
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
//    maxmum   ,     reject  
                reject(command); // is shutdown or saturated
        }
    }

、AomicIntegerカウンタ
マルチスレッドの 、カウンタを1つ くのは で くのが です. されたAomicIntegerは、プログラマーをいくつかの コードを くことから します.AomicIntegerで も な はincrementAndGet()です.ソースコードを てください.
 
  /**
     * Atomically increments by one the current value.
     *
     * @return the updated value
     */
    public final int incrementAndGet() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return next;
        }
    }

ここでcompareAndSetメソッドはカウントを り さないことを する です.ここでCAS の が び され、 と が されますが、nextを むときに のスレッドが の を した でも が しますが、この の は くなく、synchronizedやnotifyを するよりも です.incrementAndGetを び すたびにAomicIntegerの が され、 び すのも です.
、CountDownLatch
AomicIntegerと わせてカウントダウンできます. された のスレッドがすべて する を した に を うことを し、 が0であることを った 、 のスレッドを して を します.このアプリケーションでは、すべてのAomicIntegerカウントが した 、つまりすべてのサブスレッドの が した 、 スレッドを して を します.CountDownLatchでよく われる2つの はawait()とcountDown()です. の りawaitメソッドは、スレッドの の をブロックし、サブスレッドの を します.countDown()も です. び すたびにlatchの が1つ し、その は で できます. にはスレッドのsizeを する があります. び し にcountDownができるようにfinallyコードブロックに れることをお めします.カウントを するかどうかはcountDownで、 スレッドは にそこにブロックされます.countDown び しの クラスSyncが するAbstractQueuedSynchronizerのreleaseShared()メソッド.tryReleaseShared(1)がtrueを すとdoReleaseShared()メソッドがさらに されます. を てみましょう.
 
private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue; // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue; // loop on failed CAS
            }
            if (h == head) // loop if head changed
                break;
        }
    }

Nodeは をマークするために されます.ヘッドであれば がキャンセルされないことを します.tailが キューの である .CAS によりオペレーティングシステムにおけるwaitStatusを する.