Flinkのredis sink
1502 ワード
1.依存のインポート
2.カスタムRedisMapperクラス
例:Datream内のすべてのUserサンプルクラスオブジェクトのIDをkeyにOnline_と書くNumberのsetで
3.redis構成オブジェクトの作成
redis sinkは、FlinkJedisPoolConfig、FlinkJedisClusterConfig、FlinkJedisSentinelConfigの3種類のconfigクラスを提供します.
FlinkJedisPoolConfig:単一ノードのredisインスタンス.
FlinkJedisPoolConfig:クラスタモードのredisインスタンス.
FlinkJedisSentinelConfig:哨兵モード付きredisの例.
例としてFlinkJedisPoolConfig:
4.RedisSinkオブジェクトの作成
手順3で作成したconfigと手順2で定義したRedisMapperオブジェクトのパラメータを持つRedisSinkオブジェクトを作成します.
5.DstreamのaddSinkを呼び出す
org.apache.bahir
flink-connector-redis_2.11
1.0
2.カスタムRedisMapperクラス
例:Datream内のすべてのUserサンプルクラスオブジェクトのIDをkeyにOnline_と書くNumberのsetで
class MyRedisMapper extends RedisMapper[User]{
// redis
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.SADD , "Online_Number")
}
// key
override def getKeyFromData(user: User): String = "Online_Number"
// value
override def getValueFromData(user: User): String = user.id.toString
}
3.redis構成オブジェクトの作成
redis sinkは、FlinkJedisPoolConfig、FlinkJedisClusterConfig、FlinkJedisSentinelConfigの3種類のconfigクラスを提供します.
FlinkJedisPoolConfig:単一ノードのredisインスタンス.
FlinkJedisPoolConfig:クラスタモードのredisインスタンス.
FlinkJedisSentinelConfig:哨兵モード付きredisの例.
例としてFlinkJedisPoolConfig:
val config: FlinkJedisPoolConfig = FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build()
4.RedisSinkオブジェクトの作成
手順3で作成したconfigと手順2で定義したRedisMapperオブジェクトのパラメータを持つRedisSinkオブジェクトを作成します.
val redisSink = new RedisSink[User](config,new MyRedisMapper)
5.DstreamのaddSinkを呼び出す
dataStream.addSink(redisSink)