Spark SQL外部データソース(External Data Source)および一般的な操作
31434 ワード
1概要 Spark1.2でSpark SQLが外部データソースの正式なサポートを開始しました.Spark SQLは、開発者が実現できるように、外部データソースにアクセスする一連のインタフェースをオープンしています.Spark SQLは、mysql、hive、hdfs、hbaseなど、どこのデータもロードでき、json、parquet、avro、csvなどの多くのフォーマットをサポートします.Spark SQLに接続するために任意の外部データソースを開発し、外部データソースAPIを介して操作することができます. 外部データソースAPIを通じて各種フォーマットのデータを読み取ると、DataFrameが得られます.これは私たちがよく知っている方法ですね.DataFrameのAPIまたはSQLのAPIを使って操作することができます. 外部データソースのAPIは自動的にいくつかの列の裁断をすることができて、何が列の裁断を叫ぶことができて、あなたは1つのuser表のようにid、name、age、genderの4つの列があって、selectをする時あなたはid、nameのこのいくつかの列だけを必要として、それではその他の列は下層の最適化を通じて私达に裁断します. 保存操作SaveModeを使用して、既存のデータが存在する場合の保存方法を指定できます.
2 jsonファイルの読み込み
shellを起動してテストする
3 parquetデータの読み出し
4 hiveのデータを読み込む
5データの保存
注意:保存されたフォルダは存在しません.そうしないと、エラーが発生します(デフォルトでは、異なるモードを選択できます): テキストフォーマットにエラーが発生し、1列しかエラーが発生しません.そうしないと、エラーが発生します:
データの保存は、ディレクトリがすでに存在する場合、デフォルトのモードでエラーが発生すると説明しましたが、保存するいくつかのモードについて説明します.
保存モード
意味
SaveMode.ErrorIfExists (default)
データFrameをデータソースに保存する場合、データが既に存在する場合、例外が放出されることが予想されます.
SaveMode.Append
データ・フレームをデータ・ソースに保存する場合、データ/テーブルが既に存在する場合、データ・フレームの内容は既存のデータに添付される予定です.
SaveMode.Overwrite
上書きモードとは、データ・フレームをデータ・ソースに保存する場合、データ/テーブルが既に存在する場合、既存のデータがデータ・フレームの内容で上書きされることを意味します.
SaveMode.Ignore
無視モードとは、データ・フレームをデータ・ソースに保存する場合、データがすでに存在する場合、保存操作ではデータ・フレームの内容は保存されず、既存のデータも変更されないことを意味します.これはSQLのCREATE TABLE IF NOT EXISTSと似ています.
6 mysqlのデータを読み込む
7 spark SQL操作mysqlテーブルデータ
8パーティション推定(PartitionDiscovery)
テーブルパーティションは、Hiveのようなシステムで使用される一般的な最適化方法です.パーティションテーブルでは、通常、データは異なるディレクトリに格納され、パーティション列値は各パーティションディレクトリのパスで符号化されます.すべての内蔵ファイルソース(Text/CSV/JSON/ORC/Parquetを含む)は、パーティション情報を自動的に検出および推定できます.たとえば、次のディレクトリ構造を作成します.spark sqlを使用して外部データソースを読み込みます: 読み取りディレクトリを変更
皆さんは何か見つけましたか?Spark SQLは、パスからパーティション情報を自動的に抽出します.パーティション列のデータ型は自動的に推定されます.現在、数値データ型、日付、タイムスタンプ、文字列タイプがサポートされています.パーティション列のデータ型を自動的に推定したくない場合があります.これらの例では、自動タイプ推定は
Spark 1.6.0から開始し、デフォルトでは、パーティション発見は指定されたパスの下でのみパーティションが見つかります.上記の例では、ユーザがSparkSessionにパス/到/table/gender=maleを渡す.read....read.loadでは、性別はパーティション列と見なされません.パーティション検出を開始する基本パスを指定する必要がある場合は、データ・ソース・オプションでbasePathを設定できます.たとえば、path/to/table/gender=maleがデータパスであり、basePathをpath/to/table/に設定すると、性別はパーティション列になります.
9 Schemaマージ(Schema Merging)
Protocol Buffer、Avro、Thriftと同様に、ParquetタイプのファイルでもSchemaマージ操作がサポートされています.ユーザーは複数のParquetファイルをマージできます.Parquetデータ・ソースは、このような状況を自動的に検出し、これらのファイルのパターンをすべてマージできるようになりました.
モードマージは比較的高価な操作であり、ほとんどの場合必要ではないため、デフォルトでは1.5.0からオフにします.次の例に示すように、Parquetファイルを読み込むときにデータソースオプション設定しない場合 設定されている場合: hdfs上のファイル
2 jsonファイルの読み込み
shellを起動してテストする
//
val df=spark.read.format("json").load("path")
//
spark.read.json("path")
?
/**
* Loads a JSON file and returns the results as a `DataFrame`.
*
* See the documentation on the overloaded `json()` method with varargs for more details.
*
* @since 1.4.0
*/
def json(path: String): DataFrame = {
// This method ensures that calls that explicit need single argument works, see SPARK-16009
json(Seq(path): _*)
}
josn() overloaded ,
def json(paths: String*): DataFrame = format("json").load(paths : _*)
,
scala> val df=spark.read.format("json").load("file:///opt/software/spark-2.2.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
df.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
df.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
3 parquetデータの読み出し
val df=spark.read.format("parquet").load("file:///opt/software/spark-2.2.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet")
df: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]
df.show
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+
4 hiveのデータを読み込む
spark.sql("show tables").show
+--------+----------+-----------+
|database| tableName|isTemporary|
+--------+----------+-----------+
| default|states_raw| false|
| default|states_seq| false|
| default| t1| false|
+--------+----------+-----------+
spark.table("states_raw").show
+-----+------+
| code| name|
+-----+------+
|hello| java|
|hello|hadoop|
|hello| hive|
|hello| sqoop|
|hello| hdfs|
|hello| spark|
+-----+------+
scala> spark.sql("select name from states_raw ").show
+------+
| name|
+------+
| java|
|hadoop|
| hive|
| sqoop|
| hdfs|
| spark|
+------+
5データの保存
注意:
org.apache.spark.sql.AnalysisException: path file:/home/hadoop/data already exists.;
org.apache.spark.sql.AnalysisException: Text data source supports only a single column, and you have 2 columns.;
val df=spark.read.format("json").load("file:///opt/software/spark-2.2.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json")
//
df.select("name").write.format("text").save("file:///home/hadoop/data/out")
:
[hadoop@hadoop out]$ pwd
/home/hadoop/data/out
[hadoop@hadoop out]$ ll
total 4
-rw-r--r--. 1 hadoop hadoop 20 Apr 24 00:34 part-00000-ed7705d2-3fdd-4f08-a743-5bc355471076-c000.txt
-rw-r--r--. 1 hadoop hadoop 0 Apr 24 00:34 _SUCCESS
[hadoop@hadoop out]$ cat part-00000-ed7705d2-3fdd-4f08-a743-5bc355471076-c000.txt
Michael
Andy
Justin
// json
df.write.format("json").save("file:///home/hadoop/data/out1")
[hadoop@hadoop data]$ cd out1
[hadoop@hadoop out1]$ ll
total 4
-rw-r--r--. 1 hadoop hadoop 71 Apr 24 00:35 part-00000-948b5b30-f104-4aa4-9ded-ddd70f1f5346-c000.json
-rw-r--r--. 1 hadoop hadoop 0 Apr 24 00:35 _SUCCESS
[hadoop@hadoop out1]$ cat part-00000-948b5b30-f104-4aa4-9ded-ddd70f1f5346-c000.json
{"name":"Michael"}
{"age":30,"name":"Andy"}
{"age":19,"name":"Justin"}
データの保存は、ディレクトリがすでに存在する場合、デフォルトのモードでエラーが発生すると説明しましたが、保存するいくつかのモードについて説明します.
保存モード
意味
SaveMode.ErrorIfExists (default)
データFrameをデータソースに保存する場合、データが既に存在する場合、例外が放出されることが予想されます.
SaveMode.Append
データ・フレームをデータ・ソースに保存する場合、データ/テーブルが既に存在する場合、データ・フレームの内容は既存のデータに添付される予定です.
SaveMode.Overwrite
上書きモードとは、データ・フレームをデータ・ソースに保存する場合、データ/テーブルが既に存在する場合、既存のデータがデータ・フレームの内容で上書きされることを意味します.
SaveMode.Ignore
無視モードとは、データ・フレームをデータ・ソースに保存する場合、データがすでに存在する場合、保存操作ではデータ・フレームの内容は保存されず、既存のデータも変更されないことを意味します.これはSQLのCREATE TABLE IF NOT EXISTSと似ています.
6 mysqlのデータを読み込む
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306")
.option("dbtable", "basic01.tbls")
.option("user", "root")
.option("password", "123456")
.load()
scala> jdbcDF.printSchema
root
|-- TBL_ID: long (nullable = false)
|-- CREATE_TIME: integer (nullable = false)
|-- DB_ID: long (nullable = true)
|-- LAST_ACCESS_TIME: integer (nullable = false)
|-- OWNER: string (nullable = true)
|-- RETENTION: integer (nullable = false)
|-- SD_ID: long (nullable = true)
|-- TBL_NAME: string (nullable = true)
|-- TBL_TYPE: string (nullable = true)
|-- VIEW_EXPANDED_TEXT: string (nullable = true)
|-- VIEW_ORIGINAL_TEXT: string (nullable = true)
jdbcDF.show
7 spark SQL操作mysqlテーブルデータ
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
url "jdbc:mysql://localhost:3306",
dbtable "basic01.tbls",
user 'root',
password '123456',
driver "com.mysql.jdbc.Driver"
);
:
show tables;
default states_raw false
default states_seq false
default t1 false
jdbctable true
select * from jdbctable;
1 1519944170 6 0 hadoop 0 1 page_views MANAGED_TABLE NULL NULL
2 1519944313 6 0 hadoop 0 2 page_views_bzip2 MANAGED_TABLE NULL NULL
3 1519944819 6 0 hadoop 0 3 page_views_snappy MANAGED_TABLE NULL NULL
21 1520067771 6 0 hadoop 0 21 tt MANAGED_TABLE NULL NULL
22 1520069148 6 0 hadoop 0 22 page_views_seq MANAGED_TABLE NULL NULL
23 1520071381 6 0 hadoop 0 23 page_views_rcfile MANAGED_TABLE NULL NULL
24 1520074675 6 0 hadoop 0 24 page_views_orc_zlib MANAGED_TABLE NULL NULL
27 1520078184 6 0 hadoop 0 27 page_views_lzo_index MANAGED_TABLE NULL NULL
30 1520083461 6 0 hadoop 0 30 page_views_lzo_index1 MANAGED_TABLE NULL NULL
31 1524370014 1 0 hadoop 0 31 t1 EXTERNAL_TABLE NULL NULL
37 1524468636 1 0 hadoop 0 37 states_raw MANAGED_TABLE NULL NULL
38 1524468678 1 0 hadoop 0 38 states_seq MANAGED_TABLE NULL NULL
mysql tbls jdbctable 。
8パーティション推定(PartitionDiscovery)
テーブルパーティションは、Hiveのようなシステムで使用される一般的な最適化方法です.パーティションテーブルでは、通常、データは異なるディレクトリに格納され、パーティション列値は各パーティションディレクトリのパスで符号化されます.すべての内蔵ファイルソース(Text/CSV/JSON/ORC/Parquetを含む)は、パーティション情報を自動的に検出および推定できます.たとえば、次のディレクトリ構造を作成します.
hdfs dfs -mkdir -p /user/hive/warehouse/gender=male/country=CN
json :
people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
hdfs dfs -put people.json /user/hive/warehouse/gender=male/country=CN
val df=spark.read.format("json").load("/user/hive/warehouse/gender=male/country=CN/people.json")
scala> df.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
scala> df.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
val df=spark.read.format("json").load("/user/hive/warehouse/gender=male/")
scala> df.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
|-- country: string (nullable = true)
scala> df.show
+----+-------+-------+
| age| name|country|
+----+-------+-------+
|null|Michael| CN|
| 30| Andy| CN|
| 19| Justin| CN|
+----+-------+-------+
皆さんは何か見つけましたか?Spark SQLは、パスからパーティション情報を自動的に抽出します.パーティション列のデータ型は自動的に推定されます.現在、数値データ型、日付、タイムスタンプ、文字列タイプがサポートされています.パーティション列のデータ型を自動的に推定したくない場合があります.これらの例では、自動タイプ推定は
spark.sql.sources.partitionColumnTypeInference.enabled
で構成され、デフォルトはtrueである.タイプ推定を無効にすると、文字列タイプがパーティション列に使用されます.Spark 1.6.0から開始し、デフォルトでは、パーティション発見は指定されたパスの下でのみパーティションが見つかります.上記の例では、ユーザがSparkSessionにパス/到/table/gender=maleを渡す.read....read.loadでは、性別はパーティション列と見なされません.パーティション検出を開始する基本パスを指定する必要がある場合は、データ・ソース・オプションでbasePathを設定できます.たとえば、path/to/table/gender=maleがデータパスであり、basePathをpath/to/table/に設定すると、性別はパーティション列になります.
9 Schemaマージ(Schema Merging)
Protocol Buffer、Avro、Thriftと同様に、ParquetタイプのファイルでもSchemaマージ操作がサポートされています.ユーザーは複数のParquetファイルをマージできます.Parquetデータ・ソースは、このような状況を自動的に検出し、これらのファイルのパターンをすべてマージできるようになりました.
モードマージは比較的高価な操作であり、ほとんどの場合必要ではないため、デフォルトでは1.5.0からオフにします.次の例に示すように、Parquetファイルを読み込むときにデータソースオプション
mergeSchema
をtrueに設定するか、グローバルSQLオプションspark.sql.parquet.mergeSchema
をtrueに設定できます.val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("/schema_merge/test_table/key=1")
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("/schema_merge/test_table/key=2")
val mergedDF = spark.read.format("parquet").option("mergeSchema", "true").load("/schema_merge/test_table")
mergedDF.printSchema()
mergedDF.show
option("mergeSchema", "true")
+-----+------+---+
|value|square|key|
+-----+------+---+
| 1| 1| 1|
| 2| 4| 1|
| 3| 9| 1|
| 4| 16| 1|
| 5| 25| 1|
| 6| null| 2|
| 7| null| 2|
| 8| null| 2|
| 9| null| 2|
| 10| null| 2|
+-----+------+---+
+-----+------+----+---+
|value|square|cube|key|
+-----+------+----+---+
| 3| 9|null| 1|
| 4| 16|null| 1|
| 5| 25|null| 1|
| 8| null| 512| 2|
| 9| null| 729| 2|
| 10| null|1000| 2|
| 1| 1|null| 1|
| 2| 4|null| 1|
| 6| null| 216| 2|
| 7| null| 343| 2|
+-----+------+----+---+
[root@hadoop ~]# hdfs dfs -ls /schema_merge/test_table
Found 2 items
drwxr-xr-x - hadoop supergroup 0 2018-04-28 21:47 /schem_merge/test_table/key=1
drwxr-xr-x - hadoop supergroup 0 2018-04-28 21:48 /schem_merge/test_table/key=2