Netty 5ソースコード分析(7)--非同期実行FutureとPromise

12504 ワード

java.util.co ncurrent.FuturrentはJavaが提供するインターフェースであり、非同期的に実行されている状態を表しています。Futureのget方法は、タスクが実行されたかどうかを判断し、完了すれば結果に戻ります。
// Java FutureTask.get()

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
NettyはJavaのFutureを拡張して、最も主要な改善はモニターListenerインターフェースを増加して、モニターを通じて非同期実行をより効率的にすることができます。getを通じて非同期実行が終了するのを待つ必要はなく、モニターフィードバックによって非同期実行が終了する時点を正確に制御します。
public interface Future<V> extends java.util.concurrent.Future<V> {

    boolean isSuccess();

    boolean isCancellable();

    Throwable cause();

    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    Future<V> sync() throws InterruptedException;

    Future<V> syncUninterruptibly();

    Future<V> await() throws InterruptedException;

    Future<V> awaitUninterruptibly();

    boolean await(long timeout, TimeUnit unit) throws InterruptedException;

    boolean await(long timeoutMillis) throws InterruptedException;

    boolean awaitUninterruptibly(long timeout, TimeUnit unit);

    boolean awaitUninterruptibly(long timeoutMillis);

    V getNow();

    boolean cancel(boolean mayInterruptIfRunning);
}
ChanelFutureインターフェースは、NettyのFutureインターフェースを拡張し、値を返していない非同期呼び出しを表し、同時にChanelに関連して、Channelと結合しています。
public interface ChannelFuture extends Future<Void> {

    Channel channel();

    @Override
    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelFuture sync() throws InterruptedException;

    @Override
    ChannelFuture syncUninterruptibly();

    @Override
    ChannelFuture await() throws InterruptedException;

    @Override
    ChannelFuture awaitUninterruptibly();
}
PromiseインターフェースもFutureインターフェースを拡張しています。書き込み可能なFutureを表しています。非同期実行の結果を設定できます。
public interface Promise<V> extends Future<V> {

    Promise<V> setSuccess(V result);

    boolean trySuccess(V result);

    Promise<V> setFailure(Throwable cause);

    boolean tryFailure(Throwable cause);
} 
ChanelPromiseインターフェースはPromiseとChanelFutureを拡張して、Channelを結び付けて、また非同期の実行の構造を書くことができて、また傍受者の機能を備えて、Nettyが実際にプログラミングして使う非同期の実行を表すインターフェースです。
public interface ChannelPromise extends ChannelFuture, Promise<Void> {

    @Override
    Channel channel();

    @Override
    ChannelPromise setSuccess(Void result);

    ChannelPromise setSuccess();

    boolean trySuccess();

    @Override
    ChannelPromise setFailure(Throwable cause);

    @Override
    ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelPromise sync() throws InterruptedException;

    @Override
    ChannelPromise syncUninterruptibly();

    @Override
    ChannelPromise await() throws InterruptedException;

    @Override
    ChannelPromise awaitUninterruptibly();
}
Default ChanelPromiseはChanelPromiseの実装クラスであり、実際に実行された時のPromoiseの例である。ChannelインターフェースはnewPromiseインターフェースを提供して、Channelが非同期的に実行する動作を作成すると表しています。
public interface Channel extends AttributeMap, Comparable<Channel> {
    ChannelPromise newPromise();
}

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    public ChannelPromise newPromise() {
        return new DefaultChannelPromise(this);
    }
}


Nettyは、addListenerを使用して非同期実行の結果をフィードバックすることを推奨し、この方法はFuture.getよりも優れており、非同期実行終了の時間をより正確に把握することができる。
Default PromiseのaddListener方法を見て、非同期タスクが実行されている状態を判断し、実行が完了すれば、傍受者に通知することを理解します。そうでなければ、傍受者の列に参加します。
モニターに通知すると、スレッドを探して、モニターを呼び出すコールバック関数を実行します。
// DefaultPromise.addListener
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        if (listener == null) {
            throw new NullPointerException("listener");
        }

        if (isDone()) {
            notifyListener(executor(), this, listener);
            return this;
        }

        synchronized (this) {
            if (!isDone()) {
                if (listeners == null) {
                    listeners = listener;
                } else {
                    if (listeners instanceof DefaultFutureListeners) {
                        ((DefaultFutureListeners) listeners).add(listener);
                    } else {
                        @SuppressWarnings("unchecked")
                        final GenericFutureListener<? extends Future<V>> firstListener =
                                (GenericFutureListener<? extends Future<V>>) listeners;
                        listeners = new DefaultFutureListeners(firstListener, listener);
                    }
                }
                return this;
            }
        }

        notifyListener(executor(), this, listener);
        return this;
    }

