ビッグデータ入門の分散計算フレームワークSpark(4)--Spark Streaming統合Flume、Kafka
4272 ワード
1.概要
AppServerからlog 4 jを生成し、Flumeはhostname/portを通じてログ情報を収集し、KafkaSinkを利用してKafkaに送信し、Kafkaは2つのモードを通じてSpark Streamingに伝達し、各業務次元の統計と分析を完成し、統計結果を入庫する.
2.統合プロセス
vim streaming.conf(テストのみ)
agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=log-sink
#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=192.168.6.130
agent1.sources.avro-source.port=41414
#define channel
agent1.channels.logger-channel.type=memory
#define sink
agent1.sinks.log-sink.type=logger
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.log-sink.channel=logger-channel
flume
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming.conf \
--name agent1 \
-Dflume.root.logger=INFO,console
ログのプロファイルlog 4 jを作成する.properties
導入依存
org.apache.flume.flume-ng-clients
flume-ng-log4jappender
1.6.0
# INFO 、Flume
log4j.rootLogger=INFO,stdout,flume
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.6.130
log4j.appender.flume.Port = 41414
# ,
log4j.appender.flume.UnsafeMode = true
このようにプログラムを起動すると、Flumeエンドコンソールで対応するログ情報を収集します.
Zookeeperを起動し、Kafkaをドッキング
# Zookeeper
zkServer.sh start
# Kafka
kafka-server-start.sh -daemon /home/Kiku/app/kafka_2.11-0.9.0.0/config/server.properties
# topic
kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic test-topic
vim streaming2.conf
agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink
#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=192.168.6.130
agent1.sources.avro-source.port=41414
#define channel
agent1.channels.logger-channel.type=memory
#define sink
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic=test-topic
agent1.sinks.kafka-sink.brokerList=hadoop000:9092
agent1.sinks.kafka-sink.requiredAcks=1
agent1.sinks.kafka-sink.batchSize=20
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel
Flumeの起動
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming2.conf \
--name agent1 \
-Dflume.root.logger=INFO,console
Spark Streaming Kafkaデータ受信
/**
* Spark Streaming Kafka
*/
object KafkaStreamingApp {
def main(args: Array[String]): Unit = {
// hadoop ,
System.setProperty("hadoop.home.dir", "E:/winutils/")
if (args.length != 4) {
System.err.println("Usage: KafkaStreamingApp ")
}
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf()
.setMaster("local[2]").setAppName("KafkaStreamingApp")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
//Spark Streaming Kafka
val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
// ,
messages.map(_._2).count().print()
ssc.start()
ssc.awaitTermination()
}
}
プロセス
ローカルテストではまずLoggerGeneratorを実行し、ログ情報を生成し、Flameを使用してhostname、portを指定してデータを受信し、バッチでKafkaに渡し、プログラムではSpark StreamingでKafkaからデータを消費する.
本番では、LoggerGeneratorをjarパッケージに、Flume、Kafkaと同様に、Spark Streamingコードもjarパッケージに、spark-submitを使用してYARNにコミットして実行する必要があります.(実際の状況に応じて、運転モードlocal/yarn/standaloneを選択できます)
vim streaming.conf(テストのみ)
agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=log-sink
#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=192.168.6.130
agent1.sources.avro-source.port=41414
#define channel
agent1.channels.logger-channel.type=memory
#define sink
agent1.sinks.log-sink.type=logger
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.log-sink.channel=logger-channel
flume
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming.conf \
--name agent1 \
-Dflume.root.logger=INFO,console
ログのプロファイルlog 4 jを作成する.properties
導入依存
org.apache.flume.flume-ng-clients
flume-ng-log4jappender
1.6.0
# INFO 、Flume
log4j.rootLogger=INFO,stdout,flume
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.6.130
log4j.appender.flume.Port = 41414
# ,
log4j.appender.flume.UnsafeMode = true
このようにプログラムを起動すると、Flumeエンドコンソールで対応するログ情報を収集します.
Zookeeperを起動し、Kafkaをドッキング
# Zookeeper
zkServer.sh start
# Kafka
kafka-server-start.sh -daemon /home/Kiku/app/kafka_2.11-0.9.0.0/config/server.properties
# topic
kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic test-topic
vim streaming2.conf
agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink
#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=192.168.6.130
agent1.sources.avro-source.port=41414
#define channel
agent1.channels.logger-channel.type=memory
#define sink
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic=test-topic
agent1.sinks.kafka-sink.brokerList=hadoop000:9092
agent1.sinks.kafka-sink.requiredAcks=1
agent1.sinks.kafka-sink.batchSize=20
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel
Flumeの起動
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming2.conf \
--name agent1 \
-Dflume.root.logger=INFO,console
Spark Streaming Kafkaデータ受信
/**
* Spark Streaming Kafka
*/
object KafkaStreamingApp {
def main(args: Array[String]): Unit = {
// hadoop ,
System.setProperty("hadoop.home.dir", "E:/winutils/")
if (args.length != 4) {
System.err.println("Usage: KafkaStreamingApp ")
}
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf()
.setMaster("local[2]").setAppName("KafkaStreamingApp")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
//Spark Streaming Kafka
val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
// ,
messages.map(_._2).count().print()
ssc.start()
ssc.awaitTermination()
}
}
プロセス
ローカルテストではまずLoggerGeneratorを実行し、ログ情報を生成し、Flameを使用してhostname、portを指定してデータを受信し、バッチでKafkaに渡し、プログラムではSpark StreamingでKafkaからデータを消費する.
本番では、LoggerGeneratorをjarパッケージに、Flume、Kafkaと同様に、Spark Streamingコードもjarパッケージに、spark-submitを使用してYARNにコミットして実行する必要があります.(実際の状況に応じて、運転モードlocal/yarn/standaloneを選択できます)