Flink Kafka TumblingEventTimeWindows
8352 ワード
久しぶりにブログを书いて、更新します.このブログでは主にFlinkでのデュアルストリームjoinの操作を行います.
双流joinを話す前にwindowの定義を理解して、ブログをお勧めします.そしてwatermarkの定義を理解し、ブログをお勧めします.
私は2番目のブログを見た後、データソースをkafkaに変更しました.しかし、windowがトリガーされず、window情報が印刷されます.そしてFlinkの開発者メールボックスで質問の解答を見ました.メールボックスの回答から、なぜwindowがトリガーされなかったのかが明らかになった.One of the source parallelisms doesn't have data
傍受するtopicは10個のパーティションを与えているので、Flinkでは複数のパーティションが並列に使用されます.各パーティションから送信されるデータはwindowに入り、このときwindowをトリガーするwatermarkはパーティションの中で最も小さいwatermarkになります.問題は私がテストするたびに1つのパーティションにメッセージを送信するだけなので、windowをトリガーできません.kafkaパーティションを1に変更して問題を完璧に解決します.公式解釈
テストコード:
単一ストリームの問題が解決した後,二重ストリームjoinの解決を開始する.双流joinのEventimeWindowと単流は実は差が少ない.両方のストリームにEventTiemを指定し、遅延時間を指定します.このときwindowをトリガーするwatermarkは、2つのストリームの中で遅いwatermarkであることに注意してください.すなわち、Aストリームのwatermarkは10、Bストリームのwatermarkは20であり、ストリームjoinの場合、デフォルトのwatermarkは10である.お勧めに答えます.ここでのデュアルストリームjoinは左連であり,ウィンドウ内のすべてのデータを取り出し,論理的に必要なデータを下流に送る.
デュアルストリームコード:
ここまでで終わりです.
頑張れ、ピカチュウ.
双流joinを話す前にwindowの定義を理解して、ブログをお勧めします.そしてwatermarkの定義を理解し、ブログをお勧めします.
私は2番目のブログを見た後、データソースをkafkaに変更しました.しかし、windowがトリガーされず、window情報が印刷されます.そしてFlinkの開発者メールボックスで質問の解答を見ました.メールボックスの回答から、なぜwindowがトリガーされなかったのかが明らかになった.One of the source parallelisms doesn't have data
傍受するtopicは10個のパーティションを与えているので、Flinkでは複数のパーティションが並列に使用されます.各パーティションから送信されるデータはwindowに入り、このときwindowをトリガーするwatermarkはパーティションの中で最も小さいwatermarkになります.問題は私がテストするたびに1つのパーティションにメッセージを送信するだけなので、windowをトリガーできません.kafkaパーティションを1に変更して問題を完璧に解決します.公式解釈
テストコード:
public class WatermarkTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", Constants.KAFKA_BROKER);
properties.setProperty("group.id", "crm_stream_window");
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
DataStream stream =
env.addSource(new FlinkKafkaConsumer011<>("test", new SimpleStringSchema(), properties));
DataStream> inputMap = stream.map(new MapFunction>() {
private static final long serialVersionUID = -8812094804806854937L;
@Override
public Tuple2 map(String value) throws Exception {
return new Tuple2<>(value.split("\\W+")[0], Long.valueOf(value.split("\\W+")[1]));
}
});
DataStream> watermark =
inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks>() {
private static final long serialVersionUID = 8252616297345284790L;
Long currentMaxTimestamp = 0L;
Long maxOutOfOrderness = 10000L;// 10s
Watermark watermark = null;
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
@Nullable
@Override
public Watermark getCurrentWatermark() {
watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
return watermark;
}
@Override
public long extractTimestamp(Tuple2 element, long previousElementTimestamp) {
Long timestamp = element.f1;
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
System.out.println("timestamp : " + element.f1 + "|" + format.format(element.f1) + " currentMaxTimestamp : " + currentMaxTimestamp + "|" + format.format(currentMaxTimestamp) + "," + " watermark : " + watermark.getTimestamp() + "|" + format.format(watermark.getTimestamp()));
return timestamp;
}
});
watermark.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply(new WindowFunction, String, Tuple, TimeWindow>() {
private static final long serialVersionUID = 7813420265419629362L;
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable> input, Collector out) throws Exception {
System.out.println(tuple);
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
out.collect("window " + format.format(window.getStart()) + " window " + format.format(window.getEnd()));
}
}).print();
env.execute("window test");
}
}
単一ストリームの問題が解決した後,二重ストリームjoinの解決を開始する.双流joinのEventimeWindowと単流は実は差が少ない.両方のストリームにEventTiemを指定し、遅延時間を指定します.このときwindowをトリガーするwatermarkは、2つのストリームの中で遅いwatermarkであることに注意してください.すなわち、Aストリームのwatermarkは10、Bストリームのwatermarkは20であり、ストリームjoinの場合、デフォルトのwatermarkは10である.お勧めに答えます.ここでのデュアルストリームjoinは左連であり,ウィンドウ内のすべてのデータを取り出し,論理的に必要なデータを下流に送る.
デュアルストリームコード:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Kafka011JsonTableSource orderSource = new OrderStreamKafkaReader("test").getTableSource("crm_stream");
tableEnv.registerTableSource("orderDetail", orderSource);
Table orderDetailTable = tableEnv.sqlQuery("SELECT * FROM orderDetail ");
//getItemTable() kafka json
Table itemIdTable = new StreamJoinTest().getItemTable(orderDetailTable);
DataStream dataStream = tableEnv.toAppendStream(itemIdTable, OrderDetailInfo4Stat.class)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(5)) {
private static final long serialVersionUID = -4075075569327086395L;
@Override
public long extractTimestamp(OrderDetailInfo4Stat element) {
return element.getOrderLastModifyTime().getTime();
}
});
dataStream.print();
Kafka011JsonTableSource couponCodeSource = new CouponCodeStreamKafkaReader(Constants.COUPON_CODE_INFO).getTableSource("crm_stream");
tableEnv.registerTableSource("CouponCodeTable", couponCodeSource);
DataStream couponCodeStream = tableEnv.toAppendStream(tableEnv.sqlQuery("select * from CouponCodeTable"), CouponCodeInfo4Stat.class).
assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(5)) {
private static final long serialVersionUID = 8867904924628932955L;
@Override
public long extractTimestamp(CouponCodeInfo4Stat element) {
return element.getLastModifyTime().getTime();
}
});
couponCodeStream.print();
DataStream realDataStream = dataStream.coGroup(couponCodeStream)
.where((KeySelector) OrderDetailInfo4Stat::getOrderBatchId)
.equalTo((KeySelector) CouponCodeInfo4Stat::getOrderBatchId)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new CoGroupFunction() {
private static final long serialVersionUID = -1240462675125711867L;
@Override
public void coGroup(Iterable first, Iterable second, Collector out) throws Exception {
for (OrderDetailInfo4Stat orderDetailInfo4Stat : first) {
for (CouponCodeInfo4Stat couponCodeInfo4Stat : second) {
if (orderDetailInfo4Stat.getOrderBatchId().equals(couponCodeInfo4Stat.getOrderBatchId())) {
orderDetailInfo4Stat.setOrderSalesmanId(couponCodeInfo4Stat.getReceiveUserId());
}
}
out.collect(orderDetailInfo4Stat);
}
}
});
realDataStream.print();
env.execute("stream join test");
}
ここまでで終わりです.
頑張れ、ピカチュウ.