Javaスレッドプールの紹介と基本事例


Javaスレッドプール
スレッドプールの概念:まずいくつかのスレッドを作成し、彼らの集合はスレッドプールとなり、サーバが顧客要求を受信した後、スレッドプールから空きスレッドを取り出してサービスし、サービスが終わった後、スレッドを閉じるのではなく、スレッドをスレッドプールに返します.スレッドプールの2つの主要な役割:大量のスレッドの作成によるシステムクラッシュを回避するスレッドの数を制御する(破棄スレッドの頻繁な作成を回避する)スレッドの再利用
1.Callable内のcallメソッドには戻り値Runnable runメソッドに戻り値なしExecutorServiceインタフェース継承とExecutor submitメソッド独自戻りFuture Executors操作Executorツールクラスがある
2. ThreadPool (Executors.newFixedThreadPool(5))
public class T05_ThreadPool {
  public static void main(String[] args) throws InterruptedException {
     //          
     ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
        for (int i = 0; i <= 5; i++){
            fixedThreadPool.execute(() -> {try {TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {e.printStackTrace();}
                System.out.println("       :"+ Thread.currentThread().getName());
            });   }
        System.out.println(fixedThreadPool);
        fixedThreadPool.shutdown();
        System.out.println(fixedThreadPool.isTerminated());
        System.out.println(fixedThreadPool.isShutdown());
        TimeUnit.SECONDS.sleep(5);
        System.out.println(fixedThreadPool.isTerminated());
        System.out.println(fixedThreadPool.isShutdown());
        System.out.println(fixedThreadPool);       }}

3. FutureTaskPool
public class T06_FutureTaskPool {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask futureTask = new FutureTask(() -> {
            TimeUnit.MILLISECONDS.sleep(500);
            return 10086;});  //   Callable call  
        new Thread(futureTask).start(); //    
        System.out.println(futureTask.get()); //                     
        System.out.println("***************************************");
        //     
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
        Future submit = fixedThreadPool.submit(() -> {
            TimeUnit.MILLISECONDS.sleep(500);
            return 10010;});
        fixedThreadPool.shutdown();
        System.out.println(submit.get());
        System.out.println(submit.isDone());
        System.out.println(submit.isCancelled());     }}

4.ParallelComputing//並行実行
public class T07_ParallelComputing {
    //  1000000     
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        ArrayList integers = get(1, 1000000);
        long end = System.currentTimeMillis();
        System.out.println(end - start);
        //  Callable    
        MyTask myTask = new MyTask(1, 20000);
        MyTask myTask1 = new MyTask(200001, 400000);
        MyTask myTask2 = new MyTask(400001, 600000);
        MyTask myTask3 = new MyTask(600001, 800000);
        MyTask myTask4 = new MyTask(800001, 1000000);

        ExecutorService pool = Executors.newFixedThreadPool(5);
        Future> submit = pool.submit(myTask);
        Future> submit1 = pool.submit(myTask1);
        Future> submit2 = pool.submit(myTask2);
        Future> submit3 = pool.submit(myTask3);
        Future> submit4 = pool.submit(myTask4);
        start = System.currentTimeMillis();
        submit.get();
        submit1.get();
        submit2.get();
        submit3.get();
        submit4.get();
        end = System.currentTimeMillis();
        System.out.println(end - start);
        pool.shutdown();   }
    static class MyTask implements Callable> {
        int start, end;
        public MyTask(int start, int end) {
            this.start = start;
            this.end = end;    }
     @Override
     public List call() throws Exception {
            return get(start, end);} }
     public static boolean isPrime(int num) {
        for (int i = 2; i <= num / 2; i++)
            if (num % 2 == 0) return false;
        return true;}
    public static ArrayList get(int start, int end) {
        ArrayList arrayList = new ArrayList<>();
        for (int i = start; i <= end; i++) {
            if (isPrime(i)) arrayList.add(i);  }
        return arrayList;    }}

