ArtemisのJMSクライアントのCompletionHandlerがartemis core apiでどのように伝達および呼び出されたか
ActiveMQChannelHandler
NettyConnectorは、開示されているstartメソッドでは、ChannelのpipelineにActiveMQChannelHandler(io.netty.channel.Channel.ChannelDuplexHandlerに続く)コールバックハンドルを設定している.ActiveMQChannelHandlerの構造関数は次のように定義されています.
BufferHandlerオブジェクトが受信されていることがわかります.そのchannelReadというcallbackメソッドでは、このBufferHandlerオブジェクトbufferReceivedメソッドが呼び出されます.
このように、Netty非同期通信が完了した後、いくつかのコールバックを処理するには、BufferHandlerを実現し、NettyのChannelのpipelineに適切な位置に設定するだけです.
BufferHandler
ClientSessionFactoryImplは、その保護方法createConnectorでNettyConnectorオブジェクトを作成し、DelegatingBufferHandlerに転送します.DelegatingBufferHandlerはBufferHandlerを実現し、Nettyコールバックを処理するために使用できます.
DelegatingBufferHandler
DelegatingBufferHandlerは、ClientSessionFactoryImplクラスで定義されています.
つまり、Nettyがコールバックを実行すると、ClientSessionFactoryのメンバーオブジェクトconnection(タイプ:RemotingConnection)のbufferReceivedメソッドが呼び出されてデータが処理されます.
実はRemotingConnectionもBufferHandlerです
RemotingConnection
RemotingConnection(Impl)は、入力されたbufferに基づいてdecodeがpackageを生成するbufferReceived(connectionID,buffer)メソッドを実現する.bufferReceived=>doBufferReceived(以下、ChannelImpl対応の例は、decodeから出てきたpackage対応のchannelIDからRemotingConnectionImplに含まれるchannelの集合に取得)=>ChannelImpl::doBufferReceived=>ChannelImpl::handlePacket=>ChannelImpl::clearUpTo=>commandConfirmationHandler.commandConfirmed(packet)
例:Artemisで実装されたJMS仕様におけるProducerの非同期配信メッセージ後のコールバック関数がどのように呼び出されるか
ArtemisMQMessageProducerを例に挙げます.彼のsendメソッドでは、最後にcore apiのClientProducerのsendメソッドを呼び出し、JMSのCompletionListenerをパッケージしたcore apiのhandlerであるCompletionListener Wrapper(S e n d AcknowledgementHandlerタイプに継承される)が渡される. ClientProducerのsendメソッドに移動し、doSendメソッドを呼び出します. そしてsendRegularMessageメソッドを呼び出し、sessionContextを呼び出した.sendFullMessageメソッド. セッションContext.sendFullMessageメソッドでは、handlerがpacketに包装され、sessionChannelに伝えられていることがわかります.sendBatched(packet)メソッドは非同期送信に行きました. サーバが返すpacketにも、このhandlerが付いていて、BufferHandlerの実装者RemotingConnection(Impl)のbufferReceivedメソッドがコールバックされ、サーバが返信するpacketのhandlerを解析して実行されます.
packetは、SessionSendMessageタイプのメッセージの別名sessionContextである.sendFullMessageメソッドでは、SendAcknowledgementHandlerをSessionSendMessageタイプのpacketにパッケージし、サーバサーバから返されるpacketに送信します.まずSessionSendMessageタイプに変換し、そこに含まれるSendAcknowledgementHandlerタイプのコールバックhandlerを取得してコールバックを実行します.
CompletionListenerWrapperクラス定義:
NettyConnectorは、開示されているstartメソッドでは、ChannelのpipelineにActiveMQChannelHandler(io.netty.channel.Channel.ChannelDuplexHandlerに続く)コールバックハンドルを設定している.ActiveMQChannelHandlerの構造関数は次のように定義されています.
ActiveMQChannelHandler(final ChannelGroup group,
final BufferHandler handler,
final BaseConnectionLifeCycleListener listener)
BufferHandlerオブジェクトが受信されていることがわかります.そのchannelReadというcallbackメソッドでは、このBufferHandlerオブジェクトbufferReceivedメソッドが呼び出されます.
このように、Netty非同期通信が完了した後、いくつかのコールバックを処理するには、BufferHandlerを実現し、NettyのChannelのpipelineに適切な位置に設定するだけです.
BufferHandler
ClientSessionFactoryImplは、その保護方法createConnectorでNettyConnectorオブジェクトを作成し、DelegatingBufferHandlerに転送します.DelegatingBufferHandlerはBufferHandlerを実現し、Nettyコールバックを処理するために使用できます.
DelegatingBufferHandler
DelegatingBufferHandlerは、ClientSessionFactoryImplクラスで定義されています.
private class DelegatingBufferHandler implements BufferHandler {
@Override
public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffer) {
RemotingConnection theConn = connection;
if (theConn != null && connectionID.equals(theConn.getID())) {
theConn.bufferReceived(connectionID, buffer);
} else {
logger.debug("TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet");
}
}
}
つまり、Nettyがコールバックを実行すると、ClientSessionFactoryのメンバーオブジェクトconnection(タイプ:RemotingConnection)のbufferReceivedメソッドが呼び出されてデータが処理されます.
実はRemotingConnectionもBufferHandlerです
RemotingConnection
RemotingConnection(Impl)は、入力されたbufferに基づいてdecodeがpackageを生成するbufferReceived(connectionID,buffer)メソッドを実現する.bufferReceived=>doBufferReceived(以下、ChannelImpl対応の例は、decodeから出てきたpackage対応のchannelIDからRemotingConnectionImplに含まれるchannelの集合に取得)=>ChannelImpl::doBufferReceived=>ChannelImpl::handlePacket=>ChannelImpl::clearUpTo=>commandConfirmationHandler.commandConfirmed(packet)
例:Artemisで実装されたJMS仕様におけるProducerの非同期配信メッセージ後のコールバック関数がどのように呼び出されるか
ArtemisMQMessageProducerを例に挙げます.
packetは、SessionSendMessageタイプのメッセージの別名sessionContextである.sendFullMessageメソッドでは、SendAcknowledgementHandlerをSessionSendMessageタイプのpacketにパッケージし、サーバサーバから返されるpacketに送信します.まずSessionSendMessageタイプに変換し、そこに含まれるSendAcknowledgementHandlerタイプのコールバックhandlerを取得してコールバックを実行します.
CompletionListenerWrapperクラス定義:
private static final class CompletionListenerWrapper implements SendAcknowledgementHandler {
private final CompletionListener completionListener;
private final Message jmsMessage;
private final ActiveMQMessageProducer producer;
/**
* @param jmsMessage
* @param producer
*/
private CompletionListenerWrapper(CompletionListener listener,
Message jmsMessage,
ActiveMQMessageProducer producer) {
this.completionListener = listener;
this.jmsMessage = jmsMessage;
this.producer = producer;
}
@Override
public void sendAcknowledged(org.apache.activemq.artemis.api.core.Message clientMessage) {
if (jmsMessage instanceof StreamMessage) {
try {
((StreamMessage) jmsMessage).reset();
} catch (JMSException e) {
// HORNETQ-1209 XXX ignore?
}
}
if (jmsMessage instanceof BytesMessage) {
try {
((BytesMessage) jmsMessage).reset();
} catch (JMSException e) {
// HORNETQ-1209 XXX ignore?
}
}
try {
producer.connection.getThreadAwareContext().setCurrentThread(true);
completionListener.onCompletion(jmsMessage);
} finally {
producer.connection.getThreadAwareContext().clearCurrentThread(true);
}
}
@Override
public String toString() {
return CompletionListenerWrapper.class.getSimpleName() + "( completionListener=" + completionListener + ")";
}
}