Data Sinks in the Flink DataStream API

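Basic introduction
writeAsText(): writes the elements line by line as strings; each string is obtained by calling the element's toString() method.
print()/printToErr(): prints each element's toString() value to the standard output or standard error stream.
Custom output with addSink (for example Kafka or Redis).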
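Because writeAsText() and print() both rely on toString(), the element type decides what each output line looks like. The following is a minimal plain-Java sketch of that idea and does not use Flink; WordCount and renderLines are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class ToStringSinkSketch {

    // Hypothetical element type. Overriding toString() controls the text
    // that a toString()-based sink would emit for each element.
    static final class WordCount {
        final String word;
        final int count;

        WordCount(String word, int count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + "," + count;
        }
    }

    // Stand-in for a text sink (an assumption, not Flink code):
    // produces one line per element by calling toString() on it.
    static <T> List<String> renderLines(List<T> elements) {
        List<String> lines = new ArrayList<>();
        for (T element : elements) {
            lines.add(String.valueOf(element));
        }
        return lines;
    }

    public static void main(String[] args) {
        List<WordCount> elements = new ArrayList<>();
        elements.add(new WordCount("flink", 3));
        elements.add(new WordCount("sink", 1));
        for (String line : renderLines(elements)) {
            System.out.println(line); // prints "flink,3" then "sink,1"
        }
    }
}
```

Without the toString() override, each line would fall back to the default Object representation (class name plus hash code), which is rarely useful in an output file.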
Sink fault-tolerance guarantees
Sink             Semantic guarantee             Comment
HDFS             exactly once
Elasticsearch    at least once
Kafka producer   at least once / exactly once   at least once with Kafka 0.9 and 0.10; exactly once with Kafka 0.11
File             at least once
Redis            at least once
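To make the difference between the guarantees concrete, here is a simplified plain-Java simulation; it is not Flink code. It assumes that after a failure the job restarts from its last checkpoint and resends every record after that point, which is what produces duplicates under at-least-once delivery. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DeliveryGuaranteeSketch {

    // Four sample records as {key, value} pairs.
    static List<String[]> sampleRecords() {
        List<String[]> records = new ArrayList<>();
        records.add(new String[] {"id-1", "a"});
        records.add(new String[] {"id-2", "b"});
        records.add(new String[] {"id-3", "c"});
        records.add(new String[] {"id-4", "d"});
        return records;
    }

    // Simulates at-least-once delivery: records [0, failureIndex) are sent,
    // the job fails, and everything from checkpointIndex onward is sent again.
    static List<String[]> deliverWithReplay(List<String[]> records, int checkpointIndex, int failureIndex) {
        List<String[]> deliveries = new ArrayList<>(records.subList(0, failureIndex));
        deliveries.addAll(records.subList(checkpointIndex, records.size()));
        return deliveries;
    }

    // Append-only target: every delivery is stored, so replays become duplicates.
    static List<String> appendAll(List<String[]> deliveries) {
        List<String> stored = new ArrayList<>();
        for (String[] record : deliveries) {
            stored.add(record[0] + "=" + record[1]);
        }
        return stored;
    }

    // Keyed target: writing the same key twice overwrites, so replays are harmless.
    static Map<String, String> upsertAll(List<String[]> deliveries) {
        Map<String, String> stored = new LinkedHashMap<>();
        for (String[] record : deliveries) {
            stored.put(record[0], record[1]);
        }
        return stored;
    }

    public static void main(String[] args) {
        // Checkpoint taken after the first record; failure after the third.
        List<String[]> deliveries = deliverWithReplay(sampleRecords(), 1, 3);
        System.out.println("append-only target stored " + appendAll(deliveries).size() + " records");
        System.out.println("keyed target stored " + upsertAll(deliveries).size() + " records");
    }
}
```

With four input records, the append-only target ends up with six entries while the keyed target ends up with four. This is why an at-least-once sink is often paired with idempotent (keyed) writes when duplicates matter.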
Custom sink
To implement a custom sink, either implement the SinkFunction interface or extend RichSinkFunction.
For a concrete example, see org.apache.flink.streaming.connectors.redis.RedisSink.
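The main reason to extend RichSinkFunction instead of implementing SinkFunction directly is its lifecycle: resources are set up in open(), each element goes through invoke(), and resources are released in close(). The sketch below models that call order in plain Java so it runs without Flink. LifecycleSink and InMemorySink are hypothetical stand-ins, not Flink's types, and the real Flink method signatures take additional parameters.

```java
import java.util.ArrayList;
import java.util.List;

public class SinkLifecycleSketch {

    // Hypothetical stand-in for a rich sink; NOT Flink's interface.
    interface LifecycleSink<T> {
        void open();          // acquire connections, pools, and similar resources
        void invoke(T value); // called once per element
        void close();         // release whatever open() acquired
    }

    // Records every lifecycle call in memory so the order can be inspected.
    static final class InMemorySink implements LifecycleSink<String> {
        final List<String> events = new ArrayList<>();
        private boolean opened;

        @Override
        public void open() {
            opened = true;
            events.add("open");
        }

        @Override
        public void invoke(String value) {
            if (!opened) {
                throw new IllegalStateException("sink not opened");
            }
            events.add("invoke:" + value);
        }

        @Override
        public void close() {
            opened = false;
            events.add("close");
        }
    }

    // Drives the sink the way a runtime would: open once, invoke per
    // element, and always close, even if an invoke call fails.
    static <T> void run(LifecycleSink<T> sink, List<T> elements) {
        sink.open();
        try {
            for (T element : elements) {
                sink.invoke(element);
            }
        } finally {
            sink.close();
        }
    }

    public static void main(String[] args) {
        InMemorySink sink = new InMemorySink();
        List<String> elements = new ArrayList<>();
        elements.add("a");
        elements.add("b");
        run(sink, elements);
        System.out.println(sink.events); // prints [open, invoke:a, invoke:b, close]
    }
}
```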
Custom Redis sink demo
Add the dependency:

<dependency>
  <groupId>org.apache.bahir</groupId>
  <artifactId>flink-connector-redis_2.11</artifactId>
  <version>1.0</version>
</dependency>

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class StreamingDemoToRedis {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
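        // Read newline-delimited text from a socket.
        DataStream<String> text = env.socketTextStream("192.168.24.141", 9000, "\n");

        // Wrap each line as a (key, value) pair; "l_words" is the Redis list key.
        SingleOutputStreamOperator<Tuple2<String, String>> l_words =
                text.map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String value) throws Exception {
                        return new Tuple2<>("l_words", value);
                    }
                });

        // Connection settings for the Redis server.
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost("192.168.24.141")
                .setPort(6379)
                .build();

        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
        l_words.addSink(redisSink);

        env.execute("StreamingDemoToRedis");
    }

    // Tells the sink which Redis command to run and how to extract the key and value.
    public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {

        @Override
        public RedisCommandDescription getCommandDescription() {
            // LPUSH pushes each value onto the head of the list stored at the key.
            return new RedisCommandDescription(RedisCommand.LPUSH);
        }

        @Override
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }

        @Override
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }
    }
}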
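Since the mapper uses LPUSH, every incoming line is pushed onto the head of the Redis list l_words, so reading the list back returns the newest line first. The following plain-Java model illustrates that ordering without a Redis server; LpushSketch and its methods are hypothetical and only mimic the LPUSH and LRANGE 0 -1 commands.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LpushSketch {

    // In-memory stand-in for Redis lists, keyed by list name.
    private final Map<String, Deque<String>> lists = new HashMap<>();

    // Models "LPUSH key value": insert at the head of the list stored at key.
    void lpush(String key, String value) {
        lists.computeIfAbsent(key, k -> new ArrayDeque<>()).addFirst(value);
    }

    // Models "LRANGE key 0 -1": the whole list, head first.
    List<String> lrangeAll(String key) {
        return new ArrayList<>(lists.getOrDefault(key, new ArrayDeque<>()));
    }

    public static void main(String[] args) {
        LpushSketch store = new LpushSketch();
        // Lines arriving on the socket, in order.
        store.lpush("l_words", "hello");
        store.lpush("l_words", "flink");
        store.lpush("l_words", "redis");
        System.out.println(store.lrangeAll("l_words")); // prints [redis, flink, hello]
    }
}
```

If arrival order should be preserved when reading from the head, RPUSH would be the command to consider instead of LPUSH.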