Tomcatソース解析シリーズ(14)PollerとPoller Event
前書きこの文章では、NioEntの起動中に複数のPollerオブジェクトが作成され、Pollerスレッドが起動されたと述べています。前の文章でAccepttorのrun方法を紹介し、Accepttorの仕事はクライアントの接続を受けてPollerスレッド処理に渡すことであり、ここではPollerとPoller Eventを分析する。PollerとPoller Eventは共にNioEntの内部クラスです。
1.Poller Event Curn Acctortスレッドは、受け取った接続をPoller Eventオブジェクトにパッケージ化し、一つの列に参加してPollerスレッドの実行を待つ。Poller EventはRunnableインターフェースを実現していますので、run方法はその重要な方法です。
else文ブロックの論理も複雑ではなく、入ってきたinterestOps操作をSocketChanel関連のSelection Keyに付加するか、または関連付けられているSelective Keyをキャンセルするかです。
2.Poller_Pollr_PollerがRunnableを実現しました。そのrun方法が鍵です。
後はSelector.selectedKeys()を呼び出して、傍受されたSelection Keyのセットを取得し、processKey(sk,atachment)を呼び出します。これはnioプログラミングにおける通常の操作です。Selecticyのat tachmentはNioSocketWrapperの対象であり、この対象はPoller Eventを構成して入ってきたもので、Poller钮register方法にあります。
2.1.Poller((zhi processKeyrocessKey)方法はSelective Keyを処理する鍵となります。
processSendfileとは、FileChanel autメソッドを呼び出してデータを送るものです。この方法は重点ではないので、ここで詳しく解析しません。
processKey方法はprocessSocketを呼び出す方法で、それぞれOPを処理します。READとOP_WRITEイベントは、入ってきた二つ目のパラメータは、それぞれSocketEvent.OPEN(u)です。READとSocketEvent.OPEN_WRITE、三つ目のパラメータはtrueです。dispatchのtrueは別のスレッドで処理され、falseはPollerスレッドで処理されます。このprocessSocketはAbstract Endpointの中の方法です。
2.2.Abstract End point萼processSocket
SocketProcessorsorBaseの対象を取得した後、伝えられたdispatchはtrueであるため、このSocketProcessors Baseをexectorに捨てて処理します。SocketProcesssorBaseがRunnableを実現しました。exectorはAbstract Endpointメソッドで初期化されたもので、createExectorはこの文章で紹介されていますので、ここでは説明する必要がありません。
SocketProccessorBaseの内容は以下の通りです。
2.3.SocketProcessor葃doRun
続いて、2番目のif-else文ブロックです。handshakeの値によって異なる処理をします。handshakeの値がSelective Key.OP(u)であれば。READまたはセレクションKey.OP_WRITEは、socketWrapper.register ReadInterest()またはsocketWrapper.register WriteInterest()を呼び出して興味のあるイベントを再登録します。
二つ目のif-else文ブロックのifブロックでは、get Handler()を呼び出します。process(socketWrapper,event)で処理します。それから一つのSocketStateの対象stateを得て、もしstateの値がSocketStte.C.LOSEDならば、close(socket,key)の方法を実行します。
get Handler()はAbstractEndpointの方法です。
このHandlerはAbstractHttp 11 Protocolの構造方法で初期化されたConnection Handlerオブジェクトです。これはこの文章で述べられていますが、ここでは詳しく説明しません。Connection Handlerは次の文章で紹介します。ここでは多くの話をしません。
本稿では、Poller EventとPollerのrun方法を分析した。ここで、Poller Event芳ル方法は、SocketChanelの読みや書き込みを登録するPollerのselectorにある。まずキャッシュ・キューの中のPoller Eventを処理して、そしてselector.selectKeys()が返したSelectory、つまりSocketChanelの読み書きイベントを処理します。
1.Poller Event Curn Acctortスレッドは、受け取った接続をPoller Eventオブジェクトにパッケージ化し、一つの列に参加してPollerスレッドの実行を待つ。Poller EventはRunnableインターフェースを実現していますので、run方法はその重要な方法です。
private NioChannel socket;
private NioSocketWrapper socketWrapper;
@Override
public void run() {
if (interestOps == OP_REGISTER) {
try {
socket.getIOChannel().register(
socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
} catch (Exception x) {
log.error(sm.getString("endpoint.nio.registerFail"), x);
}
} else {
final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
try {
if (key == null) {
// The key was cancelled (e.g. due to socket closure)
// and removed from the selector while it was being
// processed. Count down the connections at this point
// since it won't have been counted down when the socket
// closed.
socket.socketWrapper.getEndpoint().countDownConnection();
((NioSocketWrapper) socket.socketWrapper).closed = true;
} else {
final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
if (socketWrapper != null) {
//we are registering the key to start with, reset the fairness counter.
int ops = key.interestOps() | interestOps;
socketWrapper.interestOps(ops);
key.interestOps(ops);
} else {
socket.getPoller().cancelledKey(key);
}
}
} catch (CancelledKeyException ckx) {
try {
socket.getPoller().cancelledKey(key);
} catch (Exception ignore) {}
}
}
}
interestOpsは構造方法で伝えられました。Poller Eventの構造方法は二つのところで使われています。一つはPoller_register方法で、つまり前の文章で述べられています。もう一つはPoller_add方法で、このadd方法の呼び出し点はいくつかあります。WRITEまたはSelection Key.OP_READif文ブロックの中で、socketは構造方法で伝えられたNioChanelの対象です。protected SocketChannel sc = null;
public SocketChannel getIOChannel() {
return sc;
}
NioChannel莹getIOChanelは、オブジェクトのSocketChanelに戻ってきました。このオブジェクトは、Nio Chanelを作成したオブジェクトが着信したもので、Accepttorスレッド内でendpoint.serverSocketAccept()を呼び出して取得したオブジェクトです。sockett.getPoller().get Selector()はPollerのSelectorタイプを取得する対象です。private Selector selector;
public Poller() throws IOException {
this.selector = Selector.open();
}
public Selector getSelector() { return selector;}
このselectorはPoller構造法で初期化されたもので、一つのPollerにはSelectorオブジェクトがあることが分かります。if文ブロックには、SocketChanelオブジェクトをPoller内部に登録するSelectorオブジェクトであり、NioSocketWrapperオブジェクトが付加されています。登録されている興味のあるイベントはSelecticy.OP_です。READ、つまり、このSelectorの対象はこのSocketChanelの読みイベントを傍受します。else文ブロックの論理も複雑ではなく、入ってきたinterestOps操作をSocketChanel関連のSelection Keyに付加するか、または関連付けられているSelective Keyをキャンセルするかです。
2.Poller_Pollr_PollerがRunnableを実現しました。そのrun方法が鍵です。
/**
* The background thread that adds sockets to the Poller, checks the
* poller for triggered events and hands the associated socket off to an
* appropriate processor as events occur.
*/
@Override
public void run() {
// Loop until destroy() is called
while (true) {
boolean hasEvents = false;
try {
if (!close) {
hasEvents = events();
if (wakeupCounter.getAndSet(-1) > 0) {
//if we are here, means we have other stuff to do
//do a non blocking select
keyCount = selector.selectNow();
} else {
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
if (close) {
events();
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
}
break;
}
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
continue;
}
//either we timed out or we woke up, process events first
if ( keyCount == 0 ) hasEvents = (hasEvents | events());
Iterator iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (attachment == null) {
iterator.remove();
} else {
iterator.remove();
processKey(sk, attachment);
}
}//while
//process timeouts
timeout(keyCount,hasEvents);
}//while
getStopLatch().countDown();
}
runメソッドではif(!close)ブロックを先に実行します。まずeventsメソッドを呼び出しました。/**
* Processes events in the event queue of the Poller.
*
* @return true
if some events were processed,
* false
if queue was empty
*/
public boolean events() {
boolean result = false;
PollerEvent pe = null;
for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {
result = true;
try {
pe.run();
pe.reset();
if (running && !paused) {
eventCache.push(pe);
}
} catch ( Throwable x ) {
log.error(sm.getString("endpoint.nio.pollerEventError"), x);
}
}
return result;
}
events()の方法は、eventsというキューの中のPoller Eventのrun方法を実行し、その後、Poller Eventオブジェクトをevent Cacheに置いて、多重化しやすいようにします。Poller Event〓〓〓方法は上で言いました。その後、wakeupCounterの値からselector.selectNow()かselector.selector(selectorTimeout)かを判断します。wakeup Counterの値はPoller〓addEventの中で1のを増加します。その後if(close)ステートメントブロックに入り、events()メソッドを呼び出して、timeout(0,false)とselector.close()メソッドを呼び出します。後はSelector.selectedKeys()を呼び出して、傍受されたSelection Keyのセットを取得し、processKey(sk,atachment)を呼び出します。これはnioプログラミングにおける通常の操作です。Selecticyのat tachmentはNioSocketWrapperの対象であり、この対象はPoller Eventを構成して入ってきたもので、Poller钮register方法にあります。
2.1.Poller((zhi processKeyrocessKey)方法はSelective Keyを処理する鍵となります。
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
try {
if ( close ) {
cancelledKey(sk);
} else if ( sk.isValid() && attachment != null ) {
if (sk.isReadable() || sk.isWritable() ) {
if ( attachment.getSendfileData() != null ) {
processSendfile(sk,attachment, false);
} else {
unreg(sk, attachment, sk.readyOps());
boolean closeSocket = false;
// Read goes before write
if (sk.isReadable()) {
if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
if (!closeSocket && sk.isWritable()) {
if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
if (closeSocket) {
cancelledKey(sk);
}
}
}
} else {
//invalid key
cancelledKey(sk);
}
} catch ( CancelledKeyException ckx ) {
cancelledKey(sk);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("endpoint.nio.keyProcessingError"), t);
}
}
atachment.get SendfileData()がnullでないとprocessSendfile方法で処理されることが分かります。プロcessKeyメソッドを呼び出して処理します。processSendfileとは、FileChanel autメソッドを呼び出してデータを送るものです。この方法は重点ではないので、ここで詳しく解析しません。
processKey方法はprocessSocketを呼び出す方法で、それぞれOPを処理します。READとOP_WRITEイベントは、入ってきた二つ目のパラメータは、それぞれSocketEvent.OPEN(u)です。READとSocketEvent.OPEN_WRITE、三つ目のパラメータはtrueです。dispatchのtrueは別のスレッドで処理され、falseはPollerスレッドで処理されます。このprocessSocketはAbstract Endpointの中の方法です。
2.2.Abstract End point萼processSocket
/**
* External Executor based thread pool.
*/
private Executor executor = null;
public Executor getExecutor() { return executor; }
/**
* Process the given SocketWrapper with the given status. Used to trigger
* processing as if the Poller (for those endpoints that have one)
* selected the socket.
*
* @param socketWrapper The socket wrapper to process
* @param event The socket event to be processed
* @param dispatch Should the processing be performed on a new
* container thread
*
* @return if processing was triggered successfully
*/
public boolean processSocket(SocketWrapperBase socketWrapper,
SocketEvent event, boolean dispatch) {
try {
if (socketWrapper == null) {
return false;
}
SocketProcessorBase sc = processorCache.pop();
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}
} catch (RejectedExecutionException ree) {
getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
getLog().error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
processSocket方法はまずprocessorCacheのキャッシュからSocketProcessorsorBaseのオブジェクトを取得します。processorCacheはNioEnt point(唳startInternal)で初期化されました。取得できない場合は、createSocketProcessorメソッドを呼び出して作成します。SocketProcesssorBaseオブジェクトを作成した時に、SocketWrapperBase(つまり、NioSocketWrapperオブジェクト)とSocketEventオブジェクトが入ってきました。createSocketProcessor方法はabstractのですが、今はNioEnd pointにあります。@Override
protected SocketProcessorBase createSocketProcessor(
SocketWrapperBase socketWrapper, SocketEvent event) {
return new SocketProcessor(socketWrapper, event);
}
/**
* This class is the equivalent of the Worker, but will simply use in an
* external Executor thread pool.
*/
protected class SocketProcessor extends SocketProcessorBase
NioEnd point荐createSocketProcessossor方法は簡単にSocketProcessorオブジェクトを作成することです。SocketProcessorはNioEndpointの内部類です。SocketProcessorsorBaseの対象を取得した後、伝えられたdispatchはtrueであるため、このSocketProcessors Baseをexectorに捨てて処理します。SocketProcesssorBaseがRunnableを実現しました。exectorはAbstract Endpointメソッドで初期化されたもので、createExectorはこの文章で紹介されていますので、ここでは説明する必要がありません。
SocketProccessorBaseの内容は以下の通りです。
public abstract class SocketProcessorBase implements Runnable {
protected SocketWrapperBase socketWrapper;
protected SocketEvent event;
public SocketProcessorBase(SocketWrapperBase socketWrapper, SocketEvent event) {
reset(socketWrapper, event);
}
public void reset(SocketWrapperBase socketWrapper, SocketEvent event) {
Objects.requireNonNull(event);
this.socketWrapper = socketWrapper;
this.event = event;
}
@Override
public final void run() {
synchronized (socketWrapper) {
// It is possible that processing may be triggered for read and
// write at the same time. The sync above makes sure that processing
// does not occur in parallel. The test below ensures that if the
// first event to be processed results in the socket being closed,
// the subsequent events are not processed.
if (socketWrapper.isClosed()) {
return;
}
doRun();
}
}
protected abstract void doRun();
}
SocketProcessors Base((zhu run)メソッドは簡単で、抽象的な方法doRun()を呼び出します。そのために重要なのはSocketProcessor〓〓〓doRun方法です。2.3.SocketProcessor葃doRun
@Override
protected void doRun() {
NioChannel socket = socketWrapper.getSocket();
SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
try {
int handshake = -1;
try {
if (key != null) {
if (socket.isHandshakeComplete()) {
// No TLS handshaking required. Let the handler
// process this socket / event combination.
handshake = 0;
} else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
event == SocketEvent.ERROR) {
// Unable to complete the TLS handshake. Treat it as
// if the handshake failed.
handshake = -1;
} else {
handshake = socket.handshake(key.isReadable(), key.isWritable());
// The handshake process reads/writes from/to the
// socket. status may therefore be OPEN_WRITE once
// the handshake completes. However, the handshake
// happens when the socket is opened so the status
// must always be OPEN_READ after it completes. It
// is OK to always set this as it is only used if
// the handshake completes.
event = SocketEvent.OPEN_READ;
}
}
} catch (IOException x) {
handshake = -1;
if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
} catch (CancelledKeyException ckx) {
handshake = -1;
}
if (handshake == 0) {
SocketState state = SocketState.OPEN;
// Process the request from this socket
if (event == null) {
state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
} else {
state = getHandler().process(socketWrapper, event);
}
if (state == SocketState.CLOSED) {
close(socket, key);
}
} else if (handshake == -1 ) {
close(socket, key);
} else if (handshake == SelectionKey.OP_READ){
socketWrapper.registerReadInterest();
} else if (handshake == SelectionKey.OP_WRITE){
socketWrapper.registerWriteInterest();
}
} catch (CancelledKeyException cx) {
socket.getPoller().cancelledKey(key);
} catch (VirtualMachineError vme) {
ExceptionUtils.handleThrowable(vme);
} catch (Throwable t) {
log.error(sm.getString("endpoint.processing.fail"), t);
socket.getPoller().cancelledKey(key);
} finally {
socketWrapper = null;
event = null;
//return to cache
if (running && !paused) {
processorCache.push(this);
}
}
}
doRunメソッドでは、最初のif-else文ブロックでhandshak変数の値を決定します。まずsockete.ishandschake Complettee()を呼び出します。つまりNio Chanel嗳ishandayhakeCompletterです。public boolean isHandshakeComplete() {
return true;
}
直接trueに戻ります。理論上のelseの文は全部実行されなくなりました。実はhandshakeはHTTPSの内容で、NioChanelはhandshakeを処理しませんが、Nio ChanelのサブクラスSecureNio Chanelで処理します。SecureNio Chanelは本文の重点ではないので、ここでは多く紹介しません。最初のif-else文のブロックでは、handshakeの値はすでに0です。続いて、2番目のif-else文ブロックです。handshakeの値によって異なる処理をします。handshakeの値がSelective Key.OP(u)であれば。READまたはセレクションKey.OP_WRITEは、socketWrapper.register ReadInterest()またはsocketWrapper.register WriteInterest()を呼び出して興味のあるイベントを再登録します。
@Override
public void registerReadInterest() {
getPoller().add(getSocket(), SelectionKey.OP_READ);
}
@Override
public void registerWriteInterest() {
getPoller().add(getSocket(), SelectionKey.OP_WRITE);
}
この二つの方法は、つまりPoller钾add方法を呼び出すということです。/**
* Add specified socket and associated pool to the poller. The socket will
* be added to a temporary array, and polled first after a maximum amount
* of time equal to pollTime (in most cases, latency will be much lower,
* however).
*
* @param socket to add to the poller
* @param interestOps Operations for which to register this socket with
* the Poller
*/
public void add(final NioChannel socket, final int interestOps) {
PollerEvent r = eventCache.pop();
if ( r==null) r = new PollerEvent(socket,null,interestOps);
else r.reset(socket,null,interestOps);
addEvent(r);
if (close) {
NioEndpoint.NioSocketWrapper ka = (NioEndpoint.NioSocketWrapper)socket.getAttachment();
processSocket(ka, SocketEvent.STOP, false);
}
}
Poller_addとは、Poller Eventオブジェクトを作成し、このオブジェクトを加入するキャッシュ・キューの中でPollerスレッドの処理を待っています。SecureNio Channelでは、handshakeはSecureNio Channelの処理によってSelection Key.OP_に戻るかもしれません。READまたはセレクションKey.OP_WRITEでも、Nio Channelの中でhandshakは0だけです。二つ目のif-else文ブロックのifブロックでは、get Handler()を呼び出します。process(socketWrapper,event)で処理します。それから一つのSocketStateの対象stateを得て、もしstateの値がSocketStte.C.LOSEDならば、close(socket,key)の方法を実行します。
get Handler()はAbstractEndpointの方法です。
private Handler handler = null;
public Handler getHandler() { return handler; }
Handlerは汎型Sを持っています。この汎型はAbstractEndpointのSです。HandlerもAbstractEntpointの内部インターフェースです。NioEntendpointとその父のAbstractJsseEndpointの声明の中でこの汎型Sの具体的なタイプがNioChanelであることが分かります。このHandlerはAbstractHttp 11 Protocolの構造方法で初期化されたConnection Handlerオブジェクトです。これはこの文章で述べられていますが、ここでは詳しく説明しません。Connection Handlerは次の文章で紹介します。ここでは多くの話をしません。
本稿では、Poller EventとPollerのrun方法を分析した。ここで、Poller Event芳ル方法は、SocketChanelの読みや書き込みを登録するPollerのselectorにある。まずキャッシュ・キューの中のPoller Eventを処理して、そしてselector.selectKeys()が返したSelectory、つまりSocketChanelの読み書きイベントを処理します。