flink入門のscala非同期IOアクセスredisおよびピット記録(2)を実現


私たちはflink入門のscalaで非同期IOアクセスredisとピット記録(1)を実現する中でjedisを採用して非同期読取redisをシミュレートして、それでは本編で、私たちはredisの高級クライアントlettuceを採用して、このクライアントはとても強くて、非同期操作をサポートして、もし具体的に理解したいならば
まず依存をインポートします.
		<dependency>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
            <version>5.0.5.RELEASE</version>
		</dependency>

次に、AsyncDataStreamのorderedWaitメソッド(kafkaDsはkafkaから読み込まれたデータストリーム)も使用します.
AsyncDataStream.orderedWait(kafkaDs, new AsyncRedisRead, 30L, TimeUnit.SECONDS, 1000)

非同期読み出し関数のカスタマイズ:
class AsyncRedisRead extends RichAsyncFunction[Obj1, Obj1] {
     

    private var redisClient: RedisClient = _
    private var connection: StatefulRedisConnection[String, String] = _
    private var async: RedisAsyncCommands[String, String] = _

    override def open(parameters: Configuration): Unit = {
     
      super.open(parameters)
      val redisUri = "redis://10.xx.xx.xxx"
      redisClient = RedisClient.create(redisUri)
      connection = redisClient.connect()
      async = connection.async()
    }

    override def asyncInvoke(input: Obj1, resultFuture: ResultFuture[Obj1]): Unit = {
     
      async.keys(s"flink*").thenAccept(new Consumer[java.util.List[String]] {
     
        override def accept(t: List[String]): Unit = {
     
          if (t.isEmpty) {
     
            resultFuture.complete(Iterable[Obj1](input))
          }
          else {
     
            var seq = Seq[Obj1]()
            //    
            for (i  0 until t.size()) {
     
                val member= t.get(i)
                val infoArray = member.split("_")
                val ruleId = infoArray(3)
                val userId = infoArray(4)
                val suspectId = infoArray(5)
                val alarmStatus = infoArray(6)
                seq = seq :+ Obj1(input.uuid, input.wifiList, ruleId, userId, suspectId)               
            }
            resultFuture.complete(seq)
          }
        }
      })
    }

    override def close(): Unit = {
     
      super.close()
      if (redisClient != null) redisClient.shutdown()
      if (connection != null) connection.close()
    }
  }

簡単に見えますが、ここで注意しなければなりません.ResultFutureはResultFutureを初めて呼び出します.completeで実行が完了しました.その後のすべてのcomplete呼び出しは無視されます!!!
集合ループを繰り返してデータを返す場合は、このように書かないでください.
		for (i  0 until t.size()) {
     
                val member= t.get(i)
                val infoArray = member.split("_")
                val ruleId = infoArray(3)
                val userId = infoArray(4)
                val suspectId = infoArray(5)
                val alarmStatus = infoArray(6)
                resultFuture.complete(Iterable[Obj1](Obj1(input.uuid, input.wifiList, ruleId, userId, suspectId)))
            }

このように書くと、最初のデータを巡ってcompleteを実行した後、後のデータはcompleteで返されません.
ここまでFlink非同期次元テーブル読み取りredisに関する操作はこうですが、不適切な点があれば、コメントの訂正を歓迎します.