Flink Data Stream共通演算子

12911 ワード

map
  • DataStream-->DataStream:マッピングと理解され、各要素を一定の変換を行った後、別の要素にマッピングされます.
  • #Java  map
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    public class MapDemo {
         public static void main(String[] args) throws Exception {
             //      
             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
             //     
             DataStream socketTextStream = env.socketTextStream("localhost", 9000, "n");
             //lambda  
             DataStream result3 = socketTextStream.map( value -> value + "love");
             //  dataStream  
             result3.print();
             //  
             env.execute();
         }
    }
    
    #Scala  map
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.api.scala._
    object MapDemoScala {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val text = env.socketTextStream("127.0.0.1", 9875)
        text.map(x => x + "love")
      }
    }

    flatMap
  • DataStream-->DataStream:パラメータを入力し、0、1または複数の出力を生成します.これは、分割操作
  • に使用することが多いです.
  • flatMapとmapメソッドの使用は似ていますが、一般的なJavaメソッドの戻り値の結果はすべて1つであるため、flatMapを導入すると、処理後の複数の結果を1つのCollectionsセット(複数の結果を返すのと同様)
  • に配置できます.
    #Java  flatMap
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    public class FlatMapDemo {
         public static void main(String[] args) throws Exception {
             //      
             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
             //     
             DataStream socketTextStream = env.socketTextStream("localhost", 9000, "n");
             DataStream result = socketTextStream.flatMap((String s, Collector collector) ->{
                        for(String str: s.split(" ")){
                            collector.collect(str);
                        }
             });
             //  dataStream  
             result.print();
             //  
             env.execute();
         }
    }
    
    #Scala  flatMap
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.api.scala._
    
    object FlatMapDemoScala {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val text = env.socketTextStream("127.0.0.1", 9002)
        text.flatMap{str => str.split(" ")}
        text.flatMap{_.split(" ")}
      }
    }

    filter
  • DataStream→DataStream:フィルタは、データストリーム中の各要素をフィルタ判定し、trueと判断する要素が次のデータストリーム
  • に入る.
    #Java
    DataStream res = socketTestStream.filter(new FilterFunction() {
        @Override
         public boolean filter(String s) throws Exception {
                return s.startsWith("S");
         }
    });
    
    #Scala
    text.filter{_.startsWith("S")}
      .print()
      .setParallelism(1)

    keyBy
  • DataStream → KeyedStream
  • はデータストリームをkeyに従って複数の交差しないパーティションに分割し、同じkeyのレコードは同じパーティションに分割され、keyBy()はハッシュパーティションによって実現される.
  • pojoクラスの1つ以上のプロパティをkeyとしても、tupleの要素をkeyとしても構いませんが、keyとしては使用できない2つのタイプがあります.
  • hashCodeメソッドを複写せず、objectのhashCodeメソッドをデフォルトで継承するpojoクラス
  • のみ
  • 配列タイプ
  • #Java in POJO
    SingleOutputStreamOperator streamOperator = socketTextStream
            .flatMap((String value, Collector out) -> {
                Arrays.stream(value.split(" ")).
                forEach(str -> out.collect(WordCountPOJO.of(value, 1)));
            }).returns(WordCountPOJO.class);
            
    KeyedStream keyedStream = streamOperator.keyBy("word");
    SingleOutputStreamOperator summed = keyedStream.sum("count");
    
    #Java in Tuple
    SingleOutputStreamOperator> singleOutputStreamOperator = dataStreamSource
                .flatMap((String value, Collector> out)-> {
                     Arrays.stream(value.split(" "))
                    .forEach(str -> out.collect(Tuple2.of(value, 1)));
                }).returns(Types.TUPLE(Types.STRING, Types.INT));
     
    KeyedStream, Tuple> keyedStream = singleOutputStreamOperator.keyBy(0);
    SingleOutputStreamOperator> sum = keyedStream.sum(1);
         
    #Scala in POJO
    text.flatMap{_.split(" ")}
      .map(x => WordCountPOJO(x,1))
      .keyBy("word")
      .timeWindow(Time.seconds(5))
      .sum("count")
      .print()
      .setParallelism(1)

    reduce
  • KeyedStream→DataStream:データストリーム内の各パーティションのデータをまとめ、min()、max()、avg、countなどのデータを集約することができ、reduceによって
  • を実現することができる.
    public class StudentPOJO {
         private String name;
         private String gender;
         private String className;
         private double score;
         public StudentPOJO() {
    
         }
        public StudentPOJO(String name, String gender, String className, double score) {
             this.name = name;
             this.gender = gender;
             this.className = className;
             this.score = score;
        }
        public static StudentPOJO of(String name, String gender, String className, double score) {
            return new StudentPOJO(name,gender, className,score);
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public String getGender() {
            return gender;
        }
        public void setGender(String gender) {
            this.gender = gender;
        }
        public String getClassName() {
            return className;
        }
        public void setClassName(String className) {
            this.className = className;
        }
        public double getScore() {
            return score;
        }
        public void setScore(double score) {
            this.score = score;
        }
        @Override
        public String toString() {
            return "StudentPOJO{" +
                    "name='" + name + ''' +
                    ", gender='" + gender + ''' +
                    ", className='" + className + ''' +
                    ", score=" + score +
                    '}';
        }
    }
    
    #Java in POJO
    SingleOutputStreamOperator flatMapSocketTextStream = socketTextStream
            .flatMap((String value, Collector out) -> {
                 String[] values = value.split(" ");
                 out.collect(new StudentPOJO(values[0], values[1], values[2], Double.valueOf(values[3])));
            }).returns(StudentPOJO.class);
            
    DataStream res = flatMapSocketTextStream
            .keyBy("className")
            .reduce((s1, s2) ->
                s1.getScore() > s2.getScore() ? s1 : s2
            );
            
    #Java in Tuple
    DataStream> res1 = socketTextStream
            .map(value -> Tuple2.of(value.trim(), 1))
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            .keyBy(0)
            .timeWindow(Time.seconds(10))
            .reduce((Tuple2 t1, Tuple2 t2) ->
                    new Tuple2<>(t1.f0, t1.f1 + t2.f1));
                    
    DataStream> res2 = socketTextStream
            .map(value -> Tuple2.of(value.trim(), 1))
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            .keyBy(0)
            .timeWindow(Time.seconds(10))
            .reduce((old, news) -> {
                old.f1 += news.f1;
             return old;
             }).returns(Types.TUPLE(Types.STRING, Types.INT));

    fold
  • KeyedStream → DataStream
  • 初期値を持つパケット・データ・ストリームのスクロール・折り畳み操作:現在の要素と前回の折り畳み操作の結果をマージし、新しい値を生成します.
  • A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...
  • #TODO          
    DataStream res = socketTextStream.map(value -> Tuple2.of(value.trim(), 1))
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            .keyBy(0)
            .fold("  :",(String current, Tuple2 t2) -> current + t2.f0 + ",");

    union
  • DataStreamでunion演算子を使用すると、複数の同じタイプのデータストリームをマージし、同じタイプの新しいデータストリームを生成できます.つまり、複数のDataStreamを新しいDataStreamにマージできます.
  • DataStream streamSource01 = env.socketTextStream("localhost", 8888);
    DataStream streamSource02 = env.socketTextStream("localhost", 9922);
    
    DataStream mapStreamSource01 = streamSource01.map(value -> "  8888     : " + value);
    DataStream mapStreamSource02 = streamSource02.map(value -> "  9922     : " + value);
    
    DataStream res = mapStreamSource01.union(mapStreamSource02);

    join
  • 指定キーに従って2つのストリームを関連付ける
  • .
    DataStream> mapStreamSource01 = streamSource01
            .map(value -> Tuple2.of(value, "  8888     " + value))
            .returns(Types.TUPLE(Types.STRING, Types.STRING));
            
    DataStream> mapStreamSource02 = streamSource02
            .map(value -> Tuple2.of(value, "  9922     " + value))
            .returns(Types.TUPLE(Types.STRING, Types.STRING));
            
    DataStream res = mapStreamSource01.join(mapStreamSource02)
            .where(t1->t1.getField(0))
            .equalTo(t2->t2.getField(0))
            .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
            .apply((t1,t2) -> t1.getField(1) + "|" + t2.getField(1));

    coGroup
  • は、2つのストリームを関連付け、関連付けられていないものも保持します.
  • DataStream> mapStreamSource01 = streamSource01
            .map(value -> Tuple2.of(value, "8888    : " + value))
            .returns(Types.TUPLE(Types.STRING, Types.STRING));
            
    DataStream> mapStreamSource02 = streamSource02
            .map(value -> Tuple2.of(value, "9922    : " + value))
            .returns(Types.TUPLE(Types.STRING, Types.STRING));
            
    DataStream res = mapStreamSource01.coGroup(mapStreamSource02)
            .where(t1 -> t1.getField(0))
            .equalTo(t2 -> t2.getField(0))
            .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
            .apply(new CoGroupFunction, Tuple2, String>() {
                @Override
                public void coGroup(Iterable> iterable1, Iterable> iterable2, Collector collector) throws Exception {
                     StringBuffer stringBuffer = new StringBuffer();
                     stringBuffer.append("  8888 stream--");
                     for (Tuple2 item : iterable1) {
                                        stringBuffer.append(item.f1 + " | ");
                     }
                     stringBuffer.append("  9922 stream--");
                     for (Tuple2 item : iterable2) {
                                        stringBuffer.append(item.f1);
                     }
                    collector.collect(stringBuffer.toString());
                }      
            });

    split
  • DataStream→SplitStream:指定するDataStreamストリームを複数のストリームに分割し、
  • をSplitStreamで表す.
    select
  • SplitStream→DataStream:1つのSplitStreamストリームから.select()メソッドで所望のストリーム
  • を得る.
    SplitStream> splitStream = streamSource
            .map(values -> Tuple2.of(values.trim(), 1))
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            .split( t -> {
                List list = new ArrayList<>();
                 if (isNumeric(t.f0)) {
                    list.add("num");
                 } else {
                    list.add("str");
                 }
                 return list;
            });
     
    DataStream>  strDataStream1 = splitStream.select("str")
            .map( t -> Tuple2.of("   : " + t.f0, t.f1))
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            .keyBy(0)
            .sum(1);
            
    DataStream>  strDataStream2 = splitStream.select("num")
            .map( t -> Tuple2.of("  : " + t.f0, t.f1))
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            .keyBy(0)
            .sum(1);