Flink側出力ストリームスプリットアプリケーション
4712 ワード
ビジネスシーン:Flink同期Kafkaデータを使用してMySQLにリアルタイムで書き込むには、ログインデータをログインに分割し、日活し、新たに3つのMySQLテーブルに追加する必要があります.サイド出力ストリームを用いてストリームを複数のストリームに分割し,それぞれ処理する.
プライマリクラス
Kafkaデータはjson形式であり,サンプルクラスを定義する.FlinkはKafkaを読み込む前に記録しています.
他にもイベントがあるのでkeyBy以降のprocessはKeydProcessFunctionとなります.
WindowFunction
sinkはMySQL Sinkをカスタマイズし、前に書いたことがありますが、論理は基本的に変わりません.
プライマリクラス
/**
* Flink Kafka, , MySQL
*
* create by LiuJinHe 2020/5/26
*/
object CpDataKafkaToMySQL {
private val logger = LoggerFactory.getLogger(this.getClass)
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 1
// env.setParallelism(1)
// , , 3 1 , 10
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 3))
// ,
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
// checkpoint
env.enableCheckpointing(60000)
val config = env.getCheckpointConfig
// exactly_one
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// checkpoint
config.setMinPauseBetweenCheckpoints(1000)
// checkpoint 60 ,
config.setCheckpointTimeout(60000)
// checkpoint
config.setMaxConcurrentCheckpoints(1)
val kafkaProp = ConfigManager.load(Constants.CP_KAFKA_PROP)
val kafkaConsumer = KafkaUtils.getConsumer(kafkaProp).setStartFromGroupOffsets()
val dataKafkaStream = env.addSource(kafkaConsumer).name("cp kafka source")
val dataStream = dataKafkaStream.map(
json => {
val sdkData =
try
JSON.parseObject(json, classOf[CpData])
catch {
case ex: Throwable =>
val str =
s"""
|${DateUtils.timestampToTime(System.currentTimeMillis())} Exception: ${ex.getMessage}
|============ illegal json:
|$json
""".stripMargin
logger.info(str)
CpData("none", 0, 0, "")
}
sdkData
})
.filter(line =>
line.key != "none" && line.cp_game_id != 0 &&
(line.key == "role_create" || line.key == "cp_role_login" || line.key == "role_pay")
)
//
val roleActiveSide = new OutputTag[CpData]("role_active")
val roleNewUserSide = new OutputTag[CpData]("role_new_user")
val mainDataStream = dataStream
.keyBy(data => data.key)
.process(new CpProcess(roleActiveSide, roleNewUserSide))
// ,
val roleActiveSideStream = mainDataStream.getSideOutput(roleActiveSide)
roleActiveSideStream
.timeWindowAll(Time.seconds(1))
.apply(new CpWindow)
.addSink(new CpMySQLSink).name("active side stream mysql sink")
// ,
val roleNewUserSideStream = mainDataStream.getSideOutput(roleNewUserSide)
roleNewUserSideStream
.timeWindowAll(Time.seconds(1))
.apply(new CpWindow)
.addSink(new CpMySQLSink).name("new user side stream mysql sink")
// ,
mainDataStream
.timeWindowAll(Time.seconds(1))
.apply(new CpWindow)
.addSink(new CpMySQLSink).name("cp main stream mysql sink")
env.execute("cp K to M stream job")
}
}
Kafkaデータはjson形式であり,サンプルクラスを定義する.FlinkはKafkaを読み込む前に記録しています.
/**
*
*/
case class CpData(key: String, cp_game_id: Int, time: Long, data: String)
他にもイベントがあるのでkeyBy以降のprocessはKeydProcessFunctionとなります.
/**
* processFunction
*
* create by LiuJinHe 2020/5/26
*/
class CpProcess(roleActiveSide:OutputTag[CpData], roleNewUserSide:OutputTag[CpData]) extends KeyedProcessFunction[String, CpData, CpData]{
override def processElement(value: CpData, ctx: KeyedProcessFunction[String, CpData, CpData]#Context, out: Collector[CpData]): Unit = {
//
out.collect(value)
val key = ctx.getCurrentKey
if (key == "cp_role_login") {
//
ctx.output(roleActiveSide, value.copy("role_active",value.cp_game_id,value.time,value.data))
ctx.output(roleNewUserSide, value.copy("role_new_user",value.cp_game_id,value.time,value.data))
}
}
}
WindowFunction
/**
* cp window function
*
* create by LiuJinHe 2020/5/26
*/
class CpWindow extends AllWindowFunction[CpData, Iterable[CpData], TimeWindow] {
override def apply(window: TimeWindow, input: Iterable[CpData], out: Collector[Iterable[CpData]]): Unit = {
if (input.nonEmpty) {
println("1 : " + input.size)
out.collect(input)
}
}
}
sinkはMySQL Sinkをカスタマイズし、前に書いたことがありますが、論理は基本的に変わりません.