Flink Kafka TumblingEventTimeWindows

8352 ワード

久しぶりにブログを书いて、更新します.このブログでは主にFlinkでのデュアルストリームjoinの操作を行います.
双流joinを話す前にwindowの定義を理解して、ブログをお勧めします.そしてwatermarkの定義を理解し、ブログをお勧めします.
私は2番目のブログを見た後、データソースをkafkaに変更しました.しかし、windowがトリガーされず、window情報が印刷されます.そしてFlinkの開発者メールボックスで質問の解答を見ました.メールボックスの回答から、なぜwindowがトリガーされなかったのかが明らかになった.One of the source parallelisms doesn't have data
傍受するtopicは10個のパーティションを与えているので、Flinkでは複数のパーティションが並列に使用されます.各パーティションから送信されるデータはwindowに入り、このときwindowをトリガーするwatermarkはパーティションの中で最も小さいwatermarkになります.問題は私がテストするたびに1つのパーティションにメッセージを送信するだけなので、windowをトリガーできません.kafkaパーティションを1に変更して問題を完璧に解決します.公式解釈
テストコード:
public class WatermarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", Constants.KAFKA_BROKER);
        properties.setProperty("group.id", "crm_stream_window");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        DataStream stream =
                env.addSource(new FlinkKafkaConsumer011<>("test", new SimpleStringSchema(), properties));
        DataStream> inputMap = stream.map(new MapFunction>() {
            private static final long serialVersionUID = -8812094804806854937L;

            @Override
            public Tuple2 map(String value) throws Exception {
                return new Tuple2<>(value.split("\\W+")[0], Long.valueOf(value.split("\\W+")[1]));
            }
        });
        DataStream> watermark =
                inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks>() {

                    private static final long serialVersionUID = 8252616297345284790L;
                    Long currentMaxTimestamp = 0L;
                    Long maxOutOfOrderness = 10000L;// 10s
                    Watermark watermark = null;
                    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                        return watermark;
                    }

                    @Override
                    public long extractTimestamp(Tuple2 element, long previousElementTimestamp) {
                        Long timestamp = element.f1;
                        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                        System.out.println("timestamp : " + element.f1 + "|" + format.format(element.f1) + " currentMaxTimestamp : " + currentMaxTimestamp + "|" + format.format(currentMaxTimestamp) + "," + " watermark : " + watermark.getTimestamp() + "|" + format.format(watermark.getTimestamp()));
                        return timestamp;
                    }
                });
        watermark.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(3)))
                .apply(new WindowFunction, String, Tuple, TimeWindow>() {
                    private static final long serialVersionUID = 7813420265419629362L;

                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable> input, Collector out) throws Exception {
                        System.out.println(tuple);
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        out.collect("window  " + format.format(window.getStart()) + "   window  " + format.format(window.getEnd()));
                    }
                }).print();
        env.execute("window test");
    }
}

単一ストリームの問題が解決した後,二重ストリームjoinの解決を開始する.双流joinのEventimeWindowと単流は実は差が少ない.両方のストリームにEventTiemを指定し、遅延時間を指定します.このときwindowをトリガーするwatermarkは、2つのストリームの中で遅いwatermarkであることに注意してください.すなわち、Aストリームのwatermarkは10、Bストリームのwatermarkは20であり、ストリームjoinの場合、デフォルトのwatermarkは10である.お勧めに答えます.ここでのデュアルストリームjoinは左連であり,ウィンドウ内のすべてのデータを取り出し,論理的に必要なデータを下流に送る.
デュアルストリームコード:
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    Kafka011JsonTableSource orderSource = new OrderStreamKafkaReader("test").getTableSource("crm_stream");
    tableEnv.registerTableSource("orderDetail", orderSource);
    Table orderDetailTable = tableEnv.sqlQuery("SELECT * FROM orderDetail ");
    //getItemTable()  kafka json 
    Table itemIdTable = new StreamJoinTest().getItemTable(orderDetailTable);
    DataStream dataStream = tableEnv.toAppendStream(itemIdTable, OrderDetailInfo4Stat.class)
            .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(5)) {
                private static final long serialVersionUID = -4075075569327086395L;

                @Override
                public long extractTimestamp(OrderDetailInfo4Stat element) {
                    return element.getOrderLastModifyTime().getTime();
                }
            });
    dataStream.print();
    Kafka011JsonTableSource couponCodeSource = new CouponCodeStreamKafkaReader(Constants.COUPON_CODE_INFO).getTableSource("crm_stream");
    tableEnv.registerTableSource("CouponCodeTable", couponCodeSource);
    DataStream couponCodeStream = tableEnv.toAppendStream(tableEnv.sqlQuery("select * from CouponCodeTable"), CouponCodeInfo4Stat.class).
            assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(5)) {
                private static final long serialVersionUID = 8867904924628932955L;

                @Override
                public long extractTimestamp(CouponCodeInfo4Stat element) {
                    return element.getLastModifyTime().getTime();
                }
            });
    couponCodeStream.print();
    DataStream realDataStream = dataStream.coGroup(couponCodeStream)
            .where((KeySelector) OrderDetailInfo4Stat::getOrderBatchId)
            .equalTo((KeySelector) CouponCodeInfo4Stat::getOrderBatchId)
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .apply(new CoGroupFunction() {
                private static final long serialVersionUID = -1240462675125711867L;

                @Override
                public void coGroup(Iterable first, Iterable second, Collector out) throws Exception {
                    for (OrderDetailInfo4Stat orderDetailInfo4Stat : first) {
                        for (CouponCodeInfo4Stat couponCodeInfo4Stat : second) {
                            if (orderDetailInfo4Stat.getOrderBatchId().equals(couponCodeInfo4Stat.getOrderBatchId())) {
                                orderDetailInfo4Stat.setOrderSalesmanId(couponCodeInfo4Stat.getReceiveUserId());
                            }
                        }
                        out.collect(orderDetailInfo4Stat);
                    }
                }
            });
    realDataStream.print();
    env.execute("stream join test");
}

ここまでで終わりです.
頑張れ、ピカチュウ.