Java同時プログラミングの基本知識

34617 ワード

コンカレントベース
同時プログラミングの原則
  • 原子性
  • 原子性とは、1つの操作でcpuが途中で一時停止してからスケジューリングすることができず、1つの操作または複数の操作がすべて実行され、実行されたプロセスがいかなる要因によっても中断されないか、または実行されないことを意味する.
  • 可視性
  • 可視性の場合、Javaはvolatileキーワードを提供して可視性を保証します.共有変数がvolatileによって修飾されると、変更された値がすぐにプライマリ・メモリに更新され、他のスレッドが読み取りを必要とすると、メモリから新しい値が読み出されます.一方、通常の共有変数は可視性を保証できません.通常の共有変数が変更された後、いつホストメモリに書き込まれるかは不確定であり、他のスレッドが読み取られると、メモリに元の古い値が残る可能性があるため、可視性を保証できません.また、synchronizedとLockによっても可視性が保証され、synchronizedとLockは、同じ時点で1つのスレッドだけがロックを取得して同期コードを実行することを保証し、ロックを解除する前に変数の変更をプライマリ・メモリにリフレッシュすることができる.
  • 秩序性
  • Javaメモリモデルでは、コンパイラとプロセッサが命令を再ソートできますが、再ソートプロセスは単一スレッドプログラムの実行には影響しませんが、マルチスレッド同時実行の正確性に影響します.
    RunnableとThreadここではRunnableインタフェースを実現することとThreadクラスを継承することの違いを説明します.10枚の切符を売るタスクを例にとると、Threadクラスを継承すると、3つのスレッドを起動することは3つのウィンドウを開くことに相当し、各ウィンドウに10枚の切符を売るタスクがあり、それぞれ販売されています.Runnableインタフェースを実現すれば、3つのスレッドを起動してかなり3つのウィンドウを開いて切符を売って、この3つのウィンドウは全部で10枚の切符を売っています.
    synchronizedキーワード
    1.synchronizedオブジェクトロック
    synchronized(this)メソッドとsynchronizedメソッドは、現在のオブジェクトをロックし、synchronized(obj)は臨界オブジェクトをロックします.synchronizedを使用する場合は、臨界オブジェクトをロックすることが望ましい.任意の複数のスレッドの任意の複数のユーザがアクセスする際に問題がないようにするには、現在のオブジェクトをロックする方法を考えてみましょう.現在のオブジェクトのレベルが重いので、一般的には使用しません.
    次のSyncクラスの2つのメソッドtest_01とtest_02()ロックされているのはプログラムによって作成されたSyncオブジェクトで、細粒度制御推奨用test_02().
    public synchronized void test_01() {
        System.out.println("     ");
    }
    public void test_02() {
        synchronized (this) {
            System.out.println("     ");
        }
    }

    次の方法はSyncオブジェクトのobjectオブジェクト(つまり臨界オブジェクト)をロックします.
    public void test_03() {
        synchronized (object) {
            System.out.println("     ");
        }
    }

    2.synchronized静的メソッドで現在のクラスをロックするには
    静的同期メソッドは、Syncクラスのstatic test_のような現在のタイプのクラスオブジェクトをロックします.04()メソッドに同期ロックsynchronizedが付加すると、synchronizedはsyncである.class.
    //次の2つの方法は静的同期方法です
    public static synchronized void test_04() {
        System.out.println(" Sync.class");
    }
    public static void test_05() {
        synchronized (Sync.class)     {
            System.out.println(" Sync.class ");
        }
    }

    3.synchronizedが静的および非静的に作用する方法の違い
    synchronizedの役割は非静的方法であり、単一のオブジェクトをロックすることに相当し、異なるオブジェクト間に競争関係はない.静的メソッドとして作用する場合、ロックロードクラス、すなわちclassをロックします.この場合、すべてのオブジェクトが同じロックを競合することに相当します.
  • 同期コードブロックに異常が投げ出され、ロックが解放する
  • .
    次の例では、スレッド1はi=5のときに異常を放出し、このときスレッド1ロックが解放され、スレッド2はメソッドの呼び出しを開始する.
    public class Test {
        static class Test02 implements Runnable {
            private int i = 0;
            @Override
            public synchronized void run() {
                while (true) {
                    System.out.println(Thread.currentThread().getName() + "_" + i++);
                    if (i == 5) { //  i==5     ,    
                        i = 1 / 0;
                    }
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    }catch (InterruptedException ignored) { }
                }
            }
        }
    public static void main(String[] args) {
        Test02 test02 = new Test02();
        new Thread(test02, "LQ").start();
        new Thread(test02, "WH").start();
    }

    }
  • インスタンス分析
  • 次のコードでは,objectはLQによりロックされ,WHはブロックされる.
    public class Test {static Object object = new Object();void m() {System.out.println(Thread.currentThread().getName() + "start...");synchronized (object){while (true) {try {TimeUnit.SECONDS.sleep(1);} catch (Exception ignored) {}System.out.println(Thread.currentThread().getName() + "-"+ object.hashCode());}}}
    static class Test01 implements Runnable {
        @Override
        public void run() {
            new Test().m();
        }
    }
    
    static class Test02 implements Runnable {
        @Override
        public void run() {
            new Test().m();
        }
    }
    
    public static void main(String[] args) {
        Test01 test01 = new Test01();
        Thread thread = new Thread(test01, "LQ");
        thread.start();
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (Exception ignored) {}
        Test02 test02 = new Test02();
        thread = new Thread(test02, "WH");
        thread.start();
    }

    }WHスレッドに新しいObjectが作成され、WHは正常に動作します.
    public class Test {
        static Object object = new Object();
        void m() {
            System.out.println(Thread.currentThread().getName() + " start...");
            synchronized (object) {
                while (true) {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (Exception ignored){}
                    System.out.println(Thread.currentThread().getName() + "-" + object.hashCode());
                }
            }
        }
        static class Test01 implements Runnable {
            @Override
            public void run() {
                new Test().m();
            }
        }
        static class Test02 implements Runnable {
            @Override
            public void run() {
                object = new Object();
                new Test().m();
            }
        }
    
        public static void main(String[] args) {
            Test01 test01 = new Test01();
            Thread thread = new Thread(test01, "LQ");
            thread.start();
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (Exception ignored) {}
            Test02 test02 = new Test02();
            thread = new Thread(test02, "WH");
            thread.start();
        }
    }

    上のコードでは、WHスレッドが起動すると1つだけ待機状態になります.objectはLQスレッドにロックされているためですが、WHスレッドで新たにnew Object()をnew Object()として付与すると、WHスレッドは正常に動作します.なぜなら、同期ロックはメモリ内のオブジェクトに対してロックされているので、LQロックは1回目のnewのオブジェクトで、WHロックは2回目のnewのオブジェクトで、下図の通りです.
     
    定数:String a="aaa"とString b="aaa"は同じオブジェクトであるため、Aメソッドがaをロックし、Bメソッドがbをロックし、LQスレッド呼び出しAメソッドを起動し、WHスレッド呼び出しBメソッドを起動すると、LQスレッドが終了するまでWHスレッドが実行される.したがって、同期コードブロックを定義するときは、ロックのターゲットオブジェクトとして定数を使用しないでください.
    volatileキーワードコンピュータにはCPU、メモリ、キャッシュがあり、CPUが実行されるとデフォルトでキャッシュ中のデータが見つかります.CPUが中断した場合、オペレーティングシステムによるCPUの管理特性により、キャッシュをクリアし、メモリのデータをキャッシュに読み直したり、キャッシュをクリアしない場合があり、キャッシュのデータを使用して後続の計算を行うことができます.CPUが中断しない場合、デフォルトのCPUはキャッシュデータのみを探します.volatileというキーワードはキャッシュデータの特性を変えるものではなく、メモリのデータ特性を直接変えるものであり、1つのオブジェクトにvolatileキーワード修飾を加えた場合、最下位OSオペレーティングシステムに通知し、計算を行うたびにメモリデータが変更されたかどうかをCPUに伝えることに相当する.これがメモリの可視性である.volatileキーワードは、メモリの可視性を保証するためです.
    次のコードでデッドロックが発生します.
    public class Volatile01 {
        private static boolean b = true;
        private void m() {
            System.out.println("start...");
            while (b) {}
            System.out.println("end...");
        }
        static class Volatile_01 implements Runnable {
            @Override
            public void run() {
                new Volatile01().m();
            }
        }
        public static void main(String[] args) {
            Volatile_01 = new Volatile_01();
            new Thread(volatile_01).start();
            try {
                TimeUnit.SECONDS.sleep(1);
            }catch (InterruptedException ignored) {}
            b = false;
        }
    }

    上記コードブロック中の共有変数bをvolatileで修飾すると(可視性が保証される)、ループから飛び出すことができる.
    public class Volatile01 {
        private static volatile boolean b = true;
        private void m() {
            System.out.println("start...");
            while (b){}
            System.out.println("end...");
        }
        static class Volatile_01 implements Runnable {
            @Override
            public void run(){
                new Volatile01().m();
            }
        }
        public static void main(String[] args) {
            Volatile_01 = new Volatile_01();
            new Thread(volatile_01).start();
            try{
                TimeUnit.SECONDS.sleep(1);
            }catch (InterruptedException ignored){}
            b = false;
        }
    }

    join()メソッドは、joinを呼び出すスレッドの実行が完了するまで、複数のスレッドを結合し、スレッドをブロックします.
    次のプログラムで印刷した結果は100000で、join()を使わないと印刷した結果は100000よりはるかに小さくなります.join()を使用すると、データの正確さを保証するために、スレッドのセットが実行されるのを待ってから後続の論理処理を行うことができます.
    public class Test {
        private static volatile int count = 0;
    
        private void m() {
            for (int i = 0; i < 10000; i++) {
                count++;
            }
        }
    
        static class Test02 implements Runnable {
            @Override
            public synchronized void run() {
                new Test().m();
            }
        }
    
        public static void main(String[] args) {
            Test02 test02 = new Test02();
            List threads = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                threads.add(new Thread(test02));
            }
            for (Thread thread : threads) {
                thread.start();
            }
            for (Thread thread : threads) {
                try {
                    thread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(count);
        }
    }

    上記のコードにはsynchronizedキーワードを用いて原子性を実現しているが、synchronizedを用いずにAtomicIntegerオブジェクトを用いてもよい.AtomicIntegerは原子的な操作オブジェクトであるため、コードは以下の通りである.
    public class Test{
        private static AtomicInteger count = new AtomicInteger();
        private void m(){
            for (int i = 0; i < 10000; i++){
                count.incrementAndGet();
            }
        }
        static class Test02 implements Runnable{
            @Override
            public void run(){
                new Test().m();
            }
        }
        public static void main(String[] args){
            Test02 test02 = new Test02();
            List threads = new ArrayList<>();
            for (int i = 0; i < 10; i++){
                threads.add(new Thread(test02));
            }
            for (Thread thread : threads){
                thread.start();
                try{
                    thread.join();
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
            System.out.println(count);
        }
    }

    CountDownLatchオブジェクトCountDownLatchは1つのラッチに相当し、ラッチオブジェクトを作成するときにロックの個数を指定することができ、あるメソッドがラッチのawait()メソッドを呼び出すと、そのメソッドがawait()に実行されると、ラッチが解放されるのを待つのをブロックされ、ラッチにロックがなく、つまりラッチが開放されているときに実行を継続します.ラッチの施錠方法を減らすときcountDown()です.
    以下の例では、m 1においてawait()が呼び出され、m 2においてcountDown()が呼び出されるので、m 2の論理によれば、m 2の実行が完了するとラッチ上のロック数は0となり、m 1メソッドは実行を継続することができる.
    public class Test {
        private CountDownLatch countDownLatch = new CountDownLatch(5);
    
        private void m1() {
            try {
                countDownLatch.await(); //       
            } catch (Exception ignored) {
            }
            System.out.println("method m1.");
        }
    
        private void m2() {
            while (countDownLatch.getCount() != 0) {
                countDownLatch.countDown(); //       
                System.out.println("method m2");
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException ignored) {
                }
            }
        }
    public static void main(String[] args) {
        Test count01 = new Test();
        new Thread(count01::m2).start();
        new Thread(count01::m1).start();
    }

    }

    , , , , 。

    wait()、notify() notifyAll()
    wait(): wait(), , notify() notifyAll() 。

    notify(): 。

    notifyAll(): notifyAll() , waiting 。

    ( ) , 10, , 。

    public class DeviceSingleton {
        private DeviceSingleton() {
        }
    
        private final int max = 10;
        private int count = 0;
        private static final DeviceSingleton DEVICE_SINGLETON = new DeviceSingleton();
    
        public static DeviceSingleton getInstance() {
            return DEVICE_SINGLETON;
        }
    
        private final List devices = new ArrayList<>();
    
        /**
         *   
         */
        public synchronized void add(E data) {
            //              
            while (devices.size() == max) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("add: " + data);
            ThreadUtils.sleep(1000);
            devices.add(data);
            count++;
            this.notify();
        }
    
        /**
         *   
         */
        public synchronized E get() {
            E data = null;
            while (devices.size() == 0) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            ThreadUtils.sleep(1000);
            data = devices.remove(0);
            count--;
            this.notifyAll();
            return data;
        }
    
        /**
         *     
         */
        public synchronized int size() {
            return count;
        }
    
        @Data
        static class Device {
            private int id;
            private String name;
    
            public Device(int id, String name) {
                this.id = id;
                this.name = name;
            }
        }
    
        static class ThreadUtils {
            public static void sleep(int millis) {
                try {
                    Thread.sleep(millis);
                } catch (Exception ignore) {}
            }
        }
    
    }
    public class Test {
        public static void main(String[] args) throws InterruptedException {
            DeviceSingleton deviceSingleton = DeviceSingleton.getInstance();
            for (int i = 0; i < 10; i++) {
                new Thread(() ->
                {
                    for (int j = 0; j < 5; j++) {
                        System.out.println(deviceSingleton.get());
                    }
                }, "consumer-" + i).start();
            }
            Thread.sleep(2000);
    
            for (int i = 0; i < 2; i++) {
                new Thread(() ->
                {
                    for (int j = 0; j < 25; j++) {
                        deviceSingleton.add(new DeviceSingleton.Device(j, "device " + j));
                    }
                }, "producer").start();
            }
        }
    
    }

    ReentrantLock

    synchronized , , synchronized 。 , (lock.unlock())。 :

    public class ReentrantLockTest {
        private final Lock lock = new ReentrantLock();
    
        private void m1() {
            lock.lock(); //   
            for (int i = 0; i < 10; i++) {
                System.out.println("method m1() " + i);
                ThreadUtils.sleep(1000);
            }
            lock.unlock(); //   
        }
    
        private void m2() {
            lock.lock(); //   
            System.out.println("method m2()");
            lock.unlock(); //   
        }
    
        public static void main(String[] args) {
            ReentrantLockTest reentrantLockTest = new ReentrantLockTest();
            new Thread(reentrantLockTest::m1).start();
            new Thread(reentrantLockTest::m2).start();
        }
    
    }
    1. lock.tryLock()

    false, , , true, 。 :

    public class ReentrantLockTest {
        private Lock lock = new ReentrantLock();
    
        private void m1() {
            lock.lock(); //   
            for (int i = 0; i < 10; i++) {
                ThreadUtils.sleep(1000);
                System.out.println("method m1() " + i);
            }
            lock.unlock(); //   
        }
    private void m2() {
            boolean isLocked = false;
            try {
                    /*
                       ,    ,        ,  false,    true
                              ,            ,     
                             ,           
                                 ,                 
                           ,                   ,    ,   ,    。 isLocked = lock.tryLock(5, TimeUnit.SECONDS);  5        (5                   ),         。
                     */
                    isLocked = lock.tryLock();
                    System.out.println(isLocked ? "m2() synchronized" : "m2() unsynchronized");
            } catch (Exception e) {
                    e.printStackTrace();
            } finally {
                    //                          
                    if (isLocked) {
                            lock.unlock();
                    }
            }
    }
    public static void main(String[] args) {
            ReentrantLockTest reentrantLockTest = new ReentrantLockTest();
            new Thread(reentrantLockTest::m1).start();
            new Thread(reentrantLockTest::m2).start();
    }
    
    }
    1. lock.lockInterruptibly()

    interrupt , interrupted , , InterruptedException , 。 :

    public class ReentrantLockTest {
        private Lock lock = new ReentrantLock();
    
        private void m1() {
            lock.lock(); //   
            for (int i = 0; i < 5; i++) {
                ThreadUtils.sleep(1000);
                System.out.println("method m1() " + i);
            }
            lock.unlock(); //   
        }
    
        private void m2() {
            try {
                /*
                    ,     ,              
                 */
                lock.lockInterruptibly(); //      
                System.out.println("method m2()");
            } catch (InterruptedException e) {
                System.out.println("    ");
            } finally {
                try {
                    lock.unlock();
                } catch (Exception ignored) {
                }
            }
        }
    public static void main(String[] args) {
        ReentrantLockTest reentrantLockTest = new ReentrantLockTest();
        Thread thread1 = new Thread(reentrantLockTest::m1);
        thread1.start();
        ThreadUtils.sleep(1000);
        Thread thread2 = new Thread(reentrantLockTest::m2);
        thread2.start();
        ThreadUtils.sleep(1000);
        thread2.interrupt(); //       
    }

    }
    : ReentrantLock , , ,notifyAll 。( thread.interruped(); object.notifyAll())。

    Windows , , ( )。

    。 , , , <=10, , , 。

    public class ReentrantLockTest {
        static class TestReentrantLock extends Thread {
            //    ReentrantLock        true        
            private ReentrantLock lock = new ReentrantLock(true);
    
            public void run() {
                for (int i = 0; i < 5; i++) {
                    lock.lock();
                    try {
                        System.out.println(Thread.currentThread().getName() + " get lock.");
                        ThreadUtils.sleep(1000);
                    } finally {
                        lock.unlock();
                    }
                }
            }
        }
    
    public static void main(String[] args) {
        TestReentrantLock lock = new TestReentrantLock();
        lock.start();
        new Thread(lock).start();
        new Thread(lock).start();
    }

    }

    1. Condition

    Lock , , 、 。 Condition 。

    public class DeviceContainer {
    private DeviceContainer() {
    }

    private static final DeviceContainer DEVICE_CONTAINER = new DeviceContainer<>();
    
    public static DeviceContainer getInstance() {
        return DEVICE_CONTAINER;
    }
    
    private final List list = new LinkedList<>();
    
    private final int max = 10;
    private int count = 0;
    private Lock lock = new ReentrantLock();
    private Condition producer = lock.newCondition();
    private Condition consumer = lock.newCondition();
    
    public void add(T t) {
        lock.lock();
        try {
            while (this.size() == max) {
                System.out.println(Thread.currentThread().getName() + "   ");
                //       max   ,         ,     
                //            
                producer.await();
            }
            System.out.println(Thread.currentThread().getName() + "   ");
            list.add(t);
            count++;
            //             
            consumer.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    
    public T get() {
        T t = null;
        lock.lock();
        try {
            while (this.size() == 0) {
                System.out.println(Thread.currentThread().getName() + "   ");
                //               
                consumer.await();
            }
            System.out.println(Thread.currentThread().getName() + "   ");
            t = list.remove(0);
            count--;
            //            
            producer.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return t;
    }
    
    private int size() {
        return count;
    }

    }

    public class Test {
    public static void main(String[] args) throws InterruptedException {
    DeviceContainer deviceSingleton = DeviceContainer.getInstance();
    for (int i = 0; i < 10; i++) {
    new Thread(() ->
    {
    for (int j = 0; j < 5; j++) {
    System.out.println(deviceSingleton.get());
    }
    }, "consumer-" + i).start();
    }
    ThreadUtils.sleep(1000);
    for (int i = 0; i < 2; i++) {
    new Thread(() ->
    {
    for (int j = 0; j < 25; j++) {
    deviceSingleton.add(new Device(j, "device " + j));
    }
    }, "producer-" + i).start();
    }

    }

    }

    Java

    1. Map/Set

    ConcurrentHashMap/ConcurrentHashSet: Map/Set, , , synchronized 。key value null( HashMap HashSet)

    ConcurrentSkipListMap/ConcurrentSkipListSet: Map/Set, , , ConcurrentHashMap/ConcurrentHashSet 。

    CopyOnWriteArraySet: , , , 。

    1. List

    CopyOnWriteArrayList: , , , 。

    1. Queue

    ConcurrentLinkedQueue/ ConcurrentLinkedDeue: , ,ConcurrentLinkedQueue ,ConcurrentLinkedDeue , ***。

    ArrayBlockingQueue/LinkedBlockingQueue: , , 0 。ArrayBlockingQueue , ;LinkedBlockingQueue , ***。ArrayBlockingQueue API , 。 。add ;put ;offer , false, true; offer , , true, , false。LinkedBlockingQueue add ;offer false, true; offer , , true, , false。

    PriorityQueue: , ,***。

    PriorityBlockingQueue: , ,***。

    LinkedTransferQueue: , transfer 。 add , 。transfer TransferQueue , (take() )。 , transfer 。 。

    SynchronousQueue: , 。 0 , TransferQuque。 。add , , 。put , , put 。

    DelayQueue: ,***。 , 。 : , 。

     


    , JVM , , JVM , 。 shutdown , 。

    Executor
    。Executor execute, 。 Runnable , Runnable。

    public class Executor01 {
    public static void main(String[] args) {
    new Executor_01().execute(() ->
    System.out.println(Thread.currentThread().getName() + " test executor.")
    );
    }
    static class Executor_01 implements Executor {br/>@Override
    public void execute(@NotNull Runnable command) {
    new Thread(command).start();
    }
    }
    }
    ExecutorService
    Executor , Executor , Future submit。

    Executors
    Executor , , , , 。 :void execute(),Future submit(Callable),Future submit(Runnable),void shutdown,boolean isShutdown(),boolean isTerminated()。

    public class Test {
    public static void main(String[] args) throws InterruptedException {
    // 5
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    for (int i = 0; i < 6; i++) {
    executorService.execute(() -> {
    System.out.println(Thread.currentThread().getName() + " executor.");
    ThreadUtils.sleep(1000);
    });
    }
    System.out.println(executorService);

        //     
        executorService.shutdown();
        //       ,            ,      ,      ,   false
        System.out.println(executorService.isTerminated());
        //       ,      shutdown  
        System.out.println(executorService.isShutdown());
        System.out.println(executorService);
    
        ThreadUtils.sleep(1000);
    
        //       5 ,         ,       ,   true
        System.out.println(executorService.isTerminated());
        System.out.println(executorService.isShutdown());
        System.out.println(executorService);
    }

    }
    Future
    , 。 get 。

    :get()、get(long, TimeUnit) isDown()。

    get(): ;

    get(long, TimeUnit): , , , 。

    isDown(): call , , isDown ExecutorService isShutdown ,isShutdown 。

    public class ExecutorServiceTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
    testExecutorService();
    }

    private static void testExecutorService() throws ExecutionException, InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(1);
        Future future = service.submit(() -> {
            ThreadUtils.sleep(1000);
            return Thread.currentThread().getName() + " submit.";
        });
    
        //                 call        ,
        //      ,        , ExecutorService  isShutDowm  , isShutdowm         , shutdown      
        System.out.println(future.isDone());
        //   call      
        System.out.println(future.get()); // false
    
        System.out.println(future.isDone());
        System.out.println(future.get()); // true
    
        //      
        service.shutdown();
    }

    }
    Callable
    。 Runnable , 。

    :call(), Runnable run , call 。

    Callable Runnable : , Callable, 。

    ThreadPoolExecutor
    new ThreadPoolExecutor , ThreadPoolExecutor :

    corePoolSize 

    maximumPoolSize 

    keepAliveTime 

    unitTimeUnit 

    workQueueBlockingQueue 

    threadFactoryThreadFactory 

    handlerRejectedExecutionHandler 
     

    corePoolSize , corePoolSize , ; , workQueueBlockingQueue , , ; , , maximumPoolSize , , , 。 corePoolSize , keepAliveTime, , corePoolSize。

    public class ExecutorThreadPoolTest {
    public static void main(String[] args) {
    testExecutorThreadPool();
    }

    private static void testExecutorThreadPool() {
        //      ,      2,      4,       10
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2,
                4,
                10,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),
                new MyTreadFactory(),
                new MyIgnorePolicy());
        //         ,        
        executor.prestartAllCoreThreads();
    
        //        
        for (int i = 1; i <= 10; i++) {
            MyTask task = new MyTask(String.valueOf(i));
            executor.execute(task);
        }
    }
    
    static class MyTreadFactory implements ThreadFactory {
    
        private final AtomicInteger mThreadNum = new AtomicInteger(1);
    
        @Override
        public Thread newThread(Runnable runnable) {
            Thread t = new Thread(runnable, "  【" + mThreadNum.getAndIncrement() + "】");
            System.out.println(t.getName() + "    ");
            return t;
        }
    }
    
    public static class MyIgnorePolicy implements RejectedExecutionHandler {
    
        public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
            doLog(runnable, executor);
        }
    
        private void doLog(Runnable runnable, ThreadPoolExecutor executor) {
            System.err.println(runnable.toString() + "    ");
        }
    }
    
    @Data
    static class MyTask implements Runnable {
        private String name;
    
        public MyTask(String name) {
            this.name = name;
        }
    
        @Override
        public void run() {
            System.out.println(this.toString() + "     ");
            ThreadUtils.sleep(1000);
        }
    
        @Override
        public String toString() {
            return "  【" + name + "】";
        }
    }

    }
    FixedThreadPool
    , Executors , , 。 :

    , nThreads, 0, LinkedBlockingQueue, Integer.MAX_VALUE。

    public class Test {
    public static void main(String[] args) {
    new Test().test();
    }

    public void test() {
        //      10 FixedThreadPool   
        ExecutorService service = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 100; i++) {
            service.execute(()-> System.out.println(Thread.currentThread().getName()));
        }
        //      
        service.shutdown();
    }

    }
    CachedThreadPool
    , Executors , Integer.MAX_VALUE, , ( FixedThreadPool ,FixedThreadPool shutdown )。 :

    , 0, Integer.MAX_VALUE, 60 , SynchronousQueue。

    public class Test {
    public static void main(String[] args) {
    new Test().test();
    }

    public void test() {
        //        
        ExecutorService service = Executors.newCachedThreadPool();
        System.out.println(service);
        for (int i = 0; i < 5; i++) {
            service.execute(() -> {
                ThreadUtils.sleep(1000);
                System.out.println(Thread.currentThread().getName() + " executor.");
            });
        }
        System.out.println(service);
        ThreadUtils.sleep(65);
        System.out.println(service);
    }

    }
    ScheduledThreadPool
    , , Executors , 。 , 、 。 :

    , Integer.MAX_VALUE, 0, DelayedWorkQuquq。

    :scheduledAtFixedRate、schedule、execute 。

    public class Test {
    public static void main(String[] args) {
    new Test().test();
    }

    public void test() {
        //          
        ScheduledExecutorService service = Executors.newScheduledThreadPool(3);
        System.out.println(service);
        //     ,     500          ,   300      
        service.scheduleAtFixedRate(() -> {
            ThreadUtils.sleep(1000);
            System.out.println(Thread.currentThread().getName() + " executor.");
        }, 500, 300, TimeUnit.MILLISECONDS);
        System.out.println(service);
        service.shutdown();
    }

    }
    SingleThreadExecutor
    。 。 。 , 。 :

    1, LinkedBlockingQueue。

    public class Test {
    public static void main(String[] args) {
    new Test().test();
    }

    public void test() {
        //          
        ExecutorService service = Executors.newSingleThreadExecutor();
        System.out.println(service);
        for (int i = 0; i < 5; i++) {
            service.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " executor.");
                ThreadUtils.sleep(1000);
            });
        }
        service.shutdown();
    }

    }
    ForkJoinPool
    , 。 CPU 。

    ForkJoinPool , , ,, 。 , fork join ( )。

    ForkJoinTask (RecursiveTask RecursiveAction, , ), 。

    ForkJoinTask RecursiveTask RecursiveAction,RecursiveTask ,RecursiveAction ( Callable Runnable )。

    ForkJoinTask compute , 。

    、 。

    public class Test {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        long result = 0L;
        for (int NUMBER : NUMBERS) {
            result += NUMBER;
        }
        System.out.println(result);
    
        ForkJoinPool pool = new ForkJoinPool();
        //       
        AddTask task = new AddTask(0, NUMBERS.length);
        //     
        Future future = pool.submit(task);
        System.out.println(future.get());
    }
    
    private static final int[] NUMBERS = new int[1000000];
    private static final int MAX_SIZE = 50000;
    private static final Random RANDOM = new Random();
    
    static {
        for (int i = 0; i < NUMBERS.length; i++) {
            NUMBERS[i] = RANDOM.nextInt(1000);
        }
    }
    
    static class AddTask extends RecursiveTask {
        int begin, end;
    
        AddTask(int begin, int end) {
            this.begin = begin;
            this.end = end;
        }
    
        @Override
        protected Long compute() {
            if ((end - begin) < MAX_SIZE) {
                long sum = 0L;
                for (int i = begin; i < end; i++) {
                    sum += NUMBERS[i];
                }
                return sum;
            } else {
                //                      
                int middle = begin + (end - begin) / 2;
                AddTask task1 = new AddTask(begin, middle);
                AddTask task2 = new AddTask(middle, end);
                //      ,            
                task1.fork();
                task2.fork();
                // join    ,        ,       ,         
                return task1.join() + task2.join();
            }
        }
    }

    }

    , , , 。 , , , 。

    public class Test {

    public static void main(String[] args) {
        new Test().test();
    }
    
    public void test() {
        ThreadGroup group = new ThreadGroup("LQ");
        Thread thread = new Thread(group, () ->
                System.out.println("group is " + Thread.currentThread().getThreadGroup().getName())
        );
        thread.start();
    }

    }
    、 , , , 。