Spark SparkSQLのデータのロードと着地
3434 ワード
1.データのロード
readを使用load(path)
デフォルトでロードされているのは
ロード
標準テキストファイルのロード
CSV形式ファイルのロード
ORCファイルのロード
MySQLのデータのロード
2.データの着地------DataFrameまたはDatasetに対する操作
書く
その他の形式のファイルについては、例えば
MySQLなどの関係型データベースについては、まず書き込みモードを見てみましょう
1)デフォルト書き込みテーブルが存在する場合はエラー
2)MySQLへの追加書き込み
3)MySQLへの上書き:元のテーブルのデータを上書き
4)目標出力が存在する場合、今回の書き込みは無視する
readを使用load(path)
デフォルトでロードされているのは
parquet
形式のファイルですが、他のタイプのファイルをロードする必要がある場合はformat( )
で指定する必要があります.もちろん、sparkはいくつかの主要なフォーマットのファイルのロードに対してすべてもっと簡潔なAPIの操作方式を提供しましたロード
json
フォーマットファイル----ファイルのフォーマット統一を要求spark.read.format("csv").load("file:///C:\\Users\\mycat\\Desktop/test.json")
// json
spark.read.json("file:///C:\\Users\\mycat\\Desktop/test.json")
標準テキストファイルのロード
spark.read.format("text").load("file:///D://wordin/w1.txt")
//
spark.read.textFile("file:///D://wordin/w1.txt")
CSV形式ファイルのロード
spark.read.format("csv").load("file:///D:\\soft\\databases\\powerdesign\\testdata/address.csv")
//
spark.read.csv("file:///D:\\soft\\databases\\powerdesign\\testdata/address.csv")//.toDF("addressid","address") // toDF
ORCファイルのロード
spark.read.format("orc").load(“file:///D://test.orc”)
//
spark.read.orc(“file:///D://test.orc”)
MySQLのデータのロード
val url="jdbc:mysql://localhost:3306/mktest"
val table="book"
val properties=new Properties()
properties.put("user","root")
properties.put("password","xxx")
val df = spark.read.jdbc(url,table,properties)
2.データの着地------DataFrameまたはDatasetに対する操作
書く
json
フォーマットファイルval spark=SparkSession.builder().appName("dfdemo")
.master("local[*]")
.getOrCreate()
val jdf = spark.read.json("file:///D://test/p1.json")
//
jdf.write.format("json").save("ile:///D://test/p2.json")
// jdf.write.json("ile:///D://test/p2.json")
その他の形式のファイルについては、例えば
orc
,parquet
,csv
等の形式の形式データの着地は、上と同様である.MySQLなどの関係型データベースについては、まず書き込みモードを見てみましょう
SaveMode
:ErrorIfExists:// ,
Append: ,
Override: ,
Ignore:
1)デフォルト書き込みテーブルが存在する場合はエラー
val url="jdbc:mysql://localhost:3306/mktest"
val table="book"
val properties=new Properties()
properties.put("user","root")
properties.put("password","miku")
mdf.write.jdbc(url,table,properties)
// mdf.write.mode(SaveMode.ErrorIfExists).jdbc(url,table,properties)
2)MySQLへの追加書き込み
val url="jdbc:mysql://localhost:3306/mktest"
val table="book"
val properties=new Properties()
properties.put("user","root")
properties.put("password","miku")
mdf.write.mode(SaveMode.Append).jdbc(url,table,properties)
3)MySQLへの上書き:元のテーブルのデータを上書き
val url="jdbc:mysql://localhost:3306/mktest"
val table="book"
val properties=new Properties()
properties.put("user","root")
properties.put("password","miku")
mdf.write.mode(SaveMode.Override).jdbc(url,table,properties)
4)目標出力が存在する場合、今回の書き込みは無視する
val url="jdbc:mysql://localhost:3306/mktest"
val table="book"
val properties=new Properties()
properties.put("user","root")
properties.put("password","miku")
mdf.write.mode(SaveMode.Ignore).jdbc(url,table,properties)