SparkR で timestamp 型のカラムがうまく表示されない不具合を魔改造でどうにかする #rstatsj
Spark 1.5.0 でバグ修正されたようです。2015/09/09
今までファイルで扱っていたアクセスログをすべて Parquet にして SparkR で集計するようにしています。
しかし、DataFrame に java.sql.Timestamp 由来のカラムが一つでもあると、SparkR がエラーを出します。
ちょっと再現してみましょう。
まずは、Scala で java.sql.Timestamp のカラムだけからなる DataFrame を作り、Parquet で保存します。
$ spark-shell
scala> import java.sql.Timestamp
scala> case class TimestampTest(timestamp: Timestamp)
scala> import sqlContext.implicits._
scala> val rdd = sc.parallelize(1 to 3).map(i => TimestampTest(Timestamp.valueOf("2015-01-01 00:00:0" + i)))
scala> val dataframe = rdd.toDF
scala> dataframe.write.parquet("timestamp-test.parquet")
これを R 側から読み込んで DataFrame に戻します。
library(SparkR)
sc <- sparkR.init(master="local")
sqlContext <- sparkRSQL.init(sc)
df <- parquetFile(sqlContext, "timestamp-test.parquet")
print(df)
DataFrame[timestamp:timestamp]
ちゃんと DataFrame として読み込めたようです。
しかし、この DataFrame に対して head()
をしてみると、
head(df)
Error in as.data.frame.default(x[[i]], optional = TRUE) :
cannot coerce class ""jobj"" to a data.frame
エラーが出ます。
原因は、SparkR では timestamp 型として java.sql.Time のみをサポートしており、java.sql.Timestamp は Java オブジェクトとしてそのまま保持されてしまうためのようです。
しかし、Parquet は timestamp 型として java.sql.Timestamp しかサポートしておらず、java.sql.Time では保存できません。
これは困りました。
しょうがないので、SparkR を魔改造して collect()
関数の中身を、java.sql.Timestamp を見つけたら POSIXct へ変換するように書き換えてしまいましょう。
これによって、head()
や take()
などの関連する関数の挙動も変わります。
# Override SparkR::collect --------------------------------------------------------
collect <- function(x, stringsAsFactors = FALSE) {
# listCols is a list of raw vectors, one per column
listCols <- SparkR:::callJStatic("org.apache.spark.sql.api.r.SQLUtils", "dfToCols", x@sdf)
cols <- lapply(listCols, function(col) {
objRaw <- rawConnection(col)
numRows <- SparkR:::readInt(objRaw)
col <- SparkR:::readCol(objRaw, numRows)
close(objRaw)
### begin added area to read Timestamp ###
if(is.list(col) && length(col) > 0) {
obj <- col[[1]]
class <- SparkR:::callJMethod(obj, "getClass")
class_name <- SparkR:::callJMethod(class, "getName")
if(class_name == "java.sql.Timestamp") {
times <- lapply(col, function(x) {
SparkR:::callJMethod(x, "getTime")
})
times <- unlist(times, use.names = FALSE) / 1000
col <- as.POSIXct(times / 1000, origin = "1970-01-01")
}
}
### end added area ###
col
})
names(cols) <- columns(x)
do.call(cbind.data.frame, list(cols, stringsAsFactors = stringsAsFactors))
}
SparkR_env <- asNamespace("SparkR")
environment(collect) <- SparkR_env
invisible(eval(setMethod("collect", signature(x = "DataFrame"), collect), SparkR_env))
それでは再び head()
を実行してみましょう。
head(df)
timestamp
1 2015-01-01 00:00:01
2 2015-01-01 00:00:02
3 2015-01-01 00:00:03
できました!
というわけで、この機能を SparkRext パッケージに追加しました。
Enjoy!
参考
Author And Source
この問題について(SparkR で timestamp 型のカラムがうまく表示されない不具合を魔改造でどうにかする #rstatsj), 我々は、より多くの情報をここで見つけました https://qiita.com/hoxo_m/items/1a4147e59bf574c99d53著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .