Spark Streaming + Kafka + Redis Integration
1. Integrating Kafka and Spark Streaming with Redis for word-frequency counting
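The examples below use the Kafka 0.10 direct API for Spark Streaming together with the Jedis client. For orientation, here is a minimal build.sbt sketch; the version numbers are illustrative assumptions (the original project's build file is not shown) and should be matched to your Spark and Scala installation.

// build.sbt (illustrative versions)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % "2.4.8",
  "org.apache.spark" %% "spark-streaming"            % "2.4.8",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.8",
  "org.apache.kafka"  % "kafka-clients"              % "2.4.1",
  "redis.clients"     % "jedis"                      % "3.6.0"
)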
GenerateWord.java (a producer that generates data and sends it to Kafka)
package day14;
/**
 * Generates test data for the Redis word count:
 * the key is a UUID, the value is a random lowercase letter.
 */
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
public class GenerateWord {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092"); // kafka brokers
        // serializers for the key and the value
        props.setProperty("key.serializer", StringSerializer.class.getName());
        props.setProperty("value.serializer", StringSerializer.class.getName());
        // create the producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        while (true) {
            Thread.sleep(500);
            String key = UUID.randomUUID().toString();
            int value = new Random().nextInt(26) + 97; // 97..122 -> 'a'..'z'
            char word = (char) value;
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("wordcount", key, String.valueOf(word));
            kafkaProducer.send(record);
            System.out.println("record = " + record);
        }
    }
}
MyNetWordCountRedis.scala
package day14
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import utils.JPools
/**
 * Reads words from Kafka with Spark Streaming and accumulates word counts in Redis.
 */
object MyNetWordCountRedis {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    // Spark Streaming configuration
    val conf = new SparkConf().setAppName(s"${this.getClass.getName}").setMaster("local[*]")
    // limit the pull rate from Kafka: at most 5 records/sec per partition,
    // i.e. about 5 * (number of partitions) * (batch interval in seconds) records per batch
    conf.set("spark.streaming.kafka.maxRatePerPartition", "5")
    // finish the in-flight batches before shutting down
    conf.set("spark.streaming.kafka.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(conf, Seconds(2))
    // consumer group id
    val groupId = "day14_001"
    // topic to consume
    val topic = "wordcount"
    /**
     * Kafka consumer parameters
     */
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop01:9092,hadoop02:9092,hadoop03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    // create a direct stream from Kafka
    val stream = KafkaUtils.createDirectStream(ssc,
      LocationStrategies.PreferConsistent, // distribute partitions evenly across the available executors
      ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParams))
    // the foreachRDD body runs on the Driver; the foreachPartition body runs on the executors
    stream.foreachRDD(rdd => {
      val reduced = rdd.map(x => (x.value(), 1)).reduceByKey(_ + _)
      reduced.foreachPartition(partition => {
        // one Redis connection per partition
        val redis = JPools.getJedis
        partition.foreach(x => redis.hincrBy("wordcount", x._1, x._2.toLong))
        redis.close()
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
JPools.scala (Redis connection pool)
package utils
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool
/**
 * Jedis (Redis) connection pool.
 */
object JPools {
  private val poolConfig = new GenericObjectPoolConfig()
  poolConfig.setMaxIdle(5)     // max idle connections (the default is 8)
  poolConfig.setMaxTotal(2000) // max total connections (the default is 8)
  // create the pool lazily, on first use
  private lazy val jedisPool = new JedisPool(poolConfig, "hadoop02")
  def getJedis = {
    val jedis = jedisPool.getResource
    jedis.select(0) // use database 0
    jedis
  }
}
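With the producer and the streaming job running, the accumulated counts can be read back from the "wordcount" hash through the same pool. A minimal sketch for checking the results (the object name CheckWordCount is only illustrative):

// CheckWordCount.scala - print the word counts currently stored in Redis
package utils

import scala.collection.JavaConversions._

object CheckWordCount {
  def main(args: Array[String]): Unit = {
    val jedis = JPools.getJedis
    // the streaming job increments fields of the "wordcount" hash with hincrBy
    val counts: java.util.Map[String, String] = jedis.hgetAll("wordcount")
    counts.toList.sortBy(_._1).foreach { case (word, count) => println(s"$word -> $count") }
    jedis.close()
  }
}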
2. Managing Kafka offsets in Redis
JedisOffSet.scala (reads the offsets saved in Redis for a given groupId)
package day14
import java.util
import org.apache.kafka.common.TopicPartition
import utils.JPools
object JedisOffSet {
  def apply(groupid: String) = {
    var fromDbOffset = Map[TopicPartition, Long]()
    val jedis = JPools.getJedis
    // offsets are stored in a hash keyed by the group id,
    // with fields of the form "topic-partition" and the offset as the value
    val topicPartitionOffset: util.Map[String, String] = jedis.hgetAll(groupid)
    import scala.collection.JavaConversions._
    val topicPartitionOffsetList: List[(String, String)] = topicPartitionOffset.toList
    for (topicPL <- topicPartitionOffsetList) {
      val fields = topicPL._1.split("[-]")
      fromDbOffset += (new TopicPartition(fields(0), fields(1).toInt) -> topicPL._2.toLong)
    }
    fromDbOffset
  }
}
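As a quick illustration of the layout JedisOffSet expects: the group id names a Redis hash whose fields are "topic-partition" strings and whose values are offsets. A small sketch, using a hypothetical group id demo_group and made-up offsets:

// JedisOffSetDemo.scala - illustration only, not part of the streaming job
package day14

object JedisOffSetDemo {
  def main(args: Array[String]): Unit = {
    val jedis = utils.JPools.getJedis
    // simulate what the streaming job writes after a batch:
    // partition 0 of "wordcount" read up to offset 42, partition 1 up to 37
    jedis.hset("demo_group", "wordcount-0", "42")
    jedis.hset("demo_group", "wordcount-1", "37")
    jedis.close()

    // what the driver would read back on startup
    val fromOffsets = JedisOffSet("demo_group")
    fromOffsets.foreach { case (tp, offset) => println(s"${tp.topic()}-${tp.partition()} -> $offset") }
  }
}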
Streaming_Kafka_Redis_Offset.scala
package day14
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import utils.JPools
/**
 * Word count with Spark Streaming and Kafka, writing the results to Redis
 * and managing the Kafka offsets in Redis as well.
 */
object Streaming_Kafka_Redis_Offset {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    // Spark Streaming configuration
    val conf = new SparkConf().setAppName(s"${this.getClass.getName}").setMaster("local[*]")
    // limit the pull rate from Kafka: at most 5 records/sec per partition,
    // i.e. about 5 * (number of partitions) * (batch interval in seconds) records per batch
    conf.set("spark.streaming.kafka.maxRatePerPartition", "5")
    // finish the in-flight batches before shutting down
    conf.set("spark.streaming.kafka.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(conf, Seconds(2))
    // consumer group id
    val groupId = "day14_001"
    // topic to consume
    val topic = "wordcount"
    /**
     * Kafka consumer parameters
     */
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop01:9092,hadoop02:9092,hadoop03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    // offsets previously saved in Redis for this group id (empty on the first run)
    val fromOffset: Map[TopicPartition, Long] = JedisOffSet(groupId)
    // create a direct stream from Kafka, resuming from the saved offsets if there are any
    val stream = if (fromOffset.isEmpty) {
      KafkaUtils.createDirectStream(ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParams))
    } else {
      KafkaUtils.createDirectStream(ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Assign[String, String](fromOffset.keys, kafkaParams, fromOffset))
    }
    // the foreachRDD body runs on the Driver; the foreachPartition body runs on the executors
    stream.foreachRDD(rdd => {
      // the offset ranges covered by this batch
      val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val reduced = rdd.map(x => (x.value(), 1)).reduceByKey(_ + _)
      reduced.foreachPartition(partition => {
        // one Redis connection per partition
        val redis = JPools.getJedis
        partition.foreach(x => redis.hincrBy("wordcount", x._1, x._2.toLong))
        redis.close()
      })
      // save the end offsets of this batch back to Redis, as "topic-partition" -> offset
      val jedis = JPools.getJedis
      for (o <- offsetRange) {
        jedis.hset(groupId, o.topic + "-" + o.partition, o.untilOffset.toString)
      }
      jedis.close()
    })
    ssc.start()
    ssc.awaitTermination()
  }
}