Flume(ng)カスタムsink実装と属性注入

5899 ワード

最近flumeを利用してリモートログを収集する必要があるので、flumeの最も基本的な使い方を学びます.ここでは記録だけをします.
リモートログ収集の全体的な考え方は、リモートカスタム実装log 4 jのappenderがflumeエンドにメッセージを送信し、flumeエンドカスタム実装sinkが私たちのルールに従ってログを保存することです.
カスタムSinkコード:
public class LocalFileLogSink extends AbstractSink implements Configurable {
     private static final Logger logger = LoggerFactory
              . getLogger(LocalFileLogSink .class );
            private static final String PROP_KEY_ROOTPATH = "rootPath";
      private String rootPath;
     @Override
     public void configure(Context context) {
          String rootPath = context.getString(PROP_KEY_ROOTPATH );
          setRootPath(rootPath);
     }
          
          @Override
          public Status process() throws EventDeliveryException {
           logger .debug("Do process" );
       
}

コンフィギュレーションケーブルインタフェースを実装することで、初期化時にコンフィギュレーションのパラメータの値をconfigureメソッドでcontextから取得できます.ここでは、flumeのプロファイルからrootPathの値、すなわちログに保存されているルートパスを取得したいと考えています.flume-conf.propertiesでは、次のように構成されています.
agent.sinks = loggerSink
agent.sinks.loggerSink.rootPath = ./logs

loggerSinkはカスタムsinkの名前で、私たちが値を取るときのkeyは、loggerSinkの後ろの部分だけでいいです.つまり、ここのrootPathです.
実際のビジネスロジックの実行は,複写AbstractSinkにおけるprocessメソッドを継承することによって実現される.ベースクラスのgetChannelメソッドからチャネルを取得し,そこからEvent処理を取り出せばよい.
 Channel ch = getChannel();
            Transaction txn = ch.getTransaction();
          txn.begin();
           try {
               logger .debug("Get event." );
              Event event = ch.take();
              txn.commit();
              status = Status. READY ;
              return status;
                    finally {
              Log. info( "trx close.");
              txn.close();
          }