flink入門のscala非同期IOアクセスredisおよびピット記録(1)を実現
15983 ワード
問題:sparkstreamingを使用する過程で、中間変数をredisに書き、ストリームプログラムでredisの中間変数を読む必要があることがよくあります.sparkstreamingでは、foreach演算子を1つ使うだけでこのようなニーズを実現することができます.flinkにはforeach演算子がありません.プロセスでredisのデータを読むにはどうすればいいですか.
方法:flinkの非同期IOアクセスredisでこの操作を完了できます.
注意:flinkの非同期IOでredisにアクセスするには、データベース(またはkey/valueストレージ)に対して適切な非同期I/Oを実現するには、クライアントが非同期要求をサポートするデータベースにアクセスする必要があるという前提があります.このようなクライアントがない場合、複数のクライアントを作成し、スレッドプールを使用して同期呼び出しを処理することによって、同期クライアントを限られた同時クライアントに変換しようとすることができるが、この方法は通常、適切な非同期クライアントよりも効率が低い.私の次の実装はjedisで実装されます.jedisクライアントインスタンスはスレッドセキュリティではなく、非同期操作をサポートしていないため、接続プールを使用して「非同期」をシミュレートします.
ピットを踏むヒント:私はここで直接redisを使います.Clientsによるredisの読み取りでは、vertx-redis-clientで実現されているものがたくさん見られます.以下のブログで説明します.https://blog.csdn.net/rlnLo2pNEfx9c/article/details/103692000でも、ここは気をつけて
vertxは現在scalaで2.12バージョンしかサポートされていません!!!
私が使っているのはscala 2です.11、だからこのバッグではなかなか通れない、ネット上のケースの多くはjavaで実現されているので、その時は自分がjavaからscalaを回る中で出てきた問題だと思っていたので、このピットは注意しなければなりません
インプリメンテーション
その結果、redisにはこのようなhashデータが格納されました.
1,jack,beijing
データを入力します.
1,jack,18
コンソールの出力が表示されます.
1,jack,18,beijing
方法:flinkの非同期IOアクセスredisでこの操作を完了できます.
注意:flinkの非同期IOでredisにアクセスするには、データベース(またはkey/valueストレージ)に対して適切な非同期I/Oを実現するには、クライアントが非同期要求をサポートするデータベースにアクセスする必要があるという前提があります.このようなクライアントがない場合、複数のクライアントを作成し、スレッドプールを使用して同期呼び出しを処理することによって、同期クライアントを限られた同時クライアントに変換しようとすることができるが、この方法は通常、適切な非同期クライアントよりも効率が低い.私の次の実装はjedisで実装されます.jedisクライアントインスタンスはスレッドセキュリティではなく、非同期操作をサポートしていないため、接続プールを使用して「非同期」をシミュレートします.
ピットを踏むヒント:私はここで直接redisを使います.Clientsによるredisの読み取りでは、vertx-redis-clientで実現されているものがたくさん見られます.以下のブログで説明します.https://blog.csdn.net/rlnLo2pNEfx9c/article/details/103692000でも、ここは気をつけて
vertxは現在scalaで2.12バージョンしかサポートされていません!!!
私が使っているのはscala 2です.11、だからこのバッグではなかなか通れない、ネット上のケースの多くはjavaで実現されているので、その時は自分がjavaからscalaを回る中で出てきた問題だと思っていたので、このピットは注意しなければなりません
インプリメンテーション
import java.util.concurrent.TimeUnit
import net.clickwifi.writer.RedisWriter
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.runtime.concurrent.Executors
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import redis.clients.jedis.Jedis
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
object WriteToHbase {
def main(args: Array[String]): Unit = {
val parameter = ParameterTool.fromArgs(args)
val host = parameter.get("host")
val port = parameter.getInt("port")
println(host)
//
val env = StreamExecutionEnvironment.getExecutionEnvironment
//env.setParallelism(1)
// source
val socketDs = env.socketTextStream(host, port)
val resultDS = socketDs.map(str => {
val arrayStr = str.split(",")
val id = arrayStr(0)
val name = arrayStr(1)
val age = arrayStr(2)
(id, name, age)
})
//
val EndDS = AsyncDataStream.orderedWait(resultDS, new AsyncDatabaseRequest, 100000L, TimeUnit.MILLISECONDS)
EndDS.print()
env.execute("flink")
}
/**
* An implementation of the 'AsyncFunction' that sends requests and sets the callback.
*/
class AsyncDatabaseRequest extends RichAsyncFunction[(String, String, String), (String, String, String, String)] {
/** The database specific client that can issue concurrent requests with callbacks */
println("jedis ")
@transient
private var jedis: Jedis = _
private lazy val pool = new JedisPool(new JedisPoolConfig,"hadoop102",6379)
/** The context used for the future callbacks */
implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())
override def asyncInvoke(input: (String, String, String), resultFuture: ResultFuture[(String, String, String, String)]): Unit = {
var result = ""
val resultFutureRequested: Future[String] = Future {
jedis = pool.getResource
result = jedis.hget(input._1,input._2)
result
}
resultFutureRequested.onComplete {
case Success(result) =>
resultFuture.complete(Iterable((input._1,input._2,input._3,result)))
jedis.close()
case Failure(error) => case e: Exception => println(s" , : ${e.getMessage}")
}
println("invoke")
}
override def close(): Unit = {
if(jedis != null) jedis.close()
}
}
}
その結果、redisにはこのようなhashデータが格納されました.
1,jack,beijing
データを入力します.
1,jack,18
コンソールの出力が表示されます.
1,jack,18,beijing