Flink 1.11 kafkaデータを読み出しhiveに書き込み、未完了で続行

4011 ワード

昨夜Flink 1.11が出ましたが、今回は変更が多く、hiveの部分だけに関心を持っています.
現在、hiveをコードで何時間も読み取り、公式サイトのドキュメントをインストールしてみたが、成功せず、ホットスポットをこすって記録した.
まず依存を貼りましょう.
注意:どうせいろいろな間違いを報告して、コミュニティを見てflink-clients.jarが必要だと言いました.  インポート依存を手動でダウンロード 
 

    
        org.apache.flink
        flink-connector-kafka_2.11
        ${flink.version}
        provided
    


    
        org.apache.flink
        flink-table-api-java
        ${flink.version}
        provided
    
    
        org.apache.flink
        flink-table-api-java-bridge_2.11
        ${flink.version}
        provided
    


    
        org.apache.flink
        flink-table-planner-blink_2.11
        ${flink.version}
        provided
    

    
        org.apache.flink
        flink-table-planner_2.11
        ${flink.version}
        provided
    

    
        org.apache.hadoop
        hadoop-common
        2.6.0-cdh5.16.1
        provided
    

    
        org.apache.hadoop
        hadoop-hdfs
        2.6.0-cdh5.16.1
        provided
    

    
        org.apache.hadoop
        hadoop-client
        2.6.0-cdh5.16.1
        provided
    

    
    
        org.apache.flink
        flink-connector-hive_2.11
        ${flink.version}
        provided
    

    
        org.apache.hive
        hive-exec
        1.1.0
        provided
    



 
ここで実行して間違いを報告しないで、しかしAPIは変わって、どのように出力を印刷することを知らないで、使ってみます
table.execute().print();

エラーメッセージ
Exception in thread "main"java.lang.NoClassDefFoundError: org/apache/flink/optimizer/costs/CostEstimator
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //  EnvironmentSettings    Blink Planner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

        //  TableEnvironment
        TableEnvironment tableEnv = TableEnvironment.create(bsSettings);
//        StreamTableEnvironment tableEnv2 = StreamTableEnvironment.create(bsSettings);

        String name = "myhive";
        String defaultDatabase = "default";
        String hiveConfDir = "G:\\xxxx\\Flink SQL    "; // hive      
        String version = "1.1.0";

        Catalog catalog = new HiveCatalog(name,defaultDatabase, hiveConfDir, version);

        tableEnv.registerCatalog("myhive", catalog);
        tableEnv.useCatalog("myhive");
        String createDbSql = "SELECT code ,total_emp FROM  sample_07 ";


        String[] strings = tableEnv.listTables();
        for (int i = 0; i < strings.length; i++) {
            System.out.println(strings[i]);
        }


        Table table = tableEnv.sqlQuery(createDbSql);
        table.printSchema();
        env.execute();

 
後でhiveのデータを直接クエリーしてkafkaを挿入してみます. 未完待機