Javaの渋滞行列の実現原理と応用シーンを分析する


私たちが普段使っているよくある列の中には、PriorityQue、Linked Listなどの非ブロック列があります。Dequeueインターフェースを実現しました。
非ブロッキングの列を使う時に大きな問題があります。現在のスレッドにブロッキングが発生しないと、消費者-生産者のようなモデルに対しては、追加的に同期戦略とスレッド間起動戦略を実現しなければならないので、これが実現するのは大変です。しかし、ブロック列があると違って、現在のスレッドに対してブロックが発生します。例えば、スレッドが空のブロック列から要素を取って、スレッドがブロックされて列の中に元素があります。キューに要素があると、ブロックされたスレッドが自動的に呼び覚まされます。このように非常に便利さを提供します。
一.いくつかの主要なブロック列
Java 1.5以降、java.util.co ncurrentパッケージの下でいくつかのブロック列が提供され、主に以下のようなものがある。
ArayBlockingQue:配列に基づいて実現されるブロック列は、ArayBlockingQueオブジェクトを作成する時に容量の大きさを指定しなければなりません。また、公平性と非公平性を指定することができ、デフォルトでは非公平であり、待ち時間が最も長い列が優先的にキューにアクセスできることを保証しない。
Linked Blocking Que:チェーンに基づいて実現されるブロック列は、Linked Blocking Queオブジェクトを作成する時に容量の大きさを指定しないと、デフォルトのサイズはInteger.MAX_です。VALE。
PriorityBlockingQue:以上の2つの列は先着順に列を出るが、PriorityBlockingQueはそうではない。元素の優先順位によって元素を並べ替えて、優先順位によって列を出る。毎回列を出る要素は優先度が一番高い元素である。注意してください。このブロック列は無境界ブロック列で、つまり容量に上限がないです。
DelayQue:PriorityQueに基づいて、待ち時間が列を塞いでいます。DelayQueの中の要素は指定された遅延時間が来たら、列からこの元素を獲得することができます。DelayQueも無制限の列ですので、列にデータを挿入する操作(生産者)は永遠にブロックされません。データを取得する操作(消費者)だけがブロックされます。
二.行列をブロックする方法VS行列をブロックしない方法
1.非閉塞行列のいくつかの主要な方法:
  • add(E e):要素eを列の最後に挿入し、挿入に成功したらtrueに戻る。挿入に失敗した場合(列がいっぱいになった場合)は、例外を投げます。
  • Remove():チームの最初の要素を除去し、削除に成功したらtrueに戻ります。削除に失敗した場合(キューが空です)、異常を投げます。
  • offer(E e):列の最後に要素eを挿入し、挿入に成功したらtrueに戻ります。挿入に失敗した場合(列がいっぱいである場合)はfalseに戻ります。
  • poll():チームの最初の要素を除去して取得し、成功すればチームの最初の要素に戻ります。nullに戻ります
  • peek():チームの最初の要素を取得し、成功すればチームの先頭要素に戻ります。そうでないとnull
  • に戻ります。
     
    ブロックされていない列については、一般的に、offer、poll、peekの3つの方法を使用することを勧め、addとremoveの方法を使用することを推奨しない。offer、poll、peekの3つの方法を使うと、戻り値で操作が成功するかどうかを判断できます。addとremoveの方法を使うと、このような効果は得られません。ブロックされていない列の中の方法は、同期されていません。
    2.行列をブロックするいくつかの主要な方法:
    ブロッキング・キューは、非ブロッキング・キューの大部分の方法を含み、前述の5つの方法は、ブロッキング・キューの中に存在するが、これらの5つの方法は、ブロッキング・キューの中で同期されたものであることに留意されたい。加えて、ブロック列は、他の4つの非常に有用な方法を提供する。
  • put(E e)
  • take()
  • offer(E e,long timeout,TimeUnit unit)
  • poll(long timeout、TimeUnit unit)
  •   
  • put方法は列の最後に元素を入れます。もし列がいっぱいになったら待ちます。
  • take方法はチームから元素を取ります。もし列が空いたら待ちます。
  • offer方法は列の最後に要素を保存するために用いられます。列がいっぱいになると、一定の時間が待っています。期限が到来した時、まだ挿入が成功していない場合、falseに戻ります。trueに戻ります
  • poll方法はチームから元素を取ります。もし列が空いたら、一定の時間を待っています。時間が来たら、取ったらnullに戻ります。取得した要素を返します。
  • 三.行列をブロックする実現原理
    もし列が空なら、消費者はずっと待っています。生産者が元素を追加する時、消費者はどうやって現在の列に元素があると知っていますか?渋滞の列を設計してもらうと、生産者と消費者が効率よく通信できるように設計しますか?まず、JDKがどのように実現されているかを見てみましょう。
    通知モードを使用して実現します。通知モードとは、生産者がいっぱいの列に要素を追加すると生産者がブロックされ、消費者が行列の中の要素を消費すると、生産者に現在のキューが利用できるように通知します。JDKのソースコードを確認すると、ArayBlockingQueはCondationを使って実現されました。コードは以下の通りです。
    
    private final Condition notFull;
    private final Condition notEmpty;
    
    public ArrayBlockingQueue(int capacity, boolean fair) {
        //      
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
      }
    
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
          while (count == items.length)
            notFull.await();
          insert(e);
        } finally {
          lock.unlock();
        }
    }
    
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
          while (count == 0)
            notEmpty.await();
          return extract();
     } finally {
          lock.unlock();
        }
    }
    
    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
      }
    
    
    私達が行列に要素を挿入する時、もし行列が使えないなら、生産者をブロックするのは主にLockSupport.parkを通ります。叶えて
    
    public final void await() throws InterruptedException {
          if (Thread.interrupted())
            throw new InterruptedException();
          Node node = addConditionWaiter();
          int savedState = fullyRelease(node);
          int interruptMode = 0;
          while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
              break;
          }
          if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
          if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
          if (interruptMode != 0)
    
    reportInterruptAfterWait(interruptMode);
        }
    
    
    ソースを入力してください。setBlockerを呼び出して、先にブロックされるスレッドを保存してください。その後、unsafe.parkを呼び出して、現在のスレッドをブロックします。
    
    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        unsafe.park(false, 0L);
        setBlocker(t, null);
      }
    
    unsafe.parkはnative方法です。コードは以下の通りです。
    
    public native void park(boolean isAbsolute, long time);
    
    この方法は現在のスレッドをブロックし、次の4つの場合の1つだけが発生した場合に戻ります。
    parkに対応するunparkが実行または実行された場合。注意:既に実行されているとは、unparkが先に実行してから実行するparkのことです。
    スレッドが中断されている場合。
    パラメータのtimeがゼロでない場合、指定したミリ秒数を待っている場合。
    異常現象が発生した場合。これらの異常は事前に確認できません。
    私達は引き続きJVMがどのようにpark方法を実現しているかを見てみます。parkは異なるオペレーティングシステムで異なる方式で実現しています。linuxではシステム方法のpthread_を使用しています。cond_wait実現。JVMソースコードの経路src/os/linux/vm/os_linux.cppの中のos:PlatformEvent::park方法、コードは以下の通りです。
    
    void os::PlatformEvent::park() {   
          int v ;
       for (;;) {
     v = _Event ;
       if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
       }
       guarantee (v >= 0, "invariant") ;
       if (v == 0) {
       // Do this the hard way by blocking ...
       int status = pthread_mutex_lock(_mutex);
       assert_status(status == 0, status, "mutex_lock");
       guarantee (_nParked == 0, "invariant") ;
       ++ _nParked ;
       while (_Event < 0) {
       status = pthread_cond_wait(_cond, _mutex);
       // for some reason, under 2.7 lwp_cond_wait() may return ETIME ...
       // Treat this the same as if the wait was interrupted
       if (status == ETIME) { status = EINTR; }
       assert_status(status == 0 || status == EINTR, status, "cond_wait");
       }
       -- _nParked ;
       
       // In theory we could move the ST of 0 into _Event past the unlock(),
       // but then we'd need a MEMBAR after the ST.
       _Event = 0 ;
       status = pthread_mutex_unlock(_mutex);
       assert_status(status == 0, status, "mutex_unlock");
       }
       guarantee (_Event >= 0, "invariant") ;
       }
    
       }
    
    
    pthread_cond_waitはマルチスレッドの条件変数関数であり、condはconditionの略語であり、文字通りスレッドが条件発生を待っているという意味であり、この条件はグローバル変数である。この方法は二つのパラメータを受け取ります。一つの共有変数_cond,一つの反発量_mutexunpark方法はlinuxでpthread_を使います。cond_signalが実現しました。parkはwindowsでWaitForSingleObjectを使って実現しました。
    列がいっぱいになると生産者はブロック列に要素を挿入し、生産者スレッドはWAITING状態に入ります。jstack dumpを使ってブロックされた生産者スレッドを見ることができます。
    
    "main" prio=5 tid=0x00007fc83c000000 nid=0x10164e000 waiting on condition [0x000000010164d000]
      java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x0000000140559fe8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
        at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:324)
        at blockingqueue.ArrayBlockingQueueTest.main(ArrayBlockingQueueTest.java:11)
    
    
    四.例と使用シーン
    まずObject.wait()とObject.notify()、非閉塞行列を使って生産者-消費者モードを実現します。
    
    public class Test {
      private int queueSize = 10;
      private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
       
      public static void main(String[] args) {
        Test test = new Test();
        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();
         
        producer.start();
        consumer.start();
      }
       
      class Consumer extends Thread{
         
        @Override
        public void run() {
          consume();
        }
         
        private void consume() {
          while(true){
            synchronized (queue) {
              while(queue.size() == 0){
                try {
                  System.out.println("   ,    ");
                  queue.wait();
                } catch (InterruptedException e) {
                  e.printStackTrace();
                  queue.notify();
                }
              }
              queue.poll();     //        
              queue.notify();
              System.out.println("         ,    "+queue.size()+"   ");
            }
          }
        }
      }
       
      class Producer extends Thread{
         
        @Override
        public void run() {
          produce();
        }
         
        private void produce() {
          while(true){
            synchronized (queue) {
              while(queue.size() == queueSize){
                try {
                  System.out.println("   ,       ");
                  queue.wait();
                } catch (InterruptedException e) {
                  e.printStackTrace();
                  queue.notify();
                }
              }
              queue.offer(1);    //        
              queue.notify();
              System.out.println("           ,      :"+(queueSize-queue.size()));
            }
          }
        }
      }
    }
    
     これは経典の生産者-消費者モードで、列とObject.wait()とObject.notify()をブロックすることによって実現され、wait()とnotify()は主にスレッド間通信を実現するために用いられます。
    具体的なスレッド間通信方式(waitとnotifyの使用)は、後続の質問章で説明されます。
    次は閉塞行列を使用して実現される生産者-消費者モードです。
    
    public class Test {
      private int queueSize = 10;
      private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);
       
      public static void main(String[] args) {
        Test test = new Test();
        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();
         
        producer.start();
        consumer.start();
      }
       
      class Consumer extends Thread{
         
        @Override
        public void run() {
          consume();
        }
         
        private void consume() {
          while(true){
            try {
              queue.take();
              System.out.println("         ,    "+queue.size()+"   ");
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
          }
        }
      }
       
      class Producer extends Thread{
         
        @Override
        public void run() {
          produce();
        }
         
        private void produce() {
          while(true){
            try {
              queue.put(1);
              System.out.println("           ,      :"+(queueSize-queue.size()));
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
          }
        }
      }
    }
    
     待ち行列コードを使うのはもっと簡単で、同期とスレッド間通信の問題を単独で考える必要がないことが分かりましたか?
    同時プログラミングでは、一般的に渋滞行列の使用が推奨されています。このようにすると、プログラムに予期しないエラーが発生することをできるだけ避けることができます。
    ブロック列は最も古典的なシーンで、クライアントデータの読み取りと解析を行い、データを読み込むスレッドは常にキューにデータを入れて、解析スレッドはキューからデータを取って解析します。他にも似たような場面があります。生産者の消費者モデルに合うものなら、ブロック列が使えます。