[セットトップ]Flume+Kafka+SparkStreamingを使用したリアルタイムログ解析


各社はデータ分析やデータマイニングを行いたいと考えています.ログ、ETLを収集するのは第一歩です.今日はリアルタイム(リアルタイム、毎分分析)でログを収集し、ログを処理し、処理後の記録をHiveに保存し、実戦コードを添付します.
1.全体アーキテクチャ
通常、ログをどのように収集して分析するかを考えてみましょう.
まず、ビジネスログはNginx(または他の方法では、Nginxを使用してログを書き込みます)を使用して毎分ディスクに書き込まれます.次に、Sparkを使用してログを分析するには、ディスク内のファイルをHDFSにアップロードし、Sparkを処理し、図に示すようにHiveテーブルに格納する必要があります.
私たちは以前、この方法で毎日1回ログを分析していましたが、いくつかの欠点があります.
まず私たちのログはNginxを通じて毎分1つのファイルに保存して、このように1日のファイルの数はとても多くて、後続の分析の任務に不利で、だから先に1日のすべてのログのファイルを合併します
統合後、このファイルをディスクからHdfsに転送する必要がありますが、私たちのログサーバはHadoopクラスタ内にないので、直接Hdfsに転送することはできません.まず、ログサーバからHadoopクラスタがあるサーバにファイルを転送し、Hdfsにアップロードする必要があります.
最後に最も重要なのは、一日遅れてデータを分析することで、私たちの新しいビジネスニーズを満たすことができません.1時間の遅れ時間をコントロールしたほうがいいです.
以前は分析ログを収集する方法が原始的で、時間がかかり、ネットワーク伝送に多くの時間を浪費していたことがわかります.ログ量が大きいとデータが失われる可能性があります.その上で、アーキテクチャを改善しました.
全体のプロセスは、Flumeがログに書き込まれたディスクをリアルタイムで監視し、新しいログが書き込まれると、FlumeはログをKafkaにメッセージとして渡し、Spark StreamingがHiveにリアルタイムで消費することです.
では、Flumeは何ですか.なぜディスクファイルを監視できるのでしょうか.簡単に言えば、Flumeは大量のログファイルを収集、集約、移動するためのオープンソースフレームワークなので、リアルタイムでログを収集し、ログを転送するシーンに適しています.
Kafkaはメッセージシステムで、Flameが収集したログをKafkaメッセージキューに移動し、複数の場所で消費することができ、データを失わないことを保証することができます.
このアーキテクチャを通じて、収集したログは直ちにFlumeに発見されてKafkaに伝わることができて、Kafkaを通じて私達はログを各地方に使うことができて、同じログはHdfsの中に保存することができて、オフラインで分析することができて、またリアルタイムで計算することができて、その上安全性を保証することができて、基本的にリアルタイムの要求を達成することができます
全体の流れはすでにはっきりしていて、次の各突破は、私たちはシステム全体を実現し始めました.
2.実戦演習
2.1 Kafkaのインストール
インストールKafkaといくつかの基本的なコマンドをダウンロードしてここに送ってください:Kafkaインストールと概要
インストールが完了したらlauncherという名前で新規作成します.clickのtopic:
bin/kafka-topics.sh --create --zookeeper hxf:2181,cfg:2181,jqs:2181,jxf:2181,sxtb:2181 --replication-factor 2 --partitions 2 --topic launcher_click

このtopicを確認します.
bin/kafka-topics.sh --describe --zookeeper hxf:2181,cfg:2181,jqs:2181,jxf:2181,sxtb:2181 --topic launcher_click

2.2 Flumeのインストール
1、ダウンロード解凍
ダウンロード先:https://flume.apache.org/download.html 注意して住所のページをダウンロードすることに入って、清華大学のその住所を使って、さもなくばとても遅いことができます
wget http://apache.fayea.com/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
tar -xvf apache-flume-1.7.0-bin.tar.gz

