flume avro sink

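In this scenario, a landing machine typically runs a local Flume agent that uses an avro sink to send data to the CDH cluster. The receiving machine in the cluster uses an avro source to accept the data and writes it to HDFS or Kafka.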
avro sink
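# Name the components of this agent
flume-avro-sink.sources = r1
flume-avro-sink.channels = c1
flume-avro-sink.sinks = avro-sink

flume-avro-sink.sources.r1.type = TAILDIR
flume-avro-sink.sources.r1.channels = c1
flume-avro-sink.sources.r1.filegroups = f1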
flume-avro-sink.sources.r1.filegroups.f1 = /home/bigdata/flume/data/*.txt
flume-avro-sink.sources.r1.positionFile = /var/log/flume/taildir_position.json
 
flume-avro-sink.channels.c1.type = memory
flume-avro-sink.channels.c1.capacity = 10000
flume-avro-sink.channels.c1.transactionCapacity = 10000

flume-avro-sink.sinks.avro-sink.channel = c1
flume-avro-sink.sinks.avro-sink.type = avro
flume-avro-sink.sinks.avro-sink.hostname = spark005
flume-avro-sink.sinks.avro-sink.port = 41414

avro source
avro-source.sources = r1
avro-source.channels = c1
avro-source.sinks = hdfs-sink kafka-sink


avro-source.sources.r1.type = avro
avro-source.sources.r1.channels = c1
avro-source.sources.r1.bind = 0.0.0.0
# Must match the port the avro sink sends to (valid ports are at most 65535)
avro-source.sources.r1.port = 41414

avro-source.channels.c1.type = memory
avro-source.channels.c1.capacity = 10000
avro-source.channels.c1.transactionCapacity = 10000


avro-source.sinks.hdfs-sink.type = hdfs
avro-source.sinks.hdfs-sink.channel = c1
avro-source.sinks.hdfs-sink.hdfs.path = hdfs://spark001:8020/flume/events/%y%m%d%H%M-%[IP]
avro-source.sinks.hdfs-sink.hdfs.filePrefix = log-
avro-source.sinks.hdfs-sink.hdfs.rollSize = 10000000
avro-source.sinks.hdfs-sink.hdfs.round = true
avro-source.sinks.hdfs-sink.hdfs.roundValue = 10
avro-source.sinks.hdfs-sink.hdfs.roundUnit = minute
avro-source.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true

avro-source.sinks.kafka-sink.channel = c1
avro-source.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-source.sinks.kafka-sink.kafka.topic = flumetopic
avro-source.sinks.kafka-sink.kafka.bootstrap.servers = spark003:9092,spark004:9092,spark005:9092
avro-source.sinks.kafka-sink.flumeBatchSize = 20
avro-source.sinks.kafka-sink.kafka.producer.acks = 1
avro-source.sinks.kafka-sink.kafka.producer.linger.ms = 1
avro-source.sinks.kafka-sink.kafka.producer.compression.type = snappy
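
Note that in Flume, sinks attached to the same channel compete for its events, so when hdfs-sink and kafka-sink both read from c1, each event reaches either HDFS or Kafka, not both. If every event should be written to both, one option is to give each sink its own channel and let the source replicate into them. This is a minimal sketch, assuming a second memory channel named c2 (a hypothetical name, not part of the original listing). Only the lines that differ are shown.

```properties
# Two channels, one per sink (c2 is an assumed name)
avro-source.channels = c1 c2

# The source copies every event into both channels.
# replicating is Flume's default selector; it is set here only for clarity.
avro-source.sources.r1.channels = c1 c2
avro-source.sources.r1.selector.type = replicating

# Second channel, sized like c1
avro-source.channels.c2.type = memory
avro-source.channels.c2.capacity = 10000
avro-source.channels.c2.transactionCapacity = 10000

# Each sink drains its own channel
avro-source.sinks.hdfs-sink.channel = c1
avro-source.sinks.kafka-sink.channel = c2
```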

The capacity and transactionCapacity of the memory channel on the sink side must be set larger than those on the source side. Otherwise, an error like the following occurs.
 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to send events
	at org.apache.flume.sink.AbstractRpcSink.process(Abstrac
: Avro source r1: Unable to process event batch. Exception follows.
org.apache.flume.ChannelFullException: Space for commit to queue 
couldn't be acquired. Sinks are likely not keeping up with sources, 
or the buffer size is too tight
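
The ChannelFullException in this log is reported by the avro source r1, so the sketch below assumes that the channel to enlarge is the one on the receiving agent. The numbers are illustrative assumptions, not tuned values. Two constraints do hold in Flume: a memory channel's transactionCapacity cannot exceed its capacity, and a sink's batch size should not exceed the transactionCapacity of the channel it reads from.

```properties
# Sending agent: channel values as in the avro sink listing
flume-avro-sink.channels.c1.capacity = 10000
flume-avro-sink.channels.c1.transactionCapacity = 10000
# Events the avro sink sends per batch (100 is the Flume default)
flume-avro-sink.sinks.avro-sink.batch-size = 100

# Receiving agent: more room to absorb bursts (assumed value)
avro-source.channels.c1.capacity = 100000
avro-source.channels.c1.transactionCapacity = 10000
```

If the error persists after enlarging the channel, the sinks on the receiving agent are probably draining it more slowly than events arrive, and a larger buffer only delays the point at which it fills.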