Spark SparkSQLのデータのロードと着地

3434 ワード

1.データのロード
readを使用load(path)
デフォルトでロードされているのはparquet形式のファイルですが、他のタイプのファイルをロードする必要がある場合はformat( )で指定する必要があります.もちろん、sparkはいくつかの主要なフォーマットのファイルのロードに対してすべてもっと簡潔なAPIの操作方式を提供しました
ロードjsonフォーマットファイル----ファイルのフォーマット統一を要求
spark.read.format("csv").load("file:///C:\\Users\\mycat\\Desktop/test.json")
//  json  
spark.read.json("file:///C:\\Users\\mycat\\Desktop/test.json")

標準テキストファイルのロード
spark.read.format("text").load("file:///D://wordin/w1.txt")
//  
spark.read.textFile("file:///D://wordin/w1.txt")

CSV形式ファイルのロード
spark.read.format("csv").load("file:///D:\\soft\\databases\\powerdesign\\testdata/address.csv")
//   
spark.read.csv("file:///D:\\soft\\databases\\powerdesign\\testdata/address.csv")//.toDF("addressid","address")   //       toDF        

ORCファイルのロード
spark.read.format("orc").load(“file:///D://test.orc”)
//   
spark.read.orc(“file:///D://test.orc”)

MySQLのデータのロード
val url="jdbc:mysql://localhost:3306/mktest"
val table="book"
val properties=new Properties()
properties.put("user","root")
properties.put("password","xxx")
val df = spark.read.jdbc(url,table,properties)

2.データの着地------DataFrameまたはDatasetに対する操作
書くjsonフォーマットファイル
val spark=SparkSession.builder().appName("dfdemo")
  .master("local[*]")
  .getOrCreate()
val jdf = spark.read.json("file:///D://test/p1.json")

//   
jdf.write.format("json").save("ile:///D://test/p2.json")
// jdf.write.json("ile:///D://test/p2.json")

その他の形式のファイルについては、例えばorc,parquet,csv等の形式の形式データの着地は、上と同様である.
MySQLなどの関係型データベースについては、まず書き込みモードを見てみましょうSaveMode:
ErrorIfExists://  ,       
Append:    ,          
Override:    ,         
Ignore:        

1)デフォルト書き込みテーブルが存在する場合はエラー
val url="jdbc:mysql://localhost:3306/mktest"
val table="book"
val properties=new Properties()
properties.put("user","root")
properties.put("password","miku")

mdf.write.jdbc(url,table,properties)
//     mdf.write.mode(SaveMode.ErrorIfExists).jdbc(url,table,properties)

2)MySQLへの追加書き込み
val url="jdbc:mysql://localhost:3306/mktest"
val table="book"
val properties=new Properties()
properties.put("user","root")
properties.put("password","miku")

mdf.write.mode(SaveMode.Append).jdbc(url,table,properties)

3)MySQLへの上書き:元のテーブルのデータを上書き
val url="jdbc:mysql://localhost:3306/mktest"
val table="book"
val properties=new Properties()
properties.put("user","root")
properties.put("password","miku")

mdf.write.mode(SaveMode.Override).jdbc(url,table,properties)

4)目標出力が存在する場合、今回の書き込みは無視する
val url="jdbc:mysql://localhost:3306/mktest"
val table="book"
val properties=new Properties()
properties.put("user","root")
properties.put("password","miku")

mdf.write.mode(SaveMode.Ignore).jdbc(url,table,properties)