Kafkaテーマとパーティション-ノート3


文書ディレクトリ
  • Kafkaテーマとパーティション-ノート3
  • テーマの添削調査
  • 新規トピック
  • トピックの表示
  • トピックの変更
  • トピック側構成パラメータとbroker側関係
  • トピック
  • を削除
  • KafkaAdminClient
  • パーティション
  • 優先コピー(preferred replica)の選挙
  • パーティション再割り当て
  • レプリケーションストリーム
  • レプリカファクタ
  • を修正する.
  • パーティション数の選択
  • kafka性能試験
  • 適切なパーティション数
  • Kafkaテーマとパーティション-ノート3
    トピックとパーティションは論理概念であり、パーティションには1~複数のコピーがあり、各コピーは1つのログファイルに対応し、各ログファイルは1~複数のログセグメントLogSegmentに対応し、各ログセグメントはインデックスファイル、ログストレージファイル、スナップショットファイルに細分化することもできる.
    テーマの添削
    テーマの添削は一般的にkafka-topics.shスクリプトは、kafkaのbinディレクトリまたはKafkaAdminClientを使用してXXXRequestリクエストを実行します.
    新規トピック
    brokerエンド構成パラメータauto.create.topics.enableがtrue(デフォルト)に設定されている場合、生産者が作成されていないトピックにメッセージを送信すると(または、消費者が未知のトピックからメッセージを読み取り始めたり、クライアントが未知のトピックにメタデータ要求を送信したりすると)、brokerエンド構成パラメータnum.partitions(デフォルト1)、コピー係数default.replication.factor(デフォルト1)のトピックが自動的に作成されます.auto.create.topics.enableセットtrueは推奨されません
    kafka-topics.sh新規トピック:本質はkafkaを呼び出すことである.admin.TopicCommand
        topic-test      4     2
    bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic topic-test --partitions 4 --replication-factor 2
    	--zookeeper:ZooKeeper    
    	--partitions:   
    	--replication-factor:   
    	--topic:    
    	
    	--create:           ,      list,describe,alter,delete
    	
    Kafka  log.dir log.dirs                  
        :-            
    
             ZooKeeper /brokers/topics/             
    
    --replica-assignment  :             ,  partitions replication-factor     ,
    bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic topic-test --replica-assignment 0:1,1:2,2:0
    
             3,   2   
    
    --config:                 ,          
    	--config max.message.bytes=1000        
    --if-not-exists:                 
    

    トピック、パーティション、コピー、ログの関係:同じパーティション内の複数のコピーを異なるbrokerに分散して、1つのbrokerが重複しないように表現する必要があります-名前付きファイル
    一対多
    一対多
    一対一
    テーマ
    パーティションパーティション
    レプリカReplica
    ログファイルlog
    kafka-topics.shスクリプトは、トピックの作成時に.または_が含まれているかどうかを検出し、kafkaの内部では.を下線_に変更するので、トピック名に2つの記号が含まれないようにします.
    broker端パラメータbroker.rackラック情報
    Kafka    broker     。        ,                            
        broker  broker.rack          disable-rack-aware        
    

    トピックの表示
    リストとdescribeコマンドでトピック情報を表示
    トピック内の内容の表示
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicName --from-beginning
    

    List現在使用可能なすべてのトピックを表示
    bin/kafka-topics.sh --zookeeper localhost:2181 --list 
    test
    test1
    __consumer_offsets
    

    describeパーティションコピーの割り当ての詳細を表示
    bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-test
    
    Topic:PushNotice        PartitionCount:1        ReplicationFactor:1     Configs:
    Topic: PushNotice       Partition: 0    Leader: 0       Replicas: 0     Isr: 0
    
    PartitionCount   
    ReplicationFactor   
    Partition    
    Leader :leader    brokerId
    Replicas:    brokerId,AR  
    Isr:   ISR  ,brokerId
    
    --topics-with-overrides:              --config,             
    --under-replicated-partitions --unavailable-partitions:        
    	--under-replicated-partitions             。                   ,          ,     ISR    AR  
    	
    

    トピックの変更
    トピック作成後もパーティション数、構成などを変更できますが、alterコマンドでkafkaはパーティション数の削減をサポートせずにパーティション数の増加のみをサポートすることに注意してください.
    alter
         
    bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic-test --partitions 5
    
    --if-exists:      
    
      config  
    bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic-test --config max.message.bytes=2000
    
    --delete-config:         ,        
    bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic-test --delete-config max.message.bytes
    

    kafka-config.sh
          alter     describe      , kafka-topics.sh  ,kafka-config.sh             ,       broker,      3      
    

    トピック側構成パラメータとbroker側関係
    トピックの構成パラメータを変更しない場合は、broker側の対応するパラメータをデフォルト値として使用します.
    トピックエンドパラメータ
    さぎょう
    brokerエンドパラメータ
    cleanup.policy
    ログ圧縮ポリシー.デフォルトdelete,compactなど
    log.cleanup.policy
    compression.type
    メッセージの圧縮タイプ.デフォルトproducer,gzipなど

    delete.retention.ms
    削除が識別されたデータが保持できる時間は、デフォルトで86400000、つまり1日です.
    log.cleaner.delete.retention.ms
    file.delete.delay.ms
    ファイルのクリーンアップが待機できる時間は、デフォルトで60000、つまり1分です.
    log.segment.delete.delay.ms
    max.message.bytes
    メッセージの最大バイト数、デフォルト100012

    message.timestamp.type
    メッセージのタイムスタンプタイプ、デフォルトCreateTime、およびLogAppendTime
    log.message.timestamp.type
    message.timestame.difference.max.ms
    メッセージに含まれるタイムスタンプとbrokerがメッセージを受信したときのタイムスタンプの最大差、デフォルトLong.MAX_VALUE,message.timestamp.typeがCreateTimeの場合有効

    min.insync.replicas
    パーティションISRコレクションの少なくともコピー数、デフォルト1

    retention.bytes
    パーティションに保持できるメッセージの合計量、デフォルト-1、制限なし
    log.retention.bytes
    retention.ms
    deleteのログクリーンアップポリシーを使用してメッセージを保持できる時間は、デフォルトで604800000、すなわち7日間です.-1の場合は制限ありません.
    log.retention.ms
    segment.bytes
    ログ・セグメントの最大値、デフォルトの1073741824、すなわち1 GB
    log.segment.bytes
    segment.ms
    最大7日間、ログ・セグメントをスクロールします.
    log.roll.ms
    unclean.leader.election.enable
    非ISRコレクションからリーダーコピーを選択できるかどうか、デフォルトfalse、trueがデータを失う可能性がある場合

    トピックの削除
    deleteコマンド:デフォルトのtrueを削除するには、brokerエンドパラメータdelete.topic.enableをtrueに設定する必要があります.
    bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic-test
    
    --if-exist
    
    kafka-topics.sh               ZooKeeper  /admin/delete_topics                  ,      ,   kafka       
    

    ZooKeeperクライアントによるトピックの削除:
    zkCli.sh
    create /admin/delete_topics/topic-test ""
    

    手動で削除:
             ZooKeeper  /brokers/topics  /config/topics   
              log.dir            3   
    

    KafkaAdminClient
    KafkaAdminClientでbroker、構成、ACL、トピックを管理する
        
    public CreateTopicsResult createTopics(final Collection newTopics,
                                               final CreateTopicsOptions options)
        
    public DeleteTopicsResult deleteTopics(Collection topicNames,
                                               DeleteTopicsOptions options)
                                                   
    public ListTopicsResult listTopics(final ListTopicsOptions options)
           
    public DescribeTopicsResult describeTopics(final Collection topicNames, DescribeTopicsOptions options)
          
    public DescribeConfigsResult describeConfigs(Collection configResources, final DescribeConfigsOptions options)
          
    public AlterConfigsResult alterConfigs(Map configs, final AlterConfigsOptions options)
        
    public CreatePartitionsResult createPartitions(Map newPartitions,
                                                       final CreatePartitionsOptions options)
    

    検証トピック
      auto.create.topics.enable   false,         
    
    Kafka broker     create.topic.policy.class.name   null,                   
    
    	       CreateTopicPolicy    PolicyTest        server.properties   create.topic.policy.class.name=org.apache.kafka.server.policy.PolicyTest
    	
        topic ,        
    

    パーティション
    優先レプリカを含む選挙、パーティション再割り当て、レプリケーション制限フロー、レプリカ係数の変更
    優先コピー(preferred replica)の選挙
    パーティションはマルチコピーメカニズムを使用し、leaderコピーは外部に読み書きを提供し、followerコピーはメッセージの同期のみを行う.パーティションのリーダーコピーが使用できない場合は、パーティション全体が使用できなくなることを意味し、残りのfollowerコピーから新しいリーダーを選択します.
    優先コピーpreferred replica:
           AR           。                  ,       leader 0    2 
    
    Topic: topic1 PartitionCount:3 ReplicationFactor:3 Configs:
    Topic: topic1 Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,0,2
    Topic: topic1 Partition: 1 Leader: 0 Replicas: 2,0,1 Isr: 0,1,2
    Topic: topic1 Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
    
        topic1   0 AR  Replicas [1,2,0],    0       1。               leader  ,        preferred leader
    
    Kafka             Kafka       ,    leader    , leader              
    

    優先コピーの選挙とは、一定の方法で優先コピーがリーダーコピーに選ばれることを意味し、パーティションバランスとも呼ばれる.
    Kafkaにはパーティション自動バランシング機能があり、brokerエンドパラメータauto.leader.rebalance.enable、デフォルトtrue、パーティション自動バランシング機能をオンにすると、Kafkaのコントローラはタイミングタスクを起動してすべてのbrokerノードをポーリングし、各brokerノードのパーティション不平衡率**(brokerの不平衡率=非優先コピーのleader個数/パーティション総数)**がleader.imbalance.per.broker.percentageパラメータ構成の比を超えているかどうかを計算します.デフォルトでは10%ですが、設定した比を超えると自動的に優先コピーの選挙動作が実行され、パーティションバランスが求められます.実行サイクルはパラメータleader.imbalance.check.interval.secondsによって制御され、デフォルトは300秒です.
    brokerエンドパーティション自動バランス関連パラメータ
    auto.leader.rebalance.enable:               true  :      false,            ,     ,       
    leader.imbalance.per.broker.percentage:broker                   10%         
    leader.imbalance.check.interval.seconds:          ,           leader.imbalance.per.broker.percentage      ,  300 
    

    kafka-perferred-replica-election.sh手動でパーティション再バランスを実行する
    kafka-perferred-replica-election.sh        leader           。
               ,          leader     
    
    bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181
    
        
    Created preferred replica election path with __consumer_offsets-22,__consumer_offsets-30,__consumer_offsets-8
    Successfully started preferred replica election for partitions Set(    ...)
    
        kafka-perferred-replica-election.sh                        。leader             ,               。           ,              。
               ,            ZooKeeper /admin/preferred_replica_election  ,        ZooKeeper      ,      。      1MB
    
    path-to-json-fileパラメータは、一部のパーティションに対して優先コピーの選挙操作を一括して実行する
               ,  kafka-perferred-replica-election.sh  path-to-json-file      JSON  (                 )            
    
      JSON  test.json
    {
    	"partitions":[
    		{
    			"partition":0,
    			"topic":"topic-test"
    		},
            {
    			"partition":1,
    			"topic":"topic-test"
    		}
    	]
    }
    
        bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181 --path-to-json-file test.json
    
        :     0 1          
    Created preferred replica election path with topic-test-0,topic-test-1
    Successfully started preferred replica election for  partitions Set(topic-test-0,topic-test-1)
    

    **注意:**生産環境では、path-to-json-fileパラメータを使用してバッチ化し、手動で優先コピーの選挙操作を実行します.
    パーティション再割り当て
    Kafkaパーティションの割り当てに関する問題:
  • マルチコピーあるノードdownの下で、このノードleaderコピーはクラスタの他のfollowerコピーに渡されます.この時点でこのノードのパーティションコピーは無効になるが、Kafkaはこれらの無効なコピーをクラスタ内の残りの利用可能なbrokerノード上の
  • に自動的に移行することはない.
  • クラスタにbrokerノードが追加された場合、新しく作成されたトピックパーティションのみがこのノードに割り当てられる可能性がありますが、以前のトピックパーティションは新しく追加されたノードに自動的に割り当てられず、新しいノードの負荷と元のノードの負荷との間に深刻なアンバランスが発生します.
    上記の2つの問題を解決するために、パーティションコピーを合理的に再割り当てする必要がある.Kafkaはkafka-reassign-partitions.shスクリプトを提供し、クラスタ拡張、brokerノードの失効したシーンでパーティションを移行することができる.kafka-reassign-partitions.shスクリプト実行手順
  • トピックリストを含むJSONファイルを作成する
  • トピックリストとbrokerノードリストに基づいて再割り当てスキーム
  • を生成する.
  • 具体的な再割り当て
  • を実行する.
        brokerId 1 broker     ,             
    
    1.    JSON   test.json;                       topic-test   
    {
    	"topics":[
    		{
    			"topic":"topic-test"
    		}
    	],
    	"version":1
    }
    2.  JSON          broker                 
    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --generate --topics-to-move-json-file test.json --broker-list 0,2
    	--generate           ,   kafka-topics.sh --create,               
    	--topic-to-move-json                     
    	--broker-list         broker    
        :
    	            json               
    	Crrent partition replica assignment
    	{...}
    	        ,          ,          json    
    	Proposed partition reassignment configuration
    	       test1.json   
    3.          
    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --execute --reassignment-json-file test1.json
        test1.json               !
    	--execute       ,       
    	--reassignment-json-file test1.json               
    

    パーティション再割り当ての進行状況の表示
    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --verify --reassignment-json-file test1.json
    

    パーティション再割り当ての基本原理:本質はデータレプリケーションにある
                   (      ),         leader           (    ,    ,  ,  )
                   (           )
      :               ,     leader                   leader
    

    パーティション再分割はクラスタのパフォーマンスに大きな影響を及ぼし、ネットワークやディスクなどの追加のリソースを消費する必要があります.実際には再配分の粒度を下げ、複数の小ロットに分けて実行すべきである
    注意:brokerをオフラインにするには、再パーティションを再割り当てする前にbrokerを閉じたり再起動したりすることが望ましい.このようにbrokerにはパーティションのleaderノードは存在しません.ブローカー間のトラフィックレプリケーションを削減
    レプリケーション制限フロー
    クラスタ内のトピックまたはパーティションのトラフィックが一定期間にわたって特に大きい場合、パーティションの再割り当て時に粒度を減らすだけでは対応できません.この場合、ストリーム制限メカニズムが必要です.レプリカ間のレプリケーショントラフィックを制限し、再割り当て期間中のサービス全体が影響を受けないことを保証できます.
    レプリカ間レプリケーションストリーム制限方式
  • kafka-config.shスクリプト
  • kafka-reassign-partitions.shスクリプト
  • kafka-config.shスクリプト制限フロー:動的構成により制限フローbrokerエンドパラメータに達する
    follower.replication.throttled.rate:  follower       
    leader.replication.throttled.rate:  leader        
             ,  B/s
    
          1kb/s
    bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type brokers --entity-name 1 --alter --add-config follower.replication.throttled.rate=1024,leader.replication.throttled.rate=1024
      
    Completed Updating config for entity: brokers '1'.
    

    トピックエンドパラメータ
    follower.replication.throttled.replicas:             follower     [0:1,1:2,2:0]  [  n:follower  broker]
    leader.replication.throttled.replicas:             leader    [0:0,1:1,2:2]  [  n:leader  brokerId]
    
    kafka-reassign-partitions.shスクリプトストリーム制限:throttleパラメータが必要です.このメソッドの本質も最初のメソッドの4つのパラメータです.
    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --execute --reassignment-json-file test1.json --throttle 10 
           10B/s                   
                         ,            
    	bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --verify --reassignment-json-file test1.json
    

    コピーファクタの変更
    詳細はパーティション再割り当てを参照
    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --execute --reassignment-json-file test1.json
    
    json  :
    {
        "version": 1,
        "partitions": [
            {
                "topic": "topic-throttle",
                "partition": 1,
                "replicas": [
                    2,
                    0
                    //1.        
                ],
                "log_dirs": [
                    "any",
                    "any"
                    //2.       any log_dirs      replicas       !
                ]
            },
            {
                "topic": "topic-throttle",
                "partition": 0,
                "replicas": [
                    0,
                    2
                ],
                "log_dirs": [
                    "any",
                    "any"
                ]
            },
            {
                "topic": "topic-throttle",
                "partition": 2,
                "replicas": [
                    0,
                    2
                ],
                "log_dirs": [
                    "any",
                    "any"
                ]
            }
        ]
    }
    
    

    レプリカ係数は増加しても減少してもよく、replicasパラメータは1つのレプリカごとにbrokerId,log_dirsパラメータが1つ増えるany
    パーティション数の選択
    kafka性能テスト
    生産者性能試験kafka-producer-perf-test.sh、消費者性能試験kafka-consumer-perf-test.shkafka-producer-perf-test.sh
       topic-test   100    ,       1024B,     acks   1
    bin/kafka-producer-perf-test.sh --topic topic-test --num-records 1000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9092 acks=1
        
    184756 records sent, 36951.2 records/sec (36.09 MB/sec), 711.8 ms avg latency, 1298.0 max latency.
    381240 records sent, 76248.0 records/sec (74.46 MB/sec), 403.8 ms avg latency, 538.0 max latency.
    386370 records sent, 77274.0 records/sec (75.46 MB/sec), 397.8 ms avg latency, 461.0 max latency.
    1000000 records sent, 64094.346879 records/sec (62.59 MB/sec), 457.14 ms avg latency, 1298.00 ms max latency, 400 ms 50th, 786 ms 95th, 1161 ms 99th, 1286 ms 99.9th.
       1000000   ,64094.346879   / (62.59 MB / ),457.14 ms    ,1298.00 ms    ,400 ms 50%,786 ms 95%,1161 ms 99%,1286 ms 99.9%。
    	records send     
    	records/sec                
    	MB/sec                
    	avg latency          
    	max latency          
    	50th,99.9th    50%,99.9%       
    
    --topic:      
    --num-records:          
    --record-size:            B
    --producer-props:        ,        
    --producer.config:          
    --throghput:    ,   0   ,  0,                             77274.0 records/sec (75.46 MB/sec)   b/sec
    --print-metrics:        
    
    kafka-consumer-perf-test.sh
    bin/kafka-consumer-perf-test.sh --topic topic-test --messages 1000000 --broker-list localhost:9092
    
    start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
    WARNING: Exiting before consuming the expected number of messages: timeout (10000 ms) exceeded. You can use the --timeout option to increase the timeout.
    2020-03-04 15:10:58:531, 2020-03-04 15:11:04:657, 976.5625, 159.4127, 1000000, 163238.6549, 16, 6110, 159.8302, 163666.1211
    
    start.time       
    end.time       
    data.consumed.in.MB   MB,       
    MB.sec              
    data.consumed.in.nMsg:       
    nMsg.sec            
    rebalance.time.ms          ms
    fetch.time.ms          
    fetch.MB.sec            
    fetch.nMsg.sec         
      fetch.time.ms=end.time-start.time-rebalance.time.ms
    

    適切なパーティション数
    パーティション数が多ければ多いほど良いわけではありません.パーティション数はkafkaのパフォーマンス向上に閾値を持っています.
    パーティション数の上限とlinuxのファイル記述子の制限はulimit -n 65536によって上限を上げるかlimitsを修正することに関係する.confファイル
    root soft nofile 65535
    root hard nofile 65535
    

    異なる環境のパーティション数のしきい値が異なり、最大効率は環境テストを行う必要がある
    パーティションは、リーダーロールの切り替え時に使用できません.
    brokerノード数が少ない場合は、パーティション数をクラスタ内のbrokerの倍数、例えば3,6,9に設定することをお勧めします.