flink入門のscala非同期IOアクセスredisおよびピット記録(2)を実現
私たちはflink入門のscalaで非同期IOアクセスredisとピット記録(1)を実現する中でjedisを採用して非同期読取redisをシミュレートして、それでは本編で、私たちはredisの高級クライアントlettuceを採用して、このクライアントはとても強くて、非同期操作をサポートして、もし具体的に理解したいならば
まず依存をインポートします.
次に、AsyncDataStreamのorderedWaitメソッド(kafkaDsはkafkaから読み込まれたデータストリーム)も使用します.
非同期読み出し関数のカスタマイズ:
簡単に見えますが、ここで注意しなければなりません.ResultFutureはResultFutureを初めて呼び出します.completeで実行が完了しました.その後のすべてのcomplete呼び出しは無視されます!!!
集合ループを繰り返してデータを返す場合は、このように書かないでください.
このように書くと、最初のデータを巡ってcompleteを実行した後、後のデータはcompleteで返されません.
ここまでFlink非同期次元テーブル読み取りredisに関する操作はこうですが、不適切な点があれば、コメントの訂正を歓迎します.
まず依存をインポートします.
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>5.0.5.RELEASE</version>
</dependency>
次に、AsyncDataStreamのorderedWaitメソッド(kafkaDsはkafkaから読み込まれたデータストリーム)も使用します.
AsyncDataStream.orderedWait(kafkaDs, new AsyncRedisRead, 30L, TimeUnit.SECONDS, 1000)
非同期読み出し関数のカスタマイズ:
class AsyncRedisRead extends RichAsyncFunction[Obj1, Obj1] {
private var redisClient: RedisClient = _
private var connection: StatefulRedisConnection[String, String] = _
private var async: RedisAsyncCommands[String, String] = _
override def open(parameters: Configuration): Unit = {
super.open(parameters)
val redisUri = "redis://10.xx.xx.xxx"
redisClient = RedisClient.create(redisUri)
connection = redisClient.connect()
async = connection.async()
}
override def asyncInvoke(input: Obj1, resultFuture: ResultFuture[Obj1]): Unit = {
async.keys(s"flink*").thenAccept(new Consumer[java.util.List[String]] {
override def accept(t: List[String]): Unit = {
if (t.isEmpty) {
resultFuture.complete(Iterable[Obj1](input))
}
else {
var seq = Seq[Obj1]()
//
for (i 0 until t.size()) {
val member= t.get(i)
val infoArray = member.split("_")
val ruleId = infoArray(3)
val userId = infoArray(4)
val suspectId = infoArray(5)
val alarmStatus = infoArray(6)
seq = seq :+ Obj1(input.uuid, input.wifiList, ruleId, userId, suspectId)
}
resultFuture.complete(seq)
}
}
})
}
override def close(): Unit = {
super.close()
if (redisClient != null) redisClient.shutdown()
if (connection != null) connection.close()
}
}
簡単に見えますが、ここで注意しなければなりません.ResultFutureはResultFutureを初めて呼び出します.completeで実行が完了しました.その後のすべてのcomplete呼び出しは無視されます!!!
集合ループを繰り返してデータを返す場合は、このように書かないでください.
for (i 0 until t.size()) {
val member= t.get(i)
val infoArray = member.split("_")
val ruleId = infoArray(3)
val userId = infoArray(4)
val suspectId = infoArray(5)
val alarmStatus = infoArray(6)
resultFuture.complete(Iterable[Obj1](Obj1(input.uuid, input.wifiList, ruleId, userId, suspectId)))
}
このように書くと、最初のデータを巡ってcompleteを実行した後、後のデータはcompleteで返されません.
ここまでFlink非同期次元テーブル読み取りredisに関する操作はこうですが、不適切な点があれば、コメントの訂正を歓迎します.