SparkStreaming updateStateByKeyレコード情報の保存

4976 ワード

object SparkStreaming_StateFul {

  def main(args: Array[String]): Unit = {
    // Quiet down Spark/Jetty logging so the word counts are readable on the console.
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val conf = new SparkConf().setMaster("local[2]")
      .setAppName(this.getClass.getSimpleName)
      .set("spark.executor.memory", "2g")
      .set("spark.cores.max", "8")
      .setJars(Array("E:\\ScalaSpace\\Spark_Streaming\\out\\artifacts\\Spark_Streaming.jar"))
    val context = new SparkContext(conf)

    // State update function for updateStateByKey:
    // `values` are the new counts for a key in this batch, `state` is the
    // accumulated count from previous batches (None on first appearance).
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    // step1: create streaming context with a 10-second batch interval.
    // updateStateByKey requires a checkpoint directory; "." is fine for a
    // local demo, but use an HDFS path in a clustered deployment.
    val ssc = new StreamingContext(context, Seconds(10))
    ssc.checkpoint(".")

    // step2: create a network input stream on ip:port and count the words
    // in the input stream of space-delimited text.
    val lines = ssc.socketTextStream("218.193.154.79", 12345)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(word => (word, 1))

    // updateStateByKey keeps a running total per word across batches.
    val stateDstream = wordCounts.updateStateByKey[Int](updateFunc)
    stateDstream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

ssc.checkpoint の設定によっては、次の例外が発生することがある:
org.apache.spark.SparkException: Checkpoint RDD ReliableCheckpointRDD[9] at print at SparkStreaming_StateFul.scala:43(0) has different number of partitions from original RDD MapPartitionsRDD[8] at updateStateByKey at SparkStreaming_StateFul.scala:41(2)
	at org.apache.spark.rdd.ReliableRDDCheckpointData.doCheckpoint(ReliableRDDCheckpointData.scala:73)
	at org.apache.spark.rdd.RDDCheckpointData.checkpoint(RDDCheckpointData.scala:74)

対策: チェックポイントディレクトリにはローカルパスではなく HDFS 上のパスを指定する。





From WizNote
転載先:https://www.cnblogs.com/zDanica/p/5471611.html