Netty 4.1.Xソース再読み込み1 Futureプロファイリング
8651 ワード
Apache MinaでFutureアクションが定義されています.Netty4.1.xによるFuture操作の再設計.
io.netty.util.concurrentパッケージにはpublic interface Future extends javaが定義されています.util.concurrent.JDK Futureインタフェースを実装するFutureインタフェース.
io.netty.util.concurrent.Futureインタフェースは、非同期の操作結果を表すインタフェースです.The result of an asynchronous operation.
public interface Future extends java.util.concurrent.Future {
boolean isSuccess();
boolean isCancellable();
Throwable cause();
Future addListener(GenericFutureListener extends Future super V>> listener);
Future addListeners(GenericFutureListener extends Future super V>>... listeners);
Future removeListener(GenericFutureListener extends Future super V>> listener);
Future removeListeners(GenericFutureListener extends Future super V>>... listeners);
Future sync() throws InterruptedException;
Future syncUninterruptibly();
Future await() throws InterruptedException;
Future awaitUninterruptibly();
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);
V getNow();
boolean cancel(boolean mayInterruptIfRunning);
}
Futureインタフェースは,非同期操作結果の取得,Listenerおよび非同期操作の同期化などのインタフェースを定義する.
同時に、public interface Promise extends Future{}Special Future which is writableを定義した.Promiseは結果を書き込むことができるFutureを表す.
public interface Promise extends Future { /** * Marks this future as a success and notifies all * listeners. * * If it is success or failed already it will throw an {@link IllegalStateException}. */ Promise setSuccess(V result); /** * Marks this future as a success and notifies all * listeners. * * @return {@code true} if and only if successfully marked this future as * a success. Otherwise {@code false} because this future is * already marked as either a success or a failure. */ boolean trySuccess(V result); /** * Marks this future as a failure and notifies all * listeners. * * If it is success or failed already it will throw an {@link IllegalStateException}. */ Promise setFailure(Throwable cause); /** * Marks this future as a failure and notifies all * listeners. * * @return {@code true} if and only if successfully marked this future as * a failure. Otherwise {@code false} because this future is * already marked as either a success or a failure. */ boolean tryFailure(Throwable cause); /** * Make this future impossible to cancel. * * @return {@code true} if and only if successfully marked this future as uncancellable or it is already done * without being cancelled. {@code false} if this future has been cancelled already. */ boolean setUncancellable(); @Override Promise addListener(GenericFutureListener extends Future super V>> listener); @Override Promise addListeners(GenericFutureListener extends Future super V>>... listeners); @Override Promise removeListener(GenericFutureListener extends Future super V>> listener); @Override Promise removeListeners(GenericFutureListener extends Future super V>>... listeners); @Override Promise await() throws InterruptedException; @Override Promise awaitUninterruptibly(); @Override Promise sync() throws InterruptedException; @Override Promise syncUninterruptibly();}
Promiseインタフェースには主にPromise setSuccess(V result)があります. 、 Promise setFailure(Throwable cause); を行ないます.
FutureでGenericFutureListenerリスナーが定義されています.Future操作が完了すると、void operationComplete(F future)throws Exception;インタフェースメソッドが呼び出されます.
DefaultPromiseはFuture、Promiseインタフェースのデフォルト実装です.実装方式はApache MinaにおけるDefaultIoFuture実装原理と同様である.
public class DefaultPromise extends AbstractFuture implements Promise {
private static final AtomicReferenceFieldUpdater RESULT_UPDATER; private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class, "SUCCESS"); private static final Signal UNCANCELLABLE = Signal.valueOf(DefaultPromise.class, "UNCANCELLABLE");
static { @SuppressWarnings("rawtypes") AtomicReferenceFieldUpdater updater = PlatformDependent.newAtomicReferenceFieldUpdater(DefaultPromise.class, "result"); RESULT_UPDATER = updater == null ? AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result") : updater; }
private volatile Object result; private final EventExecutor executor; /** * One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}. * If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified. * * Threading - synchronized(this). We must support adding listeners when there is no EventExecutor. */ private Object listeners; /** * Threading - synchronized(this). We are required to hold the monitor to use Java's underlying wait()/notifyAll(). */ private short waiters; /** * Threading - synchronized(this). We must prevent concurrent notification and FIFO listener notification if the * executor changes. */ private boolean notifyingListeners;
@Override public Promise awaitUninterruptibly() { if (isDone()) { return this; } checkDeadLock(); boolean interrupted = false; synchronized (this) { while (!isDone()) { incWaiters(); try { wait(); } catch (InterruptedException e) { //Interrupted while waiting. interrupted = true; } finally { decWaiters(); } } } if (interrupted) { Thread.currentThread().interrupt(); } return this; }
private boolean setSuccess0(V result) { return setValue0(result == null ? SUCCESS : result); } private boolean setFailure0(Throwable cause) { return setValue0(new CauseHolder(checkNotNull(cause, "cause"))); } private boolean setValue0(Object objResult) { if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { checkNotifyWaiters(); return true; } return false; }
private void notifyListeners() { EventExecutor executor = executor(); if (executor.inEventLoop()) { final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); final int stackDepth = threadLocals.futureListenerStackDepth(); if (stackDepth < MAX_LISTENER_STACK_DEPTH) { threadLocals.setFutureListenerStackDepth(stackDepth + 1); try { notifyListenersNow(); } finally { threadLocals.setFutureListenerStackDepth(stackDepth); } return; } } safeExecute(executor, new Runnable() { @Override public void run() { notifyListenersNow(); } }); }
private void addListener0(GenericFutureListener extends Future super V>> listener) { if (listeners == null) { listeners = listener; } else if (listeners instanceof DefaultFutureListeners) { ((DefaultFutureListeners) listeners).add(listener); } else { listeners = new DefaultFutureListeners((GenericFutureListener extends Future>) listeners, listener); } }
}
io.netty.util.concurrentパッケージにはpublic interface Future extends javaが定義されています.util.concurrent.JDK Futureインタフェースを実装するFutureインタフェース.
io.netty.util.concurrent.Futureインタフェースは、非同期の操作結果を表すインタフェースです.The result of an asynchronous operation.
public interface Future extends java.util.concurrent.Future {
boolean isSuccess();
boolean isCancellable();
Throwable cause();
Future addListener(GenericFutureListener extends Future super V>> listener);
Future addListeners(GenericFutureListener extends Future super V>>... listeners);
Future removeListener(GenericFutureListener extends Future super V>> listener);
Future removeListeners(GenericFutureListener extends Future super V>>... listeners);
Future sync() throws InterruptedException;
Future syncUninterruptibly();
Future await() throws InterruptedException;
Future awaitUninterruptibly();
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);
V getNow();
boolean cancel(boolean mayInterruptIfRunning);
}
Futureインタフェースは,非同期操作結果の取得,Listenerおよび非同期操作の同期化などのインタフェースを定義する.
同時に、public interface Promise extends Future{}Special Future which is writableを定義した.Promiseは結果を書き込むことができるFutureを表す.
public interface Promise extends Future { /** * Marks this future as a success and notifies all * listeners. * * If it is success or failed already it will throw an {@link IllegalStateException}. */ Promise setSuccess(V result); /** * Marks this future as a success and notifies all * listeners. * * @return {@code true} if and only if successfully marked this future as * a success. Otherwise {@code false} because this future is * already marked as either a success or a failure. */ boolean trySuccess(V result); /** * Marks this future as a failure and notifies all * listeners. * * If it is success or failed already it will throw an {@link IllegalStateException}. */ Promise setFailure(Throwable cause); /** * Marks this future as a failure and notifies all * listeners. * * @return {@code true} if and only if successfully marked this future as * a failure. Otherwise {@code false} because this future is * already marked as either a success or a failure. */ boolean tryFailure(Throwable cause); /** * Make this future impossible to cancel. * * @return {@code true} if and only if successfully marked this future as uncancellable or it is already done * without being cancelled. {@code false} if this future has been cancelled already. */ boolean setUncancellable(); @Override Promise addListener(GenericFutureListener extends Future super V>> listener); @Override Promise addListeners(GenericFutureListener extends Future super V>>... listeners); @Override Promise removeListener(GenericFutureListener extends Future super V>> listener); @Override Promise removeListeners(GenericFutureListener extends Future super V>>... listeners); @Override Promise await() throws InterruptedException; @Override Promise awaitUninterruptibly(); @Override Promise sync() throws InterruptedException; @Override Promise syncUninterruptibly();}
Promiseインタフェースには主にPromise setSuccess(V result)があります. 、 Promise setFailure(Throwable cause); を行ないます.
FutureでGenericFutureListenerリスナーが定義されています.Future操作が完了すると、void operationComplete(F future)throws Exception;インタフェースメソッドが呼び出されます.
/**
* Listens to the result of a {@link Future}. The result of the asynchronous operation is notified once this listener
* is added by calling {@link Future#addListener(GenericFutureListener)}.
*/
public interface GenericFutureListener> extends EventListener {
/**
* Invoked when the operation associated with the {@link Future} has been completed.
*
* @param future the source {@link Future} which called this callback
*/
void operationComplete(F future) throws Exception;
}
DefaultPromiseはFuture、Promiseインタフェースのデフォルト実装です.実装方式はApache MinaにおけるDefaultIoFuture実装原理と同様である.
public class DefaultPromise extends AbstractFuture implements Promise {
private static final AtomicReferenceFieldUpdater RESULT_UPDATER; private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class, "SUCCESS"); private static final Signal UNCANCELLABLE = Signal.valueOf(DefaultPromise.class, "UNCANCELLABLE");
static { @SuppressWarnings("rawtypes") AtomicReferenceFieldUpdater updater = PlatformDependent.newAtomicReferenceFieldUpdater(DefaultPromise.class, "result"); RESULT_UPDATER = updater == null ? AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result") : updater; }
private volatile Object result; private final EventExecutor executor; /** * One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}. * If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified. * * Threading - synchronized(this). We must support adding listeners when there is no EventExecutor. */ private Object listeners; /** * Threading - synchronized(this). We are required to hold the monitor to use Java's underlying wait()/notifyAll(). */ private short waiters; /** * Threading - synchronized(this). We must prevent concurrent notification and FIFO listener notification if the * executor changes. */ private boolean notifyingListeners;
@Override public Promise awaitUninterruptibly() { if (isDone()) { return this; } checkDeadLock(); boolean interrupted = false; synchronized (this) { while (!isDone()) { incWaiters(); try { wait(); } catch (InterruptedException e) { //Interrupted while waiting. interrupted = true; } finally { decWaiters(); } } } if (interrupted) { Thread.currentThread().interrupt(); } return this; }
private boolean setSuccess0(V result) { return setValue0(result == null ? SUCCESS : result); } private boolean setFailure0(Throwable cause) { return setValue0(new CauseHolder(checkNotNull(cause, "cause"))); } private boolean setValue0(Object objResult) { if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { checkNotifyWaiters(); return true; } return false; }
private void notifyListeners() { EventExecutor executor = executor(); if (executor.inEventLoop()) { final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); final int stackDepth = threadLocals.futureListenerStackDepth(); if (stackDepth < MAX_LISTENER_STACK_DEPTH) { threadLocals.setFutureListenerStackDepth(stackDepth + 1); try { notifyListenersNow(); } finally { threadLocals.setFutureListenerStackDepth(stackDepth); } return; } } safeExecute(executor, new Runnable() { @Override public void run() { notifyListenersNow(); } }); }
private void addListener0(GenericFutureListener extends Future super V>> listener) { if (listeners == null) { listeners = listener; } else if (listeners instanceof DefaultFutureListeners) { ((DefaultFutureListeners) listeners).add(listener); } else { listeners = new DefaultFutureListeners((GenericFutureListener extends Future>) listeners, listener); } }
}