Demo: Writing data from Spark Streaming to Kafka


I’d suggest the following approach:
  • Use (and re-use) one KafkaProducer instance per executor process/JVM.

  • Here’s the high-level setup for this approach:
  • First, you must “wrap” Kafka’s KafkaProducer because, as you mentioned, it is not serializable. Wrapping it allows you to “ship” it to the executors. The key idea is to use a lazy val so that instantiating the producer is deferred until its first use on an executor, which is effectively a workaround for KafkaProducer not being serializable.
  • You “ship” the wrapped producer to each executor by using a broadcast variable.
  • Within your actual processing logic, you access the wrapped producer through the broadcast variable and use it to write processing results back to Kafka.

  • The code snippets below work with Spark Streaming as of Spark 2.0.
    Step 1: Wrapping KafkaProducer
    import java.util.concurrent.Future
    
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
    
    class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
    
      /* This is the key idea that lets us work around NotSerializableException:
         the KafkaProducer is instantiated lazily, on first use inside the executor
         JVM, so it never has to be serialized and shipped from the driver. */
      lazy val producer = createProducer()
    
      def send(topic: String, key: K, value: V): Future[RecordMetadata] =
        producer.send(new ProducerRecord[K, V](topic, key, value))
    
      def send(topic: String, value: V): Future[RecordMetadata] =
        producer.send(new ProducerRecord[K, V](topic, value))
    
    }
    
    object MySparkKafkaProducer {
    
      // Implicit conversions: Scala Map -> java.util.Map (for the KafkaProducer
      // constructor) and java.util.Properties -> Scala Map (for the Properties-based apply).
      import scala.collection.JavaConversions._
    
      def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = {
        val createProducerFunc = () => {
          val producer = new KafkaProducer[K, V](config)
    
          sys.addShutdownHook {
            // Ensure that, on executor JVM shutdown, the Kafka producer sends
            // any buffered messages to Kafka before shutting down.
            producer.close()
          }
    
          producer
        }
        new MySparkKafkaProducer(createProducerFunc)
      }
    
      def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap)
    
    }
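
    For reference, here is a minimal standalone usage sketch of the wrapper (the broker address, topic name, and message are made-up placeholders); it simply illustrates the API surface before Spark enters the picture:
    import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}

    val producerConfig = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092",
      "key.serializer" -> classOf[ByteArraySerializer].getName,
      "value.serializer" -> classOf[StringSerializer].getName
    )
    // No KafkaProducer exists yet; it is created lazily on the first send().
    val producer = MySparkKafkaProducer[Array[Byte], String](producerConfig)
    producer.send("some-topic", "hello, world")  // returns a Future[RecordMetadata]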
    

    Step 2: Use a broadcast variable to give each executor its own wrapped KafkaProducer instance
    import java.util.Properties

    import org.apache.kafka.clients.producer.ProducerConfig
    import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
    import org.apache.spark.SparkConf
    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    val ssc: StreamingContext = {
      val sparkConf = new SparkConf().setAppName("spark-streaming-kafka-example").setMaster("local[2]")
      new StreamingContext(sparkConf, Seconds(1))
    }
    
    ssc.checkpoint("checkpoint-directory")
    
    // Broadcast the (cheap, serializable) wrapper; each executor JVM then creates
    // its own KafkaProducer lazily on first use.
    val kafkaProducer: Broadcast[MySparkKafkaProducer[Array[Byte], String]] = {
      val kafkaProducerConfig = {
        val p = new Properties()
        p.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092")
        p.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
        p.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
        p
      }
      ssc.sparkContext.broadcast(MySparkKafkaProducer[Array[Byte], String](kafkaProducerConfig))
    }
    

    Step 3: Write from Spark Streaming to Kafka, re-using the same wrapped KafkaProducer instance (per executor)
    import java.util.concurrent.Future

    import org.apache.kafka.clients.producer.RecordMetadata
    import org.apache.spark.streaming.dstream.DStream
    
    val stream: DStream[String] = ???
    stream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // producer.send() is asynchronous and returns a Future per record.
        val metadata: Stream[Future[RecordMetadata]] = partitionOfRecords.map { record =>
          kafkaProducer.value.send("my-output-topic", record)
        }.toStream
        // Block until Kafka has acknowledged each send, so that a send failure
        // fails the Spark task instead of being silently dropped.
        metadata.foreach { sendResult => sendResult.get() }
      }
    }
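
    For completeness, here is one hypothetical way to fill in the ??? above and run the job, assuming the input also comes from Kafka via the spark-streaming-kafka-0-10 integration on the classpath (the input topic, group id, and broker address are placeholders):
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

    val kafkaConsumerParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark-streaming-kafka-example",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Read the input topic and keep only the record values as a DStream[String].
    val stream: DStream[String] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent,
        Subscribe[String, String](Seq("my-input-topic"), kafkaConsumerParams)
      ).map(_.value)

    // ... the foreachRDD / foreachPartition logic from Step 3 goes here ...

    ssc.start()
    ssc.awaitTermination()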
    

    Hope this helps.