An example of a Flume BucketPath bug


This is a problem I ran into today while writing to HDFS from Flume with Kerberos enabled. The configuration file used for the test:
agent-server1.sources= testtail
agent-server1.sinks = hdfs-sink
agent-server1.channels= hdfs-channel
agent-server1.sources.testtail.type = netcat
agent-server1.sources.testtail.bind = localhost
agent-server1.sources.testtail.port = 9999
agent-server1.sinks.hdfs-sink.hdfs.kerberosPrincipal = hdfs/_HOST@KERBEROS_HADOOP
agent-server1.sinks.hdfs-sink.hdfs.kerberosKeytab = /home/vipshop/conf/hdfs.keytab
agent-server1.channels.hdfs-channel.type = memory
agent-server1.channels.hdfs-channel.capacity = 200000000
agent-server1.channels.hdfs-channel.transactionCapacity = 10000
agent-server1.sinks.hdfs-sink.type = hdfs
agent-server1.sinks.hdfs-sink.hdfs.path = hdfs://bipcluster/tmp/flume/%Y%m%d
agent-server1.sinks.hdfs-sink.hdfs.rollInterval = 60
agent-server1.sinks.hdfs-sink.hdfs.rollSize = 0
agent-server1.sinks.hdfs-sink.hdfs.rollCount = 0
agent-server1.sinks.hdfs-sink.hdfs.threadsPoolSize = 10
agent-server1.sinks.hdfs-sink.hdfs.round = false
agent-server1.sinks.hdfs-sink.hdfs.roundValue = 30
agent-server1.sinks.hdfs-sink.hdfs.roundUnit = minute
agent-server1.sinks.hdfs-sink.hdfs.batchSize = 100
agent-server1.sinks.hdfs-sink.hdfs.fileType = DataStream
agent-server1.sinks.hdfs-sink.hdfs.writeFormat = Text
agent-server1.sinks.hdfs-sink.hdfs.callTimeout = 60000
agent-server1.sinks.hdfs-sink.hdfs.idleTimeout = 100
agent-server1.sinks.hdfs-sink.hdfs.filePrefix = ip
agent-server1.sinks.hdfs-sink.channel = hdfs-channel
agent-server1.sources.testtail.channels = hdfs-channel
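The hdfs.path above contains the escape sequences %Y%m%d. The HDFS sink expands them for each event from the event's "timestamp" header, which holds milliseconds since the epoch. Below is a minimal, hypothetical sketch of that expansion in plain Java. It handles only %Y, %m and %d and is not Flume's BucketPath code. It uses UTC so the result is reproducible. By default the sink uses the agent's local time zone unless configured otherwise.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;

// Hypothetical sketch: expands only %Y, %m and %d in a path template using the
// "timestamp" header (epoch millis). Not Flume's actual BucketPath implementation.
public class PathExpansionSketch {

  static String expand(String template, Map<String, String> headers, TimeZone tz) {
    // Long.parseLong throws NumberFormatException if the header is missing (null).
    long ts = Long.parseLong(headers.get("timestamp"));
    Date date = new Date(ts);
    StringBuilder out = new StringBuilder();
    for (int i = 0; i < template.length(); i++) {
      char c = template.charAt(i);
      if (c == '%' && i + 1 < template.length()) {
        char code = template.charAt(++i); // consume the escape character
        String pattern;
        switch (code) {
          case 'Y': pattern = "yyyy"; break;
          case 'm': pattern = "MM"; break;
          case 'd': pattern = "dd"; break;
          default: throw new IllegalArgumentException("unsupported escape %" + code);
        }
        SimpleDateFormat fmt = new SimpleDateFormat(pattern);
        fmt.setTimeZone(tz);
        out.append(fmt.format(date));
      } else {
        out.append(c);
      }
    }
    return out.toString();
  }

  public static void main(String[] args) {
    Map<String, String> headers = new HashMap<>();
    headers.put("timestamp", "1395655387000"); // 2014-03-24 10:03:07 UTC
    // Prints hdfs://bipcluster/tmp/flume/20140324
    System.out.println(expand("hdfs://bipcluster/tmp/flume/%Y%m%d", headers,
        TimeZone.getTimeZone("UTC")));
  }
}
```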

After starting the agent and sending test data with telnet, the following error occurred:
14/03/24 18:03:07 ERROR hdfs.HDFSEventSink: process failed
java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to resolve time based bucketing. Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:160)
        at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:343)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:392)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.NumberFormatException: null
        at java.lang.Long.parseLong(Long.java:375)
        at java.lang.Long.valueOf(Long.java:525)
        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:158)
        ... 5 more
14/03/24 18:03:07 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to resolve time based bucketing. Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:461)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to resolve time based bucketing. Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:160)
        at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:343)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:392)
        ... 3 more
Caused by: java.lang.NumberFormatException: null
        at java.lang.Long.parseLong(Long.java:375)
        at java.lang.Long.valueOf(Long.java:525)
        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:158)
        ... 5 more

The call stack shows that the error is raised in the replaceShorthand method of the org.apache.flume.formatter.output.BucketPath class. The process method of org.apache.flume.sink.hdfs.HDFSEventSink calls BucketPath's escapeString method to substitute the escape sequences in the path. escapeString in turn calls replaceShorthand to build the final HDFS URL. The relevant part of replaceShorthand is:
  public static String replaceShorthand(char c, Map<String, String> headers,
      TimeZone timeZone, boolean needRounding, int unit, int roundDown) {
    String timestampHeader = headers.get("timestamp");
    long ts;
    try {
      ts = Long.valueOf(timestampHeader);
    } catch (NumberFormatException e) {
      throw new RuntimeException("Flume wasn't able to parse timestamp header"
        + " in the event to resolve time based bucketing. Please check that"
        + " you're correctly populating timestamp header (for example using"
        + " TimestampInterceptor source interceptor).", e);
    }
    if(needRounding){
      ts = roundDown(roundDown, unit, ts);
    }
........
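The failure can be reproduced outside Flume by isolating the parse step. The sketch below mirrors the try/catch in the excerpt in plain Java; the class and method names are hypothetical. Long.valueOf(String) delegates to Long.parseLong, which throws NumberFormatException when given a null string. The exact exception message may differ between JDK versions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical reproduction of the parse step in BucketPath.replaceShorthand.
public class MissingTimestampDemo {

  static long parseTimestamp(Map<String, String> headers) {
    String timestampHeader = headers.get("timestamp"); // null when the header is absent
    try {
      return Long.valueOf(timestampHeader); // throws NumberFormatException for null
    } catch (NumberFormatException e) {
      throw new RuntimeException("Flume wasn't able to parse timestamp header", e);
    }
  }

  public static void main(String[] args) {
    Map<String, String> headers = new HashMap<>(); // e.g. a netcat event with no headers
    try {
      parseTimestamp(headers);
    } catch (RuntimeException e) {
      // The cause is the NumberFormatException seen in the stack trace above.
      System.out.println(e.getCause());
    }
  }
}
```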

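When the event has no timestamp header, timestampHeader is null. The assignment to ts then fails, because Long.valueOf(null) throws a NumberFormatException, which is wrapped in the RuntimeException seen in the log. This is a Flume bug, tracked as https://issues.apache.org/jira/browse/FLUME-1419. There are three workarounds:
1. Change the configuration so that the HDFS path no longer contains date escape sequences: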
agent-server1.sinks.hdfs-sink.hdfs.path = hdfs://bipcluster/tmp/flume

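However, with this path the logs can no longer be stored in one directory per day.
2. Modify the relevant code (patch: https://issues.apache.org/jira/secure/p_w_upload/12538891/FLUME-1419.patch). With the patch, if the headers contain no timestamp value, the current time is used instead. The relevant code: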
    String timestampHeader = headers.get("timestamp");
    long ts;
    try {
      if (timestampHeader == null) {
        ts = System.currentTimeMillis();
      } else {
        ts = Long.valueOf(timestampHeader);
      }
    } catch (NumberFormatException e) {
      throw new RuntimeException("Flume wasn't able to parse timestamp header"
        + " in the event to resolve time based bucketing. Please check that"
        + " you're correctly populating timestamp header (for example using"
        + " TimestampInterceptor source interceptor).", e);
    }
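The patch's fallback can be tried outside Flume with a small standalone version. This is a sketch that assumes the same "timestamp" header name; the class and method names are hypothetical. As in the patch, a missing header falls back to the current time, while a header that is present but malformed is still an error.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical standalone version of the FLUME-1419 fallback logic.
public class TimestampFallback {

  static long resolveTimestamp(Map<String, String> headers) {
    String timestampHeader = headers.get("timestamp");
    if (timestampHeader == null) {
      // No header: use the sink's current time, as the patch does.
      return System.currentTimeMillis();
    }
    try {
      return Long.parseLong(timestampHeader);
    } catch (NumberFormatException e) {
      // A header that is present but not a number is still reported as an error.
      throw new RuntimeException("Flume wasn't able to parse timestamp header", e);
    }
  }

  public static void main(String[] args) {
    Map<String, String> headers = new HashMap<>();
    System.out.println(resolveTimestamp(headers)); // current time in millis
    headers.put("timestamp", "1395655387000");
    System.out.println(resolveTimestamp(headers)); // 1395655387000
  }
}
```

One trade-off of this approach: the bucket is chosen by the time the sink processes the event, not the time the event was produced. An event that waits in the channel across midnight can therefore land in the next day's directory.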

3. Define a timestamp interceptor on the source. This only takes two extra lines in the configuration:
agent-server1.sources.testtail.interceptors = i1
agent-server1.sources.testtail.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
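Conceptually, the timestamp interceptor stamps each event on the source side, before it enters the channel, so the sink always finds a "timestamp" header. The plain-Java sketch below models that behavior on a bare header map instead of Flume's Event type. The class name is hypothetical. The preserveExisting flag is modeled on the interceptor's option of the same name, and the class is a simplification, not Flume's TimestampInterceptor code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a timestamp interceptor operating on an event's header map.
public class TimestampStamper {

  private final boolean preserveExisting;

  TimestampStamper(boolean preserveExisting) {
    this.preserveExisting = preserveExisting;
  }

  // Sets the "timestamp" header to the current epoch millis, unless
  // preserveExisting is true and the header is already present.
  Map<String, String> intercept(Map<String, String> headers) {
    if (!(preserveExisting && headers.containsKey("timestamp"))) {
      headers.put("timestamp", Long.toString(System.currentTimeMillis()));
    }
    return headers;
  }

  public static void main(String[] args) {
    Map<String, String> headers = new HashMap<>(); // netcat events arrive without headers
    new TimestampStamper(false).intercept(headers);
    System.out.println(headers.get("timestamp")); // now parseable by the HDFS sink
  }
}
```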

A tip: when debugging Flume problems, you can add a startup parameter that also sends DEBUG-level logs to the console:
-Dflume.root.logger=DEBUG,console,LOGFILE