Netty 5ソースコード分析(7)--非同期実行FutureとPromise
12504 ワード
java.util.co ncurrent.FuturrentはJavaが提供するインターフェースであり、非同期的に実行されている状態を表しています。Futureのget方法は、タスクが実行されたかどうかを判断し、完了すれば結果に戻ります。
Default PromiseのaddListener方法を見て、非同期タスクが実行されている状態を判断し、実行が完了すれば、傍受者に通知することを理解します。そうでなければ、傍受者の列に参加します。
モニターに通知すると、スレッドを探して、モニターを呼び出すコールバック関数を実行します。
1.この過程でまずChanel.newPromiseを通じて非同期的なタスクを作成し、Promiseのインスタンスをregister方法に伝えます。
2.register方法はスレッドを探してregister 0の方法を実行し、第一歩の方法はタスクをスレッドに渡すと戻ってきます。これは非同期的に実行するプロセスである。
3.新規スレッドにPromiseの状態を設定します。Promiseが書いてもいいという意味のFutureを見ることができます。
4.元スレッドは、init AndRegisterの返却結果を得て実行を継続します。この場合は、2つのスレッドが実行されます。
5.元スレッドはPromiseの状態を判断して登録が完了したかどうかを判断し、登録が完了したら後続のdoBind 0を実行し、完了していない場合は、コールバックを追加する方法で非同期的に実行する。
final ChanelFuture regFuture=init AndRegister()
final Channel=regFuture.channel()
if(reg Future.cause)!null){
return reg Future;
}
final ChanelPromise promise;
if(reg Future.isDune){
promise=chanel.newPromise()
doBind 0(regFuture、chanel、local Address、promise);
} else{
// Registration future is almost always fulfilled already、but just in case it's not.
promise=new Default ChanelPromise;
regFuture.addListener(new ChanelFuture Listener){ @オーバーライド public void operation Coplete(ChanelFuture future)throws Exception{ doBind 0(regFuture、chanel、local Address、promise); } });
}
return promise
}
// Java FutureTask.get()
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
NettyはJavaのFutureを拡張して、最も主要な改善はモニターListenerインターフェースを増加して、モニターを通じて非同期実行をより効率的にすることができます。getを通じて非同期実行が終了するのを待つ必要はなく、モニターフィードバックによって非同期実行が終了する時点を正確に制御します。public interface Future<V> extends java.util.concurrent.Future<V> {
boolean isSuccess();
boolean isCancellable();
Throwable cause();
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
Future<V> sync() throws InterruptedException;
Future<V> syncUninterruptibly();
Future<V> await() throws InterruptedException;
Future<V> awaitUninterruptibly();
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);
V getNow();
boolean cancel(boolean mayInterruptIfRunning);
}
ChanelFutureインターフェースは、NettyのFutureインターフェースを拡張し、値を返していない非同期呼び出しを表し、同時にChanelに関連して、Channelと結合しています。public interface ChannelFuture extends Future<Void> {
Channel channel();
@Override
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture sync() throws InterruptedException;
@Override
ChannelFuture syncUninterruptibly();
@Override
ChannelFuture await() throws InterruptedException;
@Override
ChannelFuture awaitUninterruptibly();
}
PromiseインターフェースもFutureインターフェースを拡張しています。書き込み可能なFutureを表しています。非同期実行の結果を設定できます。public interface Promise<V> extends Future<V> {
Promise<V> setSuccess(V result);
boolean trySuccess(V result);
Promise<V> setFailure(Throwable cause);
boolean tryFailure(Throwable cause);
}
ChanelPromiseインターフェースはPromiseとChanelFutureを拡張して、Channelを結び付けて、また非同期の実行の構造を書くことができて、また傍受者の機能を備えて、Nettyが実際にプログラミングして使う非同期の実行を表すインターフェースです。public interface ChannelPromise extends ChannelFuture, Promise<Void> {
@Override
Channel channel();
@Override
ChannelPromise setSuccess(Void result);
ChannelPromise setSuccess();
boolean trySuccess();
@Override
ChannelPromise setFailure(Throwable cause);
@Override
ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelPromise sync() throws InterruptedException;
@Override
ChannelPromise syncUninterruptibly();
@Override
ChannelPromise await() throws InterruptedException;
@Override
ChannelPromise awaitUninterruptibly();
}
Default ChanelPromiseはChanelPromiseの実装クラスであり、実際に実行された時のPromoiseの例である。ChannelインターフェースはnewPromiseインターフェースを提供して、Channelが非同期的に実行する動作を作成すると表しています。public interface Channel extends AttributeMap, Comparable<Channel> {
ChannelPromise newPromise();
}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
public ChannelPromise newPromise() {
return new DefaultChannelPromise(this);
}
}
Nettyは、addListenerを使用して非同期実行の結果をフィードバックすることを推奨し、この方法はFuture.getよりも優れており、非同期実行終了の時間をより正確に把握することができる。Default PromiseのaddListener方法を見て、非同期タスクが実行されている状態を判断し、実行が完了すれば、傍受者に通知することを理解します。そうでなければ、傍受者の列に参加します。
モニターに通知すると、スレッドを探して、モニターを呼び出すコールバック関数を実行します。
// DefaultPromise.addListener
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
if (listener == null) {
throw new NullPointerException("listener");
}
if (isDone()) {
notifyListener(executor(), this, listener);
return this;
}
synchronized (this) {
if (!isDone()) {
if (listeners == null) {
listeners = listener;
} else {
if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).add(listener);
} else {
@SuppressWarnings("unchecked")
final GenericFutureListener<? extends Future<V>> firstListener =
(GenericFutureListener<? extends Future<V>>) listeners;
listeners = new DefaultFutureListeners(firstListener, listener);
}
}
return this;
}
}
notifyListener(executor(), this, listener);
return this;
}
protected static void notifyListener(
final EventExecutor eventExecutor, final Future<?> future, final GenericFutureListener<?> l) {
if (eventExecutor.inEventLoop()) {
final Integer stackDepth = LISTENER_STACK_DEPTH.get();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
LISTENER_STACK_DEPTH.set(stackDepth + 1);
try {
notifyListener0(future, l);
} finally {
LISTENER_STACK_DEPTH.set(stackDepth);
}
return;
}
}
try {
eventExecutor.execute(new Runnable() {
@Override
public void run() {
notifyListener0(future, l);
}
});
} catch (Throwable t) {
logger.error("Failed to notify a listener. Event loop shut down?", t);
}
}
@SuppressWarnings({ "unchecked", "rawtypes" })
static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
}
また、傍受者のインターフェースを見てみると、異ステップのタスクの実行が完了したら、Futureの結果を得て、コールバックの論理を実行する方法があります。public interface GenericFutureListener<F extends Future<?>> extends EventListener {
void operationComplete(F future) throws Exception;
}
一例を見ると、サーババインディング中に、init AndRegisterを呼び出してChannelを作成し登録します。1.この過程でまずChanel.newPromiseを通じて非同期的なタスクを作成し、Promiseのインスタンスをregister方法に伝えます。
2.register方法はスレッドを探してregister 0の方法を実行し、第一歩の方法はタスクをスレッドに渡すと戻ってきます。これは非同期的に実行するプロセスである。
3.新規スレッドにPromiseの状態を設定します。Promiseが書いてもいいという意味のFutureを見ることができます。
4.元スレッドは、init AndRegisterの返却結果を得て実行を継続します。この場合は、2つのスレッドが実行されます。
5.元スレッドはPromiseの状態を判断して登録が完了したかどうかを判断し、登録が完了したら後続のdoBind 0を実行し、完了していない場合は、コールバックを追加する方法で非同期的に実行する。
final ChannelFuture initAndRegister() {
Channel channel;
try {
channel = createChannel();
} catch (Throwable t) {
return VoidChannel.INSTANCE.newFailedFuture(t);
}
try {
init(channel);
} catch (Throwable t) {
channel.unsafe().closeForcibly();
return channel.newFailedFuture(t);
}
ChannelPromise regFuture = channel.newPromise();
channel.unsafe().register(regFuture);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
<pre name="code" class="java"> public final void register(final ChannelPromise promise) {
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
<strong> eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});</strong>
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
promise.setFailure(t);
}
}
}
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!ensureOpen(promise)) {
return;
}
doRegister();
registered = true;
promise.setSuccess();
pipeline.fireChannelRegistered();
if (isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
if (!promise.tryFailure(t)) {
logger.warn(
"Tried to fail the registration promise, but it is complete already. " +
"Swallowing the cause of the registration failure:", t);
}
}
}
prvate ChanelFuture doBind(final SocketAddres local Address){final ChanelFuture regFuture=init AndRegister()
final Channel=regFuture.channel()
if(reg Future.cause)!null){
return reg Future;
}
final ChanelPromise promise;
if(reg Future.isDune){
promise=chanel.newPromise()
doBind 0(regFuture、chanel、local Address、promise);
} else{
// Registration future is almost always fulfilled already、but just in case it's not.
promise=new Default ChanelPromise;
regFuture.addListener(new ChanelFuture Listener){ @オーバーライド public void operation Coplete(ChanelFuture future)throws Exception{ doBind 0(regFuture、chanel、local Address、promise); } });
}
return promise
}