Flume NG学習ノート(二)単機とクラスタFlume構成
7895 ワード
以下の内容は基本的に公式サイトから来ています.http://flume.apache.org/FlumeUserGuide.html英語が上手なら、私のところを見なくてもいいです...
この文書では、最新バージョンのapache flume 1.5を使用しています.Flumeをインストールし、Flumeが使用できるかどうかをテストします.Flumeディレクトリでは、次の文でテストします.
bin/flume-ng agent -n$agent_name -c conf -f conf/flume-conf.properties.template
結果は図のように表示されます.
Ok、次に、一般的なアーキテクチャ、機能構成の例を見てみましょう.
一、最も簡単な単一エージェントFlume構成
次はプロファイルです.
説明すると、ここではすべての例で$FLUME_にプロファイルを配置します.HOME/confディレクトリでは、後述しません.
#ノックコマンド
flume-ng agent -cconf -f conf/single_case1.conf -n a1 -Dflume.root.logger=INFO,console
#パラメータコマンド
-c conf設定ディレクトリをconfとして指定
-f conf/single_case1.conf指定プロファイルはconf/single_case1.conf
-n a 1 agentの名前をa 1と指定し、case 1_とexample.confの一貫性
-Dflume.root.logger=INFO、console指定DEBUFモードでconsole出力INFO情報
具体的なパラメータコマンドはflume-nghelpで表示してください
#そして別の端末でテスト
telnet 127.0.0.1 44444
次に、前に起動した端末を見てconsoleが次のように出力されていることを確認します.
ここでメッセージが見つかりますhello world!出力して、hello world!hello world!hello world!ブロックされましたプロファイルでは、a 1の出力方法を選択します.sinks.k1.type= logger
すなわちconsole出力であり、flume-ngはloggerに対して16バイトしか表示されず、残りはsinkによって切断される.次はLoggerSinkです.JAva:
見に行こうJAvaのdumpEventメソッド:
event処理中にデータの切り取り操作が発生したことは容易である.
Ok、次のコーナーに入ります.
二、「クラスタ」エージェントFlume構成
ここでクラスタの概念は複数のマシンの管理であり,最も簡単なのは2台のマシンの1台のエージェントホストがデータソースからデータを取得し,その後データを別のホストに転送し,出力することである.これは、ビジネス・マルチデータ・ソースの場合、各データ・ソースにエージェントを設定し、1台のエージェント・ホストにまとめて出力することを意味します.
次に、最も簡単なクラスタ構成、すなわち2つのエージェントを実現し、1台のデータソースデータを受け入れるエージェントがデータを要約したエージェントにプッシュし、要約したエージェントがデータを出力する.この2台のホストはそれぞれpush,pullです
上図によればAVRO RPCで通信する必要があるため、プッシュデータsinksタイプとプルデータのsourcesタイプはいずれもavroである.データエージェントのデータソースを引くには、前述のSpool Source形式で処理します.ここでは、ディレクトリとファイルを事前に作成しています.log
次に、プッシュエージェントホストのflumeプロファイルを設定します.
次に、プロキシホストを要約するflumeプロファイルを設定します.
Spool Sourceは非リアルタイムですが、データ量が少なく処理が早いため、pullエージェントを先に起動するしかありません.
#ノックコマンド
flume-ng agent -c conf -f conf/pull.conf -n a1 -Dflume.root.logger=INFO,console
上図は成功を示しています.
前後してpushホストのflumeを起動する
#ノックコマンド
flume-ng agent -n a2 -c conf -f conf/push.conf -Dflume.root.logger=INFO,console
pullホストのステータスを確認すると、データが転送されていることがわかります.
そしてpushホストのファイルを見に行きます
接尾辞を付けたCOMPLETED.これは前文の言うことと一致している.
次に、新しいデータをディレクトリ/tmp/logsに格納すると、pushホストはpullホスト出力にデータを送信し、新しいデータファイルのファイル名を変更します.
この文書では、最新バージョンのapache flume 1.5を使用しています.Flumeをインストールし、Flumeが使用できるかどうかをテストします.Flumeディレクトリでは、次の文でテストします.
bin/flume-ng agent -n$agent_name -c conf -f conf/flume-conf.properties.template
結果は図のように表示されます.

Ok、次に、一般的なアーキテクチャ、機能構成の例を見てみましょう.
一、最も簡単な単一エージェントFlume構成
次はプロファイルです.
# :single_case1.conf.conf
# :
#single_case1.conf.conf: A single-node Flume configuration
#Name the components on this agent
a1.sources= r1
a1.sinks= k1
a1.channels= c1
#Describe/configure the source
a1.sources.r1.type= netcat
a1.sources.r1.bind= localhost
a1.sources.r1.port= 44444
#Describe the sink
a1.sinks.k1.type= logger
#Use a channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.capacity= 1000
a1.channels.c1.transactionCapacity= 100
#Bind the source and sink to the channel
a1.sources.r1.channels= c1
a1.sinks.k1.channel= c1
説明すると、ここではすべての例で$FLUME_にプロファイルを配置します.HOME/confディレクトリでは、後述しません.
#ノックコマンド
flume-ng agent -cconf -f conf/single_case1.conf -n a1 -Dflume.root.logger=INFO,console
#パラメータコマンド
-c conf設定ディレクトリをconfとして指定
-f conf/single_case1.conf指定プロファイルはconf/single_case1.conf
-n a 1 agentの名前をa 1と指定し、case 1_とexample.confの一貫性
-Dflume.root.logger=INFO、console指定DEBUFモードでconsole出力INFO情報
具体的なパラメータコマンドはflume-nghelpで表示してください
#そして別の端末でテスト
telnet 127.0.0.1 44444

次に、前に起動した端末を見てconsoleが次のように出力されていることを確認します.

ここでメッセージが見つかりますhello world!出力して、hello world!hello world!hello world!ブロックされましたプロファイルでは、a 1の出力方法を選択します.sinks.k1.type= logger
すなわちconsole出力であり、flume-ngはloggerに対して16バイトしか表示されず、残りはsinkによって切断される.次はLoggerSinkです.JAva:
if(event != null) {
if (logger.isInfoEnabled()) {
logger.info("Event: " + EventHelper.dumpEvent(event));
}
}
見に行こうJAvaのdumpEventメソッド:
privatestatic final int DEFAULT_MAX_BYTES = 16;
publicstatic String dumpEvent(Event event) {
return dumpEvent(event, DEFAULT_MAX_BYTES);
}
publicstatic String dumpEvent(Event event, int maxBytes) {
StringBuilder buffer = new StringBuilder();
if (event == null || event.getBody() == null) {
buffer.append("null");
} else if (event.getBody().length == 0) {
// do nothing... in this case, HexDump.dump() will throw anexception
} else {
byte[] body = event.getBody();
byte[] data = Arrays.copyOf(body, Math.min(body.length,maxBytes));
ByteArrayOutputStream out = new ByteArrayOutputStream();
try {
HexDump.dump(data, 0, out, 0);
String hexDump = new String(out.toByteArray());
// remove offset since it's not relevant for such a smalldataset
if(hexDump.startsWith(HEXDUMP_OFFSET)) {
hexDump =hexDump.substring(HEXDUMP_OFFSET.length());
}
buffer.append(hexDump);
} catch (Exception e) {
if(LOGGER.isInfoEnabled()) {
LOGGER.info("Exception while dumpingevent", e);
}
buffer.append("...Exception while dumping:").append(e.getMessage());
}
String result = buffer.toString();
if(result.endsWith(EOL) && buffer.length() >EOL.length()) {
buffer.delete(buffer.length() - EOL.length(),buffer.length()).toString();
}
}
return "{ headers:" + event.getHeaders() + " body:"+ buffer + " }";
}
event処理中にデータの切り取り操作が発生したことは容易である.
Ok、次のコーナーに入ります.
二、「クラスタ」エージェントFlume構成

ここでクラスタの概念は複数のマシンの管理であり,最も簡単なのは2台のマシンの1台のエージェントホストがデータソースからデータを取得し,その後データを別のホストに転送し,出力することである.これは、ビジネス・マルチデータ・ソースの場合、各データ・ソースにエージェントを設定し、1台のエージェント・ホストにまとめて出力することを意味します.
次に、最も簡単なクラスタ構成、すなわち2つのエージェントを実現し、1台のデータソースデータを受け入れるエージェントがデータを要約したエージェントにプッシュし、要約したエージェントがデータを出力する.この2台のホストはそれぞれpush,pullです
上図によればAVRO RPCで通信する必要があるため、プッシュデータsinksタイプとプルデータのsourcesタイプはいずれもavroである.データエージェントのデータソースを引くには、前述のSpool Source形式で処理します.ここでは、ディレクトリとファイルを事前に作成しています.log

次に、プッシュエージェントホストのflumeプロファイルを設定します.
# push.conf
#Name the components on this agent
a2.sources= r1
a2.sinks= k1
a2.channels= c1
#Describe/configure the source
a2.sources.r1.type= spooldir
a2.sources.r1.spoolDir= /tmp/logs
a2.sources.r1.channels= c1
#Use a channel which buffers events in memory
a2.channels.c1.type= memory
a2.channels.c1.keep-alive= 10
a2.channels.c1.capacity= 100000
a2.channels.c1.transactionCapacity= 100000
#Describe/configure the source
a2.sinks.k1.type= avro
a2.sinks.k1.channel= c1
a2.sinks.k1.hostname= pull
a2.sinks.k1.port= 4444
次に、プロキシホストを要約するflumeプロファイルを設定します.
# pull.conf
#Name the components on this agent
a1.sources= r1
a1.sinks= k1
a1.channels= c1
#Describe/configure the source
a1.sources.r1.type= avro
a1.sources.r1.channels= c1
a1.sources.r1.bind= pull
a1.sources.r1.port= 44444
#Describe the sink
a1.sinks.k1.type= logger
a1.sinks.k1.channel = c1
#Use a channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.keep-alive= 10
a1.channels.c1.capacity= 100000
a1.channels.c1.transactionCapacity= 100000
Spool Sourceは非リアルタイムですが、データ量が少なく処理が早いため、pullエージェントを先に起動するしかありません.
#ノックコマンド
flume-ng agent -c conf -f conf/pull.conf -n a1 -Dflume.root.logger=INFO,console

上図は成功を示しています.
前後してpushホストのflumeを起動する
#ノックコマンド
flume-ng agent -n a2 -c conf -f conf/push.conf -Dflume.root.logger=INFO,console

pullホストのステータスを確認すると、データが転送されていることがわかります.
そしてpushホストのファイルを見に行きます

接尾辞を付けたCOMPLETED.これは前文の言うことと一致している.
次に、新しいデータをディレクトリ/tmp/logsに格納すると、pushホストはpullホスト出力にデータを送信し、新しいデータファイルのファイル名を変更します.