Spark読み書きcsv,txt,json,xlsx,xml,avroファイル


Spark読み書きcsv,txt,json,xlsx,xml,avroファイル
  • 1. Spark読み書きcsvファイル
  • 2. Spark読み書きtxtファイル
  • 3. Spark読み書きjsonファイル
  • 4. Spark読み書きexcelファイル
  • 5. Spark読み書きxmlファイル
  • 6. Spark avroファイル読み込み

  • Sparkがテキストファイルを読み込むとき、多くのファイルフォーマットに直面するのは頭が痛いことです.幸いdatabricksは豊富なapiを提供して解析を行います.私たちは対応する依存パッケージを導入し、Spark SqlContextを使って読み取りと解析を行うだけで、フォーマットされたデータを得ることができます.
    次にsparkがhdfsからよく見られるいくつかのテキストファイルを読み書き解析する方法について説明します.
    1.Spark読み書きcsvファイル
  • 導入する外部jarパッケージ
  • <dependency>
        <groupId>com.databricksgroupId>
        <artifactId>spark-csv_2.11artifactId>
        <version>1.4.0version>
    dependency>
    
  • csvファイルコアコードの読み込み
  • import org.apache.spark.sql.SQLContext
    val sqlContext = new SQLContext(sc)
    sqlContext.read
        .format("com.databricks.spark.csv")
        .option("delimiter", ",") //      
        .option("header", "true") //           header
        .option("inferSchema", "false") //           
        .option("codec", "none") //     
        .load(csvFile) // csv    hdfs   +    
    
  • DataFrameはcsvファイルコアコード
  • にロードされます.
    df.write.format("com.databricks.spark.csv")
    	.option("header", "true")
    	.option("codec", "none")
    	.save(tempHdfsPath) //    hdfs  ,       
    
  • spark-csvプロジェクトソース
  • https://github.com/databricks/spark-csv
    2.Spark txtファイルの読み書き
  • hdfsディレクトリ下/home/test/testTxt.txtファイル内容
  • a,b,c,d
    123,345,789,5
    34,45,90,9878
    
  • txtファイルコアコードの読み込み
  • scala> sqlContext.read.text("/home/test/testTxt.txt").show
    +-------------+
    |        value|
    +-------------+
    |      a,b,c,d|
    |123,345,789,5|
    |34,45,90,9878|
    +-------------+
    

    txtファイルは行全体で読み込まれ、関連するフィールドを取得する必要がある場合は、DataFrameの列を分割する必要があります.詳細は次のブログ「Spark DataFrame列分割とマージ」を参照してください.
  • DataFrameはtxtファイルコアコード
  • にロードされます.
    //  dateframe      
    val columnArr = df.columns.map {
          colName =>  
    	df.col(colName)
    }
    
    df.select(concat_ws(",", columnArr: _*) //            
    	.cast(StringType))
    	.write.format("text")
    	.save(tempHdfsPath) //    hdfs  ,       
    

    3.Spark jsonファイルの読み書き
  • hdfsディレクトリ下/home/test/testJson.jsonファイル内容
  • {"a":"1747","b":"id  _SDK_   ","c":1,"d":"2018112713"}
    {"a":"456","b":"232","c":10,"d":"203227324"}
    
  • jsonファイルコアコードの読み込み
  • scala> sqlContext.read.format("json").load("/home/test/testJson.json").show
    +----+------------+---+----------+                                              
    |   a|           b|  c|         d|
    +----+------------+---+----------+
    |1747|id  _SDK_   |  1|2018112713|
    | 456|         232| 10| 203227324|
    +----+------------+---+----------+
    
  • DataFrameはjsonファイルコアコード
  • にロードされます.
    df.write.format("json")
    	.save(tempHdfsPath) //    hdfs  ,       
    

    4.Spark excelファイルの読み書き
  • 導入する外部jarパッケージ
  • <dependency>
    	<groupId>com.crealyticsgroupId>
    	<artifactId>spark-excel_2.11artifactId>
    	<version>0.12.2version>
    dependency>
    
  • xlsx|xlsファイルコアコードの読み込み
  • import org.apache.spark.sql._
    
    val spark: SparkSession = ???
    val df = spark.read
        .format("com.crealytics.spark.excel")
        .option("useHeader", "true") //           
        .option("inferSchema", "false") //     schema
        .option("workbookPassword", "None") // excel       
        .load(excelFile) //excel     +    
    
  • DataFrameロードxlsx|xlsファイルコアコード
  • df.write.format("com.crealytics.spark.excel")
    	.option("useHeader", "true")
    	.option("timestampFormat", "MM-dd-yyyy HH:mm:ss")
    	.option("inferSchema", "false")
    	.option("workbookPassword", "None") 
    	.save(tempHdfsPath) //    hdfs  ,       
    
  • spark-excelプロジェクトソース
  • https://github.com/crealytics/spark-excel
    5.Spark読み書きxmlファイル
  • test.xmlファイル
  • 
    	
    		Tove
    		Jani
    		Reminder
    		Don't forget me this weekend!
    	
    	
    		ksdhf
    		Jasfdi
    		Re
    		Don't forget me
    	
    
    
  • 導入する外部jarパッケージ
  • <dependency>
    	<groupId>com.databricksgroupId>
    	<artifactId>spark-xml_2.11artifactId>
    	<version>0.6.0version>
    dependency>
    
  • xmlファイルコアコードの読み込み
  • import org.apache.spark.sql._
    
    val spark: SparkSession = ???
    val df = spark.read
        .format("com.databricks.spark.xml")
        .option("rowTag", "testXml") // xml  rowTag,    ,"testXml"    rowTag
        .load(xmlFile) //xml    +   
    
  • DataFrameはxmlファイルコアコード
  • にロードされます.
    df.write.format("com.databricks.spark.xml")
    	.option("rowTag", "testXml")
    	.option("rootTag", "catalog")
    	.save(tempHdfsPath) //    hdfs  ,       
    
  • spark-xmlプロジェクトソース
  • https://github.com/databricks/spark-xml
    6.Spark avroファイルを読み込む
  • 導入する外部jarパッケージ
  • <dependency>
    	<groupId>org.apache.sparkgroupId>
    	<artifactId>spark-avro_2.11artifactId>
    	<version>2.4.4version>
    dependency>
    
  • avroファイルコアコードの読み込み
  • import org.apache.spark.sql._
    
    val spark: SparkSession = ???
    spark.conf.set("spark.sql.avro.compression.codec", "deflate")   //  avro      
    spark.conf.set("spark.sql.avro.deflate.level", "2")
    
    val df: DataFrame = spark.read
    	.format("avro")
    	.option("avroSchema", "/.../.../test.avsc")  //  avsc   avro      
    	.load(avroFilePath)  //  avro