JAvaスレッドプール---独自のスレッドプールの作成


スレッドプールとは
スレッドプールは、1つまたは複数のスレッド[ループ実行]で複数のアプリケーションロジックのスレッド集合である.
一般的に、スレッド・プールには次のセクションがあります.
  • メインタスクの1つ以上のスレッドを完了します.
  • スケジューリング管理のための管理スレッド.
  • が実行を要求するタスクキュー.
  • スレッドプールの役割:
    スレッドプールの役割は、システムで実行されるスレッドの数を制限することです.システムの環境状況に応じて、スレッドの数を自動的にまたは手動で設定し、実行の最適な効果を達成することができます.システム資源の浪費が少なく、システムの混雑効率が高くない.スレッドプールでスレッド数を制御し、他のスレッドが並んで待機します.1つのタスクの実行が完了し、キューから先頭のタスクを取り出して実行を開始します.キューにプロセスが待機していない場合、スレッドプールのこのリソースは待機しています.新しいタスクが実行される必要がある場合、スレッドプールに待機している作業スレッドがあれば、実行を開始できます.そうでなければ待機キューに入ります.
    スレッドプールの実装
    スレッド・プールの理解に基づいて、独自の単純なスレッド・プールを作成します.
    簡単なスレッドプールインタフェース:
    public interface ThreadPool<Job extends Runnable>{
       //      (Job),  Job    Runnable
       void execute(Job job);
      //     
      void shutdown();
      //       ,          
      void addWorkers(int num);
      //       
      void removeWorker(int num);
      //             
      void getJobSize();
    }

    クライアントはexecute(Job)メソッドによってJobをスレッドプールにコミットして実行することができ、クライアントはJobの実行が完了するのを待つ必要はありません.スレッドプールインタフェースは、execute(Job)メソッドに加えて、作業者スレッドを増加/減少させ、スレッドプールを閉じる方法を提供します.各クライアントがコミットしたJobは、作業者スレッドの処理を待つワークキューに入ります.
    スレッドプールインタフェースのデフォルト実装
    public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job>{
    
        //                
        private static final int MAX_WORKER_NUMBERS = 10;
        //               
        private static final int DEFAULT_WORKER_NUMBERS = 5;
        //                
        private static final int MIN_WORKER_NUMBERS = 1;
        //         ,            
        private final LinkedList<Job> jobs = new LinkedList<Job>();
        //         
        private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
        //         
        private int workerNum;
        //            
        private AtomicLong threadNum = new AtomicLong();
    
     //     
    public DefaultThreadPool() {
            this.workerNum = DEFAULT_WORKER_NUMBERS;
            initializeWorkers(this.workerNum);
        }
    
        public DefaultThreadPool(int num) {
            if (num > MAX_WORKER_NUMBERS) {
                this.workerNum =DEFAULT_WORKER_NUMBERS;
            } else {
                this.workerNum = num;
            }
            initializeWorkers(this.workerNum);
        }
    //          
    private void initializeWorkers(int num) {
            for (int i = 0; i < num; i++) {
                Worker worker = new Worker();
                //           
                workers.add(worker);
                //       
                Thread thread = new Thread(worker);
                thread.start();
            }
        }
    
    public void execute(Job job) {
            if (job != null) {
            //     "  /    "     jobs  
                synchronized (jobs) {
                    jobs.addLast(job);
                    jobs.notify();
                }
            }
        }
        //               
         public void shutdown() {
            for (Worker w : workers) {
                w.shutdown();
            }
        }
          //       
            public void addWorkers(int num) {
            //  ,                                
            synchronized (jobs) {
                if (num + this.workerNum > MAX_WORKER_NUMBERS) {
                    num = MAX_WORKER_NUMBERS - this.workerNum;
                }
                initializeWorkers(num);
                this.workerNum += num;
            }
        }
        //       
    public void removeWorker(int num) {
            synchronized (jobs) {
            if(num>=this.workerNum){
                      throw new IllegalArgumentException("          ");
                      }
                for (int i = 0; i < num; i++) {
                    Worker worker = workers.get(i);
                    if (worker != null) {
                    //            
                        worker.shutdown();
                        workers.remove(i);
                    }
                }
                this.workerNum -= num;
            }
        }
    
    public int getJobSize() {
            // TODO Auto-generated method stub
            return workers.size();
        }
    //       
    class Worker implements Runnable {
            //        worker
            private volatile boolean running = true;
    
            public void run() {
                while (running) {
                    Job job = null;
                    //     /    
                    synchronized (jobs) {
                        if (jobs.isEmpty()) {
                            try {
                                jobs.wait();//      
                            } catch (InterruptedException e) {
                                //              ,  
                                Thread.currentThread().interrupt();
                                return;
                            }
                        }
                        //     job
                        job = jobs.removeFirst();
                    }
                    //  job
                    if (job != null) {
                        job.run();
                    }
                }
            }
    
            //      
            public void shutdown() {
                running = false;
            }
        }
    }

    スレッドプールの実装から分かるように、クライアントがexecute(Job)メソッドを呼び出すと、ジョブリストjobsにJobが絶えず追加され、各ワーカースレッドは読まずにjobsからJobを取得して実行され、jobsが空の場合、ワーカースレッドはWAITING状態に入る.
    ジョブを追加すると、作業キューjobsに対してnotify()メソッドを呼び出して作業者スレッドを起動します.ここではnotifyAll()を呼び出さず、待機キュー内のスレッドをすべてブロックキューに移動してリソースの浪費を回避します.
    スレッドプールの本質は、スレッドが安全なワークキューを使用して作業者スレッドとクライアントスレッドを接続することです.クライアントスレッドはタスクをワークキューに入れると戻り、作業者スレッドはワークキューからワークを取り出して実行します.ワークキューが空の場合、ワーカースレッドはWAITING状態に入り、クライアントがタスクを送信すると、任意のワーカースレッドを通過し、大量のタスクの送信に伴い、より多くのワーカースレッドが起動します.
    参考:『java同時プログラミングの芸術』方騰飛