protected static void notifyListener(
            final EventExecutor eventExecutor, final Future<?> future, final GenericFutureListener<?> l) {

        if (eventExecutor.inEventLoop()) {
            final Integer stackDepth = LISTENER_STACK_DEPTH.get();
            if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
                LISTENER_STACK_DEPTH.set(stackDepth + 1);
                try {
                    notifyListener0(future, l);
                } finally {
                    LISTENER_STACK_DEPTH.set(stackDepth);
                }
                return;
            }
        }

        try {
            eventExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    notifyListener0(future, l);
                }
            });
        } catch (Throwable t) {
            logger.error("Failed to notify a listener. Event loop shut down?", t);
        }
    }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    static void notifyListener0(Future future, GenericFutureListener l) {
        try {
            l.operationComplete(future);
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
            }
        }
    }
また、傍受者のインターフェースを見てみると、異ステップのタスクの実行が完了したら、Futureの結果を得て、コールバックの論理を実行する方法があります。
public interface GenericFutureListener<F extends Future<?>> extends EventListener {

    void operationComplete(F future) throws Exception;
}
一例を見ると、サーババインディング中に、init AndRegisterを呼び出してChannelを作成し登録します。
1.この過程でまずChanel.newPromiseを通じて非同期的なタスクを作成し、Promiseのインスタンスをregister方法に伝えます。
2.register方法はスレッドを探してregister 0の方法を実行し、第一歩の方法はタスクをスレッドに渡すと戻ってきます。これは非同期的に実行するプロセスである。
3.新規スレッドにPromiseの状態を設定します。Promiseが書いてもいいという意味のFutureを見ることができます。
4.元スレッドは、init AndRegisterの返却結果を得て実行を継続します。この場合は、2つのスレッドが実行されます。
5.元スレッドはPromiseの状態を判断して登録が完了したかどうかを判断し、登録が完了したら後続のdoBind 0を実行し、完了していない場合は、コールバックを追加する方法で非同期的に実行する。
final ChannelFuture initAndRegister() {
        Channel channel;
        try {
            channel = createChannel();
        } catch (Throwable t) {
            return VoidChannel.INSTANCE.newFailedFuture(t);
        }

        try {
            init(channel);
        } catch (Throwable t) {
            channel.unsafe().closeForcibly();
            return channel.newFailedFuture(t);
        }

        ChannelPromise regFuture = channel.newPromise();
        channel.unsafe().register(regFuture);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
      because register(), bind(), and connect() are all bound to the same thread.

      return regFuture;
    }

<pre name="code" class="java"> public final void register(final ChannelPromise promise) {
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                   <strong> eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });</strong>
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    promise.setFailure(t);
                }
            }
        }

        private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!ensureOpen(promise)) {
                    return;
                }
                doRegister();
                registered = true;
                promise.setSuccess();
                pipeline.fireChannelRegistered();
                if (isActive()) {
                    pipeline.fireChannelActive();
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                if (!promise.tryFailure(t)) {
                    logger.warn(
                            "Tried to fail the registration promise, but it is complete already. " +
                                    "Swallowing the cause of the registration failure:", t);
                }
            }
        }
prvate ChanelFuture doBind(final SocketAddres local Address){
        final ChanelFuture regFuture=init AndRegister()
        final Channel=regFuture.channel()
        if(reg Future.cause)!null){
            return reg Future;
        }
        final ChanelPromise promise;
       
if(reg Future.isDune){
            promise=chanel.newPromise()
            doBind 0(regFuture、chanel、local Address、promise);
        } else{
            // Registration future is almost always fulfilled already、but just in case it's not.
            promise=new Default ChanelPromise;
           
regFuture.addListener(new ChanelFuture Listener){                @オーバーライド                public void operation Coplete(ChanelFuture future)throws Exception{                    doBind 0(regFuture、chanel、local Address、promise);                }             });
        }
        return promise
    }