Integrating Flume NG with Kafka
1. Flume configuration for the producer side: netcat is the source and Kafka is the sink
#agent section
producer.sources = s
producer.channels = c
producer.sinks = r
#source section
#producer.sources.s.type = seq
producer.sources.s.type = netcat
producer.sources.s.bind = localhost
producer.sources.s.port = 44444
producer.sources.s.channels = c
# Each sink's type must be defined
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=127.0.0.1:9092
producer.sinks.r.partition.key=0
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
producer.sinks.r.custom.topic.name=test
#Specify the channel the sink should use
producer.sinks.r.channel = c
# Each channel's type is defined.
producer.channels.c.type = memory
producer.channels.c.capacity = 1000
2. Flume configuration for the consumer side: Kafka is the source and logger is the sink
consumer.sources = s
consumer.channels = c
consumer.sinks = r
#consumer.sources.s.type = seq
consumer.sources.s.channels = c
consumer.sinks.r.type = logger
consumer.sinks.r.channel = c
consumer.channels.c.type = memory
consumer.channels.c.capacity = 100
consumer.sources.s.type = org.apache.flume.plugins.KafkaSource
consumer.sources.s.zookeeper.connect=127.0.0.1:2181
consumer.sources.s.group.id=testGroup
consumer.sources.s.zookeeper.session.timeout.ms=400
consumer.sources.s.zookeeper.sync.time.ms=200
consumer.sources.s.auto.commit.interval.ms=1000
consumer.sources.s.custom.topic.name=test
consumer.sources.s.custom.thread.per.consumer=4
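Both files follow the same wiring pattern: the agent declares its sources, channels, and sinks, and every source and sink is bound to one of the declared channels. As a rough sketch (not part of Flume; the helper names `parse_properties` and `check_wiring` and the `SAMPLE` text are made up for illustration), a few lines of Python can check that wiring and flag keys that are assigned more than once before an agent is started:

```python
def parse_properties(text):
    """Parse simple 'key = value' lines; return (props, duplicate_keys)."""
    props = {}
    duplicates = []
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, sep, value = line.partition("=")
        if not sep:
            continue  # not a key/value line
        key = key.strip()
        if key in props:
            duplicates.append(key)
        # Mirrors java.util.Properties: a later assignment replaces an earlier one.
        props[key] = value.strip()
    return props, duplicates


def check_wiring(text, agent):
    """Return a list of human-readable problems for the named agent."""
    props, duplicates = parse_properties(text)
    problems = ["duplicate key: " + key for key in duplicates]
    channels = set(props.get(agent + ".channels", "").split())
    for source in props.get(agent + ".sources", "").split():
        bound = props.get(f"{agent}.sources.{source}.channels", "").split()
        if not bound:
            problems.append(f"source {source} has no channels")
        for channel in bound:
            if channel not in channels:
                problems.append(f"source {source} uses undeclared channel: {channel}")
    for sink in props.get(agent + ".sinks", "").split():
        channel = props.get(f"{agent}.sinks.{sink}.channel", "")
        if channel not in channels:
            problems.append(f"sink {sink} uses undeclared channel: {channel}")
    return problems


# Hypothetical shortened sample with a deliberately repeated key.
SAMPLE = """
consumer.sources = s
consumer.channels = c
consumer.sinks = r
consumer.sources.s.type = seq
consumer.sources.s.type = org.apache.flume.plugins.KafkaSource
consumer.sources.s.channels = c
consumer.sinks.r.type = logger
consumer.sinks.r.channel = c
consumer.channels.c.type = memory
"""

print(check_wiring(SAMPLE, "consumer"))
```

The parser only handles plain `key = value` lines, which is all these two files use; it does not try to cover the full Java properties syntax (line continuations, escapes, `:` separators).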
3. Start the two agents, each in its own terminal
bin/flume-ng agent --conf conf --conf-file conf/producer1.properties --name producer -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf --conf-file conf/consumer1.properties --name consumer -Dflume.root.logger=INFO,console
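The netcat source only accepts connections once the producer agent has finished starting, so a script that feeds it data may want to wait for the port first. A minimal sketch, assuming the host and port from the producer configuration (the function name `wait_for_port` and its timeout values are illustrative choices, not Flume settings):

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # Open and immediately close a connection; no data is sent.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            time.sleep(interval)  # port not open yet, retry shortly
    return False
```

For example, `wait_for_port("localhost", 44444)` would block until the producer's netcat source is listening, or give up after 30 seconds.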
4. Connect to port 44444 with telnet and type a few lines
hadoop@stormspark:~/bigdata/apache-flume-1.4.0-bin$ telnet localhost 44444
Trying ::1...
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
1111111111111111
OK
kak^Hfkakakkakakakkakkakkaakaknnnm
OK
abcdefghijklmnopqrstuvwxyz
OK
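After each line is entered, both agents print log output: the producer forwards the line to Kafka, and the consumer reads it back and logs it.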
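The telnet session above can also be scripted. The following is only a sketch: it assumes the netcat source is listening on localhost:44444 as configured in step 1 and that it answers every line with a single reply line, as the `OK` responses in the session show. The function name `send_lines` is made up for this example.

```python
import socket


def send_lines(lines, host="localhost", port=44444, timeout=5.0):
    """Send each line to the netcat source and collect the one-line replies."""
    replies = []
    with socket.create_connection((host, port), timeout=timeout) as sock:
        with sock.makefile("r", encoding="utf-8", newline="\n") as reader:
            for line in lines:
                # The netcat source treats each newline-terminated line as one event.
                sock.sendall((line + "\n").encode("utf-8"))
                replies.append(reader.readline().strip())
    return replies
```

Calling `send_lines(["1111111111111111", "abcdefghijklmnopqrstuvwxyz"])` against the running producer agent should return one reply per line sent, and the same lines should then show up in the consumer agent's log.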
The org.apache.flume.plugins code used here comes from https://github.com/baniuyao/flume-kafka, which also has more detailed usage instructions.