Java concurrencyスレッド池のスレッド池原理(二)_動力ノードJava学院の整理
12671 ワード
スレッドプールの例
スレッドプールを解析する前に、簡単なスレッドプールの例を見てください。
スレッドのソースコードの分析
(一)「スレッド池」を作成する
以下、newFixedThreadPool()でスレッド池の作成過程を紹介します。
1.newFixedThreadPool()
newFixedThreadPool()はExectors.javaで定義されています。ソースは以下の通りです。
newFixedThreadPool()は、ThreadPool Exector()を呼び出すと、Linked Blocking Que()のオブジェクトが渡されますが、Linked Blocking Queは一方向リンクで実現されるブロック列です。オンラインのプログラムでは、このブロック列によって、「スレッド内のタスクの数が許容されているタスクの数を超えた場合、一部のタスクは待ち時間を短縮する」ということを実現します。
Linked BlockingQueの実現の詳細については、読者が「Javaマルチスレッドシリーズ」「JUC集合」08のLinked Blocking Queを参照することができます。
2.ThreadPool Exector()
ThreadPool Exector()はThreadPool Exector.javaで定義されています。ソースは以下の通りです。
corepoolSize、maximPoolSize、unit、keepAlive Time、workQueなどの変数の値は既知であり、それらはすべてnewFixedentreadPool()を通じて伝達されます。次に、threadFactoryとhandlerのオブジェクトを見てみます。
2.1 ThreadFactory
スレッド池のThreadFactoryはスレッド工場であり、スレッド池の作成スレッドはスレッド工場の対象となる。
上記のthreadFactoryオブジェクトは、Exectors.default ThreadFactory()を通じて戻ります。Exectors.javaのdefault ThreadFactoryのソースコードは以下の通りです。
newThreadを通じてスレッドを作成する機能を提供しています。以下、newThreadについて簡単に説明します。newThread作成されたスレッドに対応するジョブはRunnableオブジェクトで、作成されたスレッドはすべて「非守護スレッド」であり、「スレッド優先度はThread.NORM_」である。PRIORITY」。
2.2 RejectedExecution Handler
handlerはThreadPoolExectorにおける拒否戦略の処理の文脈である。拒否戦略とは、スレッドプールにジョブを追加する場合、スレッドプールが当該タスクに対して行った対応策を拒否することです。
スレッドプールはデフォルトではdefaultHandlerポリシー、すなわちAbortPolicyポリシーを採用します。AbortPolicy戦略では、スレッドプールがジョブを拒否すると異常が発生します。
default Handlerの定義は以下の通りです。
prvate static final RejectedExecution Handler=new AbortPolicy();
AbortPolicyのソースコードは以下の通りです。
1.execute()
execute()はThreadPoolExector.javaで定義されています。ソースは以下の通りです。
場合1--「スレッド池におけるジョブ数」<「コアプールサイズ」の場合、すなわちスレッド池の中でcorePoolSize以下のタスク;スレッドを新規作成し、スレッドに追加して実行します。
場合2-「スレッド池のタスク数」=「コアプールサイズ」、「スレッドプールは許容状態」このとき、待ち時間をブロック列に追加します。この場合、再度「スレッド池の状態」を確認しますが、「2回目に読んだスレッドプールの状態」と「1回目に読んだスレッドプールの状態」が異なると、ブロック列からジョブを削除します。
ケース3--上記以外の2つの場合。この場合、スレッドの新規作成を試み、スレッドにこのタスクを追加して実行します。もし実行に失敗したら、reject()を通じてこのタスクを拒否します。
2.addWorkカー()
addWorkカーのソースコードは以下の通りです。
addWorkカーは、タスクをスレッド池に追加し、タスクを起動する役割を果たしています。
coreがtrueであれば、corePoolSizeを限界とし、「スレッド池に既にジョブがある」=corepoolSizeを返す。coreがfalseであれば、maximPoolSizeを限界とし、「スレッドの池にジョブの数がある」=maximPoolSizeを返します。
addWorker()は、まずforループを通して、ctlの状態を更新しようと試みます。ctlは「スレッドプールにおけるジョブ数とスレッドプール状態」を記録しています。
更新が成功したら、tryモジュールでスレッドにタスクを追加し、タスクがあるスレッドを起動します。
addWorkカー()から、スレッドプールはタスクを追加する時、タスクに対応するWorkカーオブジェクトを作成します。一方のWorkderオブジェクトは、Threadオブジェクトを含んでいます。01)Workカーオブジェクトを「スレッドのウォーカーセット」に追加することにより、スレッドプールにジョブを追加することができます。02)Workカーに対応するThreadスレッドを起動することにより、このタスクを実行します。
3.submit()
補足して説明します。submit()も実際にexecute()を呼び出して実現したものです。ソースは以下の通りです。
shutdownのソースコードは以下の通りです。
以上が本文の全部です。皆さんの勉強に役に立つように、私たちを応援してください。
スレッドプールを解析する前に、簡単なスレッドプールの例を見てください。
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
public class ThreadPoolDemo1 {
public static void main(String[] args) {
//
ExecutorService pool = Executors.newFixedThreadPool(2);
// Runnable ,Thread Runnable
Thread ta = new MyThread();
Thread tb = new MyThread();
Thread tc = new MyThread();
Thread td = new MyThread();
Thread te = new MyThread();
//
pool.execute(ta);
pool.execute(tb);
pool.execute(tc);
pool.execute(td);
pool.execute(te);
//
pool.shutdown();
}
}
class MyThread extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+ " is running.");
}
}
実行結果:
pool-1-thread-1 is running.
pool-1-thread-2 is running.
pool-1-thread-1 is running.
pool-1-thread-2 is running.
pool-1-thread-1 is running.
例えば、スレッド池の作成を含み、スレッドプールにジョブを追加し、スレッドを閉じる3つの主要なステップ。後で、私達はこの3つの方面からThreadPool Exectorを分析します。 スレッドのソースコードの分析
(一)「スレッド池」を作成する
以下、newFixedThreadPool()でスレッド池の作成過程を紹介します。
1.newFixedThreadPool()
newFixedThreadPool()はExectors.javaで定義されています。ソースは以下の通りです。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
説明:newFixedThreadPool(int nThreads)の役割はスレッド池を作成することであり、スレッド池の容量はnThreadsである。newFixedThreadPool()は、ThreadPool Exector()を呼び出すと、Linked Blocking Que()のオブジェクトが渡されますが、Linked Blocking Queは一方向リンクで実現されるブロック列です。オンラインのプログラムでは、このブロック列によって、「スレッド内のタスクの数が許容されているタスクの数を超えた場合、一部のタスクは待ち時間を短縮する」ということを実現します。
Linked BlockingQueの実現の詳細については、読者が「Javaマルチスレッドシリーズ」「JUC集合」08のLinked Blocking Queを参照することができます。
2.ThreadPool Exector()
ThreadPool Exector()はThreadPool Exector.javaで定義されています。ソースは以下の通りです。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
説明:この関数は実際にThreadPoolExectorを呼び出すもう一つのコンストラクタです。関数のソースコードは以下の通りです。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
//
this.corePoolSize = corePoolSize;
//
this.maximumPoolSize = maximumPoolSize;
//
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
//
this.threadFactory = threadFactory;
//
this.handler = handler;
}
説明:ThreadPool Exector()の構造関数では初期化動作が行われます。corepoolSize、maximPoolSize、unit、keepAlive Time、workQueなどの変数の値は既知であり、それらはすべてnewFixedentreadPool()を通じて伝達されます。次に、threadFactoryとhandlerのオブジェクトを見てみます。
2.1 ThreadFactory
スレッド池のThreadFactoryはスレッド工場であり、スレッド池の作成スレッドはスレッド工場の対象となる。
上記のthreadFactoryオブジェクトは、Exectors.default ThreadFactory()を通じて戻ります。Exectors.javaのdefault ThreadFactoryのソースコードは以下の通りです。
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
default ThreadFactory()はDefault ThreadFactoryオブジェクトに戻ります。Exectors.javaのDefault ThreadFactoryのソースコードは以下の通りです。
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
// API。
public Thread newThread(Runnable r) {
// Runnable r
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
// “ ”
if (t.isDaemon())
t.setDaemon(false);
// “Thread.NORM_PRIORITY”
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
説明:ThreadFactoryの役割はスレッドを作成する機能を提供するスレッド工場です。newThreadを通じてスレッドを作成する機能を提供しています。以下、newThreadについて簡単に説明します。newThread作成されたスレッドに対応するジョブはRunnableオブジェクトで、作成されたスレッドはすべて「非守護スレッド」であり、「スレッド優先度はThread.NORM_」である。PRIORITY」。
2.2 RejectedExecution Handler
handlerはThreadPoolExectorにおける拒否戦略の処理の文脈である。拒否戦略とは、スレッドプールにジョブを追加する場合、スレッドプールが当該タスクに対して行った対応策を拒否することです。
スレッドプールはデフォルトではdefaultHandlerポリシー、すなわちAbortPolicyポリシーを採用します。AbortPolicy戦略では、スレッドプールがジョブを拒否すると異常が発生します。
default Handlerの定義は以下の通りです。
prvate static final RejectedExecution Handler=new AbortPolicy();
AbortPolicyのソースコードは以下の通りです。
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
//
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
(二)タスクを「スレッド池」に追加します。1.execute()
execute()はThreadPoolExector.javaで定義されています。ソースは以下の通りです。
public void execute(Runnable command) {
// null, 。
if (command == null)
throw new NullPointerException();
// ctl int 。 int " " " "
int c = ctl.get();
// < " " , corePoolSize 。
// addWorker(command, true) , (command) ; , 。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// >= " " ,
// ," " , 。
if (isRunning(c) && workQueue.offer(command)) {
// “ ”, , ; reject() 。
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
// , " " 0, addWorker(null, false) , null。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// addWorker(command, false) , (command) ; , 。
// addWorker(command, false) , reject() 。
else if (!addWorker(command, false))
reject(command);
}
説明:execute()の役割は、スレッドにタスクを追加して実行することです。3つの状況に分けて処理します。場合1--「スレッド池におけるジョブ数」<「コアプールサイズ」の場合、すなわちスレッド池の中でcorePoolSize以下のタスク;スレッドを新規作成し、スレッドに追加して実行します。
場合2-「スレッド池のタスク数」=「コアプールサイズ」、「スレッドプールは許容状態」このとき、待ち時間をブロック列に追加します。この場合、再度「スレッド池の状態」を確認しますが、「2回目に読んだスレッドプールの状態」と「1回目に読んだスレッドプールの状態」が異なると、ブロック列からジョブを削除します。
ケース3--上記以外の2つの場合。この場合、スレッドの新規作成を試み、スレッドにこのタスクを追加して実行します。もし実行に失敗したら、reject()を通じてこのタスクを拒否します。
2.addWorkカー()
addWorkカーのソースコードは以下の通りです。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// " " , ctl。
for (;;) {
// ctl int 。 int " " " "
int c = ctl.get();
// 。
int rs = runStateOf(c);
//
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 。
int wc = workerCountOf(c);
// " " , false。
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS c +1。 , 。
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// " ", , retry 。
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
// , 。
try {
final ReentrantLock mainLock = this.mainLock;
// Worker, firstTask Worker 。
w = new Worker(firstTask);
// Worker 。
final Thread t = w.thread;
if (t != null) {
//
mainLock.lock();
try {
int c = ctl.get();
int rs = runStateOf(c);
// " "
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// Worker (w) " Worker (workers)"
workers.add(w);
// largestPoolSize
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
//
mainLock.unlock();
}
// " " , 。
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
// 。
return workerStarted;
}
説明:addWorkカーは、タスクをスレッド池に追加し、タスクを起動する役割を果たしています。
coreがtrueであれば、corePoolSizeを限界とし、「スレッド池に既にジョブがある」=corepoolSizeを返す。coreがfalseであれば、maximPoolSizeを限界とし、「スレッドの池にジョブの数がある」=maximPoolSizeを返します。
addWorker()は、まずforループを通して、ctlの状態を更新しようと試みます。ctlは「スレッドプールにおけるジョブ数とスレッドプール状態」を記録しています。
更新が成功したら、tryモジュールでスレッドにタスクを追加し、タスクがあるスレッドを起動します。
addWorkカー()から、スレッドプールはタスクを追加する時、タスクに対応するWorkカーオブジェクトを作成します。一方のWorkderオブジェクトは、Threadオブジェクトを含んでいます。01)Workカーオブジェクトを「スレッドのウォーカーセット」に追加することにより、スレッドプールにジョブを追加することができます。02)Workカーに対応するThreadスレッドを起動することにより、このタスクを実行します。
3.submit()
補足して説明します。submit()も実際にexecute()を呼び出して実現したものです。ソースは以下の通りです。
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
(三)「スレッド池」を閉じるshutdownのソースコードは以下の通りです。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
//
mainLock.lock();
try {
// “ ” 。
checkShutdownAccess();
// 。
advanceRunState(SHUTDOWN);
// 。
interruptIdleWorkers();
// , ThreadPoolExecutor 。
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
//
mainLock.unlock();
}
//
tryTerminate();
}
説明:shutdown()の役割はスレッドを閉じることです。以上が本文の全部です。皆さんの勉強に役に立つように、私たちを応援してください。