Minaのスレッドプール実装解析(1)


スレッドプールは、同時アプリケーションで、各タスク呼び出しのオーバーヘッドを低減し、パフォーマンスを向上させるためによく使用されるテクノロジーです.この技術はminaで多く使用されており、Executorsのファクトリメソッドによってスレッドプールが構築されているほか、ThreadPoolExecutorが独自のスレッドプールを提供しているOrderedThreadPoolExecutorとUnorderedThreadPoolExecutorからも継承されている.この2つは主にExecutorFilterフィルタに適用される.このフィルタはmina内部で実装される多くのフィルタの1つであり,I/O eventsをスレッドプールにコミットして同じIOSessionのイベントを同時に処理するのが主な役割であり,そのデフォルトのスレッドプールの構造は前者である.この2つのスレッドプールの違いは,I/Oイベントを同時に処理する場合,前者は同じセッションのイベントの処理順序を保証できるが,後者は保証できないため,セッションClosedイベントがmessageReceivedイベントより前に処理される可能性があることである.次に、イベントの処理順序をコードから復号する方法を面接します.
Minaのソースコードを分析する最大の感覚は、そのマルチスレッドアプリケーションの精細さであり、ソースコードから自分の疑問を解決するたびに、言葉では言い表せない喜びがある.一般的なフレームワークのソースコードが主に設計構造を見ると、Minaのソースコードのすばらしさは具体的な実現にあり、一見複雑で混乱しているが、ほほほ.まずソースの一部を見てみましょう.
public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {
    ...
    private static final IoSession EXIT_SIGNAL = new DummySession();

    private final AttributeKey TASKS_QUEUE = new AttributeKey(getClass(), "tasksQueue");
    
    private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>();

    private final Set<Worker> workers = new HashSet<Worker>();

    private final AtomicInteger idleWorkers = new AtomicInteger();

    private long completedTaskCount;
    private volatile boolean shutdown;
    ...
    private SessionTasksQueue getSessionTasksQueue(IoSession session) {
        SessionTasksQueue queue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);

        if (queue == null) {
            queue = new SessionTasksQueue();
            SessionTasksQueue oldQueue = 
                (SessionTasksQueue) session.setAttributeIfAbsent(TASKS_QUEUE, queue);
            
            if (oldQueue != null) {
                queue = oldQueue;
            }
        }
        
        return queue;
    }

    private void addWorker() {
        synchronized (workers) {
            if (workers.size() >= super.getMaximumPoolSize()) {
                return;
            }

            // Create a new worker, and add it to the thread pool
            Worker worker = new Worker();
            Thread thread = getThreadFactory().newThread(worker);
            
            // As we have added a new thread, it's considered as idle.
            idleWorkers.incrementAndGet();
            
            // Now, we can start it.
            thread.start();
            workers.add(worker);

            if (workers.size() > largestPoolSize) {
                largestPoolSize = workers.size();
            }
        }
    }

    private class SessionTasksQueue {
        private final Queue<Runnable> tasksQueue = new ConcurrentLinkedQueue<Runnable>();
        
        private boolean processingCompleted = true;
    }
    ...
}

