JAva同時実行のArrayBlockingQueue詳細
4977 ワード
JAva同時実行のArrayBlockingQueue詳細
ArrayBlockingQueueはよく使用されるスレッドの集合であり、オンライン・スレッド・プールでもタスク・キューとして使用されることが多い.使用頻度が特に高い.彼はメンテナンスされている循環キュー(配列ベースの実装)であり、循環構造はデータ構造ではよく見られるが、ソースコード実装では珍しい.
スレッドセキュリティの実装
スレッドセキュリティキューは、基本的にロックから離れられません.ArrayBlockingQueueはReentrantLockを使用しており,2つのConditionを組み合わせて集合スレッドのセキュリティ動作を実現している.ここで少し良い習慣を言います.次はメンバー変数の宣言です.
付与された操作は基本的に構造関数で行われます.これにより、コード実行が制御可能になるというメリットがあります.メンバー変数の初期化も構造メソッドに統合して実行されますが、実行順序はよく吟味し、構造メソッドに初期化と書いてあれば問題ありません.
行列をブロックする常用場所は生産者消費者である.一般的には生産者が入れ、消費者は最初からデータを取ります.次に、この2つの操作に重点を置きます.
この2つの操作は、ロックによってスレッドの安全を保証します.
せいさんそうさ
putなどを入れて操作すると、まずロックを取得し、データがいっぱいになったことを発見したら、notFullのconditionでスレッドをブロックします.ここでの条件判定はifではなくwhileであるに違いないが,マルチスレッドの場合,呼び覚まされてまたいっぱいになったことがわかる.
これはキューに入る操作です.まず、メンテナンスされた配列を取得します.putindexは操作を入れるフラグです.この操作はずっと加算されます.所定の長さになると0になって最初からカウントします.このように挿入される操作は循環的な操作であり、countはカウントに用いられ、データを挿入できるかどうかの基準として、データを挿入した後、notEmptyのconditionを通じて消費スレッドを呼び覚ます信号を発する.
消費操作
消費の仕方もそうです.まずロックを取得し,条件判断を行い,データがなければスレッドをブロックする.注意点はputと同じです.
データを取るときもtakeIndexに頼って、これはフラグで、この数値もずっと増加して、取った最初のデータの位置を表します.このマークが最後まで歩いて0になったら、最初からやり直します.これにより、取り出したデータがfifoの順序であることが保証されます.削除時に反復が検出されると、反復器のループが変更されます.次に、notFullのconditionによって本番スレッドを起動します.
削除アクション
remove操作は面倒ですが、まずロックを取得した後、2つのフラグビットをローカライズし、削除する要素の位置を見つけます.removeAtを呼び出します.ここで削除するにはフラグビットを変更する必要があります.
削除した要素がtakeindexと同じ場所である場合.それは直接削除して、削除フラグを後ろに移動させることができます.そうでない場合は、削除した場所から、前のデータに向かって上書きする操作を行います.putindexの前の位置に出会うまで.そしてその位置のデータをnullに設定します.さらにputindexの位置を1つ前に移動し、反復中にデータを削除し、本番スレッドを起動します.
読書に感謝して、みんなを助けることができることを望んで、みんなの当駅に対する支持に感謝します!
ArrayBlockingQueueはよく使用されるスレッドの集合であり、オンライン・スレッド・プールでもタスク・キューとして使用されることが多い.使用頻度が特に高い.彼はメンテナンスされている循環キュー(配列ベースの実装)であり、循環構造はデータ構造ではよく見られるが、ソースコード実装では珍しい.
スレッドセキュリティの実装
スレッドセキュリティキューは、基本的にロックから離れられません.ArrayBlockingQueueはReentrantLockを使用しており,2つのConditionを組み合わせて集合スレッドのセキュリティ動作を実現している.ここで少し良い習慣を言います.次はメンバー変数の宣言です.
private static final long serialVersionUID = -817911632652898426L;
final Object[] items;
int takeIndex;
int putIndex;
int count;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
transient Itrs itrs = null;
付与された操作は基本的に構造関数で行われます.これにより、コード実行が制御可能になるというメリットがあります.メンバー変数の初期化も構造メソッドに統合して実行されますが、実行順序はよく吟味し、構造メソッドに初期化と書いてあれば問題ありません.
行列をブロックする常用場所は生産者消費者である.一般的には生産者が入れ、消費者は最初からデータを取ります.次に、この2つの操作に重点を置きます.
この2つの操作は、ロックによってスレッドの安全を保証します.
せいさんそうさ
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
putなどを入れて操作すると、まずロックを取得し、データがいっぱいになったことを発見したら、notFullのconditionでスレッドをブロックします.ここでの条件判定はifではなくwhileであるに違いないが,マルチスレッドの場合,呼び覚まされてまたいっぱいになったことがわかる.
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
これはキューに入る操作です.まず、メンテナンスされた配列を取得します.putindexは操作を入れるフラグです.この操作はずっと加算されます.所定の長さになると0になって最初からカウントします.このように挿入される操作は循環的な操作であり、countはカウントに用いられ、データを挿入できるかどうかの基準として、データを挿入した後、notEmptyのconditionを通じて消費スレッドを呼び覚ます信号を発する.
消費操作
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
消費の仕方もそうです.まずロックを取得し,条件判断を行い,データがなければスレッドをブロックする.注意点はputと同じです.
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
データを取るときもtakeIndexに頼って、これはフラグで、この数値もずっと増加して、取った最初のデータの位置を表します.このマークが最後まで歩いて0になったら、最初からやり直します.これにより、取り出したデータがfifoの順序であることが保証されます.削除時に反復が検出されると、反復器のループが変更されます.次に、notFullのconditionによって本番スレッドを起動します.
削除アクション
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
remove操作は面倒ですが、まずロックを取得した後、2つのフラグビットをローカライズし、削除する要素の位置を見つけます.removeAtを呼び出します.ここで削除するにはフラグビットを変更する必要があります.
void removeAt(final int removeIndex) {
final Object[] items = this.items;
if (removeIndex == takeIndex) {
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
final int putIndex = this.putIndex;
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}
削除した要素がtakeindexと同じ場所である場合.それは直接削除して、削除フラグを後ろに移動させることができます.そうでない場合は、削除した場所から、前のデータに向かって上書きする操作を行います.putindexの前の位置に出会うまで.そしてその位置のデータをnullに設定します.さらにputindexの位置を1つ前に移動し、反復中にデータを削除し、本番スレッドを起動します.
読書に感謝して、みんなを助けることができることを望んで、みんなの当駅に対する支持に感謝します!