Spark読み書きcsv,txt,json,xlsx,xml,avroファイル
Spark読み書きcsv,txt,json,xlsx,xml,avroファイル 1. Spark読み書きcsvファイル 2. Spark読み書きtxtファイル 3. Spark読み書きjsonファイル 4. Spark読み書きexcelファイル 5. Spark読み書きxmlファイル 6. Spark avroファイル読み込み
Sparkがテキストファイルを読み込むとき、多くのファイルフォーマットに直面するのは頭が痛いことです.幸いdatabricksは豊富なapiを提供して解析を行います.私たちは対応する依存パッケージを導入し、Spark SqlContextを使って読み取りと解析を行うだけで、フォーマットされたデータを得ることができます.
次にsparkがhdfsからよく見られるいくつかのテキストファイルを読み書き解析する方法について説明します.
1.Spark読み書きcsvファイル導入する外部jarパッケージ csvファイルコアコードの読み込み DataFrameはcsvファイルコアコード にロードされます.spark-csvプロジェクトソース https://github.com/databricks/spark-csv
2.Spark txtファイルの読み書きhdfsディレクトリ下/home/test/testTxt.txtファイル内容 txtファイルコアコードの読み込み
txtファイルは行全体で読み込まれ、関連するフィールドを取得する必要がある場合は、DataFrameの列を分割する必要があります.詳細は次のブログ「Spark DataFrame列分割とマージ」を参照してください.DataFrameはtxtファイルコアコード にロードされます.
3.Spark jsonファイルの読み書きhdfsディレクトリ下/home/test/testJson.jsonファイル内容 jsonファイルコアコードの読み込み DataFrameはjsonファイルコアコード にロードされます.
4.Spark excelファイルの読み書き導入する外部jarパッケージ xlsx|xlsファイルコアコードの読み込み DataFrameロードxlsx|xlsファイルコアコード spark-excelプロジェクトソース https://github.com/crealytics/spark-excel
5.Spark読み書きxmlファイル test.xmlファイル 導入する外部jarパッケージ xmlファイルコアコードの読み込み DataFrameはxmlファイルコアコード にロードされます.spark-xmlプロジェクトソース https://github.com/databricks/spark-xml
6.Spark avroファイルを読み込む導入する外部jarパッケージ avroファイルコアコードの読み込み
Sparkがテキストファイルを読み込むとき、多くのファイルフォーマットに直面するのは頭が痛いことです.幸いdatabricksは豊富なapiを提供して解析を行います.私たちは対応する依存パッケージを導入し、Spark SqlContextを使って読み取りと解析を行うだけで、フォーマットされたデータを得ることができます.
次にsparkがhdfsからよく見られるいくつかのテキストファイルを読み書き解析する方法について説明します.
1.Spark読み書きcsvファイル
<dependency>
<groupId>com.databricksgroupId>
<artifactId>spark-csv_2.11artifactId>
<version>1.4.0version>
dependency>
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
sqlContext.read
.format("com.databricks.spark.csv")
.option("delimiter", ",") //
.option("header", "true") // header
.option("inferSchema", "false") //
.option("codec", "none") //
.load(csvFile) // csv hdfs +
df.write.format("com.databricks.spark.csv")
.option("header", "true")
.option("codec", "none")
.save(tempHdfsPath) // hdfs ,
2.Spark txtファイルの読み書き
a,b,c,d
123,345,789,5
34,45,90,9878
scala> sqlContext.read.text("/home/test/testTxt.txt").show
+-------------+
| value|
+-------------+
| a,b,c,d|
|123,345,789,5|
|34,45,90,9878|
+-------------+
txtファイルは行全体で読み込まれ、関連するフィールドを取得する必要がある場合は、DataFrameの列を分割する必要があります.詳細は次のブログ「Spark DataFrame列分割とマージ」を参照してください.
// dateframe
val columnArr = df.columns.map {
colName =>
df.col(colName)
}
df.select(concat_ws(",", columnArr: _*) //
.cast(StringType))
.write.format("text")
.save(tempHdfsPath) // hdfs ,
3.Spark jsonファイルの読み書き
{"a":"1747","b":"id _SDK_ ","c":1,"d":"2018112713"}
{"a":"456","b":"232","c":10,"d":"203227324"}
scala> sqlContext.read.format("json").load("/home/test/testJson.json").show
+----+------------+---+----------+
| a| b| c| d|
+----+------------+---+----------+
|1747|id _SDK_ | 1|2018112713|
| 456| 232| 10| 203227324|
+----+------------+---+----------+
df.write.format("json")
.save(tempHdfsPath) // hdfs ,
4.Spark excelファイルの読み書き
<dependency>
<groupId>com.crealyticsgroupId>
<artifactId>spark-excel_2.11artifactId>
<version>0.12.2version>
dependency>
import org.apache.spark.sql._
val spark: SparkSession = ???
val df = spark.read
.format("com.crealytics.spark.excel")
.option("useHeader", "true") //
.option("inferSchema", "false") // schema
.option("workbookPassword", "None") // excel
.load(excelFile) //excel +
df.write.format("com.crealytics.spark.excel")
.option("useHeader", "true")
.option("timestampFormat", "MM-dd-yyyy HH:mm:ss")
.option("inferSchema", "false")
.option("workbookPassword", "None")
.save(tempHdfsPath) // hdfs ,
5.Spark読み書きxmlファイル
Tove
Jani
Reminder
Don't forget me this weekend!
ksdhf
Jasfdi
Re
Don't forget me
<dependency>
<groupId>com.databricksgroupId>
<artifactId>spark-xml_2.11artifactId>
<version>0.6.0version>
dependency>
import org.apache.spark.sql._
val spark: SparkSession = ???
val df = spark.read
.format("com.databricks.spark.xml")
.option("rowTag", "testXml") // xml rowTag, ,"testXml" rowTag
.load(xmlFile) //xml +
df.write.format("com.databricks.spark.xml")
.option("rowTag", "testXml")
.option("rootTag", "catalog")
.save(tempHdfsPath) // hdfs ,
6.Spark avroファイルを読み込む
<dependency>
<groupId>org.apache.sparkgroupId>
<artifactId>spark-avro_2.11artifactId>
<version>2.4.4version>
dependency>
import org.apache.spark.sql._
val spark: SparkSession = ???
spark.conf.set("spark.sql.avro.compression.codec", "deflate") // avro
spark.conf.set("spark.sql.avro.deflate.level", "2")
val df: DataFrame = spark.read
.format("avro")
.option("avroSchema", "/.../.../test.avsc") // avsc avro
.load(avroFilePath) // avro