เริ่มใช้งาน SQLベースの挿入、削除、およびupsertsโดยใช้ AWS接着剤3.0และ デルタ湖


  • クレジットは、情報錬金術でカイルデータサイエンスの専門家によって書かれた、それは素晴らしい作品です!
  • โพสต์นี้เราจะมาดูวิธีการใช้ スパークSQLエンジンโดยใช้ AWS接着剤ในการทำ を返します.และ インサート
    AWS Glue คืออะไร?
    ในเดือนนี้ AWSออก 接着剤เวอร์ชั่น 3.0!AWS接着剤3.0สิ่งที่เพิ่มมาคือ การทำให้ パフォーマンスApacheスパーク3.1สำหรับงานที่เป็นการประมวลผลแบบแบตช์และสตรีม เพิ่มความเร็วทั้งในส่วนการทำ 摂食・加工และ 積分​! ลองมาดู ベンチマークกัน


    ได้ 思いやりのเพิ่มขึ้นหลายเท่า!

    目次
  • Architecture Diagram
  • Pre-requisites
  • Format to Delta
  • Upsert
  • Delete
  • Insert
  • Partitioned Data
  • Conclusion

  • アーキテクチャ図
    デルタ湖คืออะไร
    ストレージ層オープンソースプロジェクトที่อำนวยความสะดวกให้เราสามารถสร้าง レイクハウス建築บน データレイクโดยคุณสมบัติประกอบไปด้วย รองรับ 酸取引คล้าย RDBMSการจัดการ メタデータและสามารถสร้าง ストリーミングหรือ バッチ処理บนพวก データ湖貯留ต่างๆได้เช่น アマゾンS 3
    รายละเอียดเพิ่มเติมที่นี่
    https://docs.delta.io/latest/delta-intro.html
    ทฏษดีพอละ! ลอง ビルドเล่นกันดู! เราจะสร้าง フローง่ายๆคือจะเอาข้อมูลใน CSVファイルมาแล้ว アップロードไปที่ S 3バケットแล้วจะ プロセスโดยใช้ AWS接着剤!หลังจากนั้นจะ クエリข้อมูลผ่าน アマゾンアテナ

    事前の要件
  • โหลด the Delta Lake package ลิ้งนี้ - หา ファイルในตารางที่ลงท้ายด้วย .ジャー
  • AWSアカウント❗ 接着剤ไม่อยู่ใน フリーティアนะจ๊ะ..
  • โหลดข้อมูลก่อนเลย! ลิ้งนี้
  • コードสามารถอ้างจาก ギタブเจ้าของเดิมได้ ที่นี่

  • デルタ表形式

    อันดับแรกเราจะ convert dataset ของเราก่อน

  • สร้าง Spark job จาก Glue ลิ้งนี้
  • เลือก スパークスクリプトエディタ
  • เลือก Boilerplateコードで新しいスクリプトを作成する
  • # Import the packages
    from delta import *
    from pyspark.sql.session import SparkSession
    
    # Initialize Spark Session along with configs for Delta Lake
    spark = SparkSession \
        .builder \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .getOrCreate()
    
    # Read Source
    inputDF = spark.read.format("csv").option("header", "true").load('s3://delta-lake-aws-glue-demo/raw/')
    
    # Write data as a DELTA TABLE
    inputDF.write.format("delta").mode("overwrite").save("s3a://delta-lake-aws-glue-demo/current/")
    
    # Read Source
    updatesDF = spark.read.format("csv").option("header", "true").load('s3://delta-lake-aws-glue-demo/updates/')
    
    # Write data as a DELTA TABLE
    updatesDF.write.format("delta").mode("overwrite").save("s3a://delta-lake-aws-glue-demo/updates_delta/")
    
    # Generate MANIFEST file for Athena/Catalog
    deltaTable = DeltaTable.forPath(spark, "s3a://delta-lake-aws-glue-demo/current/")
    deltaTable.generate("symlink_format_manifest")
    
    หลักๆคือการ 変換ตัว データเป็น デルタ形式ซึ่งทำ 2データセットที่เป็นข้อมูลตั้งต้นกับข้อมูลที่จะใช้ 更新.หลังจาก 変換เราจะมีการสร้าง symlinkマニフェストファイルซึ่งเราจะใช้ในการดูผ่าน アテナ
    ลองสร้าง テーブルใน Athena บน デルタ湖貯留
    CREATE EXTERNAL TABLE IF NOT EXISTS superstore ( 
        row_id STRING,
        order_id STRING,
        order_date STRING,
        ship_date STRING,
        ship_mode STRING,
        customer_id STRING,
        customer_name STRING,
        segment STRING,
        country STRING,
        city STRING,
        state STRING,
        postal_code STRING,
        region STRING,
        product_id STRING,
        category STRING,
        sub_category STRING,
        product_name STRING,
        sales STRING,
        quantity STRING,
        discount STRING,
        profit STRING
    ) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
    STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' 
    LOCATION 's3://delta-lake-aws-glue-demo/current/_symlink_format_manifest/'
    
    ลอง クエリผ่าน Athena
    select * from superstore
    


    上昇する

    Upsert คือกระบวนการในการ inserts rows เข้าไปใน db table ถ้าข้อมูลนั้นยังไม่มี หรือ update ข้อมูลถ้าข้อมูลนั้นมีแล้ว ในตัวอย่างนี้เราจะ update ค่าบาง rows ของ ship_mode, customer_name, sales, and profit columns

  • สร้าง Spark job จาก Glue ลิ้งนี้
  • เลือก スパークスクリプトエディタ
  • เลือก Boilerplateコードで新しいスクリプトを作成する
  • # Import as always
    from delta import *
    from pyspark.sql.session import SparkSession
    
    # Initialize Spark Session along with configs for Delta Lake
    spark = SparkSession \
        .builder \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .getOrCreate()
    
    
    updateDF = spark.sql("""
    
    MERGE INTO delta.`s3a://delta-lake-aws-glue-demo/current/` as superstore
    USING delta.`s3a://delta-lake-aws-glue-demo/updates_delta/` as updates
    ON superstore.row_id = updates.row_id
    WHEN MATCHED THEN
      UPDATE SET *
    WHEN NOT MATCHED
      THEN INSERT *
    """)
    
    # Generate MANIFEST file for Athena/Catalog
    deltaTable = DeltaTable.forPath(spark, "s3a://delta-lake-aws-glue-demo/current/")
    deltaTable.generate("symlink_format_manifest")
    
    คำสั่งด้านบนนั้นเราะทำการ 更新ข้อมูลบน テーブルหลัก (スーパー店)ถ้ามีการพบข้อมูลใน 更新テーブル(更新)จากการ マッチングค่าใน ローワンIDถ้า ローワンIDตรงกันจะทำการ すべて更新.ถ้าไม่มีจะทำการ 全て挿入する
    สามารถดูรายละเอียดได้ ที่นี่

    削除

    ลองทำการลบข้อมูลกันมั่ง! Delete จะค่อนข้างตรงไปตรงมา..

  • สร้าง Spark job จาก Glue ลิ้งนี้
  • เลือก スパークスクリプトエディタ
  • เลือก Boilerplateコードで新しいスクリプトを作成する
  • from delta import *
    from pyspark.sql.session import SparkSession
    
    
    spark = SparkSession \
        .builder \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .getOrCreate()
    
    
    deleteDF = spark.sql("""
    DELETE 
    FROM delta.`s3a://delta-lake-aws-glue-demo/current/` as superstore 
    WHERE CAST(superstore.row_id as integer) <= 20
    """)
    
    # Generate MANIFEST file for Athena/Catalog
    deltaTable = DeltaTable.forPath(
        spark, "s3a://delta-lake-aws-glue-demo/current/")
    deltaTable.generate("symlink_format_manifest")
    
    หลักๆ コードด้านบนคือลบข้อมูลโดย ローワンID
    ลอง クエリผ่าน Athena
    SELECT * 
    FROM "default"."superstore" 
    -- Need to CAST hehe bec it is currently a STRING
    ORDER BY CAST(row_id as integer); 
    

    เราจะเห็น レコードหายไปละ データレイクกลายเป็น デルタจริงๆ!

    挿入

    คล้ายๆ Deletes, Inserts ค่อนข้างตรงไปตรงมา มาลองดู

  • สร้าง Spark job จาก Glue ลิ้งนี้
  • เลือก スパークスクリプトエディタ
  • เลือก Boilerplateコードで新しいスクリプトを作成する
  • from delta import *
    from pyspark.sql.session import SparkSession
    
    
    spark = SparkSession \
        .builder \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .getOrCreate()
    
    
    insertDF = spark.sql("""
    INSERT INTO delta.`s3a://delta-lake-aws-glue-demo/current/`
    SELECT *
    FROM delta.`s3a://delta-lake-aws-glue-demo/updates_delta/`
    WHERE CAST(row_id as integer) <= 20
    """)
    
    # Generate MANIFEST file for Athena/Catalog
    deltaTable = DeltaTable.forPath(
        spark, "s3a://delta-lake-aws-glue-demo/current/")
    deltaTable.generate("symlink_format_manifest")
    

    分割データ

    ทำไปหมดเกือบทุกกระบวนท่า แต่นี่เป็นแค่ dataset ง่ายๆ ซึ่งชีวิตจริงเราต้องคำนึงถึงการทำ parition ด้วยนะ! ทำไมต้องทำหละ คิดดูข้อมูลเราใหญ่นะ เราต้องหั่นไปชิ้นๆเพื่อเลือกดึงเฉพาะข้อมูลที่ใช้สิ, การทำ parition นี่จะช่วยเรา 2 เรื่องใหญ่ๆเลยคือ Performance กับ Cost กับบาง services ที่ใช้ในการ Scan data อย่าง Athena

    เรามีการแก้ไขบางอย่างใน Spark code ในการ write data เป็น parition ทำให้โครงสร้างการเก็บข้อมูลใน S3 เป็นแบบนี้


    แนวคิดของ デルタ湖ขึ้นกับ ログ履歴โดยตัว デルタ湖จะมีการสร้าง デルタログทุกๆการ コミット取引
    โดย デルタログนั้นจะมี デルタファイルซึ่งเก็บเป็น JSONซึ่งในนั้นเก็บข้อมูลของการทำที่เกิดขึ้นและรายละเอียดของ snapshptファイルล่าสุด และมีการเก็บข้อมูลเกี่ยวกับ 統計ของข้อมูลเช่นกัน
  • จาก Data Floq
  • เช่นตัวอย่างข้างล่าง

    RAW DATERAND PART = 2014 - 08 - 27/


    現在のDateRound部分

    ถ้าเราลองเปิด parquetファイルดู

    เราจะเห็นว่า コードเราทำให้เกิดการเขียน parquetファイルใหม่ระหว่างการ 削除โดย ローワンIDตั้งแต่ 21ขึ้นไปไม่ได้ถูกลบตามเงื่อนไข หลังจากนั้น JSONファイルก็จะทำการ マップเข้ากับ ファイルที่เพิ่งถูกสร้างขึ้นมาใหม่ในการทำ versioning
    เราสามารถสร้าง テーブルที่มีการทำ パーティションแล้วใน Athena ตามคำสั่งด้านล่าง
    CREATE EXTERNAL TABLE IF NOT EXISTS superstore ( 
        row_id STRING,
        order_id STRING,
        order_date STRING,
        ship_date STRING,
        ship_mode STRING,
        customer_id STRING,
        customer_name STRING,
        segment STRING,
        country STRING,
        city STRING,
        state STRING,
        postal_code STRING,
        region STRING,
        product_id STRING,
        category STRING,
        sub_category STRING,
        product_name STRING,
        sales STRING,
        quantity STRING,
        discount STRING,
        profit STRING,
        date_part STRING
    
    )
    -- Add PARTITIONED BY option
    PARTITIONED BY (date_part STRING)
    
    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
    STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' 
    LOCATION 's3://delta-lake-aws-glue-demo/current/_symlink_format_manifest/'
    
    หลังจากนั้นให้รันคำสั่งด้านล่างเพื่อทำการ メタデータの更新ใน カタログเพื่อให้เราสามารถ クエリข้อมูลใน 教義ได้
    MSCK修理
    パーティションを追加します.

    ✅ 結論 จบแล้ว!