Flink Data Stream APIのData Sink
3164 ワード
基本的な紹介
writeAsText():各要素のtoString()メソッドを呼び出すことによって取得される要素を文字列形式で行単位で書き込む
print()/printToErr():各要素のtoString()メソッドの値を標準出力または標準エラー出力ストリームに印刷します.
カスタム出力addSink【kafka、redis】
Sinkフォールトトレランス保証
Sink
意味保証
コメント
hdfs
exactly once
elasticsearch
at least once
kafka produce
at least once/exactly once
Kafka 0.9 and 0.10 at least once Kafka 0.11 exactly onceを提供
file
at least once
redis
at least once
カスタムsink
カスタムsinkを実装するには、SinkFunctionインタフェースを実装するか、RichSinkFunctionを継承する必要があります.
具体的な方法はorg.を参照することができる.apache.flink.streaming.connectors.redis.RedisSink
カスタムredisSink demo
依存を追加:
writeAsText():各要素のtoString()メソッドを呼び出すことによって取得される要素を文字列形式で行単位で書き込む
print()/printToErr():各要素のtoString()メソッドの値を標準出力または標準エラー出力ストリームに印刷します.
カスタム出力addSink【kafka、redis】
Sinkフォールトトレランス保証
Sink
意味保証
コメント
hdfs
exactly once
elasticsearch
at least once
kafka produce
at least once/exactly once
Kafka 0.9 and 0.10 at least once Kafka 0.11 exactly onceを提供
file
at least once
redis
at least once
カスタムsink
カスタムsinkを実装するには、SinkFunctionインタフェースを実装するか、RichSinkFunctionを継承する必要があります.
具体的な方法はorg.を参照することができる.apache.flink.streaming.connectors.redis.RedisSink
カスタムredisSink demo
依存を追加:
org.apache.bahir
flink-connector-redis_2.11
1.0
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
public class StreamingDemoToRedis {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream text = env.socketTextStream("192.168.24.141", 9000, "
");
SingleOutputStreamOperator> l_words = text.map(new MapFunction>() {
@Override
public Tuple2 map(String value) throws Exception {
return new Tuple2<>("l_words", value);
}
});
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("192.168.24.141").setPort(6379).build();
RedisSink> redisSink = new RedisSink<>(conf, new MyRedisMapper());
l_words.addSink(redisSink);
env.execute("StreamingDemoToRedis");
}
public static class MyRedisMapper implements RedisMapper> {
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.LPUSH);
}
@Override
public String getKeyFromData(Tuple2 data) {
return data.f0;
}
@Override
public String getValueFromData(Tuple2 data) {
return data.f1;
}
}
}