SparkStreamingを使用して語数統計を完了し、結果をMySQLに書き込み、ブラックリストフィルタリング


foreachRDD設計モードの使用dstream.foreachRDDは強力な原語であり、データを外部システムに送信することを許可する.しかし、この原語を正しく有効に使う方法を知ることが重要である.よくある間違いを避けるには、次のようにします.
通常、外部システムにデータを書き込むには、接続先(例えば、リモートサーバとのTCP接続)を作成し、リモートシステムにデータを送信する必要がある.そのため、開発者は、Spark driverで接続オブジェクトを作成することを意図的に試み、Sparkスタッフでそれを使用してRDDに記録を保存しようとすることがある.例えば(Scalaで):
dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}

接続オブジェクトをシーケンス化しdriverからworkerに送信する必要があるため、これは正しくありません.このような接続対象が機械にまたがる移動することは少ない.このエラーは、シーケンス化エラー(接続オブジェクトはシーケンス化できない)、初期化エラー(接続オブジェクトはworkerで初期化する必要がある)などと表示する可能性があります.正しい解決策はworkerで接続オブジェクトを作成することです.
しかし、これにより、他の一般的なエラーが発生する可能性があります.レコードごとに新しい接続を作成します.例:
dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

通常、接続オブジェクトの作成には時間とリソースのオーバーヘッドがある.このため、各記録の接続オブジェクトの作成および破棄は、不要なオーバーヘッドを引き起こす可能性があり、システム全体のスループットを大幅に低減することができる.より良い解決策はrddを使用することです.foreachPartition-接続オブジェクトを作成し、この接続を使用してRDDパーティションですべてのレコードを送信.
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

これにより、複数のレコードに接続作成コストを割り当てることができる.
最後に、接続対象を複数のRDD/ロットにわたって再利用することによりさらに最適化することができる.複数ロットのRDDを外部システムに送る際に再利用するのではなく、接続対象の静的プールを維持することができ、コストをさらに削減することができる.
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

プール内の接続は、必要に応じて怠惰に作成する必要があります.しばらく使用しないとタイムアウトします.これにより、外部システムへのデータ送信を最も効率的に行うことができる.
その他のポイント:
DStreamsは、RDDがRDDによって怠惰に実行するように、出力動作によって遅延実行する.具体的には、DStreamの出力動作におけるRDD動作は、受信データを強制的に処理する.したがって、アプリケーションに出力操作がない場合、またはdstreamがある場合.foreachRDD()などの出力動作は、RDDの動作がない場合には実行する.システムは、データを簡単に受信廃棄する.
デフォルトでは、出力操作はone-at-a-timeで実行する.アプリケーションで定義する順序で実行します.
DataFrameとSQL操作は、ストリームデータ上でDataFrames and SQLとSQL操作を簡単に使用できます.StreamingContextが使用しているSparkContextを使用してSparkSessionを作成する必要があります.また、driver障害時に再起動できるようにする必要がある.これは、単純なインスタンス化されたSparkSessionの一例のインスタンスを作成することによって実現する.DataFramesとSQLを使用して、以前の文字数の例を修正して単語数を生成します.各RDDをDataFrameに変換してテンポラリ・テーブルに登録し、SQLを使用して問合せを行う.
/** DataFrame operations inside your streaming program */

val words: DStream[String] = ...

words.foreachRDD { rdd =>

  // Get the singleton instance of SparkSession
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  // Convert RDD[String] to DataFrame
  val wordsDataFrame = rdd.toDF("word")

  // Create a temporary view
  wordsDataFrame.createOrReplaceTempView("words")

  // Do word count on DataFrame using SQL and print it
  val wordCountsDataFrame = 
    spark.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
}

