Flume(ng)カスタムsink実装と属性注入
5899 ワード
最近flumeを利用してリモートログを収集する必要があるので、flumeの最も基本的な使い方を学びます.ここでは記録だけをします.
リモートログ収集の全体的な考え方は、リモートカスタム実装log 4 jのappenderがflumeエンドにメッセージを送信し、flumeエンドカスタム実装sinkが私たちのルールに従ってログを保存することです.
カスタムSinkコード:
コンフィギュレーションケーブルインタフェースを実装することで、初期化時にコンフィギュレーションのパラメータの値をconfigureメソッドでcontextから取得できます.ここでは、flumeのプロファイルからrootPathの値、すなわちログに保存されているルートパスを取得したいと考えています.flume-conf.propertiesでは、次のように構成されています.
loggerSinkはカスタムsinkの名前で、私たちが値を取るときのkeyは、loggerSinkの後ろの部分だけでいいです.つまり、ここのrootPathです.
実際のビジネスロジックの実行は,複写AbstractSinkにおけるprocessメソッドを継承することによって実現される.ベースクラスのgetChannelメソッドからチャネルを取得し,そこからEvent処理を取り出せばよい.
リモートログ収集の全体的な考え方は、リモートカスタム実装log 4 jのappenderがflumeエンドにメッセージを送信し、flumeエンドカスタム実装sinkが私たちのルールに従ってログを保存することです.
カスタムSinkコード:
public class LocalFileLogSink extends AbstractSink implements Configurable {
private static final Logger logger = LoggerFactory
. getLogger(LocalFileLogSink .class );
private static final String PROP_KEY_ROOTPATH = "rootPath";
private String rootPath;
@Override
public void configure(Context context) {
String rootPath = context.getString(PROP_KEY_ROOTPATH );
setRootPath(rootPath);
}
@Override
public Status process() throws EventDeliveryException {
logger .debug("Do process" );
}
}
コンフィギュレーションケーブルインタフェースを実装することで、初期化時にコンフィギュレーションのパラメータの値をconfigureメソッドでcontextから取得できます.ここでは、flumeのプロファイルからrootPathの値、すなわちログに保存されているルートパスを取得したいと考えています.flume-conf.propertiesでは、次のように構成されています.
agent.sinks = loggerSink
agent.sinks.loggerSink.rootPath = ./logs
loggerSinkはカスタムsinkの名前で、私たちが値を取るときのkeyは、loggerSinkの後ろの部分だけでいいです.つまり、ここのrootPathです.
実際のビジネスロジックの実行は,複写AbstractSinkにおけるprocessメソッドを継承することによって実現される.ベースクラスのgetChannelメソッドからチャネルを取得し,そこからEvent処理を取り出せばよい.
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
logger .debug("Get event." );
Event event = ch.take();
txn.commit();
status = Status. READY ;
return status;
}finally {
Log. info( "trx close.");
txn.close();
}