【Flink 1.11新機能】DataGenとprintの便利なテスト
2670 ワード
Datagen connectorによるデータ生成
print connectorで結果をコンソールに印刷し、結果テストを簡単に表示できます。
package com.otis.scala.test
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
object DataGenTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val settings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
val tableEnv = StreamTableEnvironment.create(env, settings)
//
val dataGenStr = " CREATE TABLE source_table (" +
" id INT," +
" score INT," +
" address STRING," +
" ts AS localtimestamp," +
" WATERMARK FOR ts AS ts" +
" ) WITH (" +
"'connector' = 'datagen'," +
"'rows-per-second'='5'," +
"'fields.id.kind'='sequence'," +
"'fields.id.start'='1'," +
"'fields.id.end'='100'," +
"'fields.score.min'='1'," +
"'fields.score.max'='100'," +
"'fields.address.length'='10'" +
")"
//print table
val print_table= " CREATE TABLE print_table (" +
" id INT," +
" score INT," +
" address STRING" +
" ) WITH (" +
" 'connector' = 'print'" +
" )"
tableEnv.executeSql(dataGenStr)
tableEnv.executeSql(print_table)
tableEnv.executeSql("insert into print_table select id,score,address from source_table")
}
}
コンソール
+I(1,98,9c0ecff134)
+I(2,28,2413fe8222)
+I(3,28,94eb800307)
+I(4,72,1912d2d787)
+I(5,66,b174af3c91)
+I(6,47,798819640e)