scalaのinsertintoにhiveデータを挿入して重複または文字化けあるいは空です。


データ読み書き
詳細は公式サイトをご覧くださいhttp://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader
 
hiveデータ
読み込み:
#   hive  enableHiveSupport(),     hql hive    
spark = SparkSession.builder.enableHiveSupport().master("local[*]").appName("read_hive").getOrCreate()
df=spark.sql("select * from age")
df.show()
+--------------+------+
|       country|median|
+--------------+------+
|   New Zealand|  39.0|
+--------------+------+
only showing top 20 rows
書き込み:
#     
spark.sql('create table if not exists age2(name string, num int)')
#  dataframe    
df.write.insertInto("age2")#       ,        

df.write.mode("overwrite).saveAsTable("age2")#  
#       
spark.sql('select * from age2 sort by num limit 1 ').show()
+-----------+---+
|       name|num|
+-----------+---+
|New Zealand| 39|
+-----------+---+
ノート:
1、テーブルにパーティションがある場合、使用する
動的挿入の設定:
    val sparkConf = new SparkConf()
    //KryoSerialization   , JavaSerializer 10   ,       org.apache.spark.serializer   。 Kryo         
    // java.io.Serializable      。   JavaSerializer
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.set("spark.rdd.compress", "true")
    sparkConf.set("spark.speculation.interval", "10000ms")
    sparkConf.set("spark.sql.tungsten.enabled", "true")
    sparkConf.set("hive.exec.dynamic.partition", "true")
    sparkConf.set("hive.exec.dynamic.partition.mode", "nonstrict")
    val spark = SparkSession.builder()
      .appName("")
      .enableHiveSupport()
      .config(sparkConf).getOrCreate()
次のようにパーティションを正常に挿入し、パーティションを上書きします。
 val tagDF = spark.sql(s"select id,device_uuid,aver_score,'$dt' from tmp_3)//dt     
 tagDF.repartition(100)
      .write
      .mode("overwrite")
      .insertInto("pmp_3")
テストしましたが、パーティションがないテーブルに対してinsertintoデータを使うと重複した空きが発生します。
この時間、もしパーティションがないなら、saveAstable overwriteを使って完全に上書きできます。コマンドは以下の通りです。
   val tagDf = spark.sql("select id,device_uuid,score from  pmp_tmp_1")
    tagDf.repartition(100)
     .write
      .mode("overwrite")
     .saveAsTable("pmp_tmp_1")