Flink TableはStreamをMySQLデータベースに直接書き込みます
15446 ワード
Flink TableはStreamをMySQLデータベースに直接書き込みます
Flink Tableは、信頼性の高いデータベースにSinkデータを直接転送できる
Mavenのpomを追加xml依存
コードは次のとおりです.
注意:checkpointのtrueを設定し、checkpoint間隔時間を設定する必要があります.
Flink Tableは、信頼性の高いデータベースにSinkデータを直接転送できる
JDBCAppendTableSink
を提供しています.以下、MySQLを例に挙げます.Mavenのpomを追加xml依存
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-jdbc_2.11artifactId>
<version>1.8.0version>
dependency>
コードは次のとおりです.
public class SqlSinkJdbcStream {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// checkpoint , jdbc
env.enableCheckpointing(5000L);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Schema schema = new Schema()
.field("userId", Types.STRING)
.field("name", Types.STRING)
.field("age", Types.STRING)
.field("sex", Types.STRING)
.field("createTime", Types.BIG_DEC)
.field("updateTime", Types.BIG_DEC);
tableEnv
.connect(
new Kafka().version("0.10").topic("user").property("bootstrap.servers", "localhost:9092")
)
.withSchema(schema)
.withFormat(new Json().deriveSchema())
.inAppendMode()
.registerTableSource("Users");
Table table = tableEnv.sqlQuery("select userId,name,age,sex,createTime,updateTime from Users");
DataStream<Row> result = tableEnv.toAppendStream(table, TypeInformation.of(Row.class));
result.print();
JDBCAppendTableSink sink = new JDBCAppendTableSinkBuilder()
.setDBUrl("jdbc:mysql://localhost:3306/test?useSSL=false")
.setDrivername("com.mysql.jdbc.Driver")
.setUsername("root")
.setPassword("root")
.setBatchSize(1000)
.setQuery("REPLACE INTO user(userId,name,age,sex,createTime,updateTime) values(?,?,?,?,?,?)")
.setParameterTypes(new TypeInformation[]{Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.BIG_DEC, Types.BIG_DEC})
.build();
tableEnv.registerTableSink("Result",
new String[]{"userId", "name", "age", "sex", "createTime", "updateTime"},
new TypeInformation[]{Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.BIG_DEC, Types.BIG_DEC},
sink);
tableEnv.insertInto(table, "Result", new StreamQueryConfig());
env.execute("SqlSinkJdbcStream");
}
}
// mysql
/*
CREATE TABLE `user` (
`userId` varchar(10) NOT NULL,
`name` varchar(10) DEFAULT NULL,
`age` varchar(3) DEFAULT NULL,
`sex` varchar(10) DEFAULT NULL,
`createTime` varchar(20) DEFAULT NULL,
`updateTime` varchar(20) DEFAULT NULL,
PRIMARY KEY (`userId`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
*/
注意:checkpointのtrueを設定し、checkpoint間隔時間を設定する必要があります.