spark sqlのdataframeでhiveのpartition付きテーブルにsave or insertIntoするには


  • partitionByで指定して
  • saveAsTable or insertInto

でいけると思いきや、バグ or 仕様が実装されてないかで、そのままでは
save or insert できないっぽい。

なので、

  • hive tableは自前でcreate tableしておく with partition
    • df.saveAsTableでhive tableをcreateしてくれるが、partitionByが効かない。一回作って、partitionを編集して、自前でcreate tableする。
  • regsiterTempTableでtemp tableに吐き出して
  • 普通のspark上で hiveqlでinsertする。dynamic partitionで
import scala.collection.mutable.HashMap

import org.apache.spark._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql._

val options = new HashMap[String, String]

options.put("spark.hadoop.mapred.output.compress", "true")
options.put("spark.hadoop.mapred.output.compression.codec", "true")
//options.put("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
options.put("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.SnappyCodec")
options.put("spark.hadoop.mapred.output.compression.type", "BLOCK")
//options.put("parquet.compression","GZIP")

//SET hive.exec.compress.output=true;
//SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
//SET mapred.output.compression.type=BLOCK;

//val sqlContext = new SQLContext(sc)
val sqlContext = new HiveContext(sc)
import sqlContext.implicits._

sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
sqlContext.setConf("parquet.compression", "SNAPPY")
sqlContext.setConf("hive.exec.dynamic.partition", "true")
sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

val date="2016-07-23"
val hour="00"

// imp
val impRdd = sc.textFile(s"/data/imp/$date/$hour")
val parsedImp = impRdd.map(LogParser.parse)

val impDf = parsedImp.map(LogParser.parseForDFRow2).toDF
scala.util.Try(sqlContext.dropTempTable("temp_imp"))
inviewDf.registerTempTable("temp_imp") 
sqlContext.sql(s"INSERT overwrite TABLE sandbox.partitioned_imp partition(dt, dh) SELECT * FROM temp_imp limit 10")