Structured Streaming:動的ストリームと静的テーブルをjoinするExample


Structured Streaming:動的ストリームと静的テーブルをjoinするExample
Sparkコード

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// scalastyle:off println
package org.apache.spark.examples.sql.streaming

import java.util
import java.util.UUID

import com.alibaba.fastjson.JSON
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

import scala.collection.mutable


/**
 * Stream-static join example for Structured Streaming.
 *
 * Reads JSON records from the Kafka topic `orders`, extracts the `proName` and `_id`
 * fields with a JSON UDF, inner-joins the stream against the Hive table `source` on
 * `id = source_key`, and writes the `proName` of every matching record to the Kafka
 * topic `result`.
 *
 * The only command-line argument that is read is `args(3)`, an optional checkpoint
 * directory; when fewer than four arguments are given, a random directory under
 * `/tmp` is used instead.
 */
object ContinuousStructuredKafkaWordCount {
  def main(args: Array[String]): Unit = {
    // Imported locally so the file's import header does not need to change.
    import scala.util.control.NonFatal

    val checkpointLocation =
      if (args.length > 3) args(3) else "/tmp/temporary-" + UUID.randomUUID.toString

    val spark = SparkSession
      .builder
      .master("local[2]")
      .appName("ContinuousStructuredKafkaWordCount")
      .enableHiveSupport() // needed so spark.sql can read the Hive table `source`
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    // JSON UDF: returns the string value of `field` in the JSON document `body`.
    // Any non-fatal failure (malformed JSON, null body, ...) yields "" so that a bad
    // record cannot fail the streaming query; fatal errors and interrupts propagate.
    val getValueByFieldFromJsonString: (String, String) => String =
      (body: String, field: String) =>
        try {
          JSON.parseObject(body).getString(field)
        } catch {
          case NonFatal(_) => ""
        }

    spark.udf.register("getValueByFieldFromJsonString", getValueByFieldFromJsonString)

    import spark.implicits._

    // Streaming side: one row per Kafka record, reduced to (value = proName, id = _id).
    // NOTE(review): "groupId" is not among the options listed in the Spark Kafka
    // integration guide and is presumably ignored by the source - confirm.
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "orders")
      .option("groupId", "groupID0234")
      .load()
      .selectExpr("CAST(offset AS STRING)", "CAST(value AS STRING)")
      .selectExpr("getValueByFieldFromJsonString(value,'proName')",
        "getValueByFieldFromJsonString(value,'_id')")
      .toDF("value", "id")

    // Static side: the Hive table, printed once at start-up for reference.
    spark.sql("set spark.sql.crossJoin.enabled=true")
    val staticTable = spark.sql("select * from source")
    staticTable.show()

    // Keep only the stream records whose id matches a source_key in the static table.
    val out = lines.join(staticTable, lines("id") === staticTable("source_key"), "inner")

    // The Kafka sink publishes the column named "value" as the message payload.
    val output = out.select("value").writeStream
      .format("kafka")
      .outputMode("append")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "result")
      .option("checkpointLocation", checkpointLocation)
      // No trigger is set, so the default trigger is used. Alternatives tried:
      // .trigger(Trigger.Continuous("1 second")) // only change in query
      // .trigger(Trigger.ProcessingTime("10 seconds")) // only change in query
      // NOTE(review): the Spark guide restricts continuous processing to map-like
      // operations; confirm a join is supported before enabling Trigger.Continuous.
      .start()

    output.awaitTermination()
  }
}

// scalastyle:on println


Kafkaに送信したデータ


