Java同時シリアルスレッドプール


目次
  • ディレクトリ
  • 前言
  • 構想
  • サンプルコード
  • 前言
    2016年の新年に2人の仲間とチャットしたとき、マルチスレッドが並ぶ問題に言及しました.すなわち,サーバの1つのアプリケーションが大量のスレッドを同時に処理する必要がある場合,大きな同時失敗を防止するためにキューに並ぶメカニズムが望ましい.
    大学院生の頃は私もサーバー開発だったので、これにも興味がありました.また、Androidを作ったこの2年間、Androidのソースコードを研究することで、Javaがマルチスレッドを同時に処理することも理解できました.
    では、問題は、シリアルスレッドプールをどのように実現するかです.
    構想
    シリアルスレッドプールとは?つまり、我々のRunnableオブジェクトには、キューの末尾から順番に入り、キューの先頭からRunnableを選択して実行するキューメカニズムがあるはずです.
    私たちが考えを持っている以上:1.必要なデータ構造を考えてみましょうか?キューの末尾からRunnableオブジェクトを挿入し、キューの先頭からRunnableオブジェクトを実行する場合は、当然キューが必要です.JavaのSDKは、両端キュー:ArrayDequeなどの優れたキューデータ構造を提供しています.
  • スレッドの実行に関わるため、まず適切なスレッドプールが必要であり、ThreadPoolExecutorクラスを使用して構築することができます.
  • シリアル実行である以上、シリアルメカニズムをどのように維持するか.tryとfinallyメカニズムにより,入力したRunnableオブジェクトを新しいRunnableオブジェクトに再カプセル化し,新しいRunnableのrunメソッドのtryブロックでRunnableのrunメソッドを実行し,finallyで実行キューヘッダRunnableオブジェクトをキューから呼び出し,スレッドプールで実行するメソッドを入れることができる.

  • サンプルコード
    import java.util.ArrayDeque;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /** * Created by wzy on 16-1-5. */
    public class SerialExecutor {
        private Runnable mActive;
        private ArrayDeque<Runnable> mArrayDeque = new ArrayDeque<>();
    
        private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
        private static final int CORE_POOL_SIZE = CPU_COUNT + 1;
        private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
        private static final int KEEP_ALIVE = 1;
        private static final BlockingQueue<Runnable> sPoolWorkQueue =
                new LinkedBlockingDeque<>(128);
        private static final ThreadFactory sThreadFactory = new ThreadFactory() {
            private final AtomicInteger mCount = new AtomicInteger(1);
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "Serial thread #" + mCount.getAndIncrement());
            }
        };
        private static final ThreadPoolExecutor THREAD_EXECUTOR = new ThreadPoolExecutor(CORE_POOL_SIZE,
                MAXIMUM_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);
    
        public synchronized void execute(final Runnable r) {
            mArrayDeque.offer(new Runnable() {
                @Override
                public void run() {
                    try {
                        r.run();
                    } finally {
                        scheduleNext();
                    }
                }
            });
            //        mActivie  ,        scheduleNext  
            if (mActive == null) {
                scheduleNext();
            }
        }
    
        private void scheduleNext() {
            if ((mActive = mArrayDeque.poll()) != null) {
                THREAD_EXECUTOR.execute(mActive);
            }
        }
    
        public static void main(String[] args) {
            SerialExecutor serialExecutor = new SerialExecutor();
            for (int i = 0; i < 10; i ++) {
                final int j = i;
                serialExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("The num is :" + (j + 1));
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }
    }

    実行結果は次のとおりです.
    The num is :1
    The num is :2
    The num is :3
    The num is :4
    The num is :5
    The num is :6
    The num is :7
    The num is :8
    The num is :9
    The num is :10