[Java併発]JavaにおけるExectorフレームワーク(四)

23504 ワード

前言
Java SE 5からのExecutorフレームワークでは、プログラマがマルチスレッドのプログラムを書き込み、タスクの提出と実行を容易にし、自分のタスクを提出するだけで、実行とリターンは自分で管理する必要がありません。主に次のようなオブジェクトがあります。
  • Executors
  • ExecutorService
  • Future
  • Callable
  • ThreadPoolExecutor
  • 次に私達は一つ一つ紹介します。ExecutorsExecutorServiceExecutorServiceは、Executorsのいくつかの静的方法によって生成されうる。Executorsは全Executorsフレームの入口である。次のいくつかの方法を見てもいいです。
  • newCachedThreadPool:キャッシュスレッド池は、先にプールの中に以前に確立されたスレッドがあるかどうかを確認し、もしあれば、reuse.がないなら、新しいスレッドを作ってプールに参加します。
  • newFixedThreadPool:固定サイズのスレッド池。
  • newScheduledThreadPool:一例スレッド池。
  • ExecutorServiceは5つの状態がある。
  • RUNNIG:動作段階において、ExecutorServiceは、新しいスレッドを受け入れることができる(execute()またはsubmit()方法を起動する)。
  • SHUTDOWN:クローズ状態は新しいスレッドを受け付けないが、既に提出されたスレッドは実行される(shutdown方法を呼び出してクローズ状態に入る)。
  • STOP:新しいタスクを受け付けず、列の中に閉塞したタスクを処理せずに、処理中のジョブを中断します。
  • TIDYING:移行状態、つまり全てのタスクが実行されました。現在のスレッドプールには有効なスレッドがありません。この場合、スレッド池の状態はTIDYINGとなり、terminated()メソッドを呼び出します。
  • TEMINATED:終了状態。terminated()メソッドは、完了後の状態を呼び出します。
  • これらの状態の変化は、-RUNNNNIG->SHUTDOWN On invocation of shutdown()、perhaps implicitly in finaliz-()STOP On n invocation of shutdow Now()-SHUTDOWN->DTIYING-DYYinineeeeeeeeeexxxxxxxxxxxxxxxxeeeeeinininininininininininininininininininininininininininineeeeeeeeewwwwwFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFterminated()Hook method has complettedCallableRunnbaleExecutorServiceは、2種類のスレッドオブジェクトRunnableCallableを受け入れることができ、以下を参照してください。
     @org.junit.Test
        public void test() {
            Runnable runnable = () -> System.out.println("Hello World");
            ExecutorService executorService = Executors.newCachedThreadPool();
            executorService.execute(runnable);
            executorService.shutdown();
        }
    結果は以下の通りです
    Hello World
    Callableを見て、CallableRunnableと似ていますが、Callableは戻り値があり、Futureオブジェクトを返してから、Futureの中の方法を呼び出します。公式サイトではこう定義されています。
    A task that returns a regult and may throw an exception.Implements define a single method with no argments caled call.The Callable interface is to Runnable,in that both are are designed for clease staressdoes not return a relt and cannot throw a checked exception.CallableRunnableと類似しているが、その多くは結果を返す(Vと同様に、クラス署名における汎型のタイプである)。Executorsにおいて、RunnableCallableに適合させる方法が見られる。
     /** * Returns a {@link Callable} object that, when * called, runs the given task and returns the given result. This * can be useful when applying methods requiring a * {@code Callable} to an otherwise resultless action. * @param task the task to run * @param result the result to return * @param <T> the type of the result * @return a callable object * @throws NullPointerException if task null */
        public static <T> Callable<T> callable(Runnable task, T result) {
            if (task == null)
                throw new NullPointerException();
            return new RunnableAdapter<T>(task, result);
        }
    
        /** * A callable that runs given task and returns given result */
        static final class RunnableAdapter<T> implements Callable<T> {
            final Runnable task;
            final T result;
            RunnableAdapter(Runnable task, T result) {
                this.task = task;
                this.result = result;
            }
            public T call() {
                task.run();
                return result;
            }
        }
    
    次に、Futureのいくつかの定義について:
    A Future represents the reresult of an asynch ronous computation.Methods are provided to check if the computation is complettee,to wait for its complection,and to retrieve the computed.The reult can the computedition。blocking if necessary until it is ready.Cancellation is performed by the cancel method.Additional methods are provided to determine the task compled normally or wance.Once a computation hates compled,Oncethe computation can not be cancelled.If you would like to use a Future for the sake of cancelabilitybut not provide a usable relt,you can declea types of the form Fute
     @org.junit.Test
        public void test() {
            Callable<Integer> callable = () -> 3;
    
            FutureTask<Integer> future = new FutureTask<>(callable);
            ExecutorService executorService = Executors.newCachedThreadPool();
            executorService.submit(future);
            try {
                System.out.println(future.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    リターンの結果があります。
    3
    ThreadFactoryExecutorsでデフォルトのスレッド工場類を作成し、作成したスレッドの名前はpool-N-thread-M:where N is the sequence number of this factory,and M is the sequence number of the thread created by this factory.
       public static ThreadFactory defaultThreadFactory() {
            return new DefaultThreadFactory();
        }
        /** * The default thread factory. */
        static class DefaultThreadFactory implements ThreadFactory {
            /* 2      ,            。 */
            private static final AtomicInteger poolNumber = new AtomicInteger(1);
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            private final ThreadGroup group;
            private final String namePrefix;
    
            DefaultThreadFactory() {
                SecurityManager s = System.getSecurityManager();
                group = (s != null) ? s.getThreadGroup() :
                                      Thread.currentThread().getThreadGroup();
                namePrefix = "pool-" +
                              poolNumber.getAndIncrement() +
                             "-thread-";
            }
    
            public Thread newThread(Runnable r) {
                Thread t = new Thread(group, r,
                                      namePrefix + threadNumber.getAndIncrement(),
                                      0);
                if (t.isDaemon())
                    t.setDaemon(false);
                if (t.getPriority() != Thread.NORM_PRIORITY)
                    t.setPriority(Thread.NORM_PRIORITY);
                return t;
            }
        }
    スレッドの池
    前述のように、いくつかのスレッドがあります。-newCachedThreadPool:キャッシュスレッドがありますか?先にプールに以前に確立されたスレッドがありますか?あれば、reuseです。ないなら、新しいスレッドを作ってプールに参加します。newFixedThreadPool:固定サイズのスレッド池。-newScheduledThreadPool:一例スレッド池。
    次に、これらのスレッド池を見てみましょう。newCachedThreadPool
     @org.junit.Test
        public void test() {
            ExecutorService executorService = Executors.newCachedThreadPool();
            for (int i = 0; i < 100; i++) {
                executorService.execute(new MyRunnable());
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            executorService.shutdown();
        }
    
        class MyRunnable implements Runnable {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName());
            }
        }
    
    出力結果は以下の通りです。
    pool-1-thread-1
    pool-1-thread-2
    pool-1-thread-3
    pool-1-thread-4
    pool-1-thread-5
    pool-1-thread-6
    pool-1-thread-7
    pool-1-thread-8
    pool-1-thread-9
    pool-1-thread-10
    .....
    pool-1-thread-100
    newFixedThreadPool
    public class Test {
        @org.junit.Test
        public void test() {
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            for (int i = 0; i < 100; i++) {
                executorService.execute(new MyRunnable());
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            executorService.shutdown();
        }
    
        class MyRunnable implements Runnable {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName());
            }
        }
    }
    出力結果は以下の通りです。
    ......
    pool-1-thread-1
    pool-1-thread-1
    pool-1-thread-2
    pool-1-thread-3
    pool-1-thread-4
    pool-1-thread-5
    pool-1-thread-6
    pool-1-thread-7
    pool-1-thread-8
    pool-1-thread-9
    pool-1-thread-10
    newScheduledThreadPool
    public class Test {
        @org.junit.Test
        public void test() {
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            for (int i = 0; i < 100; i++) {
                executorService.execute(new MyRunnable());
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            executorService.shutdown();
        }
    
        class MyRunnable implements Runnable {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName());
            }
        }
    }
    
    出力結果は以下の通りです。
    pool-1-thread-1
    pool-1-thread-1
    pool-1-thread-1
    pool-1-thread-1
    pool-1-thread-1
    pool-1-thread-1
    pool-1-thread-1
    これは上记の结论を证明することができますので、次に私たちはソースの観点から、なぜこれらの情况が発生したのかを分析します。
     public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    
    
     public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    
    
    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    この3つの方法は、ThreadPoolExecutor、つまりスレッド池を構成しています。
     public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
        }
    そのパラメータは、以下の通りである。−corePoolSize:スレッド池に保存されているコアスレッド数は、アイドルスレッドを含む。maximumPoolSize:池で許可される最大スレッド数。keepAliveTime:スレッド池におけるアイドルスレッドの持続時間は最長である。unit:継続時間の単位。workQueue:ジョブ実行前にジョブを保存するキューは、execute方法で提出されたRunnableタスクのみを保存します。ThreadPoolExecutorソースの前のセグメントの注釈によれば、excute方式でRunnableタスクをスレッドに追加しようとした場合、以下の手順で処理します。2、スレッド池のスレッド数がcorePoolSize以上である場合、バッファキューcorePoolSizeが満杯でない場合、新たに追加されたジョブをworkQueueにおいて、FIFOの原則に従って順次実行を待つ(スレッドが空きましたら、バッファキューの中のタスクを空きスレッドに順次渡す)。3、スレッド池のスレッド数がworkQueueに等しい場合、バッファキューcorePoolSizeがいっぱいになっているが、スレッド池のスレッド数がworkQueueより小さい場合、追加されたタスクを処理するために新しいスレッドを作成する。4、スレッド池のスレッド数がmaximumPoolSizeに等しい場合、4つの処理方法があります。(この構造方法は5つのパラメータを含む構造方法を呼び出して、最後の構造方法をmaximumPoolSizeタイプとしています。処理スレッドがオーバーした時には4つの方法があります。ここでは詳しくは言わないでください。理解するためには、自分でソースを読んでください。)。
    いくつかの行列の戦略を説明します。1、直接提出します。バッファキューはRejectedExecutionHandlerを採用しており、それらを保持せずにスレッドに直接タスクを渡す。タスクをすぐに実行するスレッドが存在しないと、バッファキューにタスクを追加しようとしても失敗します。新しいスレッドを作成して、新たに追加されたタスクを処理して、スレッドプールに追加します。直接提出することは、通常、新たに提出されたタスクを拒否することを避けるために無境界SynchronousQueueが要求される。maximumPoolSizes(Integer.MAX_VALUE)が採用しているのはこのような戦略である。2、無制限行列。無境界キューを使用すると(典型的には予め定義された容量を採用したnewCachedThreadPoolであり、理論的にはこのバッファキューは無限多のタスクに対してキューに入れることができる)LinkedBlockingQueueスレッドのすべてが動作している場合に、新しいタスクをバッファキューに追加することができる。このように作成されたスレッドはcorePoolSizeを超えない。したがって、corePoolSizeの値は無効になる。各タスクが他のタスクから完全に独立している場合、すなわちタスク実行が互いに影響しない場合、非境界キューの使用に適しています。maximumPoolSizeが採用しているのはこのような戦略である。3、有界行列。制限されたnewFixedThreadPoolを使用すると、境界行列(一般的なバッファ・キューはmaximumPoolSizesを使用して、キューの最大長さを制定する)は、資源の枯渇を防止するのに役立つが、キューサイズと最大池サイズは互いに折衷する必要があり、適切なパラメータを設定する必要がある。
    参照
    http://blog.csdn.net/ns_code/articale/detail/17465497