Netty 4.1.x Source Code Revisited (1): Future Analysis

Apache Mina defines a set of Future operations; Netty 4.1.x redesigns them.
The io.netty.util.concurrent package defines public interface Future<V> extends java.util.concurrent.Future<V>, a Future interface that extends the JDK Future interface.
As its Javadoc puts it, io.netty.util.concurrent.Future is "The result of an asynchronous operation."
public interface Future<V> extends java.util.concurrent.Future<V> {
    boolean isSuccess();
    boolean isCancellable();
    Throwable cause();

    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    Future<V> sync() throws InterruptedException;
    Future<V> syncUninterruptibly();
    Future<V> await() throws InterruptedException;
    Future<V> awaitUninterruptibly();

    boolean await(long timeout, TimeUnit unit) throws InterruptedException;
    boolean await(long timeoutMillis) throws InterruptedException;

    boolean awaitUninterruptibly(long timeout, TimeUnit unit);
    boolean awaitUninterruptibly(long timeoutMillis);

    V getNow();

    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}
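The Future interface defines methods for retrieving the result of an asynchronous operation, managing listeners, and waiting synchronously for the operation to complete.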
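One distinction in this interface is easy to miss: in Netty, await() only blocks until the future is done, whereas sync() additionally rethrows the cause if the operation failed. The following is a minimal JDK-only sketch of that contract. MiniFuture and its methods are hypothetical names used for illustration, not Netty classes, and as a simplification the sketch wraps the cause in an IllegalStateException rather than rethrowing it directly.

```java
import java.util.concurrent.CountDownLatch;

final class AwaitVsSyncSketch {

    /** Hypothetical, simplified stand-in for a Netty-style future (not a Netty class). */
    static final class MiniFuture {
        private final CountDownLatch done = new CountDownLatch(1);
        private volatile Throwable cause; // non-null only when the operation failed

        void succeed() {
            done.countDown();
        }

        void fail(Throwable t) {
            cause = t;        // publish the cause before releasing any waiters
            done.countDown();
        }

        boolean isDone() {
            return done.getCount() == 0;
        }

        boolean isSuccess() {
            return isDone() && cause == null;
        }

        Throwable cause() {
            return cause;
        }

        /** Blocks until completion; never inspects the outcome. */
        MiniFuture await() throws InterruptedException {
            done.await();
            return this;
        }

        /** Blocks until completion, then surfaces a failure as an exception. */
        MiniFuture sync() throws InterruptedException {
            await();
            if (cause != null) {
                // Assumption: wrapped here for simplicity.
                throw new IllegalStateException(cause);
            }
            return this;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MiniFuture f = new MiniFuture();
        f.fail(new RuntimeException("connection refused"));
        f.await(); // returns normally even though the operation failed
        System.out.println("isSuccess after await: " + f.isSuccess());
        try {
            f.sync(); // the same failure now surfaces as an exception
        } catch (IllegalStateException e) {
            System.out.println("sync rethrew: " + e.getCause().getMessage());
        }
    }
}
```

After await() the caller still has to check isSuccess() or cause(); sync() suits call sites that would rather let a failure propagate as an exception.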
Netty also defines public interface Promise<V> extends Future<V> {}, described in its Javadoc as a "Special Future which is writable". A Promise is a Future whose result can be written.
public interface Promise<V> extends Future<V> {

    /**
     * Marks this future as a success and notifies all
     * listeners.
     *
     * If it is success or failed already it will throw an {@link IllegalStateException}.
     */
    Promise<V> setSuccess(V result);

    /**
     * Marks this future as a success and notifies all
     * listeners.
     *
     * @return {@code true} if and only if successfully marked this future as
     *         a success. Otherwise {@code false} because this future is
     *         already marked as either a success or a failure.
     */
    boolean trySuccess(V result);

    /**
     * Marks this future as a failure and notifies all
     * listeners.
     *
     * If it is success or failed already it will throw an {@link IllegalStateException}.
     */
    Promise<V> setFailure(Throwable cause);

    /**
     * Marks this future as a failure and notifies all
     * listeners.
     *
     * @return {@code true} if and only if successfully marked this future as
     *         a failure. Otherwise {@code false} because this future is
     *         already marked as either a success or a failure.
     */
    boolean tryFailure(Throwable cause);

    /**
     * Make this future impossible to cancel.
     *
     * @return {@code true} if and only if successfully marked this future as uncancellable or it is already done
     *         without being cancelled.  {@code false} if this future has been cancelled already.
     */
    boolean setUncancellable();

    @Override
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> await() throws InterruptedException;

    @Override
    Promise<V> awaitUninterruptibly();

    @Override
    Promise<V> sync() throws InterruptedException;

    @Override
    Promise<V> syncUninterruptibly();
}
The key methods of the Promise interface are Promise<V> setSuccess(V result) and Promise<V> setFailure(Throwable cause), which write the outcome of the operation.
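The Javadoc quoted above gives the set* and try* variants different contracts: set* throws IllegalStateException when the promise is already complete, while try* reports the same situation by returning false. A minimal JDK-only sketch of a write-once holder with that contract follows. MiniPromise and Outcome are hypothetical names, and listener notification is deliberately left out.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

final class SetVersusTrySketch {

    /** Hypothetical write-once holder; a simplified model, not Netty's DefaultPromise. */
    static final class MiniPromise<V> {

        /** Wraps the outcome so that a null result differs from "not completed yet". */
        private static final class Outcome {
            final Object value;
            final Throwable cause;

            Outcome(Object value, Throwable cause) {
                this.value = value;
                this.cause = cause;
            }
        }

        private final AtomicReference<Outcome> outcome = new AtomicReference<>();

        /** Returns false instead of throwing when the promise is already complete. */
        boolean trySuccess(V result) {
            return outcome.compareAndSet(null, new Outcome(result, null));
        }

        boolean tryFailure(Throwable cause) {
            Objects.requireNonNull(cause, "cause");
            return outcome.compareAndSet(null, new Outcome(null, cause));
        }

        /** Throws IllegalStateException when the promise is already complete. */
        MiniPromise<V> setSuccess(V result) {
            if (!trySuccess(result)) {
                throw new IllegalStateException("complete already");
            }
            return this;
        }

        MiniPromise<V> setFailure(Throwable cause) {
            if (!tryFailure(cause)) {
                throw new IllegalStateException("complete already");
            }
            return this;
        }

        boolean isSuccess() {
            Outcome o = outcome.get();
            return o != null && o.cause == null;
        }

        @SuppressWarnings("unchecked")
        V getNow() {
            Outcome o = outcome.get();
            return o == null ? null : (V) o.value;
        }
    }

    public static void main(String[] args) {
        MiniPromise<String> p = new MiniPromise<>();
        p.setSuccess("first");
        System.out.println("second trySuccess accepted: " + p.trySuccess("second"));
        try {
            p.setFailure(new RuntimeException("too late"));
        } catch (IllegalStateException e) {
            System.out.println("setFailure rejected: " + e.getMessage());
        }
    }
}
```

The try* form fits code paths where several parties may race to complete the same promise and losing the race is not an error.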
Future works with the GenericFutureListener listener interface. When the operation behind a Future completes, the listener's void operationComplete(F future) throws Exception; method is invoked.
/**
 * Listens to the result of a {@link Future}.  The result of the asynchronous operation is notified once this listener
 * is added by calling {@link Future#addListener(GenericFutureListener)}.
 */
public interface GenericFutureListener<F extends Future<?>> extends EventListener {

    /**
     * Invoked when the operation associated with the {@link Future} has been completed.
     *
     * @param future  the source {@link Future} which called this callback
     */
    void operationComplete(F future) throws Exception;
}
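To make the callback contract concrete, here is a minimal JDK-only sketch of a future that notifies listeners on completion, and notifies immediately when a listener is added after completion. Listener and MiniFuture are hypothetical stand-ins for GenericFutureListener and Future, not Netty classes. The sketch calls listeners on whichever thread completes the future or adds the listener, whereas Netty dispatches through an EventExecutor.

```java
import java.util.ArrayList;
import java.util.List;

final class ListenerSketch {

    /** Hypothetical analogue of GenericFutureListener. */
    interface Listener<F> {
        void operationComplete(F future) throws Exception;
    }

    /** Hypothetical, simplified future that completes with a String. */
    static final class MiniFuture {
        private final List<Listener<MiniFuture>> listeners = new ArrayList<>();
        private boolean done;   // guarded by synchronized(this)
        private String result;  // guarded by synchronized(this)

        synchronized String getNow() {
            return result;
        }

        MiniFuture addListener(Listener<MiniFuture> listener) {
            boolean alreadyDone;
            synchronized (this) {
                alreadyDone = done;
                if (!alreadyDone) {
                    listeners.add(listener); // remember it until completion
                }
            }
            if (alreadyDone) {
                fire(listener); // late listeners are notified right away
            }
            return this;
        }

        void complete(String value) {
            List<Listener<MiniFuture>> toNotify;
            synchronized (this) {
                if (done) {
                    return; // only the first completion wins
                }
                done = true;
                result = value;
                toNotify = new ArrayList<>(listeners);
                listeners.clear();
            }
            // Notify outside the lock, in registration order.
            for (Listener<MiniFuture> listener : toNotify) {
                fire(listener);
            }
        }

        private void fire(Listener<MiniFuture> listener) {
            try {
                listener.operationComplete(this);
            } catch (Exception e) {
                // A failing listener must not prevent the others from running.
                System.err.println("listener failed: " + e);
            }
        }
    }

    public static void main(String[] args) {
        MiniFuture f = new MiniFuture();
        f.addListener(x -> System.out.println("early listener saw " + x.getNow()));
        f.complete("done");
        f.addListener(x -> System.out.println("late listener saw " + x.getNow()));
    }
}
```

Because operationComplete is declared with throws Exception, the notifying side has to catch whatever a listener throws; the sketch just prints it.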

DefaultPromise is the default implementation of the Future and Promise interfaces. Its implementation follows the same principle as DefaultIoFuture in Apache Mina.
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
    private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER;
    private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class, "SUCCESS");
    private static final Signal UNCANCELLABLE = Signal.valueOf(DefaultPromise.class, "UNCANCELLABLE");

    static {
        @SuppressWarnings("rawtypes")
        AtomicReferenceFieldUpdater<DefaultPromise, Object> updater =
                PlatformDependent.newAtomicReferenceFieldUpdater(DefaultPromise.class, "result");
        RESULT_UPDATER = updater == null ? AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class,
                                                                                  Object.class, "result") : updater;
    }

    private volatile Object result;
    private final EventExecutor executor;
    /**
     * One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}.
     * If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.
     *
     * Threading - synchronized(this). We must support adding listeners when there is no EventExecutor.
     */
    private Object listeners;
    /**
     * Threading - synchronized(this). We are required to hold the monitor to use Java's underlying wait()/notifyAll().
     */
    private short waiters;
    /**
     * Threading - synchronized(this). We must prevent concurrent notification and FIFO listener notification if the
     * executor changes.
     */
    private boolean notifyingListeners;

    @Override
    public Promise<V> awaitUninterruptibly() {
        if (isDone()) {
            return this;
        }

        checkDeadLock();

        boolean interrupted = false;
        synchronized (this) {
            while (!isDone()) {
                incWaiters();
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Interrupted while waiting.
                    interrupted = true;
                } finally {
                    decWaiters();
                }
            }
        }

        if (interrupted) {
            Thread.currentThread().interrupt();
        }

        return this;
    }

    private boolean setSuccess0(V result) {
        return setValue0(result == null ? SUCCESS : result);
    }

    private boolean setFailure0(Throwable cause) {
        return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
    }

    private boolean setValue0(Object objResult) {
        if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
            RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
            checkNotifyWaiters();
            return true;
        }
        return false;
    }

    private void notifyListeners() {
        EventExecutor executor = executor();
        if (executor.inEventLoop()) {
            final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
            final int stackDepth = threadLocals.futureListenerStackDepth();
            if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
                threadLocals.setFutureListenerStackDepth(stackDepth + 1);
                try {
                    notifyListenersNow();
                } finally {
                    threadLocals.setFutureListenerStackDepth(stackDepth);
                }
                return;
            }
        }

        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                notifyListenersNow();
            }
        });
    }

    private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
        if (listeners == null) {
            listeners = listener;
        } else if (listeners instanceof DefaultFutureListeners) {
            ((DefaultFutureListeners) listeners).add(listener);
        } else {
            listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
        }
    }
}
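Two ideas in the excerpt above carry most of the design: a single result slot written once by compare-and-set, with a sentinel standing in for a null result and a holder object marking failure, and a waiter count that lets the completing thread skip notifyAll() when nobody is blocked. The following JDK-only sketch models just those two ideas under simplifying assumptions. MiniPromise is a hypothetical class, an AtomicReference replaces the AtomicReferenceFieldUpdater, and the UNCANCELLABLE state, the dead-lock check and listeners are omitted.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

final class ResultSlotSketch {

    /** Hypothetical, simplified model of the result slot; not Netty's DefaultPromise. */
    static final class MiniPromise<V> {
        // Sentinel stored in place of a null result, so that null keeps meaning "not done".
        private static final Object SUCCESS = new Object();

        /** Marks the slot as failed and carries the cause. */
        private static final class CauseHolder {
            final Throwable cause;

            CauseHolder(Throwable cause) {
                this.cause = cause;
            }
        }

        private final AtomicReference<Object> result = new AtomicReference<>();
        private int waiters; // guarded by synchronized(this)

        boolean trySuccess(V value) {
            return setValue0(value == null ? SUCCESS : value);
        }

        boolean tryFailure(Throwable cause) {
            return setValue0(new CauseHolder(Objects.requireNonNull(cause, "cause")));
        }

        private boolean setValue0(Object objResult) {
            if (result.compareAndSet(null, objResult)) { // only the first writer wins
                synchronized (this) {
                    if (waiters > 0) {
                        notifyAll(); // wake blocked threads only if there are any
                    }
                }
                return true;
            }
            return false;
        }

        boolean isDone() {
            return result.get() != null;
        }

        boolean isSuccess() {
            Object r = result.get();
            return r != null && !(r instanceof CauseHolder);
        }

        @SuppressWarnings("unchecked")
        V getNow() {
            Object r = result.get();
            if (r == null || r == SUCCESS || r instanceof CauseHolder) {
                return null;
            }
            return (V) r;
        }

        MiniPromise<V> awaitUninterruptibly() {
            if (isDone()) {
                return this;
            }
            boolean interrupted = false;
            synchronized (this) {
                while (!isDone()) { // loop guards against spurious wakeups
                    waiters++;
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        interrupted = true; // remember it and keep waiting
                    } finally {
                        waiters--;
                    }
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
            return this;
        }
    }

    public static void main(String[] args) {
        MiniPromise<String> p = new MiniPromise<>();
        new Thread(() -> p.trySuccess(null)).start();
        p.awaitUninterruptibly();
        System.out.println("done=" + p.isDone() + ", success=" + p.isSuccess());
    }
}
```

The waiter re-checks isDone() while holding the monitor and the completing thread takes the same monitor before notifying, so a completion that lands between the check and wait() cannot be missed.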