{"_id":1,"orderId":126,"proName":"prt_name_id_______1","amount":1,"orderTime":1527148516556}
send successfully
{"_id":2,"orderId":127,"proName":"prt_name_id_______2","amount":1,"orderTime":1527148517563}
send successfully
{"_id":3,"orderId":128,"proName":"prt_name_id_______3","amount":1,"orderTime":1527148518570}
send successfully
{"_id":4,"orderId":129,"proName":"prt_name_id_______4","amount":1,"orderTime":1527148519576}
send successfully
{"_id":0,"orderId":130,"proName":"prt_name_id_______0","amount":1,"orderTime":1527148520582}
send successfully
{"_id":1,"orderId":131,"proName":"prt_name_id_______1","amount":1,"orderTime":1527148521587}
send successfully
{"_id":2,"orderId":132,"proName":"prt_name_id_______2","amount":1,"orderTime":1527148522591}
send successfully
{"_id":3,"orderId":133,"proName":"prt_name_id_______3","amount":1,"orderTime":1527148523593}
send successfully
{"_id":4,"orderId":134,"proName":"prt_name_id_______4","amount":1,"orderTime":1527148524594}
send successfully
{"_id":0,"orderId":135,"proName":"prt_name_id_______0","amount":1,"orderTime":1527148525599}
send successfully
{"_id":1,"orderId":136,"proName":"prt_name_id_______1","amount":1,"orderTime":1527148526603}
send successfully
{"_id":2,"orderId":137,"proName":"prt_name_id_______2","amount":1,"orderTime":1527148527609}
send successfully
{"_id":3,"orderId":138,"proName":"prt_name_id_______3","amount":1,"orderTime":1527148528613}
send successfully
{"_id":4,"orderId":139,"proName":"prt_name_id_______4","amount":1,"orderTime":1527148529619}
send successfully
{"_id":0,"orderId":140,"proName":"prt_name_id_______0","amount":1,"orderTime":1527148530624}
send successfully
{"_id":1,"orderId":141,"proName":"prt_name_id_______1","amount":1,"orderTime":1527148531627}
send successfully
{"_id":2,"orderId":142,"proName":"prt_name_id_______2","amount":1,"orderTime":1527148532633}
send successfully
{"_id":3,"orderId":143,"proName":"prt_name_id_______3","amount":1,"orderTime":1527148533640}
send successfully
{"_id":4,"orderId":144,"proName":"prt_name_id_______4","amount":1,"orderTime":1527148534643}
send successfully
{"_id":0,"orderId":145,"proName":"prt_name_id_______0","amount":1,"orderTime":1527148535647}
send successfully
{"_id":1,"orderId":146,"proName":"prt_name_id_______1","amount":1,"orderTime":1527148536653}
send successfully
{"_id":2,"orderId":147,"proName":"prt_name_id_______2","amount":1,"orderTime":1527148537657}
send successfully
{"_id":3,"orderId":148,"proName":"prt_name_id_______3","amount":1,"orderTime":1527148538663}
send successfully
{"_id":4,"orderId":149,"proName":"prt_name_id_______4","amount":1,"orderTime":1527148539667}
send successfully
{"_id":0,"orderId":150,"proName":"prt_name_id_______0","amount":1,"orderTime":1527148540672}

静的テーブルのデータ

hive> select * from source;
OK
1   2   3
Time taken: 1.546 seconds, Fetched: 1 row(s)
hive> desc source;
OK
source_key              string
source_value            string
ds                      string

Kafkaの結果データ:

joinしない場合 [ idによる絞り込みなし ]
prt_name_id_______2
prt_name_id_______3
prt_name_id_______4
prt_name_id_______0
prt_name_id_______1
prt_name_id_______2
prt_name_id_______3
prt_name_id_______4
prt_name_id_______0
prt_name_id_______1
prt_name_id_______2
prt_name_id_______3
prt_name_id_______4
prt_name_id_______0



joinした場合 [ idで結合、id=1 のレコードのみ一致 ]

prt_name_id_______1
prt_name_id_______1
prt_name_id_______1
prt_name_id_______1
prt_name_id_______1
prt_name_id_______1
prt_name_id_______1
prt_name_id_______1
prt_name_id_______1
prt_name_id_______1
prt_name_id_______1
prt_name_id_______1