Flink Table & SQL Dual-Stream Joins
This post summarizes the dual-stream joins available in Flink Table & SQL: Regular Join, Interval Join, and Window Join.
Regular Join

Regular Join: the plain join of two streams, with no time constraint on either input.
Note the following points when using it:
- By default, the input from both streams is kept in State forever. To keep the state from growing without bound, an idle state retention time can be configured via the Query Configuration; a minimal sketch follows this list.
- Currently, only equality join conditions are supported.
- Outer joins generate ADD/RETRACT messages, so convert the result with toRetractStream and pay attention to the output.
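As a minimal sketch of that Query Configuration (assuming the same Flink 1.10-era Blink planner API as the examples in this post; the class name and the 12h/24h values are my own, not from the original):

import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class IdleStateRetentionSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);

        // Join state for a key that has been idle for 12 hours becomes eligible
        // for cleanup and is removed at the latest after 24 hours.
        tableEnv.getConfig().setIdleStateRetentionTime(Time.hours(12), Time.hours(24));
    }
}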
Code example:
package com.bigdata.flink.streamJoin.regularJoin;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
* Author: Wang Pei
* Summary:
* Regular Join
*/
public class RegularJoin {
public static void main(String[] args) throws Exception {
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
// 1. Source table: browse events from Kafka. Sample record:
// {"userID":"user_1","eventType":"browse","eventTime":"2015-01-01 00:00:00"}
String kafkaBrowse = ""
+ "CREATE TABLE kafka_browse_log "
+ "( "
+ " userID STRING, "
+ " eventType STRING, "
+ " eventTime STRING, "
+ " proctime as PROCTIME() "
+ ") WITH ( "
+ " 'connector.type' = 'kafka', "
+ " 'connector.version' = '0.10', "
+ " 'connector.properties.bootstrap.servers' = 'kafka01:9092', "
+ " 'connector.properties.zookeeper.connect' = 'kafka01:2181', "
+ " 'connector.topic' = 'topic_1', "
+ " 'connector.properties.group.id' = 'c1', "
+ " 'connector.startup-mode' = 'latest-offset', "
+ " 'format.type' = 'json' "
+ ")";
tableEnv.sqlUpdate(kafkaBrowse);
tableEnv.toAppendStream(tableEnv.from("kafka_browse_log"), Row.class).print();
// 2. Source table: user change-log records from Kafka. Sample record:
// {"userID":"user_1","userName":"name1","userAge":10,"userAddress":"Mars"}
String kafkaUser = ""
+ "CREATE TABLE kafka_user_change_log "
+ "( "
+ " userID STRING, "
+ " userName STRING, "
+ " userAge INT, "
+ " userAddress STRING "
+ ") WITH ( "
+ " 'connector.type' = 'kafka', "
+ " 'connector.version' = '0.10', "
+ " 'connector.properties.bootstrap.servers' = 'kafka01:9092', "
+ " 'connector.properties.zookeeper.connect' = 'kafka01:2181', "
+ " 'connector.topic' = 'topic_2', "
+ " 'connector.properties.group.id' = 'c1', "
+ " 'connector.startup-mode' = 'latest-offset', "
+ " 'format.type' = 'json' "
+ ")";
tableEnv.sqlUpdate(kafkaUser);
tableEnv.toAppendStream(tableEnv.from("kafka_user_change_log"), Row.class).print();
// INNER JOIN
//String execSQL = ""
// + "SELECT * "
// + "FROM kafka_browse_log "
// + "INNER JOIN kafka_user_change_log "
// + "ON kafka_browse_log.userID = kafka_user_change_log.userID ";
//tableEnv.toAppendStream(tableEnv.sqlQuery(execSQL), Row.class).print();
// LEFT JOIN
//String execSQL = ""
// + "SELECT * "
// + "FROM kafka_browse_log "
// + "LEFT JOIN kafka_user_change_log "
// + "ON kafka_browse_log.userID = kafka_user_change_log.userID ";
//tableEnv.toRetractStream(tableEnv.sqlQuery(execSQL), Row.class).print();
// RIGHT JOIN
//String execSQL = ""
// + "SELECT * "
// + "FROM kafka_browse_log "
// + "RIGHT JOIN kafka_user_change_log "
// + "ON kafka_browse_log.userID = kafka_user_change_log.userID ";
//tableEnv.toRetractStream(tableEnv.sqlQuery(execSQL), Row.class).print();
// FULL JOIN
String execSQL = ""
+ "SELECT * "
+ "FROM kafka_browse_log "
+ "FULL JOIN kafka_user_change_log "
+ "ON kafka_browse_log.userID = kafka_user_change_log.userID ";
tableEnv.toRetractStream(tableEnv.sqlQuery(execSQL), Row.class).print();
tableEnv.execute(RegularJoin.class.getSimpleName());
}
}
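A note on the output: toRetractStream wraps every result in a Tuple2<Boolean, Row>. A true flag marks a row being added and false marks the retraction of a previously emitted row. With the outer joins above, an early unmatched row (padded with nulls) is first emitted, then retracted and re-emitted as the joined row once the matching record from the other stream arrives.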
Interval Join

Interval Join: a time-interval join. Example scenario: one stream joins data from another stream that lies within a relative time range.
Note the following points when using it:
- Both Processing Time and Event Time are supported.
- Data that falls outside the time range is cleared automatically, so State does not grow too large.
- Non-equality predicates are supported: >=, >, <=, <, and BETWEEN ... AND ....
- It mirrors Flink DataStream's Interval Join: one stream is joined in real time against the other stream's data from a relative time period.
- Unlike the outer Regular Join above, the result here is emitted with toAppendStream; no RetractStream is required.

Code example:
package com.bigdata.flink.streamJoin.regularJoin;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
* Author: Wang Pei
* Summary:
* Interval Join
*/
public class IntervalJoin {
public static void main(String[] args) throws Exception {
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
// 1. Source table: browse events from Kafka. Sample record:
// {"userID":"user_1","eventType":"browse","eventTime":"2015-01-01 00:00:00"}
String kafkaBrowse = ""
+ "CREATE TABLE kafka_browse_log "
+ "( "
+ " userID STRING, "
+ " eventType STRING, "
+ " eventTime STRING, "
+ " proctime as PROCTIME() "
+ ") WITH ( "
+ " 'connector.type' = 'kafka', "
+ " 'connector.version' = '0.10', "
+ " 'connector.properties.bootstrap.servers' = 'kafka01:9092', "
+ " 'connector.properties.zookeeper.connect' = 'kafka01:2181', "
+ " 'connector.topic' = 'topic_1', "
+ " 'connector.properties.group.id' = 'c1', "
+ " 'connector.startup-mode' = 'latest-offset', "
+ " 'format.type' = 'json' "
+ ")";
tableEnv.sqlUpdate(kafkaBrowse);
tableEnv.toAppendStream(tableEnv.from("kafka_browse_log"), Row.class).print();
// 2. Source table: user change-log records from Kafka. Sample record:
// {"userID":"user_1","userName":"name1","userAge":10,"userAddress":"Mars"}
String kafkaUser = ""
+ "CREATE TABLE kafka_user_change_log "
+ "( "
+ " userID STRING, "
+ " userName STRING, "
+ " userAge INT, "
+ " userAddress STRING, "
+ " proctime as PROCTIME() "
+ ") WITH ( "
+ " 'connector.type' = 'kafka', "
+ " 'connector.version' = '0.10', "
+ " 'connector.properties.bootstrap.servers' = 'kafka01:9092', "
+ " 'connector.properties.zookeeper.connect' = 'kafka01:2181', "
+ " 'connector.topic' = 'topic_2', "
+ " 'connector.properties.group.id' = 'c1', "
+ " 'connector.startup-mode' = 'latest-offset', "
+ " 'format.type' = 'json' "
+ ")";
tableEnv.sqlUpdate(kafkaUser);
tableEnv.toAppendStream(tableEnv.from("kafka_user_change_log"), Row.class).print();
// Interval Join: each browse event joins user changes from the preceding 30 seconds (processing time)
String execSQL = ""
+ "SELECT * "
+ "FROM kafka_browse_log browse, "
+ " kafka_user_change_log `user` "
+ "WHERE "
+ " browse.userID = `user`.userID "
+ " AND `user`.proctime BETWEEN browse.proctime - INTERVAL '30' SECOND AND browse.proctime";
tableEnv.toAppendStream(tableEnv.sqlQuery(execSQL), Row.class).print();
tableEnv.execute(IntervalJoin.class.getSimpleName());
}
}
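The BETWEEN ... AND ... predicate above is just shorthand for a pair of bounds; per the notes, explicit comparison predicates also work. A sketch of the same 30-second window written with >= and <= (same tables as above; my own rewrite, not a separate example from the original):

String execSQL = ""
    + "SELECT * "
    + "FROM kafka_browse_log browse, "
    + "     kafka_user_change_log `user` "
    + "WHERE "
    + "  browse.userID = `user`.userID "
    + "  AND `user`.proctime >= browse.proctime - INTERVAL '30' SECOND "
    + "  AND `user`.proctime <= browse.proctime";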
Window Join

Window Join: the two streams share the same key, and records that fall into the same window are joined.

Note the following points when using it:
- I have not yet found a good SQL-level expression for the Window Joins available in Flink DataStream (Tumbling Window Join / Sliding Window Join / Session Window Join).
- For Window Join in Flink DataStream, see: A. Flink DataStream Window Join supports Inner Join, Left Join, and Right Join; B. an example of how Flink DataStream Window Join is implemented. A minimal DataStream sketch follows.
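Since there is no SQL example to give here, below is a minimal sketch of a Tumbling Window Join with the Flink DataStream API (my own illustration, not taken from the references above; the in-memory sources, the Tuple2 element shape, and the 10-second window size are all assumptions):

package com.bigdata.flink.streamJoin.windowJoin;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Hypothetical example: Inner Window Join of two keyed streams over a
 * 10-second tumbling processing-time window.
 */
public class DataStreamWindowJoin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // In-memory stand-ins for the two Kafka topics used elsewhere in
        // this post: (userID, eventType) and (userID, userName).
        DataStream<Tuple2<String, String>> browseStream = env.fromElements(
                Tuple2.of("user_1", "browse"), Tuple2.of("user_2", "browse"));
        DataStream<Tuple2<String, String>> userStream = env.fromElements(
                Tuple2.of("user_1", "name1"), Tuple2.of("user_2", "name2"));

        // Records with the same userID that fall into the same 10-second
        // tumbling window are joined (Inner Join).
        browseStream
                .join(userStream)
                .where(new KeySelector<Tuple2<String, String>, String>() {
                    @Override
                    public String getKey(Tuple2<String, String> value) {
                        return value.f0;
                    }
                })
                .equalTo(new KeySelector<Tuple2<String, String>, String>() {
                    @Override
                    public String getKey(Tuple2<String, String> value) {
                        return value.f0;
                    }
                })
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .apply(new JoinFunction<Tuple2<String, String>, Tuple2<String, String>, String>() {
                    @Override
                    public String join(Tuple2<String, String> browse, Tuple2<String, String> user) {
                        return browse.f0 + "," + browse.f1 + "," + user.f1;
                    }
                })
                .print();

        env.execute("DataStreamWindowJoin");
    }
}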