Dubboソース分析のスレッド池
58347 ワード
記事の目次クライテリア Dubboスレッド池分類 Dubboスレッド工場 スレッド池拒否ポリシー CachedThreadPool FixedThed Pool LimitedThreadPool Eager ThreadPool 前言
Dubboスレッド池の分類 GitHub上のDubboの最新のソースコードの中でまた新しいスレッドプールを提供しました。 よく知っていますか?
Dubboスレッド工場
スレッド池を見る前に、まず
スレッドが拒否されたポリシー
Dubboもスレッドプール拒否策を書き直しました。主な操作はオンラインプログラムが拒否された時にスタック情報をファイルに保存します。具体的には私が分析したこの文章を見ることができます。Dubboのソースコード分析のためのスレッドプールの拒否戦略はここでは詳しく紹介しません。
次にスレッド池を見ます。
CachedThreadPool
Fixed ThreadPool
LimitedThreadPool
Eager ThreadPool
注:このスレッド池はdubbo-2.6.5以下では実現されていません。
まず、このスレッドのプールを紹介します。スレッドの池にあるすべてのコアスレッドが忙しくなっている時に、新しいタスクを追加すると、ブロック列に入れずに新しいスレッドを作成します。
まず、カスタムブロック
段
netty
を学んだことがあると信じていますが、ChannelHandler
の各々はそのEventLoop(I/O )
を通じてそのイベントを処理していますので、このスレッドをブロックしないことが大切です。これは全体のI/O
に負の影響を与えます。Dubbo
もそうであるので、Dubbo
は、イベントを非同期的に処理するためのいくつかのスレッドプールを定義している。Dubboスレッド池の分類
Dubbo-2.6.5
バージョンには三つのスレッドがあります。CachedThreadPool
FixedThreadPool
LimitedThreadPool
EagerThreadPool
EagerThreadPool
を除いて、他のスレッドプールはjava.util.concurrent.Executors
を提供してくれたようです。ただ、Dubbo
は自分でもう一度実現しました。カスタムパラメータを調整してください。(スレッドの池に詳しくないのはこの記事です。JAVAスレッドの池ThreadPool Exector詳細は分かりません。)Dubboスレッド工場
スレッド池を見る前に、まず
Dubbo
カスタムスレッド池工場を見てください。public class NamedThreadFactory implements ThreadFactory {
//
protected static final AtomicInteger POOL_SEQ = new AtomicInteger(1);
//
protected final AtomicInteger mThreadNum = new AtomicInteger(1);
//
protected final String mPrefix;
//
protected final boolean mDaemon;
//
protected final ThreadGroup mGroup;
// :
public NamedThreadFactory() {
this("pool-" + POOL_SEQ.getAndIncrement(), false);
}
public NamedThreadFactory(String prefix) {
this(prefix, false);
}
public NamedThreadFactory(String prefix, boolean daemon) {
mPrefix = prefix + "-thread-";
mDaemon = daemon;
SecurityManager s = System.getSecurityManager();
mGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
}
@Override
public Thread newThread(Runnable runnable) {
String name = mPrefix + mThreadNum.getAndIncrement();
Thread ret = new Thread(mGroup, runnable, name, 0);
ret.setDaemon(mDaemon);
return ret;
}
public ThreadGroup getThreadGroup() {
return mGroup;
}
}
このスレッド工場は私達のスレッドプールの名前を作成し、スレッドの名前を作成しました。私達がスタック情報を確認するのに便利です。スレッドは保護/ユーザーの決定権を使用者が決定します。上のスレッド工場はdebug
バージョンのソースコードです。実はGitHub上のDubboの最新のソースコードの中にスレッド工場のためにもう一つの層が封入されています。public class NamedInternalThreadFactory extends NamedThreadFactory {
public NamedInternalThreadFactory() {
super();
}
public NamedInternalThreadFactory(String prefix) {
super(prefix, false);
}
public NamedInternalThreadFactory(String prefix, boolean daemon) {
super(prefix, daemon);
}
@Override
public Thread newThread(Runnable runnable) {
String name = mPrefix + mThreadNum.getAndIncrement();
InternalThread ret = new InternalThread(mGroup, runnable, name, 0);
ret.setDaemon(mDaemon);
return ret;
}
}
何もしていないようですが、詳細はDubbo-2.6.5
方法にあります。父newThread
に直接使用されているNamedThreadFactory
です。ここで使っているのはnew Thread
です。何の違いがありますか?よく見てください。new InternalThread
ということは、もう一回包装しました。何もしていないようです。この機能はここでは見られません。InternalThread
のソースコードを見たことがあります。Thread
はNetty
Netty
があります。FastThreadLocal
はここでも同じです。FastThreadLocalThread
のソースコードを参考にしました。Dubbo
とNetty
の違いは何ですか?私のこの文章のJavaソースのThreadLocalに移してください。ここでは余計な話はしません。スレッドが拒否されたポリシー
Dubboもスレッドプール拒否策を書き直しました。主な操作はオンラインプログラムが拒否された時にスタック情報をファイルに保存します。具体的には私が分析したこの文章を見ることができます。Dubboのソースコード分析のためのスレッドプールの拒否戦略はここでは詳しく紹介しません。
次にスレッド池を見ます。
CachedThreadPool
public class CachedThreadPool implements ThreadPool {
public Executor getExecutor(URL url) {
// :Dubbo
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// :0
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
// :Integer.MAX_VALUE
int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
// :0
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
// :60s
int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
FastThreadLocal
とは、スレッドが伸縮し、イベントが多いときに新しいスレッドが作成され、イベントが少ない場合には、一定時間を超えてスレッドが回収されます。ThreadLocal
:実は、このCachedThreadPool
はまともな
ではない。おそらく、このスレッドプールのブロック列はデフォルトCachedThreadPool
であるが、ユーザーがcache
変数を構成し、その値が大きいと、使用されるブロック列はSynchronousQueue
である。このとき、queues
がデフォルト値LinkedBlockingQueue
を再使用すると、イベントのブロックが発生する。Fixed ThreadPool
public class FixedThreadPool implements ThreadPool {
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
corethreads
はコアスレッドと最大スレッドサイズが固定値であることを示しており、0
においてもこのように処理されているが、ここで注意すべき点がある。FixedThreadPool
:Dubbo
スレッドプールを使用すると、デフォルトのスレッドプール数は
であり、デフォルトのブロック列サイズはfixed
であり、デフォルトで使用されているブロック列は200
である。あなたのビジネスイベントが合併した場合、または処理時間が長い場合、ブロック列サイズ、すなわち0
変数を適切に調整してください。そうでないと大量の要求が破棄されます。このスレッド池もSynchronousQueue
がデフォルトで使っているスレッド池で、事故が多いと推定されています。LimitedThreadPool
public class LimitedThreadPool implements ThreadPool {
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
このスレッドの池は面白いです。スレッドの池のスレッド数は上限まで成長してもいいです。いつまでも回収しないというのは疑似概念です。コードを見ると、スレッド設定スレッドの回収時間はqueues
に制限されています。永遠に回収しないと理解できます。これは注意すべきことと言えば、スレッド池は回収されていないので、Dubbo
変数であるスレッド最大制限の値は大きすぎてはいけません。標準Long.MAX_VALUE
を使用すればいいです。threads
を避けることができます。Eager ThreadPool
注:このスレッド池はdubbo-2.6.5以下では実現されていません。
まず、このスレッドのプールを紹介します。スレッドの池にあるすべてのコアスレッドが忙しくなっている時に、新しいタスクを追加すると、ブロック列に入れずに新しいスレッドを作成します。
public class EagerThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
// init queue and executor
TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
threads,
alive,
TimeUnit.MILLISECONDS,
taskQueue,
new NamedInternalThreadFactory(name, true),
new AbortPolicyWithReport(name, url));
taskQueue.setExecutor(executor);
return executor;
}
}
大体の規則は上と同じです。中には見たことがない種類が二つあります。200
とOOM
があります。まず、カスタムブロック
TaskQueue
を見てください。public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {
private static final long serialVersionUID = -2635853580887179627L;
private EagerThreadPoolExecutor executor;
public TaskQueue(int capacity) {
super(capacity);
}
public void setExecutor(EagerThreadPoolExecutor exec) {
executor = exec;
}
// offer
@Override
public boolean offer(Runnable runnable) {
if (executor == null) {
throw new RejectedExecutionException("The task queue does not have executor!");
}
int currentPoolThreadSize = executor.getPoolSize();
// offer
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(runnable);
}
//
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
// currentPoolThreadSize >= max
return super.offer(runnable);
}
// offer
public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
if (executor.isShutdown()) {
throw new RejectedExecutionException("Executor is shutdown!");
}
return super.offer(o, timeout, unit);
}
}
最も主要なロジックは、書き換えられたEagerThreadPoolExecutor
方法の中で、この方法を書き換えることによって、スレッド池に任務を提出する時に、ブロック列がいっぱいになったと偽って、スレッド池を新規にスレッド化してタスクを実行することができます。段
TaskQueue
類のoffer
コードを貼り付けます。 public void execute(Runnable command) {
/* null */
if (command == null)
throw new NullPointerException();
int c = ctl.get();
/* , addWorker */
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
/* , , */
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/* , */
//tip: , addWorker ,
else if (!addWorker(command, false))
reject(command);
}
詳しいことを知りたいのですが、このブログのスレッド池のソースコード分析はどうやってコアスレッドが破壊されないことを保証しますか?ThreadPoolExecutor
のソースコードを通して、ブロック列に失敗したときは、直接execute
メソッドを呼び出して、新しいスレッドを作成してタスクを実行します。書き換えたThreadPoolExecutor
のaddWorker
と方法が一致しました。TaskQueue
類を引き続き見ます。// ThreadPoolExecutor
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {
//
private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
public EagerThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit, TaskQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
//
public int getSubmittedTaskCount() {
return submittedTaskCount.get();
}
// ThreadPoolExecutor
@Override
protected void afterExecute(Runnable r, Throwable t) {
submittedTaskCount.decrementAndGet();
}
// execute
@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
// do not increment in method beforeExecute!
submittedTaskCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
//
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.", rx);
}
} catch (InterruptedException x) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} catch (Throwable t) {
// decrease any way
submittedTaskCount.decrementAndGet();
throw t;
}
}
}
書き換えスレッド池も簡単で、offer
に使用される現在のジョブ数のインターフェースを提供します。EagerThreadPoolExecutor
方法もよく分かります。なぜTaskQueue
方法でexecute
の異常を捕捉したのかという小仲間もいるかもしれない。これはまた、execute
ソースに戻ります。//
else if (!addWorker(command, false))
reject(command);
//
このコードは、RejectedExecutionException
が失敗したときにThreadPoolExecutor
を投げます。書き換えられていないスレッドのプールでこの異常な条件は、ブロック列がいっぱいになり、スレッド数が最大限度に達したことです。書き換えられたaddWorker
は、待ち行列の中で渋滞しています。RejectedExecutionException
はまた、オンラインプログラムのスレッド数が最大制限を下回る場合はTaskQueue
に戻ります。offer
に戻っても、新しいブロック列が満杯とは言えませんので、false
が必要です。//
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
//
スレッド池の分析が終わりました。提出記録を見ました。false
は当時の私達の会社の先輩が書いたもののようです。retryOffer
のEagerThreadPool
です。すごいですね。みんなで一生懸命勉強します。coding