JAva同時実行のArrayBlockingQueue詳細

4977 ワード

JAva同時実行のArrayBlockingQueue詳細
ArrayBlockingQueueはよく使用されるスレッドの集合であり、オンライン・スレッド・プールでもタスク・キューとして使用されることが多い.使用頻度が特に高い.彼はメンテナンスされている循環キュー(配列ベースの実装)であり、循環構造はデータ構造ではよく見られるが、ソースコード実装では珍しい.
スレッドセキュリティの実装
スレッドセキュリティキューは、基本的にロックから離れられません.ArrayBlockingQueueはReentrantLockを使用しており,2つのConditionを組み合わせて集合スレッドのセキュリティ動作を実現している.ここで少し良い習慣を言います.次はメンバー変数の宣言です.

 private static final long serialVersionUID = -817911632652898426L;
  final Object[] items;
  int takeIndex;
  int putIndex;
  int count;
  final ReentrantLock lock;
  private final Condition notEmpty;
  private final Condition notFull;
  transient Itrs itrs = null;

付与された操作は基本的に構造関数で行われます.これにより、コード実行が制御可能になるというメリットがあります.メンバー変数の初期化も構造メソッドに統合して実行されますが、実行順序はよく吟味し、構造メソッドに初期化と書いてあれば問題ありません.
行列をブロックする常用場所は生産者消費者である.一般的には生産者が入れ、消費者は最初からデータを取ります.次に、この2つの操作に重点を置きます.
この2つの操作は、ロックによってスレッドの安全を保証します.
せいさんそうさ

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
      while (count == items.length)
        notFull.await();
      enqueue(e);
    } finally {
      lock.unlock();
    }
  }

putなどを入れて操作すると、まずロックを取得し、データがいっぱいになったことを発見したら、notFullのconditionでスレッドをブロックします.ここでの条件判定はifではなくwhileであるに違いないが,マルチスレッドの場合,呼び覚まされてまたいっぱいになったことがわかる.

private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
      putIndex = 0;
    count++;
    notEmpty.signal();
  }

これはキューに入る操作です.まず、メンテナンスされた配列を取得します.putindexは操作を入れるフラグです.この操作はずっと加算されます.所定の長さになると0になって最初からカウントします.このように挿入される操作は循環的な操作であり、countはカウントに用いられ、データを挿入できるかどうかの基準として、データを挿入した後、notEmptyのconditionを通じて消費スレッドを呼び覚ます信号を発する.
消費操作

 public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
      while (count == 0)
        notEmpty.await();
      return dequeue();
    } finally {
      lock.unlock();
    }
  }

消費の仕方もそうです.まずロックを取得し,条件判断を行い,データがなければスレッドをブロックする.注意点はputと同じです. 

  private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
      takeIndex = 0;
    count--;
    if (itrs != null)
      itrs.elementDequeued();
    notFull.signal();
    return x;
  }

データを取るときもtakeIndexに頼って、これはフラグで、この数値もずっと増加して、取った最初のデータの位置を表します.このマークが最後まで歩いて0になったら、最初からやり直します.これにより、取り出したデータがfifoの順序であることが保証されます.削除時に反復が検出されると、反復器のループが変更されます.次に、notFullのconditionによって本番スレッドを起動します.
削除アクション

 public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      if (count > 0) {
        final int putIndex = this.putIndex;
        int i = takeIndex;
        do {
          if (o.equals(items[i])) {
            removeAt(i);
            return true;
          }
          if (++i == items.length)
            i = 0;
        } while (i != putIndex);
      }
      return false;
    } finally {
      lock.unlock();
    }
  }

remove操作は面倒ですが、まずロックを取得した後、2つのフラグビットをローカライズし、削除する要素の位置を見つけます.removeAtを呼び出します.ここで削除するにはフラグビットを変更する必要があります.  

 void removeAt(final int removeIndex) {
    final Object[] items = this.items;
    if (removeIndex == takeIndex) {
      items[takeIndex] = null;
      if (++takeIndex == items.length)
        takeIndex = 0;
      count--;
      if (itrs != null)
        itrs.elementDequeued();
    } else {
      final int putIndex = this.putIndex;
      for (int i = removeIndex;;) {
        int next = i + 1;
        if (next == items.length)
          next = 0;
        if (next != putIndex) {
          items[i] = items[next];
          i = next;
        } else {
          items[i] = null;
          this.putIndex = i;
          break;
        }
      }
      count--;
      if (itrs != null)
        itrs.removedAt(removeIndex);
    }
    notFull.signal();
  }

削除した要素がtakeindexと同じ場所である場合.それは直接削除して、削除フラグを後ろに移動させることができます.そうでない場合は、削除した場所から、前のデータに向かって上書きする操作を行います.putindexの前の位置に出会うまで.そしてその位置のデータをnullに設定します.さらにputindexの位置を1つ前に移動し、反復中にデータを削除し、本番スレッドを起動します.
読書に感謝して、みんなを助けることができることを望んで、みんなの当駅に対する支持に感謝します!