DataStream APIのsink(8)

2256 ワード

writeAsText(ストリーミング処理でファイルに書くことは少ない)
各要素のtoString()メソッドを呼び出すことによって取得される文字列形式で要素を行単位で書き込みます.
 
print()/printToErr():各要素のtoString()メソッドの値を標準出力または標準エラー出力ストリームに印刷します.
 
カスタム出力addSink【kafka、redis】
        
        
            org.apache.bahir
            flink-connector-redis_2.11
            1.0
        

サンプルコード:
/**
 *    socket  ,      redis 
 *
 *  list
 *
 *  lpush list key value
 *
 */
public class StreamingDemoToRedis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource text = env.socketTextStream("hadoop",9000,"
"); //lpush l_words word // DataStream> l_wordsData = text.map(new MapFunction>() { @Override public Tuple2 map(String value) throws Exception { return new Tuple2<>("l_words",value); } }); // redis FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop").setPort(6379).build(); // redisSink RedisSink> redisSink = new RedisSink>(conf,new MyRedisMapper()); l_wordsData.addSink(redisSink); env.execute("StreamingDemoToRedis"); } public static class MyRedisMapper implements RedisMapper>{ @Override public RedisCommandDescription getCommandDescription() { return null; } // redis Key @Override public String getKeyFromData(Tuple2 data) { return data.f0; } // redis Value @Override public String getValueFromData(Tuple2 data) { return data.f1; } } }