flumeのHFS Sink詳細解(転載)

90454 ワード

原文の住所:http://www.aboutyun.com/thread-21422-1-1.html
一、重要な問題点
1.どのような構成がhdfsファイルのクローズに影響しますか?2.HFS Sinkで毎回トリガするイベントは何ですか?3.HFS Sinkにおけるパラメータの優先度はどうなりますか?
二、HFS Sinkとファイルを書く関連配置
hdfs.path->hdfsディレクトリパス
hdfs.filePrefix->ファイルプレフィックス。標準値FlueData
hdfs.fileSuffix->ファイルの拡張子
hdfs.rollInterval->どのぐらいの時間後にclose hdfsファイルを作成しますか?単位は秒で、デフォルトは30秒です。0に設定すると時間によってはclose hdfsファイルが表示されません。
hdfs.rollSize->ファイルサイズが一定値を超えたら、closeファイル。デフォルトの値1024、単位はバイトです。0に設定するとファイルサイズに基づいていないことを示します。
hdfs.rollCount->いくつかのイベントを書き込んだ後、closeファイルを作成します。標準値は10個です。0に設定するとイベント数に基づいていないことを表します。
hdfs.fileType->ファイルフォーマットは、3つのフォーマットがあります。選択できます。
hdfs.batch Size->バッチ数は、HFS Sinkが毎回Channelから取るイベントの個数です。標準値100
hdfs.minBlockReplicas->HFSの各ブロックの最小のreplicas数字は、セットしないとhadoopの中の配置を取ります。
hdfs.maxOpenFiles->最大開いているファイル数を許可します。デフォルトは5000です。この値を超えると、早いファイルは閉じられます。
serializer->HFS Sinkはファイルを書く時、プログレッシブ操作を行います。対応するSerializer言い訳を呼び出しますので、ご希望のSerializerをカスタマイズできます。
hdfs.retryInterval->HFSファイルのクローズに失敗した後、再度オフを試みる遅延数は、単位が秒です。
hdfs.callTimeout->HFS操作が許容される時間、例えばhdfsファイルのopen、write、flash、close操作。単位はミリ秒で、標準値は10000です。
三、HDFSSイベントSink分析
hdfs.pathで、hdfs.filePrefixとhdfs.fileSuffixはそれぞれ/data/%Y/%m/%H,flume,txtを例にして、ソースを分析します。
HDCFEventSinkのprocess方法を直接見る:
public Status process() throws EventDeliveryException {
    //   Channel
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    //     BucketWriter  ,BucketWriter    hdfs          
    List<BucketWriter> writers = Lists.newArrayList();
    // Channel     
    transaction.begin();
    try {
      int txnEventCount = 0;
      //     batchSize   。   batchSize       hdfs.batchSize
      for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
        Event event = channel.take();
        if (event == null) {
          break;
        }
 
        //   hdfs       
        String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
            timeZone, needRounding, roundUnit, roundValue, useLocalTime);
        //   hdfs   , fileName       hdfs.filePrefix, flume
        String realName = BucketPath.escapeString(fileName, event.getHeaders(),
          timeZone, needRounding, roundUnit, roundValue, useLocalTime);
 
        //   hdfs    ,     path,filePrefix,fileSuffix
        //      lookupPath  /data/2015/07/20/15/flume
        String lookupPath = realPath + DIRECTORY_DELIMITER + realName;
        BucketWriter bucketWriter;
        HDFSWriter hdfsWriter = null;
        //         
        WriterCallback closeCallback = new WriterCallback() {
          @Override
          public void run(String bucketPath) {
            LOG.info("Writer callback called.");
            synchronized (sfWritersLock) {
              // sfWriters   HashMap,    maxOpenFiles    。  maxOpenFiles            
              //          hdfs  close     sfWriters        。          maxOpenFiles
              // sfWriters  Map  key    hdfs  ,value BucketWriter
              sfWriters.remove(bucketPath);
            }
          }
        };
        synchronized (sfWritersLock) {
          //    sfWriters      key /data/2015/07/20/15/flume BucketWriter
          bucketWriter = sfWriters.get(lookupPath);
          if (bucketWriter == null) {
              //         BucketWriter
            //    fileType     HDFSWriter,fileType   3   ,   SequenceFile, DataStream or CompressedStream
            hdfsWriter = writerFactory.getWriter(fileType);
            //     BucketWriter,       hdfsWriter      ,BucketWriter hdfs        HDFSWriter
            bucketWriter = initializeBucketWriter(realPath, realName,
              lookupPath, hdfsWriter, closeCallback);
            //     BucketWriter   sfWriters 
            sfWriters.put(lookupPath, bucketWriter);
          }
        }
 
        //  BucketWriter   writers   
        if (!writers.contains(bucketWriter)) {
          writers.add(bucketWriter);
        }
 
        //  hdfs  
        try {
          bucketWriter.append(event);
        } catch (BucketClosedException ex) {
          LOG.info("Bucket was closed while trying to append, " +
            "reinitializing bucket and writing event.");
          hdfsWriter = writerFactory.getWriter(fileType);
          bucketWriter = initializeBucketWriter(realPath, realName,
            lookupPath, hdfsWriter, closeCallback);
          synchronized (sfWritersLock) {
            sfWriters.put(lookupPath, bucketWriter);
          }
          bucketWriter.append(event);
        }
      }
 
      if (txnEventCount == 0) {
        sinkCounter.incrementBatchEmptyCount();
      } else if (txnEventCount == batchSize) {
        sinkCounter.incrementBatchCompleteCount();
      } else {
        sinkCounter.incrementBatchUnderflowCount();
      }
 
      //          flush   hdfs  
      for (BucketWriter bucketWriter : writers) {
        bucketWriter.flush();
      }
 
      //     
      transaction.commit();
 
      if (txnEventCount < 1) {
        return Status.BACKOFF;
      } else {
        sinkCounter.addToEventDrainSuccessCount(txnEventCount);
        return Status.READY;
      }
    } catch (IOException eIO) {
      //         
      transaction.rollback();
      LOG.warn("HDFS IO error", eIO);
      return Status.BACKOFF;
    } catch (Throwable th) {
      //         
      transaction.rollback();
      LOG.error("process failed", th);
      if (th instanceof Error) {
        throw (Error) th;
      } else {
        throw new EventDeliveryException(th);
      }
    } finally {
      //     
      transaction.close();
    }
}
四、BucketWriter分析
次にBucketWriterのアプリとflushの方法を見ます。
apped方法:
public synchronized void append(final Event event)
      throws IOException, InterruptedException {
    checkAndThrowInterruptedException();
    // If idleFuture is not null, cancel it before we move forward to avoid a
    // close call in the middle of the append.
    if(idleFuture != null) {
      idleFuture.cancel(false);
      // There is still a small race condition - if the idleFuture is already
      // running, interrupting it can cause HDFS close operation to throw -
      // so we cannot interrupt it while running. If the future could not be
      // cancelled, it is already running - wait for it to finish before
      // attempting to write.
      if(!idleFuture.isDone()) {
        try {
          idleFuture.get(callTimeout, TimeUnit.MILLISECONDS);
        } catch (TimeoutException ex) {
          LOG.warn("Timeout while trying to cancel closing of idle file. Idle" +
            " file close may have failed", ex);
        } catch (Exception ex) {
          LOG.warn("Error while trying to cancel closing of idle file. ", ex);
        }
      }
      idleFuture = null;
    }
 
    //   hdfs       
    if (!isOpen) {
      // hdfs         
      if (closed) {
        throw new BucketClosedException("This bucket writer was closed and " +
          "this handle is thus no longer valid");
      }
      //   hdfs  
      open();
    }
 
    //            
    if (shouldRotate()) {
      boolean doRotate = true;
 
      if (isUnderReplicated) {
        if (maxConsecUnderReplRotations > 0 &&
            consecutiveUnderReplRotateCount >= maxConsecUnderReplRotations) {
          doRotate = false;
          if (consecutiveUnderReplRotateCount == maxConsecUnderReplRotations) {
            LOG.error("Hit max consecutive under-replication rotations ({}); " +
                "will not continue rolling files under this path due to " +
                "under-replication", maxConsecUnderReplRotations);
          }
        } else {
          LOG.warn("Block Under-replication detected. Rotating file.");
        }
        consecutiveUnderReplRotateCount++;
      } else {
        consecutiveUnderReplRotateCount = 0;
      }
 
      if (doRotate) {
          //                  ,         。   close      ,            
        close();
        open();
      }
    }
 
    //  event  
    try {
      sinkCounter.incrementEventDrainAttemptCount();
      callWithTimeout(new CallRunner<Void>() {
        @Override
        public Void call() throws Exception {
          //         HDFSWriter append  
          writer.append(event); // could block
          return null;
        }
      });
    } catch (IOException e) {
      LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file (" +
          bucketPath + ") and rethrowing exception.",
          e.getMessage());
      try {
        close(true);
      } catch (IOException e2) {
        LOG.warn("Caught IOException while closing file (" +
             bucketPath + "). Exception follows.", e2);
      }
      throw e;
    }
 
    //     +  
    processSize += event.getBody().length;
    //     +1
    eventCounter++;
    //    +1
    batchCounter++;
 
    //         hdfs.batchSize    flush  
    if (batchCounter == batchSize) {
      flush();
    }
}
まずopen方法を見て、hdfsファイルを開く方法:
private void open() throws IOException, InterruptedException {
    // hdfs     HDFSWriter         
    if ((filePath == null) || (writer == null)) {
      throw new IOException("Invalid file settings");
    }
 
    final Configuration config = new Configuration();
    // disable FileSystem JVM shutdown hook
    config.setBoolean("fs.automatic.close", false);
 
    // Hadoop is not thread safe when doing certain RPC operations,
    // including getFileSystem(), when running under Kerberos.
    // open() must be called by one thread at a time in the JVM.
    // NOTE: tried synchronizing on the underlying Kerberos principal previously
    // which caused deadlocks. See FLUME-1231.
    synchronized (staticLock) {
      checkAndThrowInterruptedException();
 
      try {
          // fileExtensionCounter   AtomicLong     ,            
        //        ,         ,    open      。      BucketWriter  open               1
        //      counter
        long counter = fileExtensionCounter.incrementAndGet();
 
        //            ,      flume               
        //    fullFileName   flume.1437375933234
        String fullFileName = fileName + "." + counter;
 
        //      , fullFileName   flume.1437375933234.txt
        if (fileSuffix != null && fileSuffix.length() > 0) {
          fullFileName += fileSuffix;
        } else if (codeC != null) {
          fullFileName += codeC.getDefaultExtension();
        }
 
        //      inUsePrefix inUseSuffix。            "" ".tmp"
        // buckerPath  /data/2015/07/20/15/flume.1437375933234.txt.tmp
        bucketPath = filePath + "/" + inUsePrefix
          + fullFileName + inUseSuffix;
        // targetPath  /data/2015/07/20/15/flume.1437375933234.txt
        targetPath = filePath + "/" + fullFileName;
 
        LOG.info("Creating " + bucketPath);
        callWithTimeout(new CallRunner<Void>() {
          @Override
          public Void call() throws Exception {
            if (codeC == null) {
              // Need to get reference to FS using above config before underlying
              // writer does in order to avoid shutdown hook &
              // IllegalStateExceptions
              if(!mockFsInjected) {
                fileSystem = new Path(bucketPath).getFileSystem(
                  config);
              }
              //   HDFSWriter    
              writer.open(bucketPath);
            } else {
              // need to get reference to FS before writer does to
              // avoid shutdown hook
              if(!mockFsInjected) {
                fileSystem = new Path(bucketPath).getFileSystem(
                  config);
              }
              //   HDFSWriter    
              writer.open(bucketPath, codeC, compType);
            }
            return null;
          }
        });
      } catch (Exception ex) {
        sinkCounter.incrementConnectionFailedCount();
        if (ex instanceof IOException) {
          throw (IOException) ex;
        } else {
          throw Throwables.propagate(ex);
        }
      }
    }
    isClosedMethod = getRefIsClosed();
    sinkCounter.incrementConnectionCreatedCount();
    //        
    resetCounters();
 
    //      hdfs.rollInterval     ,       close  
    if (rollInterval > 0) {
      Callable<Void> action = new Callable<Void>() {
        public Void call() throws Exception {
          LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
              bucketPath, rollInterval);
          try {
            // Roll the file and remove reference from sfWriters map.
            close(true);
          } catch(Throwable t) {
            LOG.error("Unexpected error", t);
          }
          return null;
        }
      };
      //           。             timedRollFuture    
      timedRollFuture = timedRollerPool.schedule(action, rollInterval,
          TimeUnit.SECONDS);
    }
 
    isOpen = true;
}
flashメソッドは、closeとappedメソッド(処理されたイベント数がバッチ数に等しい)でのみ呼び出されます。

