Flinkノート


Flinkを使用する4つのステップ
  • 実行環境
  • を作成する.
  • Source
  • を追加
  • Transform
  • Sink

  • 実行環境の作成
  • Flinkは、実行環境を作成するAPIとは異なり、バッチenvを作成するコードは次のとおりです.
  • val env = ExecutionEnvironment.getExecutionEnvironment
    

    ストリーム処理envを作成するコードは次のとおりです.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    

    Sourceの追加
  • 集合ベースsource
  • val stream = env.fromCollection(List(
         TemperatureRecord("d1", 25.5, 100),
         TemperatureRecord("d2", 25.5, 100),
         TemperatureRecord("d3", 25.5, 100),
         TemperatureRecord("d4", 25.5, 100),
         TemperatureRecord("d5", 25.5, 100),
         TemperatureRecord("d6", 25.5, 100),
         TemperatureRecord("d7", 25.5, 100)
       ))
    
  • ファイルベースsource
  • val stream = env.readTextFile("""/source.txt""")
    
  • ネットワークソケットに基づくsource
  • val dataStream = env.socketTextStream("192.168.1.101", 7777)
    
  • カスタムsource kafka source
  • val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.1.101:9092")
    properties.setProperty("zookeeper.connect", "192.168.1.101:2181")
    properties.setProperty("group.id", "kafkaStreamTest")
    
    val kafka11 = new FlinkKafkaConsumer011[String]("kafkaStreamTest", new SimpleStringSchema(), properties)
    
    val stream = env.addSource(kafka11)
    

    カスタムテストソース
    class TestSourceFunction extends SourceFunction[String] {
      var running = true
    
      override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    
        var i = 0
        while (running) {
          i += 1
          sourceContext.collect(i + " " + "v_" + Random.nextInt(5))
          Thread.sleep(1000)
        }
      }
    
      override def cancel(): Unit = {
        running = false
      }
    
    }
    

    Transform
    Flinkのtranformはsparkのtransformよりも分類でき、変換を行うAPIでもありますが、詳細は異なる場合があります.
    Transformation
    へんかん
    説明
    map
    DataStream → DataStream
    エレメントを取得してエレメントを生成
    flatMap
    DataStream → DataStream
    1つの要素を取り、0以上の要素を生成します.
    filter
    DataStream → DataStream
    データ・ストリームのフィルタリング
    keyBy
    DataStream → KeyedStream
    類似データベースのgroupby
    reduce
    KeyedStream → DataStream
    つの汎用Tがあって、2つのTタイプのパラメータを受け取って、1つのTタイプのパラメータを返して、戻り値は次回実行する第1のパラメータとして、第2のパラメータはデータストリームの中のデータです
    fold
    KeyedStream → DataStream
    reduceのようなものは廃棄されていますが、2つの汎用型を受信すると、出入り流の汎用型が異なる場合があります.
    Aggregations
    KeyedStream → DataStream
    KeyedStreamを集約し、sum、min、max、minBy、maxByを含む
    union
    DataStream*→ DataStream
    同じタイプの複数のデータ・ストリームの接続
    connect
    DataStream,DataStream → ConnectedStreams
    2つのデータ・ストリームを接続します.この2つのデータ・ストリームのタイプは異なります.
    split
    DataStream → SplitStream
    いくつかの基準に基づいてストリームを2つ以上のストリームに分割
    select
    SplitStream → DataStream
    分割ストリームから1つ以上のストリームを選択
    window
    KeyedStream → WindowedStream
    すでにパーティション化されているKeyedStreamsでWindowを定義する
    timeWindow
    KeyedStream → WindowedStream
    すでにパーティション化されているKeyedStreamsでの時間Windowの定義
    countWindow
    KeyedStream → WindowedStream
    すでにパーティション化されているKeyedStreamsでカウントウィンドウを定義する
    windowAll
    DataStream → WindowedStream
    一般的なDataStreamsでのWindowの定義
    Window api待更
    以上の操作では、MapFunction、RichMapFunctionなどのカスタム関数クラスの転送がサポートされています.
    Sink
    FlinkのSinkはSparkのactionに似ていて、主にデータの出力によく使われるSinkに使われています.
  • kafka Sinkまず依存
  • を増加させる
    <dependency>
       <groupId>org.apache.flinkgroupId>
       <artifactId>flink-connector-kafka-0.10_2.11artifactId>
       <version>1.3.1version>
    dependency>
    

    しゅプログラム
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //TestObjectSourceFunction       Source
    val stream = env.addSource(new TestObjectSourceFunction)
    val producer = new FlinkKafkaProducer011[String]("localhost:9092", "test", new SimpleStringSchema())
    stream.map(_.id)
      .addSink(producer)
    env.execute("kafkaSinkTest")
    
  • es Sink
  • まず依存
  • を増加する.
     <dependency>
        <groupId>org.apache.flinkgroupId>
        <artifactId>flink-connector-elasticsearch6_2.11artifactId>
        <version>1.7.2version>
    dependency>
    

    しゅプログラム
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //TestObjectSourceFunction      Source
    val stream = env.addSource(new TestObjectSourceFunction)
    
    val httpHosts = new util.ArrayList[HttpHost]
    httpHosts.add(new HttpHost("localhost", 9200))
    
    val esSink = new ElasticsearchSink.Builder[ApplyInfo](httpHosts, new ElasticsearchSinkFunction[ApplyInfo] {
      override def process(item: ApplyInfo, ctx: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
    	val esSource = new util.HashMap[String, String]()
    	esSource.put("id", item.id)
    	esSource.put("areaCode", item.areaCode)
    	val req = Requests.indexRequest("applyInfo").`type`("applyInfo").source(esSource)
    	requestIndexer.add(req)
      }
    }).build()
    
    stream.addSink(esSink)
    env.execute("esSinkTest")
    
  • カスタムSinkカスタムSink Functionを実現すればよいが、一般的にRichSinkFunctionを実現すればより豊富な機能を提供する
  • val env = StreamExecutionEnvironment.getExecutionEnvironment
    //TestObjectSourceFunction      Source
    val stream = env.addSource(new TestObjectSourceFunction)
    stream.addSink(new RichSinkFunction[ApplyInfo] {
      var out: OutputStream = _
    
      override def open(parameters: Configuration): Unit = {
    	out = new FileOutputStream("/tmp/customSink.txt")
      }
    
      override def invoke(value: ApplyInfo, context: SinkFunction.Context[_]): Unit = {
    	out.write((value.id + "," + value.areaCode + "\r
    "
    ).getBytes(StandardCharsets.UTF_8)) } override def close(): Unit = { out.close() } }) env.execute("esSinkTest")