DataStream APIのsink(8)
2256 ワード
writeAsText(ストリーミング処理でファイルに書くことは少ない)
各要素のtoString()メソッドを呼び出すことによって取得される文字列形式で要素を行単位で書き込みます.
print()/printToErr():各要素のtoString()メソッドの値を標準出力または標準エラー出力ストリームに印刷します.
カスタム出力addSink【kafka、redis】
サンプルコード:
各要素のtoString()メソッドを呼び出すことによって取得される文字列形式で要素を行単位で書き込みます.
print()/printToErr():各要素のtoString()メソッドの値を標準出力または標準エラー出力ストリームに印刷します.
カスタム出力addSink【kafka、redis】
org.apache.bahir
flink-connector-redis_2.11
1.0
サンプルコード:
/**
* socket , redis
*
* list
*
* lpush list key value
*
*/
public class StreamingDemoToRedis {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource text = env.socketTextStream("hadoop",9000,"
");
//lpush l_words word
//
DataStream> l_wordsData = text.map(new MapFunction>() {
@Override
public Tuple2 map(String value) throws Exception {
return new Tuple2<>("l_words",value);
}
});
// redis
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop").setPort(6379).build();
// redisSink
RedisSink> redisSink = new RedisSink>(conf,new MyRedisMapper());
l_wordsData.addSink(redisSink);
env.execute("StreamingDemoToRedis");
}
public static class MyRedisMapper implements RedisMapper>{
@Override
public RedisCommandDescription getCommandDescription() {
return null;
}
// redis Key
@Override
public String getKeyFromData(Tuple2 data) {
return data.f0;
}
// redis Value
@Override
public String getValueFromData(Tuple2 data) {
return data.f1;
}
}
}