Flume NG学習ノート(二)単機とクラスタFlume構成

7895 ワード

以下の内容は基本的に公式サイトから来ています.http://flume.apache.org/FlumeUserGuide.html英語が上手なら、私のところを見なくてもいいです...
この文書では、最新バージョンのapache flume 1.5を使用しています.Flumeをインストールし、Flumeが使用できるかどうかをテストします.Flumeディレクトリでは、次の文でテストします.
bin/flume-ng agent -n$agent_name -c conf -f conf/flume-conf.properties.template
結果は図のように表示されます.
Flume NG 学习笔记(二)单机与集群Flume 配置_第1张图片
Ok、次に、一般的なアーキテクチャ、機能構成の例を見てみましょう.
一、最も簡単な単一エージェントFlume構成
 
次はプロファイルです.
#   :single_case1.conf.conf
#    :
#single_case1.conf.conf: A single-node Flume configuration
#Name the components on this agent
a1.sources= r1
a1.sinks= k1
a1.channels= c1
 
#Describe/configure the source
a1.sources.r1.type= netcat
a1.sources.r1.bind= localhost
a1.sources.r1.port= 44444
 
#Describe the sink
a1.sinks.k1.type= logger
 
#Use a channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.capacity= 1000
a1.channels.c1.transactionCapacity= 100
 
#Bind the source and sink to the channel
a1.sources.r1.channels= c1
a1.sinks.k1.channel= c1

説明すると、ここではすべての例で$FLUME_にプロファイルを配置します.HOME/confディレクトリでは、後述しません.
 
#ノックコマンド
flume-ng agent -cconf -f conf/single_case1.conf -n a1 -Dflume.root.logger=INFO,console
 
#パラメータコマンド
-c conf設定ディレクトリをconfとして指定
-f conf/single_case1.conf指定プロファイルはconf/single_case1.conf
-n a 1 agentの名前をa 1と指定し、case 1_とexample.confの一貫性
-Dflume.root.logger=INFO、console指定DEBUFモードでconsole出力INFO情報
具体的なパラメータコマンドはflume-nghelpで表示してください
 
#そして別の端末でテスト
telnet 127.0.0.1 44444
Flume NG 学习笔记(二)单机与集群Flume 配置_第2张图片
次に、前に起動した端末を見てconsoleが次のように出力されていることを確認します.

ここでメッセージが見つかりますhello world!出力して、hello world!hello world!hello world!ブロックされましたプロファイルでは、a 1の出力方法を選択します.sinks.k1.type= logger
すなわちconsole出力であり、flume-ngはloggerに対して16バイトしか表示されず、残りはsinkによって切断される.次はLoggerSinkです.JAva:
if(event != null) {
       if (logger.isInfoEnabled()) {
         logger.info("Event: " + EventHelper.dumpEvent(event));
       }
}

見に行こうJAvaのdumpEventメソッド:
privatestatic final int DEFAULT_MAX_BYTES = 16;
publicstatic String dumpEvent(Event event) {
   return dumpEvent(event, DEFAULT_MAX_BYTES);
}
 
publicstatic String dumpEvent(Event event, int maxBytes) {
   StringBuilder buffer = new StringBuilder();
   if (event == null || event.getBody() == null) {
     buffer.append("null");
   } else if (event.getBody().length == 0) {
     // do nothing... in this case, HexDump.dump() will throw anexception
   } else {
     byte[] body = event.getBody();
     byte[] data = Arrays.copyOf(body, Math.min(body.length,maxBytes));
     ByteArrayOutputStream out = new ByteArrayOutputStream();
     try {
       HexDump.dump(data, 0, out, 0);
       String hexDump = new String(out.toByteArray());
       // remove offset since it's not relevant for such a smalldataset
       if(hexDump.startsWith(HEXDUMP_OFFSET)) {
         hexDump =hexDump.substring(HEXDUMP_OFFSET.length());
       }
       buffer.append(hexDump);
     } catch (Exception e) {
      if(LOGGER.isInfoEnabled()) {
        LOGGER.info("Exception while dumpingevent", e);
      }
       buffer.append("...Exception while dumping:").append(e.getMessage());
     }
     String result = buffer.toString();
     if(result.endsWith(EOL) && buffer.length() >EOL.length()) {
       buffer.delete(buffer.length() - EOL.length(),buffer.length()).toString();
     }
   }
   return "{ headers:" + event.getHeaders() + " body:"+ buffer + " }";
 }

