Spark読み書きCSVの共通構成項目
12685 ワード
Spark 2.0の前に、Spark SQLはCSV形式のファイルを読み書きし、Databricksが公式に提供するspark-csvライブラリが必要です.Spark 2.0以降、Spark SQLはCSV形式のファイルの読み書きをサポートします.テストのタイトル付きファイルは次のとおりです.
出力結果は次のとおりです.
パラメータの説明: delimiterセパレータ、デフォルトはカンマ、 nullValue null値 を表す文字列を指定します. quote引用符文字、デフォルトは二重引用符" ヘッダの1行目はデータ内容とせず、ヘッダ とする. inferSchema自動推定フィールドタイプ 自動推定フィールドタイプは折衷シナリオにすぎません.より良いシナリオは、フィールドタイプを指定することです.
タイトルのないファイルをテストします.
タイプを指定:
結果は上の結果と同じです
CSVファイルを書く:
ファイルの内容と読み取りの出力にはいくつかの変化があります.
最初の変更点:書き出すファイルは二重引用符"を増加して、引用符のある地方で更に引用符を増加して、二重引用符がデフォルトの値なため、もし増加したくないならば、注釈を開けて、引用符を空に設定します
2つ目の変化:darren前後のスペースがなくなりました.Spark 2.1.1では、CSV形式のファイルをSpark SQLで保存します.デフォルトでは、文字列の前後のスペースが自動的に切り捨てられます.
このようなデフォルトの動作は、Spark 2.2.0以降、設定をオフにして機能を変更することができます.
パラメータの説明: ignoreLeadingWhiteSpace前のスペースを切り取る ignoreTrailingWhiteSpace裁断後のスペース nullValue空の値の設定は、任意の記号を空の値として使用したくない場合はnullを割り当てることができます .
質問:では、spark読み書きCSVにはいったいどれだけの属性が設定できるのでしょうか.
答え:何個あるかを示す資料は見つかりませんでしたが、ソースコードが見つかり、何個あるか判断できます.
ソースコードは次のとおりです.
テキストリンク:http://dyingbleed.com/spark-ii-5/
id|name|age
1| darren |18
2|anne|18
3|"test"|18
4|'test2'|18
package com.darren.spark.sql.csv
import org.apache.spark.sql.{SaveMode, SparkSession}
/**
* @Author Darren Zhang
* @Date 2019-05-30
* @Description TODO
**/
object CSVReader {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("CSV Reader")
.master("local")
.getOrCreate()
val result = spark.read.format("csv")
.option("delimiter", "|")
.option("header", "true")
.option("quote", "'")
.option("nullValue", "\\N")
.option("inferSchema", "true")
.load("test-in/csv/csv_with_header.csv")
result.show()
result.printSchema()
}
}
出力結果は次のとおりです.
+---+--------+----+
| id| name| age|
+---+--------+----+
| 1| darren | 18|
| 2| anne| 18|
| 3| "test"| 18|
| 4| test2|null|
+---+--------+----+
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
パラメータの説明:
package com.darren.spark.sql.csv
/**
* @Author Darren Zhang
* @Date 2019-05-30
* @Description TODO
**/
case class User(id: Int, name: String, age: Int)
タイトルのないファイルをテストします.
1| darren |18
2|anne|18
3|"test"|18
4|'test2'|\N
package com.darren.spark.sql.csv
import org.apache.spark.sql.{SaveMode, SparkSession}
/**
* @Author Darren Zhang
* @Date 2019-05-30
* @Description TODO
**/
object CSVReader {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("CSV Reader")
.master("local")
.getOrCreate()
val result = spark.read.format("csv")
.option("delimiter", "|")
//.option("header", "true")
.option("quote", "'")
.option("nullValue", "\\N")
.option("inferSchema", "true")
.load("test-in/csv/csv_without_header.csv")
.toDF("id", "name", "age")
result.show()
result.printSchema()
}
}
+---+--------+----+
| id| name| age|
+---+--------+----+
| 1| darren | 18|
| 2| anne| 18|
| 3| "test"| 18|
| 4| test2|null|
+---+--------+----+
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
タイプを指定:
package com.darren.spark.sql.csv
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{SaveMode, SparkSession}
/**
* @Author Darren Zhang
* @Date 2019-05-30
* @Description TODO
**/
object CSVReader {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("CSV Reader")
.master("local")
.getOrCreate()
val result = spark.read.format("csv")
.option("delimiter", "|")
//.option("header", "true")
.option("quote", "'")
.option("nullValue", "\\N")
.option("inferSchema", "true")
.schema(ScalaReflection.schemaFor[User].dataType.asInstanceOf[StructType])
.load("test-in/csv/csv_without_header.csv")
//.toDF("id", "name", "age")
result.show()
result.printSchema()
}
}
結果は上の結果と同じです
CSVファイルを書く:
result.write
.mode(SaveMode.Overwrite)
.option("delimiter", "|")
// .option("quote", "")
.format("csv")
.save("test-out/csv/")
1|darren|18
2|anne|18
3|"\"test\""|18
4|test2|
ファイルの内容と読み取りの出力にはいくつかの変化があります.
最初の変更点:書き出すファイルは二重引用符"を増加して、引用符のある地方で更に引用符を増加して、二重引用符がデフォルトの値なため、もし増加したくないならば、注釈を開けて、引用符を空に設定します
2つ目の変化:darren前後のスペースがなくなりました.Spark 2.1.1では、CSV形式のファイルをSpark SQLで保存します.デフォルトでは、文字列の前後のスペースが自動的に切り捨てられます.
このようなデフォルトの動作は、Spark 2.2.0以降、設定をオフにして機能を変更することができます.
result.write
.mode(SaveMode.Overwrite)
.option("delimiter", "|")
// .option("quote", "")
.option("ignoreLeadingWhiteSpace", false)
.option("ignoreTrailingWhiteSpace", false)
.option("nullValue", null)
.format("csv")
.save("test-out/csv/")
パラメータの説明:
質問:では、spark読み書きCSVにはいったいどれだけの属性が設定できるのでしょうか.
答え:何個あるかを示す資料は見つかりませんでしたが、ソースコードが見つかり、何個あるか判断できます.
ソースコードは次のとおりです.
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.csv
import java.nio.charset.StandardCharsets
import java.util.{Locale, TimeZone}
import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling}
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util._
class CSVOptions(
@transient private val parameters: CaseInsensitiveMap[String],
defaultTimeZoneId: String,
defaultColumnNameOfCorruptRecord: String)
extends Logging with Serializable {
def this(
parameters: Map[String, String],
defaultTimeZoneId: String,
defaultColumnNameOfCorruptRecord: String = "") = {
this(
CaseInsensitiveMap(parameters),
defaultTimeZoneId,
defaultColumnNameOfCorruptRecord)
}
private def getChar(paramName: String, default: Char): Char = {
val paramValue = parameters.get(paramName)
paramValue match {
case None => default
case Some(null) => default
case Some(value) if value.length == 0 => '\u0000'
case Some(value) if value.length == 1 => value.charAt(0)
case _ => throw new RuntimeException(s"$paramName cannot be more than one character")
}
}
private def getInt(paramName: String, default: Int): Int = {
val paramValue = parameters.get(paramName)
paramValue match {
case None => default
case Some(null) => default
case Some(value) => try {
value.toInt
} catch {
case e: NumberFormatException =>
throw new RuntimeException(s"$paramName should be an integer. Found $value")
}
}
}
private def getBool(paramName: String, default: Boolean = false): Boolean = {
val param = parameters.getOrElse(paramName, default.toString)
if (param == null) {
default
} else if (param.toLowerCase(Locale.ROOT) == "true") {
true
} else if (param.toLowerCase(Locale.ROOT) == "false") {
false
} else {
throw new Exception(s"$paramName flag can be true or false")
}
}
val delimiter = CSVUtils.toChar(
parameters.getOrElse("sep", parameters.getOrElse("delimiter", ",")))
val parseMode: ParseMode =
parameters.get("mode").map(ParseMode.fromString).getOrElse(PermissiveMode)
val charset = parameters.getOrElse("encoding",
parameters.getOrElse("charset", StandardCharsets.UTF_8.name()))
val quote = getChar("quote", '\"')
val escape = getChar("escape", '\\')
val comment = getChar("comment", '\u0000')
val headerFlag = getBool("header")
val inferSchemaFlag = getBool("inferSchema")
val ignoreLeadingWhiteSpaceInRead = getBool("ignoreLeadingWhiteSpace", default = false)
val ignoreTrailingWhiteSpaceInRead = getBool("ignoreTrailingWhiteSpace", default = false)
// For write, both options were `true` by default. We leave it as `true` for
// backwards compatibility.
val ignoreLeadingWhiteSpaceFlagInWrite = getBool("ignoreLeadingWhiteSpace", default = true)
val ignoreTrailingWhiteSpaceFlagInWrite = getBool("ignoreTrailingWhiteSpace", default = true)
val columnNameOfCorruptRecord =
parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)
val nullValue = parameters.getOrElse("nullValue", "")
val nanValue = parameters.getOrElse("nanValue", "NaN")
val positiveInf = parameters.getOrElse("positiveInf", "Inf")
val negativeInf = parameters.getOrElse("negativeInf", "-Inf")
val compressionCodec: Option[String] = {
val name = parameters.get("compression").orElse(parameters.get("codec"))
name.map(CompressionCodecs.getCodecClassName)
}
val timeZone: TimeZone = DateTimeUtils.getTimeZone(
parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))
// Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
val dateFormat: FastDateFormat =
FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), Locale.US)
val timestampFormat: FastDateFormat =
FastDateFormat.getInstance(
parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), timeZone, Locale.US)
val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
val maxColumns = getInt("maxColumns", 20480)
val maxCharsPerColumn = getInt("maxCharsPerColumn", -1)
val escapeQuotes = getBool("escapeQuotes", true)
val quoteAll = getBool("quoteAll", false)
val inputBufferSize = 128
val isCommentSet = this.comment != '\u0000'
def asWriterSettings: CsvWriterSettings = {
val writerSettings = new CsvWriterSettings()
val format = writerSettings.getFormat
format.setDelimiter(delimiter)
format.setQuote(quote)
format.setQuoteEscape(escape)
format.setComment(comment)
writerSettings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceFlagInWrite)
writerSettings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceFlagInWrite)
writerSettings.setNullValue(nullValue)
writerSettings.setEmptyValue(nullValue)
writerSettings.setSkipEmptyLines(true)
writerSettings.setQuoteAllFields(quoteAll)
writerSettings.setQuoteEscapingEnabled(escapeQuotes)
writerSettings
}
def asParserSettings: CsvParserSettings = {
val settings = new CsvParserSettings()
val format = settings.getFormat
format.setDelimiter(delimiter)
format.setQuote(quote)
format.setQuoteEscape(escape)
format.setComment(comment)
settings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceInRead)
settings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceInRead)
settings.setReadInputOnSeparateThread(false)
settings.setInputBufferSize(inputBufferSize)
settings.setMaxColumns(maxColumns)
settings.setNullValue(nullValue)
settings.setMaxCharsPerColumn(maxCharsPerColumn)
settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER)
settings
}
}
テキストリンク:http://dyingbleed.com/spark-ii-5/