SparkSQLDataframe(v1.5.0/1.5.1)バグ回避策
spark 1.5.1 のDataframe注意点
- DataframeをtoDF("entityId","entityType", "targetEntity", ...) で生成したあとfilter or where 関数で、=== がうまく動作しないバグを回避する方法
- https://issues.apache.org/jira/browse/SPARK-10859
- v1.5.2以降にはFixされる予定で、1.5.2-RC2 では修正済み。
- 2015/11/09 v1.5.2がリリースされました(http://spark.apache.org/news/spark-1-5-2-released.html)
間違った結果になるケース
val eventTableColumns = Seq[String](
"entityType"
, "entityId"
, "targetEntityType"
, "targetEntityId"
, "properties"
, "eventTime")
val eventDF = sc.textFile("/tmp/events_s.csv").map(_.split(",")).filter(_.size >= 6)
.map { e =>
(
e(5), e(0), "item", e(1), s"""{"rating": ${e(3).trim.toDouble}}""", e(3)
)
}.toDF(eventTableColumns:_*)
eventDF.filter($"entityType" === "user").select("entityId").distinct.count
回避策①
- === の代わりに isin を使う
eventDF.filter($"entityType" isin lit("user")).select("entityId").distinct.count
回避策②
- case class を使う
case class Event(entityType: String, entityId: String,
targetEntityType: String, targetEntityId: String,
properties: String, eventTime: String)
val eventDF2 = sc.textFile("/tmp/events_s.csv").map(_.split(",")).filter(_.size >= 6)
.map { e =>
Event(
e(5), e(0), "item", e(1), s"""{"rating": ${e(3).trim.toDouble}}""", e(3)
)
}.toDF()
eventDF2.filter($"entityType" === "user").select("entityId").distinct.count
回避策③
- spark.sql.inMemoryColumnarStorage.partitionPruning を false にする
sqlContext.sql("SET spark.sql.inMemoryColumnarStorage.partitionPruning=false")
eventDF.filter($"entityType" === "user").select("entityId").distinct.count
詳しいサンプルコードはこちら
val eventTableColumns = Seq[String](
"entityType"
, "entityId"
, "targetEntityType"
, "targetEntityId"
, "properties"
, "eventTime")
val eventDF = sc.textFile("/tmp/events_s.csv").map(_.split(",")).filter(_.size >= 6)
.map { e =>
(
e(5), e(0), "item", e(1), s"""{"rating": ${e(3).trim.toDouble}}""", e(3)
)
}.toDF(eventTableColumns:_*)
eventDF.filter($"entityType" === "user").select("entityId").distinct.count
eventDF.filter($"entityType" isin lit("user")).select("entityId").distinct.count
case class Event(entityType: String, entityId: String,
targetEntityType: String, targetEntityId: String,
properties: String, eventTime: String)
val eventDF2 = sc.textFile("/tmp/events_s.csv").map(_.split(",")).filter(_.size >= 6)
.map { e =>
Event(
e(5), e(0), "item", e(1), s"""{"rating": ${e(3).trim.toDouble}}""", e(3)
)
}.toDF()
eventDF2.filter($"entityType" === "user").select("entityId").distinct.count
sqlContext.sql("SET spark.sql.inMemoryColumnarStorage.partitionPruning=false")
eventDF.filter($"entityType" === "user").select("entityId").distinct.count
Author And Source
この問題について(SparkSQLDataframe(v1.5.0/1.5.1)バグ回避策), 我々は、より多くの情報をここで見つけました https://qiita.com/teru1000/items/faf5ef339730a56a0771著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .