AWS接着剤3.0とデルタ湖を使用しているS 3のSQLベースの挿入物、削除とupserts


このブログ記事の目的は、どのように使用できるかを示すことですSpark SQL Engine 為すUPSERTS , DELETES , and INSERTS . 基本的に、更新.
今月始め、ブログ投稿をしましたPySpark . 以下をチェックしてください.


But, what if we want it to make it more simple and familiar?


今月、AWSリリースGlue version 3.0 ! AWS接着剤3.0はバッチおよびストリーム処理のためにパフォーマンス最適化されたApache Surf 3.1ランタイムを導入する.新しいエンジンは、データの摂取、処理および統合を高速化し、データの湖を水和させることができますし、迅速にデータから洞察力を抽出します.


しかし、これは大きな取引ですか?
さて、火花エンジンの一般的なパフォーマンスの向上の多くは別として、それはまた、現在の2010年の最新バージョンをサポートすることができますDelta Lake . 最も顕著なものはSupport for SQL Insert, Delete, Update and Merge .
あなたがデルタ湖が何であるかについて、わからないならば、あなたはそれが何であるかという一般的な考えを持つために上記の私のブログ柱をチェックすることができます.
デモを続けましょう!

目次

  • Architecture Diagram
  • Format to Delta
  • Upsert
  • Delete
  • Insert
  • Partitioned Data
  • Conclusion
  • ✅ アーキテクチャ図



    これは基本的に私たちが何をしているかの簡単なプロセスの流れです.私たちはsample csv ファイルを読み込みますS3 Bucket 次に、Glue . (オプション)その後、お気に入りのBIツールに接続することができます.

    ❗ 事前の要件


    しかし、我々がそれに着く前に、我々は若干の前仕事をする必要があります.
  • デルタ湖パッケージhere - 少しハードスポットが、探してFiles テーブルの中でjar
  • AWSアカウント❗ 接着剤ETLはフリー層に含まれていません
  • サンプルデータのダウンロードhere - しかし、あなたは私自身を使うことができます、しかし、私はこれを使います
  • コードは、私のもので見つかりますGitHub Repository
  • ✅ デルタ表形式
    まず最初に、それぞれのデータセットをデルタ形式に変換する必要があります.以下はこれを行うコードです.
    
    # Import the packages
    from delta import *
    from pyspark.sql.session import SparkSession
    
    # Initialize Spark Session along with configs for Delta Lake
    spark = SparkSession \
        .builder \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .getOrCreate()
    
    
    # Read Source
    inputDF = spark.read.format("csv").option("header", "true").load('s3://delta-lake-aws-glue-demo/raw/')
    
    # Write data as a DELTA TABLE
    inputDF.write.format("delta").mode("overwrite").save("s3a://delta-lake-aws-glue-demo/current/")
    
    # Read Source
    updatesDF = spark.read.format("csv").option("header", "true").load('s3://delta-lake-aws-glue-demo/updates/')
    
    # Write data as a DELTA TABLE
    updatesDF.write.format("delta").mode("overwrite").save("s3a://delta-lake-aws-glue-demo/updates_delta/")
    
    # Generate MANIFEST file for Athena/Catalog
    deltaTable = DeltaTable.forPath(spark, "s3a://delta-lake-aws-glue-demo/current/")
    deltaTable.generate("symlink_format_manifest")
    
    ### OPTIONAL, UNCOMMENT IF YOU WANT TO VIEW ALSO THE DATA FOR UPDATES IN ATHENA
    ###
    # Generate MANIFEST file for Updates
    # updatesDeltaTable = DeltaTable.forPath(spark, "s3a://delta-lake-aws-glue-demo/updates_delta/")
    # updatesDeltaTable.generate("symlink_format_manifest")
    
    このコードは、データセットをdelta 形式.これは、両方のソースデータだけでなく、更新プログラムで行われます.
    生成後SYMLINK MANIFEST ファイルは、我々はアテナを介して表示することができます.SQLコードもリポジトリに含まれています

    🔀 茄子


    upsertは、inserts データベーステーブルへの行do not already exist , or updates 彼らがそうするならば、彼ら.
    この例では、2行分の値を更新しますship_mode , customer_name , sales , and profit . 私はちょうどランダムな文字スパムをしました、そして、私はそれを通してそれを考えませんでした😅.
    
    # Import as always
    from delta import *
    from pyspark.sql.session import SparkSession
    
    # Initialize Spark Session along with configs for Delta Lake
    spark = SparkSession \
        .builder \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .getOrCreate()
    
    
    updateDF = spark.sql("""
    
    MERGE INTO delta.`s3a://delta-lake-aws-glue-demo/current/` as superstore
    USING delta.`s3a://delta-lake-aws-glue-demo/updates_delta/` as updates
    ON superstore.row_id = updates.row_id
    WHEN MATCHED THEN
      UPDATE SET *
    WHEN NOT MATCHED
      THEN INSERT *
    """)
    
    # Generate MANIFEST file for Athena/Catalog
    deltaTable = DeltaTable.forPath(spark, "s3a://delta-lake-aws-glue-demo/current/")
    deltaTable.generate("symlink_format_manifest")
    
    ### OPTIONAL
    ## SQL-BASED GENERATION OF SYMLINK
    
    # spark.sql("""
    # GENERATE symlink_format_manifest 
    # FOR TABLE delta.`s3a://delta-lake-aws-glue-demo/current/`
    # """)
    
    
    上のSQLコードupdates に基づいて更新テーブルにある現在のテーブルrow_id . そして、その条件を評価する.

    If row_id is matched, then UPDATE ALL the data. If not, then do an INSERT ALL.


    あなたが完全な操作意味論をチェックアウトするならMERGE を通して読むことができますthis
    その後、我々はMANIFEST ファイル再び.この世代のMANIFEST ファイルを自動的に以下のクエリを実行して更新するように設定することができます.
    ALTER TABLE delta.`<path-to-delta-table>` 
    SET TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true)
    
    より多くの情報を見つけることができますhere
    今すぐAthenaの更新テーブルを参照してくださいする必要があります.

    ❌ 削除する


    デルタ湖を介して削除は非常に簡単です.
    from delta import *
    from pyspark.sql.session import SparkSession
    
    
    spark = SparkSession \
        .builder \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .getOrCreate()
    
    
    deleteDF = spark.sql("""
    DELETE 
    FROM delta.`s3a://delta-lake-aws-glue-demo/current/` as superstore 
    WHERE CAST(superstore.row_id as integer) <= 20
    """)
    
    # Generate MANIFEST file for Athena/Catalog
    deltaTable = DeltaTable.forPath(
        spark, "s3a://delta-lake-aws-glue-demo/current/")
    deltaTable.generate("symlink_format_manifest")
    
    ### OPTIONAL
    ## SQL-BASED GENERATION OF SYMLINK MANIFEST
    
    # spark.sql("""
    
    # GENERATE symlink_format_manifest 
    # FOR TABLE delta.`s3a://delta-lake-aws-glue-demo/current/`
    
    # """)
    
    
    この操作は、row_id .
    SELECT * 
    FROM "default"."superstore" 
    -- Need to CAST hehe bec it is currently a STRING
    ORDER BY CAST(row_id as integer); 
    

    ⤴ インサート


    削除のように、挿入も非常に簡単です.
    
    from delta import *
    from pyspark.sql.session import SparkSession
    
    
    spark = SparkSession \
        .builder \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .getOrCreate()
    
    
    insertDF = spark.sql("""
    INSERT INTO delta.`s3a://delta-lake-aws-glue-demo/current/`
    SELECT *
    FROM delta.`s3a://delta-lake-aws-glue-demo/updates_delta/`
    WHERE CAST(row_id as integer) <= 20
    """)
    
    # Generate MANIFEST file for Athena/Catalog
    deltaTable = DeltaTable.forPath(
        spark, "s3a://delta-lake-aws-glue-demo/current/")
    deltaTable.generate("symlink_format_manifest")
    
    ### OPTIONAL
    ## SQL-BASED GENERATION OF SYMLINK MANIFEST
    
    # spark.sql("""
    
    # GENERATE symlink_format_manifest 
    # FOR TABLE delta.`s3a://delta-lake-aws-glue-demo/current/`
    
    # """)
    
    

    ❗ 分割データ


    簡単なデータセットのupsert、delete、insert操作を行いました.しかし、IRLはめったに起こりません.それで、我々がスパイスを上にして、分割されたデータにそれをするならば、どうですか?
    私は前に行ってスパークを介していくつかの分割を行ったpartitioned このバージョンを使うorder_date パーティションキーとして.S 3構造は次のようになります.

    ❗ あなたはどう思いますか.
    答えは:はい!また、分割されたデータでこれを行うこともできます.
    デルタ湖のコンセプトはlog history .
    デルタ湖は、各々のコミットされた取引のためにデルタ・ログを生成します.
    デルタログにはJSON ファイルの最新のスナップショットに関する操作や詳細についての情報があり、また、データの統計情報に関する情報も含まれます.
    デルタファイルは、順番にJSON ファイルと一緒にテーブルに起こったすべての変更のログを構成します.
    からData Floq
    以下の例でこれを見ることができます
    RAW DATERAND PART = 2014 - 08 - 27/

    現在の日付DELETED ROWS
    我々が開くならばparquet ファイル

    上の例から、我々のコードが新しいparquet 削除中のファイルexcluding 我々から濾過されるものdelete 操作.その後、JSON ファイルは、新しく生成されたparquet .
    さらに、Athenaで、テーブルが分割されている場合、スキーマの作成中にクエリで指定する必要があります
    
    CREATE EXTERNAL TABLE IF NOT EXISTS superstore ( 
        row_id STRING,
        order_id STRING,
        order_date STRING,
        ship_date STRING,
        ship_mode STRING,
        customer_id STRING,
        customer_name STRING,
        segment STRING,
        country STRING,
        city STRING,
        state STRING,
        postal_code STRING,
        region STRING,
        product_id STRING,
        category STRING,
        sub_category STRING,
        product_name STRING,
        sales STRING,
        quantity STRING,
        discount STRING,
        profit STRING,
        date_part STRING
    
    )
    -- Add PARTITIONED BY option
    PARTITIONED BY (date_part STRING)
    
    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
    STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' 
    LOCATION 's3://delta-lake-aws-glue-demo/current/_symlink_format_manifest/'
    
    その後、MSCK REPAIR <table> to add the partitions .
    これらの手順をしないとエラーが出ます.

    ✅ 結論



    それだ!これは、SQL開発者になる素晴らしい時間です!を読んでいただきありがとうございます!あなたがこのポストで新しい何かを学んだという望み.
    デルタ湖を食べましたか.どのようなヒント、トリックやベストプラクティスは、コミュニティと共有できますか?下のコメントにあなたの考えを聞くのが大好きだ!
    ハッピーコーディング!