Minaのpolling
21600 ワード
pollingには、ポーリングポリシーに基づくselect呼び出しまたは他のタイプのI/Oポーリングシステム呼び出しを実現するベースクラスが以下に含まれる.まず抽象クラスAbstractPollingIoAcceptorを見て、メンバーは以下の通りです.
registerQueueは登録キュー、cancelQueueは登録解除キューです.boundHandlesは、サーバsocketへのアドレスのマッピングテーブルを保存します.bindの処理手順を見てみましょう.
クライアントリクエストを本当に受け入れる作業はAcceptorスレッドで完了し、runメソッドは次のとおりです.
次にregisterHandlesでサーバsocketハンドルを登録します.
ProcessHandlesメソッドは、セッションを処理するために使用され、準備ができている場合にのみここで処理されます.
AbstractPollingIoConnectorはクライアント接続のポーリングポリシーを実現するために使用され、下位層のsocketは絶えず検出され、いずれかのsocketが処理される必要がある場合、起動されて処理されます.
接続の処理方法は、実際の操作もスレッドで行われます.
実際の処理スレッド:
実はここのコードはほとんどはっきりしていません(前のものと似ていて、余計なことは言いません).
public abstract class AbstractPollingIoAcceptor<T extends AbstractIoSession, H> extends AbstractIoAcceptor {
private final IoProcessor<T> processor;
private final boolean createdProcessor;
private final Object lock = new Object();
private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>());
private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
private volatile boolean selectable;
private Acceptor acceptor;
}
registerQueueは登録キュー、cancelQueueは登録解除キューです.boundHandlesは、サーバsocketへのアドレスのマッピングテーブルを保存します.bindの処理手順を見てみましょう.
protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
// BIND REQUEST Future Operation。
AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
//
registerQueue.add(request);
// Acceptor ,
startupAcceptor();
// select , bind request 。
wakeup();
// request
request.awaitUninterruptibly();
if (request.getException() != null) {
throw request.getException();
}
//
Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
for (H handle:boundHandles.values()) {
newLocalAddresses.add(localAddress(handle));
}
return newLocalAddresses;
}
クライアントリクエストを本当に受け入れる作業はAcceptorスレッドで完了し、runメソッドは次のとおりです.
public void run() {
int nHandles = 0;
while (selectable) {
try {
// keys
int selected = select();
// , Selector OP_ACCEPT,
// , 。
nHandles += registerHandles();
if (selected > 0) {
// OP_ACCEPT socket
processHandles(selectedHandles());
}
//
nHandles -= unregisterHandles();
// nHandles 0, 。
if (nHandles == 0) {
synchronized (lock) {
if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
acceptor = null;
break;
}
}
}
} catch (Throwable e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
ExceptionMonitor.getInstance().exceptionCaught(e1);
}
}
}
// 。
if (selectable && isDisposing()) {
selectable = false;
try {
if (createdProcessor) {
processor.dispose();
}
} finally {
try {
synchronized (disposalLock) {
if (isDisposing()) {
destroy();
}
}
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
} finally {
disposalFuture.setDone();
}
}
}
}
次にregisterHandlesでサーバsocketハンドルを登録します.
private int registerHandles() {
for (;;) {
// acceptor services
AcceptorOperationFuture future = registerQueue.poll();
if (future == null) {
return 0;
}
// map
Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>();
List<SocketAddress> localAddresses = future.getLocalAddresses();
try {
//
for (SocketAddress a : localAddresses) {
H handle = open(a);// , socket
newHandles.put(localAddress(handle), handle);// -socket
}
boundHandles.putAll(newHandles);//
future.setDone();//
return newHandles.size();
} catch (Exception e) {
future.setException(e);
} finally {
//
if (future.getException() != null) {
for (H handle : newHandles.values()) {
try {
close(handle);
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
}
}
wakeup();
}
}
}
}
ProcessHandlesメソッドは、セッションを処理するために使用され、準備ができている場合にのみここで処理されます.
//
private void processHandles(Iterator<H> handles) throws Exception {
while (handles.hasNext()) {
H handle = handles.next();
handles.remove();
T session = accept(processor, handle);//
if (session == null) {
break;
}
initSession(session, null, null);
session.getProcessor().add(session);//
}
}
AbstractPollingIoConnectorはクライアント接続のポーリングポリシーを実現するために使用され、下位層のsocketは絶えず検出され、いずれかのsocketが処理される必要がある場合、起動されて処理されます.
public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H> extends AbstractIoConnector {
private final Object lock = new Object();
private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<ConnectionRequest>();//
private final Queue<ConnectionRequest> cancelQueue = new ConcurrentLinkedQueue<ConnectionRequest>(); //
private final IoProcessor<T> processor;
private final boolean createdProcessor;
private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
private volatile boolean selectable;
private Connector connector;
}
接続の処理方法は、実際の操作もスレッドで行われます.
protected final ConnectFuture connect0(SocketAddress remoteAddress, SocketAddress localAddress, IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
H handle = null;
boolean success = false;
try {
handle = newHandle(localAddress);
if (connect(handle, remoteAddress)) {//
ConnectFuture future = new DefaultConnectFuture();
T session = newSession(processor, handle);//
initSession(session, future, sessionInitializer);
session.getProcessor().add(session);// IoProcessor
success = true;
return future;
}
success = true;
} catch (Exception e) {
return DefaultConnectFuture.newFailedFuture(e);
} finally {
if (!success && handle != null) {
try {
close(handle);
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
}
}
}
ConnectionRequest request = new ConnectionRequest(handle, sessionInitializer);
connectQueue.add(request);//
startupWorker();//
wakeup();// select
return request;
}
実際の処理スレッド:
public void run() {
int nHandles = 0;
while (selectable) {
try {
int timeout = (int)Math.min(getConnectTimeoutMillis(), 1000L);//
int selected = select(timeout);//
nHandles += registerNew();// ,
if (selected > 0) {
nHandles -= processConnections(selectedHandles());//
}
processTimedOutSessions(allHandles());//
nHandles -= cancelKeys();//
if (nHandles == 0) {
synchronized (lock) {
if (connectQueue.isEmpty()) {
connector = null;
break;
}
}
}
} catch (Throwable e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
ExceptionMonitor.getInstance().exceptionCaught(e1);
}
}
}
if (selectable && isDisposing()) {
selectable = false;
try {
if (createdProcessor) {
processor.dispose();
}
} finally {
try {
synchronized (disposalLock) {
if (isDisposing()) {
destroy();
}
}
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
} finally {
disposalFuture.setDone();
}
}
}
}
}
実はここのコードはほとんどはっきりしていません(前のものと似ていて、余計なことは言いません).