まず、OrderedThreadPoolExecutorの一部の実装を参照してください.これは、ThreadPoolExecutorのいくつかの静的定数に加えて、独自のいくつかの変数をリストします.EXIT_SIGNALは空を表すIOSessionで、このスレッドプールで得られたものがすべてEXIT_であればSIGNAL、それでは処理も終わります.waitingSessionsは利用可能なI/Oセッションを格納するキューであり、このキューが後のマルチスレッド処理セッションのI/Oイベントに秩序ある役割を果たしている.このデータ機構はUnorderedThreadPoolExecutor内にはなく、後でイベントに対する粒度が前者より大きいことがわかる.workersはWorkerの集合であり、各WorkerはRunnableインタフェースを実現し、スレッドプールはこれらのWorkerスレッドを管理して同時イベント処理を実行する.イメージ的なネーミングで、これらのWorkerはスレッドプール内のアルバイトです.idleWorkersは、空き労働者をタイムリーに統計してさらなる搾取を担当しており、マルチスレッドでの数値の増減にすぎないため、atomicパッケージを使用することは間違いなく最善の選択です.最後にSessionTasksQueueという内部クラスですが、この内部のデータ構造は実際にはキューであり、各IOSessionに対応するI/Oイベントの処理を担当しています.
このように
waitingSessionsでIOSessionを区別し、SessionTasksQueueで各IOSessionのI/Oイベントを区別するこの2層構造は、秩序化処理にデータ構造の保証を提供することができる.UnorderedThreadPoolExecutorと比較すると、IOSessionではなくI/OイベントのアクセスキューLinkedBlockingQueueのインスタンスのみが提供されます.上記のソースコードから、getSessionTasksQueue()メソッドは、特定のIOSessionに関連付けられたイベントキューを取得しようとし、そうでない場合はIOSessionにイベントキューのプロパティを追加することがわかります.addWorker()は、スレッドプールの管理に新しいWorkerスレッドを開始します.これらの操作は、順序付けされているスレッドプールでも無秩序なスレッドプールでもほぼ一致します.両者の差が最も反映されるのはスレッドプールのexecute()メソッドとその内部のWorkerのrun()の実現であり,異なるデータ構造を用いてI/Oイベントのアクセス処理を行うことで両者の差が現れる.まず、秩序ある実装を見てみましょう.
    public void execute(Runnable task) {
        ...
        IoEvent event = (IoEvent) task;
        
        IoSession session = event.getSession();
        
        SessionTasksQueue sessionTasksQueue = getSessionTasksQueue(session);
        Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
        
        boolean offerSession;

        boolean offerEvent = eventQueueHandler.accept(this, event);
        
        if (offerEvent) {
            synchronized (tasksQueue) {
                //*********************************1**********************************
                tasksQueue.offer(event);
                
                if (sessionTasksQueue.processingCompleted) {
                    sessionTasksQueue.processingCompleted = false;
                    offerSession = true;
                } else {
                    offerSession = false;
                }
                ...
                //*********************************end********************************
            }
        } else {
            offerSession = false;
        }

        if (offerSession) {
            waitingSessions.offer(session);
        }

        addWorkerIfNecessary();
        ...
    }

この方法は,I/Oイベントをスレッドプール処理,すなわちI/Oイベントの格納にコミットすることと簡単に理解できる.前述した順序付けされたスレッドプールの実装は2層構造を採用しているので、コードがはっきりしていて、まずI/Oイベントに対応するIOSessionを探して、それからIOSessionのイベントキュー属性(なければ作成)を探してイベントをキューに追加します.
コードセグメント1の動作の原子性を保証するためsynchronizedのロック機構を用いた.offerSessionここでの役割はwaitingSessions内の非EXITを保証することです.SIGNALのIOSessionは唯一です.addWorkerIfNecessary()は、空き労働者がイベントを処理していない場合に新しい人手(スレッド)を追加し、最終的にaddWorker()を呼び出す.無秩序スレッドプールの実装を見てみましょう.
    public void execute(Runnable task) {
        ...
        IoEvent e = (IoEvent) task;
        boolean offeredEvent = queueHandler.accept(this, e);
        if (offeredEvent) {
            getQueue().offer(e);
        }

        addWorkerIfNecessary();
        ...
    }

これは簡単に明らかであり,getQueue()はクラスのコンストラクタが提供するLinkedBlockingQueueの例を得る.offer()メソッドは、非ブロックのエンキュー実装に属するため、キューが不満であっても現在のスレッドをブロックしません.したがって、このI/Oイベントのストレージは、実際にはIOSessionの区分には関与しません.
同じIOSessionの複数のI/Oイベントは同じキューに保存され、異なるIOSessionのI/Oイベントもこのキューに保存されます.順序付きスレッド・プールの実装は、次のとおりです.
同じIOSessionの複数のI/Oイベントは同じキューにのみ存在し、各IOSessionには独自のイベントキューが対応しています.