Spark SQL外部データソース(External Data Source)および一般的な操作


1概要
  • Spark1.2でSpark SQLが外部データソースの正式なサポートを開始しました.Spark SQLは、開発者が実現できるように、外部データソースにアクセスする一連のインタフェースをオープンしています.Spark SQLは、mysql、hive、hdfs、hbaseなど、どこのデータもロードでき、json、parquet、avro、csvなどの多くのフォーマットをサポートします.Spark SQLに接続するために任意の外部データソースを開発し、外部データソースAPIを介して操作することができます.
  • 外部データソースAPIを通じて各種フォーマットのデータを読み取ると、DataFrameが得られます.これは私たちがよく知っている方法ですね.DataFrameのAPIまたはSQLのAPIを使って操作することができます.
  • 外部データソースのAPIは自動的にいくつかの列の裁断をすることができて、何が列の裁断を叫ぶことができて、あなたは1つのuser表のようにid、name、age、genderの4つの列があって、selectをする時あなたはid、nameのこのいくつかの列だけを必要として、それではその他の列は下層の最適化を通じて私达に裁断します.
  • 保存操作SaveModeを使用して、既存のデータが存在する場合の保存方法を指定できます.

  • 2 jsonファイルの読み込み
    shellを起動してテストする
    //    
    val df=spark.read.format("json").load("path")
    //      
    spark.read.json("path")
    
                    ?
    /**
       * Loads a JSON file and returns the results as a `DataFrame`.
       *
       * See the documentation on the overloaded `json()` method with varargs for more details.
       *
       * @since 1.4.0
       */
      def json(path: String): DataFrame = {
        // This method ensures that calls that explicit need single argument works, see SPARK-16009
        json(Seq(path): _*)
      }
        josn()         overloaded ,      
     def json(paths: String*): DataFrame = format("json").load(paths : _*)
    scala> val df=spark.read.format("json").load("file:///opt/software/spark-2.2.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json")
    
    df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    
    df.printSchema
    root
     |-- age: long (nullable = true)
     |-- name: string (nullable = true)
    
     df.show
    +----+-------+
    | age|   name|
    +----+-------+
    |null|Michael|
    |  30|   Andy|
    |  19| Justin|
    +----+-------+
    

    3 parquetデータの読み出し
    val df=spark.read.format("parquet").load("file:///opt/software/spark-2.2.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet")
    df: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]
    
    df.show
    +------+--------------+----------------+
    |  name|favorite_color|favorite_numbers|
    +------+--------------+----------------+
    |Alyssa|          null|  [3, 9, 15, 20]|
    |   Ben|           red|              []|
    +------+--------------+----------------+
    
    

    4 hiveのデータを読み込む
    spark.sql("show tables").show
    +--------+----------+-----------+
    |database| tableName|isTemporary|
    +--------+----------+-----------+
    | default|states_raw|      false|
    | default|states_seq|      false|
    | default|        t1|      false|
    +--------+----------+-----------+
    
    spark.table("states_raw").show
    +-----+------+
    | code|  name|
    +-----+------+
    |hello|  java|
    |hello|hadoop|
    |hello|  hive|
    |hello| sqoop|
    |hello|  hdfs|
    |hello| spark|
    +-----+------+
    
    scala> spark.sql("select name from states_raw ").show
    +------+
    |  name|
    +------+
    |  java|
    |hadoop|
    |  hive|
    | sqoop|
    |  hdfs|
    | spark|
    +------+
    

    5データの保存
    注意:
  • 保存されたフォルダは存在しません.そうしないと、エラーが発生します(デフォルトでは、異なるモードを選択できます):org.apache.spark.sql.AnalysisException: path file:/home/hadoop/data already exists.;
  • テキストフォーマットにエラーが発生し、1列しかエラーが発生しません.そうしないと、エラーが発生します:org.apache.spark.sql.AnalysisException: Text data source supports only a single column, and you have 2 columns.;
  • val df=spark.read.format("json").load("file:///opt/software/spark-2.2.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json")
    //  
    df.select("name").write.format("text").save("file:///home/hadoop/data/out")[hadoop@hadoop out]$ pwd
    /home/hadoop/data/out
    [hadoop@hadoop out]$ ll
    total 4
    -rw-r--r--. 1 hadoop hadoop 20 Apr 24 00:34 part-00000-ed7705d2-3fdd-4f08-a743-5bc355471076-c000.txt
    -rw-r--r--. 1 hadoop hadoop  0 Apr 24 00:34 _SUCCESS
    [hadoop@hadoop out]$ cat part-00000-ed7705d2-3fdd-4f08-a743-5bc355471076-c000.txt 
    Michael
    Andy
    Justin
    
    
    //   json  
    df.write.format("json").save("file:///home/hadoop/data/out1")
    
      
    [hadoop@hadoop data]$ cd out1
    [hadoop@hadoop out1]$ ll
    total 4
    -rw-r--r--. 1 hadoop hadoop 71 Apr 24 00:35 part-00000-948b5b30-f104-4aa4-9ded-ddd70f1f5346-c000.json
    -rw-r--r--. 1 hadoop hadoop  0 Apr 24 00:35 _SUCCESS
    [hadoop@hadoop out1]$ cat part-00000-948b5b30-f104-4aa4-9ded-ddd70f1f5346-c000.json 
    {"name":"Michael"}
    {"age":30,"name":"Andy"}
    {"age":19,"name":"Justin"}
    
    
    

    データの保存は、ディレクトリがすでに存在する場合、デフォルトのモードでエラーが発生すると説明しましたが、保存するいくつかのモードについて説明します.
    保存モード
    意味
    SaveMode.ErrorIfExists (default)
    データFrameをデータソースに保存する場合、データが既に存在する場合、例外が放出されることが予想されます.
    SaveMode.Append
    データ・フレームをデータ・ソースに保存する場合、データ/テーブルが既に存在する場合、データ・フレームの内容は既存のデータに添付される予定です.
    SaveMode.Overwrite
    上書きモードとは、データ・フレームをデータ・ソースに保存する場合、データ/テーブルが既に存在する場合、既存のデータがデータ・フレームの内容で上書きされることを意味します.
    SaveMode.Ignore
    無視モードとは、データ・フレームをデータ・ソースに保存する場合、データがすでに存在する場合、保存操作ではデータ・フレームの内容は保存されず、既存のデータも変更されないことを意味します.これはSQLのCREATE TABLE IF NOT EXISTSと似ています.
    6 mysqlのデータを読み込む
    val jdbcDF = spark.read
    .format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306")
    .option("dbtable", "basic01.tbls")
    .option("user", "root")
    .option("password", "123456")
    .load()
    
    scala> jdbcDF.printSchema
    root
     |-- TBL_ID: long (nullable = false)
     |-- CREATE_TIME: integer (nullable = false)
     |-- DB_ID: long (nullable = true)
     |-- LAST_ACCESS_TIME: integer (nullable = false)
     |-- OWNER: string (nullable = true)
     |-- RETENTION: integer (nullable = false)
     |-- SD_ID: long (nullable = true)
     |-- TBL_NAME: string (nullable = true)
     |-- TBL_TYPE: string (nullable = true)
     |-- VIEW_EXPANDED_TEXT: string (nullable = true)
     |-- VIEW_ORIGINAL_TEXT: string (nullable = true)
     
    jdbcDF.show
    
    

    7 spark SQL操作mysqlテーブルデータ
    CREATE TEMPORARY VIEW jdbcTable
    USING org.apache.spark.sql.jdbc
    OPTIONS (
      url "jdbc:mysql://localhost:3306",
      dbtable "basic01.tbls",
      user 'root',
      password '123456',
      driver "com.mysql.jdbc.Driver"
    );
    
      :
    show tables;
    default states_raw      false
    default states_seq      false
    default t1      false
    jdbctable       true
    
    select * from jdbctable;
    1       1519944170      6       0       hadoop  0       1       page_views      MANAGED_TABLE   NULL    NULL
    2       1519944313      6       0       hadoop  0       2       page_views_bzip2        MANAGED_TABLE   NULL    NULL
    3       1519944819      6       0       hadoop  0       3       page_views_snappy       MANAGED_TABLE   NULL    NULL
    21      1520067771      6       0       hadoop  0       21      tt      MANAGED_TABLE   NULL    NULL
    22      1520069148      6       0       hadoop  0       22      page_views_seq  MANAGED_TABLE   NULL    NULL
    23      1520071381      6       0       hadoop  0       23      page_views_rcfile       MANAGED_TABLE   NULL    NULL
    24      1520074675      6       0       hadoop  0       24      page_views_orc_zlib     MANAGED_TABLE   NULL    NULL
    27      1520078184      6       0       hadoop  0       27      page_views_lzo_index    MANAGED_TABLE   NULL    NULL
    30      1520083461      6       0       hadoop  0       30      page_views_lzo_index1   MANAGED_TABLE   NULL    NULL
    31      1524370014      1       0       hadoop  0       31      t1      EXTERNAL_TABLE  NULL    NULL
    37      1524468636      1       0       hadoop  0       37      states_raw      MANAGED_TABLE   NULL    NULL
    38      1524468678      1       0       hadoop  0       38      states_seq      MANAGED_TABLE   NULL    NULL
    
    mysql  tbls       jdbctable   。
    

    8パーティション推定(PartitionDiscovery)
    テーブルパーティションは、Hiveのようなシステムで使用される一般的な最適化方法です.パーティションテーブルでは、通常、データは異なるディレクトリに格納され、パーティション列値は各パーティションディレクトリのパスで符号化されます.すべての内蔵ファイルソース(Text/CSV/JSON/ORC/Parquetを含む)は、パーティション情報を自動的に検出および推定できます.たとえば、次のディレクトリ構造を作成します.
    hdfs dfs -mkdir -p /user/hive/warehouse/gender=male/country=CN
    
      json  :
    people.json 
    {"name":"Michael"}
    {"name":"Andy", "age":30}
    {"name":"Justin", "age":19}
    
     hdfs dfs -put people.json /user/hive/warehouse/gender=male/country=CN
    
  • spark sqlを使用して外部データソースを読み込みます:
  • val df=spark.read.format("json").load("/user/hive/warehouse/gender=male/country=CN/people.json")
    
    scala> df.printSchema
    root
     |-- age: long (nullable = true)
     |-- name: string (nullable = true)
    
    
    scala> df.show
    
    +----+-------+
    | age|   name|
    +----+-------+
    |null|Michael|
    |  30|   Andy|
    |  19| Justin|
    +----+-------+
    
  • 読み取りディレクトリを変更
  • val df=spark.read.format("json").load("/user/hive/warehouse/gender=male/")
    scala> df.printSchema
    root
     |-- age: long (nullable = true)
     |-- name: string (nullable = true)
     |-- country: string (nullable = true)
    
    
    scala> df.show
    +----+-------+-------+
    | age|   name|country|
    +----+-------+-------+
    |null|Michael|     CN|
    |  30|   Andy|     CN|
    |  19| Justin|     CN|
    +----+-------+-------+
    

    皆さんは何か見つけましたか?Spark SQLは、パスからパーティション情報を自動的に抽出します.パーティション列のデータ型は自動的に推定されます.現在、数値データ型、日付、タイムスタンプ、文字列タイプがサポートされています.パーティション列のデータ型を自動的に推定したくない場合があります.これらの例では、自動タイプ推定はspark.sql.sources.partitionColumnTypeInference.enabledで構成され、デフォルトはtrueである.タイプ推定を無効にすると、文字列タイプがパーティション列に使用されます.
    Spark 1.6.0から開始し、デフォルトでは、パーティション発見は指定されたパスの下でのみパーティションが見つかります.上記の例では、ユーザがSparkSessionにパス/到/table/gender=maleを渡す.read....read.loadでは、性別はパーティション列と見なされません.パーティション検出を開始する基本パスを指定する必要がある場合は、データ・ソース・オプションでbasePathを設定できます.たとえば、path/to/table/gender=maleがデータパスであり、basePathをpath/to/table/に設定すると、性別はパーティション列になります.
    9 Schemaマージ(Schema Merging)
    Protocol Buffer、Avro、Thriftと同様に、ParquetタイプのファイルでもSchemaマージ操作がサポートされています.ユーザーは複数のParquetファイルをマージできます.Parquetデータ・ソースは、このような状況を自動的に検出し、これらのファイルのパターンをすべてマージできるようになりました.
    モードマージは比較的高価な操作であり、ほとんどの場合必要ではないため、デフォルトでは1.5.0からオフにします.次の例に示すように、Parquetファイルを読み込むときにデータソースオプションmergeSchemaをtrueに設定するか、グローバルSQLオプションspark.sql.parquet.mergeSchemaをtrueに設定できます.
    val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
    squaresDF.write.parquet("/schema_merge/test_table/key=1") 
     
    val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
    cubesDF.write.parquet("/schema_merge/test_table/key=2") 
    
    val mergedDF = spark.read.format("parquet").option("mergeSchema", "true").load("/schema_merge/test_table")
    
    mergedDF.printSchema()
    mergedDF.show
    
  • 設定しない場合option("mergeSchema", "true")
  • +-----+------+---+
    |value|square|key|
    +-----+------+---+
    |    1|     1|  1|
    |    2|     4|  1|
    |    3|     9|  1|
    |    4|    16|  1|
    |    5|    25|  1|
    |    6|  null|  2|
    |    7|  null|  2|
    |    8|  null|  2|
    |    9|  null|  2|
    |   10|  null|  2|
    +-----+------+---+
    
  • 設定されている場合:
  • +-----+------+----+---+
    |value|square|cube|key|
    +-----+------+----+---+
    |    3|     9|null|  1|
    |    4|    16|null|  1|
    |    5|    25|null|  1|
    |    8|  null| 512|  2|
    |    9|  null| 729|  2|
    |   10|  null|1000|  2|
    |    1|     1|null|  1|
    |    2|     4|null|  1|
    |    6|  null| 216|  2|
    |    7|  null| 343|  2|
    +-----+------+----+---+
    
  • hdfs上のファイル
  • [root@hadoop ~]# hdfs dfs -ls /schema_merge/test_table
    Found 2 items
    drwxr-xr-x   - hadoop supergroup          0 2018-04-28 21:47 /schem_merge/test_table/key=1
    drwxr-xr-x   - hadoop supergroup          0 2018-04-28 21:48 /schem_merge/test_table/key=2