Flink Table & SQL Dual-Stream Joins


Here we summarize the dual-stream joins available in Flink Table & SQL:
  • Regular Join
  • Interval Join
  • Window Join
  • Regular Join
    Regular Join: the regular, unbounded two-stream join.
    Note the following points when using it:
  • By default, the inputs of both streams are kept in State in their entirety. To keep this state from growing without bound, an idle state retention time can be set via the Query Configuration (a configuration sketch follows the code example below).
  • Currently only equi-joins (equality conditions) are supported.
  • Outer joins produce ADD/RETRACT messages, so convert the result with toRetractStream (it emits Tuple2<Boolean, Row>, where false marks a retraction); watch the output.
  • Code example
    package com.bigdata.flink.streamJoin.regularJoin;
    
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    /**
     * Author: Wang Pei
     * Summary:
     * Regular Join
     */
    public class RegularJoin {
        public static void main(String[] args) throws Exception {
    
            EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
            StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
    
            // Source table 1: user browse log, e.g.
            //   {"userID":"user_1","eventType":"browse","eventTime":"2015-01-01 00:00:00"}
            String kafkaBrowse = ""
                    + "CREATE TABLE kafka_browse_log "
                    + "( "
                    + "    userID STRING, "
                    + "    eventType STRING, "
                    + "    eventTime STRING, "
                    + "    proctime as PROCTIME() "
                    + ") WITH ( "
                    + "    'connector.type' = 'kafka', "
                    + "    'connector.version' = '0.10', "
                    + "    'connector.properties.bootstrap.servers' = 'kafka01:9092', "
                    + "    'connector.properties.zookeeper.connect' = 'kafka01:2181', "
                    + "    'connector.topic' = 'topic_1', "
                    + "    'connector.properties.group.id' = 'c1', "
                    + "    'connector.startup-mode' = 'latest-offset', "
                    + "    'format.type' = 'json' "
                    + ")";
            tableEnv.sqlUpdate(kafkaBrowse);
            tableEnv.toAppendStream(tableEnv.from("kafka_browse_log"), Row.class).print();
    
            // Source table 2: user change log, e.g.
            //   {"userID":"user_1","userName":"name1","userAge":10,"userAddress":"Mars"}
            String kafkaUser = ""
                    + "CREATE TABLE kafka_user_change_log "
                    + "( "
                    + "    userID STRING, "
                    + "    userName STRING, "
                    + "    userAge INT, "
                    + "    userAddress STRING "
                    + ") WITH ( "
                    + "    'connector.type' = 'kafka', "
                    + "    'connector.version' = '0.10', "
                    + "    'connector.properties.bootstrap.servers' = 'kafka01:9092', "
                    + "    'connector.properties.zookeeper.connect' = 'kafka01:2181', "
                    + "    'connector.topic' = 'topic_2', "
                    + "    'connector.properties.group.id' = 'c1', "
                    + "    'connector.startup-mode' = 'latest-offset', "
                    + "    'format.type' = 'json' "
                    + ")";
            tableEnv.sqlUpdate(kafkaUser);
            tableEnv.toAppendStream(tableEnv.from("kafka_user_change_log"), Row.class).print();
    
            // INNER JOIN
            //String execSQL = ""
            //        + "SELECT * "
            //        + "FROM kafka_browse_log "
            //        + "INNER JOIN kafka_user_change_log "
            //        + "ON kafka_browse_log.userID = kafka_user_change_log.userID ";
            //tableEnv.toAppendStream(tableEnv.sqlQuery(execSQL), Row.class).print();
    
            // LEFT JOIN
            //String execSQL = ""
            //        + "SELECT * "
            //        + "FROM kafka_browse_log "
            //        + "LEFT JOIN kafka_user_change_log "
            //        + "ON kafka_browse_log.userID = kafka_user_change_log.userID ";
            //tableEnv.toRetractStream(tableEnv.sqlQuery(execSQL), Row.class).print();
    
            // RIGHT JOIN
            //String execSQL = ""
            //        + "SELECT * "
            //        + "FROM kafka_browse_log "
            //        + "RIGHT JOIN kafka_user_change_log "
            //        + "ON kafka_browse_log.userID = kafka_user_change_log.userID ";
            //tableEnv.toRetractStream(tableEnv.sqlQuery(execSQL), Row.class).print();
    
            // FULL JOIN
            String execSQL = ""
                    + "SELECT * "
                    + "FROM kafka_browse_log "
                    + "FULL JOIN kafka_user_change_log "
                    + "ON kafka_browse_log.userID = kafka_user_change_log.userID ";
            tableEnv.toRetractStream(tableEnv.sqlQuery(execSQL), Row.class).print();
    
            tableEnv.execute(RegularJoin.class.getSimpleName());
        }
    }
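
    As referenced in the notes above, a minimal sketch of limiting state growth via the Query Configuration. The 12/24-hour retention values are illustrative assumptions, not a recommendation; the call is TableConfig#setIdleStateRetentionTime from the same Flink 1.10-era API used in the example above:

    import org.apache.flink.api.common.time.Time;

    // Configure before the join query is translated, e.g. right after creating
    // the StreamTableEnvironment: idle per-key join state is kept for at least
    // 12 hours and dropped after at most 24 hours without access.
    tableEnv.getConfig().setIdleStateRetentionTime(Time.hours(12), Time.hours(24));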
    
  • Interval Join
    Interval Join: a time-interval join. Example scenario: one stream joins the other stream's data that lies within a relative time range.
    Note the following points when using it:
  • Supports both processing time and event time.
  • Data outside the time range is cleaned up automatically, so State does not grow too large.
  • Supports non-equi conditions on the time attributes, such as >=, >, <=, < and BETWEEN ... AND ...; the result is append-only, so toAppendStream can be used instead of toRetractStream.
  • The Flink DataStream API offers an Interval Join with the same semantics, joining in real time against the other stream's data from a past time range (see the DataStream sketch after the code example below).
  • Code example
    package com.bigdata.flink.streamJoin.intervalJoin;
    
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    /**
     * Author: Wang Pei
     * Summary:
     * Interval Join
     */
    public class IntervalJoin {
        public static void main(String[] args) throws Exception {
    
            EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
            StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
    
            // Source table 1: user browse log, e.g.
            //   {"userID":"user_1","eventType":"browse","eventTime":"2015-01-01 00:00:00"}
            String kafkaBrowse = ""
                    + "CREATE TABLE kafka_browse_log "
                    + "( "
                    + "    userID STRING, "
                    + "    eventType STRING, "
                    + "    eventTime STRING, "
                    + "    proctime as PROCTIME() "
                    + ") WITH ( "
                    + "    'connector.type' = 'kafka', "
                    + "    'connector.version' = '0.10', "
                    + "    'connector.properties.bootstrap.servers' = 'kafka01:9092', "
                    + "    'connector.properties.zookeeper.connect' = 'kafka01:2181', "
                    + "    'connector.topic' = 'topic_1', "
                    + "    'connector.properties.group.id' = 'c1', "
                    + "    'connector.startup-mode' = 'latest-offset', "
                    + "    'format.type' = 'json' "
                    + ")";
            tableEnv.sqlUpdate(kafkaBrowse);
            tableEnv.toAppendStream(tableEnv.from("kafka_browse_log"), Row.class).print();
    
            // Source table 2: user change log, e.g.
            //   {"userID":"user_1","userName":"name1","userAge":10,"userAddress":"Mars"}
            String kafkaUser = ""
                    + "CREATE TABLE kafka_user_change_log "
                    + "( "
                    + "    userID STRING, "
                    + "    userName STRING, "
                    + "    userAge INT, "
                    + "    userAddress STRING, "
                    + "    proctime as PROCTIME() "
                    + ") WITH ( "
                    + "    'connector.type' = 'kafka', "
                    + "    'connector.version' = '0.10', "
                    + "    'connector.properties.bootstrap.servers' = 'kafka01:9092', "
                    + "    'connector.properties.zookeeper.connect' = 'kafka01:2181', "
                    + "    'connector.topic' = 'topic_2', "
                    + "    'connector.properties.group.id' = 'c1', "
                    + "    'connector.startup-mode' = 'latest-offset', "
                    + "    'format.type' = 'json' "
                    + ")";
            tableEnv.sqlUpdate(kafkaUser);
            tableEnv.toAppendStream(tableEnv.from("kafka_user_change_log"), Row.class).print();
    
            // Interval Join
            String execSQL = ""
                    + "SELECT * "
                    + "FROM kafka_browse_log browse, "
                    + "     kafka_user_change_log `user` "
                    + "WHERE  "
                    + "     browse.userID = `user`.userID "
                    + "     AND `user`.proctime BETWEEN browse.proctime - INTERVAL '30' SECOND AND browse.proctime";
            tableEnv.toAppendStream(tableEnv.sqlQuery(execSQL), Row.class).print();
    
            tableEnv.execute(IntervalJoin.class.getSimpleName());
        }
    }
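
    For reference, a minimal sketch of the corresponding Interval Join in the DataStream API, joining each browse event with the other stream's data from the preceding 30 seconds. The class name, package name, and sample data below are illustrative assumptions:

    package com.bigdata.flink.streamJoin.intervalJoin;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;

    /**
     * Sketch: DataStream-API Interval Join (event time).
     * Elements are (userID, eventTimeMillis) tuples; the data is illustrative only.
     */
    public class DataStreamIntervalJoinSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            KeyedStream<Tuple2<String, Long>, String> browse = env
                    .fromElements(Tuple2.of("user_1", 10_000L), Tuple2.of("user_1", 40_000L))
                    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
                        @Override
                        public long extractAscendingTimestamp(Tuple2<String, Long> e) { return e.f1; }
                    })
                    .keyBy(e -> e.f0);

            KeyedStream<Tuple2<String, Long>, String> userChange = env
                    .fromElements(Tuple2.of("user_1", 5_000L), Tuple2.of("user_1", 35_000L))
                    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
                        @Override
                        public long extractAscendingTimestamp(Tuple2<String, Long> e) { return e.f1; }
                    })
                    .keyBy(e -> e.f0);

            // Join each browse event with user changes from the preceding 30 seconds,
            // i.e. the same window as the SQL BETWEEN ... AND ... condition above.
            browse.intervalJoin(userChange)
                    .between(Time.seconds(-30), Time.seconds(0))
                    .process(new ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                        @Override
                        public void processElement(Tuple2<String, Long> left, Tuple2<String, Long> right,
                                                   Context ctx, Collector<String> out) {
                            out.collect(left + " joined " + right);
                        }
                    })
                    .print();

            env.execute("DataStreamIntervalJoinSketch");
        }
    }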
    
  • Window Join
    Window Join: data from the two streams that has the same key and falls into the same window is joined.
    Note the following points when using it:
  • I have not yet found a good SQL formulation of the Window Joins offered by the Flink DataStream API (Tumbling Window Join / Sliding Window Join / Session Window Join).
  • For Window Join in the Flink DataStream API, see the earlier articles: A. Flink DataStream Window Join: Inner Join, Left Join, Right Join; B. Flink DataStream Window Join examples. A minimal DataStream sketch follows below.
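
    Since the SQL layer has no direct equivalent, here is a minimal sketch of a Tumbling Window Join (inner join) in the DataStream API. The class name, package name, and sample data are illustrative assumptions:

    package com.bigdata.flink.streamJoin.windowJoin;

    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    /**
     * Sketch: DataStream-API Tumbling Window Join (inner join).
     * Elements with the same key that fall into the same 10-second
     * event-time window are joined. The data is illustrative only.
     */
    public class DataStreamWindowJoinSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            // (userID, payload) elements with fixed event times of 1s and 2s,
            // so both land in the event-time window [0s, 10s)
            DataStream<Tuple2<String, String>> browse = env
                    .fromElements(Tuple2.of("user_1", "browse"))
                    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, String>>() {
                        @Override
                        public long extractAscendingTimestamp(Tuple2<String, String> e) { return 1_000L; }
                    });

            DataStream<Tuple2<String, String>> userChange = env
                    .fromElements(Tuple2.of("user_1", "name1"))
                    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, String>>() {
                        @Override
                        public long extractAscendingTimestamp(Tuple2<String, String> e) { return 2_000L; }
                    });

            // Inner join: only keys present on both sides of the same window are emitted
            browse.join(userChange)
                    .where(e -> e.f0)
                    .equalTo(e -> e.f0)
                    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                    .apply(new JoinFunction<Tuple2<String, String>, Tuple2<String, String>, String>() {
                        @Override
                        public String join(Tuple2<String, String> left, Tuple2<String, String> right) {
                            return left.f0 + ": " + left.f1 + " joined " + right.f1;
                        }
                    })
                    .print();

            env.execute("DataStreamWindowJoinSketch");
        }
    }

    Note that DataStream join is inner-only; as the referenced articles discuss, left and right outer window joins are built with coGroup instead of join.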