FlinkにおけるTable API & SQLによるウィンドウの実現(タンブリング/スライディング)

29716 ワード

本文の冒頭に添付します:Flink学習コースシリーズ^u^
時刻(エポックミリ秒),ユーザーID,商品ID,金額
1582703711000,u1,p1,5
1582703712000,u1,p1,5
1582703712000,u2,p1,3
1582703713000,u1,p1,5
1582703719999,u2,p1,3
1582703729999,u1,p1,5
1. タンブリングウィンドウの実現
/**
 * Demonstrates a 10-second tumbling event-time window over a socket stream of orders,
 * implemented with either the Flink Table API or Flink SQL.
 *
 * <p>Each line read from {@code localhost:8888} is expected to have the form
 * {@code time,uid,pid,money}, for example {@code 1582703711000,u1,p1,5}. The first field is
 * used as the event timestamp in epoch milliseconds.
 *
 * <p>By default the Table API query runs. Pass {@code sql} as the first program argument to run
 * the equivalent SQL query instead. Both queries sum {@code money} per {@code uid} and window.
 *
 * @author liuzebiao
 * @Date 2020-2-26 15:37
 */
public class TumblingEventTimeWindowsTableDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A single parallel instance so every result is printed by one sink subtask.
        env.setParallelism(1);

        // Window boundaries are derived from each record's own timestamp (event time).
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 8888);

        // Parse "time,uid,pid,money" into a Row; returns(...) declares the Row's field types
        // explicitly for Flink's type system.
        SingleOutputStreamOperator<Row> rowDataStream = streamSource.map(new MapFunction<String, Row>() {
            @Override
            public Row map(String line) throws Exception {
                String[] fields = line.split(",");
                long time = Long.parseLong(fields[0]);
                String uid = fields[1];
                String pid = fields[2];
                double money = Double.parseDouble(fields[3]);
                return Row.of(time, uid, pid, money);
            }
        }).returns(Types.ROW(Types.LONG, Types.STRING, Types.STRING, Types.DOUBLE));

        // Use field 0 as the event timestamp, with an out-of-orderness bound of 0 seconds.
        SingleOutputStreamOperator<Row> waterMarksRow = rowDataStream.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(Row row) {
                        return (long) row.getField(0);
                    }
                });

        // Register the stream as table "t_orders"; "rowtime.rowtime" appends an event-time
        // attribute named rowtime, which the window clauses below group on.
        tableEnv.registerDataStream("t_orders", waterMarksRow, "atime,uid,pid,money,rowtime.rowtime");

        // Only one query may be bound to `table`; pick the variant from the program arguments.
        boolean useSql = args.length > 0 && "sql".equalsIgnoreCase(args[0]);

        Table table;
        if (useSql) {
            // Flink SQL: TUMBLE groups rows into consecutive, non-overlapping 10-second windows.
            String sql = "select uid, sum(money) as total,"
                    + " tumble_start(rowtime, interval '10' second) as wstart,"
                    + " tumble_end(rowtime, interval '10' second) as wend"
                    + " from t_orders group by tumble(rowtime, interval '10' second), uid";
            table = tableEnv.sqlQuery(sql);
        } else {
            // Table API: the same 10-second tumbling window. win.rowtime is the window's
            // rowtime attribute (window end minus 1 ms).
            table = tableEnv.scan("t_orders")
                    .window(Tumble.over("10.seconds").on("rowtime").as("win"))
                    .groupBy("uid,win")
                    .select("uid,win.start,win.end,win.rowtime,money.sum as total");
        }

        DataStream<Row> dataStream = tableEnv.toAppendStream(table, Row.class);

        dataStream.print();

        env.execute("TumblingEventTimeWindowsTable");
    }
}
u1,2020-02-26 07:55:10.0,2020-02-26 07:55:20.0,2020-02-26 07:55:19.999,15.0
u2,2020-02-26 07:55:10.0,2020-02-26 07:55:20.0,2020-02-26 07:55:19.999,6.0
2. スライディングウィンドウの実現
/**
 * Demonstrates a sliding event-time window (window size 10 s, slide 2 s) over a socket stream
 * of orders, implemented with either the Flink Table API or Flink SQL.
 *
 * <p>Each line read from {@code localhost:8888} is expected to have the form
 * {@code time,uid,pid,money}, for example {@code 1582703711000,u1,p1,5}. The first field is
 * used as the event timestamp in epoch milliseconds.
 *
 * <p>By default the Table API query runs. Pass {@code sql} as the first program argument to run
 * the equivalent SQL (HOP) query instead. Both queries sum {@code money} per {@code uid} and
 * window.
 *
 * @author liuzebiao
 * @Date 2020-2-26 15:37
 */
public class SlidingEventTimeWindowsTableDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A single parallel instance so every result is printed by one sink subtask.
        env.setParallelism(1);

        // Window boundaries are derived from each record's own timestamp (event time).
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 8888);

        // Parse "time,uid,pid,money" into a Row; returns(...) declares the Row's field types
        // explicitly for Flink's type system.
        SingleOutputStreamOperator<Row> rowDataStream = streamSource.map(new MapFunction<String, Row>() {
            @Override
            public Row map(String line) throws Exception {
                String[] fields = line.split(",");
                long time = Long.parseLong(fields[0]);
                String uid = fields[1];
                String pid = fields[2];
                double money = Double.parseDouble(fields[3]);
                return Row.of(time, uid, pid, money);
            }
        }).returns(Types.ROW(Types.LONG, Types.STRING, Types.STRING, Types.DOUBLE));

        // Use field 0 as the event timestamp, with an out-of-orderness bound of 0 seconds.
        SingleOutputStreamOperator<Row> waterMarksRow = rowDataStream.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(Row row) {
                        return (long) row.getField(0);
                    }
                });

        // Register the stream as table "t_orders"; "rowtime.rowtime" appends an event-time
        // attribute named rowtime, which the window clauses below group on.
        tableEnv.registerDataStream("t_orders", waterMarksRow, "atime,uid,pid,money,rowtime.rowtime");

        // Only one query may be bound to `table`; pick the variant from the program arguments.
        boolean useSql = args.length > 0 && "sql".equalsIgnoreCase(args[0]);

        Table table;
        if (useSql) {
            // Flink SQL: HOP(time, slide, size) opens a 10-second window every 2 seconds, so
            // each row is counted in 10 / 2 = 5 overlapping windows.
            String sql = "select uid, sum(money) as total,"
                    + " hop_end(rowtime, interval '2' second, interval '10' second) as widEnd"
                    + " from t_orders group by hop(rowtime, interval '2' second, interval '10' second), uid";
            table = tableEnv.sqlQuery(sql);
        } else {
            // Table API: Slide.over(size).every(slide) defines the same 10 s / 2 s window.
            table = tableEnv.scan("t_orders")
                    .window(Slide.over("10.seconds").every("2.seconds").on("rowtime").as("win"))
                    .groupBy("uid,win")
                    .select("uid,win.start,win.end,win.rowtime,money.sum as total");
        }

        DataStream<Row> dataStream = tableEnv.toAppendStream(table, Row.class);

        dataStream.print();

        // Job name now matches this demo (it previously reused the tumbling demo's name).
        env.execute("SlidingEventTimeWindowsTable");
    }
}
記事を書くのは簡単ではありませんので、よろしければフォローをお願いします。
フォロー・いいね・お気に入り登録をしておけば、記事を見失うことはありません。
筆者はすべての内容が正しいとは保証できませんが、すべて自分の手で打ち込んだものです。誤りがあればぜひご指摘ください。ω・)ノ