スレッド通信のいくつかの方式

8032 ワード

一、問題
2つのスレッドがあり、Aスレッドは1つのセットに要素「abc」文字列を順次追加し、合計10回追加し、5回目に追加した場合、BスレッドにAスレッドの通知を受け取って、Bスレッドに関連するビジネス操作を実行してほしい.スレッド間通信のモデルには,共有メモリとメッセージングの2つがあり,以下の方法はいずれも基本的な2つのモデルで実現される.
二、volatileキーワードの使用
volatileキーワードに基づいてスレッド間の相互通信を実現することは共有メモリを使用する考え方である.複数のスレッドが同時に1つの変数をリスニングし、この変数が変化すると、スレッドは対応するトラフィックを感知して実行することができるという意味です.これも最も簡単な実現方法です
public class TestSync {
    //           ,   volatile  ,          
    static volatile boolean notice = false;

    public static void main(String[] args) {
        List  list = new ArrayList<>();
        //  A
        Thread threadA = new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                list.add("abc");
                System.out.println("  A    ,  list size :" + list.size());
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (list.size() == 5)
                    notice = true;
            }
        });
        //  B
        Thread threadB = new Thread(() -> {
            while (true) {
                if (notice) {
                    System.out.println("  B    ,         ...");
                    break;
                }
            }
        });
        //       B
        threadB.start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //      A
        threadA.start();
    }
}

三、Objectクラスを使用するwait()/notify()
Objectクラスはスレッド間通信の方法を提供している:wait()、notify()、notifyAll()であり、これらはマルチスレッド通信の基礎であり、このような実現方式の思想は自然にスレッド間通信である.
注意:wait/notifyはsynchronizedと組み合わせて使用する必要があります.waitメソッドはロックを解放し、notifyメソッドはロックを解放しません.waitとは、同期ロックに入ったスレッド内で、このロックを待っている他のスレッドが同期ロックを取得して実行できるように、同期ロックを一時的に譲渡することであり、notify()notify , wait() , , , を呼び出した他のスレッドのみが、wait()を呼び出した1つ以上のスレッドがwait状態を解除し、競合オブジェクトロックに再参加し、プログラムが再びロックを得ることができれば、下に進むことができます.
public class TestSync {
    public static void main(String[] args) {
        //       
        Object lock = new Object();
        List  list = new ArrayList<>();
        //   A
        Thread threadA = new Thread(() -> {
            synchronized (lock) {
                for (int i = 1; i <= 10; i++) {
                    list.add("abc");
                    System.out.println("  A    ,  list size :" + list.size());
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if (list.size() == 5)
                        lock.notify();//  B  
                }
            }
        });
        //  B
        Thread threadB = new Thread(() -> {
            while (true) {
                synchronized (lock) {
                    if (list.size() != 5) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("  B    ,         ...");
                }
            }
        });
        //       B
        threadB.start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //     A
        threadA.start();
    }
}

出力結果から、スレッドAがnotify()起動通知を発行した後も、自分のスレッドのトラフィックが完了した後も、スレッドBは実行を開始し、notify()がロックを解放せず、wait()がロックを解放することを示す.
四、JUCツール類CountDownLatchの使用
jdk 1.5以降java.util.concurrentパッケージの下に多くの同時プログラミング関連ツールクラスが提供され、同時プログラミングコードの書き込みが簡略化され、CountDownLatchはAQSフレームワークに基づいて、スレッド間共有変数stateを維持することに相当する.
public class TestSync {
    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        List  list = new ArrayList<>();
        //  A
        Thread threadA = new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                list.add("abc");
                System.out.println("  A    ,  list size :" + list.size());
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (list.size() == 5)
                    countDownLatch.countDown();
            }
        });
        //  B
        Thread threadB = new Thread(() -> {
            while (true) {
                if (list.size() != 5) {
                    try {
                        countDownLatch.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("  B    ,         ...");
                break;
            }
        });
        //       B
        threadB.start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //     A
        threadA.start();
    }
}

五、ReentrantLock結合Conditionを使用する
public class TestSync {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

        List list = new ArrayList<>();
        //  A
        Thread threadA = new Thread(() -> {
            lock.lock();
            for (int i = 1; i <= 10; i++) {
                list.add("abc");
                System.out.println("  A    ,  list size :" + list.size());
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (list.size() == 5)
                    condition.signal();
            }
            lock.unlock();
        });
        //  B
        Thread threadB = new Thread(() -> {
            lock.lock();
            if (list.size() != 5) {
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("  B    ,         ...");
            lock.unlock();
        });
        threadB.start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        threadA.start();
    }
}

この方式はあまりよく用いられず,コード記述が複雑であり,スレッドBはAによって起動された後もロックが取得されていないため直ちに実行できない,すなわち,Aは起動操作後もロックを解放しない.この方法はObjectのwait()/notify()と同じである.
六、基本的なLockSupportはスレッド間のブロックと起動を実現する
LockSupportは、スレッド間のブロックと起動を非常に柔軟に実現するツールであり、スレッドを待つか起動するかに注目する必要はありませんが、スレッドの名前を知る必要があります.
public class TestSync {
    public static void main(String[] args) {
        List list = new ArrayList<>();
        //  B
        final Thread threadB = new Thread(() -> {
            if (list.size() != 5) {
                LockSupport.park();
            }
            System.out.println("  B    ,         ...");
        });
        //  A
        Thread threadA = new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                list.add("abc");
                System.out.println("  A    ,  list size :" + list.size());
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (list.size() == 5)
                    LockSupport.unpark(threadB);
            }
        });
        threadA.start();
        threadB.start();
    }
}