Flink Table APIとSQLの分析と使用(一)
57509 ワード
Flinkは、標準的なストリーム処理とバッチ処理の2つのリレーショナルAPI:Table APIとSQLを提供します.Table APIはselect、filter、joinなどの操作を直接行うことができる.Flink SQLはApache Calciteに基づいて標準的なSQLを実現し、SQL言語と一致し、ほとんどの開発者に適しています.
Flink Table APIとSQLはFlink-Table依存にバンドルされており、使用する場合は依存を追加する:Flink 1.7.2を例に
Table APIとSQLの基本的な使用
一、まずTableEnvironmentを作成する必要があります.TableEnvironmentでは、次の機能を実現できます.内部ディレクトリからテーブルを作成する.外部ディレクトリからテーブルを作成する.SQLクエリを実行する.ユーザーカスタムFunctionを登録する.DataStreamとDataSetをTableに変換する.
ストリームデータ照会
バッチ・データ・クエリー
二、取得したTable EnvironmentオブジェクトによってTableオブジェクトを作成する.2種類のTableオブジェクトがある:入力Table(Table Source)と出力Table(Table Sink)
TableSource
TableSink
三、Table APIとSQL操作の使用
SQL
Table API
四、DataStream、DataSetとTable間の変換
Table->DataStream
Table->DataSet
DataStream->Table
完全なコード
pomファイル
Flink Table APIとSQLはFlink-Table依存にバンドルされており、使用する場合は依存を追加する:Flink 1.7.2を例に
<!--java-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<!--scala-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.7.2</version>
</dependency>
Table APIとSQLの基本的な使用
一、まずTableEnvironmentを作成する必要があります.TableEnvironmentでは、次の機能を実現できます.
ストリームデータ照会
// Create the streaming execution environment and its Table environment (Flink 1.7 API).
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
バッチ・データ・クエリー
// Create the batch execution environment and its Table environment (Flink 1.7 API).
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
二、取得したTable EnvironmentオブジェクトによってTableオブジェクトを作成する.2種類のTableオブジェクトがある:入力Table(Table Source)と出力Table(Table Sink)
TableSource
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// CsvTableSource: constructed from the CSV file path, the field names and the field types.
TableSource csvSource = new CsvTableSource("path",new String[]{
"name","age"},new TypeInformation[]{
Types.STRING,Types.INT});
// Register the TableSource in the catalog under the name "CsvTable".
tableEnv.registerTableSource("CsvTable", csvSource);
TableSink
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// Create a CSV TableSink (output path + field delimiter).
// NOTE(review): the delimiter literal " ," looks garbled by extraction — presumably it should be ","; confirm.
TableSink csvSink = new CsvTableSink("path"," ,");
// Field names and field types of the sink table.
String[] fieldNames = {
"cid", "cname", "revsum"};
TypeInformation[] filedTypes = {
Types.INT, Types.STRING, Types.INT};
// Register the TableSink in the catalog under the name "CsvSinkTable".
tableEnv.registerTableSink("CsvSinkTable", fieldNames, filedTypes, csvSink);
三、Table APIとSQL操作の使用
SQL
// SQL table
//
// Run a SQL query against the registered tables.
// Bug fix: each literal needs a trailing space — the original concatenation
// produced "...from orderswhere country = 'france'group by...".
Table revenue = tableEnv.sqlQuery("select cid, cname, sum(revenue) as revsum " +
        "from orders " +
        "where country = 'france' " +
        "group by cid,cname");
Table API
// Equivalent query through the Table API.
// Bug fix: Flink 1.7 expression strings use "===" for equality (not "=="),
// and the field is "country" (matching the SQL version), not "count".
Table orders = tableEnv.scan("orders");
Table revenue = orders.filter("country === 'france'").groupBy("cid, cname").select("cid, cname, revenue.sum as revSum");
四、DataStream、DataSetとTable間の変換
Table->DataStream
// Table DataStream
DataStream<Row> dsRow = ((org.apache.flink.table.api.java.StreamTableEnvironment) tableEnv).toAppendStream( , Row.class);
// Table DataStream
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(Types.STRING, Types.INT);
DataStream<Tuple2<String, Integer>> dsTuple = ((org.apache.flink.table.api.java.StreamTableEnvironment) tableEnv).toAppendStream(revenue, tupleType);
Table->DataSet
// Table DataStream
DataSet<Row> dsRow = ((org.apache.flink.table.api.java.BatchTableEnvironment) tableEnv).toDataSet( , Row.class);
// Table DataStream
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(Types.STRING, Types.INT);
DataSet<Tuple2<String, Integer>> dsTuple = ((org.apache.flink.table.api.java.BatchTableEnvironment) tableEnv).toDataSet(revenue, tupleType);
DataStream->Table
// DataStream Table
// Register a DataStream as a table named "mytable" so it can be queried via SQL / Table API.
DataStream<Tuple2<String,String>> stream = ...;
((org.apache.flink.table.api.java.StreamTableEnvironment) tableEnv).registerDataStream("mytable", stream);
完全なコード
package com.basic;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StreamTableEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.CsvTableSource;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.types.Row;
/**
 * Streaming Flink Table example: registers a CSV TableSource, runs a SQL
 * aggregation, converts the result Table to DataStreams, and writes it to a
 * CSV TableSink.
 */
