Workarounds for SparkSQL DataFrame bugs (v1.5.0/1.5.1)


Caveats for DataFrames in Spark 1.5.1

A case that returns incorrect results

val eventTableColumns = Seq[String](
    "entityType"
  , "entityId"
  , "targetEntityType"
  , "targetEntityId"
  , "properties"
  , "eventTime")

// toDF and the $"..." column syntax below require the SQLContext implicits
import sqlContext.implicits._

val eventDF = sc.textFile("/tmp/events_s.csv").map(_.split(",")).filter(_.size >= 6)
  .map { e =>
    (
      e(5), e(0), "item", e(1), s"""{"rating": ${e(3).trim.toDouble}}""", e(3)
    )
  }.toDF(eventTableColumns:_*)

eventDF.filter($"entityType" === "user").select("entityId").distinct.count
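For reference, the parsing and counting logic can be checked without Spark. A minimal plain-Scala sketch with hypothetical sample rows (the real input is /tmp/events_s.csv; the column layout is assumed from the map above: e(0)=entityId, e(1)=targetEntityId, e(3)=rating, e(5)=entityType):

```scala
// Hypothetical CSV lines shaped like the note's map expects.
val lines = Seq(
  "u1,i1,x,4.0,x,user",
  "u1,i2,x,3.0,x,user",
  "u2,i1,x,5.0,x,user"
)
// Same transformation as the Spark job, on a local collection.
val rows = lines.map(_.split(",")).filter(_.size >= 6).map { e =>
  (e(5), e(0), "item", e(1), s"""{"rating": ${e(3).trim.toDouble}}""", e(3))
}
// entityType == "user" -> distinct entityIds are u1 and u2
val n = rows.filter(_._1 == "user").map(_._2).distinct.size
println(n) // prints 2
```

This is the answer the DataFrame query is expected to produce; on 1.5.0/1.5.1 the query above can return a different, wrong result.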

Workaround ①

  • Use isin instead of ===
eventDF.filter($"entityType" isin lit("user")).select("entityId").distinct.count

Workaround ②

  • Use a case class
case class Event(entityType: String, entityId: String,
                 targetEntityType: String, targetEntityId: String,
                 properties: String, eventTime: String)

val eventDF2 = sc.textFile("/tmp/events_s.csv").map(_.split(",")).filter(_.size >= 6)
  .map { e =>
    Event(
      e(5), e(0), "item", e(1), s"""{"rating": ${e(3).trim.toDouble}}""", e(3)
    )
  }.toDF()

eventDF2.filter($"entityType" === "user").select("entityId").distinct.count

Workaround ③

  • Set spark.sql.inMemoryColumnarStorage.partitionPruning to false
    sqlContext.sql("SET spark.sql.inMemoryColumnarStorage.partitionPruning=false")
    eventDF.filter($"entityType" === "user").select("entityId").distinct.count
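The same flag can also be set programmatically rather than via SQL; a sketch assuming the same sqlContext used in the snippets above:

```scala
// Equivalent to the SET statement above: disable in-memory columnar
// partition pruning before caching/querying the DataFrame.
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.partitionPruning", "false")
```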

Detailed sample code is available here.