kafkaクラスタController選挙と責任設計構想アーキテクチャ詳細-kafkaビジネス環境実戦

9211 ワード

本セットの技術コラムは著者(秦凱新)の普段の仕事の総括と昇華であり、実際のビジネス環境からケースを抽出して総括と分かち合い、ビジネス応用の最適化提案とクラスタ環境容量計画などの内容を提供することによって、本セットのブログに引き続き注目してください.IOT時代で最も戦闘力のあるチームに参加することを期待しています.QQメールアドレス:[email protected]学術交流があれば、いつでも連絡することができます.
1何でもできるコントローラ
  • あるbrokerが選ばれて特殊な役割を果たしたのがコントローラControllerです.
  • LeaderはzookeeperにWatcherを登録し、他のbrokerはzookeeperの状態変化をほとんど傍受しない.
  • Controllerクラスタは、Kafkaクラスタを管理および調整するために使用され、具体的には、クラスタ内のすべてのパーティションの状態およびパーティション対応レプリカの状態を管理するために使用される.
  • Kafkaクラスタはいつでも1つのコントローラしかありません.クラスタが起動すると、すべてのbrokerがコントローラの選挙に参加し、最終的には1つのbrokerしか勝つことができません.
  • Controllerメンテナンスの状態は2種類に分けられます:1:各Broker上の対応するパーティションコピーを管理します.2:各Topicパーティションの状態を管理する.
  • KafkaControllerコアコード(コピーステートマシンとパーティションステートマシン
      class KafkaController(val config : KafkaConfig, zkClient: ZkClient, 
      val brokerState: BrokerState) extends Logging with KafkaMetricsGroup {
          this.logIdent = "[Controller " + config.brokerId + "]: "
          private var isRunning = true
          private val stateChangeLogger = KafkaController.stateChangeLogger
          val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
    
          val partitionStateMachine = new PartitionStateMachine(this)
          val replicaStateMachine = new ReplicaStateMachine(this)
          
          private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
          onControllerResignation, config.brokerId)
          // have a separate scheduler for the controller to be able to start and stop independently of the
          // kafka server
          private val autoRebalanceScheduler = new KafkaScheduler(1)
          var deleteTopicManager: TopicDeletionManager = null
          val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
          private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
          private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
          private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
          private val brokerRequestBatch = new ControllerBrokerRequestBatch(this)
          
          private val partitionReassignedListener = new PartitionsReassignedListener(this)
          private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)
    
  • を含む)
  • KafkaControllerは5種類のselector選挙器
      1、ReassignedPartitionLeaderSelector
          ISR        leader,    ISR    ISR,             LeaderAndIsr       。
      2、PreferredReplicaPartitionLeaderSelector
         assignedReplicas            leader  ,     ,             leader。
      3、ControlledShutdownLeaderSelector
       ISR                 ,       ISR  ,           leader,     AR    LeaderAndIsr     。
      4、NoOpLeaderSelector
               ,     leader isr。
      5、OfflinePartitionLeaderSelector
          ISR     broker  leader,  ISR        ,  assignedReplicas         leader,leader        Zookeeper ,        。
    
  • を定義した.
  • kafka修正パーティションとコピー数
      ../bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe  --topic test1
      
      Topic:test1       PartitionCount:3        ReplicationFactor:2     Configs:
      Topic: test1      Partition: 0    Leader: 2       Replicas: 2,4   Isr: 2,4
      Topic: test1      Partition: 1    Leader: 3       Replicas: 3,5   Isr: 3,5
      Topic: test1      Partition: 2    Leader: 4       Replicas: 4,1   Isr: 4,1
    
  • topicパーティション拡張
      ./kafka-topics.sh --zookeeper 127.0.0.1:2181 -alter --partitions 4 --topic test1
    
  • 2 ReplicaStateMachine(ZK永続化レプリカ割当スキーム)
  • Replicaには7つの状態がある:
      1 NewReplica:  partition reassignment  KafkaController  New replica
      
      2 OnlineReplica:    replica    parition assingned replicas 
           OnlineReplica,       OnlineReplica
      
      3 Online   parition     leader isr    
      
      4 OfflineReplica:    broker down ,    replica   die,       Onffline;
      ReplicaDeletionStarted:    replica        ,      ReplicaDeletionStarted
      
      5 ReplicaDeletionSuccessful: Replica     ,      ReplicaDeletionSuccessful
      
      6 ReplicaDeletionIneligible: Replica     ,      ReplicaDeletionIneligible
      
      7 NonExistentReplica:  Replica     ,  ReplicaDeletionSuccessful     NonExistentReplica  
    
  • Replica StateMachineのファイル:core/src/main/scala/kafka/controller/Replica StateMachine.scala
  • startup:ReplicaStateMachine
  • を起動する
  • initializeReplicaState:各replicaの状態を初期化し、replicaが存在するbrokerがlive状態である場合、このreplicaの状態はOnlineReplicaである.
  • 処理Online状態に遷移可能なReplica,handleStateChanges(controllerContext.allLiveReplicas()、OnlineReplica)を処理し、LeaderAndIsrRequestを各broker nodes:handleStateChanges(controllerContext.allLiveReplicas()、OnlineReplica)
  • に送信する
  • topicを作成すると、そのtopicの下のすべてのパーティションのすべてのコピーがNonExistentになります.
  • controllerがZookeeperのtopicの各パーティションのすべてのコピー情報をメモリにロードすると、コピーのステータスがNewに変更されます.
  • 以降、controllerは、パーティションコピーリストの最初のコピーをパーティションのleaderコピーとして選択し、すべてのコピーをISRに設定し、Zookeeperで決定を永続化する.
  • パーティションのリーダーおよびISRが決定されると、controllerはこれらのメッセージをすべてのコピーに要求的に送信する.
  • は、これらのレプリカ状態をクラスタのすべてのbrokerに同期して認識させる.
  • 最後にcontrollerは、パーティションのすべてのコピーステータスをオンラインに設定します.

  • 3 partitionStateMachine(レプリカ割り当てスキームに基づいてパーティションを作成)
  • Partitionには、次の4つの状態がある
      NonExistentPartition:   partition                 ;
      NewPartition:   parition   , replicas     , leader/isr    ;
      OnlinePartition:   partition leader  ;
      OfflinePartition:   partition leader  ,  parition   OfflinePartition;
    
  • .
  • Topicを作成すると、controllerはパーティションオブジェクトを作成します.まず、すべてのパーティションステータスをNonExistentに設定します.
  • 以降、Zookeeperレプリカ割当スキームを読み出し、パーティション状態をNewPartionに設定する.
  • NewPartion状態のパーティションにはまだリーダーとISRがないため、ControllerはリーダーとISR情報を初期化し、パーティションの状態をOnlinePartionに設定し、パーティションは正常に動作します.
  • 本セットの技術コラムは著者(秦凱新)の普段の仕事の総括と昇華であり、実際のビジネス環境からケースを抽出することによって総括と共有を行い、ビジネス応用の最適化提案とクラスタ環境容量計画などの内容を提供し、本セットのブログに引き続き注目してください.IOT時代で最も戦闘力のあるチームに参加することを期待しています.QQメールアドレス:[email protected]学術交流があれば、いつでも連絡することができます.

  • 4 Controller職責所在(znode状態変化を傍受して実行する)
  • UpdateMetadataRequest:メタデータ要求の更新(たとえば、topicに何個のパーティションがあるか、各パーティションのleaderがどのbroker上にあるか、パーティションのコピーリスト)は、クラスタの実行に伴っていつでも変更され、変更が発生すると、controllerは最新のメタデータを生存しているすべてのbrokerにブロードキャストします.具体的には、全てのブローカーにUpdateMetadataRequest要求
  • を送信する.
  • CreateTopics:topicリクエストを作成します.現在、API方式、スクリプト方式(--create)、またはCreateTopicsリクエスト方式によってtopicを作成する場合、ほとんどがZookeeperの/brokers/topicsの下でznodeを作成して作成ロジックをトリガーし、controllerはpathの変更を傍受して真の「topicの作成」ロジック
  • を実行します.
  • DeleteTopics:topicリクエストを削除します.CreateTopicsと同様に、Zookeeperの下の/admin/delete_を作成することによってtopics/ノードはtopicの削除をトリガーします.主な論理は:1:すべてのコピーの実行を停止します.2:すべてのコピーのログデータを削除します.3:zk上の/admin/delete_を削除topics/ノード.
  • パーティション再割り当て:kafka-reassign-partitionsスクリプトが行うことです.同様にZookeeperと組み合わせて使用し、スクリプト書き込み/admin/reassign_partitionsノードがトリガーされ、controllerはスキーマに従ってパーティションを割り当てます.実行プロセスは、コピーと新しいコピーセットの契約時に存在する再伸縮メカニズムを拡張します.
  • Preferred leader割当:パーティションリーダーのコピーを調整し、prefered leader選挙には現在2つのトリガ方式がある:1.自動トリガ(auto.leader.rebalance.enable=true)では、controllerがPreferred leaderを自動的に調整します.2.kafka-preferred-replica-electionスクリプトがトリガーされます.どちらも同じ手順でZookeeperへの/admin/preferred_replica_Election書き込みデータ、controller抽出データ実行preferred leader割当て
  • パーティション拡張:すなわちtopicパーティション数を増加させる.標準的な方法はkafka-reassign-partitionsスクリプトによっても完成しますが、ユーザーはZookeeperに直接データを書くことができます.例えば、新しいパーティションのコピーセットを/brokers/topics/の下に直接書き込むことができます.その後、controllerはあなたのためにleaderを自動的に選択し、パーティション
  • を追加します.
  • クラスタ拡張:brokerを追加するとZookeeper/brokers/idsの下にznodeが追加され、controllerはサービス発見の作業を自動的に完了します
  • brokerクラッシュ:同様に、controllerはZookeeperによってbroker状態をリアルタイムで検出することができる.ブロッカーが切られると、controllerは直ちに感知し、影響を受けるパーティション選挙のために新しいleader
  • を選択することができる.
  • ControlledShutdown:brokerは崩壊するだけでなく、「優雅に」脱退することができます.brokerが自ら終了すると、controllerはControlledShudownRequest要求を受信し、controllerは要求を適切に処理し、様々な終了作業
  • を実行する.
  • Controller leader選挙:controllerは必ず自分のleader選挙を提供して、このグローバルで唯一のコンポーネントがクラッシュしてダウンタイムになってサービスが中断しないようにしなければならない.この機能もZookeeperの助けによって実現された.

  • 5 ControllerとBroker間の通信メカニズム(NIO select)
  • controllerが起動すると、クラスタ内のすべてのBrokerに固有のSocket接続が作成され、100台のbrokerマシンが追加されると、controllerは100個のSocket接続を作成します.新バージョンでは現在、NIO selectが統一されており、実際には100スレッドを維持する必要があります.

  • 6 ControllerContextデータコンポーネント
  • controllerのキャッシュは、最も重要なデータコンポーネントと言える.ControllerContextは、kafkaクラスタ内のすべてのメタデータ情報をZookeeperにまとめ、controllerがサービスを正しく提供できる基礎である.

  • 本セットの技術コラムは著者(秦凱新)の普段の仕事の総括と昇華であり、実際のビジネス環境からケースを抽出して総括と分かち合い、ビジネス応用の最適化提案とクラスタ環境容量計画などの内容を提供することによって、本セットのブログに引き続き注目してください.IOT時代で最も戦闘力のあるチームに参加することを期待しています.QQメールアドレス:[email protected]学術交流があれば、いつでも連絡することができます.
    7まとめ
    kafkaクラスタControllerは主にZK持続化レプリカ割り当てスキームを通じて、レプリカ割り当てスキームに基づいてパーティションを作成し、ZK znode状態の変化を傍受して実行処理を行い、パーティションとレプリカISRメカニズムの安定した運行を維持する.huxihxの技術のブログと関連する書籍に感謝して、私にControllerの核心のメカニズムを理解させて、1篇の学習のノートを書いて、総括として、苦労して文を成し遂げて、本当に容易ではありませんて、ありがとうございます.
    秦凱新は深センで201812021541