spark.rdd.MapPartitionsRDD cannot be cast to streaming.kafka010.HasOffsetRange
7588 ワード
1.問題の再現
redisでoffsetをメンテナンスしようと思ったとき、習慣的に以下のプログラムを書きました.
inputDS.map(log => log.value()).foreachRDD { rdd =>
//redis offset
val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
RedisOffsetUtil.saveOffsetToRedis(ranges, kafkaParams("group.id"))
//
bc = BroadcastProcess.monitorFilterRule(ssc, bc, jedis)
classifyBC = BroadcastProcess.monitorClassifyRule(ssc, classifyBC, jedis)
......
ここで、inputDSは私が作成したDstream、RedisOffsetUtilです.saveOffsetToRedis(ranges,kafkaParams(「group.id」)は、offsetを保存するためのカスタムメソッドです.
org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka010.HasOffsetRanges
2.疑惑
foreachRDDメソッド、返すタイプはrddなのに、どうしてMapPartitionRDDになったの???では、どのようなRDDがOffsetRangeに変換できますか?
3.解決
資料を調べてエラーの原因を得た:
private[spark] class KafkaRDD[K, V]( ... ) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges
KafkaRDD OffsetRange
InputDStream KafkaRDD
つまり、私がInputDSで実行した変換クラス演算子は、KafkaRDDを非kafkaRDDにします.このとき、変換を行うと必ずエラーが発生します.
そこで、私が解決策を得たのは、オフセット量を一歩前に取ってから正常な操作を行うことです.
inputDS.foreachRDD { rdd =>
//redis offset
val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
RedisOffsetUtil.saveOffsetToRedis(ranges, kafkaParams("group.id"))
}
inputDS.map(log => log.value()).foreachRDD { rdd =>
//
bc = BroadcastProcess.monitorFilterRule(ssc, bc, jedis)
classifyBC = BroadcastProcess.monitorClassifyRule(ssc, classifyBC, jedis)
queryAndBookRulesBC = BroadcastProcess.monitorQueryAndBookRule(sc, queryAndBookRulesBC, jedis)
ipBlackListBC = BroadcastProcess.monitorIpBlackListRule(sc, ipBlackListBC, jedis)
// rdd
rdd.cache()
方法は愚かかもしれないが,有効で,この問題を解決する.