Flume+Kafka消費リアルタイムログ

5624 ワード

環境のインストール
前提はJDK 1.8 Hadoop 2.7をインストールしました。 zookeeper 3.4 scala 2.12 
1 Kafkaを取り付けます
1)インストールパッケージをダウンロードする
参考:http://blog.csdn.net/u014035172/article/details/68061463
まず、公式サイトで最新バージョンのKafkaをダウンロードして、あるフォルダに解凍します。
2)配置環境:内部のserver.propertiesファイルを編集し、主に以下の情報を配置する。
 
broker.id=90  #  kafka      id,           ,   ip    
host.name=192.168.100.90  #         ip,     broker     ip
zookeeper.connect=192.168.100.90:2181 #zookeeper  
 
3)起動: 
nohup bin/kafka-server-start.sh config/server.properties >kafka.log 2>&1 &
 
 
または
bin/kafka-server-start.sh -daemon config/server.properties  &
起动时はnohupまたは-daemen方式で起动してください。でないと、あなたは端末を闭めました。kafkaサービスも止まってしまいました。その他のkafkaサーバーの配置は上の構成を参考にして、brook er.idの配置に注意してください。サーバーごとに違います。
 
4)試験の取り付けが成功したかどうか(ZKが起動されたことを前提として)
 
Kafkaは私達のために連続性テストを行うためのconsolieを提供しています。これはproducer:bin/kafka-consolie-producer.sh--brooker-list locast:9092--topic testを実行します。これはproducerを開くコマンドラインに相当します。コマンドラインのパラメータは後で説明します。次にconsumerを実行します。新しいterminal:bin/kafka-consolie-consumer.sh--zookeeper local host:2181--topic test--from-beginningがconsumerの命令を実行した後、あなたはproducerのterminalに情報を入力して、すぐにconsumerのminterminalにあなたの情報を出力します。通信クライアントのようです。
もし5が実行されているのを見たら、あなたのパーソナル版の配置が成功したと説明します。二つのコマンドのパラメータの意味を説明します。
ここに来てKafkaの任務は完了しました。
2.Flumeをインストールする
1)インストールパッケージをダウンロードする
まず、最新バージョンのKafkaを公式サイトでダウンロードし、あるフォルダに解凍します。
2)配置環境
参考:http://www.cnblogs.com/the-tops/p/6008040.html
copy flum-conf.properties.template flum-conf.properties
agent.sources=seqGenSrc agent.cnnels=memory Channel agent.sinks=loggerSink(「For of the sources」)the type is defined agent.sources.seqGenSrc.type=exec agent.sources.seqGenSrc.com.cmand=tail-F/home/man*/proj/logg/logg.logge.sources.seqGenSrc.chanelse=Thements皱Each sink's type must be defined agent.sinks.loggerSink.type=org.apache.fluume.sink.kafka.KafkaSink/ここではプラグインで下のagent.sinks.loggerSink.topic=testlogs/これはtopicで、あとkafka消費者はこのagent.sinks.loginger Sink.metadata.brook.list=locast=Locast=90。 //定義するには、flume起動が間違っています。ヒント初期化bootstststststststststststststoragent.sinks.sinks.loggerSink.partition.org.apartache. fume.plugins.SinglePatition agagent.sinks.logger Sink.serializzr.class=orggggggggggzzzzzzzkazzzzzzzzzzzzzzzzzzzzzzzzzzzininininininfffffffffffffffffffffffffffffffffffffffrererererererererererSink.max.message.size=20 agent.sinks.loggerSink.producer.type=sync agent.sinks.loggerSink.custom.encoding=UTF-8
 
3)起動bin/flume-ng  agent --conf conf --conf-file  conf/roomy.conf --name producer -Dflume.root.logger=INFO,consoleエラーがなければ起動します。
2017-09-30 19:50:22,695(lifecycleSupervisor-1)[INFO-org.apphe.flum.instructionation.MonitodeCounterGroup.start(MonitodeCounterGroup.java:95)Coponenttype:SINK,name loggelstarted
これでflumeの設置と配置が完了しました。
 
2.リアルタイムログの準備
簡単にログを作成するプログラムを書きます。
 
public class LogGenetator implements Runnable {
	Logger logger = Logger.getLogger(LogGenetator.class);
	private int num;
	public LogGenetator(int num) {
		this.num = num;
	}
	public static void main(String[] args) {
		for (int i = 0; i < 4; i++) {
			new Thread(new LogGenetator(i)).start();
		}
	}
	public void run() {
		while (true) {
			logger.debug("Test infomation produced by " + Thread.currentThread().getName());
			try {
				Thread.sleep((long) (Math.random() * 1000));
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
}
 
log 4 j.propertiesログのパス構成は上記flumeでログを読み込むアドレスと同じです。
パッケージ運転は、上のプログラムからどんどんログが発生します。
4.消費者プログラムを書く
 
public class LogConsumer {
	public static void main(String[] args) {
		// Create a consumer
		KafkaConsumer consumer;
		// Configure the consumer
		Properties properties = new Properties();
		// Point it to the brokers
		properties.setProperty("bootstrap.servers", "localhost:9092");
		// Set the consumer group (all consumers must belong to a group).
                //kafka   groupid,      
		properties.setProperty("group.id", "test-consumer-group");
		// Set how to serialize key/value pairs
		properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		// When a group is first created, it has no offset stored to start
		// reading from. This tells it to start
		// with the earliest record in the stream.
		properties.setProperty("auto.offset.reset", "earliest");
		consumer = new KafkaConsumer(properties);

		//     flume    topic  
		consumer.subscribe(Arrays.asList("testlog"));

		// Loop until ctrl + c
		int count = 0;
		while (true) {
			// Poll for records
			ConsumerRecords records = consumer.poll(20);
			// Did we get any?
			if (records.count() == 0) {
				System.out.println("records count is 0");
			} else {
				// Yes, loop over records
				for (ConsumerRecord record : records) {
					// Display record and count
					count += 1;
					System.out.println(count + ": " + record.value());
				}
			}
		}
	}
}
パッケージ運転は、コンソールで第三段階のリアルタイムログを見ることができます。