Flink SocketWindowWordCountソース分析

3065 ワード

今日はFlinkスタンドアロン環境に乗り、持参した単語統計プログラムを試してみましたが、できるだけ早くFlink開発を上手に使うために単語統計のソースコードを見てみました~
public class SocketWindowWordCount {
    public static void main(String[] args) throws Exception {

        //  final        ,     
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount --port '");
            return;
        }

        // (1)      
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // (2)     ,          socket          
        DataStream text = env.socketTextStream("localhost", port, "
"); // (3)transformation , DataStream windowCounts = text // , , out Collector .flatMap(new FlatMapFunction() { public void flatMap(String value, Collector out) { for (String word : value.split("\\s")) { out.collect(new WordWithCount(word, 1L)); } } }) // , key 。 , , reduce .keyBy("word") // , 1 5 .timeWindow(Time.seconds(5), Time.seconds(1)) // KeyedDataStream “ ” reduce 。 reduce element , 。 // , , .reduce(new ReduceFunction() { public WordWithCount reduce(WordWithCount a, WordWithCount b) { return new WordWithCount(a.word, a.count + b.count); } }); // windowCounts.print().setParallelism(1); env.execute("Socket Window WordCount"); } // , public static class WordWithCount { // public String word; public long count; public WordWithCount() {} public WordWithCount(String word, long count) { this.word = word; this.count = count; } @Override public String toString() { return word + " : " + count; } } }

1.タイムウィンドウ:
(1)timeWindow(Time.seconds(2))は、スクロールタイムウィンドウ(Tumbling window)であることを示すパラメータが1つしかない、つまり重複しないタイムウィンドウであり、本ウィンドウ内のデータのみを統計することを示す;(2)timeWindow(Time.seconds(5)、Time.seconds(1))は、スライドタイムウィンドウ(Sliding window)であることを示す2つのパラメータがあり、すなわちt 2時間毎に、前t 1時間のデータを統計する.この例では、毎秒5秒前のデータを計算する.
2.DataStreamのtransformation操作API
参照先:https://www.cnblogs.com/lanyun0520/p/5730403.html