spark.rdd.MapPartitionsRDD cannot be cast to streaming.kafka010.HasOffsetRange


1.問題の再現


redisでoffsetをメンテナンスしようと思ったとき、習慣的に以下のプログラムを書きました.
    inputDS.map(log => log.value()).foreachRDD { rdd =>
      //redis  offset
      val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      RedisOffsetUtil.saveOffsetToRedis(ranges, kafkaParams("group.id"))
      //      
      bc = BroadcastProcess.monitorFilterRule(ssc, bc, jedis)
      classifyBC = BroadcastProcess.monitorClassifyRule(ssc, classifyBC, jedis)
      ......

ここで、inputDSは私が作成したDstream、RedisOffsetUtilです.saveOffsetToRedis(ranges,kafkaParams(「group.id」)は、offsetを保存するためのカスタムメソッドです.
org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka010.HasOffsetRanges

2.疑惑


foreachRDDメソッド、返すタイプはrddなのに、どうしてMapPartitionRDDになったの???では、どのようなRDDがOffsetRangeに変換できますか?

3.解決


資料を調べてエラーの原因を得た:
private[spark] class KafkaRDD[K, V](   ...   ) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges 

  KafkaRDD      OffsetRange
     InputDStream            KafkaRDD

つまり、私がInputDSで実行した変換クラス演算子は、KafkaRDDを非kafkaRDDにします.このとき、変換を行うと必ずエラーが発生します.
そこで、私が解決策を得たのは、オフセット量を一歩前に取ってから正常な操作を行うことです.
 inputDS.foreachRDD { rdd =>
      //redis  offset
      val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      RedisOffsetUtil.saveOffsetToRedis(ranges, kafkaParams("group.id"))
    }

    inputDS.map(log => log.value()).foreachRDD { rdd =>

      //      
      bc = BroadcastProcess.monitorFilterRule(ssc, bc, jedis)
      classifyBC = BroadcastProcess.monitorClassifyRule(ssc, classifyBC, jedis)
      queryAndBookRulesBC = BroadcastProcess.monitorQueryAndBookRule(sc, queryAndBookRulesBC, jedis)
      ipBlackListBC = BroadcastProcess.monitorIpBlackListRule(sc, ipBlackListBC, jedis)
      // rdd   
      rdd.cache()

方法は愚かかもしれないが,有効で,この問題を解決する.