Flink 1.11 kafkaデータを読み出しhiveに書き込み、未完了で続行
4011 ワード
昨夜Flink 1.11が出ましたが、今回は変更が多く、hiveの部分だけに関心を持っています.
現在、hiveをコードで何時間も読み取り、公式サイトのドキュメントをインストールしてみたが、成功せず、ホットスポットをこすって記録した.
まず依存を貼りましょう.
注意:どうせいろいろな間違いを報告して、コミュニティを見てflink-clients.jarが必要だと言いました. インポート依存を手動でダウンロード
ここで実行して間違いを報告しないで、しかしAPIは変わって、どのように出力を印刷することを知らないで、使ってみます
エラーメッセージ
Exception in thread "main"java.lang.NoClassDefFoundError: org/apache/flink/optimizer/costs/CostEstimator
後でhiveのデータを直接クエリーしてkafkaを挿入してみます. 未完待機
現在、hiveをコードで何時間も読み取り、公式サイトのドキュメントをインストールしてみたが、成功せず、ホットスポットをこすって記録した.
まず依存を貼りましょう.
注意:どうせいろいろな間違いを報告して、コミュニティを見てflink-clients.jarが必要だと言いました. インポート依存を手動でダウンロード
org.apache.flink
flink-connector-kafka_2.11
${flink.version}
provided
org.apache.flink
flink-table-api-java
${flink.version}
provided
org.apache.flink
flink-table-api-java-bridge_2.11
${flink.version}
provided
org.apache.flink
flink-table-planner-blink_2.11
${flink.version}
provided
org.apache.flink
flink-table-planner_2.11
${flink.version}
provided
org.apache.hadoop
hadoop-common
2.6.0-cdh5.16.1
provided
org.apache.hadoop
hadoop-hdfs
2.6.0-cdh5.16.1
provided
org.apache.hadoop
hadoop-client
2.6.0-cdh5.16.1
provided
org.apache.flink
flink-connector-hive_2.11
${flink.version}
provided
org.apache.hive
hive-exec
1.1.0
provided
ここで実行して間違いを報告しないで、しかしAPIは変わって、どのように出力を印刷することを知らないで、使ってみます
table.execute().print();
エラーメッセージ
Exception in thread "main"java.lang.NoClassDefFoundError: org/apache/flink/optimizer/costs/CostEstimator
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// EnvironmentSettings Blink Planner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
// TableEnvironment
TableEnvironment tableEnv = TableEnvironment.create(bsSettings);
// StreamTableEnvironment tableEnv2 = StreamTableEnvironment.create(bsSettings);
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "G:\\xxxx\\Flink SQL "; // hive
String version = "1.1.0";
Catalog catalog = new HiveCatalog(name,defaultDatabase, hiveConfDir, version);
tableEnv.registerCatalog("myhive", catalog);
tableEnv.useCatalog("myhive");
String createDbSql = "SELECT code ,total_emp FROM sample_07 ";
String[] strings = tableEnv.listTables();
for (int i = 0; i < strings.length; i++) {
System.out.println(strings[i]);
}
Table table = tableEnv.sqlQuery(createDbSql);
table.printSchema();
env.execute();
後でhiveのデータを直接クエリーしてkafkaを挿入してみます. 未完待機