scalaのinsertintoにhiveデータを挿入して重複または文字化けあるいは空です。
2409 ワード
データ読み書き
詳細は公式サイトをご覧くださいhttp://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader
hiveデータ
読み込み:
1、テーブルにパーティションがある場合、使用する
動的挿入の設定:
この時間、もしパーティションがないなら、saveAstable overwriteを使って完全に上書きできます。コマンドは以下の通りです。
詳細は公式サイトをご覧くださいhttp://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader
hiveデータ
読み込み:
# hive enableHiveSupport(), hql hive
spark = SparkSession.builder.enableHiveSupport().master("local[*]").appName("read_hive").getOrCreate()
df=spark.sql("select * from age")
df.show()
+--------------+------+
| country|median|
+--------------+------+
| New Zealand| 39.0|
+--------------+------+
only showing top 20 rows
書き込み:#
spark.sql('create table if not exists age2(name string, num int)')
# dataframe
df.write.insertInto("age2")# ,
df.write.mode("overwrite).saveAsTable("age2")#
#
spark.sql('select * from age2 sort by num limit 1 ').show()
+-----------+---+
| name|num|
+-----------+---+
|New Zealand| 39|
+-----------+---+
ノート:1、テーブルにパーティションがある場合、使用する
動的挿入の設定:
val sparkConf = new SparkConf()
//KryoSerialization , JavaSerializer 10 , org.apache.spark.serializer 。 Kryo
// java.io.Serializable 。 JavaSerializer
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("spark.rdd.compress", "true")
sparkConf.set("spark.speculation.interval", "10000ms")
sparkConf.set("spark.sql.tungsten.enabled", "true")
sparkConf.set("hive.exec.dynamic.partition", "true")
sparkConf.set("hive.exec.dynamic.partition.mode", "nonstrict")
val spark = SparkSession.builder()
.appName("")
.enableHiveSupport()
.config(sparkConf).getOrCreate()
次のようにパーティションを正常に挿入し、パーティションを上書きします。 val tagDF = spark.sql(s"select id,device_uuid,aver_score,'$dt' from tmp_3)//dt
tagDF.repartition(100)
.write
.mode("overwrite")
.insertInto("pmp_3")
テストしましたが、パーティションがないテーブルに対してinsertintoデータを使うと重複した空きが発生します。この時間、もしパーティションがないなら、saveAstable overwriteを使って完全に上書きできます。コマンドは以下の通りです。
val tagDf = spark.sql("select id,device_uuid,score from pmp_tmp_1")
tagDf.repartition(100)
.write
.mode("overwrite")
.saveAsTable("pmp_tmp_1")