public synchronized void flush() throws IOException, InterruptedException {
    checkAndThrowInterruptedException();
    if (!isBatchComplete()) { //isBatchComplete  batchCount    0。       batchCount  0,      
      doFlush(); // doFlush     HDFSWriter sync  ,   batchCount   0
 
      // idleTimeout    ,        
      if(idleTimeout > 0) {
        // if the future exists and couldn't be cancelled, that would mean it has already run
        // or been cancelled
        if(idleFuture == null || idleFuture.cancel(false)) {
          Callable<Void> idleAction = new Callable<Void>() {
            public Void call() throws Exception {
              LOG.info("Closing idle bucketWriter {} at {}", bucketPath,
                System.currentTimeMillis());
              if (isOpen) {
                close(true);
              }
              return null;
            }
          };
          idleFuture = timedRollerPool.schedule(idleAction, idleTimeout,
              TimeUnit.SECONDS);
        }
      }
    }
}
close方法:
public synchronized void close(boolean callCloseCallback)
        throws IOException, InterruptedException {
    checkAndThrowInterruptedException();
    try {
      // close      flush  ,  batchCount,   HDFSWriter sync  
      flush();
    } catch (IOException e) {
      LOG.warn("pre-close flush failed", e);
    }
    boolean failedToClose = false;
    LOG.info("Closing {}", bucketPath);
    //         ,       HDFSWriter close  
    CallRunner<Void> closeCallRunner = createCloseCallRunner();
    if (isOpen) { //        
      try {
          //   HDFSWriter close  
        callWithTimeout(closeCallRunner);
        sinkCounter.incrementConnectionClosedCount();
      } catch (IOException e) {
        LOG.warn(
          "failed to close() HDFSWriter for file (" + bucketPath +
            "). Exception follows.", e);
        sinkCounter.incrementConnectionFailedCount();
        failedToClose = true;
        //             ,retryInterval      
        final Callable<Void> scheduledClose =
          createScheduledCloseCallable(closeCallRunner);
        timedRollerPool.schedule(scheduledClose, retryInterval,
          TimeUnit.SECONDS);
      }
      isOpen = false;
    } else {
      LOG.info("HDFSWriter is already closed: {}", bucketPath);
    }
 
    // timedRollFuture    hdfs.rollInterval         。  hdfs.rollInterval   0,          
    //    close  ,       hdfs.rollInterval      flush  ,        ,        
    //       timedRollFuture     
    if (timedRollFuture != null && !timedRollFuture.isDone()) {
      timedRollFuture.cancel(false); // do not cancel myself if running!
      timedRollFuture = null;
    }
 
    //     hdfs.idleTimeout,     
    if (idleFuture != null && !idleFuture.isDone()) {
      idleFuture.cancel(false); // do not cancel myself if running!
      idleFuture = null;
    }
 
    //      ,     ,       
    if (bucketPath != null && fileSystem != null && !failedToClose) {
      //   /data/2015/07/20/15/flume.1437375933234.txt.tmp      /data/2015/07/20/15/flume.1437375933234.txt
      renameBucket(bucketPath, targetPath, fileSystem);
    }
    if (callCloseCallback) { // callCloseCallback close     
 
      //            ,   BucketWriter onCloseCallback  
      //   onCloseCallback     HDFSEventSink      closeCallback。     sfWriters.remove(bucketPath);
      //   onCloseCallback   true,      BucketWriter      open     。        。
      // onCloseCallback   append     shouldRotate       close         false,      true
      runCloseAction(); 
 
      closed = true;
    }
}
振り返ってみますと、appedメソッドのshuldRotate方法は、shuldRotate方法が実行されていくとファイルを閉じて新しいファイルを再度開くことになります。
private boolean shouldRotate() {
    boolean doRotate = false;
 
    //   HDFSWriter isUnderReplicated  ,      hdfs        。
    if (writer.isUnderReplicated()) {
      this.isUnderReplicated = true;
      doRotate = true;
    } else {
      this.isUnderReplicated = false;
    }
 
    // rollCount     hdfs.rollCount。 eventCounter     rollCount  , close  ,        
    if ((rollCount > 0) && (rollCount <= eventCounter)) {
      LOG.debug("rolling: rollCount: {}, events: {}", rollCount, eventCounter);
      doRotate = true;
    }
 
    // rollSize     hdfs.rollSize。processSize             。 processSize  rollSize   , close  ,        
    if ((rollSize > 0) && (rollSize <= processSize)) {
      LOG.debug("rolling: rollSize: {}, bytes: {}", rollSize, processSize);
      doRotate = true;
    }
 
    return doRotate;
}
五、HDFSS Writer分析
各BucketWriterに対応するのは一つのHFS Writerだけです。
HDFSS Writerは一つのインターフェースで、三つの具体的な実装クラスがあります。それぞれfileTypeに対応するのは、DataStream、SequenceFile、CommpresedStreamです。HDFSS Data Streamを例にとって、BUCET Writerで使用するHFSS Writerのいくつかの方法を分析します。
apped方法、hdfsファイルを書く:
@Override
public void append(Event e) throws IOException {
    //     ,    serializer write  
    // serializer org.apache.flume.serialization.EventSerializer      
    //    Serializer BodyTextEventSerializer
    serializer.write(e);
}
openメソッド:
@Override
public void open(String filePath) throws IOException {
    Configuration conf = new Configuration();
    //   hdfs  
    Path dstPath = new Path(filePath);
    FileSystem hdfs = getDfs(conf, dstPath);
    //   doOpen  
    doOpen(conf, dstPath, hdfs);
}
 
