FlinkにおけるTable API & SQLによる(スクロール/スライド)ウィンドウの実現
29716 ワード
本文の冒頭に添付します:Flink学習コースシリーズ^u^
よろしければフォローと「いいね」をお願いします。見失わないように!
筆者はすべての内容が正しいと保証することはできませんが、すべて自分の手で入力したものです。誤りがあればぜひご指摘ください。ω・)ノ
テストデータ(以下の行をソケットに入力します):
タイムスタンプ(ミリ秒), ユーザーID, 商品ID, 金額
1582703711000,u1,p1,5
1582703712000,u1,p1,5
1582703712000,u2,p1,3
1582703713000,u1,p1,5
1582703719999,u2,p1,3
1582703729999,u1,p1,5
1. スクロール(タンブリング)ウィンドウの実現
/**
* Table API / SQL demo: tumbling event-time window (size 10s)
*
* @author liuzebiao
* @Date 2020-2-26 15:37
*/
public class TumblingEventTimeWindowsTableDemo {

    /**
     * Reads comma-separated order lines ({@code timestampMillis,uid,pid,money}) from a socket on
     * {@code localhost:8888}, registers them as table {@code t_orders} with an event-time
     * attribute, sums {@code money} per user over 10-second tumbling windows and prints the
     * resulting rows.
     *
     * @param args unused
     * @throws Exception if the job cannot be built or executed
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 8888);

        // Parse each text line into a typed Row: (time, uid, pid, money).
        SingleOutputStreamOperator<Row> rowDataStream = streamSource.map(new MapFunction<String, Row>() {
            @Override
            public Row map(String line) throws Exception {
                String[] fields = line.split(",");
                Long time = Long.parseLong(fields[0]);
                String uid = fields[1];
                String pid = fields[2];
                Double money = Double.parseDouble(fields[3]);
                return Row.of(time, uid, pid, money);
            }
        }).returns(Types.ROW(Types.LONG, Types.STRING, Types.STRING, Types.DOUBLE));

        // Use the first field (epoch milliseconds) as the event timestamp; the extractor is
        // configured with Time.seconds(0) as its out-of-orderness bound.
        SingleOutputStreamOperator<Row> waterMarksRow = rowDataStream.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(Row row) {
                        return (long) row.getField(0);
                    }
                });

        // "rowtime.rowtime" appends an event-time attribute named "rowtime" to the four data fields.
        tableEnv.registerDataStream("t_orders", waterMarksRow, "atime,uid,pid,money,rowtime.rowtime");

        // The original declared `Table table` twice (Table API and SQL variants), which does not
        // compile. Exactly one variant is selected here; call queryWithSql(tableEnv) instead to
        // run the SQL version of the same aggregation.
        Table table = queryWithTableApi(tableEnv);

        DataStream<Row> dataStream = tableEnv.toAppendStream(table, Row.class);
        dataStream.print();
        env.execute("TumblingEventTimeWindowsTable");
    }

    /** Table API variant: per-user sum of money over 10-second tumbling windows on rowtime. */
    private static Table queryWithTableApi(StreamTableEnvironment tableEnv) {
        return tableEnv.scan("t_orders")
                .window(Tumble.over("10.seconds").on("rowtime").as("win"))
                .groupBy("uid,win")
                .select("uid,win.start,win.end,win.rowtime,money.sum as total");
    }

    /** SQL variant of the same 10-second tumbling-window aggregation (different column order). */
    private static Table queryWithSql(StreamTableEnvironment tableEnv) {
        String sql = "select uid,sum(money),tumble_start(rowtime,interval '10' SECOND) as wstart,TUMBLE_END(rowtime,interval '10' second) as wend" +
                " from t_orders group by tumble(rowtime,interval '10' second),uid";
        return tableEnv.sqlQuery(sql);
    }
}
実行結果:
u1,2020-02-26 07:55:10.0,2020-02-26 07:55:20.0,2020-02-26 07:55:19.999,15.0
u2,2020-02-26 07:55:10.0,2020-02-26 07:55:20.0,2020-02-26 07:55:19.999,6.0
2. スライド(スライディング)ウィンドウの実現
/**
* Table API / SQL demo: sliding event-time window (size 10s, slide 2s)
*
* @author liuzebiao
* @Date 2020-2-26 15:37
*/
public class SlidingEventTimeWindowsTableDemo {

    /**
     * Reads comma-separated order lines ({@code timestampMillis,uid,pid,money}) from a socket on
     * {@code localhost:8888}, registers them as table {@code t_orders} with an event-time
     * attribute, sums {@code money} per user over sliding windows of 10 seconds that advance
     * every 2 seconds, and prints the resulting rows.
     *
     * @param args unused
     * @throws Exception if the job cannot be built or executed
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 8888);

        // Parse each text line into a typed Row: (time, uid, pid, money).
        SingleOutputStreamOperator<Row> rowDataStream = streamSource.map(new MapFunction<String, Row>() {
            @Override
            public Row map(String line) throws Exception {
                String[] fields = line.split(",");
                Long time = Long.parseLong(fields[0]);
                String uid = fields[1];
                String pid = fields[2];
                Double money = Double.parseDouble(fields[3]);
                return Row.of(time, uid, pid, money);
            }
        }).returns(Types.ROW(Types.LONG, Types.STRING, Types.STRING, Types.DOUBLE));

        // Use the first field (epoch milliseconds) as the event timestamp; the extractor is
        // configured with Time.seconds(0) as its out-of-orderness bound.
        SingleOutputStreamOperator<Row> waterMarksRow = rowDataStream.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(Row row) {
                        return (long) row.getField(0);
                    }
                });

        // "rowtime.rowtime" appends an event-time attribute named "rowtime" to the four data fields.
        tableEnv.registerDataStream("t_orders", waterMarksRow, "atime,uid,pid,money,rowtime.rowtime");

        // Table API variant is the active one; call queryWithSql(tableEnv) instead to run the
        // SQL (HOP) version of the aggregation.
        Table table = queryWithTableApi(tableEnv);

        DataStream<Row> dataStream = tableEnv.toAppendStream(table, Row.class);
        dataStream.print();
        // Job name fixed: the original reused "TumblingEventTimeWindowsTable" from the tumbling demo.
        env.execute("SlidingEventTimeWindowsTable");
    }

    /** Table API variant: per-user sum of money over 10-second windows sliding every 2 seconds. */
    private static Table queryWithTableApi(StreamTableEnvironment tableEnv) {
        return tableEnv.scan("t_orders")
                .window(Slide.over("10.seconds").every("2.seconds").on("rowtime").as("win"))
                .groupBy("uid,win")
                .select("uid,win.start,win.end,win.rowtime,money.sum as total");
    }

    /** SQL variant using HOP with a 2-second slide and a 10-second window size. */
    private static Table queryWithSql(StreamTableEnvironment tableEnv) {
        String sql = "select uid,sum(money),hop_end(rowtime,interval '2' SECOND,interval '10' second) as widEnd" +
                " from t_orders group by hop(rowtime,interval '2' second,interval '10' second),uid";
        return tableEnv.sqlQuery(sql);
    }
}
記事を書くのは簡単ではありません。よろしければフォローと「いいね」をお願いします。見失わないように!
筆者はすべての内容が正しいと保証することはできませんが、すべて自分の手で入力したものです。誤りがあればぜひご指摘ください。ω・)ノ