Integrating Flume NG with Kafka
1. Configure the producer-side Flume agent: netcat as the source, Kafka as the sink
 
 #agent section  
producer.sources = s  
producer.channels = c  
producer.sinks = r  
  
#source section  
#producer.sources.s.type = seq  
producer.sources.s.type = netcat  
producer.sources.s.bind = localhost  
producer.sources.s.port = 44444  
producer.sources.s.channels = c  
  
# Each sink's type must be defined  
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink  
producer.sinks.r.metadata.broker.list=127.0.0.1:9092  
producer.sinks.r.partition.key=0  
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition  
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder  
producer.sinks.r.request.required.acks=0  
producer.sinks.r.max.message.size=1000000  
producer.sinks.r.producer.type=sync  
producer.sinks.r.custom.encoding=UTF-8  
producer.sinks.r.custom.topic.name=test  
  
#Specify the channel the sink should use  
producer.sinks.r.channel = c  
  
# Each channel's type is defined.  
producer.channels.c.type = memory  
producer.channels.c.capacity = 1000  
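
The agent above assumes that ZooKeeper and a Kafka broker are already listening on the addresses referenced in the sink configuration (127.0.0.1:2181 and 127.0.0.1:9092). A minimal sketch, run from the Kafka installation directory and assuming a 0.8.1-or-later distribution (older 0.8.0 releases ship kafka-create-topic.sh instead of kafka-topics.sh); the explicit topic creation is only needed if the broker has auto topic creation disabled:

# start ZooKeeper and a single Kafka broker with the bundled quickstart configs
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &

# create the "test" topic used by custom.topic.name (skip if auto-creation is enabled)
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test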

 
 
2. Configure the consumer-side agent: Kafka as the source, logger as the sink
 
 
consumer.sources = s  
consumer.channels = c  
consumer.sinks = r  
  
#source section  
#consumer.sources.s.type = seq  
consumer.sources.s.type = org.apache.flume.plugins.KafkaSource  
consumer.sources.s.zookeeper.connect=127.0.0.1:2181  
consumer.sources.s.group.id=testGroup  
consumer.sources.s.zookeeper.session.timeout.ms=400  
consumer.sources.s.zookeeper.sync.time.ms=200  
consumer.sources.s.auto.commit.interval.ms=1000  
consumer.sources.s.custom.topic.name=test  
consumer.sources.s.custom.thread.per.consumer=4  
consumer.sources.s.channels = c  
  
# Each sink's type must be defined  
consumer.sinks.r.type = logger  
  
#Specify the channel the sink should use  
consumer.sinks.r.channel = c  
  
# Each channel's type is defined.  
consumer.channels.c.type = memory  
consumer.channels.c.capacity = 100  
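
To confirm, independently of Flume, that messages really arrive on the test topic, the console consumer shipped with Kafka can be pointed at the same ZooKeeper ensemble (a 0.8.x-era, ZooKeeper-based invocation; it registers under its own consumer group, so it does not take messages away from testGroup):

bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test --from-beginning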

 
 
3. Start the two agents, one for each configuration
 
bin/flume-ng agent --conf conf  --conf-file conf/producer1.properties --name producer -Dflume.root.logger=INFO,console
 
bin/flume-ng agent --conf conf --conf-file conf/consumer1.properties --name consumer -Dflume.root.logger=INFO,console
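
Both commands assume that org.apache.flume.plugins.KafkaSink and KafkaSource, together with their Kafka and Scala dependencies, are already on the agent classpath. One way to arrange that is to copy the jars into Flume's lib/ directory before starting the agents; the jar names below are only illustrative and depend on how the plugin (see the repository linked at the end) and your Kafka distribution were built:

# hypothetical jar names -- adjust to the artifacts your build actually produces
cp flume-kafka-plugin.jar kafka_2.9.2-0.8.1.1.jar scala-library-2.9.2.jar metrics-core-2.2.0.jar zkclient-0.3.jar ~/bigdata/apache-flume-1.4.0-bin/lib/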
 
4. Telnet to port 44444 and send some test lines
 
 
 hadoop@stormspark:~/bigdata/apache-flume-1.4.0-bin$ telnet localhost 44444  
Trying ::1...  
Trying 127.0.0.1...  
Connected to localhost.  
Escape character is '^]'.  
1111111111111111  
OK  
kak^Hfkakakkakakakkakkakkaakaknnnm  
OK  
abcdefghijklmnopqrstuvwxyz  
OK 
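
The same test can be scripted rather than typed interactively, for example by piping a line into the netcat source with nc (the source answers OK for every line it accepts):

echo "hello from nc" | nc localhost 44444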

 
 
Both agents print output for the lines sent, confirming that events flow from the netcat source through Kafka to the logger sink.
 
The source code for org.apache.flume.plugins is available at https://github.com/baniuyao/flume-kafka, which also documents the plugin's usage in detail.