5.CachedPoolキャッシュスレッドプール(Executors.newCachedThreadPool())は、スレッドプールに空きスレッドがないタスクがある場合に作成されます(デフォルトは上限なし)が、実行済みスレッドが空き60 Sで自動的に閉じるように設定できます
public class T08_CachedPool {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        System.out.println(cachedThreadPool);
   for (int i = 0; i < 3; i++) 
   {cachedThreadPool.execute(() -> System.out.println(Thread.currentThread().getName())); }
        System.out.println(cachedThreadPool);
        TimeUnit.SECONDS.sleep(70);
        System.out.println(cachedThreadPool);
        cachedThreadPool.shutdown(); }}

6.SinglePool(Executors.newSingleThreadExecutor()スレッドプール内のスレッド
7.ScheduledPool (Executors.newScheduledThreadPool(3))
public class T10_ScheduledPool {
    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(3);
 //        ,     :  ,    ,    ,    
        pool.scheduleAtFixedRate(() -> {
        try { TimeUnit.MILLISECONDS.sleep(new Random().nextInt(100));
            } catch (InterruptedException e) {e.printStackTrace();}
            System.out.println(Thread.currentThread().getName());
        },0,300, TimeUnit.MILLISECONDS);    }
}

8.WorkStealingPool(Executors.newWorkStealingPool()はこのスレッドプールを盗み取り、各スレッドは自分のタスクキューを維持し、自分のキューの中のスレッドを実行すると他のスレッド内のタスク実行可伝intパラメータを盗み、デフォルトではCPUコア数に基づいて起動ライン数は伝達されない.これは精霊スレッドdaemoで、内部はパッケージされたForkJoinPoolである
9.ForkJoinPool以下2つとも再帰的な考え方で分割するRecursiveAction//戻り値なしRecursiveTask//戻り値あり[例:T 12_ForkJoinPool]public class T 12_ForkJoinPool {
static int[] nums = new int[100000];
static int maxNum = 5000;
static Random random = new Random();
static {
    //     
    for (int i = 0; i < nums.length; i++) nums[i] = random.nextInt(100);
    System.out.println(Arrays.stream(nums).sum());
}
public static void main(String[] args) throws Exception {
   ForkJoinPool joinPool = new ForkJoinPool();
    /* MyTask task = new MyTask(0, nums.length);
    joinPool.execute(task);  //    
    System.out.println("*******************************");
    */
    MyTask2 task2 = new MyTask2(0, nums.length);
    ForkJoinTask joinTask = joinPool.submit(task2);
    System.out.println(joinTask.get());  //    
    joinPool.shutdown();
    //System.in.read();  //             
}

static class MyTask extends RecursiveAction {
    int start, end;
    public MyTask(int start, int end) {
        this.start = start;
        this.end = end;       }
    @Override
    protected void compute() {
        //      
        if (end - start <= 5000) {
            long sum = 0;
            for (int i = start; i < end; i++) sum += nums[i];
            System.out.println("start:" + start + " end:" + end + " sum:" + sum);
        } else {
            int middle = start + (end - start) / 2;
            MyTask task1 = new MyTask(start, middle);
            MyTask task2 = new MyTask(middle, end);
            task1.fork();
            task2.fork();     }}}
static class MyTask2 extends RecursiveTask {
    int start, end;
    public MyTask2(int start, int end) {
        this.start = start;
        this.end = end;
    }
    @Override
    protected Long compute() {
        //      
        if (end - start <= 5000) {
            long sum = 0;
            for (int i = start; i < end; i++) sum += nums[i];
            return sum;  } 
        else {
            int middle = start + (end - start) / 2;
            MyTask2 task1 = new MyTask2(start, middle);
            MyTask2 task2 = new MyTask2(middle, end);
            task1.fork();
            task2.fork();
            return task1.join() + task2.join();    }}}}