protected void doOpen(Configuration conf,
    Path dstPath, FileSystem hdfs) throws
        IOException {
    if(useRawLocalFileSystem) {
      if(hdfs instanceof LocalFileSystem) {
        hdfs = ((LocalFileSystem)hdfs).getRaw();
      } else {
        logger.warn("useRawLocalFileSystem is set to true but file system " +
            "is not of type LocalFileSystem: " + hdfs.getClass().getName());
      }
    }
 
    boolean appending = false;
    //   FSDataOutputStream,    outStream
    if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile
            (dstPath)) {
      outStream = hdfs.append(dstPath);
      appending = true;
    } else {
      outStream = hdfs.create(dstPath);
    }
 
    //    Serializer
    serializer = EventSerializerFactory.getInstance(
        serializerType, serializerContext, outStream);
    if (appending && !serializer.supportsReopen()) {
      outStream.close();
      serializer = null;
      throw new IOException("serializer (" + serializerType +
          ") does not support append");
    }
 
    // must call superclass to check for replication issues
    registerCurrentStream(outStream, hdfs, dstPath);
 
    if (appending) {
      serializer.afterReopen();
    } else {
      serializer.afterCreate();
    }
}
close方法:
@Override
public void close() throws IOException {
    serializer.flush();
    serializer.beforeClose();
    outStream.flush();
    outStream.sync();
    outStream.close();
 
    unregisterCurrentStream();
}
syncメソッド:
@Override
public void sync() throws IOException {
    serializer.flush();
    outStream.flush();
    outStream.sync();
}
isUnider Replicated方法は、Abstract HDCFSRで定義されています。
@Override
public boolean isUnderReplicated() {
    try {
      //       replication    
      int numBlocks = getNumCurrentReplicas();
      if (numBlocks == -1) {
        return false;
      }
      int desiredBlocks;
      if (configuredMinReplicas != null) {
        //      hdfs.minBlockReplicas
        desiredBlocks = configuredMinReplicas;
      } else {
        //    hdfs.minBlockReplicas     hdfs    
        desiredBlocks = getFsDesiredReplication();
      }
      //                       ,  true
      return numBlocks < desiredBlocks;
    } catch (IllegalAccessException e) {
      logger.error("Unexpected error while checking replication factor", e);
    } catch (InvocationTargetException e) {
      logger.error("Unexpected error while checking replication factor", e);
    } catch (IllegalArgumentException e) {
      logger.error("Unexpected error while checking replication factor", e);
    }
    return false;
}
六、まとめ
hdfs.rollInterval,hdfs.rollSize,hdfs.rollCount,hdfs.minBlockReplicas,hdfs.batSizeの5つの構成はhdfsファイルのクローズに影響しています。
なお、この5つの構成はhdfsファイルであり、hdfsファイルである。hdfsファイルが閉じたら、これらの構成指標は計算を再開します。BucketWriterのopenメソッドでは、reetCountersメソッドを呼び出しますので、この方法はカウンタをリセットします。また、hdfs.rollIntervalに基づくtimedRollFutureスレッドの戻り値は、close方式では破棄されている。したがって、この5つの属性は、closeファイルであり、openファイルである限り、計算を再開します。
hdfs.rollIntervalは時間と関係があり、時間がhdfs.rollInterval配置の秒数に達したら、closeファイルになります。
hdfs.rollSizeは、各eventのバイトサイズに関連しています。一つのeventのバイトが合計されてhdfs.rollSizeに等しい場合、closeファイルになります。
hdfs.rollCountは、イベントの数がhdfs.rollCountに等しい場合、closeファイルになります。
hdfs.batSizeは、イベントがhdfs.batSize個に追加された場合、つまりHDFS Sinkはhdfs.batSize個のイベントを毎回持つということで、これらのすべてのイベントが同じhdfsファイルに書き込まれているため、今回の条件をトリガし、他の4つの構成はすべて達成されていない。そしてファイルをcloseします。
hdfs.minBlockReplicasは、ファイルに対して最も小さいコピーブロック数を期待していることを示しています。したがって、hdfs.rollInterval,hdfs.rollSize,hdfs.rollCountの3つのパラメータを設定し、これらの3つのパラメータは条件に合致していませんでしたが、複数のファイルを生成しました。このパラメータの優先度はhdfs.rollSize,hdfs.rollCountよりも高いです。