Javaにおけるキューとスレッドの同期探究

9986 ワード

1つのAndroidアプリプロジェクトでは、連続2つのデータの送信間隔が300 msを下回ってはならないことをデバイスに送信する必要があります.最初の方法は、連続的にデータを送信する可能性のあるすべての場所で手動でThread.sleep(300);を追加してスレッドをブロックすることであり、このような問題は明らかです.1つは連続的に送信される場所を漏らす可能性があります.二つ目はメンテナンスが悪いことです.3つ目は、UIスレッドで呼び出されると、UIが詰まってしまうことです.これらの問題を解決するために、送信する必要があるデータをキューで保存し、専用のスレッドでキューからデータを取り出し、送信、遅延する計画です.スレッドとキューの具体的な使い方を探るために、テストエンジニアリングを書きました.基本的なフレームワークは次のとおりです.
public class Main {

    public static abstract class Sender extends Thread {
        abstract void append(int data);
    }

    static Date lastTime = new Date();

    static void send(int value) {
        Date now = new Date();
        long timeSpan = now.getTime() - lastTime.getTime();
        System.out.println("" + value + ": " + timeSpan);
        lastTime = now;
    }

    public static void main(String[] args) {
        Sender sender = new Sender();//   Sender,      Sender   
        sender.start();
        sender.append(0);
        try {
            Thread.sleep(800);
        }
        catch(InterruptedException e) {
            e.printStackTrace();
        }
        sender.append(1);
        sender.append(2);
    }
}

