Flink Table APIとSQLの分析と使用(一)

57509 ワード

Flinkは、標準的なストリーム処理とバッチ処理の2つのリレーショナルAPI:Table APIとSQLを提供します.Table APIはselect、filter、joinなどの操作を直接行うことができる.Flink SQLはApache Calciteに基づいて標準的なSQLを実現し、SQL言語と一致し、ほとんどの開発者に適しています.
Flink Table APIとSQLはFlink-Table依存にバンドルされており、使用する場合は依存を追加する:Flink 1.7.2を例に
		<!--java-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <!--scala-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>

Table APIとSQLの基本的な使用
一、まずTableEnvironmentを作成する必要があります.TableEnvironmentでは、次の機能を実現できます.
  • 内部カタログからテーブルを作成する
  • 外部カタログからテーブルを作成する
  • SQLクエリを実行する
  • ユーザ定義Function(UDF)を登録する
  • DataStreamとDataSetをTableに変換する
    ストリームデータ照会
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    

    バッチ・データ・クエリー
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
    

    二、取得したTableEnvironmentオブジェクトによってTableオブジェクトを作成する.Tableオブジェクトには2種類ある:入力Table(TableSource)と出力Table(TableSink)
    TableSource
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
            
            //CsvTableSource: ファイルパス、フィールド名、フィールド型を指定する
            TableSource csvSource = new CsvTableSource("path",
                    new String[]{"name", "age"},
                    new TypeInformation[]{Types.STRING, Types.INT});
            //TableSourceを"CsvTable"という名前で登録する
            tableEnv.registerTableSource("CsvTable", csvSource);
    

    TableSink
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
            
    		//TableSinkを作成して登録する
            //CsvTableSink: 出力パスとフィールド区切り文字を指定する
            TableSink csvSink = new CsvTableSink("path", ",");
            //出力テーブルのフィールド名と型を定義する
            String[] fieldNames = {"cid", "cname", "revsum"};
            TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.INT};
            //TableSinkを"CsvSinkTable"という名前で登録する
            tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);
    

    三、Table APIとSQL操作の使用
    SQL
            //SQLクエリを実行してTableを取得する
            //各SQL断片の末尾に空白を付けて、連結後のSQLが正しくなるようにする
            Table revenue = tableEnv.sqlQuery("select cid, cname, sum(revenue) as revsum " +
                    "from orders " +
                    "where country = 'france' " +
                    "group by cid, cname");
    

    Table API
    		Table orders = tableEnv.scan("orders");
    		Table revenue = orders.filter("country === 'france'").groupBy("cid, cname").select("cid, cname, revenue.sum as revSum");
    

    四、DataStream、DataSetとTable間の変換
    Table->DataStream
            // TableをRow型のDataStreamに変換する
            DataStream<Row> dsRow = ((org.apache.flink.table.api.java.StreamTableEnvironment) tableEnv).toAppendStream(revenue, Row.class);

            // Tableを指定した型のDataStreamに変換する
            TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(Types.STRING, Types.INT);
            DataStream<Tuple2<String, Integer>> dsTuple = ((org.apache.flink.table.api.java.StreamTableEnvironment) tableEnv).toAppendStream(revenue, tupleType);
    

    Table->DataSet
            // TableをRow型のDataSetに変換する
            DataSet<Row> dsRow = ((org.apache.flink.table.api.java.BatchTableEnvironment) tableEnv).toDataSet(revenue, Row.class);
            // Tableを指定した型のDataSetに変換する
            TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(Types.STRING, Types.INT);
            DataSet<Tuple2<String, Integer>> dsTuple = ((org.apache.flink.table.api.java.BatchTableEnvironment) tableEnv).toDataSet(revenue, tupleType);
    

    DataStream->Table
            // DataStreamをTableとして登録する
            DataStream<Tuple2<String,String>> stream = ...;
            ((org.apache.flink.table.api.java.StreamTableEnvironment) tableEnv).registerDataStream("mytable", stream);
    

    完全なコード
    package com.basic;
    
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.StreamTableEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.sinks.CsvTableSink;
    import org.apache.flink.table.sinks.TableSink;
    import org.apache.flink.table.sources.CsvTableSource;
    import org.apache.flink.table.sources.TableSource;
    import org.apache.flink.types.Row;
    
    /**
     * Streaming demo of the Flink Table API / SQL (Flink 1.7):
     * registers a CSV TableSource, runs a SQL query, converts the result
     * Table to DataStreams, and writes it out through a CSV TableSink.
     */
    public class FlinkTableJobStream {

        public static void main(String[] args) throws Exception {
            // 1. Create the streaming TableEnvironment.
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

            // 2. Create a TableSource.
            // CsvTableSource takes the file path, the field names and the field types.
            TableSource csvSource = new CsvTableSource("path",
                    new String[]{"name", "age"},
                    new TypeInformation[]{Types.STRING, Types.INT});
            // Register the TableSource under the name "CsvTable".
            tableEnv.registerTableSource("CsvTable", csvSource);

            // 3. Run a SQL query that produces a Table.
            // NOTE(review): a table named "orders" must be registered before this
            // query runs — only "CsvTable" is registered above; adjust to your schema.
            // Each fragment ends with a space so the concatenated SQL stays valid
            // (without it the string becomes "...from orderswhere country...").
            Table revenue = tableEnv.sqlQuery("select cid, cname, sum(revenue) as revsum " +
                    "from orders " +
                    "where country = 'france' " +
                    "group by cid, cname");

            // 4. Convert the Table to an append-only DataStream of Rows.
            // The cast is needed because toAppendStream is declared on the
            // Java-specific subclass, not on the abstract StreamTableEnvironment.
            DataStream<Row> dsRow = ((org.apache.flink.table.api.java.StreamTableEnvironment) tableEnv)
                    .toAppendStream(revenue, Row.class);

            // Convert the Table to a DataStream of Tuple2 using explicit type info.
            TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(Types.STRING, Types.INT);
            DataStream<Tuple2<String, Integer>> dsTuple =
                    ((org.apache.flink.table.api.java.StreamTableEnvironment) tableEnv)
                            .toAppendStream(revenue, tupleType);

            // 5. Create and register a TableSink (CSV file, "," as field delimiter).
            TableSink csvSink = new CsvTableSink("path", ",");
            // Schema (field names and types) of the output table.
            String[] fieldNames = {"cid", "cname", "revsum"};
            TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.INT};
            tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);

            // 6. Emit the query result into the registered sink.
            revenue.insertInto("CsvSinkTable");

            // A streaming job only starts when execute() is called.
            env.execute("FlinkTableJobStream");
        }
    }
    
    
    package com.basic;
    
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    import org.apache.flink.table.api.BatchTableEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.sinks.CsvTableSink;
    import org.apache.flink.table.sinks.TableSink;
    import org.apache.flink.types.Row;
    
    /**
     * Batch demo of the Flink Table API / SQL (Flink 1.7):
     * runs a SQL query over a registered table, converts the result Table
     * to DataSets, and writes it out through a CSV TableSink.
     */
    public class FlinkTableBatch {

        public static void main(String[] args) throws Exception {
            // 1. Create the batch TableEnvironment.
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);

            // 2. Run a SQL query that produces a Table.
            // NOTE(review): a table named "orders" must be registered before this
            // query runs — nothing in this class registers it; adjust as needed.
            // Each fragment ends with a space so the concatenated SQL stays valid
            // (without it the string becomes "...from orderswhere country...").
            Table revenue = tableEnv.sqlQuery("select cid, cname, sum(revenue) as revsum " +
                    "from orders " +
                    "where country = 'france' " +
                    "group by cid, cname");

            // 3. Convert the Table to a DataSet of Rows.
            // The cast is needed because toDataSet is declared on the
            // Java-specific subclass, not on the abstract BatchTableEnvironment.
            DataSet<Row> dsRow = ((org.apache.flink.table.api.java.BatchTableEnvironment) tableEnv)
                    .toDataSet(revenue, Row.class);

            // Convert the Table to a DataSet of Tuple2 using explicit type info.
            TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(Types.STRING, Types.INT);
            DataSet<Tuple2<String, Integer>> dsTuple =
                    ((org.apache.flink.table.api.java.BatchTableEnvironment) tableEnv)
                            .toDataSet(revenue, tupleType);

            // 4. Create and register a TableSink (CSV file, "," as field delimiter).
            TableSink csvSink = new CsvTableSink("path", ",");
            // Schema (field names and types) of the output table.
            String[] fieldNames = {"cid", "cname", "revsum"};
            TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.INT};
            tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);

            // 5. Emit the query result into the registered sink.
            revenue.insertInto("CsvSinkTable");

            // The batch sink is only written when execute() runs the job.
            env.execute("FlinkTableBatch");
        }
    }
    
    

    pomファイル
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>1.7.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.11</artifactId>
                <version>1.7.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
                <version>1.7.2</version>
            </dependency>
            <!--flink table   -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table_2.11</artifactId>
                <version>1.7.2</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_2.11</artifactId>
                <version>1.7.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_2.11</artifactId>
                <version>1.7.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-core</artifactId>
                <version>1.7.2</version>
            </dependency>
        </dependencies>