Flume+Kafka消費リアルタイムログ
5624 ワード
環境のインストール
前提はJDK 1.8 Hadoop 2.7をインストールしました。 zookeeper 3.4 scala 2.12
1 Kafkaを取り付けます
1)インストールパッケージをダウンロードする
参考:http://blog.csdn.net/u014035172/article/details/68061463
まず、公式サイトで最新バージョンのKafkaをダウンロードして、あるフォルダに解凍します。
2)配置環境:内部のserver.propertiesファイルを編集し、主に以下の情報を配置する。
3)起動:
または
4)試験の取り付けが成功したかどうか(ZKが起動されたことを前提として)
Kafkaは私達のために連続性テストを行うためのconsolieを提供しています。これはproducer:bin/kafka-consolie-producer.sh--brooker-list locast:9092--topic testを実行します。これはproducerを開くコマンドラインに相当します。コマンドラインのパラメータは後で説明します。次にconsumerを実行します。新しいterminal:bin/kafka-consolie-consumer.sh--zookeeper local host:2181--topic test--from-beginningがconsumerの命令を実行した後、あなたはproducerのterminalに情報を入力して、すぐにconsumerのminterminalにあなたの情報を出力します。通信クライアントのようです。
もし5が実行されているのを見たら、あなたのパーソナル版の配置が成功したと説明します。二つのコマンドのパラメータの意味を説明します。
ここに来てKafkaの任務は完了しました。
2.Flumeをインストールする
1)インストールパッケージをダウンロードする
まず、最新バージョンのKafkaを公式サイトでダウンロードし、あるフォルダに解凍します。
2)配置環境
参考:http://www.cnblogs.com/the-tops/p/6008040.html
copy flum-conf.properties.template flum-conf.properties
agent.sources=seqGenSrc agent.cnnels=memory Channel agent.sinks=loggerSink(「For of the sources」)the type is defined agent.sources.seqGenSrc.type=exec agent.sources.seqGenSrc.com.cmand=tail-F/home/man*/proj/logg/logg.logge.sources.seqGenSrc.chanelse=Thements皱Each sink's type must be defined agent.sinks.loggerSink.type=org.apache.fluume.sink.kafka.KafkaSink/ここではプラグインで下のagent.sinks.loggerSink.topic=testlogs/これはtopicで、あとkafka消費者はこのagent.sinks.loginger Sink.metadata.brook.list=locast=Locast=90。 //定義するには、flume起動が間違っています。ヒント初期化bootstststststststststststststoragent.sinks.sinks.loggerSink.partition.org.apartache. fume.plugins.SinglePatition agagent.sinks.logger Sink.serializzr.class=orggggggggggzzzzzzzkazzzzzzzzzzzzzzzzzzzzzzzzzzzininininininfffffffffffffffffffffffffffffffffffffffrererererererererererSink.max.message.size=20 agent.sinks.loggerSink.producer.type=sync agent.sinks.loggerSink.custom.encoding=UTF-8
3)起動
2017-09-30 19:50:22,695(lifecycleSupervisor-1)[INFO-org.apphe.flum.instructionation.MonitodeCounterGroup.start(MonitodeCounterGroup.java:95)Coponenttype:SINK,name loggelstarted
これでflumeの設置と配置が完了しました。
2.リアルタイムログの準備
簡単にログを作成するプログラムを書きます。
log 4 j.propertiesログのパス構成は上記flumeでログを読み込むアドレスと同じです。
パッケージ運転は、上のプログラムからどんどんログが発生します。
4.消費者プログラムを書く
前提はJDK 1.8 Hadoop 2.7をインストールしました。 zookeeper 3.4 scala 2.12
1 Kafkaを取り付けます
1)インストールパッケージをダウンロードする
参考:http://blog.csdn.net/u014035172/article/details/68061463
まず、公式サイトで最新バージョンのKafkaをダウンロードして、あるフォルダに解凍します。
2)配置環境:内部のserver.propertiesファイルを編集し、主に以下の情報を配置する。
broker.id=90 # kafka id, , ip
host.name=192.168.100.90 # ip, broker ip
zookeeper.connect=192.168.100.90:2181 #zookeeper
3)起動:
nohup bin/kafka-server-start.sh config/server.properties >kafka.log 2>&1 &
または
bin/kafka-server-start.sh -daemon config/server.properties &
起动时はnohupまたは-daemen方式で起动してください。でないと、あなたは端末を闭めました。kafkaサービスも止まってしまいました。その他のkafkaサーバーの配置は上の構成を参考にして、brook er.idの配置に注意してください。サーバーごとに違います。4)試験の取り付けが成功したかどうか(ZKが起動されたことを前提として)
Kafkaは私達のために連続性テストを行うためのconsolieを提供しています。これはproducer:bin/kafka-consolie-producer.sh--brooker-list locast:9092--topic testを実行します。これはproducerを開くコマンドラインに相当します。コマンドラインのパラメータは後で説明します。次にconsumerを実行します。新しいterminal:bin/kafka-consolie-consumer.sh--zookeeper local host:2181--topic test--from-beginningがconsumerの命令を実行した後、あなたはproducerのterminalに情報を入力して、すぐにconsumerのminterminalにあなたの情報を出力します。通信クライアントのようです。
もし5が実行されているのを見たら、あなたのパーソナル版の配置が成功したと説明します。二つのコマンドのパラメータの意味を説明します。
ここに来てKafkaの任務は完了しました。
2.Flumeをインストールする
1)インストールパッケージをダウンロードする
まず、最新バージョンのKafkaを公式サイトでダウンロードし、あるフォルダに解凍します。
2)配置環境
参考:http://www.cnblogs.com/the-tops/p/6008040.html
copy flum-conf.properties.template flum-conf.properties
agent.sources=seqGenSrc agent.cnnels=memory Channel agent.sinks=loggerSink(「For of the sources」)the type is defined agent.sources.seqGenSrc.type=exec agent.sources.seqGenSrc.com.cmand=tail-F/home/man*/proj/logg/logg.logge.sources.seqGenSrc.chanelse=Thements皱Each sink's type must be defined agent.sinks.loggerSink.type=org.apache.fluume.sink.kafka.KafkaSink/ここではプラグインで下のagent.sinks.loggerSink.topic=testlogs/これはtopicで、あとkafka消費者はこのagent.sinks.loginger Sink.metadata.brook.list=locast=Locast=90。 //定義するには、flume起動が間違っています。ヒント初期化bootstststststststststststststoragent.sinks.sinks.loggerSink.partition.org.apartache. fume.plugins.SinglePatition agagent.sinks.logger Sink.serializzr.class=orggggggggggzzzzzzzkazzzzzzzzzzzzzzzzzzzzzzzzzzzininininininfffffffffffffffffffffffffffffffffffffffrererererererererererSink.max.message.size=20 agent.sinks.loggerSink.producer.type=sync agent.sinks.loggerSink.custom.encoding=UTF-8
3)起動
bin
/flume-ng
agent --conf conf --conf-
file
conf
/roomy
.conf --name producer -Dflume.root.logger=INFO,console
エラーがなければ起動します。2017-09-30 19:50:22,695(lifecycleSupervisor-1)[INFO-org.apphe.flum.instructionation.MonitodeCounterGroup.start(MonitodeCounterGroup.java:95)Coponenttype:SINK,name loggelstarted
これでflumeの設置と配置が完了しました。
2.リアルタイムログの準備
簡単にログを作成するプログラムを書きます。
public class LogGenetator implements Runnable {
Logger logger = Logger.getLogger(LogGenetator.class);
private int num;
public LogGenetator(int num) {
this.num = num;
}
public static void main(String[] args) {
for (int i = 0; i < 4; i++) {
new Thread(new LogGenetator(i)).start();
}
}
public void run() {
while (true) {
logger.debug("Test infomation produced by " + Thread.currentThread().getName());
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
log 4 j.propertiesログのパス構成は上記flumeでログを読み込むアドレスと同じです。
パッケージ運転は、上のプログラムからどんどんログが発生します。
4.消費者プログラムを書く
public class LogConsumer {
public static void main(String[] args) {
// Create a consumer
KafkaConsumer consumer;
// Configure the consumer
Properties properties = new Properties();
// Point it to the brokers
properties.setProperty("bootstrap.servers", "localhost:9092");
// Set the consumer group (all consumers must belong to a group).
//kafka groupid,
properties.setProperty("group.id", "test-consumer-group");
// Set how to serialize key/value pairs
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// When a group is first created, it has no offset stored to start
// reading from. This tells it to start
// with the earliest record in the stream.
properties.setProperty("auto.offset.reset", "earliest");
consumer = new KafkaConsumer(properties);
// flume topic
consumer.subscribe(Arrays.asList("testlog"));
// Loop until ctrl + c
int count = 0;
while (true) {
// Poll for records
ConsumerRecords records = consumer.poll(20);
// Did we get any?
if (records.count() == 0) {
System.out.println("records count is 0");
} else {
// Yes, loop over records
for (ConsumerRecord record : records) {
// Display record and count
count += 1;
System.out.println(count + ": " + record.value());
}
}
}
}
}
パッケージ運転は、コンソールで第三段階のリアルタイムログを見ることができます。