ThreadPoolExecutorスレッドプールの実装

21451 ワード

ThreadPoolExecutorはAbstractExecutorServiceから継承されている.AbstractExecutorServiceは、ExecutorServiceインターフェースを実装する.
名前の通り、スレッドプールは一連のスレッドを保存する「コンテナ」です.
ThreadPoolExecutorの実装では、これらのスレッドを1つのHashSetに保存します.
     private final HashSet<Worker> workers = new HashSet<Worker>();

Workerは内部クラスで、後述します.
スレッドプールにコミットされたタスクを保存するBlockingQueueもあります
private final BlockingQueue<Runnable> workQueue;

corePoolSizeとmaximumPoolSizeは、スレッドプールのコアスレッド数と最大スレッド数を表すThreadPoolExecutorの2つのフィールドです.
スレッドプールにタスクをコミットすると、スレッドプールは次のように判断します.
  • スレッドプール内のスレッド数がcorePoolSizeより小さい場合、スレッド実行タスクを作成します.
  • それ以外の場合、タスクキューが満たされていない場合は、タスクをタスクキューに保存します.
  • それ以外の場合、スレッドプール内のスレッド数がmaximumPoolSizeより小さい場合、スレッド実行タスクを作成します.
  • それ以外の場合、ポリシーに従って実行できないタスクを実行します.



  •  
    以下に、ThreadPoolExecutorの構成方法を示します.
    public ThreadPoolExecutor(int corePoolSize,
    
                                  int maximumPoolSize,
    
                                  long keepAliveTime,
    
                                  TimeUnit unit,
    
                                  BlockingQueue<Runnable> workQueue,
    
                                  ThreadFactory threadFactory,
    
                                  RejectedExecutionHandler handler) {
    
            if (corePoolSize < 0 ||
    
                maximumPoolSize <= 0 ||
    
                maximumPoolSize < corePoolSize ||
    
                keepAliveTime < 0)
    
                throw new IllegalArgumentException();
    
            if (workQueue == null || threadFactory == null || handler == null)
    
                throw new NullPointerException();
    
            this.corePoolSize = corePoolSize;
    
            this.maximumPoolSize = maximumPoolSize;
    
            this.workQueue = workQueue;
    
            this.keepAliveTime = unit.toNanos(keepAliveTime);
    
            this.threadFactory = threadFactory;
    
            this.handler = handler;
    
        }

    Executorsに戻ってスレッドプールを構築する方法
  • 固定サイズのスレッドプールの構築
    public static ExecutorService More ...newFixedThreadPool(int nThreads) {
    
        return new ThreadPoolExecutor(nThreads, nThreads,
    
                                            0L, TimeUnit.MILLISECONDS,
    
                                            new LinkedBlockingQueue<Runnable>());
    
    } 

  • バッファプールを構築し、必要に応じてスレッドを新規作成します.新規作成されたスレッドは回収されず、後で再利用されます.
     
    
    public static ExecutorService More ...newCachedThreadPool() {
    
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    
                                           60L, TimeUnit.SECONDS,
    
                                           new SynchronousQueue<Runnable>());
    
    }


  •  
    ThreadPoolExecuterのexecute()メソッドを参照してください.このメソッドはExecuterインタフェース定義メソッドです(ExecuterServiceインタフェースはExecuterから継承されます):
        public void execute(Runnable command) {
    
            if (command == null)
    
                throw new NullPointerException();
    
            /*
    
             * Proceed in 3 steps:
    
             *
    
             * 1. If fewer than corePoolSize threads are running, try to
    
             * start a new thread with the given command as its first
    
             * task.  The call to addWorker atomically checks runState and
    
             * workerCount, and so prevents false alarms that would add
    
             * threads when it shouldn't, by returning false.
    
             *        corePoolSize     ,     
    
             *
    
             * 2. If a task can be successfully queued, then we still need
    
             * to double-check whether we should have added a thread
    
             * (because existing ones died since last checking) or that
    
             * the pool shut down since entry into this method. So we
    
             * recheck state and if necessary roll back the enqueuing if
    
             * stopped, or start a new thread if there are none.
    
             *           workQueue,          
    
             *
    
             * 3. If we cannot queue task, then we try to add a new
    
             * thread.  If it fails, we know we are shut down or saturated
    
             * and so reject the task.
    
             *           workQueue,   reject  
    
             */
    
            int c = ctl.get();
    
            if (workerCountOf(c) < corePoolSize) {
    
                if (addWorker(command, true))
    
                    return;
    
                c = ctl.get();
    
            }
    
            if (isRunning(c) && workQueue.offer(command)) {
    
                int recheck = ctl.get();
    
                if (! isRunning(recheck) && remove(command))
    
                    reject(command);
    
                else if (workerCountOf(recheck) == 0)
    
                    addWorker(null, false);
    
            }
    
            else if (!addWorker(command, false))
    
                reject(command);
    
        }    

    ここでreject(command)は,タスクが実行できず,予め定義された方法で実行できないタスクを実行する.
    addWorkerは、タスクをコミットするコアメソッドです.
        private boolean addWorker(Runnable firstTask, boolean core) {
    
            retry:
    
            for (;;) {
    
                int c = ctl.get();
    
                int rs = runStateOf(c);
    
    
    
                // Check if queue empty only if necessary.
    
                if (rs >= SHUTDOWN &&
    
                    ! (rs == SHUTDOWN &&
    
                       firstTask == null &&
    
                       ! workQueue.isEmpty()))
    
                    return false;
    
    
    
                for (;;) {
    
                    int wc = workerCountOf(c);
    
                    if (wc >= CAPACITY ||
    
                        wc >= (core ? corePoolSize : maximumPoolSize))
    
                        return false;
    
                    if (compareAndIncrementWorkerCount(c))
    
                        break retry;
    
                    c = ctl.get();  // Re-read ctl
    
                    if (runStateOf(c) != rs)
    
                        continue retry;
    
                    // else CAS failed due to workerCount change; retry inner loop
    
                }
    
            }
    
    
    
            boolean workerStarted = false;
    
            boolean workerAdded = false;
    
            Worker w = null;
    
            try {
    
                final ReentrantLock mainLock = this.mainLock;
    
                w = new Worker(firstTask);
    
                final Thread t = w.thread;
    
                if (t != null) {
    
                    mainLock.lock();
    
                    try {
    
                        // Recheck while holding lock.
    
                        // Back out on ThreadFactory failure or if
    
                        // shut down before lock acquired.
    
                        int c = ctl.get();
    
                        int rs = runStateOf(c);
    
    
    
                        if (rs < SHUTDOWN ||
    
                            (rs == SHUTDOWN && firstTask == null)) {
    
                            if (t.isAlive()) // precheck that t is startable
    
                                throw new IllegalThreadStateException();
    
                            workers.add(w);
    
                            int s = workers.size();
    
                            if (s > largestPoolSize)
    
                                largestPoolSize = s;
    
                            workerAdded = true;
    
                        }
    
                    } finally {
    
                        mainLock.unlock();
    
                    }
    
                    if (workerAdded) {
    
                        t.start();
    
                        workerStarted = true;
    
                    }
    
                }
    
            } finally {
    
                if (! workerStarted)
    
                    addWorkerFailed(w);
    
            }
    
            return workerStarted;
    
        }

     
    ラインオフプールのワークスレッドをもう一度見てください.
        private final class Worker
    
            extends AbstractQueuedSynchronizer
    
            implements Runnable
    
        {
    
            /**
    
             * This class will never be serialized, but we provide a
    
             * serialVersionUID to suppress a javac warning.
    
             */
    
            private static final long serialVersionUID = 6138294804551838833L;
    
    
    
            /** Thread this worker is running in.  Null if factory fails. */
    
            final Thread thread;
    
            /** Initial task to run.  Possibly null. */
    
            Runnable firstTask;
    
            /** Per-thread task counter */
    
            volatile long completedTasks;
    
    
    
            /**
    
             * Creates with given first task and thread from ThreadFactory.
    
             */
    
            Worker(Runnable firstTask) {
    
                setState(-1); // inhibit interrupts until runWorker
    
                this.firstTask = firstTask;
    
                this.thread = getThreadFactory().newThread(this);
    
            }
    
    
    
            /** Delegates main run loop to outer runWorker  */
    
            public void run() {
    
                runWorker(this);
    
            }
    
    
    
            // Lock methods
    
            //
    
            // The value 0 represents the unlocked state.
    
            // The value 1 represents the locked state.
    
    
    
            protected boolean isHeldExclusively() {
    
                return getState() != 0;
    
            }
    
    
    
            protected boolean tryAcquire(int unused) {
    
                if (compareAndSetState(0, 1)) {
    
                    setExclusiveOwnerThread(Thread.currentThread());
    
                    return true;
    
                }
    
                return false;
    
            }
    
    
    
            protected boolean tryRelease(int unused) {
    
                setExclusiveOwnerThread(null);
    
                setState(0);
    
                return true;
    
            }
    
    
    
            public void lock()        { acquire(1); }
    
            public boolean tryLock()  { return tryAcquire(1); }
    
            public void unlock()      { release(1); }
    
            public boolean isLocked() { return isHeldExclusively(); }
    
    
    
            void interruptIfStarted() {
    
                Thread t;
    
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
    
                    try {
    
                        t.interrupt();
    
                    } catch (SecurityException ignore) {
    
                    }
    
                }
    
            }
    
        }