Flinkのredis sink

1502 ワード

1.依存のインポート


    org.apache.bahir
    flink-connector-redis_2.11
    1.0

2.カスタムRedisMapperクラス
例:Datream内のすべてのUserサンプルクラスオブジェクトのIDをkeyにOnline_と書くNumberのsetで
class MyRedisMapper extends RedisMapper[User]{

//           redis     
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.SADD , "Online_Number")
  }
  
//       key 
    override def getKeyFromData(user: User): String = "Online_Number"
//       value 
    override def getValueFromData(user: User): String = user.id.toString
}

3.redis構成オブジェクトの作成
redis sinkは、FlinkJedisPoolConfig、FlinkJedisClusterConfig、FlinkJedisSentinelConfigの3種類のconfigクラスを提供します.
FlinkJedisPoolConfig:単一ノードのredisインスタンス.
FlinkJedisPoolConfig:クラスタモードのredisインスタンス.
FlinkJedisSentinelConfig:哨兵モード付きredisの例.
例としてFlinkJedisPoolConfig:
 val config: FlinkJedisPoolConfig = FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build()

4.RedisSinkオブジェクトの作成
手順3で作成したconfigと手順2で定義したRedisMapperオブジェクトのパラメータを持つRedisSinkオブジェクトを作成します.
val redisSink = new RedisSink[User](config,new MyRedisMapper)

5.DstreamのaddSinkを呼び出す
dataStream.addSink(redisSink)