kafka送信超大メッセージ設定

2999 ワード

最近、cdcアプリケーションが開発され、極端な状況をテストするために、kafkaが100万件のデータを過去に伝達する必要があり、1 G程度であり、他の段階の制限のため、パケットを分解することが不便である(注:測定によると、大きなパケットを歩くkafkaは必ずしも性能が良いとは限らず、さらに低い可能性もある).
百万以上の変更データをテストする場合、kafka brokerが許可する最大値を超えるメッセージが表示されるため、パケットが正常に送信されることを保証するために、次のパラメータを変更する必要があります.
socket.request.max.bytes=21748483647#socket serverが受信する最大要求サイズlogが設定されている.segment.bytes=21748483647#kafkaデータファイルのサイズは、この値がメッセージの長さより大きいことを確認します.一般的にはデフォルト値を使用すればよい(一般的には、ファイルシステムではなくメッセージシステムであるため、1つのメッセージが1 Gよりも大きくなりにくい).message.max.bytes=2174483647#kafka serverが受信する最大メッセージサイズはsocket.以下であるべきである.request.max.bytesreplica.fetch.max.bytes=2174483647#各パーティションが取得しようとするメッセージバイト数.より大きいのはmessageです.max.bytes、そうでないとbrokerはこのメッセージを受信しますが、このメッセージをコピーすることができず、データが失われます.fetch.message.max.bytes=21748483647#各抽出要求でトピックパーティションごとに抽出されたメッセージバイト数.より大きいのはmessageです.max.bytes、そうしないとbrokerは消費者側がこのメッセージを使用できないために保留します.
生産者は次のように設定できます.
kafkaProps.put("max.request.size", 2147483647);    #   message.max.bytes, producer.properties 
kafkaProps.put("buffer.memory", 2147483647);
kafkaProps.put("timeout.ms", 3000000);
kafkaProps.put("request.timeout.ms", 30000000);

消費者は以下のように設定されている.
props.put("request.timeout.ms", 30000000);
props.put("session.timeout.ms", "3000000");
props.put("fetch.max.wait.ms", "3000000");

各パラメータの意味はkafka公式ドキュメントを参照できますhttps://kafka.apache.org/documentation/#configuration.
kafka基礎知識体系は、LZ学習ノートkafka学習ガイド(総括版)を参照してください.
注意:各パラメータがメモリに与える影響は次のとおりです.Brokersは各パーティションにreplicaを割り当てます.fetch.max.bytesパラメータで指定したメモリ領域、replicaを仮定します.fetch.max.bytes=1 M、1000個のパーティションがある場合は、1 G未満のメモリが必要です.パーティション数*最大のメッセージがサーバのメモリを超えないようにします.そうしないと、OOMエラーが発生します.同様に、消費側のfetch.message.max.bytesは、最大メッセージに必要なメモリ領域を指定します.同様に、パーティション数*最大必要メモリ領域はサーバのメモリを超えてはいけません.したがって、大きなメッセージを送信する場合は、メモリが一定の場合、パーティション数が少ないか、より大きなメモリを使用するサーバしか使用できません.
上記の方法は効果的ですが、お勧めしません.Kafka設計の目的は、短いメッセージを迅速に処理することであり、一般的に10 Kサイズのメッセージのスループット性能が最も優れている(LinkedInのkafka性能テストを参照).しかし、XMLドキュメントやJSONコンテンツなど、より大きなメッセージを処理する必要がある場合があります.1つのメッセージの差は10~100 Mではありません.この場合、Kakfaはどのように処理すればいいですか.
この問題について、以下のいくつかの提案があります.
  • の最良の方法は、これらの大きなデータを直接転送しないことである.NAS,HDFS,S 3などの共有ストレージがあれば,これらの大きなファイルを共有ストレージに格納し,Kafkaを用いてファイルの位置情報を転送することができる.
  • の第2の方法は、大きなメッセージデータをスライスまたはブロック化し、生産側でデータを10 Kサイズにスライスし、パーティションプライマリ・キーを使用して、大きなメッセージのすべての部分が同じkafkaパーティションに送信されることを保証することであり(このように、各部分の分割順序が保持される)、消費側が使用すると、これらの部分が元のメッセージに再復元される.
  • 第3に、Kafkaの生産側はメッセージを圧縮することができ、元のメッセージがXMLである場合、圧縮後、メッセージはそれほど大きくならない可能性がある.生産側の構成パラメータにcompressionを使用する.codecとcommpressed.topicsは圧縮機能をオンにすることができ、圧縮アルゴリズムはGZipまたはSnappyを使用することができる.

  • これらの値が大きすぎると、メッセージが指定された時間(max.poll.interval.ms(デフォルト300秒))で消費されず、rebalanceされますが、kafka自体にバグ(サーバ側のrebalance.timeout.ms(デフォルト60秒)が有効ではありません)があり、消費者グループのrebalance時間が長くなるため、注意が必要です.https://blog.csdn.net/u013200380/article/details/87868696.
    転載先:https://www.cnblogs.com/zhjh256/p/11369165.html