Flink SocketWindowWordCountソース分析
3065 ワード
今日はFlinkスタンドアロン環境に乗り、持参した単語統計プログラムを試してみましたが、できるだけ早くFlink開発を上手に使うために単語統計のソースコードを見てみました~
1.タイムウィンドウ:
(1)timeWindow(Time.seconds(2))は、スクロールタイムウィンドウ(Tumbling window)であることを示すパラメータが1つしかない、つまり重複しないタイムウィンドウであり、本ウィンドウ内のデータのみを統計することを示す;(2)timeWindow(Time.seconds(5)、Time.seconds(1))は、スライドタイムウィンドウ(Sliding window)であることを示す2つのパラメータがあり、すなわちt 2時間毎に、前t 1時間のデータを統計する.この例では、毎秒5秒前のデータを計算する.
2.DataStreamのtransformation操作API
参照先:https://www.cnblogs.com/lanyun0520/p/5730403.html
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
// final ,
final int port;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
port = params.getInt("port");
} catch (Exception e) {
System.err.println("No port specified. Please run 'SocketWindowWordCount --port '");
return;
}
// (1)
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// (2) , socket
DataStream text = env.socketTextStream("localhost", port, "
");
// (3)transformation ,
DataStream windowCounts = text
// , , out Collector
.flatMap(new FlatMapFunction() {
public void flatMap(String value, Collector out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
// , key 。 , , reduce
.keyBy("word")
// , 1 5
.timeWindow(Time.seconds(5), Time.seconds(1))
// KeyedDataStream “ ” reduce 。 reduce element , 。
// , ,
.reduce(new ReduceFunction() {
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
//
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
// ,
public static class WordWithCount {
//
public String word;
public long count;
public WordWithCount() {}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
1.タイムウィンドウ:
(1)timeWindow(Time.seconds(2))は、スクロールタイムウィンドウ(Tumbling window)であることを示すパラメータが1つしかない、つまり重複しないタイムウィンドウであり、本ウィンドウ内のデータのみを統計することを示す;(2)timeWindow(Time.seconds(5)、Time.seconds(1))は、スライドタイムウィンドウ(Sliding window)であることを示す2つのパラメータがあり、すなわちt 2時間毎に、前t 1時間のデータを統計する.この例では、毎秒5秒前のデータを計算する.
2.DataStreamのtransformation操作API
参照先:https://www.cnblogs.com/lanyun0520/p/5730403.html