Fork/Joinフレームワークの両端キュー
5786 ワード
概要
ForkJoinPoolはForkJoinWorkerThreadスレッドを管理しています.ForkJoinWorkerThreadスレッドの内部には、主に1つの配列queue、配列下付きqueueBase、配列上付きqueueTopの3つの値によって保証される2つのエンドキューがあります.
ForkJoinTask[]queue:配列の大きさは2のn次方でなければならず、型取りをシフト演算に変換するのに便利である.
int queueTop:次のpushまたはpopの位置を識別します.この値は現在のスレッドによってのみ変更されます.volatile修飾されていないためです.
volatile int queueBase:次は他のスレッドstealの位置で、他のスレッドがこの値を変更するため、volatile修飾で可視性を保証します.
初期化
スレッドのrunメソッドが起動すると、スレッドのonStart()メソッドが呼び出され、このメソッドではqueueが初期化され、長さは1<<13であり、このメソッドではqueueTop、queueBaseに値を付けず、デフォルト値0を採用します.
かくさんようりょう
スレッドにタスクを追加すると、次のコードに示すように、配列がいっぱいになる可能性があります.final void pushTask(ForkJoinTask<?> t) {
ForkJoinTask<?>[] q; int s, m;
if ((q = queue) != null) { // ignore if queue removed
long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
UNSAFE.putOrderedObject(q, u, t);
queueTop = s + 1; // or use putOrderedInt
if ((s -= queueBase) <= 2)
pool.signalWork();
else if (s == m)
growQueue();
}
}
ここで、sはqueueTopの値を表し、mは配列長-1であり、s==mの場合、つまりqueue配列にはタスクがいっぱい配置されているため、配列を拡張する必要がある.private void growQueue() {
ForkJoinTask<?>[] oldQ = queue;
int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
if (size > MAXIMUM_QUEUE_CAPACITY)
throw new RejectedExecutionException("Queue capacity exceeded");
if (size < INITIAL_QUEUE_CAPACITY)
size = INITIAL_QUEUE_CAPACITY;
ForkJoinTask<?>[] q = queue = new ForkJoinTask<?>[size];
int mask = size - 1;
int top = queueTop;
int oldMask;
if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
for (int b = queueBase; b != top; ++b) {
long u = ((b & oldMask) << ASHIFT) + ABASE;
Object x = UNSAFE.getObjectVolatile(oldQ, u);
if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
UNSAFE.putObjectVolatile
(q, ((b & mask) << ASHIFT) + ABASE, x);
}
}
}
以上の拡張コードから分かるように、最大容量はMAXIMUM_を超えてはならないQUEUE_CAPACITY(1<<24)は、最小で初期値を下回ってはならない.拡張が以前のサイズの2倍になるたびに、元の配列を新しい配列にコピーし、古い配列をnullにします.拡張の過程でqueueBaseとqueueTopは変化する必要はありません.
エンキュー
スレッドキューにタスクを追加したり、スレッドプールにタスクを追加したりすると、このタスクがForkJoinTaskインスタンスの場合、キューに入る操作が行われます.前にこのコードがありますが、ここで簡単に分析します.long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
UNSAFE.putOrderedObject(q, u, t);
queueTop = s + 1;
最初の行、配列内のqueueTopの位置を見つけます.
2行目、新しいタスクでqueueTopの位置を入力
3行目、queueTopに1を追加します.
キューから出る
ローカルスレッドはタスクを実行する必要があります
final void execTask(ForkJoinTask<?> t) {
currentSteal = t;
for (;;) {
if (t != null)
t.doExec();
if (queueTop == queueBase)
break;
t = locallyFifo ? locallyDeqTask() : popTask();
}
++stealCount;
currentSteal = null;
}
locallyFifoという属性に注意して、自分のキューにFIFOポリシーを採用するかどうか、デフォルトはfalse、すなわちデフォルトはqueueTop側からタスクを取ります.この値がfalseの場合、queueBase側からデータを取得します.この値はForkJoinPoolクラスのasyncModeプロパティで変更できます.final ForkJoinTask<?> locallyDeqTask() {
ForkJoinTask<?> t; int m, b, i;
ForkJoinTask<?>[] q = queue;
if (q != null && (m = q.length - 1) >= 0) {
while (queueTop != (b = queueBase)) {
if ((t = q[i = m & b]) != null &&
queueBase == b &&
UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE,
t, null)) {
queueBase = b + 1;
return t;
}
}
}
return null;
}
private ForkJoinTask<?> popTask() {
int m;
ForkJoinTask<?>[] q = queue;
if (q != null && (m = q.length - 1) >= 0) {
for (int s; (s = queueTop) != queueBase;) {
int i = m & --s;
long u = (i << ASHIFT) + ABASE; // raw offset
ForkJoinTask<?> t = q[i];
if (t == null) // lost to stealer
break;
if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
queueTop = s; // or putOrderedInt
return t;
}
}
}
return null;
}
重要な2つの言葉:
queueBase=b+1:FIFOポリシーqueueBaseからタスクを取得するたびに、1つずつqueueBaseが1増加します.--s,queueTop=s:LIFOポリシーqueueTopからタスクを取得するたびに、1つずつqueueTopが1減少します.
他のスレッドはタスク実行を盗む必要があります
以下はwork-stealingのコアコードですfor (;;) {
ForkJoinTask<?>[] q; int b, i;
if (joinMe.status < 0)
break outer;
if ((b = v.queueBase) == v.queueTop ||
(q = v.queue) == null ||
(i = (q.length-1) & b) < 0)
break; // empty
long u = (i << ASHIFT) + ABASE;
ForkJoinTask<?> t = q[i];
if (task.status < 0)
break outer; // stale
if (t != null && v.queueBase == b &&
UNSAFE.compareAndSwapObject(q, u, t, null)) {
v.queueBase = b + 1;
v.stealHint = poolIndex;
ForkJoinTask<?> ps = currentSteal;
currentSteal = t;
t.doExec();
currentSteal = ps;
helped = true;
}
}
1、i番目の位置を狙うこのタスク、i=(q.length-1)&b、iは実はqueueBaseが配列の中にある位置である.2、この位置のタスクをnullに設定し、queueBaseの値を増やし、stealHintを設定してあなたのものが私に盗まれたことを示します.
3、以前のcurrentSteal値を保存し、currentStealをこの盗まれたtaskに設定し、このtaskを実行し、実行が完了したらcurrentStealの値を復元します.
スレッドのrunメソッドが起動すると、スレッドのonStart()メソッドが呼び出され、このメソッドではqueueが初期化され、長さは1<<13であり、このメソッドではqueueTop、queueBaseに値を付けず、デフォルト値0を採用します.
かくさんようりょう
スレッドにタスクを追加すると、次のコードに示すように、配列がいっぱいになる可能性があります.final void pushTask(ForkJoinTask<?> t) {
ForkJoinTask<?>[] q; int s, m;
if ((q = queue) != null) { // ignore if queue removed
long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
UNSAFE.putOrderedObject(q, u, t);
queueTop = s + 1; // or use putOrderedInt
if ((s -= queueBase) <= 2)
pool.signalWork();
else if (s == m)
growQueue();
}
}
ここで、sはqueueTopの値を表し、mは配列長-1であり、s==mの場合、つまりqueue配列にはタスクがいっぱい配置されているため、配列を拡張する必要がある.private void growQueue() {
ForkJoinTask<?>[] oldQ = queue;
int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
if (size > MAXIMUM_QUEUE_CAPACITY)
throw new RejectedExecutionException("Queue capacity exceeded");
if (size < INITIAL_QUEUE_CAPACITY)
size = INITIAL_QUEUE_CAPACITY;
ForkJoinTask<?>[] q = queue = new ForkJoinTask<?>[size];
int mask = size - 1;
int top = queueTop;
int oldMask;
if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
for (int b = queueBase; b != top; ++b) {
long u = ((b & oldMask) << ASHIFT) + ABASE;
Object x = UNSAFE.getObjectVolatile(oldQ, u);
if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
UNSAFE.putObjectVolatile
(q, ((b & mask) << ASHIFT) + ABASE, x);
}
}
}
以上の拡張コードから分かるように、最大容量はMAXIMUM_を超えてはならないQUEUE_CAPACITY(1<<24)は、最小で初期値を下回ってはならない.拡張が以前のサイズの2倍になるたびに、元の配列を新しい配列にコピーし、古い配列をnullにします.拡張の過程でqueueBaseとqueueTopは変化する必要はありません.
エンキュー
スレッドキューにタスクを追加したり、スレッドプールにタスクを追加したりすると、このタスクがForkJoinTaskインスタンスの場合、キューに入る操作が行われます.前にこのコードがありますが、ここで簡単に分析します.long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
UNSAFE.putOrderedObject(q, u, t);
queueTop = s + 1;
最初の行、配列内のqueueTopの位置を見つけます.
2行目、新しいタスクでqueueTopの位置を入力
3行目、queueTopに1を追加します.
キューから出る
ローカルスレッドはタスクを実行する必要があります
final void execTask(ForkJoinTask<?> t) {
currentSteal = t;
for (;;) {
if (t != null)
t.doExec();
if (queueTop == queueBase)
break;
t = locallyFifo ? locallyDeqTask() : popTask();
}
++stealCount;
currentSteal = null;
}
locallyFifoという属性に注意して、自分のキューにFIFOポリシーを採用するかどうか、デフォルトはfalse、すなわちデフォルトはqueueTop側からタスクを取ります.この値がfalseの場合、queueBase側からデータを取得します.この値はForkJoinPoolクラスのasyncModeプロパティで変更できます.final ForkJoinTask<?> locallyDeqTask() {
ForkJoinTask<?> t; int m, b, i;
ForkJoinTask<?>[] q = queue;
if (q != null && (m = q.length - 1) >= 0) {
while (queueTop != (b = queueBase)) {
if ((t = q[i = m & b]) != null &&
queueBase == b &&
UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE,
t, null)) {
queueBase = b + 1;
return t;
}
}
}
return null;
}
private ForkJoinTask<?> popTask() {
int m;
ForkJoinTask<?>[] q = queue;
if (q != null && (m = q.length - 1) >= 0) {
for (int s; (s = queueTop) != queueBase;) {
int i = m & --s;
long u = (i << ASHIFT) + ABASE; // raw offset
ForkJoinTask<?> t = q[i];
if (t == null) // lost to stealer
break;
if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
queueTop = s; // or putOrderedInt
return t;
}
}
}
return null;
}
重要な2つの言葉:
queueBase=b+1:FIFOポリシーqueueBaseからタスクを取得するたびに、1つずつqueueBaseが1増加します.--s,queueTop=s:LIFOポリシーqueueTopからタスクを取得するたびに、1つずつqueueTopが1減少します.
他のスレッドはタスク実行を盗む必要があります
以下はwork-stealingのコアコードですfor (;;) {
ForkJoinTask<?>[] q; int b, i;
if (joinMe.status < 0)
break outer;
if ((b = v.queueBase) == v.queueTop ||
(q = v.queue) == null ||
(i = (q.length-1) & b) < 0)
break; // empty
long u = (i << ASHIFT) + ABASE;
ForkJoinTask<?> t = q[i];
if (task.status < 0)
break outer; // stale
if (t != null && v.queueBase == b &&
UNSAFE.compareAndSwapObject(q, u, t, null)) {
v.queueBase = b + 1;
v.stealHint = poolIndex;
ForkJoinTask<?> ps = currentSteal;
currentSteal = t;
t.doExec();
currentSteal = ps;
helped = true;
}
}
1、i番目の位置を狙うこのタスク、i=(q.length-1)&b、iは実はqueueBaseが配列の中にある位置である.2、この位置のタスクをnullに設定し、queueBaseの値を増やし、stealHintを設定してあなたのものが私に盗まれたことを示します.
3、以前のcurrentSteal値を保存し、currentStealをこの盗まれたtaskに設定し、このtaskを実行し、実行が完了したらcurrentStealの値を復元します.
final void pushTask(ForkJoinTask<?> t) {
ForkJoinTask<?>[] q; int s, m;
if ((q = queue) != null) { // ignore if queue removed
long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
UNSAFE.putOrderedObject(q, u, t);
queueTop = s + 1; // or use putOrderedInt
if ((s -= queueBase) <= 2)
pool.signalWork();
else if (s == m)
growQueue();
}
}
private void growQueue() {
ForkJoinTask<?>[] oldQ = queue;
int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
if (size > MAXIMUM_QUEUE_CAPACITY)
throw new RejectedExecutionException("Queue capacity exceeded");
if (size < INITIAL_QUEUE_CAPACITY)
size = INITIAL_QUEUE_CAPACITY;
ForkJoinTask<?>[] q = queue = new ForkJoinTask<?>[size];
int mask = size - 1;
int top = queueTop;
int oldMask;
if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
for (int b = queueBase; b != top; ++b) {
long u = ((b & oldMask) << ASHIFT) + ABASE;
Object x = UNSAFE.getObjectVolatile(oldQ, u);
if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
UNSAFE.putObjectVolatile
(q, ((b & mask) << ASHIFT) + ABASE, x);
}
}
}
スレッドキューにタスクを追加したり、スレッドプールにタスクを追加したりすると、このタスクがForkJoinTaskインスタンスの場合、キューに入る操作が行われます.前にこのコードがありますが、ここで簡単に分析します.
long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
UNSAFE.putOrderedObject(q, u, t);
queueTop = s + 1;
最初の行、配列内のqueueTopの位置を見つけます.
2行目、新しいタスクでqueueTopの位置を入力
3行目、queueTopに1を追加します.
キューから出る
ローカルスレッドはタスクを実行する必要があります
final void execTask(ForkJoinTask<?> t) {
currentSteal = t;
for (;;) {
if (t != null)
t.doExec();
if (queueTop == queueBase)
break;
t = locallyFifo ? locallyDeqTask() : popTask();
}
++stealCount;
currentSteal = null;
}
locallyFifoという属性に注意して、自分のキューにFIFOポリシーを採用するかどうか、デフォルトはfalse、すなわちデフォルトはqueueTop側からタスクを取ります.この値がfalseの場合、queueBase側からデータを取得します.この値はForkJoinPoolクラスのasyncModeプロパティで変更できます.final ForkJoinTask<?> locallyDeqTask() {
ForkJoinTask<?> t; int m, b, i;
ForkJoinTask<?>[] q = queue;
if (q != null && (m = q.length - 1) >= 0) {
while (queueTop != (b = queueBase)) {
if ((t = q[i = m & b]) != null &&
queueBase == b &&
UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE,
t, null)) {
queueBase = b + 1;
return t;
}
}
}
return null;
}
private ForkJoinTask<?> popTask() {
int m;
ForkJoinTask<?>[] q = queue;
if (q != null && (m = q.length - 1) >= 0) {
for (int s; (s = queueTop) != queueBase;) {
int i = m & --s;
long u = (i << ASHIFT) + ABASE; // raw offset
ForkJoinTask<?> t = q[i];
if (t == null) // lost to stealer
break;
if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
queueTop = s; // or putOrderedInt
return t;
}
}
}
return null;
}
重要な2つの言葉:
queueBase=b+1:FIFOポリシーqueueBaseからタスクを取得するたびに、1つずつqueueBaseが1増加します.--s,queueTop=s:LIFOポリシーqueueTopからタスクを取得するたびに、1つずつqueueTopが1減少します.
他のスレッドはタスク実行を盗む必要があります
以下はwork-stealingのコアコードですfor (;;) {
ForkJoinTask<?>[] q; int b, i;
if (joinMe.status < 0)
break outer;
if ((b = v.queueBase) == v.queueTop ||
(q = v.queue) == null ||
(i = (q.length-1) & b) < 0)
break; // empty
long u = (i << ASHIFT) + ABASE;
ForkJoinTask<?> t = q[i];
if (task.status < 0)
break outer; // stale
if (t != null && v.queueBase == b &&
UNSAFE.compareAndSwapObject(q, u, t, null)) {
v.queueBase = b + 1;
v.stealHint = poolIndex;
ForkJoinTask<?> ps = currentSteal;
currentSteal = t;
t.doExec();
currentSteal = ps;
helped = true;
}
}
1、i番目の位置を狙うこのタスク、i=(q.length-1)&b、iは実はqueueBaseが配列の中にある位置である.2、この位置のタスクをnullに設定し、queueBaseの値を増やし、stealHintを設定してあなたのものが私に盗まれたことを示します.
3、以前のcurrentSteal値を保存し、currentStealをこの盗まれたtaskに設定し、このtaskを実行し、実行が完了したらcurrentStealの値を復元します.
final void execTask(ForkJoinTask<?> t) {
currentSteal = t;
for (;;) {
if (t != null)
t.doExec();
if (queueTop == queueBase)
break;
t = locallyFifo ? locallyDeqTask() : popTask();
}
++stealCount;
currentSteal = null;
}
final ForkJoinTask<?> locallyDeqTask() {
ForkJoinTask<?> t; int m, b, i;
ForkJoinTask<?>[] q = queue;
if (q != null && (m = q.length - 1) >= 0) {
while (queueTop != (b = queueBase)) {
if ((t = q[i = m & b]) != null &&
queueBase == b &&
UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE,
t, null)) {
queueBase = b + 1;
return t;
}
}
}
return null;
}
private ForkJoinTask<?> popTask() {
int m;
ForkJoinTask<?>[] q = queue;
if (q != null && (m = q.length - 1) >= 0) {
for (int s; (s = queueTop) != queueBase;) {
int i = m & --s;
long u = (i << ASHIFT) + ABASE; // raw offset
ForkJoinTask<?> t = q[i];
if (t == null) // lost to stealer
break;
if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
queueTop = s; // or putOrderedInt
return t;
}
}
}
return null;
}
for (;;) {
ForkJoinTask<?>[] q; int b, i;
if (joinMe.status < 0)
break outer;
if ((b = v.queueBase) == v.queueTop ||
(q = v.queue) == null ||
(i = (q.length-1) & b) < 0)
break; // empty
long u = (i << ASHIFT) + ABASE;
ForkJoinTask<?> t = q[i];
if (task.status < 0)
break outer; // stale
if (t != null && v.queueBase == b &&
UNSAFE.compareAndSwapObject(q, u, t, null)) {
v.queueBase = b + 1;
v.stealHint = poolIndex;
ForkJoinTask<?> ps = currentSteal;
currentSteal = t;
t.doExec();
currentSteal = ps;
helped = true;
}
}