Flink Data Stream共通演算子
map DataStream-->DataStream:マッピングと理解され、各要素を一定の変換を行った後、別の要素にマッピングされます.
flatMap DataStream-->DataStream:パラメータを入力し、0、1または複数の出力を生成します.これは、分割操作 に使用することが多いです. flatMapとmapメソッドの使用は似ていますが、一般的なJavaメソッドの戻り値の結果はすべて1つであるため、flatMapを導入すると、処理後の複数の結果を1つのCollectionsセット(複数の結果を返すのと同様) に配置できます.
filter DataStream→DataStream:フィルタは、データストリーム中の各要素をフィルタ判定し、trueと判断する要素が次のデータストリーム に入る.
keyBy DataStream → KeyedStream はデータストリームをkeyに従って複数の交差しないパーティションに分割し、同じkeyのレコードは同じパーティションに分割され、keyBy()はハッシュパーティションによって実現される. pojoクラスの1つ以上のプロパティをkeyとしても、tupleの要素をkeyとしても構いませんが、keyとしては使用できない2つのタイプがあります. hashCodeメソッドを複写せず、objectのhashCodeメソッドをデフォルトで継承するpojoクラス のみ配列タイプ
reduce KeyedStream→DataStream:データストリーム内の各パーティションのデータをまとめ、min()、max()、avg、countなどのデータを集約することができ、reduceによって を実現することができる.
fold KeyedStream → DataStream 初期値を持つパケット・データ・ストリームのスクロール・折り畳み操作:現在の要素と前回の折り畳み操作の結果をマージし、新しい値を生成します. A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...
union DataStreamでunion演算子を使用すると、複数の同じタイプのデータストリームをマージし、同じタイプの新しいデータストリームを生成できます.つまり、複数のDataStreamを新しいDataStreamにマージできます.
join指定キーに従って2つのストリームを関連付ける .
coGroupは、2つのストリームを関連付け、関連付けられていないものも保持します.
split DataStream→SplitStream:指定するDataStreamストリームを複数のストリームに分割し、 をSplitStreamで表す.
select SplitStream→DataStream:1つのSplitStreamストリームから.select()メソッドで所望のストリーム を得る.
#Java map
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class MapDemo {
public static void main(String[] args) throws Exception {
//
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//
DataStream socketTextStream = env.socketTextStream("localhost", 9000, "n");
//lambda
DataStream result3 = socketTextStream.map( value -> value + "love");
// dataStream
result3.print();
//
env.execute();
}
}
#Scala map
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
object MapDemoScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("127.0.0.1", 9875)
text.map(x => x + "love")
}
}
flatMap
#Java flatMap
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class FlatMapDemo {
public static void main(String[] args) throws Exception {
//
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//
DataStream socketTextStream = env.socketTextStream("localhost", 9000, "n");
DataStream result = socketTextStream.flatMap((String s, Collector collector) ->{
for(String str: s.split(" ")){
collector.collect(str);
}
});
// dataStream
result.print();
//
env.execute();
}
}
#Scala flatMap
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
object FlatMapDemoScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("127.0.0.1", 9002)
text.flatMap{str => str.split(" ")}
text.flatMap{_.split(" ")}
}
}
filter
#Java
DataStream res = socketTestStream.filter(new FilterFunction() {
@Override
public boolean filter(String s) throws Exception {
return s.startsWith("S");
}
});
#Scala
text.filter{_.startsWith("S")}
.print()
.setParallelism(1)
keyBy
#Java in POJO
SingleOutputStreamOperator streamOperator = socketTextStream
.flatMap((String value, Collector out) -> {
Arrays.stream(value.split(" ")).
forEach(str -> out.collect(WordCountPOJO.of(value, 1)));
}).returns(WordCountPOJO.class);
KeyedStream keyedStream = streamOperator.keyBy("word");
SingleOutputStreamOperator summed = keyedStream.sum("count");
#Java in Tuple
SingleOutputStreamOperator> singleOutputStreamOperator = dataStreamSource
.flatMap((String value, Collector> out)-> {
Arrays.stream(value.split(" "))
.forEach(str -> out.collect(Tuple2.of(value, 1)));
}).returns(Types.TUPLE(Types.STRING, Types.INT));
KeyedStream, Tuple> keyedStream = singleOutputStreamOperator.keyBy(0);
SingleOutputStreamOperator> sum = keyedStream.sum(1);
#Scala in POJO
text.flatMap{_.split(" ")}
.map(x => WordCountPOJO(x,1))
.keyBy("word")
.timeWindow(Time.seconds(5))
.sum("count")
.print()
.setParallelism(1)
reduce
public class StudentPOJO {
private String name;
private String gender;
private String className;
private double score;
public StudentPOJO() {
}
public StudentPOJO(String name, String gender, String className, double score) {
this.name = name;
this.gender = gender;
this.className = className;
this.score = score;
}
public static StudentPOJO of(String name, String gender, String className, double score) {
return new StudentPOJO(name,gender, className,score);
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getGender() {
return gender;
}
public void setGender(String gender) {
this.gender = gender;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public double getScore() {
return score;
}
public void setScore(double score) {
this.score = score;
}
@Override
public String toString() {
return "StudentPOJO{" +
"name='" + name + ''' +
", gender='" + gender + ''' +
", className='" + className + ''' +
", score=" + score +
'}';
}
}
#Java in POJO
SingleOutputStreamOperator flatMapSocketTextStream = socketTextStream
.flatMap((String value, Collector out) -> {
String[] values = value.split(" ");
out.collect(new StudentPOJO(values[0], values[1], values[2], Double.valueOf(values[3])));
}).returns(StudentPOJO.class);
DataStream res = flatMapSocketTextStream
.keyBy("className")
.reduce((s1, s2) ->
s1.getScore() > s2.getScore() ? s1 : s2
);
#Java in Tuple
DataStream> res1 = socketTextStream
.map(value -> Tuple2.of(value.trim(), 1))
.returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(0)
.timeWindow(Time.seconds(10))
.reduce((Tuple2 t1, Tuple2 t2) ->
new Tuple2<>(t1.f0, t1.f1 + t2.f1));
DataStream> res2 = socketTextStream
.map(value -> Tuple2.of(value.trim(), 1))
.returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(0)
.timeWindow(Time.seconds(10))
.reduce((old, news) -> {
old.f1 += news.f1;
return old;
}).returns(Types.TUPLE(Types.STRING, Types.INT));
fold
#TODO
DataStream res = socketTextStream.map(value -> Tuple2.of(value.trim(), 1))
.returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(0)
.fold(" :",(String current, Tuple2 t2) -> current + t2.f0 + ",");
union
DataStream streamSource01 = env.socketTextStream("localhost", 8888);
DataStream streamSource02 = env.socketTextStream("localhost", 9922);
DataStream mapStreamSource01 = streamSource01.map(value -> " 8888 : " + value);
DataStream mapStreamSource02 = streamSource02.map(value -> " 9922 : " + value);
DataStream res = mapStreamSource01.union(mapStreamSource02);
join
DataStream> mapStreamSource01 = streamSource01
.map(value -> Tuple2.of(value, " 8888 " + value))
.returns(Types.TUPLE(Types.STRING, Types.STRING));
DataStream> mapStreamSource02 = streamSource02
.map(value -> Tuple2.of(value, " 9922 " + value))
.returns(Types.TUPLE(Types.STRING, Types.STRING));
DataStream res = mapStreamSource01.join(mapStreamSource02)
.where(t1->t1.getField(0))
.equalTo(t2->t2.getField(0))
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.apply((t1,t2) -> t1.getField(1) + "|" + t2.getField(1));
coGroup
DataStream> mapStreamSource01 = streamSource01
.map(value -> Tuple2.of(value, "8888 : " + value))
.returns(Types.TUPLE(Types.STRING, Types.STRING));
DataStream> mapStreamSource02 = streamSource02
.map(value -> Tuple2.of(value, "9922 : " + value))
.returns(Types.TUPLE(Types.STRING, Types.STRING));
DataStream res = mapStreamSource01.coGroup(mapStreamSource02)
.where(t1 -> t1.getField(0))
.equalTo(t2 -> t2.getField(0))
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.apply(new CoGroupFunction, Tuple2, String>() {
@Override
public void coGroup(Iterable> iterable1, Iterable> iterable2, Collector collector) throws Exception {
StringBuffer stringBuffer = new StringBuffer();
stringBuffer.append(" 8888 stream--");
for (Tuple2 item : iterable1) {
stringBuffer.append(item.f1 + " | ");
}
stringBuffer.append(" 9922 stream--");
for (Tuple2 item : iterable2) {
stringBuffer.append(item.f1);
}
collector.collect(stringBuffer.toString());
}
});
split
select
SplitStream> splitStream = streamSource
.map(values -> Tuple2.of(values.trim(), 1))
.returns(Types.TUPLE(Types.STRING, Types.INT))
.split( t -> {
List list = new ArrayList<>();
if (isNumeric(t.f0)) {
list.add("num");
} else {
list.add("str");
}
return list;
});
DataStream> strDataStream1 = splitStream.select("str")
.map( t -> Tuple2.of(" : " + t.f0, t.f1))
.returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(0)
.sum(1);
DataStream> strDataStream2 = splitStream.select("num")
.map( t -> Tuple2.of(" : " + t.f0, t.f1))
.returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(0)
.sum(1);