スレッドプールソースプロセスの整理
12357 ワード
ずっとスレッドプールを使っていて、最近暇なので、実行プロセスを見たいです!ついでに記録を!構造方法を一目見る
excuteの方法を見てみましょう
ctl AtomicIntegerタイプのスレッドプールの動作と動作を表すworkerの数private static final int RUNNING=-1<
まとめ:原子性の増加workerのcountが追加に成功した後!スレッドをwrokerクラスにカプセル化し、スレッドの状態が正常であると判断したら、workerをworker対列に追加し、スレッドのstartメソッドを呼び出します!runメソッド実行!次にrunworkerメソッドを呼び出してworkerクラスを見てみましょう
runworkerメソッドは、実行前に最初にunlockし、中断を許可します!whileはnullではないと判断して実行を開始し、すぐにロックをかけます!この場合、他のスレッドの割り込みは許可されません.現在のスレッドの実行状態が判断され、終了されていないと判断した後、workerにカプセル化されたrunnableのrunメソッド、すなわちスレッドプールから転送されたタスクが実行され、現在のworkerのtaskおよびgetTask()が取得したタスクはnullでない方が繰り返し実行されます!タスクが実行できない場合は、最終的にprocessWorkerExit()スレッドを実行してメソッドを終了します.
次にgettaskメソッドがどのようにタスクを取得するかを見てみましょう!
gettaskメソッド初期化タイムアウトtagはfalse,1)はループに入り,タスクキューにタスクがある場合に最大スレッド数が設定した最大数を超えるとスレッドを減らそうと判断し始める.2』キューからタスクを取り、タスク個数>コアスレッド個数の場合、pollを呼び出してタスクキューworkQueueの先頭からrunnableを取り、タスク個数の場合
プロセスWorkerExit()スレッド終了メソッドの続行
総括1』completedAbruptlyがtrueに対して実行過程に異常が発生したことを説明した場合、直接workerスレッドを減らし、ロックし、workersキューからworker 2を除去する.ONEがtrueの場合は1つだけ終了します.そうでない場合、workersキューはすべて終了します.3」スレッドは停止する必要がありますか.スレッドステータスがTIDYINGの場合、すべてのタスクが終了したことを示します.ctlレコードの「タスク数」が0の場合、スレッドプールはTIDYINGステータスになります.terminated()4を実行します.スレッドステータスがrunningまたはshutdownの場合、現在のスレッドが突然終了した場合、addWorker()現在のスレッドが突然終了していない場合、現在のスレッド数<メンテナンスするスレッド数、addWorker()スレッドプールshutdown()を呼び出すと、workQueueが空前になるまでスレッドプールはcorePoolSizeスレッドを維持し、順次破棄されます.
まとめ:簡単に言えばスレッドプールはスレッドファクトリを通じていくつかのスレッドを作成し、これらのスレッド内でfor(;)を使用します.常にブロックキューからタスクを取り、excuteでコミットしたタスクが最も重い場合は、このブロックキューに入ります.タスクの実行中、スレッドとブロックキューが満額になった場合、コールバックが投げ出されます.RejectedExecutionHandler、ここではタスクを失うべきか、異常を投げ出すべきか、最も古いタスクを失うべきか、などを処理します.タスクの実行が完了すると、スレッドは順次終了します.
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) { // !
if (corePoolSize < 0 || // 0
maximumPoolSize <= 0 || // 0
maximumPoolSize < corePoolSize || //
keepAliveTime < 0) // 0
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
excuteの方法を見てみましょう
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //AtomicInteger worker
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get(); //AtomicInteger worker
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) // , ,
reject(command);
else if (workerCountOf(recheck) == 0) // 0
addWorker(null, false);
}
else if (!addWorker(command, false))//
reject(command); //
}
ctl AtomicIntegerタイプのスレッドプールの動作と動作を表すworkerの数private static final int RUNNING=-1<
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) { //
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && // running
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false; // null , returnfalse
for (;;) {
int wc = workerCountOf(c);//
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) // core = true wc corePoolSize , maximumPoolSize boolean
return false; // false
if (compareAndIncrementWorkerCount(c)) // workercount ,
break retry; // !
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs) // ,
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false; //
boolean workerAdded = false; // start
Worker w = null;
try {
w = new Worker(firstTask); // worker
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); // , workers
int s = workers.size();
if (s > largestPoolSize) // , ! largestPoolSize ?
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock(); //
}
if (workerAdded) {
t.start(); // start
workerStarted = true;// true
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);// workers ! workercount
}
return workerStarted;
}
まとめ:原子性の増加workerのcountが追加に成功した後!スレッドをwrokerクラスにカプセル化し、スレッドの状態が正常であると判断したら、workerをworker対列に追加し、スレッドのstartメソッドを呼び出します!runメソッド実行!次にrunworkerメソッドを呼び出してworkerクラスを見てみましょう
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); //
}
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();//
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) || // >=stop
(Thread.interrupted() && //
runStateAtLeast(ctl.get(), STOP))) && //ctl, , !
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); // task
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally { // , ,completedAbruptly true false
processWorkerExit(w, completedAbruptly);
}
}
runworkerメソッドは、実行前に最初にunlockし、中断を許可します!whileはnullではないと判断して実行を開始し、すぐにロックをかけます!この場合、他のスレッドの割り込みは許可されません.現在のスレッドの実行状態が判断され、終了されていないと判断した後、workerにカプセル化されたrunnableのrunメソッド、すなわちスレッドプールから転送されたタスクが実行され、現在のworkerのtaskおよびgetTask()が取得したタスクはnullでない方が繰り返し実行されます!タスクが実行できない場合は、最終的にprocessWorkerExit()スレッドを実行してメソッドを終了します.
次にgettaskメソッドがどのようにタスクを取得するかを見てみましょう!
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // null, workerCount
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // , >
if ((wc > maximumPoolSize || (timed && timedOut))// count>1 null workcount
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;// workercount !
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :// poll
workQueue.take(); //false
if (r != null) // null
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
gettaskメソッド初期化タイムアウトtagはfalse,1)はループに入り,タスクキューにタスクがある場合に最大スレッド数が設定した最大数を超えるとスレッドを減らそうと判断し始める.2』キューからタスクを取り、タスク個数>コアスレッド個数の場合、pollを呼び出してタスクキューworkQueueの先頭からrunnableを取り、タスク個数の場合
プロセスWorkerExit()スレッド終了メソッドの続行
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount(); //runworker , workercount
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w); //
} finally {
mainLock.unlock();
}
tryTerminate(); //
int c = ctl.get();
if (runStateLessThan(c, STOP)) { // stop shoutdown running
if (!completedAbruptly) { //
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;//allowCoreThreadTimeOut keeptimeout ,min 0,
if (min == 0 && !workQueue.isEmpty()) // min 0, !
min = 1;
if (workerCountOf(c) >= min) // , return addworker()
return; // replacement not needed
}
addWorker(null, false);//workercount , worker
}
}
final void tryTerminate() {
for (; ; ) {
int c = ctl.get();
if (isRunning(c) || // shutdown running, shutdown, null return
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE); // ONLY_ONE true
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { //TIDYING
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));//,ctl ” ” 0
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
総括1』completedAbruptlyがtrueに対して実行過程に異常が発生したことを説明した場合、直接workerスレッドを減らし、ロックし、workersキューからworker 2を除去する.ONEがtrueの場合は1つだけ終了します.そうでない場合、workersキューはすべて終了します.3」スレッドは停止する必要がありますか.スレッドステータスがTIDYINGの場合、すべてのタスクが終了したことを示します.ctlレコードの「タスク数」が0の場合、スレッドプールはTIDYINGステータスになります.terminated()4を実行します.スレッドステータスがrunningまたはshutdownの場合、現在のスレッドが突然終了した場合、addWorker()現在のスレッドが突然終了していない場合、現在のスレッド数<メンテナンスするスレッド数、addWorker()スレッドプールshutdown()を呼び出すと、workQueueが空前になるまでスレッドプールはcorePoolSizeスレッドを維持し、順次破棄されます.
まとめ:簡単に言えばスレッドプールはスレッドファクトリを通じていくつかのスレッドを作成し、これらのスレッド内でfor(;)を使用します.常にブロックキューからタスクを取り、excuteでコミットしたタスクが最も重い場合は、このブロックキューに入ります.タスクの実行中、スレッドとブロックキューが満額になった場合、コールバックが投げ出されます.RejectedExecutionHandler、ここではタスクを失うべきか、異常を投げ出すべきか、最も古いタスクを失うべきか、などを処理します.タスクの実行が完了すると、スレッドは順次終了します.