完全なソースコードを参照してください.
また、異なるスレッドからのストリームデータ(すなわち、非同期実行のStreamingContext)で定義されたテーブルに対してSQLクエリーを実行することもできます.クエリが実行できるように、StreamingContextを十分な数のストリームデータを記憶するように設定するだけです.それ以外の場合、非同期SQLクエリーを知らないStreamingContextは、クエリーが完了する前に古いストリームデータを削除します.たとえば、最後のロットを問い合わせるが、実行に5分かかる場合はstreamingContextを呼び出すことができます.remember(Minutes(5)).実戦事例:SparkStreamingを使用して語数統計を完了し、結果をMySQLに書き込む
val lines = ssc.socketTextStream("192.168.43.150", 9999)
    val result = lines.flatMap(_.split(" +")).map((_, 1)).reduceByKey(_ + _)
    //     MySQL
    result.foreachRDD(rdd=>{
    rdd.foreachPartition(partitionOfRecords=>{
      // ConnectionPool is a static, lazily initialized pool of connections
      val connection = createConnection()
      partitionOfRecords.foreach(record=>{
        val sql = "insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"
        connection.createStatement().execute(sql)
      })
      connection.close()
    })
    })

    ssc.start()
    ssc.awaitTermination()
  }

 
 /**
    *   MySQL   
    */
  def createConnection() = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123")
  }

問題:1.既存のデータについては更新するが、すべてのデータはinsertであり、MySQLテーブルでは追加の形式で表現されており、累積改善の考え方はない:a)データを挿入する前に単語が存在するかどうかを判断し、存在すればupdate、存在しなければinsert b)仕事ではHBAse/Redisが一般的に使用され、彼らは持参したAPIを持っている.加算可能2.各rddのpartitionはconnectionを作成し、接続プールウィンドウ関数の使用に変更することをお勧めします:window:1つの時間帯のデータ処理を行う2つのキーパラメータ:window lengthウィンドウの長さ;sliding interval:ウィンドウの間隔この2つのパラメータは私たちのbatch sizeと関係があります:倍数使用シーン:一定時間ごとにある範囲のデータを計算します-->10秒ごとに(ウィンドウの間隔)最初の10分(ウィンドウの長さ)のWordcount//Reduce last 30 seconds of data,every 10 seconds val windowedWordCounts=pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
      :
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
//     
object TransForm {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf,Seconds(2))

    /**
      *      
      */
    val blacks = List("zs","ls")
    val blacksRDD: RDD[(String, Boolean)] = ssc.sparkContext.parallelize(blacks).map((_,true))
    val lines = ssc.socketTextStream("hadoop101",9999)
    val clicklog = lines.map(rdd => (rdd.split(",")(1), rdd)).transform(rdd => {
      rdd.leftOuterJoin(blacksRDD)
        .filter(x => x._2._2.getOrElse(false) != true)
        .map(x => x._2._1)
    })
    clicklog.print()

    ssc.start()
    ssc.awaitTermination()
  }

}

Spark Streaming Spark SQLを統合して語周波数統計操作を完了する:
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

//Spark Streaming  Spark SQL        
object SqlNetworkWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    val lines = ssc.socketTextStream("hadoop101", 9999)
    val words = lines.flatMap(_.split(" +"))
    // Convert RDDs of the words DStream to DataFrame and run SQL query
    // Dstream  rdd  foreachRDD   ,
    //   toDF()   DataFrame,
    //   createOrReplaceTempView DataFrame        
    //         , SQL          
    words.foreachRDD { (rdd: RDD[String], time: Time) =>
      // Get the singleton instance of SparkSession
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._
      // Convert RDD[String] to RDD[case class] to DataFrame
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()
      // Creates a temporary view using the DataFrame
      wordsDataFrame.createOrReplaceTempView("words")
      // Do word count on table using SQL and print it
      val wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()

    }


    ssc.start()
    ssc.awaitTermination()
  }
}

 
/** Case class for converting RDD to DataFrame */
case class Record(word: String)


/** Lazily instantiated singleton instance of SparkSession */
object SparkSessionSingleton {
  @transient private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf) = {
    if (instance == null) {
      SparkSession
        .builder()
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }

}