ArtemisのJMSクライアントのCompletionHandlerがartemis core apiでどのように伝達および呼び出されたか


ActiveMQChannelHandler
NettyConnectorは、開示されているstartメソッドでは、ChannelのpipelineにActiveMQChannelHandler(io.netty.channel.Channel.ChannelDuplexHandlerに続く)コールバックハンドルを設定している.ActiveMQChannelHandlerの構造関数は次のように定義されています.
ActiveMQChannelHandler(final ChannelGroup group,
                       final BufferHandler handler,
                       final BaseConnectionLifeCycleListener listener)

BufferHandlerオブジェクトが受信されていることがわかります.そのchannelReadというcallbackメソッドでは、このBufferHandlerオブジェクトbufferReceivedメソッドが呼び出されます.
このように、Netty非同期通信が完了した後、いくつかのコールバックを処理するには、BufferHandlerを実現し、NettyのChannelのpipelineに適切な位置に設定するだけです.
BufferHandler
ClientSessionFactoryImplは、その保護方法createConnectorでNettyConnectorオブジェクトを作成し、DelegatingBufferHandlerに転送します.DelegatingBufferHandlerはBufferHandlerを実現し、Nettyコールバックを処理するために使用できます.
DelegatingBufferHandler
DelegatingBufferHandlerは、ClientSessionFactoryImplクラスで定義されています.
private class DelegatingBufferHandler implements BufferHandler {

      @Override
      public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffer) {
         RemotingConnection theConn = connection;

         if (theConn != null && connectionID.equals(theConn.getID())) {
            theConn.bufferReceived(connectionID, buffer);
         } else {
            logger.debug("TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet");
         }
      }
   }

つまり、Nettyがコールバックを実行すると、ClientSessionFactoryのメンバーオブジェクトconnection(タイプ:RemotingConnection)のbufferReceivedメソッドが呼び出されてデータが処理されます.
実はRemotingConnectionもBufferHandlerです
RemotingConnection
RemotingConnection(Impl)は、入力されたbufferに基づいてdecodeがpackageを生成するbufferReceived(connectionID,buffer)メソッドを実現する.bufferReceived=>doBufferReceived(以下、ChannelImpl対応の例は、decodeから出てきたpackage対応のchannelIDからRemotingConnectionImplに含まれるchannelの集合に取得)=>ChannelImpl::doBufferReceived=>ChannelImpl::handlePacket=>ChannelImpl::clearUpTo=>commandConfirmationHandler.commandConfirmed(packet)
例:Artemisで実装されたJMS仕様におけるProducerの非同期配信メッセージ後のコールバック関数がどのように呼び出されるか
ArtemisMQMessageProducerを例に挙げます.
  • 彼のsendメソッドでは、最後にcore apiのClientProducerのsendメソッドを呼び出し、JMSのCompletionListenerをパッケージしたcore apiのhandlerであるCompletionListener Wrapper(S e n d AcknowledgementHandlerタイプに継承される)が渡される.
  • ClientProducerのsendメソッドに移動し、doSendメソッドを呼び出します.
  • そしてsendRegularMessageメソッドを呼び出し、sessionContextを呼び出した.sendFullMessageメソッド.
  • セッションContext.sendFullMessageメソッドでは、handlerがpacketに包装され、sessionChannelに伝えられていることがわかります.sendBatched(packet)メソッドは非同期送信に行きました.
  • サーバが返すpacketにも、このhandlerが付いていて、BufferHandlerの実装者RemotingConnection(Impl)のbufferReceivedメソッドがコールバックされ、サーバが返信するpacketのhandlerを解析して実行されます.

  • packetは、SessionSendMessageタイプのメッセージの別名sessionContextである.sendFullMessageメソッドでは、SendAcknowledgementHandlerをSessionSendMessageタイプのpacketにパッケージし、サーバサーバから返されるpacketに送信します.まずSessionSendMessageタイプに変換し、そこに含まれるSendAcknowledgementHandlerタイプのコールバックhandlerを取得してコールバックを実行します.
    CompletionListenerWrapperクラス定義:
    private static final class CompletionListenerWrapper implements SendAcknowledgementHandler {
    
          private final CompletionListener completionListener;
          private final Message jmsMessage;
          private final ActiveMQMessageProducer producer;
    
          /**
           * @param jmsMessage
           * @param producer
           */
          private CompletionListenerWrapper(CompletionListener listener,
                                            Message jmsMessage,
                                            ActiveMQMessageProducer producer) {
             this.completionListener = listener;
             this.jmsMessage = jmsMessage;
             this.producer = producer;
          }
    
          @Override
          public void sendAcknowledged(org.apache.activemq.artemis.api.core.Message clientMessage) {
             if (jmsMessage instanceof StreamMessage) {
                try {
                   ((StreamMessage) jmsMessage).reset();
                } catch (JMSException e) {
                   // HORNETQ-1209 XXX ignore?
                }
             }
             if (jmsMessage instanceof BytesMessage) {
                try {
                   ((BytesMessage) jmsMessage).reset();
                } catch (JMSException e) {
                   // HORNETQ-1209 XXX ignore?
                }
             }
    
             try {
                producer.connection.getThreadAwareContext().setCurrentThread(true);
                completionListener.onCompletion(jmsMessage);
             } finally {
                producer.connection.getThreadAwareContext().clearCurrentThread(true);
             }
          }
    
          @Override
          public String toString() {
             return CompletionListenerWrapper.class.getSimpleName() + "( completionListener=" + completionListener + ")";
          }
       }