Flink側出力ストリームスプリットアプリケーション

4712 ワード

ビジネスシーン:Flink同期Kafkaデータを使用してMySQLにリアルタイムで書き込むには、ログインデータをログインに分割し、日活し、新たに3つのMySQLテーブルに追加する必要があります.サイド出力ストリームを用いてストリームを複数のストリームに分割し,それぞれ処理する.
プライマリクラス
/**
  * Flink    Kafka,        ,     MySQL
  *
  * create by LiuJinHe 2020/5/26
  */
object CpDataKafkaToMySQL {
  private val logger = LoggerFactory.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //        1
    //    env.setParallelism(1)

    //     ,    ,  3   1 ,     10 
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 3))

    //        ,       
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    //   checkpoint  
    env.enableCheckpointing(60000)
    val config = env.getCheckpointConfig
    //      exactly_one
    config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // checkpoint       
    config.setMinPauseBetweenCheckpoints(1000)
    // checkpoint     60 ,      
    config.setCheckpointTimeout(60000)
    //             checkpoint
    config.setMaxConcurrentCheckpoints(1)

    val kafkaProp = ConfigManager.load(Constants.CP_KAFKA_PROP)
    val kafkaConsumer = KafkaUtils.getConsumer(kafkaProp).setStartFromGroupOffsets()

    val dataKafkaStream = env.addSource(kafkaConsumer).name("cp kafka source")

    val dataStream = dataKafkaStream.map(
      json => {
        val sdkData =
          try
            JSON.parseObject(json, classOf[CpData])
          catch {
            case ex: Throwable =>
              val str =
                s"""
                   |${DateUtils.timestampToTime(System.currentTimeMillis())} Exception: ${ex.getMessage}
                   |============ illegal json:
                   |$json
                """.stripMargin
              logger.info(str)
              CpData("none", 0, 0, "")
          }
        sdkData
      })
      .filter(line =>
        line.key != "none" && line.cp_game_id != 0 &&
          (line.key == "role_create" || line.key == "cp_role_login" || line.key == "role_pay")
      )

    //       
    val roleActiveSide = new OutputTag[CpData]("role_active")
    val roleNewUserSide = new OutputTag[CpData]("role_new_user")

    val mainDataStream = dataStream
      .keyBy(data => data.key)
      .process(new CpProcess(roleActiveSide, roleNewUserSide))

    //     ,  
    val roleActiveSideStream = mainDataStream.getSideOutput(roleActiveSide)
    roleActiveSideStream
      .timeWindowAll(Time.seconds(1))
      .apply(new CpWindow)
      .addSink(new CpMySQLSink).name("active side stream mysql sink")

    //     ,  
    val roleNewUserSideStream = mainDataStream.getSideOutput(roleNewUserSide)
    roleNewUserSideStream
      .timeWindowAll(Time.seconds(1))
      .apply(new CpWindow)
      .addSink(new CpMySQLSink).name("new user side stream mysql sink")

    //     ,  
    mainDataStream
      .timeWindowAll(Time.seconds(1))
      .apply(new CpWindow)
      .addSink(new CpMySQLSink).name("cp main stream mysql sink")

    env.execute("cp K to M stream job")
  }
}

Kafkaデータはjson形式であり,サンプルクラスを定義する.FlinkはKafkaを読み込む前に記録しています.
/**
  *    
  */
case class CpData(key: String, cp_game_id: Int, time: Long, data: String)

他にもイベントがあるのでkeyBy以降のprocessはKeydProcessFunctionとなります.
/**
  * processFunction           
  *
  * create by LiuJinHe 2020/5/26
  */
class CpProcess(roleActiveSide:OutputTag[CpData], roleNewUserSide:OutputTag[CpData]) extends KeyedProcessFunction[String, CpData, CpData]{
  override def processElement(value: CpData, ctx: KeyedProcessFunction[String, CpData, CpData]#Context, out: Collector[CpData]): Unit = {
    //          
    out.collect(value)

    val key = ctx.getCurrentKey

    if (key == "cp_role_login") {
      //          
      ctx.output(roleActiveSide, value.copy("role_active",value.cp_game_id,value.time,value.data))
      ctx.output(roleNewUserSide, value.copy("role_new_user",value.cp_game_id,value.time,value.data))
    }
  }
}

 
WindowFunction
/**
  *     cp window function
  *
  * create by LiuJinHe 2020/5/26
  */
class CpWindow extends AllWindowFunction[CpData, Iterable[CpData], TimeWindow] {
  override def apply(window: TimeWindow, input: Iterable[CpData], out: Collector[Iterable[CpData]]): Unit = {
    if (input.nonEmpty) {
      println("1       : " + input.size)
      out.collect(input)
    }
  }
}

 
sinkはMySQL Sinkをカスタマイズし、前に書いたことがありますが、論理は基本的に変わりません.