ビッグデータ入門の分散計算フレームワークSpark(4)--Spark Streaming統合Flume、Kafka

4272 ワード

1.概要


AppServerからlog 4 jを生成し、Flumeはhostname/portを通じてログ情報を収集し、KafkaSinkを利用してKafkaに送信し、Kafkaは2つのモードを通じてSpark Streamingに伝達し、各業務次元の統計と分析を完成し、統計結果を入庫する.

2.統合プロセス


vim streaming.conf(テストのみ)

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=log-sink

#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=192.168.6.130
agent1.sources.avro-source.port=41414

#define channel
agent1.channels.logger-channel.type=memory

#define sink
agent1.sinks.log-sink.type=logger

agent1.sources.avro-source.channels=logger-channel
agent1.sinks.log-sink.channel=logger-channel
 flume
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming.conf \
--name agent1 \
-Dflume.root.logger=INFO,console

ログのプロファイルlog 4 jを作成する.properties


導入依存
        
        
            org.apache.flume.flume-ng-clients
            flume-ng-log4jappender
            1.6.0
        
# INFO   、Flume 
log4j.rootLogger=INFO,stdout,flume

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.6.130
log4j.appender.flume.Port = 41414
#  , 
log4j.appender.flume.UnsafeMode = true

このようにプログラムを起動すると、Flumeエンドコンソールで対応するログ情報を収集します.

Zookeeperを起動し、Kafkaをドッキング

#  Zookeeper
zkServer.sh start
#  Kafka
kafka-server-start.sh -daemon /home/Kiku/app/kafka_2.11-0.9.0.0/config/server.properties
#  topic
kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic test-topic

vim streaming2.conf

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink

#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=192.168.6.130
agent1.sources.avro-source.port=41414

#define channel
agent1.channels.logger-channel.type=memory

#define sink
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic=test-topic
agent1.sinks.kafka-sink.brokerList=hadoop000:9092
agent1.sinks.kafka-sink.requiredAcks=1
agent1.sinks.kafka-sink.batchSize=20

agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel

Flumeの起動

flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming2.conf \
--name agent1 \
-Dflume.root.logger=INFO,console

Spark Streaming Kafkaデータ受信

/**
  * Spark Streaming   Kafka
  */
object KafkaStreamingApp {

  def main(args: Array[String]): Unit = {

    // hadoop , 
    System.setProperty("hadoop.home.dir", "E:/winutils/")

    if (args.length != 4) {
      System.err.println("Usage: KafkaStreamingApp    ")
    }

    val Array(zkQuorum, group, topics, numThreads) = args

    val sparkConf = new SparkConf()
        .setMaster("local[2]").setAppName("KafkaStreamingApp")
    val ssc = new StreamingContext(sparkConf, Seconds(5))


    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    //Spark Streaming  Kafka
    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

    // , 
    messages.map(_._2).count().print()


    ssc.start()
    ssc.awaitTermination()
  }
}

プロセス


ローカルテストではまずLoggerGeneratorを実行し、ログ情報を生成し、Flameを使用してhostname、portを指定してデータを受信し、バッチでKafkaに渡し、プログラムではSpark StreamingでKafkaからデータを消費する.
本番では、LoggerGeneratorをjarパッケージに、Flume、Kafkaと同様に、Spark Streamingコードもjarパッケージに、spark-submitを使用してYARNにコミットして実行する必要があります.(実際の状況に応じて、運転モードlocal/yarn/standaloneを選択できます)