Manually Maintaining Kafka Offsets
Case: manually maintaining the offsets in MySQL
●API
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
Commit the offsets manually to guarantee that no data is lost. The concrete code implementation follows.
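The offsets are kept in a MySQL table named t_offset. The table and column names below are taken from the SQL statements used in OffsetUtil further down; the database name, user and password only mirror the JDBC URL used there and are an assumption for local testing. A minimal sketch that creates the table over JDBC, with a composite primary key so that the later replace into behaves as an upsert:

import java.sql.DriverManager

object CreateOffsetTable {
  def main(args: Array[String]): Unit = {
    //Connection settings copied from the JDBC URL used by OffsetUtil below (assumed local test setup)
    val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")
    val stmt = connection.createStatement()
    //One row per (topic, partition, groupid); `offset` stores the next offset to consume
    stmt.executeUpdate(
      """CREATE TABLE IF NOT EXISTS `t_offset` (
        |  `topic` varchar(255) NOT NULL,
        |  `partition` int(11) NOT NULL,
        |  `groupid` varchar(255) NOT NULL,
        |  `offset` bigint(20) DEFAULT NULL,
        |  PRIMARY KEY (`topic`, `partition`, `groupid`)
        |) ENGINE=InnoDB DEFAULT CHARSET=utf8""".stripMargin)
    stmt.close()
    connection.close()
  }
}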
import java.sql.{DriverManager, ResultSet}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{OffsetRange, _}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
/**
 * Date 2019/8/8 10:47
 * Desc Spark-Kafka-0-10 integration demo that manually maintains the consumer offsets in MySQL
 */
object SparkKafkaDemo2 {
def main(args: Array[String]): Unit = {
//1. Create the StreamingContext
//spark.master should be set as local[n], n > 1
val conf = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc,Seconds(5))//5-second batch interval: every 5 seconds of data forms one RDD of the DStream
//Prepare the Kafka connection parameters
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "node-01:9092,node-02:9092,node-03:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "SparkKafkaDemo",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("spark_kafka")
//2. Use KafkaUtils to connect to Kafka and receive the data
//Note:
//If MySQL has no offset recorded for this group/topic yet, this is the first run and consumption starts from "latest"
//If MySQL already has a recorded offset, consumption resumes from that offset
val offsetMap: mutable.Map[TopicPartition, Long] = OffsetUtil.getOffsetMap("SparkKafkaDemo","spark_kafka")
val recordDStream: InputDStream[ConsumerRecord[String, String]] = if(offsetMap.size > 0){//an offset has been recorded in MySQL
println("An offset was found in MySQL, resuming consumption from that offset")
KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,//location strategy: spread the partitions evenly across the available executors (used when the Spark executors and Kafka brokers are on separate machines)
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams,offsetMap))//consumer strategy: subscribe to a fixed collection of topics, starting from the stored offsets
}else{//no offset has been recorded yet
println("No offset found in MySQL; this is the first run, consuming from latest")
// /opt/soft/kafka/bin/kafka-console-producer.sh --broker-list node-01:9092 --topic spark_kafka
KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,//location strategy: spread the partitions evenly across the available executors (used when the Spark executors and Kafka brokers are on separate machines)
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))//consumer strategy: subscribe to a fixed collection of topics
}
//3. Process the data
//Note: because we maintain the offsets ourselves, the offset should be committed after each small batch of data has been consumed
//Each small batch of the DStream is represented as an RDD, so we need to operate on the RDDs inside the DStream
//The APIs for operating on the RDDs of a DStream are transform (a transformation) and foreachRDD (an action)
recordDStream.foreachRDD(rdd=>{
if(rdd.count() > 0){//only process batches that actually contain data
rdd.foreach(record => println("Record received from Kafka: " + record))
//Example output: Record received from Kafka: ConsumerRecord(topic = spark_kafka, partition = 1, offset = 6, CreateTime = 1565400670211, checksum = 1551891492, serialized key size = -1, serialized value size = 43, key = null, value = hadoop spark ...)
//Note: the offset should only be committed once this small batch of data has been processed
//.... the actual business processing (e.g. using transform) would go here
//Only after the data has been processed do we extract and save the offsets; they are saved to MySQL/Redis so that if Spark fails, the next run can resume from the saved offset
val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (o <- offsetRanges){
println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset}")
}
//Manually save the offsets of this batch to MySQL
OffsetUtil.saveOffsetRanges("SparkKafkaDemo", offsetRanges)
}
})
ssc.start()
ssc.awaitTermination()
}
object OffsetUtil {
//Read the saved offsets for the given consumer group and topic from MySQL
def getOffsetMap(groupid: String, topic: String) = {
val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")
val pstmt = connection.prepareStatement("select * from t_offset where groupid=? and topic=?")
pstmt.setString(1, groupid)
pstmt.setString(2, topic)
val rs: ResultSet = pstmt.executeQuery()
val offsetMap = mutable.Map[TopicPartition, Long]()
while (rs.next()) {
offsetMap += new TopicPartition(rs.getString("topic"), rs.getInt("partition")) -> rs.getLong("offset")
}
rs.close()
pstmt.close()
connection.close()
offsetMap
}
/**
 * Save the offsets of the given consumer group to MySQL, one row per topic/partition
 */
def saveOffsetRanges(groupid: String, offsetRange: Array[OffsetRange]) = {
val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")
//replace into: update the row if it already exists, otherwise insert a new one
val pstmt = connection.prepareStatement("replace into t_offset (`topic`, `partition`, `groupid`, `offset`) values(?,?,?,?)")
for (o <- offsetRange){
pstmt.setString(1, o.topic)
pstmt.setInt(2, o.partition)
pstmt.setString(3, groupid)
pstmt.setLong(4, o.untilOffset)//save the end of the range, i.e. the next offset to consume
pstmt.executeUpdate()
}
pstmt.close()
connection.close()
}
}
}
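The integration guide linked above also allows committing the offsets back to Kafka itself instead of an external store. A minimal sketch of that variant, reusing the recordDStream defined above (the kafka010 imports at the top already cover HasOffsetRanges and CanCommitOffsets):

//Alternative to MySQL: commit the offsets of each batch back to Kafka asynchronously
recordDStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch here first ...
  recordDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}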