Spark Streaming編3:Spark StreamingでデータをKafkaに送信し、重複排除を実現

87470 ワード

Spark Streaming編3:Spark StreamingでデータをKafkaに送信し、重複排除を実現
前置きは抜きにして、さっそく本題のコードに入ります。
package com.iflytek.kafka

import java.util.Properties

import com.iflytek.kafkaManager.KafkaSink
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Dataset, Row, SparkSession, _}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

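/*
   Requirement:
   Spark Streaming de-duplicates the records read from Kafka and sends the result back to Kafka.
   NOTE(review): the original note mentions "30s", but the code below uses 3s batches and a 6s window - confirm the intended interval.
 */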
/**
 * Spark Streaming job that reads JSON messages from the Kafka topic `pd_ry_txjl`,
 * de-duplicates them per window on (`name`, `id`) and publishes the surviving rows,
 * re-wrapped as nested JSON, to the Kafka topic `xytest`.
 */
object WC2Kafka2 {

  /**
   * Builds the streaming pipeline and blocks until the streaming context terminates.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    Logger.getRootLogger.setLevel(Level.ERROR)
    System.setProperty("HADOOP_USER_NAME","root")
    System.setProperty("user.name","root")
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("xx2")
      .master("local[2]")
      .config("spark.testing.memory", "471859200")
      // Fixed key: the original spelled it "spark.steaming...", which Spark silently ignores,
      // so the per-partition rate limit was never applied.
      .config("spark.streaming.kafka.maxRatePerPartition","10")
      .config("spark.sql.streaming.schemaInference", "true")
      .getOrCreate()
    // 3-second micro-batches.
    val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(3))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "cdh01:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "xytest1101",
      "auto.offset.reset" -> "latest",//earliest latest
      // Auto-commit is disabled and this job never commits offsets itself.
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("pd_ry_txjl")
    val stream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent,
        Subscribe[String, String](topics, kafkaParams))
    // Only the message payload is used; keys are dropped.
    val value: DStream[String] = stream.map(_.value())

    val kafkaProducerConfig = {
      val p = new Properties()
      p.setProperty("bootstrap.servers","cdh01:9092")
      p.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
      p.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer")
      p
    }
    // The sink (not a live producer) is broadcast; KafkaSink creates its producer lazily.
    val kafkaProducer: Broadcast[KafkaSink[String, String]] =
      sparkSession.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))

    // Window length == slide interval (6s), i.e. non-overlapping windows of two batches.
    val windowWords: DStream[String] = value.window(Seconds(6), Seconds(6))
    val schema = StructType(Seq(
      StructField("id",StringType,true),
      StructField("name",StringType,true),
      StructField("age",StringType,true),
      StructField("time",TimestampType,true)))

    windowWords.foreachRDD(rdd=>{
      import org.apache.spark.sql.functions._
      import sparkSession.implicits._
      val rdd1: RDD[String] = rdd.cache()
      try {
        val ds: Dataset[String] = sparkSession.createDataset(rdd1)
        // The payload is unwrapped twice ($.body inside $.body) before being parsed with `schema`.
        val frameAll: Dataset[Row] = {
          ds.select(get_json_object(col("value").cast("string"), "$.body") as ("body"))
            .select(get_json_object(col("body"), "$.body") as ("body"))
            .select(from_json(col("body"), schema) as ("parsed_value"))
            .select(
              col("parsed_value").getItem("id") as ("id"),
              col("parsed_value").getItem("name") as ("name"),
              col("parsed_value").getItem("age") as ("age"),
              from_unixtime((col("parsed_value").getItem("time").cast("double")/1000), "yyyy-MM-dd HH:mm:ss").cast("TIMESTAMP") as ("time"))
            .filter("id is not null")
        }
        frameAll.createOrReplaceTempView("frameAll")
        sparkSession.sqlContext.cacheTable("frameAll")
        // Keep one row per (name, id) within the window.
        val distancted: Dataset[Row] = frameAll.dropDuplicates(Array("name","id"))
        distancted.createOrReplaceTempView("distancted")
        sparkSession.sqlContext.cacheTable("distancted")
        try {
          sendKafka(distancted,kafkaProducer)
        } finally {
          // Release the cached tables even when sending fails.
          sparkSession.sqlContext.uncacheTable("frameAll")
          sparkSession.sqlContext.uncacheTable("distancted")
        }
      } finally {
        rdd1.unpersist(true)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Wraps every row of `distancted` in nested JSON envelopes and sends each resulting
   * string to the Kafka topic `xytest`. Does nothing when the DataFrame is empty.
   *
   * Each `toJSON` call serialises the current row to a string, so the record is nested as:
   * row -> {body, skynet.priority} -> {properties}
   *     -> {body, skynet.msg.location, skynet.priority} -> {properties}.
   *
   * @param distancted    de-duplicated rows to publish
   * @param kafkaProducer broadcast sink whose `send` is invoked for every record
   */
  def sendKafka(distancted:DataFrame,kafkaProducer:Broadcast[KafkaSink[String, String]]):Unit={
    if(!distancted.rdd.isEmpty()){
      distancted.toJSON.withColumnRenamed("value", "body")
        .withColumn("skynet.priority", lit("4"))
        .toJSON.withColumnRenamed("value", "properties")
        .toJSON.withColumnRenamed("value", "body")
        .withColumn("skynet.msg.location", lit("LOCAL"))
        .withColumn("skynet.priority", lit("4"))
        .toJSON.withColumnRenamed("value", "properties")
        .toJSON.foreach(json=>{
        // The Future returned by send is discarded here, so delivery failures are not observed.
        kafkaProducer.value.send("xytest",json)
      })
    }
  }
}

KafkaSink汎用クラス
package com.iflytek.kafkaManager

import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

/**
 * Serializable wrapper around a Kafka producer factory.
 *
 * Only the factory function is part of the serialized state; the producer itself is
 * marked `@transient` and is created by calling the factory the first time it is accessed.
 *
 * @param createProducer factory invoked once, on first access of `producer`
 * @tparam K record key type
 * @tparam V record value type
 */
class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V])
  extends Serializable {

  /** Lazily created producer; excluded from serialization. */
  @transient
  lazy val producer: KafkaProducer[K, V] = createProducer()

  /**
   * Sends a keyed record to `topic`.
   *
   * @return the future handed back by the underlying producer
   */
  def send(topic: String, key: K, value: V): Future[RecordMetadata] = {
    val record = new ProducerRecord[K, V](topic, key, value)
    producer.send(record)
  }

  /**
   * Sends a record without a key to `topic`.
   *
   * @return the future handed back by the underlying producer
   */
  def send(topic: String, value: V): Future[RecordMetadata] = {
    val record = new ProducerRecord[K, V](topic, value)
    producer.send(record)
  }
}

/** Factory methods for [[KafkaSink]]. */
object KafkaSink {

  // Explicit converters (.asJava / .asScala) replace the deprecated implicit JavaConversions.
  import scala.collection.JavaConverters._

  /**
   * Creates a sink whose producer is built from `config` on first use.
   *
   * @param config Kafka producer settings
   * @return a sink that defers producer creation until the first access
   */
  def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
    val createProducerFunc = () => {
      // Runs lazily, when the sink's producer is first accessed.
      val producer = new KafkaProducer[K, V](config.asJava)
      sys.addShutdownHook {
        // Close the producer when the JVM that created it shuts down.
        producer.close()
      }
      producer
    }
    new KafkaSink(createProducerFunc)
  }

  /**
   * Creates a sink from Java `Properties` by copying them into an immutable Scala map.
   *
   * @param config Kafka producer settings
   */
  def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = {
    val scalaConfig: Map[String, Object] = config.asScala.toMap
    apply[K, V](scalaConfig)
  }
}


pomは以下の通りである.
    <!-- Version properties referenced as ${...} by the dependency declarations in this POM. -->
    <properties>
        <!-- Version used for all org.apache.spark artifacts. -->
        <spark.version>2.3.2</spark.version>
        <!-- Version of scala-library; the Spark artifacts use the matching _2.11 suffix. -->
        <scala.version>2.11.8</scala.version>
        <!-- Version used for the org.apache.hbase artifacts. -->
        <hbase.version>1.2.1</hbase.version>
    </properties>

    <dependencies>
        <!-- Spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>compile</scope>
        </dependency>
        <!-- spark-sql_2.11 was declared twice; only this declaration is kept. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-yarn_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>compile</scope>
        </dependency>

        <!-- JSON and Kafka client -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
            <scope>compile</scope>
        </dependency>

        <!-- JDBC driver and connection pools -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.31</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>c3p0</groupId>
            <artifactId>c3p0</artifactId>
            <version>0.9.1.2</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.jolbox</groupId>
            <artifactId>bonecp</artifactId>
            <version>0.8.0.RELEASE</version>
            <scope>compile</scope>
        </dependency>


        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.13</version>
            <scope>compile</scope>
        </dependency>

        <!-- HBase -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
            <scope>compile</scope>
        </dependency>
        <!--<dependency>-->
            <!--<groupId>org.apache.hadoop</groupId>-->
            <!--<artifactId>hadoop-client</artifactId>-->
            <!--<version>2.7.2</version>-->
        <!--</dependency>-->
        <!--&lt;!&ndash;guava hadoop &ndash;&gt;-->
        <!--<dependency>-->
            <!--<groupId>com.google.guava</groupId>-->
            <!--<artifactId>guava</artifactId>-->
            <!--<version>18.0</version>-->
        <!--</dependency>-->

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>compile</scope>
        </dependency>


    </dependencies>

    <build>
        <plugins>
            <!-- Compiles Scala sources in the compile and test-compile phases. -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <id>compile-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>test-compile-scala</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <!-- Reuse the scala.version property so the compiler and scala-library stay in sync. -->
                    <scalaVersion>${scala.version}</scalaVersion>
                </configuration>
            </plugin>
            <!-- Java compiler settings: Java 8 source/target, with extra jars taken from ./lib. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <compilerArgs>
                        <arg>-extdirs</arg>
                        <arg>${project.basedir}/lib</arg>
                    </compilerArgs>
                </configuration>
            </plugin>
        </plugins>
    </build>