Spark Streaming実戦事例(一)

18176 ワード

本節の主な内容
このセクションの内容は、公式ドキュメントから来ています.http://spark.apache.org/docs/latest/streaming-programming-guide.html#mllib-operations
  • Sparkフロー計算概要
  • Spark Streaming関連コアクラス
  • 入門事例
  • 1.Sparkフロー計算の概要
    HadoopのMapReduceやSpark SQLなどはオフライン計算しかできず、リアルタイム性の要求が高いビジネスニーズを満たすことができません.例えば、リアルタイム推奨、リアルタイムサイト性能分析など、ストリーミング計算はこれらの問題を解決することができます.現在、Storm、Spark Streaming、Samzaの3つの比較的一般的なフロー計算フレームワークがあります.各フレームワークの比較と使用状況については、以下を参照してください.http://www.csdn.net/article/2015-03-09/2824135.このセクションでは、Spark Streamingについて重点的に説明します.Spark StreamingはSparkの5つのコアコンポーネントの1つとして、多くのデータソースへのアクセスを原生的にサポートし、Spark MLLib、Graphxと組み合わせて使用でき、分散環境でのオンラインマシン学習アルゴリズムの設計を簡単に完了します.Sparkがサポートする入力データソースおよび出力ファイルを下図に示します.
    Spark Streaming 实战案例(一)_第1张图片
    次のケースの実戦では、この部分に触れます.中間の「Spark Streaming」は、入力したデータソースを処理し、結果を出力します.その内部動作原理は次の図のようになります.
    Spark Streaming 实战案例(一)_第2张图片 Spark Streamingリアルタイムで入力されたデータストリームを受け取り、バッチで分割した後、Sparkエンジンに渡して処理し、処理が完了したら結果を外部ファイルに出力します.
    まず、次のSpark Streamingベースのword countコードを見てみましょう.ストリーム計算の初歩的な理解を助けることができます.
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object StreamingWordCount {
      def main(args: Array[String]) {
        if (args.length < 1) {
          System.err.println("Usage: StreamingWordCount ")
          System.exit(1)
        }
    
        //  SparkConf  
        val sparkConf = new SparkConf().setAppName("HdfsWordCount").setMaster("local[2]")
        // Create the context
        //  StreamingContext  ,       
        val ssc = new StreamingContext(sparkConf, Seconds(20))
    
        // Create the FileInputDStream on the directory and use the
        // stream to count words in new files created
        //            ,   
        val lines = ssc.textFileStream(args(0))
        //     
        val words = lines.flatMap(_.split(" "))
        //        
        val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
        //    
        wordCounts.print()
        //  Spark Streaming
        ssc.start()
        //    ,         
        ssc.awaitTermination()
      }
    }

    上記のプログラムを実行した後、コマンドラインインタフェースを介して、対応するファイルディレクトリにファイルをコピーします.具体的には、Spark Streaming 实战案例(一)_第3张图片
    プログラムが実行されると、ファイル作成時間に基づいてファイルが処理され、前回の実行時間以降に作成されたファイルが処理され、出力結果は以下の通りである:Spark Streaming 实战案例(一)_第4张图片
    2.Spark Streaming関連コアクラス
    1. DStream(discretized stream)
    Spark Streamingは、前述のKafka、Flumeなどのデータソースから作成できるDStreamであるデータストリームの抽象化を提供し、DStreamは本質的に一連のRDDから構成されている.各RDDのデータは、対応する時間間隔(interval)に流入するデータであり、下図のようにSpark Streaming 实战案例(一)_第5张图片
    DStreamのすべての操作は、最終的にはRDDの操作に変換されます.たとえば、前のStreamingWordCountプログラムでは、flatMap操作はDStreamのすべてのRDDに作用します.次の図に示します.
    Spark Streaming 实战案例(一)_第6张图片
    2.StreamingContext Spark Streamingでは、StreamingContextはプログラム全体のエントリであり、その作成方法は様々であり、最も一般的なのはSparkConfによって作成される.
    import org.apache.spark._
    import org.apache.spark.streaming._
    
    val conf = new SparkConf().setAppName(appName).setMaster(master)
    val ssc = new StreamingContext(conf, Seconds(1))

    StreamingContextオブジェクトの作成時にSparkConfに基づいてSparkContextが作成されます
      /**
       * Create a StreamingContext by providing the configuration necessary for a new SparkContext.
       * @param conf a org.apache.spark.SparkConf object specifying Spark parameters
       * @param batchDuration the time interval at which streaming data will be divided into batches
       */
      def this(conf: SparkConf, batchDuration: Duration) = {
        this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
      }

    つまりStreamingContextはSparkContextのパッケージであり、StreamingContextには他にもいくつかの構造方法があり、興味のあることは、後期にソース解析時に詳細に説明し、StreamingContextを作成する際にbatchDurationを指定し、バッチ間隔を設定するために使用され、アプリケーションやクラスタリソースの状況に応じて設定する必要があることを理解することができます.StreamingContextの作成が完了したら、次の手順に従います.
  • 入力ソースによるInputDStreaim
  • の作成
  • DStreamingに対してtransformationとoutput操作を行い、この操作は後期フロー計算の論理
  • を構成する.
  • StreamingContextを通過する.start()メソッドは、データの受信および処理のプロセス
  • を開始する.
  • streamingContextを使用する.awaitTermination()メソッドは、プログラム処理の終了(手動停止またはエラー停止)
  • を待つ.
  • streamingContextを呼び出すこともできる.stop()メソッド終了プログラムの実行
  • StreamingContextについていくつか注意すべき点があります.
    1.StreamingContextが起動すると、新しい操作を追加しても機能しません.つまり、StreamingContextが起動する前に、すべての計算ロジック2を定義する.StreamingContextが停止すると、再起動できません.つまり、再計算するには、プログラム全体を再実行する必要があります.3.単一JVMにおいて、一定期間に2つのactive状態のStreamingContext 4が出現しない.StreamingContextのstopメソッドを呼び出すとSparkContextもstopによって削除する、StreamingContextを閉じる場合にSparkContextを保持するにはstopメソッドにパラメータstopSparkContext=false/***Stop the execution of the streams immediately(does not wait for all received data*to be processed)を入力する必要がある.By default, if stopSparkContext is not specified, the underlying * SparkContext will also be stopped. This implicit behavior can be configured using the * SparkConf configuration spark.streaming.stopSparkContextByDefault. * * @param stopSparkContext If true, stops the associated SparkContext. The underlying SparkContext * will be stopped regardless of whether this StreamingContext has been * started. */ def stop( stopSparkContext: Boolean = conf.getBoolean(“spark.streaming.stopSparkContextByDefault”, true) ): Unit = synchronized { stop(stopSparkContext, false) } 5.SparkContextオブジェクトは複数のStreamingContextsで繰り返し使用できますが、前のStreamingContextsが停止してから次のStreamingContextオブジェクトを作成する必要があります.
    3.InputDStreamsおよびReceivers InputDStreamとは、データストリームのソースから受け取る入力データストリームを指し、先のStreamingWordCountプログラムではval lines=ssc.textFileStream(args(0))はInputDStreamの一種である.ファイル・ストリームに加えて、各input DStreamは、データ・ソースから送信されたデータを受信し、後でSpark処理のためにメモリに保存するReceiverオブジェクトに関連付けられています.
    Spark Streaimgは、2つのオリジナルサポートストリームデータソースを提供します.
  • Basic sources(ベースストリームデータソース).ファイルシステム(ローカルファイルシステムおよび分散ファイルシステム)、Socket接続、AkkaのActorなどのStreamingContext APIによって直接作成されます.ファイルストリーム(File Streams)の作成方法:a.streamingContext.fileStreamKeyClass,Value Class,InputFormatClass b.streamingContext.textFileStream(dataDirectory)リアルタイムでtextFileStreamメソッドが最終的に呼び出されるのもfileStreamメソッドdef textFileStream(directory:String):DStream[String]=withNamedScope("text file stream"){fileStreamLongWritable,Text,TextInputFormat.map(._2.Strtoing)}Akka Actorストリームデータに基づく作成方法:streamingContext.actorStream(actorProps,actor-name)Socketストリームデータに基づく作成方法:ssc.socketTextStream(hostname:String,port:Int,storageLevel:StorageLevel=StorageLevel.MEMORY_AND_DISK_SER_2)RDDキューに基づくストリームデータ作成方法:streamingContext.queueStream(queueOfRDDs)
  • Advanced sources(アドバンスドストリームデータソース).Kafka,Flume,Kinesis,Twitterなど、外部ツールクラスを利用する必要があり、実行時に外部依存(次節で説明)
  • が必要である
    Spark Streamingはユーザー3もサポートする.Custom Source(カスタムストリームデータソース)では、次のセクションで説明するreceiverを定義する必要があります.
    最後に注意すべき点は2つあります.
  • Spark Streamingをローカルで実行する場合、master URLでは「local」または「local[1]」は使用できません.input DStreamがreceiver(sockets,Kafka,Flumeなど)に関連付けられている場合、receiver自体がスレッドを実行する必要があり、受信したデータを処理するスレッドはありません.したがって、SparkStreamingプログラムをローカルで実行する場合、master URLとして「local[n]」を使用すると、nはreceiverの数よりも大きくなります.
  • クラスタ上でSpark Streamingを実行する場合、Spark Streamingプログラムに割り当てられたCPUコア数もreceiverの数より大きくなければなりません.そうしないと、システムはデータのみを受け入れ、データを処理できません.

  • 3.導入事例
    実行結果の後期表示を容易にするため、変更ログレベルはLevel.WARN
    import org.apache.spark.Logging
    
    import org.apache.log4j.{Level, Logger}
    
    /** Utility functions for Spark Streaming examples. */
    object StreamingExamples extends Logging {
    
      /** Set reasonable logging levels for streaming if the user has not configured log4j. */
      def setStreamingLogLevels() {
        val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
        if (!log4jInitialized) {
          // We first log something to initialize Spark's default logging, then we override the
          // logging level.
          logInfo("Setting log level to [WARN] for streaming example." +
            " To override add a custom log4j.properties to the classpath.")
          Logger.getRootLogger.setLevel(Level.WARN)
        }
      }
    }
  • NetworkWordCount Socketストリームデータ
  • に基づく
    object NetworkWordCount {
      def main(args: Array[String]) {
        if (args.length < 2) {
          System.err.println("Usage: NetworkWordCount  ")
          System.exit(1)
        }
        //       Level.WARN
        StreamingExamples.setStreamingLogLevels()
    
        // Create the context with a 1 second batch size
        val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[4]")
        val ssc = new StreamingContext(sparkConf, Seconds(1))
    
        // Create a socket stream on target ip:port and count the
        // words in input stream of 
    delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally. // Replication necessary in distributed scenario for fault tolerance. // SocketInputDStream, ip:port val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination() } }

    ランタイムパラメータSpark Streaming 实战案例(一)_第7张图片の構成
    使用
    //  netcat server
    root@sparkmaster:~/streaming# nc -lk 9999
    

    NetworkWordCountプログラムを実行し、netcat serverが実行するコンソールに任意の文字列を入力します.
    root@sparkmaster:~/streaming# nc -lk 9999
    Hello WORLD
    HELLO WORLD WORLD
    TEWST
    NIMA
    

    Spark Streaming 实战案例(一)_第8张图片
  • QueueStream RDDキューに基づくストリームデータ
  • import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    import scala.collection.mutable
    
    object QueueStream {
    
      def main(args: Array[String]) {
    
        StreamingExamples.setStreamingLogLevels()
        val sparkConf = new SparkConf().setAppName("QueueStream").setMaster("local[4]")
        // Create the context
        val ssc = new StreamingContext(sparkConf, Seconds(1))
    
        // Create the queue through which RDDs can be pushed to
        // a QueueInputDStream
        //  RDD  
        val rddQueue = new mutable.SynchronizedQueue[RDD[Int]]()
    
        // Create the QueueInputDStream and use it do some processing
        //   QueueInputDStream 
        val inputStream = ssc.queueStream(rddQueue)
    
        //      RDD  
        val mappedStream = inputStream.map(x => (x % 10, 1))
        val reducedStream = mappedStream.reduceByKey(_ + _)
    
        //    
        reducedStream.print()
    
        //    
        ssc.start()
    
        // Create and push some RDDs into
        for (i 1 to 30) {
          rddQueue += ssc.sparkContext.makeRDD(1 to 3000, 10)
          Thread.sleep(1000)
    
        //      StreamingContext   
        ssc.stop()
      }
    }

    Spark Streaming 实战案例(一)_第9张图片