auto.offset.reset説明
Kafkaのconsumerはpull形式でメッセージデータを取得し、consumerはhigh levelとlow level APIの2つのバージョンを提供している.
1)consumerがpartitionより多ければ、無駄です.kafkaの設計は1つのpartitionでは同時化は許されないので、consumer数はpartition数より大きくしないでください.2)consumerがpartitionより少ない場合、1つのconsumerは複数のpartitionsに対応します.ここではconsumer数とpartition数を主に合理して割り当てます.そうしないとpartition内のデータが不均一になります.partitonの数はconsumerの数の整数倍であることが望ましいので、partitionの数は重要である.例えば24をとると、consumerの数を設定しやすい.3)consumerが複数のpartitionからデータを読み取ると、データ間の順序性が保証されず、kafkaは1つのpartitionでデータが秩序化されていることを保証するが、複数のpartitionは、あなたが読む順序によって異なる.4)consumer,broker,partitionはrebalanceを招く.だからrebalance後にconsumer対応のpartitionが変わる5)High-levelインタフェースでデータが取得できないときはblockします.6)ConsumerおよびBrokerを突然停止するとメッセージが重複する場合があり、それを回避するためにshutdownより前にThreadを通過する.sleep(10000)Consumerにoffsetをzookeeperに同期させる時間を与える
以下にkafkaconsumerのテストコードを示します.
auto.offset.resetのデフォルト値がlargestであればauto.offset.resetはどんな役割を果たしますか?auto.offset.resetは、ConsumerがZooKeeperで初期offsetがないことを発見した場合、またはoffsetが不正であることを発見した場合にConsumerを定義する動作を定義し、一般的な構成は次のとおりです.
smallest:offsetを自動的に最小offsetに設定します.
Largest:offsetを自動的に最大のoffsetに設定します.
anything else:異常を投げ出す;
このような状況に遭遇したことがあります:まずいくつかのデータをproduceして、それからproduceデータのスレッドを停止します——〉それからconsumerの上のコードでデータを消費して、データが消費できないことを発見します!
その理由は、初期のoffsetのデフォルトは不正であり、auto.offset.resetのデフォルト値はlargestで、offsetを自動的に最大のoffsetに設定することを示しています.この場合、生産者からkafka pushデータがないので、もちろん消費できるデータはありません.このときkafka pushデータに生産者がいる場合、コードは最新の位置からデータを消費することができる.
コードに次の設定を追加すると、
では、生産者スレッドを停止した後、消費者スレッドを起動して、以前のproduceのデータを消費することができます.
3.1 kafka.tools.ConsumerOffsetChecker現在のグループのoffset状況を表示するには、次のコマンドを使用します.
またはtopicを指定する
上図に示すpidはtopicのpartition番号を表し、上図のtopicはuser 0のpartiton数は10である.生産者スレッドを開始してからしばらくしてから、上記のコマンドを再実行すると、次のようになります.
3.2 kafka.tools.UpdateOffsetsInZK
このコマンドの3つのパラメータ[earliest|latest]は、offsetをどこに置くかを示す.properties、ここはプロファイルのパスtopic、topic名、ここはuser 01
1 consumerとpartition
1)consumerがpartitionより多ければ、無駄です.kafkaの設計は1つのpartitionでは同時化は許されないので、consumer数はpartition数より大きくしないでください.2)consumerがpartitionより少ない場合、1つのconsumerは複数のpartitionsに対応します.ここではconsumer数とpartition数を主に合理して割り当てます.そうしないとpartition内のデータが不均一になります.partitonの数はconsumerの数の整数倍であることが望ましいので、partitionの数は重要である.例えば24をとると、consumerの数を設定しやすい.3)consumerが複数のpartitionからデータを読み取ると、データ間の順序性が保証されず、kafkaは1つのpartitionでデータが秩序化されていることを保証するが、複数のpartitionは、あなたが読む順序によって異なる.4)consumer,broker,partitionはrebalanceを招く.だからrebalance後にconsumer対応のpartitionが変わる5)High-levelインタフェースでデータが取得できないときはblockします.6)ConsumerおよびBrokerを突然停止するとメッセージが重複する場合があり、それを回避するためにshutdownより前にThreadを通過する.sleep(10000)Consumerにoffsetをzookeeperに同期させる時間を与える
2 auto.offset.resetのいくつかの問題
以下にkafkaconsumerのテストコードを示します.
<code class="hljs java has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: "Source Code Pro", monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;"><span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-class" style="box-sizing: border-box;"><span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">class</span> <span class="hljs-title" style="box-sizing: border-box; color: rgb(102, 0, 102);">kafkaConsumer</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">extends</span> <span class="hljs-title" style="box-sizing: border-box; color: rgb(102, 0, 102);">Thread</span>{</span>
<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">private</span> String topic;
<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-title" style="box-sizing: border-box;">kafkaConsumer</span>(String topic){
<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">super</span>();
<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">this</span>.topic = topic;
}
<span class="hljs-annotation" style="color: rgb(155, 133, 157); box-sizing: border-box;">@Override</span>
<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">void</span> <span class="hljs-title" style="box-sizing: border-box;">run</span>() {
ConsumerConnector consumer = createConsumer();
Map<String, Integer> topicCountMap = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> HashMap<String, Integer>();
topicCountMap.put(topic, <span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">1</span>); <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// </span>
Map<String, List<KafkaStream<<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">byte</span>[], <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">byte</span>[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
KafkaStream<<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">byte</span>[], <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">byte</span>[]> stream = messageStreams.get(topic).get(<span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">0</span>);<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// </span>
ConsumerIterator<<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">byte</span>[], <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">byte</span>[]> iterator = stream.iterator();
<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">while</span>(iterator.hasNext()){
String message = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> String(iterator.next().message());
System.out.println(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">" : "</span> + message);
}
}
<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">private</span> ConsumerConnector <span class="hljs-title" style="box-sizing: border-box;">createConsumer</span>() {
Properties properties = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> Properties();
properties.put(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"zookeeper.connect"</span>, <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"ip1:2181,ip2:2181,ip3:2181"</span>);<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// zk </span>
properties.put(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"group.id"</span>, <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"group03"</span>);
<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">return</span> Consumer.createJavaConsumerConnector(<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> ConsumerConfig(properties));
}
<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">static</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">void</span> <span class="hljs-title" style="box-sizing: border-box;">main</span>(String[] args) {
<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> kafkaConsumer(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"user"</span>).start();<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// kafka topic:user </span>
}
}
</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li><li style="box-sizing: border-box; padding: 0px 5px;">13</li><li style="box-sizing: border-box; padding: 0px 5px;">14</li><li style="box-sizing: border-box; padding: 0px 5px;">15</li><li style="box-sizing: border-box; padding: 0px 5px;">16</li><li style="box-sizing: border-box; padding: 0px 5px;">17</li><li style="box-sizing: border-box; padding: 0px 5px;">18</li><li style="box-sizing: border-box; padding: 0px 5px;">19</li><li style="box-sizing: border-box; padding: 0px 5px;">20</li><li style="box-sizing: border-box; padding: 0px 5px;">21</li><li style="box-sizing: border-box; padding: 0px 5px;">22</li><li style="box-sizing: border-box; padding: 0px 5px;">23</li><li style="box-sizing: border-box; padding: 0px 5px;">24</li><li style="box-sizing: border-box; padding: 0px 5px;">25</li><li style="box-sizing: border-box; padding: 0px 5px;">26</li><li style="box-sizing: border-box; padding: 0px 5px;">27</li><li style="box-sizing: border-box; padding: 0px 5px;">28</li><li style="box-sizing: border-box; padding: 0px 5px;">29</li><li style="box-sizing: border-box; padding: 0px 5px;">30</li><li style="box-sizing: border-box; padding: 0px 5px;">31</li><li style="box-sizing: border-box; padding: 0px 5px;">32</li><li style="box-sizing: border-box; padding: 0px 5px;">33</li><li style="box-sizing: border-box; padding: 0px 5px;">34</li><li style="box-sizing: border-box; padding: 0px 5px;">35</li><li style="box-sizing: border-box; padding: 0px 5px;">36</li><li style="box-sizing: border-box; padding: 0px 5px;">37</li></ul>
auto.offset.resetのデフォルト値がlargestであればauto.offset.resetはどんな役割を果たしますか?auto.offset.resetは、ConsumerがZooKeeperで初期offsetがないことを発見した場合、またはoffsetが不正であることを発見した場合にConsumerを定義する動作を定義し、一般的な構成は次のとおりです.
smallest:offsetを自動的に最小offsetに設定します.
Largest:offsetを自動的に最大のoffsetに設定します.
anything else:異常を投げ出す;
このような状況に遭遇したことがあります:まずいくつかのデータをproduceして、それからproduceデータのスレッドを停止します——〉それからconsumerの上のコードでデータを消費して、データが消費できないことを発見します!
その理由は、初期のoffsetのデフォルトは不正であり、auto.offset.resetのデフォルト値はlargestで、offsetを自動的に最大のoffsetに設定することを示しています.この場合、生産者からkafka pushデータがないので、もちろん消費できるデータはありません.このときkafka pushデータに生産者がいる場合、コードは最新の位置からデータを消費することができる.
コードに次の設定を追加すると、
<code class="hljs livecodeserver has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: "Source Code Pro", monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;">properties.<span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">put</span>(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"auto.offset.reset"</span>, <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"smallest"</span>); </code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li></ul>
では、生産者スレッドを停止した後、消費者スレッドを起動して、以前のproduceのデータを消費することができます.
3 high-levelのConsumerツール
3.1 kafka.tools.ConsumerOffsetChecker現在のグループのoffset状況を表示するには、次のコマンドを使用します.
<code class="hljs avrasm has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: "Source Code Pro", monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;"> ./kafka-run-class<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.sh</span> kafka<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.tools</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.ConsumerOffsetChecker</span> --group group03</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li></ul>
またはtopicを指定する
<code class="hljs avrasm has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: "Source Code Pro", monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;">./kafka-run-class<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.sh</span> kafka<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.tools</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.ConsumerOffsetChecker</span> --topic user01 --group group03</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li></ul>
上図に示すpidはtopicのpartition番号を表し、上図のtopicはuser 0のpartiton数は10である.生産者スレッドを開始してからしばらくしてから、上記のコマンドを再実行すると、次のようになります.
3.2 kafka.tools.UpdateOffsetsInZK
<code class="hljs avrasm has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: "Source Code Pro", monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;">./kafka-run-class<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.sh</span> kafka<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.tools</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.UpdateOffsetsInZK</span> earliest config/consumer<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.properties</span> user01</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li></ul>
このコマンドの3つのパラメータ[earliest|latest]は、offsetをどこに置くかを示す.properties、ここはプロファイルのパスtopic、topic名、ここはuser 01