java併発のArayBlocking Que詳細紹介
java併発のArayBlocking Que詳細紹介
ArayBlockingQueはよく使われるスレッドの集合であり、オンラインのプログラムでも常にタスクの行列として使われています。使用頻度は特に高い。彼は維持されている循環行列(配列に基づいて実現されます)であり、循環構造はデータ構造の中では一般的ですが、ソースの実現にはまだまれです。
スレッド安全の実現
スレッド安全キューは、基本的にロックなしです。ArayBlockingQueはRentrant Lockを使用しており、2つのContditionに合わせて、集合スレッドの安全操作を実現しています。ここでちょっといい習慣を言います。下にメンバー変数の声明があります。
行列をブロックするのによく使われるのが生産者消費者です。一般的には生産者が入れて、消費者はデータを一から取ります。この二つの操作を重点的に説明します。
この二つの操作はいずれもロックによってスレッドの安全を保証しています。
生産の操作
消費操作
削除操作
読んでくれてありがとうございます。みなさんのご協力をお願いします。ありがとうございます。
ArayBlockingQueはよく使われるスレッドの集合であり、オンラインのプログラムでも常にタスクの行列として使われています。使用頻度は特に高い。彼は維持されている循環行列(配列に基づいて実現されます)であり、循環構造はデータ構造の中では一般的ですが、ソースの実現にはまだまれです。
スレッド安全の実現
スレッド安全キューは、基本的にロックなしです。ArayBlockingQueはRentrant Lockを使用しており、2つのContditionに合わせて、集合スレッドの安全操作を実現しています。ここでちょっといい習慣を言います。下にメンバー変数の声明があります。
private static final long serialVersionUID = -817911632652898426L;
final Object[] items;
int takeIndex;
int putIndex;
int count;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
transient Itrs itrs = null;
賦の操作は基本的にコンストラクタで行われます。このように利点があります。コード実行は制御可能です。メンバー変数の初期化も構造方法に統合されますが、実行順序についてはよく考えてください。構造方法に書いて初期化すれば、問題はありません。行列をブロックするのによく使われるのが生産者消費者です。一般的には生産者が入れて、消費者はデータを一から取ります。この二つの操作を重点的に説明します。
この二つの操作はいずれもロックによってスレッドの安全を保証しています。
生産の操作
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
putなどを入れて操作します。まずロックを取って、データがいっぱいであることを発見したら、notFullのconditionでスレッドをブロックします。ここの条件判定は必ずifではなくwhileを使います。マルチスレッドの場合は、呼び覚まされたらまたいっぱいになります。
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
これは列に入る操作です。まず、メンテナンスの配列を取得します。puttingdexは操作を入れるマークです。この操作はずっと入れます。所定の長さになると0から数え始めます。このように挿入する操作はサイクルの操作です。countはカウントをして、データを挿入できるかどうかの基準として、データを挿入したらnotEmptyのconditionで消費スレッドを起動する信号を送ります。消費操作
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
消費の方法もそうです。まずロックを取得し、条件判定を行い、データがないとスレッドをブロックします。注意点はputと同じです。
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
データを取る時も、take Indexに頼っています。これはフラグです。この数値もずっと増えています。最初のデータの位置を表します。このマークが最後に来たら、0になって、また初めから来ます。このようにして取り出したデータはすべてfifoの順序であることを保証します。削除時に反復が発見された場合は、ディケンサの巡回が変更されます。そしてnotFullのconditionで生産スレッドを呼び覚まします。削除操作
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
removeの操作が面倒です。まずロックを取ってから、二つのマークをローカライズして、削除する要素の位置を見つけます。RemoveAtを呼び出します。ここで削除するには、マークの位置を変更する必要があります。
void removeAt(final int removeIndex) {
final Object[] items = this.items;
if (removeIndex == takeIndex) {
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
final int putIndex = this.putIndex;
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}
削除された要素が、taeindexと同じ位置にある場合。そのまま削除して、マークの位置を削除して後ろに移動します。そうでない場合は、削除された位置から開始し、前に向かうデータの上書きを行う操作を行います。putindexの前の位置に出会うまで。その位置のデータをnullに設定します。そして、puttidexの位置を前に1マス移動し、繰り返している間にデータを削除し、生産スレッドを起動します。読んでくれてありがとうございます。みなさんのご協力をお願いします。ありがとうございます。