Flink-カスタムSource

10555 ワード

カスタムSourceは、ローカルでのテストやコードのデバッグによく使用され、本番環境ではほとんど使用されません。
カスタムソースを使用する方法:
  • envを作成する
  • SourceFunctionをカスタム実装し、主に次の2つのメソッドをオーバーライドします。
  • run():run()メソッド内でctx.collect()を呼び出してデータを返す。
  • cancel()

  • addSourceメソッドを呼び出す:env.addSource(new CustomGenerator())
  • コードは次のとおりです。
    package org.ourhome.streamapi
    
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.runtime.state.filesystem.FsStateBackend
    import org.apache.flink.streaming.api.CheckpointingMode
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    
    import scala.util.Random
    
    /**
     * Demo Flink streaming job that prints records produced by a hand-written
     * `SourceFunction`.
     *
     * The job echoes the `runtype` command-line argument, turns on exactly-once
     * checkpointing with a 5000 ms interval, stores checkpoints through a
     * file-system state backend, attaches [[CustomSource.CustomGenerator]] as the
     * source, and prints every emitted record with parallelism 1.
     *
     * @Author Do
     * @Date 2020/4/15 22:04
     */
    object CustomSource {

      /**
       * Entry point: builds the streaming topology and executes it.
       *
       * @param args command-line arguments parsed with `ParameterTool`; only the
       *             `runtype` key is read, and its value is only printed to stdout
       */
      def main(args: Array[String]): Unit = {
        val params: ParameterTool = ParameterTool.fromArgs(args)
        val runType: String = params.get("runtype")
        println(s"runType: $runType")

        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        // Use exactly-once checkpointing semantics.
        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
        // Trigger a checkpoint every 5000 ms.
        env.enableCheckpointing(5000)
        // Persist checkpoints on the local file system.
        env.setStateBackend(new FsStateBackend("file:///D:/Temp/checkpoint/flink/KafkaSource"))
        // Register the custom SourceFunction as the job's input.
        val dataSource: DataStream[List[Double]] = env.addSource(new CustomGenerator())
        dataSource.print().setParallelism(1)

        env.execute("Custom Source")
      }

      /**
       * Custom source that keeps emitting lists of five random numbers until it is
       * cancelled.
       *
       * Element `i` (for `i` from 1 to 5) of every emitted list is `i` plus a
       * sample from `Random.nextGaussian()`. The source sleeps 500 ms after each
       * emission.
       */
      class CustomGenerator extends SourceFunction[List[Double]] {
        // cancel() is called from a different thread than the one executing run().
        // Without @volatile the loop below is not guaranteed to ever observe the
        // flag being cleared, so the source could keep running after cancellation.
        @volatile private var running = true

        /**
         * Emits one list of five values through `ctx.collect`, sleeps 500 ms, and
         * repeats for as long as the source has not been cancelled.
         *
         * @param ctx source context used to emit records downstream
         */
        override def run(ctx: SourceFunction.SourceContext[List[Double]]): Unit = {
          // Created inside run() and never reassigned, hence a val.
          val randomNum: Random = new Random()

          while (running) {
            val n = (1 to 5).map(_ + randomNum.nextGaussian()).toList
            // Hand the generated record to Flink.
            ctx.collect(n)

            Thread.sleep(500)
          }
        }

        /** Clears the running flag so that the loop in `run` exits. */
        override def cancel(): Unit = {
          running = false
        }
      }

    }