Flumeのデータ損失の問題


flumeでログファイルを読み取る場合、同じ秒でログファイルはまだ書き込まれているが、flumeによって読み取られ、識別する.COMPLETEDが異常を投げ出した
エラーログは次のとおりです.
2017-05-10 18:09:13,530 (cluster-ClusterId{value='5912e6c933662351ead2de96', description='null'}-192.168.0.126:27017) [INFO - com.mongodb.diagnostics.logging.SLF4JLogger.info(SLF4JLogger.java:71)] Monitor thread successfully connected to server with description ServerDescription{address=192.168.0.126:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 4, 3]}, minWireVersion=0, maxWireVersion=5, maxDocumentSize=16777216, roundTripTimeNanos=1929078}
2017-05-10 18:09:13,622 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - com.mongodb.diagnostics.logging.SLF4JLogger.info(SLF4JLogger.java:71)] Opened connection [connectionId{localValue:2, serverValue:135}] to 192.168.0.126:27017
2017-05-10 18:09:13,658 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.riderzen.flume.sink.MongoSink.process(MongoSink.java:139)] can't process events
java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
	at com.google.common.base.Preconditions.checkState(Preconditions.java:172)
	at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
	at org.riderzen.flume.sink.MongoSink.parseEvents(MongoSink.java:231)
	at org.riderzen.flume.sink.MongoSink.process(MongoSink.java:137)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
	at java.lang.Thread.run(Thread.java:745)
2017-05-10 18:09:13,662 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.riderzen.flume.sink.MongoSink.parseEvents(MongoSink.java:223)] can't process events, drop it!

一時的な解決策は次のとおりです.
まずログファイルを/data/logs/rawdataに生成し、shellを書いて最後のファイルを保持し、他のファイルをflumeのログリスニングディレクトリに転送します.
機能:まずデータディレクトリを日付でフィルタリングして最後のファイルを保存し、他のファイルを新しいディレクトリに移動します.
pro.sh
#!/bin/sh
cd /data/logs/rawdata
while true; do
time=`date +%H:%M:%S`
server=`ls  |grep -v \`ls -rt|tail -1\` `
if [[ -n "$server" ]] || [[ "$time" == "00:00:00" ]]; then
mv $server /data/logs/flume
fi
sleep 2
done

注意:more/proc/cpuinfo|grep"physical id"|uniq|wc-lまずLinuxカーネルを見て、シングルコアであれば時間を「00:00:01」に設定してタスクを実行できます.
出力echo
`date +%H:%M:%S`

サイクル周波数を見てみましょう.何かいい方法があれば教えてください.)
その後、fileモードに変更するとデータが失われないことがわかりました.
次のように構成されています.
a1.sources = r1
a1.channels = c1
a1.sinks = s1

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /data/logs/flume
#a1.sources.r1.inputCharset = UTF-8
a1.sources.r1.fileHeader =true
a1.sources.r1.interceptors =il
a1.sources.r1.interceptors.il.type =timestamp
#a1.sources.r1.fileHeaderKey.op=upsert
a1.sources.r1.channels =c1

#a1.channels.c1.type =memory
a1.channels.c1.type = file
a1.channels.c1.checkpointDir=/data/logs/tmp
a1.channels.c1.dataDirs=/data/logs/complate
a1.channels.c1.capacity =1000
a1.channels.c1.transactionCapacity =100
#a1.channels.c1.keep-alive=3


a1.sinks.s1.type = org.riderzen.flume.sink.MongoSink
a1.sinks.s1.host = 192.168.0.126
a1.sinks.s1.port = 27017
a1.sinks.s1.model = single
a1.sinks.s1.db = dtc
a1.sinks.s1.collection = log
a1.sinks.s1.batch = 2
a1.sinks.s1.channel =c1