『java.util.concurrentパッケージソース読解』15スレッドプールシリーズのScheduledThreadPoolExecutor第2部
16528 ワード
この文章は主にDelayedWorkQueueについて述べている.
ScheduledThreadPoolExecutorではDelayedWorkQueueを使用して実行するタスクを保存します.これらのタスクは遅延があり、実行するたびに最初のタスクを実行するため、DelayedWorkQueueでは必ず遅延時間の短さから長さでソートされます.
DelayedWorkQueueはスタックを使用して実現されます.
BlockingQueueの実装クラスを以前に解析したのと同様に,まずoffer法を見ると,基本的にはスタックに要素を追加する論理である.
ここでsiftUpを見てみると、スタックの実現に詳しい友达は、要素を既存のスタックに追加するアルゴリズムであることを簡単に理解できるはずです.
ではpollを見てみましょう
pollメソッドは、実行時点に達したタスクのみを返すため、キューが遅延実行をどのように実現するかを理解するのに意味がないため、takeメソッドに重点を置いてみましょう.
takeメソッドのポイントはleaderスレッドです.遅延時間があるので、タスクを手に入れてもスレッドは待つ必要があります.leaderスレッドはその最初にタスクを実行するスレッドです.
スレッドがタスクを取得した後も遅延実行の時間を待つ必要があるため、タイムアウト待ちのpollメソッドには少し意味があります.
上記のコードを分析することによって、基本的にマネージャはDelayedWorkQueueの実行遅延の原理を明らかにしました.
1.実行遅延が短い順から長い順にタスクをスタックに格納する.
2.リーダースレッドを介してタスクを取得したスレッドを所定の時点まで待たせてタスクを実行する.
ScheduledThreadPoolExecutorではDelayedWorkQueueを使用して実行するタスクを保存します.これらのタスクは遅延があり、実行するたびに最初のタスクを実行するため、DelayedWorkQueueでは必ず遅延時間の短さから長さでソートされます.
DelayedWorkQueueはスタックを使用して実現されます.
BlockingQueueの実装クラスを以前に解析したのと同様に,まずoffer法を見ると,基本的にはスタックに要素を追加する論理である.
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture e = (RunnableScheduledFuture)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
// , , ,
if (i >= queue.length)
grow();
size = i + 1;
//
if (i == 0) {
queue[0] = e;
// i RunnableScheduledFuture heapIndex
setIndex(e, 0);
} else {
//
siftUp(i, e);
}
// , ,
// , Condition
if (queue[0] == e) {
// , ,
//
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
ここでsiftUpを見てみると、スタックの実現に詳しい友达は、要素を既存のスタックに追加するアルゴリズムであることを簡単に理解できるはずです.
private void siftUp(int k, RunnableScheduledFuture key) {
while (k > 0) {
int parent = (k - 1) >>> 1;
RunnableScheduledFuture e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
ではpollを見てみましょう
public RunnableScheduledFuture poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// , ,
// poll
RunnableScheduledFuture first = queue[0];
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
return null;
else
return finishPoll(first);
} finally {
lock.unlock();
}
}
pollメソッドは、実行時点に達したタスクのみを返すため、キューが遅延実行をどのように実現するかを理解するのに意味がないため、takeメソッドに重点を置いてみましょう.
public RunnableScheduledFuture take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// ,
RunnableScheduledFuture first = queue[0];
if (first == null)
available.await();
else {
//
long delay = first.getDelay(TimeUnit.NANOSECONDS);
// ,
if (delay <= 0)
//
return finishPoll(first);
// , (leader
// , , ),
// take ,
else if (leader != null)
available.await();
// , leader
// Condition ,
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
// leader
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// signal , leader
// leader leader
// , signal awaitNanos
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
takeメソッドのポイントはleaderスレッドです.遅延時間があるので、タスクを手に入れてもスレッドは待つ必要があります.leaderスレッドはその最初にタスクを実行するスレッドです.
スレッドがタスクを取得した後も遅延実行の時間を待つ必要があるため、タイムアウト待ちのpollメソッドには少し意味があります.
public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture first = queue[0];
//
if (first == null) {
// nanos 0 :
// 1.
// 2.
if (nanos <= 0)
return null;
else
// ,
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
if (nanos <= 0)
return null;
// leader nanos delay ,
// nanos , delay
// , leader signal
// , delay
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
//
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
上記のコードを分析することによって、基本的にマネージャはDelayedWorkQueueの実行遅延の原理を明らかにしました.
1.実行遅延が短い順から長い順にタスクをスタックに格納する.
2.リーダースレッドを介してタスクを取得したスレッドを所定の時点まで待たせてタスクを実行する.