Structured-Streaming学習二-WordCount-kafka

4690 ワード

Structured StreamingがKafkaとどのように連携するかを見ていきます。ローカルにシングルノードのKafkaをインストールすればテストできます。テスト時はproducerを起動し、WordCountの対象となる単語を1行ずつ入力してください。producerのコードの書き方は後ほど紹介します。

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.util.Arrays;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: JavaStructuredKafkaWordCount <bootstrap-servers> <subscribe-type> <topics>
 *   <bootstrap-servers> The Kafka "bootstrap.servers" configuration. A
 *   comma-separated list of host:port.
 *   <subscribe-type> There are three kinds of type, i.e. 'assign', 'subscribe',
 *   'subscribePattern'.
 *   |- <assign> Specific TopicPartitions to consume. Json string
 *   |  {"topicA":[0,1],"topicB":[2,4]}.
 *   |- <subscribe> The topic list to subscribe. A comma-separated list of
 *   |  topics.
 *   |- <subscribePattern> The pattern used to subscribe to topic(s).
 *   |  Java regex string.
 *   |- Only one of "assign", "subscribe" or "subscribePattern" options can be
 *   |  specified for Kafka source.
 *   <topics> Different value format depends on the value of 'subscribe-type'.
 *
 * Example:
 *    `$ bin/run-example \
 *      sql.streaming.JavaStructuredKafkaWordCount host1:port1,host2:port2 \
 *      subscribe topic1,topic2`
 */
public final class WC_kafka {

    /**
     * Arguments used when the program is launched without any, so the demo can
     * be started straight from an IDE. "host:port" is a placeholder and must be
     * replaced with the address of a real Kafka broker.
     */
    private static final String[] DEFAULT_ARGS = {"host:port", "subscribe", "topic50"};

    /**
     * Reads lines from Kafka, splits them into words on single spaces and
     * prints the running count of every word to the console.
     *
     * @param args bootstrap servers, subscribe type ("assign", "subscribe" or
     *             "subscribePattern") and the topic specification, in that
     *             order; when empty, {@link #DEFAULT_ARGS} is used instead
     * @throws Exception if the streaming query fails while it is running
     */
    public static void main(String[] args) throws Exception {
        // Fall back to the demo defaults only when nothing was passed in; arguments
        // supplied by the caller are never overwritten. An array literal is used
        // because Arrays.asList(...).toArray() returns Object[] on Java 9+, which
        // makes a (String[]) cast fail with ClassCastException.
        if (args.length == 0) {
            args = DEFAULT_ARGS.clone();
        }
        if (args.length < 3) {
            System.err.println("Usage: JavaStructuredKafkaWordCount <bootstrap-servers> " +
                    "<subscribe-type> <topics>");
            System.exit(1);
        }

        String bootstrapServers = args[0];
        String subscribeType = args[1];
        String topics = args[2];

        SparkSession spark = SparkSession
                .builder()
                .master("local")
                .appName("JavaStructuredKafkaWordCount")
                .getOrCreate();

        // Create a Dataset representing the stream of input lines from Kafka.
        // The subscribe type is itself the option key ("assign", "subscribe" or
        // "subscribePattern"). For the source options see
        // https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
        Dataset<String> lines = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", bootstrapServers)
                .option(subscribeType, topics)
                .option("startingOffsets", "earliest")
                .load()
                .selectExpr("CAST(value AS STRING)")
                .as(Encoders.STRING());

        // Split every line into words. The explicit FlatMapFunction<String, String>
        // cast gives the lambda parameter its String type and selects the Java
        // overload of flatMap.
        Dataset<String> words = lines.flatMap(
                (FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).iterator(),
                Encoders.STRING());

        // Generate the running word count and start the query that prints it to the
        // console sink. "complete" output mode re-emits the whole result table on
        // every trigger; numRows sets how many rows the console sink prints. See
        // https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes
        StreamingQuery query = words
                .groupBy("value")
                .count()
                .writeStream()
                .outputMode("complete")
                .format("console")
                .option("numRows", 100)
                .start();

        query.awaitTermination();
    }
}


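参考: [Structured Streaming + Kafka](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html) / [Structured-Streaming Programming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes)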