public class FlinkTableJobStream {
    public static void main(String[] args) throws Exception {
        // 1. Create the TableEnvironment on top of a stream execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // CsvTableSource: CSV file path, field names and field types.
        TableSource csvSource = new CsvTableSource("path", new String[]{
                "name", "age"}, new TypeInformation[]{
                Types.STRING, Types.INT});
        // Register the TableSource under the name "CsvTable".
        tableEnv.registerTableSource("CsvTable", csvSource);

        // Run a SQL query against the registered tables.
        // Bug fix: each literal needs a trailing space — the original concatenation
        // produced "...from orderswhere country = 'france'group by...".
        // NOTE(review): the query reads table "orders" but only "CsvTable" is registered here — confirm intent.
        Table revenue = tableEnv.sqlQuery("select cid, cname, sum(revenue) as revsum " +
                "from orders " +
                "where country = 'france' " +
                "group by cid,cname");

        // Convert the result Table into an append-only DataStream of Row.
        DataStream<Row> dsRow = ((org.apache.flink.table.api.java.StreamTableEnvironment) tableEnv).toAppendStream(revenue, Row.class);
        // Convert the result Table into a DataStream of Tuple2<String, Integer>.
        TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(Types.STRING, Types.INT);
        DataStream<Tuple2<String, Integer>> dsTuple = ((org.apache.flink.table.api.java.StreamTableEnvironment) tableEnv).toAppendStream(revenue, tupleType);

        // Create a CSV TableSink (output path + field delimiter).
        // NOTE(review): the delimiter literal " ," looks garbled — presumably ","; confirm.
        TableSink csvSink = new CsvTableSink("path", " ,");
        // Field names and field types of the sink table.
        String[] fieldNames = {
                "cid", "cname", "revsum"};
        TypeInformation[] filedTypes = {
                Types.INT, Types.STRING, Types.INT};
        // Register the TableSink under the name "CsvSinkTable".
        tableEnv.registerTableSink("CsvSinkTable", fieldNames, filedTypes, csvSink);
        // Emit the query result into the registered sink.
        revenue.insertInto("CsvSinkTable");

        // Bug fix: without execute() the job graph is only built, never run.
        env.execute("FlinkTableJobStream");
    }
}
package com.basic;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.table.api.BatchTableEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
/**
 * Batch Flink Table example: runs a SQL aggregation, converts the result
 * Table to DataSets, and writes it to a CSV TableSink.
 */
public class FlinkTableBatch {
    public static void main(String[] args) throws Exception {
        // 1. Create the TableEnvironment on top of a batch execution environment.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);

        // Run a SQL query against the registered tables.
        // Bug fix: each literal needs a trailing space — the original concatenation
        // produced "...from orderswhere country = 'france'group by...".
        // NOTE(review): the query reads table "orders" but no source is registered here — confirm intent.
        Table revenue = tableEnv.sqlQuery("select cid, cname, sum(revenue) as revsum " +
                "from orders " +
                "where country = 'france' " +
                "group by cid,cname");

        // Convert the result Table into a DataSet of Row.
        DataSet<Row> dsRow = ((org.apache.flink.table.api.java.BatchTableEnvironment) tableEnv).toDataSet(revenue, Row.class);
        // Convert the result Table into a DataSet of Tuple2<String, Integer>.
        TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(Types.STRING, Types.INT);
        DataSet<Tuple2<String, Integer>> dsTuple = ((org.apache.flink.table.api.java.BatchTableEnvironment) tableEnv).toDataSet(revenue, tupleType);

        // Create a CSV TableSink (output path + field delimiter).
        // NOTE(review): the delimiter literal " ," looks garbled — presumably ","; confirm.
        TableSink csvSink = new CsvTableSink("path", " ,");
        // Field names and field types of the sink table.
        String[] fieldNames = {
                "cid", "cname", "revsum"};
        TypeInformation[] filedTypes = {
                Types.INT, Types.STRING, Types.INT};
        // Register the TableSink under the name "CsvSinkTable".
        tableEnv.registerTableSink("CsvSinkTable", fieldNames, filedTypes, csvSink);
        // Emit the query result into the registered sink.
        revenue.insertInto("CsvSinkTable");

        // Bug fix: without execute() the batch job is only built, never run.
        env.execute("FlinkTableBatch");
    }
}
pomファイル
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<!--flink table -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.7.2</version>
</dependency>
</dependencies>