Flume基礎学習
7343 ワード
Flumeは非常に優秀なログ収集ツールです.様々な形式のログ収集をサポートしています.apacheのトップオープンソースとして、Flumeの大きなデータには幅広い応用があります.
まずFlumの解凍ディレクトリでflume-env.sh.templeteをflum.env.sh.sh.templeteに変更する必要があります.
jdkの位置を修正します.
ソurce
私たちはAvro、NetCatからできます.Http,TailDir.私たちはJava開発では通常、ロゴ4 jなどのログツールを使ってログを天別に保存していますので、tailDir Sourceに注目しています.
Taildir Source
Flume 1.7の前に、もしファイルの新規コンテンツを監視したいなら、私達が一般的に採用しているsourceはexec tailですが、これは弊害があります.あなたのサーバーのあたごが再起動した時に、データの読み込みはまだ最初から始まります.これは明らかに私達が見たいのではありません.Fluume 1.7が出ていない前に、私達の一般的な解決策は:記録を読み取った後に、現在のレコードの行番号を一つのファイルに記録して、あたごが再起動した時に、ファイルから最後の読取ファイルの行数を取得してから、監視して読み続けることです.データが失われないように保証します.重複しないようにします.
Flume 1.7にsourceのタイプが追加されました.これは一つのディレクトリの複数のファイルを監視することができ、リアルタイムで記録を読み込み保存するブレークポイントの継続機能を実現しました.
しかし、Flume 1.7でファイルの名前が変更されると、新しいファイルとして再取得されます.
Chanel
Memory Chanel
Memory ChannelはEventをメモリキューに保存します.この列が保存できるイベントの数は最大値の上限があります.Eventデータはすべてメモリに保存されているので、Memory Chanelは最高の性能を持っていますが、データが失われる恐れがあります.Flumeが崩壊したり、再起動したりすれば、Chanelに保存されているEventは全部失われます.同時にメモリ容量が限られているため、Event数が最大値またはメモリが容量の上限に達すると、Memory Chanelがデータを失うことがあります.
File Chanel
File ChannelはEventをローカルハードディスクに保存して、Memory Chanelよりもっと良い信頼性と回復性を提供しますが、ローカルファイルを操作するには、性能が悪いです.
Kafka Chanel
Kafka ChanelはEventをKafkaクラスタに保存し、File Chanelよりも優れた性能とMemory Chanelより高い信頼性を提供することができます.
sink
Avro Sink
Avro SinkはFlumeの層状収集機構の重要な構成部分である.この受信機に送信されたFlumeイベントはAvroイベントとなり、設定で指定されたホスト名/ポートペアに送信される.イベントは、設定されたチャネルから一括構成のバッチサイズで取り出される.
Kafka Sink
Kafka Sinkは、Flumevent headerのtopicとkey属性を使用して、イベントをKafkaに送信します.FluumeEventのheaderにtopic属性がある場合、このイベントはheaderのtopic属性で指定されたtopicに送信されます.FluumeEventのheaderにkey属性がある場合、この属性はこのイベントのデータ指定パーティションに使用され、同じkeyを持つイベントが同じパーティションに分割されます.key属性nullであれば、イベントはランダムなパーティションに送信されます.
特定のイベントのheaderのkeyまたはtopic属性は、スクリーンセーバをカスタマイズして設定することができます.
Flumeスクリーンショット
主に、タイムスタンプが不正とjsonデータが不完全なログをフィルタリングして、エラーログ、起動ログとイベントログを区別して、kafkaの違ったtopicに送りやすいです.参考後項の設定
flume-ngメッセンジャー
--conf-file/opt/module/flume/conf/file-flum-kafka.com nf
--name a 1-Dflum.root.logger=INFO、consolie
最初のパラメータは自分で作成したプロファイルのパスです.
2番目のパラメータはflume agentの名前です.つまり設定ファイルで定義された名前です.
3番目のパラメータはflumでInfoレベルログを印刷し、コンソールに印刷します.
大データにおけるデータ処理の流れの例
先例図から分かる.私達はログから未HFSに転化するまで消費できるデータは普通二つのFlume段階を経験します.ログファイル-->Flume-->Kafka kafka-->Flume-->HFS 二つの段階の処理構成
第一段階の配置参考
OpenWriteリリース!
まずFlumの解凍ディレクトリでflume-env.sh.templeteをflum.env.sh.sh.templeteに変更する必要があります.
jdkの位置を修正します.
ソurce
私たちはAvro、NetCatからできます.Http,TailDir.私たちはJava開発では通常、ロゴ4 jなどのログツールを使ってログを天別に保存していますので、tailDir Sourceに注目しています.
Taildir Source
Flume 1.7の前に、もしファイルの新規コンテンツを監視したいなら、私達が一般的に採用しているsourceはexec tailですが、これは弊害があります.あなたのサーバーのあたごが再起動した時に、データの読み込みはまだ最初から始まります.これは明らかに私達が見たいのではありません.Fluume 1.7が出ていない前に、私達の一般的な解決策は:記録を読み取った後に、現在のレコードの行番号を一つのファイルに記録して、あたごが再起動した時に、ファイルから最後の読取ファイルの行数を取得してから、監視して読み続けることです.データが失われないように保証します.重複しないようにします.
Flume 1.7にsourceのタイプが追加されました.これは一つのディレクトリの複数のファイルを監視することができ、リアルタイムで記録を読み込み保存するブレークポイントの継続機能を実現しました.
しかし、Flume 1.7でファイルの名前が変更されると、新しいファイルとして再取得されます.
Chanel
Memory Chanel
Memory ChannelはEventをメモリキューに保存します.この列が保存できるイベントの数は最大値の上限があります.Eventデータはすべてメモリに保存されているので、Memory Chanelは最高の性能を持っていますが、データが失われる恐れがあります.Flumeが崩壊したり、再起動したりすれば、Chanelに保存されているEventは全部失われます.同時にメモリ容量が限られているため、Event数が最大値またはメモリが容量の上限に達すると、Memory Chanelがデータを失うことがあります.
File Chanel
File ChannelはEventをローカルハードディスクに保存して、Memory Chanelよりもっと良い信頼性と回復性を提供しますが、ローカルファイルを操作するには、性能が悪いです.
Kafka Chanel
Kafka ChanelはEventをKafkaクラスタに保存し、File Chanelよりも優れた性能とMemory Chanelより高い信頼性を提供することができます.
sink
Avro Sink
Avro SinkはFlumeの層状収集機構の重要な構成部分である.この受信機に送信されたFlumeイベントはAvroイベントとなり、設定で指定されたホスト名/ポートペアに送信される.イベントは、設定されたチャネルから一括構成のバッチサイズで取り出される.
Kafka Sink
Kafka Sinkは、Flumevent headerのtopicとkey属性を使用して、イベントをKafkaに送信します.FluumeEventのheaderにtopic属性がある場合、このイベントはheaderのtopic属性で指定されたtopicに送信されます.FluumeEventのheaderにkey属性がある場合、この属性はこのイベントのデータ指定パーティションに使用され、同じkeyを持つイベントが同じパーティションに分割されます.key属性nullであれば、イベントはランダムなパーティションに送信されます.
特定のイベントのheaderのkeyまたはtopic属性は、スクリーンセーバをカスタマイズして設定することができます.
Flumeスクリーンショット
主に、タイムスタンプが不正とjsonデータが不完全なログをフィルタリングして、エラーログ、起動ログとイベントログを区別して、kafkaの違ったtopicに送りやすいです.参考後項の設定
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
public class LogETLInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
String body = new String(event.getBody(), Charset.forName("UTF-8"));
// body ,newBody , display
if (LogUtils.validateReportLog(body)) {
return event;
}
return null;
}
@Override
public List intercept(List events) {
ArrayList intercepts = new ArrayList<>();
// Event,
for (Event event : events) {
Event interceptEvent = intercept(event);
if (interceptEvent != null){
intercepts.add(interceptEvent);
}
}
return intercepts;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder {
public Interceptor build() {
return new LogETLInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
起動コマンドflume-ngメッセンジャー
--conf-file/opt/module/flume/conf/file-flum-kafka.com nf
--name a 1-Dflum.root.logger=INFO、consolie
最初のパラメータは自分で作成したプロファイルのパスです.
2番目のパラメータはflume agentの名前です.つまり設定ファイルで定義された名前です.
3番目のパラメータはflumでInfoレベルログを印刷し、コンソールに印刷します.
大データにおけるデータ処理の流れの例
先例図から分かる.私達はログから未HFSに転化するまで消費できるデータは普通二つのFlume段階を経験します.
第一段階の配置参考
a1.sources=r1
a1.channels=c1 c2
a1.sinks=k1 k2
# configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2
#interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.flume.interceptor.LogETLInterceptor$Builder
# selector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = logType
a1.sources.r1.selector.mapping.start = c1
a1.sources.r1.selector.mapping.event = c2
# configure channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=10000
a1.channels.c1.byteCapacityBufferPercentage=20
a1.channels.c2.type = memory
a1.channels.c2.capacity=10000
a1.channels.c2.byteCapacityBufferPercentage=20
# configure sink
# start-sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topic_start
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.flumeBatchSize = 2000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.channel = c1
# event-sink
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.topic = topic_event
a1.sinks.k2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k2.kafka.flumeBatchSize = 2000
a1.sinks.k2.kafka.producer.acks = 1
a1.sinks.k2.channel = c2
第二段階の配置参考##
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2
## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.zookeeperConnect = hadoop102:2181,hadoop103:2181,hadoop104:2181
a1.sources.r1.kafka.topics=topic_start
## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r2.kafka.zookeeperConnect = hadoop102:2181,hadoop103:2181,hadoop104:2181
a1.sources.r2.kafka.topics=topic_event
## channel1
a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=10000
## channel2
a1.channels.c2.type=memory
a1.channels.c2.capacity=100000
a1.channels.c2.transactionCapacity=10000
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 30
a1.sinks.k1.hdfs.roundUnit = second
##sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 30
a1.sinks.k2.hdfs.roundUnit = second
##
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollInterval = 30
a1.sinks.k2.hdfs.rollSize = 0
a1.sinks.k2.hdfs.rollCount = 0
## 。
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop
##
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
この記事はブログから一文多発プラットフォームになります.OpenWriteリリース!