JAva 1.7集合ソースコード鑑賞シリーズ:スレッドプール原理
Executorsでは、次の方法がサポートされています.
共通の構成文字列が設定されているExecutorServiceを作成して返します.共通の構成文字列が設定されているScheduledExecutorServiceを作成して返します.実装固有のメソッドにアクセスできないように再構成を無効にする「パッケージ」ExecutorServiceメソッドを作成して戻します.新しく作成したスレッドを既知の状態に設定できるThreadFactoryを作成して返します.非閉パッケージ形式のCallableのメソッドを作成して返します.これにより、Callableを必要とする実行メソッドに使用できます.上記はExecutorsクラスのapiでの説明ですが、Executorsは実際には工場クラスであり、さまざまなパラメータのスレッドプールを提供して使用されていることがよくわかります.
相対的にnewCachedThreadPoolとnewFixedThreadPoolが多く使われています.これは使用方法です.トランスポートゲートは次にソースコードを見てみましょう.1.なぜこのように使うのですか.2.不適切な使用はどのような問題を引き起こしますか?3.彼らの実現にはどんな違いがありますか.
ThreadPoolExecutorの説明
ThreadPoolExecutorは、AbstractExecutorServiceを継承し、ExecutorServiceを実装するスレッドプールの実装クラスです.fixedとcacheスレッドプールは、デフォルトパラメータが異なる点で実装されます.両者のコードを見てみましょう.
インスタンス化ThreadPoolExecutorクラスの3番目のパラメータは異なり、1つは60で、1つは0です.これは彼らの最大の違いだ.次に、ThreadPoolExecutorクラスのパラメータについて説明します.
ThreadPoolExecutorのパラメータ
コアおよび最大プールサイズ新しいタスクがメソッドexecute(java.lang.Runnable)でコミットされると、実行されるスレッドがcorePoolSizeより少ない場合、他の補助スレッドがアイドルであっても要求を処理するために新しいスレッドが作成されます.実行中のスレッドがcorePoolSizeより多く、maximumPoolSizeより少ない場合は、キューがいっぱいになったときにのみ新しいスレッドが作成されます.corePoolSizeがmaximumPoolSizeと同じに設定されている場合、スレッドプールは固定サイズで作成されます.maximumPoolSizeを基本的な無境界値(Integer.MAX_VALUEなど)に設定すると、プールは任意の数の同時タスクに適応できます.ほとんどの場合、コアおよび最大プールサイズは構造に基づいてのみ設定されますが、setCorePoolSize(int)およびsetMaximumPoolSize(int)を使用して動的に変更することもできます.
新しいスレッドを作成ThreadFactoryを使用して新しいスレッドを作成します.他に説明がない場合は、同じThreadGroupでExecutors.defaultThreadFactory()を使用してスレッドを作成し、これらのスレッドは同じNORM_を有します.PRIORITY優先度とデーモン以外のプロセスステータス.異なるThreadFactoryを指定することで、スレッドの名前、スレッドグループ、優先度、デーモンのステータスなどを変更できます.newThreadからnullが返されたときにThreadFactoryがスレッドを作成できなかった場合、実行プログラムは実行を続行しますが、タスクは実行できません.
デフォルトでは、コアスレッドが最初に新しいタスクが到着したときに作成および起動された場合でも、メソッドprestartCoreThread()またはprestartAllCoreThreads()を使用して動的に書き換えることができます.空でないキューを持つプールを構築する場合は、スレッドを事前に起動する必要があります.
アクティブな時間を維持プールにcorePoolSizeよりも多くのスレッドがある場合、これらの複数のスレッドは、アイドル時間がkeepAliveTimeを超えると終了します(getKeepAliveTime(java.util.concurrent.TimeUnitを参照).これにより、プールが非アクティブな場合にリソース消費を削減する方法が提供されます.プールが後でアクティブになった場合、新しいスレッドを作成できます.このパラメータは、setKeepAliveTime(long,java.util.concurrent.TimeUnit)を使用して動的に変更することもできます.Long.MAX_の使用VALUE TimeUnit.NANOSECONDSの値は、オフになる前に、以前の終了状態から有効にアイドルスレッドを無効にします.デフォルトでは、corePoolSizeThreadsよりも多くのスレッドがある場合にのみ、アクティブなポリシーを適用します.ただし、keepAliveTime値が0でない限り、allowCoreThreadTimeOut(boolean)メソッドはこのタイムアウトポリシーをコアスレッドに適用することもできます.
キュー内のすべてのBlockingQueueは、コミットされたタスクを転送および保持するために使用できます.このキューを使用して、プール・サイズと対話できます.実行中のスレッドがcorePoolSizeより少ない場合、Executorは常にキューなしで新しいスレッドを追加することを優先します.実行中のスレッドがcorePoolSize以上である場合、Executorは常に新しいスレッドを追加せずにキューにリクエストを追加することを優先します.リクエストをキューに追加できない場合は、maximumPoolSizeを超えない限り、新しいスレッドを作成します.この場合、タスクは拒否されます.
フック(hook)メソッドこのようなprotected書き換え可能なbeforeExecute(java.lang.Thread,java.lang.Runnable)メソッドとafterExecute(java.lang.Runnable,java.lang.Throwable)メソッドは、それぞれ各タスクを実行する前と後に呼び出されます.実行環境を操作するために使用できます.たとえば、ThreadLocalを再初期化したり、統計を収集したり、ログ・エントリを追加したりします.さらに、メソッドterminated()を書き換えて、Executorが完全に終了した後に完了する必要があるすべての特殊な処理を実行することもできます.フック(hook)またはコールバックメソッドが異常を放出すると、内部補助スレッドは順次失敗し、突然終了します.beforeExecute(Thread t,Runnable r)が、所与のスレッドの所与のRunnableを実行する前に呼び出す方法.t-タスクrを実行するスレッド.r-実行されるタスク.
afterExecute(Runnable r,Throwable t)は、所与のRunnableの実行の完了に基づいて呼び出される方法である.r-完了したrunnableスレッド.t-終了を招く異常;実行が正常に終了した場合はnullです.
terminated()Executorが終了したときに呼び出されるメソッド.
以上のフックメソッドは、サブクラスでカスタマイズされた操作を実行しないことを実現します.
終了プログラムANDが参照しなくなったプールには、残りのスレッドが自動的にshutdownされます.ユーザーがshutdown()を呼び出すのを忘れた場合でも、参照を取り消すプールの回収を確実にするには、使用されていないスレッドの最終終了をスケジュールする必要があります.アクティブな時間を適切に維持し、0コアスレッドの下境界を使用するか、allowCoreThreadTimeOut(boolean)を設定する必要があります.
共通の構成文字列が設定されているExecutorServiceを作成して返します.共通の構成文字列が設定されているScheduledExecutorServiceを作成して返します.実装固有のメソッドにアクセスできないように再構成を無効にする「パッケージ」ExecutorServiceメソッドを作成して戻します.新しく作成したスレッドを既知の状態に設定できるThreadFactoryを作成して返します.非閉パッケージ形式のCallableのメソッドを作成して返します.これにより、Callableを必要とする実行メソッドに使用できます.上記はExecutorsクラスのapiでの説明ですが、Executorsは実際には工場クラスであり、さまざまなパラメータのスレッドプールを提供して使用されていることがよくわかります.
相対的にnewCachedThreadPoolとnewFixedThreadPoolが多く使われています.これは使用方法です.トランスポートゲートは次にソースコードを見てみましょう.1.なぜこのように使うのですか.2.不適切な使用はどのような問題を引き起こしますか?3.彼らの実現にはどんな違いがありますか.
ThreadPoolExecutorの説明
ThreadPoolExecutorは、AbstractExecutorServiceを継承し、ExecutorServiceを実装するスレッドプールの実装クラスです.fixedとcacheスレッドプールは、デフォルトパラメータが異なる点で実装されます.両者のコードを見てみましょう.
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue(),
threadFactory);
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(),
threadFactory);
}
インスタンス化ThreadPoolExecutorクラスの3番目のパラメータは異なり、1つは60で、1つは0です.これは彼らの最大の違いだ.次に、ThreadPoolExecutorクラスのパラメータについて説明します.
ThreadPoolExecutorのパラメータ
//ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
/**
corePoolSize - , 。
maximumPoolSize - 。
keepAliveTime - , 。
unit - keepAliveTime 。
workQueue - 。 execute Runnable 。
handler - 。
*/
コアおよび最大プールサイズ新しいタスクがメソッドexecute(java.lang.Runnable)でコミットされると、実行されるスレッドがcorePoolSizeより少ない場合、他の補助スレッドがアイドルであっても要求を処理するために新しいスレッドが作成されます.実行中のスレッドがcorePoolSizeより多く、maximumPoolSizeより少ない場合は、キューがいっぱいになったときにのみ新しいスレッドが作成されます.corePoolSizeがmaximumPoolSizeと同じに設定されている場合、スレッドプールは固定サイズで作成されます.maximumPoolSizeを基本的な無境界値(Integer.MAX_VALUEなど)に設定すると、プールは任意の数の同時タスクに適応できます.ほとんどの場合、コアおよび最大プールサイズは構造に基づいてのみ設定されますが、setCorePoolSize(int)およびsetMaximumPoolSize(int)を使用して動的に変更することもできます.
新しいスレッドを作成ThreadFactoryを使用して新しいスレッドを作成します.他に説明がない場合は、同じThreadGroupでExecutors.defaultThreadFactory()を使用してスレッドを作成し、これらのスレッドは同じNORM_を有します.PRIORITY優先度とデーモン以外のプロセスステータス.異なるThreadFactoryを指定することで、スレッドの名前、スレッドグループ、優先度、デーモンのステータスなどを変更できます.newThreadからnullが返されたときにThreadFactoryがスレッドを作成できなかった場合、実行プログラムは実行を続行しますが、タスクは実行できません.
デフォルトでは、コアスレッドが最初に新しいタスクが到着したときに作成および起動された場合でも、メソッドprestartCoreThread()またはprestartAllCoreThreads()を使用して動的に書き換えることができます.空でないキューを持つプールを構築する場合は、スレッドを事前に起動する必要があります.
アクティブな時間を維持プールにcorePoolSizeよりも多くのスレッドがある場合、これらの複数のスレッドは、アイドル時間がkeepAliveTimeを超えると終了します(getKeepAliveTime(java.util.concurrent.TimeUnitを参照).これにより、プールが非アクティブな場合にリソース消費を削減する方法が提供されます.プールが後でアクティブになった場合、新しいスレッドを作成できます.このパラメータは、setKeepAliveTime(long,java.util.concurrent.TimeUnit)を使用して動的に変更することもできます.Long.MAX_の使用VALUE TimeUnit.NANOSECONDSの値は、オフになる前に、以前の終了状態から有効にアイドルスレッドを無効にします.デフォルトでは、corePoolSizeThreadsよりも多くのスレッドがある場合にのみ、アクティブなポリシーを適用します.ただし、keepAliveTime値が0でない限り、allowCoreThreadTimeOut(boolean)メソッドはこのタイムアウトポリシーをコアスレッドに適用することもできます.
キュー内のすべてのBlockingQueueは、コミットされたタスクを転送および保持するために使用できます.このキューを使用して、プール・サイズと対話できます.実行中のスレッドがcorePoolSizeより少ない場合、Executorは常にキューなしで新しいスレッドを追加することを優先します.実行中のスレッドがcorePoolSize以上である場合、Executorは常に新しいスレッドを追加せずにキューにリクエストを追加することを優先します.リクエストをキューに追加できない場合は、maximumPoolSizeを超えない限り、新しいスレッドを作成します.この場合、タスクは拒否されます.
フック(hook)メソッドこのようなprotected書き換え可能なbeforeExecute(java.lang.Thread,java.lang.Runnable)メソッドとafterExecute(java.lang.Runnable,java.lang.Throwable)メソッドは、それぞれ各タスクを実行する前と後に呼び出されます.実行環境を操作するために使用できます.たとえば、ThreadLocalを再初期化したり、統計を収集したり、ログ・エントリを追加したりします.さらに、メソッドterminated()を書き換えて、Executorが完全に終了した後に完了する必要があるすべての特殊な処理を実行することもできます.フック(hook)またはコールバックメソッドが異常を放出すると、内部補助スレッドは順次失敗し、突然終了します.beforeExecute(Thread t,Runnable r)が、所与のスレッドの所与のRunnableを実行する前に呼び出す方法.t-タスクrを実行するスレッド.r-実行されるタスク.
afterExecute(Runnable r,Throwable t)は、所与のRunnableの実行の完了に基づいて呼び出される方法である.r-完了したrunnableスレッド.t-終了を招く異常;実行が正常に終了した場合はnullです.
terminated()Executorが終了したときに呼び出されるメソッド.
以上のフックメソッドは、サブクラスでカスタマイズされた操作を実行しないことを実現します.
// ,Workder
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
//
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//
beforeExecute(wt, task);
Throwable thrown = null;
try {
// run
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// terminated(), terminated
processWorkerExit(w, completedAbruptly);
}
}
// , null,
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
boolean timed; // Are workers subject to culling?
for (;;) {
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
終了プログラムANDが参照しなくなったプールには、残りのスレッドが自動的にshutdownされます.ユーザーがshutdown()を呼び出すのを忘れた場合でも、参照を取り消すプールの回収を確実にするには、使用されていないスレッドの最終終了をスケジュールする必要があります.アクティブな時間を適切に維持し、0コアスレッドの下境界を使用するか、allowCoreThreadTimeOut(boolean)を設定する必要があります.
//
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
// , ,
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
mainLock.lock();
try {
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}