Minaのスレッドプール実装解析(1)
スレッドプールは、同時アプリケーションで、各タスク呼び出しのオーバーヘッドを低減し、パフォーマンスを向上させるためによく使用されるテクノロジーです.この技術はminaで多く使用されており、Executorsのファクトリメソッドによってスレッドプールが構築されているほか、ThreadPoolExecutorが独自のスレッドプールを提供しているOrderedThreadPoolExecutorとUnorderedThreadPoolExecutorからも継承されている.この2つは主にExecutorFilterフィルタに適用される.このフィルタはmina内部で実装される多くのフィルタの1つであり,I/O eventsをスレッドプールにコミットして同じIOSessionのイベントを同時に処理するのが主な役割であり,そのデフォルトのスレッドプールの構造は前者である.この2つのスレッドプールの違いは,I/Oイベントを同時に処理する場合,前者は同じセッションのイベントの処理順序を保証できるが,後者は保証できないため,セッションClosedイベントがmessageReceivedイベントより前に処理される可能性があることである.次に、イベントの処理順序をコードから復号する方法を面接します.
Minaのソースコードを分析する最大の感覚は、そのマルチスレッドアプリケーションの精細さであり、ソースコードから自分の疑問を解決するたびに、言葉では言い表せない喜びがある.一般的なフレームワークのソースコードが主に設計構造を見ると、Minaのソースコードのすばらしさは具体的な実現にあり、一見複雑で混乱しているが、ほほほ.まずソースの一部を見てみましょう.
まず、OrderedThreadPoolExecutorの一部の実装を参照してください.これは、ThreadPoolExecutorのいくつかの静的定数に加えて、独自のいくつかの変数をリストします.EXIT_SIGNALは空を表すIOSessionで、このスレッドプールで得られたものがすべてEXIT_であればSIGNAL、それでは処理も終わります.waitingSessionsは利用可能なI/Oセッションを格納するキューであり、このキューが後のマルチスレッド処理セッションのI/Oイベントに秩序ある役割を果たしている.このデータ機構はUnorderedThreadPoolExecutor内にはなく、後でイベントに対する粒度が前者より大きいことがわかる.workersはWorkerの集合であり、各WorkerはRunnableインタフェースを実現し、スレッドプールはこれらのWorkerスレッドを管理して同時イベント処理を実行する.イメージ的なネーミングで、これらのWorkerはスレッドプール内のアルバイトです.idleWorkersは、空き労働者をタイムリーに統計してさらなる搾取を担当しており、マルチスレッドでの数値の増減にすぎないため、atomicパッケージを使用することは間違いなく最善の選択です.最後にSessionTasksQueueという内部クラスですが、この内部のデータ構造は実際にはキューであり、各IOSessionに対応するI/Oイベントの処理を担当しています.
このように
waitingSessionsでIOSessionを区別し、SessionTasksQueueで各IOSessionのI/Oイベントを区別するこの2層構造は、秩序化処理にデータ構造の保証を提供することができる.UnorderedThreadPoolExecutorと比較すると、IOSessionではなくI/OイベントのアクセスキューLinkedBlockingQueueのインスタンスのみが提供されます.上記のソースコードから、getSessionTasksQueue()メソッドは、特定のIOSessionに関連付けられたイベントキューを取得しようとし、そうでない場合はIOSessionにイベントキューのプロパティを追加することがわかります.addWorker()は、スレッドプールの管理に新しいWorkerスレッドを開始します.これらの操作は、順序付けされているスレッドプールでも無秩序なスレッドプールでもほぼ一致します.両者の差が最も反映されるのはスレッドプールのexecute()メソッドとその内部のWorkerのrun()の実現であり,異なるデータ構造を用いてI/Oイベントのアクセス処理を行うことで両者の差が現れる.まず、秩序ある実装を見てみましょう.
この方法は,I/Oイベントをスレッドプール処理,すなわちI/Oイベントの格納にコミットすることと簡単に理解できる.前述した順序付けされたスレッドプールの実装は2層構造を採用しているので、コードがはっきりしていて、まずI/Oイベントに対応するIOSessionを探して、それからIOSessionのイベントキュー属性(なければ作成)を探してイベントをキューに追加します.
コードセグメント1の動作の原子性を保証するためsynchronizedのロック機構を用いた.offerSessionここでの役割はwaitingSessions内の非EXITを保証することです.SIGNALのIOSessionは唯一です.addWorkerIfNecessary()は、空き労働者がイベントを処理していない場合に新しい人手(スレッド)を追加し、最終的にaddWorker()を呼び出す.無秩序スレッドプールの実装を見てみましょう.
これは簡単に明らかであり,getQueue()はクラスのコンストラクタが提供するLinkedBlockingQueueの例を得る.offer()メソッドは、非ブロックのエンキュー実装に属するため、キューが不満であっても現在のスレッドをブロックしません.したがって、このI/Oイベントのストレージは、実際にはIOSessionの区分には関与しません.
同じIOSessionの複数のI/Oイベントは同じキューに保存され、異なるIOSessionのI/Oイベントもこのキューに保存されます.順序付きスレッド・プールの実装は、次のとおりです.
同じIOSessionの複数のI/Oイベントは同じキューにのみ存在し、各IOSessionには独自のイベントキューが対応しています.
Minaのソースコードを分析する最大の感覚は、そのマルチスレッドアプリケーションの精細さであり、ソースコードから自分の疑問を解決するたびに、言葉では言い表せない喜びがある.一般的なフレームワークのソースコードが主に設計構造を見ると、Minaのソースコードのすばらしさは具体的な実現にあり、一見複雑で混乱しているが、ほほほ.まずソースの一部を見てみましょう.
public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {
...
private static final IoSession EXIT_SIGNAL = new DummySession();
private final AttributeKey TASKS_QUEUE = new AttributeKey(getClass(), "tasksQueue");
private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>();
private final Set<Worker> workers = new HashSet<Worker>();
private final AtomicInteger idleWorkers = new AtomicInteger();
private long completedTaskCount;
private volatile boolean shutdown;
...
private SessionTasksQueue getSessionTasksQueue(IoSession session) {
SessionTasksQueue queue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
if (queue == null) {
queue = new SessionTasksQueue();
SessionTasksQueue oldQueue =
(SessionTasksQueue) session.setAttributeIfAbsent(TASKS_QUEUE, queue);
if (oldQueue != null) {
queue = oldQueue;
}
}
return queue;
}
private void addWorker() {
synchronized (workers) {
if (workers.size() >= super.getMaximumPoolSize()) {
return;
}
// Create a new worker, and add it to the thread pool
Worker worker = new Worker();
Thread thread = getThreadFactory().newThread(worker);
// As we have added a new thread, it's considered as idle.
idleWorkers.incrementAndGet();
// Now, we can start it.
thread.start();
workers.add(worker);
if (workers.size() > largestPoolSize) {
largestPoolSize = workers.size();
}
}
}
private class SessionTasksQueue {
private final Queue<Runnable> tasksQueue = new ConcurrentLinkedQueue<Runnable>();
private boolean processingCompleted = true;
}
...
}
まず、OrderedThreadPoolExecutorの一部の実装を参照してください.これは、ThreadPoolExecutorのいくつかの静的定数に加えて、独自のいくつかの変数をリストします.EXIT_SIGNALは空を表すIOSessionで、このスレッドプールで得られたものがすべてEXIT_であればSIGNAL、それでは処理も終わります.waitingSessionsは利用可能なI/Oセッションを格納するキューであり、このキューが後のマルチスレッド処理セッションのI/Oイベントに秩序ある役割を果たしている.このデータ機構はUnorderedThreadPoolExecutor内にはなく、後でイベントに対する粒度が前者より大きいことがわかる.workersはWorkerの集合であり、各WorkerはRunnableインタフェースを実現し、スレッドプールはこれらのWorkerスレッドを管理して同時イベント処理を実行する.イメージ的なネーミングで、これらのWorkerはスレッドプール内のアルバイトです.idleWorkersは、空き労働者をタイムリーに統計してさらなる搾取を担当しており、マルチスレッドでの数値の増減にすぎないため、atomicパッケージを使用することは間違いなく最善の選択です.最後にSessionTasksQueueという内部クラスですが、この内部のデータ構造は実際にはキューであり、各IOSessionに対応するI/Oイベントの処理を担当しています.
このように
waitingSessionsでIOSessionを区別し、SessionTasksQueueで各IOSessionのI/Oイベントを区別するこの2層構造は、秩序化処理にデータ構造の保証を提供することができる.UnorderedThreadPoolExecutorと比較すると、IOSessionではなくI/OイベントのアクセスキューLinkedBlockingQueueのインスタンスのみが提供されます.上記のソースコードから、getSessionTasksQueue()メソッドは、特定のIOSessionに関連付けられたイベントキューを取得しようとし、そうでない場合はIOSessionにイベントキューのプロパティを追加することがわかります.addWorker()は、スレッドプールの管理に新しいWorkerスレッドを開始します.これらの操作は、順序付けされているスレッドプールでも無秩序なスレッドプールでもほぼ一致します.両者の差が最も反映されるのはスレッドプールのexecute()メソッドとその内部のWorkerのrun()の実現であり,異なるデータ構造を用いてI/Oイベントのアクセス処理を行うことで両者の差が現れる.まず、秩序ある実装を見てみましょう.
public void execute(Runnable task) {
...
IoEvent event = (IoEvent) task;
IoSession session = event.getSession();
SessionTasksQueue sessionTasksQueue = getSessionTasksQueue(session);
Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
boolean offerSession;
boolean offerEvent = eventQueueHandler.accept(this, event);
if (offerEvent) {
synchronized (tasksQueue) {
//*********************************1**********************************
tasksQueue.offer(event);
if (sessionTasksQueue.processingCompleted) {
sessionTasksQueue.processingCompleted = false;
offerSession = true;
} else {
offerSession = false;
}
...
//*********************************end********************************
}
} else {
offerSession = false;
}
if (offerSession) {
waitingSessions.offer(session);
}
addWorkerIfNecessary();
...
}
この方法は,I/Oイベントをスレッドプール処理,すなわちI/Oイベントの格納にコミットすることと簡単に理解できる.前述した順序付けされたスレッドプールの実装は2層構造を採用しているので、コードがはっきりしていて、まずI/Oイベントに対応するIOSessionを探して、それからIOSessionのイベントキュー属性(なければ作成)を探してイベントをキューに追加します.
コードセグメント1の動作の原子性を保証するためsynchronizedのロック機構を用いた.offerSessionここでの役割はwaitingSessions内の非EXITを保証することです.SIGNALのIOSessionは唯一です.addWorkerIfNecessary()は、空き労働者がイベントを処理していない場合に新しい人手(スレッド)を追加し、最終的にaddWorker()を呼び出す.無秩序スレッドプールの実装を見てみましょう.
public void execute(Runnable task) {
...
IoEvent e = (IoEvent) task;
boolean offeredEvent = queueHandler.accept(this, e);
if (offeredEvent) {
getQueue().offer(e);
}
addWorkerIfNecessary();
...
}
これは簡単に明らかであり,getQueue()はクラスのコンストラクタが提供するLinkedBlockingQueueの例を得る.offer()メソッドは、非ブロックのエンキュー実装に属するため、キューが不満であっても現在のスレッドをブロックしません.したがって、このI/Oイベントのストレージは、実際にはIOSessionの区分には関与しません.
同じIOSessionの複数のI/Oイベントは同じキューに保存され、異なるIOSessionのI/Oイベントもこのキューに保存されます.順序付きスレッド・プールの実装は、次のとおりです.
同じIOSessionの複数のI/Oイベントは同じキューにのみ存在し、各IOSessionには独自のイベントキューが対応しています.