Flink Redis Sinkの使い方とサポートの有効期限の改善

6732 ワード

Flinkのデフォルトでは、Kafka、RabbitMQ、HDFS、ElasticSearchなどとドッキングするコネクタなど、開梱用のコネクタが多数用意されています.他にも、Apache Bahirプロジェクト(公式サイトは粗末ですが、ここを参照)で提供されているコネクタがあります.その中にはRedis Sinkが含まれています.このプロジェクトのドキュメントは少し不足しているので、本稿ではまず使い方を記録します.
以下のようにMaven依存性を導入した.現在bahir-flinkプロジェクトは停滞しており、最新バージョンは1.1-SNAPSHOTである.
    
      org.apache.bahir
      flink-connector-redis_${scala.bin.version}
      ${bahir.version}
      compile
    

最も一般的なスタンドアロンRedisシナリオでは、プラグインが提供するコアクラスは3つあります.それぞれ:
  • FlinkJedisPoolConfigクラス:Jedis接続プールに関するパラメータ;
  • RedisMapperインタフェース:ユーザデータからキー値を抽出し、Redisコマンドを構成するマッパーは、ユーザ自身が実現する必要がある.
  • RedisSinkクラス:構築されたFlinkJedisPoolConfigおよびRedisMapperに基づいてストリームデータがRedisに書き込まれます.

  • 氏はFlinkJedisPoolConfigの例になります.
        //    ParameterUtil       ,              ,     
        //           ,             ,   Maven profile   
        Properties redisProps = ParameterUtil.getFromResourceFile("redis.properties");
    
        FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
          .setHost(redisProps.getProperty("redis.host"))
          .setPort(NumberUtils.createInteger(redisProps.getProperty("redis.port")))
          .setPassword(redisProps.getProperty("redis.pass", ""))
          .setDatabase(NumberUtils.createInteger(redisProps.getProperty("redis.db")))
          .build();
    

    次に、ウィンドウで統計されたPVとUVデータをJSON形式で表すRedisMapperの実装クラスを書きます.ちっとも難しくない.
      public static final class RedisWindowPvUvMapper
        implements RedisMapper {
        //         ,      
        private String itemType;
    
        public RedisStringMapper(String itemType) {
          this.itemType = itemType;
        }
    
        //     ,       ,   set
        @Override
        public RedisCommandDescription getCommandDescription() {
          return new RedisCommandDescription(RedisCommand.SET);
        }
    
        //  POJO  key
        @Override
        public String getKeyFromData(WindowedPvUvResult result) {
          StringBuilder builder = new StringBuilder("flink:log_pvuv:");
          builder.append(result.getWindowEnd());
          builder.append("_");
          builder.append(itemType);
          builder.append("_");
          builder.append(result.getItemId());
          return builder.toString();
        }
    
        //  POJO  value
        @Override
        public String getValueFromData(WindowedPvUvResult result) {
          return new JSONObject()
            .fluentPut("pv", result.getPv())
            .fluentPut("uv", result.getUv())
            .toJSONString();
        }
      }
    

    最後にRedisSinkを構築することができます.
    dataStream.addSink(new RedisSink<>(jedisPoolConfig, new RedisWindowPvUvMapper("partner")));
    

    このRedisコネクタは使いやすいが、keyを設定する期限切れ(TTL)がサポートされていない点と、流水線(pipeline)がサポートされていない点が2つある.窓口がまばらで、書き込み量がそれほど大きくない場合は、流水線はあってもなくてもいいですが、期限切れはやはり重要なので、以下は少し改造します.
    プロジェクトコードcloneをローカルに移動し、flink-connector-redisプロジェクトのRedisCommand列挙を見つけ、setexコマンドを追加します.
    SETEX(RedisDataType.STRING),
    

    次にRedisCommandsContainerインタフェースに来て、その定義はすべて具体的なコマンドロジックで、setex()メソッドの定義を加えます.
    void setex(String key, int seconds, String value);
    

    RedisCommandsContainerインタフェースには、シングルマシン用のRedisContainerとクラスタ用のRedisClusterContainerの2つの実装クラスがあり、setex()メソッドを書き込む具体的な実装があります.
        // RedisContainer.setex()
        @Override
        public void setex(final String key, final int seconds, final String value) {
            Jedis jedis = null;
            try {
                jedis = getInstance();
                jedis.setex(key, seconds, value);
            } catch (Exception e) {
                if (LOG.isErrorEnabled()) {
                    LOG.error("Cannot send Redis message with command SETEX to key {} error message {}",
                      key, e.getMessage());
                }
                throw e;
            } finally {
                releaseInstance(jedis);
            }
        }
    
        // RedisClusterContainer.setex()
        @Override
        public void setex(final String key, final int seconds, final String value) {
            try {
                jedisCluster.setex(key, seconds, value);
            } catch (Exception e) {
                if (LOG.isErrorEnabled()) {
                    LOG.error("Cannot send Redis message with command SETEX to key {} error message {}",
                      key, e.getMessage());
                }
                throw e;
            }
        }
    

    方法の具体的な実現があって、それではどのようにTTLのパラメータを受信しますか?前述したRedisMapperインタフェースに戻り、TTL秒数を取得する方法宣言を追加します.便宜上、default構文でデフォルト値を指定することもできます.
         default int getExpireSeconds(T data) {
             return 0;
         }
    

    万事そろっているのに東風に欠けているだけで、RedisSinkに来た.invoke()メソッド(前に述べたように、RichSinkFunctionのサブクラスはこのメソッドを実現しなければならない)に、私たちが前に書いたものを加えると、以下のようになります.
        @Override
        public void invoke(IN input, Context context) throws Exception {
            String key = redisSinkMapper.getKeyFromData(input);
            String value = redisSinkMapper.getValueFromData(input);
            //       
            int expireSec = redisSinkMapper.getExpireSeconds(input);
            Optional optAdditionalKey = redisSinkMapper.getAdditionalKey(input);
    
            switch (redisCommand) {
                case RPUSH:
                    this.redisCommandsContainer.rpush(key, value);
                    break;
                case LPUSH:
                    this.redisCommandsContainer.lpush(key, value);
                    break;
                case SADD:
                    this.redisCommandsContainer.sadd(key, value);
                    break;
                case SET:
                    this.redisCommandsContainer.set(key, value);
                    break;
                //    setex  
                case SETEX:
                    if (expireSec > 0) {
                        this.redisCommandsContainer.setex(key, expireSec, value);
                    }
                    break;
                case PFADD:
                // ...    ,  
            }
        }
    

    Mavenで再構築し、パッケージ化して倉庫に公開すれば使えます.実際に適用する場合、TTLを設定する必要がある場合、ユーザーロジックのRedisMapperはこのように書くことができます.
      public static final class RedisStringMapperWithTTL
        implements RedisMapper {
        @Override
        public RedisCommandDescription getCommandDescription() {
          return new RedisCommandDescription(RedisCommand.SETEX);
        }
    
        @Override
        public String getKeyFromData(WindowedPvUvResult result) {
          // ...
        }
    
        @Override
        public String getValueFromData(WindowedPvUvResult result) {
          // ...
        }
    
        @Override
        public int getExpireSeconds(WindowedPvUvResult data) {
          return 3 * 24 * 60 * 60;   // 3 
        }
      }
    }
    

    so easy~