Spark Streaming, Kafka, and Redis Integration


1. Integrating Kafka and Spark Streaming with Redis to implement word frequency counting


GenerateWord.java (producer that generates data into Kafka)
package day14;

/**
 * Produces one random lowercase letter every 500 ms, keyed by a random UUID,
 * into the "wordcount" topic; the word counts are later accumulated in Redis.
 */
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Random;
import java.util.UUID;

public class GenerateWord {
    public static void main(String[] args) throws Exception{
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");//kafka brokers  
        //key value      ,               
        props.setProperty("key.serializer", StringSerializer.class.getName());
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Create the Kafka producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        while(true){
            Thread.sleep(500);
            String key = UUID.randomUUID().toString();
            int value = new Random().nextInt(26) + 97;   // random code point in 'a'..'z'
            char word = (char) value;
            ProducerRecord<String, String> record = new ProducerRecord<>("wordcount", key, String.valueOf(word));
            kafkaProducer.send(record);
            System.out.println("record = " + record);

        }

    }
}
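
The producer assumes the "wordcount" topic already exists on the brokers. If it does not, it can be created up front, for example with Kafka's AdminClient. The sketch below is illustrative only: the object name CreateWordCountTopic and the partition/replication counts are assumptions, not values from the original setup.

package day14

import java.util.{Collections, Properties}

import org.apache.kafka.clients.admin.{AdminClient, NewTopic}

// Minimal sketch: create the "wordcount" topic if it does not exist yet.
object CreateWordCountTopic {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092")

    val admin = AdminClient.create(props)
    val topic = new NewTopic("wordcount", 3, 2.toShort)   // 3 partitions, replication factor 2 (assumed values)
    admin.createTopics(Collections.singleton(topic)).all().get()
    admin.close()
  }
}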
MyNetWordCountRedis.scala
package day14


import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import utils.JPools


/**
  * Consumes words from Kafka with Spark Streaming and accumulates the counts in a Redis hash.
  */
object MyNetWordCountRedis {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    // Build the Spark Streaming configuration
    val conf = new SparkConf().setAppName(s"${this.getClass.getName}").setMaster("local[*]")
    conf.set("spark.streaming.kafka.maxRatePerPartition","5")   // records pulled per batch = 5 (per partition per second) * partition count * batch interval, e.g. with 3 partitions and the 2 s batch below: 5 * 3 * 2 = 30
    conf.set("spark.streaming.kafka.stopGracefullyOnShutdown","true")     // shut the streaming context down gracefully on JVM shutdown
    val ssc = new StreamingContext(conf,Seconds(2))

    // Consumer group ID
    val groupId = "day14_001"
    // Topic to consume
    val topic = "wordcount"

    /**
      * Kafka consumer parameters
      */
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop01:9092,hadoop02:9092,hadoop03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Create a direct stream against Kafka
    val stream = KafkaUtils.createDirectStream(ssc,
      LocationStrategies.PreferConsistent,          // location strategy: spread partitions evenly across the available executors
      ConsumerStrategies.Subscribe[String,String](Array(topic),kafkaParams))

    // The body of foreachRDD runs on the Driver; the foreachPartition closure runs on the executors
    stream.foreachRDD(rdd => {
      val reduced = rdd.map(x => (x.value(),1)).reduceByKey(_+_)
      reduced.foreachPartition(partition => {
        // Get a Jedis connection from the pool
        val redis = JPools.getJedis

        // Add each word's partial count to the "wordcount" hash
        partition.foreach(x => redis.hincrBy("wordcount", x._1, x._2.toLong))

        redis.close()
      })
    })

    ssc.start()

    ssc.awaitTermination()

  }

}

JPools.scala (Redis connection pool)
package utils

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool

/**
  * A lazily initialized Jedis connection pool (one instance per JVM).
  */
object JPools {

  private val poolConfig = new GenericObjectPoolConfig()
  poolConfig.setMaxIdle(5)      // maximum number of idle connections kept in the pool; the default is 8
  poolConfig.setMaxTotal(2000)  // maximum number of connections overall; the default is 8

  // lazy: the pool is only created the first time a connection is requested
  private lazy val jedisPool = new JedisPool(poolConfig,"hadoop02")

  def getJedis={
    val jedis = jedisPool.getResource
    jedis.select(0)
    jedis
  }
}
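
To verify the accumulated counts, the "wordcount" hash can be read back through the same pool. A minimal sketch (the object name CheckWordCount is just for illustration):

package day14

import utils.JPools

import scala.collection.JavaConversions._

object CheckWordCount {
  def main(args: Array[String]): Unit = {
    val jedis = JPools.getJedis
    // Read every field (word) and value (count) of the "wordcount" hash
    val counts = jedis.hgetAll("wordcount")
    counts.toList.sortBy(-_._2.toLong).foreach { case (word, count) =>
      println(s"$word -> $count")
    }
    jedis.close()
  }
}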

2. Managing Kafka offsets in Redis


 
JedisOffSet.scala (reads the offsets stored in Redis for a given groupId)
package day14

import java.util

import org.apache.kafka.common.TopicPartition
import utils.JPools

object JedisOffSet {

  def apply(groupid: String)={
    var fromDbOffset = Map[TopicPartition, Long]()
    val jedis = JPools.getJedis

    val topicPartitionOffset: util.Map[String, String] = jedis.hgetAll(groupid)
    import scala.collection.JavaConversions._
    val topicPartitionOffsetList: List[(String, String)] = topicPartitionOffset.toList

    for (topicPL <- topicPartitionOffsetList) {
      // the hash field is "<topic>-<partition>" and the value is the committed offset
      fromDbOffset += (new TopicPartition(topicPL._1.split("-")(0), topicPL._1.split("-")(1).toInt) -> topicPL._2.toLong)
    }
    fromDbOffset
  }
}
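
JedisOffSet assumes the offsets live in a Redis hash whose key is the consumer group ID and whose fields are "<topic>-<partition>" strings with the offset as the value, which is the layout written back by Streaming_Kafka_Redis_Offset below. A minimal usage sketch (the object name is hypothetical):

package day14

import org.apache.kafka.common.TopicPartition

object JedisOffSetExample {
  def main(args: Array[String]): Unit = {
    // Returns an empty map when no offsets have been stored yet for this group
    val offsets: Map[TopicPartition, Long] = JedisOffSet("day14_001")
    offsets.foreach { case (tp, offset) =>
      println(s"${tp.topic()}-${tp.partition()} -> $offset")
    }
  }
}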
Streaming_Kafka_Redis_Offset.scala
package day14

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import utils.JPools


/**
  * Word count with Spark Streaming, Kafka and Redis,
  * managing the consumed Kafka offsets manually in Redis instead of relying on auto commit.
  */
object Streaming_Kafka_Redis_Offset {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    // Build the Spark Streaming configuration
    val conf = new SparkConf().setAppName(s"${this.getClass.getName}").setMaster("local[*]")
    conf.set("spark.streaming.kafka.maxRatePerPartition","5")   // records pulled per batch = 5 (per partition per second) * partition count * batch interval
    conf.set("spark.streaming.kafka.stopGracefullyOnShutdown","true")     // shut the streaming context down gracefully on JVM shutdown
    val ssc = new StreamingContext(conf,Seconds(2))

    // Consumer group ID
    val groupId = "day14_001"
    // Topic to consume
    val topic = "wordcount"

    /**
      * Kafka consumer parameters
      */
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop01:9092,hadoop02:9092,hadoop03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    // Offsets previously stored in Redis for this group (empty on the first run)
    val fromOffset: Map[TopicPartition, Long] = JedisOffSet(groupId)

    // Create a direct stream against Kafka: subscribe normally when no offsets
    // are stored yet, otherwise resume from the stored offsets
    val stream = if (fromOffset.isEmpty) {
      KafkaUtils.createDirectStream(ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParams))
    } else {
      KafkaUtils.createDirectStream(ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Assign[String, String](fromOffset.keys, kafkaParams, fromOffset))
    }
    // The body of foreachRDD runs on the Driver; the foreachPartition closure runs on the executors
    stream.foreachRDD(rdd => {
      // Capture the offset ranges of this batch so they can be saved once the results are written
      val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      val reduced = rdd.map(x => (x.value(),1)).reduceByKey(_+_)

      reduced.foreachPartition(partition => {
        // Get a Jedis connection from the pool
        val redis = JPools.getJedis

        // Add each word's partial count to the "wordcount" hash
        partition.foreach(x => redis.hincrBy("wordcount", x._1, x._2.toLong))

        redis.close()
      })

      // Save the offsets of this batch back to Redis: key = group ID,
      // field = "<topic>-<partition>", value = the end offset of the range
      val jedis = JPools.getJedis
      for (o <- offsetRange) {
        jedis.hset(groupId, o.topic + "-" + o.partition, o.untilOffset.toString)
      }
      jedis.close()
    })

    ssc.start()

    ssc.awaitTermination()
  }
}
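
Because the word counts are written to Redis before the offsets are saved, a failure between the two steps can replay a batch and increment some counters twice, so this ordering gives at-least-once rather than exactly-once semantics. To reprocess the topic from scratch during testing, both hashes can simply be deleted; a minimal sketch (the object name ResetOffsets is illustrative):

package day14

import utils.JPools

object ResetOffsets {
  def main(args: Array[String]): Unit = {
    val jedis = JPools.getJedis
    // Remove the stored offsets so the next run falls back to Subscribe with auto.offset.reset = "earliest"
    jedis.del("day14_001")
    // Also clear the accumulated counts to avoid double counting after the replay
    jedis.del("wordcount")
    jedis.close()
  }
}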