Structured Streaming：動的ストリームと静的テーブルをjoinするExample
29596 ワード
Structured Streaming：動的ストリームと静的テーブルをjoinするExample
Sparkコード
Kafkaへの送信データ
静的テーブルのデータ
Kafkaの結果データ:
Sparkコード
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// scalastyle:off println
package org.apache.spark.examples.sql.streaming
import java.util
import java.util.UUID
import com.alibaba.fastjson.JSON
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import scala.collection.mutable
/**
 * Example: joins a Kafka stream with a static Hive table and writes the matches to Kafka.
 *
 * Reads JSON records from the `orders` topic, extracts the `proName` and `_id` fields of each
 * record, inner-joins them with the `source` table on `id === source_key`, and publishes the
 * `proName` of every joined row to the `result` topic.
 */
object ContinuousStructuredKafkaWordCount {

  /**
   * Starts the streaming query and blocks until it terminates.
   *
   * @param args when more than three arguments are given, `args(3)` is used as the checkpoint
   *             location; otherwise a random directory under `/tmp` is used. The other
   *             arguments are not read.
   */
  def main(args: Array[String]): Unit = {
    val checkpointLocation =
      if (args.length > 3) args(3) else "/tmp/temporary-" + UUID.randomUUID.toString

    val spark = SparkSession
      .builder
      .master("local[2]")
      .appName("ContinuousStructuredKafkaWordCount")
      .enableHiveSupport()
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    // JSON UDF: returns the string value of `field` in the JSON document `body`.
    // Any non-fatal failure (e.g. unparsable input) yields "". `Try` is used instead of a
    // blanket `catch Exception` so that fatal errors and thread interruption still propagate.
    val getValueByFieldFromJsonString: ((String, String) => String) =
      (body: String, field: String) =>
        scala.util.Try(JSON.parseObject(body).getString(field)).getOrElse("")
    spark.udf.register("getValueByFieldFromJsonString", getValueByFieldFromJsonString)

    // Streaming side: one row per Kafka record, reduced to (value = proName, id = _id).
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "orders")
      // NOTE(review): "groupId" does not appear among the documented Kafka source options and
      // presumably has no effect; kept unchanged -- confirm before relying on it.
      .option("groupId", "groupID0234")
      .load()
      // Only the record payload is needed; it arrives as binary and must be cast to a string.
      .selectExpr("CAST(value AS STRING)")
      .selectExpr(
        "getValueByFieldFromJsonString(value,'proName')",
        "getValueByFieldFromJsonString(value,'_id')")
      .toDF("value", "id")

    // Permits joins without a join condition (cartesian product); presumably only relevant
    // when the join below is run without its `id` condition.
    spark.sql("set spark.sql.crossJoin.enabled=true")

    // Static side: the Hive table `source`, printed once for inspection.
    val staticTable = spark.sql("select * from source")
    staticTable.show()

    val out = lines.join(staticTable, lines("id") === staticTable("source_key"), "inner")

    // The Kafka sink publishes the `value` column as the record payload.
    val output = out.select("value").writeStream
      .format("kafka")
      .outputMode("append")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "result")
      .option("checkpointLocation", checkpointLocation)
      // .trigger(Trigger.Continuous("1 second")) // only change in query
      // .trigger( Trigger.ProcessingTime("10 seconds")) // only change in query
      .start()
    output.awaitTermination()
  }
}
// scalastyle:on println
Kafkaへの送信データ
{"_id":1,"orderId":126,"proName":"prt_name_id_______1","amount":1,"orderTime":1527148516556}
send successfully
{"_id":2,"orderId":127,"proName":"prt_name_id_______2","amount":1,"orderTime":1527148517563}
send successfully
{"_id":3,"orderId":128,"proName":"prt_name_id_______3","amount":1,"orderTime":1527148518570}
send successfully
{"_id":4,"orderId":129,"proName":"prt_name_id_______4","amount":1,"orderTime":1527148519576}
send successfully
{"_id":0,"orderId":130,"proName":"prt_name_id_______0","amount":1,"orderTime":1527148520582}
send successfully
{"_id":1,"orderId":131,"proName":"prt_name_id_______1","amount":1,"orderTime":1527148521587}
send successfully
{"_id":2,"orderId":132,"proName":"prt_name_id_______2","amount":1,"orderTime":1527148522591}
send successfully
{"_id":3,"orderId":133,"proName":"prt_name_id_______3","amount":1,"orderTime":1527148523593}
send successfully
{"_id":4,"orderId":134,"proName":"prt_name_id_______4","amount":1,"orderTime":1527148524594}
send successfully
{"_id":0,"orderId":135,"proName":"prt_name_id_______0","amount":1,"orderTime":1527148525599}
send successfully
{"_id":1,"orderId":136,"proName":"prt_name_id_______1","amount":1,"orderTime":1527148526603}
send successfully
{"_id":2,"orderId":137,"proName":"prt_name_id_______2","amount":1,"orderTime":1527148527609}
send successfully
{"_id":3,"orderId":138,"proName":"prt_name_id_______3","amount":1,"orderTime":1527148528613}
send successfully
{"_id":4,"orderId":139,"proName":"prt_name_id_______4","amount":1,"orderTime":1527148529619}
send successfully
{"_id":0,"orderId":140,"proName":"prt_name_id_______0","amount":1,"orderTime":1527148530624}
send successfully
{"_id":1,"orderId":141,"proName":"prt_name_id_______1","amount":1,"orderTime":1527148531627}
send successfully
{"_id":2,"orderId":142,"proName":"prt_name_id_______2","amount":1,"orderTime":1527148532633}
send successfully
{"_id":3,"orderId":143,"proName":"prt_name_id_______3","amount":1,"orderTime":1527148533640}
send successfully
{"_id":4,"orderId":144,"proName":"prt_name_id_______4","amount":1,"orderTime":1527148534643}
send successfully
{"_id":0,"orderId":145,"proName":"prt_name_id_______0","amount":1,"orderTime":1527148535647}
send successfully
{"_id":1,"orderId":146,"proName":"prt_name_id_______1","amount":1,"orderTime":1527148536653}
send successfully
{"_id":2,"orderId":147,"proName":"prt_name_id_______2","amount":1,"orderTime":1527148537657}
send successfully
{"_id":3,"orderId":148,"proName":"prt_name_id_______3","amount":1,"orderTime":1527148538663}
send successfully
{"_id":4,"orderId":149,"proName":"prt_name_id_______4","amount":1,"orderTime":1527148539667}
send successfully
{"_id":0,"orderId":150,"proName":"prt_name_id_______0","amount":1,"orderTime":1527148540672}
静的テーブルのデータ
hive> select * from source;
OK
1 2 3
Time taken: 1.546 seconds, Fetched: 1 row(s)
hive> desc source;
OK
source_key string
source_value string
ds string
Kafkaの結果データ:
join [ id 条件なし ]
prt_name_id_______2
prt_name_id_______3
prt_name_id_______4
prt_name_id_______0
prt_name_id_______1
prt_name_id_______2
prt_name_id_______3
prt_name_id_______4
prt_name_id_______0
prt_name_id_______1
prt_name_id_______2
prt_name_id_______3
prt_name_id_______4
prt_name_id_______0
join [ id 条件あり、id=1 ]
prt_name_id_______1
prt_name_id_______1
prt_name_id_______1
prt_name_id_______1
prt_name_id_______1
prt_name_id_______1
prt_name_id_______1
prt_name_id_______1
prt_name_id_______1
prt_name_id_______1
prt_name_id_______1
prt_name_id_______1