スレッドプールThreadPoolExecutor実装原理
10840 ワード
1.ThreadPoolExecutorソースコード解析
ctlはスレッドプールの現在の状態とスレッド数を表す.32ビット、上位3ビットがステータス、下位29ビットがスレッド最大のスレッド数
構築方法:デフォルトhandlerはAbortPolicy、ThreadFactoryデフォルトはDefaultThreadFactory
Runnableタスクをスレッドプールのキーコードに投げ込む
addWorkerは、スレッドプール内部のworkersプロパティにworkerを追加する、現在追加されているworkerをスピンさせ、firtstTaskを実行する.
2.Worker
runWorkerメソッドとは、firstTaskタスクを実行する後、スピンし、実行するタスクの実行をキューから取得するプロセスである.これまでrunWorkerがいつ実行するかを考えていたが、addWorkerのコードの注釈に戻ると、addメソッドでworkerを作成すると同時にworkerのrunメソッドを実行するようになったことがわかる.スピンが発生した.
だからまとめてみましょう.Runnableがスレッドに投げ込まれると:
現在のスレッドプール数runは立ち上がり、スピンし、実行可能なタスク実行をキューから絶えず取得する.
現在のスレッドプール数>=corePoolSize、ブロックキューにタスクを追加するキューがいっぱいになったら、最大スレッドプール数maxPoolSize まで新しいスレッド(新しいWorker)の作成を続行します.
拒否ポリシーを実行します.
上はRunnableのオンラインスレッドプールでの実行であり、Callableメソッドである.submitを使用する場合.違います.コードを見続け、使い方を見てみましょう.
どうやって実現したのでしょうか?submitがcallableをコミットと、FutureTaskインスタンスがパッケージされます.FutureTaskはRunnableFuture、Runnable、Futureを実現した.簡単に言えば、それは結果を得ることができるRunnableです.
submitはCallableタスクの後、executeメソッドを実行する、以前に実行したRunnableステップと何の差もないが、現在のタスクを実行するとき、FutureTaskは独自のrunメソッドを実行する.コードを見てみましょう
3.FutureTask
重要な属性
runメソッドのソースコードは以下の通りである.runnableインスタンスの方法とは異なり、タスク実行後の結果が保存される.
getメソッドのソースコードは、現在のtask状態が未完了、すなわちresultが計算されていない場合、getが来る.スレッドは、ステータスが完了するまでブロックされ、resultに戻る.
getメソッドの中のawaitDoneはスレッドをブロックし、喚起する必要があるスレッドをキューrunメソッドの中のset喚起スレッドに直列にし、順番にキューの中の待機スレッドを喚起する.ノード=nullをgcにする.この2つの方法のコードは自分で見ることができる.
ctlはスレッドプールの現在の状態とスレッド数を表す.32ビット、上位3ビットがステータス、下位29ビットがスレッド最大のスレッド数
private static int runStateOf(int c) { return c & ~CAPACITY; } //
private static int workerCountOf(int c) { return c & CAPACITY; } //
private static int ctlOf(int rs, int wc) { return rs | wc; } // ctl
private final HashSet workers = new HashSet(); //
private final ReentrantLock mainLock = new ReentrantLock(); // add to workes
構築方法:デフォルトhandlerはAbortPolicy、ThreadFactoryデフォルトはDefaultThreadFactory
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
Runnableタスクをスレッドプールのキーコードに投げ込む
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // corePoolSize, , .
if (addWorker(command, true))
return;
c = ctl.get(); // ,
}
if (isRunning(c) && workQueue.offer(command)) { //
int recheck = ctl.get(); // ,
if (! isRunning(recheck) && remove(command)) // ,
reject(command); //
else if (workerCountOf(recheck) == 0) //corePoolSize=0
addWorker(null, false);
}
else if (!addWorker(command, false)) // ,
reject(command); //
}
addWorkerは、スレッドプール内部のworkersプロパティにworkerを追加する、現在追加されているworkerをスピンさせ、firtstTaskを実行する.
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// ,
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// ,new Worker , Worker, t worker ,
// t.start Worker run , worker runWorker .
// , , runWorker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // workers.add ,
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); // workers , workers
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock(); //
}
if (workerAdded) {
t.start(); // runWorker
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
2.Worker
:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable -- Runnable, AQS
:
final Thread thread; worker
Runnable firstTask; worker runnable task.
run :
public void run() {
runWorker(this);
}
runWorkerメソッドとは、firstTaskタスクを実行する後、スピンし、実行するタスクの実行をキューから取得するプロセスである.これまでrunWorkerがいつ実行するかを考えていたが、addWorkerのコードの注釈に戻ると、addメソッドでworkerを作成すると同時にworkerのrunメソッドを実行するようになったことがわかる.スピンが発生した.
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { // , ,
// . allowCoreThreadTimeOut,keepAliveTime ,
//getTask=null,
w.lock(); // , worker, ,
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); //
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++; // worker +1
w.unlock(); //worker
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
getTask :, :
// allowCoreThreadTimeOut = true || > corePoolSize
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// , poll keepAliveTime null,
// take ,take .
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
だからまとめてみましょう.Runnableがスレッドに投げ込まれると:
現在のスレッドプール数runは立ち上がり、スピンし、実行可能なタスク実行をキューから絶えず取得する.
現在のスレッドプール数>=corePoolSize、ブロックキューにタスクを追加するキューがいっぱいになったら、最大スレッドプール数maxPoolSize まで新しいスレッド(新しいWorker)の作成を続行します.
拒否ポリシーを実行します.
上はRunnableのオンラインスレッドプールでの実行であり、Callableメソッドである.submitを使用する場合.違います.コードを見続け、使い方を見てみましょう.
ExecutorService runnableService = Executors.newFixedThreadPool(3);
Future r1 = runnableService.submit(new TestCallable(1));
r1.get() result
どうやって実現したのでしょうか?submitがcallableをコミットと、FutureTaskインスタンスがパッケージされます.FutureTaskはRunnableFuture、Runnable、Futureを実現した.簡単に言えば、それは結果を得ることができるRunnableです.
public Future submit(Callable task) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected RunnableFuture newTaskFor(Callable callable) {
return new FutureTask(callable);
}
submitはCallableタスクの後、executeメソッドを実行する、以前に実行したRunnableステップと何の差もないが、現在のタスクを実行するとき、FutureTaskは独自のrunメソッドを実行する.コードを見てみましょう
3.FutureTask
重要な属性
/** The underlying callable; nulled out after running */ callable
private Callable callable;
/** The result to return or exception to throw from get() */ //callable
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner; //
/** Treiber stack of waiting threads */
private volatile WaitNode waiters; //get
runメソッドのソースコードは以下の通りである.runnableインスタンスの方法とは異なり、タスク実行後の結果が保存される.
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable c = callable; // callble
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call(); // callable call , result
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result); // task , result outcome , .
FutureTask get result
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
getメソッドのソースコードは、現在のtask状態が未完了、すなわちresultが計算されていない場合、getが来る.スレッドは、ステータスが完了するまでブロックされ、resultに戻る.
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
getメソッドの中のawaitDoneはスレッドをブロックし、喚起する必要があるスレッドをキューrunメソッドの中のset喚起スレッドに直列にし、順番にキューの中の待機スレッドを喚起する.ノード=nullをgcにする.この2つの方法のコードは自分で見ることができる.