kafkaクラスタBrokerエンドはReactorモード要求処理プロセスに基づいて深く分析する-kafkaビジネス環境実戦

14004 ワード

本セットの技術コラムは著者(秦凱新)の普段の仕事の総括と昇華であり、実際のビジネス環境からケースを抽出して総括と分かち合い、ビジネス応用の最適化提案とクラスタ環境容量計画などの内容を提供することによって、本セットのブログに引き続き注目してください.IOT時代で最も戦闘力のあるチームに参加することを期待しています.QQメールアドレス:[email protected]学術交流があれば、いつでも連絡することができます.
1 Reactorシングルスレッドケースコードウォーミングアップ
  • は以下のように単一スレッドのJAVA NIOプログラミングモデルである.
  • まずサービス・エンドがServer SocketChannelオブジェクトを作成し、Selectに登録してOP_ACCEPTイベントは、サーバSocketChannelが指定したポートの接続要求をリスニングします.
  • クライアントはいったんServer SocketChannelに接続すると、Acceptorを起動してOP_を処理するACCEPTイベントは、クライアントからの接続のSocket Channelを作成し、非ブロックモードに設定し、そのSelectorにOP_を登録します.READまたはOP_WRITEは、最終的にクライアントとサービス側の接続確立とデータチャネルの開通を実現する.
  • クライアントが確立したSocketChannelに要求を送信すると、サービス側のSelectorはOP_を傍受するREADイベントは、対応する処理ロジックをトリガする.サービス側がクライアントにデータを書き込むと、サービス側SelectorのOP_がトリガーされます.WRITEイベントは、応答の処理ロジックを実行する.
  • ここで明らかな問題は、すべての時間の処理ロジックがAcceptor単一スレッドで完了しており、同時接続数が小さく、データ量が小さいシーンでは問題ありませんが...
  • Selectorは、複数のチャネルを操作する単一のスレッドを許可する.我々のアプリケーションで複数のチャネルを使用する場合、Selectorを使用することでこのような目的を容易に実現することができるが、1つのスレッドで複数のチャネルを使用するため、各チャネルの伝送効率の低下をもたらす.
  • 最適化点は、チャネル接続|読み出しまたは書き込み|トラフィック処理は、いずれも単一スレッドで処理されることである.スレッドプールまたはMessageQueue共有キューにより、同時性の高い処理要件をさらに最適化することで、同じ時間に大量のI/Oイベントが発生した場合、個別のSelectがイベントの配布時にブロック(または遅延)し、ボトルネックになる可能性があるという問題を解決します.
      public class NioEchoServer {
      private static final int BUF_SIZE = 256;
      private static final int TIMEOUT = 3000;
    
      public static void main(String args[]) throws Exception {
          //       Socket
          ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    
          //    Selector
          Selector selector = Selector.open();
    
          //     Socket   8080  ,          
          serverSocketChannel.socket().bind(new InetSocketAddress(8080));
          serverSocketChannel.configureBlocking(false);
    
          //   channel     selector  .
          //             OP_ACCEPT   ,     OP_ACCEPT    ,      Channel   OP_READ
          //     Selector  .
          serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    
          while (true) {
              //      select   ,       channel I/O    
              if (selector.select(TIMEOUT) == 0) {
                  System.out.print(".");
                  continue;
              }
    
              //    I/O       SelectionKey,    SelectionKey        Channel     I/O       .
              Iterator keyIterator = selector.selectedKeys().iterator();
    
              while (keyIterator.hasNext()) {
    
                  SelectionKey key = keyIterator.next();
    
                  //       SelectionKey  ,       ,           IO        .
                  keyIterator.remove();
    
                  if (key.isAcceptable()) {
                      //   OP_ACCEPT      ,       ServerSocketChannel       SocketChannel,
                      //         
                      //   ,   OP_ACCEPT    ,   key.channel()     Channel   ServerSocketChannel.
                      //    OP_WRITE   OP_READ  ,   key.channel()      SocketChannel.
                      SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
                      clientChannel.configureBlocking(false);
                      //  OP_ACCEPT    ,      Channel   OP_READ     Selector  .
                      //   ,            OP_READ   ,   interest set     OP_CONNECT   ,    select          .
                      clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE));
                  }
    
                  if (key.isReadable()) {
                      SocketChannel clientChannel = (SocketChannel) key.channel();
                      ByteBuffer buf = (ByteBuffer) key.attachment();
                      long bytesRead = clientChannel.read(buf);
                      if (bytesRead == -1) {
                          clientChannel.close();
                      } else if (bytesRead > 0) {
                          key.interestOps(OP_READ | SelectionKey.OP_WRITE);
                          System.out.println("Get data length: " + bytesRead);
                      }
                  }
    
                  if (key.isValid() && key.isWritable()) {
                      ByteBuffer buf = (ByteBuffer) key.attachment();
                      buf.flip();
                      SocketChannel clientChannel = (SocketChannel) key.channel();
    
                      clientChannel.write(buf);
    
                      if (!buf.hasRemaining()) {
                          key.interestOps(OP_READ);
                      }
                      buf.compact();
                  }
              }
          }
      }
    

  • }
    2 Kafka Reactorモード設計構想
  • SelectionKey.OP_READ:Socketは、対応するデータ
  • をリモートから送信するイベントを読み取る.
  • SelectionKey.OP_WRITE:Socket書き込みイベント、すなわちリモートへのデータ送信
  • SelectionKey.OP_CONNECT:Socket接続イベントは、クライアントがリモートサーバと接続を確立する際にSelectorに登録するものであり、接続が確立すると、対応するSocketChannelが用意され、ユーザは対応するkeyからSocketChannelを取り出すことができる.
  • SelectionKey.OP_ACCEPT:Socket接続は、サーバ側がサーバ側のSocketChannelを介してポートの傍受をバインドし、SocketChannelに対応するsocketをサービス側のSelectorに登録し、そのOP_に注目するイベントを受け入れます.ACCEPTイベント.
  • Kafkaのネットワーク層エントリクラスはSocketServerです.知ってるわKafkaはKafka Brokerの入口類、kafkaです.Kafka.main()は、Kafkaサーバのmain()メソッド、すなわちKafka Brokerの起動エントリである.メソッドに沿ってスタックkafkaを呼び出すコードを追跡する.Kafka.main() -> KafkaServerStartable() -> KafkaServer().startupはmain()メソッドエントリからSocketServer、すなわちネットワーク層オブジェクトの作成まで追跡できます.これは、Kafka Serverが起動するとSocketServerが初期化され、起動することを意味します.
  • Acceptorの構成方法では、まずopenServerSocket()を介して自分の担当するEndPointのSocketを開き、すなわちポートを開き、リスニングを開始する.その後、Acceptorは、自分が管理する1つまたは複数のProcessorオブジェクトの構築を担当します.実際には、各プロセスは独立したスレッドです.
       private[kafka] class Acceptor(val endPoint: EndPoint,
                                        val sendBufferSize: Int,
                                        val recvBufferSize: Int,
                                        brokerId: Int,
                                        processors: Array[Processor],
                                        connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
      
        private val nioSelector = NSelector.open()
        val serverChannel = openServerSocket(endPoint.host, endPoint.port)//    ServerSocketChannel,  endPoint.host, endPoint.port   
      
        //Acceptor             processor  
        this.synchronized {
          //  processor        
          processors.foreach { processor =>
            Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, endPoint.protocolType.toString, processor.id), processor, false).start()
          }
        }
    
  • Acceptorスレッドのrun()メソッドは、対応するServerChannel上の接続要求を絶えず傍受し、新しい接続要求がある場合は、この要求を処理するためのProcessorを選択し、この新しい接続をProcessorに渡すメソッドAcceptorである.accept()
     def accept(key: SelectionKey, processor: Processor) {
         val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]//  channel
         val socketChannel = serverSocketChannel.accept()//  socketChannel,             
         try {
           //socketChannel    
           processor.accept(socketChannel)// SocketChannel  process    
         } catch {
           //    
         }
       }
     
     //Processor.accept():
      /**
        * Queue up a new connection for reading
        */
       def accept(socketChannel: SocketChannel) {
         newConnections.add(socketChannel)
         wakeup()
       }
    
  • 各プロセスは、このプロセス上のすべてのchannelのリスニングのみを担当する個別のKSelectorオブジェクトを維持します.これにより、非同期IOの場合、1つのSelectorが何百ものsocketChannelのステータスモニタリングを担当しても、異なるProcessorスレッド間の完全な並列性とトラフィック分離が最大限に保証されます.
       override def run() {
          startupComplete()//           ,    CountDownLatch         ,  Processor         
          while (isRunning) {
            try {
              // setup any new connections that have been queued up
              configureNewConnections()//          OR_READ  
              // register any new responses for writing
              processNewResponses()//      ,       Handler         ,    RequestChannel.responseQueue.    unmute,      
              poll()  //  KSelector.poll(),         
              processCompletedReceives()//  mute,        
              processCompletedSends()
              processDisconnected()
            } catch {
              //      
          }
      
          debug("Closing selector - processor " + id)
          swallowError(closeAll())
          shutdownComplete()
       }
    
  • KSelector.register()メソッドは、リモートクライアントまたは他のサーバの読み取り要求(OP_READ)のバインドおよび処理を開始する.KSelect.register()メソッドは、サービス側のSocketChannelをサーバ側のnioSelectorに登録し、SelectionKeyに注目する.OP_READ、すなわち、リードリクエストが発生した場合、対応するチャンネルを取り出して処理することができる.ここでのChannelもKafkaがカプセル化された後のKafka Channelオブジェクト
      public void register(String id, SocketChannel socketChannel) throws ClosedChannelException {
              SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ);
              //   SocketServer             , channelBuilder @Code PlainTextChannelBuilder
              KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);//    KafkaChannel
              key.attach(channel);// KafkaChannel  attach   registration,        SelectionKey.attachment()      
              this.channels.put(id, channel);//    Channel
          }
    
  • である.
  • Processor.プロセスCompletedReceives()は、completedReceivesを巡回することにより、受信が完了したデータごとにデータを解析してカプセル化し、RequestChannelに渡し、RequestChannelは特定のビジネス処理層に渡して処理する.
    *  completedReceived        ,   requestQueue.completRequets
     */
    private def processCompletedReceives() {
      selector.completedReceives.asScala.foreach { receive =>//   receive   NetworkReceivedui'xiagn
        try {
          //receive.source              ,KSelector   channel          SocketChannel       
          val channel = selector.channel(receive.source)
          val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
            channel.socketAddress)
          val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
          requestChannel.sendRequest(req)//     RequestChannel.requestQueue   Handler
          selector.mute(receive.source)//    Read  ,      ,          
        } catch {
          //      
        }
      }
    }
    
  • 詳細ソース剖析は以下のブログを参考にして、非常に詳細を説明してください.
      https://blog.csdn.net/zhanyuanlin/article/details/76556578
      https://blog.csdn.net/zhanyuanlin/article/details/76906583
    
  • RequestChannelは、メッセージがネットワーク層からビジネス層に転送され、ビジネス層の処理結果がネットワーク層に渡され、クライアントに返されることを担当する.各SocketServerにはRequestChannelオブジェクトが1つしかなく、SocketServerで構築されています.RequestChannel構築メソッドでは、ネットワーク・レイヤが受信したリクエストを格納するためにrequestQueueが初期化され、これらのリクエストはビジネス・レイヤに渡されて処理されます.同時に、responseQueesを初期化し、各プロセスに対してresponseキューを確立し、このプロセスの1つ以上のResponseを格納し、これらのresponseはネットワーク層に渡されてクライアントに返される.
      //  RequestChannel, totalProcessorThreads responseQueue  ,
        val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
      class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
        private var responseListeners: List[(Int) => Unit] = Nil
        //request     Processor        ,   requestQueue                 
        private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
        //responseQueues     Processor     response,    Processor    response queue
        private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
        for(i 
  • KafkaApisはKafkaのAPIインタフェース層であり、ツールクラスとして理解され、要求を解析して要求タイプを取得し、要求タイプに応じて対応するビジネス層
      class KafkaRequestHandlerPool(val brokerId: Int,
                                val requestChannel: RequestChannel,
                                val apis: KafkaApis,
                                numThreads: Int) extends Logging with KafkaMetricsGroup {
          
            /* a meter to track the average free capacity of the request handlers */
            private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
          
            this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
            val threads = new Array[Thread](numThreads)
            //    KafkaRequestHandler         
            val runnables = new Array[KafkaRequestHandler](numThreads)
            for(i 
  • に要求を渡す役割を果たす.
  • KafkaRequestHandler.run()メソッドは,リクエストをrequestQueueから絶えず取り出し,API層トラフィック処理ロジックを呼び出して処理する
       def run() {
          while(true) {
            try {
              var req : RequestChannel.Request = null
              while (req == null) {
              // 
              req = requestChannel.receiveRequest(300)// RequestChannel.requestQueue     
              // 
              apis.handle(req)//  KafkaApi.handle(),        
            } catch {}
          }
        }
    
  • .
    3パラメータチューニング設定
  • numProcessorThreads:num.network.threadsは、単一のAcceptorが管理するProcessorオブジェクトの数を構成します.
  • maxQueuedRequests:queued.max.requestsは、要求キューが許可する最大の未応答要求の数を構成し、ConnectionQuotasに要求限度制御を行い、Kafkaサーバに過大なネットワーク負荷を発生させないようにする.
  • totalProcessorThreads:numProcessorThreads*endpoints.size、すなわち1台の機械の総Processorの数;
  • max ConnectionsPerIp:構成項目はmax.connections.per.ipは、単一のIP上の最大接続数であり、ConnectionQuotasに接続数を制御するために使用される.
  • num.io.threads:KafkaRequestHanderが実際にキューから取得して実行するスレッドの数を示します.デフォルトは8です.

  • 4まとめ
  • は、Acceptor、Processor、RequestChannel、KafkaRequestHandlerおよびKafkaApisの複数のロールの解析により、クライアントから接続を確立し、Kafkaサーバに要求を送信するAcceptorというKafkaのメッセージフローの閉ループ全体の処理を完了し、さらにProcessor、KafkaサーバによってKafkaRequestHandlerの具体的な業務に要求を渡して処理し、業務は処理結果をネットワーク層に返し、ネットワーク層は、結果をNIOを介してクライアントに返します.
  • マルチプロセススレッド、およびKafkaRequestHandlerPollスレッドプールの存在により、ブロック待ちではなく配信-取得方式により、メッセージ処理全体が完全に非同期化され、各ロールがそれぞれの役割を果たし、モジュール間に結合がなく、スレッド間または相互にタスクを競合したり、上位層に処理部分タスクを手配されたりすることで、効率が非常に高く、構造もかなり明確である
  • .
  • 本文は大量の技術ブログを参考にして、個人の理解を加えて、ソースコードを歩いてこの学習ノートを完成することを通じて、苦労して文を成し遂げて、本当に容易ではありませんて、それぞれ大切にします.
  • 秦凱新深セン