logstash kafka複数台のマシンがデータを取得し、1つのインスタンスだけが消費する【logstash 2.3.4】
5278 ワード
説明:まずkafkaのpartition数は6であり、複数のlogstashサービスマシンのtopicは同じであり、groupIdも同じである.それぞれ機械を起動すると、kafkaデータを消費している機械は1台しかなく、いつも最後に起動している機械は消費しています.zk時間など様々なkafka構成を修正しても効果がなく、最後にrubyソースコードを見ざるを得なくなりました.kakfaのコードはまだ少し問題があるような気がします.
まずlogstashのinput-kafkaモジュールのコード構造を紹介します
主なコードロジックはこの2つのrubyファイルに集中していますkafka.rbは主に構成パラメータを受信groupを呼び出す.rbはkafkaデータを消費し、このプロセスは呼び出したkafka java apiであり、javaコードとほぼ同様に流れている.よく読んでみるとgroup.rbのコードに問題があります.
定義されたrunメソッドでは、
主な意味は、zkディレクトリの下にあるgroupIdの情報を削除することです.この言葉があるだけで、もともとlogstashインスタンスの消費情報が削除されるので、上記の現象が発生します(具体的には、時間的な理由であまり調査されていません)
———————ここが解決策———————logstash-2.3.4/vendle/bundle/jruby/1.9/gems/jruby-kafka-1.5.0-java/lib/jruby-kafkaディレクトリの下でgroupを開く.rbファイル、コードシートを見つけて、#注釈で落とせばいい
まずlogstashのinput-kafkaモジュールのコード構造を紹介します
kafka.rb -- logstash-2.3.4/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.8/lib/logstash/inputs/
group.rb -- logstash-2.3.4/vendor/bundle/jruby/1.9/gems/jruby-kafka-1.5.0-java/lib/jruby-kafka
主なコードロジックはこの2つのrubyファイルに集中していますkafka.rbは主に構成パラメータを受信groupを呼び出す.rbはkafkaデータを消費し、このプロセスは呼び出したkafka java apiであり、javaコードとほぼ同様に流れている.よく読んでみるとgroup.rbのコードに問題があります.
def run(a_num_threads, a_queue)
begin
if @reset_beginning == 'from-beginning'
Java::kafka::utils::ZkUtils.maybeDeletePath(@zk_connect, "/consumers/#{@group_id}")
end
@consumer = Java::kafka::consumer::Consumer.createJavaConsumerConnector(create_consumer_config)
rescue ZkException => e
raise KafkaError.new(e), "Got ZkException: #{e}"
end
thread_value = a_num_threads.to_java Java::int
streams = get_streams(thread_value)
@executor = Executors.newFixedThreadPool(a_num_threads)
@executor_submit = @executor.java_method(:submit, [Java::JavaLang::Runnable.java_class])
thread_number = 0
streams.each do |stream|
@executor_submit.call(Kafka::Consumer.new(stream, thread_number, a_queue, @consumer_restart_on_error, @consumer_restart_sleep_ms))
thread_number += 1
end
@running = true
end
定義されたrunメソッドでは、
if @reset_beginning == 'from-beginning'
Java::kafka::utils::ZkUtils.maybeDeletePath(@zk_connect, "/consumers/#{@group_id}")
end
主な意味は、zkディレクトリの下にあるgroupIdの情報を削除することです.この言葉があるだけで、もともとlogstashインスタンスの消費情報が削除されるので、上記の現象が発生します(具体的には、時間的な理由であまり調査されていません)
———————ここが解決策———————logstash-2.3.4/vendle/bundle/jruby/1.9/gems/jruby-kafka-1.5.0-java/lib/jruby-kafkaディレクトリの下でgroupを開く.rbファイル、コードシートを見つけて、#注釈で落とせばいい
#if @reset_beginning == 'from-beginning' #Java::kafka::utils::ZkUtils.maybeDeletePath(@zk_connect, "/consumers/#{@group_id}")
#end