SparkR で timestamp 型のカラムがうまく表示されない不具合を魔改造でどうにかする #rstatsj


Spark 1.5.0 でバグ修正されたようです。2015/09/09

今までファイルで扱っていたアクセスログをすべて Parquet にして SparkR で集計するようにしています。
しかし、DataFrame に java.sql.Timestamp 由来のカラムが一つでもあると、SparkR がエラーを出します。

ちょっと再現してみましょう。

まずは、Scala で java.sql.Timestamp のカラムだけからなる DataFrame を作り、Parquet で保存します。

spark-shell
$ spark-shell
scala> import java.sql.Timestamp
scala> case class TimestampTest(timestamp: Timestamp)
scala> import sqlContext.implicits._
scala> val rdd = sc.parallelize(1 to 3).map(i => TimestampTest(Timestamp.valueOf("2015-01-01 00:00:0" + i)))
scala> val dataframe = rdd.toDF
scala> dataframe.write.parquet("timestamp-test.parquet")

これを R 側から読み込んで DataFrame に戻します。

R
library(SparkR)

sc <- sparkR.init(master="local")
sqlContext <- sparkRSQL.init(sc)

df <- parquetFile(sqlContext, "timestamp-test.parquet")
print(df)
結果
DataFrame[timestamp:timestamp]

ちゃんと DataFrame として読み込めたようです。

しかし、この DataFrame に対して head() をしてみると、

R
head(df)
Error in as.data.frame.default(x[[i]], optional = TRUE) : 
  cannot coerce class ""jobj"" to a data.frame

エラーが出ます。

原因は、SparkR では timestamp 型として java.sql.Time のみをサポートしており、java.sql.Timestamp は Java オブジェクトとしてそのまま保持されてしまうためのようです。
しかし、Parquet は timestamp 型として java.sql.Timestamp しかサポートしておらず、java.sql.Time では保存できません。

これは困りました。

しょうがないので、SparkR を魔改造して collect() 関数の中身を、java.sql.Timestamp を見つけたら POSIXct へ変換するように書き換えてしまいましょう。
これによって、head()take() などの関連する関数の挙動も変わります。

R
# Override SparkR::collect --------------------------------------------------------  
collect <- function(x, stringsAsFactors = FALSE) {
  # listCols is a list of raw vectors, one per column
  listCols <- SparkR:::callJStatic("org.apache.spark.sql.api.r.SQLUtils", "dfToCols", x@sdf)
  cols <- lapply(listCols, function(col) {
    objRaw <- rawConnection(col)
    numRows <- SparkR:::readInt(objRaw)
    col <- SparkR:::readCol(objRaw, numRows)
    close(objRaw)
    ### begin added area to read Timestamp ###
    if(is.list(col) && length(col) > 0) {
      obj <- col[[1]]
      class <- SparkR:::callJMethod(obj, "getClass")
      class_name <- SparkR:::callJMethod(class, "getName")
      if(class_name == "java.sql.Timestamp") {
        times <- lapply(col, function(x) {
          SparkR:::callJMethod(x, "getTime")
        })
        times <- unlist(times, use.names = FALSE) / 1000
        col <- as.POSIXct(times / 1000, origin = "1970-01-01")
      }
    }
    ### end added area ###
    col
  })
  names(cols) <- columns(x)
  do.call(cbind.data.frame, list(cols, stringsAsFactors = stringsAsFactors))
}
SparkR_env <- asNamespace("SparkR")
environment(collect) <- SparkR_env
invisible(eval(setMethod("collect", signature(x = "DataFrame"), collect), SparkR_env))

それでは再び head() を実行してみましょう。

R
head(df)
結果
            timestamp
1 2015-01-01 00:00:01
2 2015-01-01 00:00:02
3 2015-01-01 00:00:03

できました!

というわけで、この機能を SparkRext パッケージに追加しました。

Enjoy!

参考