event処理中にデータの切り取り操作が発生したことは容易である.
Ok、次のコーナーに入ります.
 
二、「クラスタ」エージェントFlume構成

ここでクラスタの概念は複数のマシンの管理であり,最も簡単なのは2台のマシンの1台のエージェントホストがデータソースからデータを取得し,その後データを別のホストに転送し,出力することである.これは、ビジネス・マルチデータ・ソースの場合、各データ・ソースにエージェントを設定し、1台のエージェント・ホストにまとめて出力することを意味します.
次に、最も簡単なクラスタ構成、すなわち2つのエージェントを実現し、1台のデータソースデータを受け入れるエージェントがデータを要約したエージェントにプッシュし、要約したエージェントがデータを出力する.この2台のホストはそれぞれpush,pullです
上図によればAVRO RPCで通信する必要があるため、プッシュデータsinksタイプとプルデータのsourcesタイプはいずれもavroである.データエージェントのデータソースを引くには、前述のSpool Source形式で処理します.ここでは、ディレクトリとファイルを事前に作成しています.log

次に、プッシュエージェントホストのflumeプロファイルを設定します.
#          push.conf
#Name the components on this agent
a2.sources= r1
a2.sinks= k1
a2.channels= c1
 
#Describe/configure the source
a2.sources.r1.type= spooldir
a2.sources.r1.spoolDir= /tmp/logs
a2.sources.r1.channels= c1
 
#Use a channel which buffers events in memory
a2.channels.c1.type= memory
a2.channels.c1.keep-alive= 10
a2.channels.c1.capacity= 100000
a2.channels.c1.transactionCapacity= 100000
 
#Describe/configure the source
a2.sinks.k1.type= avro
a2.sinks.k1.channel= c1
a2.sinks.k1.hostname= pull
a2.sinks.k1.port= 4444

次に、プロキシホストを要約するflumeプロファイルを設定します.
#           pull.conf
#Name the components on this agent
a1.sources= r1
a1.sinks= k1
a1.channels= c1
 
#Describe/configure the source
a1.sources.r1.type= avro
a1.sources.r1.channels= c1
a1.sources.r1.bind= pull
a1.sources.r1.port= 44444
 
#Describe the sink
a1.sinks.k1.type= logger
 a1.sinks.k1.channel = c1
 
#Use a channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.keep-alive= 10
a1.channels.c1.capacity= 100000
a1.channels.c1.transactionCapacity= 100000

Spool Sourceは非リアルタイムですが、データ量が少なく処理が早いため、pullエージェントを先に起動するしかありません.
#ノックコマンド
flume-ng agent -c conf -f conf/pull.conf -n a1 -Dflume.root.logger=INFO,console
Flume NG 学习笔记(二)单机与集群Flume 配置_第3张图片
上図は成功を示しています.
前後してpushホストのflumeを起動する
#ノックコマンド
flume-ng agent -n a2 -c conf -f conf/push.conf -Dflume.root.logger=INFO,console
Flume NG 学习笔记(二)单机与集群Flume 配置_第4张图片
pullホストのステータスを確認すると、データが転送されていることがわかります.
そしてpushホストのファイルを見に行きます

接尾辞を付けたCOMPLETED.これは前文の言うことと一致している.
 
次に、新しいデータをディレクトリ/tmp/logsに格納すると、pushホストはpullホスト出力にデータを送信し、新しいデータファイルのファイル名を変更します.