2、プロファイルの変更
flumeディレクトリに入り、conf/flume-envを変更します.sh
export JAVA_HOME=/data/install/jdk
export JAVA_OPTS="-Xms1000m -Xmx2000m -Dcom.sun.management.jmxremote"

プロファイルの追加:conf/flume_launcherclick.conf
# logser     flume     ,  flume  sources、channels sinks     
# sources         、channels        、sinks        
logser.sources = src_launcherclick
logser.sinks = kfk_launcherclick
logser.channels = ch_launcherclick

# source
#      TAILDIR,                   
logser.sources.src_launcherclick.type = TAILDIR
# positionFile           
logser.sources.src_launcherclick.positionFile = /data/install/flume/position/launcherclick/taildir_position.json
#       
logser.sources.src_launcherclick.filegroups = f1
#           ,          
logser.sources.src_launcherclick.filegroups.f1 = /data/launcher/stat_app/.*

# interceptor
#  kafka topic  
logser.sources.src_launcherclick.interceptors = i1 i2
logser.sources.src_launcherclick.interceptors.i1.type=static
logser.sources.src_launcherclick.interceptors.i1.key = type
logser.sources.src_launcherclick.interceptors.i1.value = launcher_click
logser.sources.src_launcherclick.interceptors.i2.type=static
logser.sources.src_launcherclick.interceptors.i2.key = topic
logser.sources.src_launcherclick.interceptors.i2.value = launcher_click

# channel
logser.channels.ch_launcherclick.type = memory
logser.channels.ch_launcherclick.capacity = 10000
logser.channels.ch_launcherclick.transactionCapacity = 1000

# kfk sink
#   sink   Kafka,          Kafka
logser.sinks.kfk_launcherclick.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka broker
logser.sinks.kfk_launcherclick.brokerList = 10.0.0.80:9092,10.0.0.140:9092

# Bind the source and sink to the channel
logser.sources.src_launcherclick.channels = ch_launcherclick
logser.sinks.kfk_launcherclick.channel = ch_launcherclick

3、起動
nohup bin/flume-ng agent --conf conf/ --conf-file conf/flume_launcherclick.conf --name logser -Dflume.root.logger=INFO,console >> logs/flume_launcherclick.log &

この時点でKafkaもFlumeも起動しており、構成からFlumeのモニタファイルは/data/launcher/stat_であることがわかりますapp/.*,そのため、カタログの下のファイルの内容が増加すればKafkaに送信されます.皆さんは自分でいくつかのテストログをこのカタログのファイルの下に追加して、Kafka Consumerを開いてKafkaがメッセージを受信したかどうかを見て、ここでSparkStreamingを完成してからテスト結果を見ます.
2.3 SparkStreamingプログラミング
SparkStreamingはSparkがリアルタイムストリームを処理するために使用し、リアルタイムから秒レベルまで使用できます.ここではこのようなリアルタイムは必要ありません.毎分ログ分析プログラムを実行します.主なコードは以下の通りです.
  def main(args: Array[String]) {

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkConf = new SparkConf().setAppName("LauncherStreaming")

    // 60     
    val ssc = new StreamingContext(sparkConf, Seconds(60))

    //  Kafka     
    val kafkaStream = KafkaUtils.createStream(
      ssc,
      "hxf:2181,cfg:2181,jqs:2181,jxf:2181,sxtb:2181", // Kafka     zookeeper
      "launcher-streaming", //        group.id
      Map[String, Int]("launcher_click" -> 0, "launcher_click" -> 1), //    Kafka  topic    
      StorageLevel.MEMORY_AND_DISK_SER).map(_._2) //       

    kafkaStream.foreachRDD((rdd: RDD[String], time: Time) => {
      val result = rdd.map(log => parseLog(log)) //         
        .filter(t => StringUtils.isNotBlank(t._1) && StringUtils.isNotBlank(t._2))
      //   hdfs
      result.saveAsHadoopFile(HDFS_DIR, classOf[String], classOf[String], classOf[LauncherMultipleTextOutputFormat[String, String]])
    })

    ssc.start()
    //      
    ssc.awaitTermination()
  }

