Flink Redis Sinkの使い方とサポートの有効期限の改善
6732 ワード
Flinkのデフォルトでは、Kafka、RabbitMQ、HDFS、ElasticSearchなどとドッキングするコネクタなど、開梱用のコネクタが多数用意されています.他にも、Apache Bahirプロジェクト(公式サイトは粗末ですが、ここを参照)で提供されているコネクタがあります.その中にはRedis Sinkが含まれています.このプロジェクトのドキュメントは少し不足しているので、本稿ではまず使い方を記録します.
以下のようにMaven依存性を導入した.現在bahir-flinkプロジェクトは停滞しており、最新バージョンは1.1-SNAPSHOTである.
最も一般的なスタンドアロンRedisシナリオでは、プラグインが提供するコアクラスは3つあります.それぞれ: FlinkJedisPoolConfigクラス:Jedis接続プールに関するパラメータ; RedisMapperインタフェース:ユーザデータからキー値を抽出し、Redisコマンドを構成するマッパーは、ユーザ自身が実現する必要がある. RedisSinkクラス:構築されたFlinkJedisPoolConfigおよびRedisMapperに基づいてストリームデータがRedisに書き込まれます.
氏はFlinkJedisPoolConfigの例になります.
次に、ウィンドウで統計されたPVとUVデータをJSON形式で表すRedisMapperの実装クラスを書きます.ちっとも難しくない.
最後にRedisSinkを構築することができます.
このRedisコネクタは使いやすいが、keyを設定する期限切れ(TTL)がサポートされていない点と、流水線(pipeline)がサポートされていない点が2つある.窓口がまばらで、書き込み量がそれほど大きくない場合は、流水線はあってもなくてもいいですが、期限切れはやはり重要なので、以下は少し改造します.
プロジェクトコードcloneをローカルに移動し、flink-connector-redisプロジェクトのRedisCommand列挙を見つけ、setexコマンドを追加します.
次にRedisCommandsContainerインタフェースに来て、その定義はすべて具体的なコマンドロジックで、setex()メソッドの定義を加えます.
RedisCommandsContainerインタフェースには、シングルマシン用のRedisContainerとクラスタ用のRedisClusterContainerの2つの実装クラスがあり、setex()メソッドを書き込む具体的な実装があります.
方法の具体的な実現があって、それではどのようにTTLのパラメータを受信しますか?前述したRedisMapperインタフェースに戻り、TTL秒数を取得する方法宣言を追加します.便宜上、default構文でデフォルト値を指定することもできます.
万事そろっているのに東風に欠けているだけで、RedisSinkに来た.invoke()メソッド(前に述べたように、RichSinkFunctionのサブクラスはこのメソッドを実現しなければならない)に、私たちが前に書いたものを加えると、以下のようになります.
Mavenで再構築し、パッケージ化して倉庫に公開すれば使えます.実際に適用する場合、TTLを設定する必要がある場合、ユーザーロジックのRedisMapperはこのように書くことができます.
so easy~
以下のようにMaven依存性を導入した.現在bahir-flinkプロジェクトは停滞しており、最新バージョンは1.1-SNAPSHOTである.
org.apache.bahir
flink-connector-redis_${scala.bin.version}
${bahir.version}
compile
最も一般的なスタンドアロンRedisシナリオでは、プラグインが提供するコアクラスは3つあります.それぞれ:
氏はFlinkJedisPoolConfigの例になります.
// ParameterUtil , ,
// , , Maven profile
Properties redisProps = ParameterUtil.getFromResourceFile("redis.properties");
FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
.setHost(redisProps.getProperty("redis.host"))
.setPort(NumberUtils.createInteger(redisProps.getProperty("redis.port")))
.setPassword(redisProps.getProperty("redis.pass", ""))
.setDatabase(NumberUtils.createInteger(redisProps.getProperty("redis.db")))
.build();
次に、ウィンドウで統計されたPVとUVデータをJSON形式で表すRedisMapperの実装クラスを書きます.ちっとも難しくない.
public static final class RedisWindowPvUvMapper
implements RedisMapper {
// ,
private String itemType;
public RedisStringMapper(String itemType) {
this.itemType = itemType;
}
// , , set
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.SET);
}
// POJO key
@Override
public String getKeyFromData(WindowedPvUvResult result) {
StringBuilder builder = new StringBuilder("flink:log_pvuv:");
builder.append(result.getWindowEnd());
builder.append("_");
builder.append(itemType);
builder.append("_");
builder.append(result.getItemId());
return builder.toString();
}
// POJO value
@Override
public String getValueFromData(WindowedPvUvResult result) {
return new JSONObject()
.fluentPut("pv", result.getPv())
.fluentPut("uv", result.getUv())
.toJSONString();
}
}
最後にRedisSinkを構築することができます.
dataStream.addSink(new RedisSink<>(jedisPoolConfig, new RedisWindowPvUvMapper("partner")));
このRedisコネクタは使いやすいが、keyを設定する期限切れ(TTL)がサポートされていない点と、流水線(pipeline)がサポートされていない点が2つある.窓口がまばらで、書き込み量がそれほど大きくない場合は、流水線はあってもなくてもいいですが、期限切れはやはり重要なので、以下は少し改造します.
プロジェクトコードcloneをローカルに移動し、flink-connector-redisプロジェクトのRedisCommand列挙を見つけ、setexコマンドを追加します.
SETEX(RedisDataType.STRING),
次にRedisCommandsContainerインタフェースに来て、その定義はすべて具体的なコマンドロジックで、setex()メソッドの定義を加えます.
void setex(String key, int seconds, String value);
RedisCommandsContainerインタフェースには、シングルマシン用のRedisContainerとクラスタ用のRedisClusterContainerの2つの実装クラスがあり、setex()メソッドを書き込む具体的な実装があります.
// RedisContainer.setex()
@Override
public void setex(final String key, final int seconds, final String value) {
Jedis jedis = null;
try {
jedis = getInstance();
jedis.setex(key, seconds, value);
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("Cannot send Redis message with command SETEX to key {} error message {}",
key, e.getMessage());
}
throw e;
} finally {
releaseInstance(jedis);
}
}
// RedisClusterContainer.setex()
@Override
public void setex(final String key, final int seconds, final String value) {
try {
jedisCluster.setex(key, seconds, value);
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("Cannot send Redis message with command SETEX to key {} error message {}",
key, e.getMessage());
}
throw e;
}
}
方法の具体的な実現があって、それではどのようにTTLのパラメータを受信しますか?前述したRedisMapperインタフェースに戻り、TTL秒数を取得する方法宣言を追加します.便宜上、default構文でデフォルト値を指定することもできます.
default int getExpireSeconds(T data) {
return 0;
}
万事そろっているのに東風に欠けているだけで、RedisSinkに来た.invoke()メソッド(前に述べたように、RichSinkFunctionのサブクラスはこのメソッドを実現しなければならない)に、私たちが前に書いたものを加えると、以下のようになります.
@Override
public void invoke(IN input, Context context) throws Exception {
String key = redisSinkMapper.getKeyFromData(input);
String value = redisSinkMapper.getValueFromData(input);
//
int expireSec = redisSinkMapper.getExpireSeconds(input);
Optional optAdditionalKey = redisSinkMapper.getAdditionalKey(input);
switch (redisCommand) {
case RPUSH:
this.redisCommandsContainer.rpush(key, value);
break;
case LPUSH:
this.redisCommandsContainer.lpush(key, value);
break;
case SADD:
this.redisCommandsContainer.sadd(key, value);
break;
case SET:
this.redisCommandsContainer.set(key, value);
break;
// setex
case SETEX:
if (expireSec > 0) {
this.redisCommandsContainer.setex(key, expireSec, value);
}
break;
case PFADD:
// ... ,
}
}
Mavenで再構築し、パッケージ化して倉庫に公開すれば使えます.実際に適用する場合、TTLを設定する必要がある場合、ユーザーロジックのRedisMapperはこのように書くことができます.
public static final class RedisStringMapperWithTTL
implements RedisMapper {
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.SETEX);
}
@Override
public String getKeyFromData(WindowedPvUvResult result) {
// ...
}
@Override
public String getValueFromData(WindowedPvUvResult result) {
// ...
}
@Override
public int getExpireSeconds(WindowedPvUvResult data) {
return 3 * 24 * 60 * 60; // 3
}
}
}
so easy~