まずSenderクラス継承Threadを定義し、キューにデータを追加するappendメソッドを追加します.Senderは、appendメソッドを実装し、実行時にキューからデータをループして取得し、class Mainのsendメソッドを呼び出して送信データをシミュレートする抽象クラスです.sendメソッドは,今回呼び出した時間と前回呼び出した時間を減算して印刷し,間隔が300 msより大きいか否かを判断する.main()関数でSender(ここでは直接new Sender()を使用することはできませんが、Senderのサブクラスを使用する必要があります)をインスタンス化し、append関数を呼び出して送信するデータを追加します.1回目と2回目の追加の間隔は800 msです.実際の実行では、2回目の送信データは1回目より800 ms、3回目は2回目より300 ms遅れなければなりません.まず、Senderの最初の実装を見てみましょう.
    public static class SimpleSender extends Sender {

        Queue queue = new LinkedList();

        @Override
        public void append(int data) {
            queue.add(data);
        }

        @Override
        public void run() {
            super.run();
            while(true) {
                Integer value = queue.poll();
                if(value != null) {
                    send(value);
                    try {
                        Thread.sleep(300);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

キュー(Queue)は先進先出で、ここではチェーンテーブルを使います(LinkedList)実装.append関数でキューを呼び出すaddメソッドは、その末尾にデータを追加し、ループ中にpollを使用してキューヘッダのデータを取得すると同時にキューから移動し、現在のキューが空であればnullに戻る.main関数ではSimpleSenderでSender:Sender sender = new SimpleSender();をインスタンス化する.実際に実行すると、1つのデータしか出力されていないことに気づいて引っかかる(異なる環境で実行すると異なる場合があります):
0: 1

理由は明らかです.キューのaddメソッドとpollメソッドが異なるスレッドから呼び出され、両方のメソッドがキューを変更し、競合を引き起こす可能性があります.そのため、キュー操作にロックをかける必要があります.改善されたSenderスレッドの実装を見てみましょう.
    public static class SynchronizedSender extends Sender {

        Queue queue = new LinkedList();

        @Override
        public void append(int data) {
            synchronized (queue) {//     
                queue.add(data);
            }
        }

        @Override
        public void run() {
            super.run();
            while(true) {
                Integer value = null;
                synchronized (queue) {//     
                    value = queue.poll();
                }
                if(value != null) {
                    send(value);
                    try {
                        Thread.sleep(300);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

これにより、データの追加と取得は相互に影響しません.実行効果は予想通りです.
0: 1
1: 802
2: 301

しかし、キューが空の場合、スレッドのwhileサイクルが停止せずに実行され、リソースが消費され、全体のパフォーマンスに影響を与えるという問題があります.次に、スレッドのsuspendメソッドを使用して、キューが空の場合にスレッドをブロックし、データがある場合はresumeメソッドで実行を再開します.
    public static class SynchronizedSuspendSender extends Sender {

        Queue queue = new LinkedList();

        @Override
        public void append(int data) {
            synchronized (queue) {
                queue.add(data);
            }
            resume();//              
        }

        @Override
        public void run() {
            super.run();
            while(true) {
                System.out.println("loop running");
                Integer value = null;
                synchronized (queue) {
                    value = queue.poll();
                }
                if(value != null) {
                    send(value);
                    try {
                        Thread.sleep(300);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                else
                    suspend();//         
            }
        }
    }

実行時に印刷される内容は以下のとおりです.サイクルは5回しか実行されていません.そのうち2回はデータ送信がなく、スレッドがブロックされています.
loop running
0: 1
loop running
loop running
1: 802
loop running
2: 304
loop running

目的は達成されたものの、suspendメソッドとresumeメソッドはデッドロックを引き起こす可能性があるため、すでに廃棄とマークされている(参考:Java Thread Primitive Depresation).オブジェクトのwait/notifyメソッドで同様の効果を達成することができる:
    public static class SynchronizedWaitSender extends Sender {

        Queue queue = new LinkedList();

        @Override
        public void append(int data) {
            synchronized (queue) {
                queue.add(data);
                queue.notify();//   queue          
            }
        }

        @Override
        public void run() {
            super.run();
            while(true) {
                Integer value = null;
                synchronized (queue) {
                    value = queue.poll();
                    if(value == null)
                        try {
                            queue.wait();//         queue  
                        }
                        catch(InterruptedException e) {
                            e.printStackTrace();
                        }
                }
                if(value != null) {
                    send(value);
                    try {
                        Thread.sleep(300);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

wait/nofityメソッドはOjbectクラスに属し、任意のオブジェクトに適用され、synchronizedコードブロックで使用する必要があります.waitメソッドはスレッドをブロックすると同時にオブジェクトのロックを解放しますが、オブジェクトはsynchronizedコードブロックでのみロックされます.参照:javaでのスレッドのブロック、一時停止、有効化.実行効果はsuspend/resumeの使用と一致します.
loop running
0: 1
loop running
loop running
1: 804
loop running
2: 301
loop running

実は以上の同期とブロックは自分で書く必要はなく、Javaに内蔵されたBlockingQueueで実現できます.BlockingQueueはQueueインタフェースのサブインタフェースで、Queueよりもtakeメソッドが1つ多く、pollメソッドと同じ機能を持っていますが、キューが空の場合nullを返さず、現在の呼び出しをブロックします.キューにデータまたはスレッドがinterrupt呼び出しによって中断されるまで(中断されるとInterruptedExceptionが放出されます)、ロックの問題を心配する必要はありません.BlockingQueueはスレッドが安全です.以下はBlockingQueueで実現されるSenderです.
    public static class BlockingSender extends Sender {

        BlockingQueue queue = new LinkedBlockingQueue<>();//   queue        

        @Override
        public void append(int data) {
            queue.add(data);//BlockingQueue      ,      
        }

        @Override
        public void run() {
            super.run();
            while(true) {
                System.out.println("loop running");
                Integer value = null;
                try {
                    value = queue.take();//        
                }
                catch(InterruptedException e) {
                    e.printStackTrace();
                }
                if(value != null) {
                    send(value);
                    try {
                        Thread.sleep(300);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

運転してみると、4回しか循環していないことがわかりました.
loop running
0: 5
loop running
1: 800
loop running
2: 302
loop running

実際に最後のループはまだ実行されておらず、次のデータを受信すると処理が続行されます.BlockingQueueは私たちの問題を完璧に解決しました.しかし、最後にもう一つの要件は、接続が切断された後、データ送信スレッドを停止することです.ブロックメカニズムがなければ、while()文で判断される変数を制御することでループを終了し、スレッドを終了させることができます.ブロックがある場合も簡単です.スレッドを呼び出すinterruptメソッドがブロックを中断し、異常処理でループを終了すればいいです.
    public static class BlockingInterruptSender extends Sender {

        BlockingQueue queue = new LinkedBlockingQueue<>();

        @Override
        public void append(int data) {
            queue.add(data);
        }

        @Override
        public void run() {
            super.run();
            while(true) {
                Integer value = null;
                try {
                    value = queue.take();
                }
                catch(InterruptedException e) {
                    break;//         
                }
                System.out.println("keep going");
                if(value != null) {
                    send(value);
                    try {
                        Thread.sleep(300);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        
        //         ( )
        
        //      
        try {
            Thread.sleep(1000);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
        sender.interrupt();
    }

実行効果は次のとおりです.
keep going
0: 3
keep going
1: 804
keep going
2: 300

Process finished with exit code 0

プロセスが正常に終了しているのが見えますが、以前の各プロセスでは、送信スレッドがデッドサイクルまたはブロックされているため、手動でテストプログラムを中止するしかありません.
参照先:
  • コードファイル