編目が限られていて、完全なコードは私のgithubにアクセスします:https://github.com/Trigl/SparkLearning/blob/master/src/main/scala/com/trigl/spark/streaming/LauncherStreaming.scala
次に、パッケージをmasterにアップロードして実行します.
nohup /data/install/spark-2.0.0-bin-hadoop2.7/bin/spark-submit  --master spark://hxf:7077 --executor-memory 1G --total-executor-cores 4 --class com.analysis.main.LauncherStreaming --jars /home/hadoop/jar/kafka-clients-0.10.0.0.jar,/home/hadoop/jar/metrics-core-2.2.0.jar,/home/hadoop/jar/zkclient-0.3.jar,/home/hadoop/jar/spark-streaming-kafka-0-8_2.11-2.0.0.jar,/home/hadoop/jar/kafka_2.11-0.8.2.1.jar /home/hadoop/jar/SparkLearning.jar >> /home/hadoop/logs/LauncherDM.log &

次にテストを開始し、Flumeモニタディレクトリ/data/launcher/stat_app/.*ログを書き込みます.元のログの内容は次のようになります.
118.120.102.3|1495608541.238|UEsDBBQACAgIACB2uEoAAAAAAAAAAAAAAAABAAAAMGWUbW7bMAyGb6NfnUFRFEWhJ+gBdgBZVjpjjp04brMAO*yY2DKa9Y+B1+DnQ1LCztoITgK4wPGHfNUhmKGUPOn3DyP*zdOxSWM3T33XXMqy9OP7xXTZiTC1xlL0HgMEi+BfHoooBEGKr3fPpYy5jMse4Xzupus4TKkrs4kZOhI51CgWWKxsUQBRPMDr1*w5Hcuc0LiUEFBwdXQxAARXHb3+QXlOfzya0uZWOGwlEwBDwLD5oJBVFHsEEPF2U0EUToyr8k4tg9v8AkRrIcKmxGsU2eqQIM45dKuKFICo5oveEqOjh2JAIITImyIJqBk3JS4qh7Wby*TroxnL9ZKHXrsyWeBQoMXaEgXUKh6mOQ1l7NLc*Hwz8aDpAtndLFJEetkVc6S9V*bg+RFiKMvnTv6ahuGUTmWexqEfi3Elezx0botJrCCQn5jfCzWaqaUOqNpFYO23ckYl5GOlx4rLQuUllh27SsjZyLQTUn4K+3uVczlOi+7uuMzTYLoibeIspk71DtKuJC+7T5qXPg9lLddaZs6+Lolnj7ANW0dBGKOn72m3cbQJI2Kq4*C6Xhz9E5Pzeeg*i2l1IAJtpReILNq6DY4peFjHeO5vffPZd2UyejEJ28Puo0sI*2*5ojvhfNcquWomFMVp02Pz++M6Nach3e6XR5wOlrdSg4T7RkgtQAuC6HYl2sc62i6dUq*om+HWjvdHAPSk8hYkegHraxC8PwPons73XZeozDfXmaRzzzaD2XI4fX0QX*8BUEsHCKeftc48AgAAmQQAAA==

HDFSの対応ディレクトリにコンテンツがあるかどうかを確認します.
HDFSストレージの分析後のログ内容は以下の通りである.
99000945863664;864698037273329|119.176.140.248|1495594615129|2017-05-24 10:56:55|xiaomi|redmi4x|com.jingdong.app.mall&0ae359b6&1495534579412&1;com.autonavi.minimap&279f562f&1495534597934,1495534616627&2;com.android.contacts&91586932&1495538267103,1495540527138,1495576834653,1495583404117,1495591231535&5

SparkStreamingタスクのステータスは次のとおりです.
確かに毎分1回実行されていることがわかります
Refer
http://blog.xiaoxiaomo.com/2016/05/22/Flume-%E9%9B%86%E7%BE%A4%E5%8F%8A%E9%A1%B9%E7%9B%AE%E5%AE%9E%E6%88%98/
http://lxw1234.com/archives/2015/11/552.htm