เริ่มใช้งาน SQLベースの挿入、削除、およびupsertsโดยใช้ AWS接着剤3.0และ デルタ湖
AWS Glue คืออะไร?
ในเดือนนี้ AWSออก 接着剤เวอร์ชั่น 3.0!AWS接着剤3.0สิ่งที่เพิ่มมาคือ การทำให้ パフォーマンスApacheスパーク3.1สำหรับงานที่เป็นการประมวลผลแบบแบตช์และสตรีม เพิ่มความเร็วทั้งในส่วนการทำ 摂食・加工และ 積分! ลองมาดู ベンチマークกัน
ได้ 思いやりのเพิ่มขึ้นหลายเท่า!
目次
アーキテクチャ図
デルタ湖คืออะไร
ストレージ層オープンソースプロジェクトที่อำนวยความสะดวกให้เราสามารถสร้าง レイクハウス建築บน データレイクโดยคุณสมบัติประกอบไปด้วย รองรับ 酸取引คล้าย RDBMSการจัดการ メタデータและสามารถสร้าง ストリーミングหรือ バッチ処理บนพวก データ湖貯留ต่างๆได้เช่น アマゾンS 3
รายละเอียดเพิ่มเติมที่นี่
https://docs.delta.io/latest/delta-intro.html
ทฏษดีพอละ! ลอง ビルドเล่นกันดู! เราจะสร้าง フローง่ายๆคือจะเอาข้อมูลใน CSVファイルมาแล้ว アップロードไปที่ S 3バケットแล้วจะ プロセスโดยใช้ AWS接着剤!หลังจากนั้นจะ クエリข้อมูลผ่าน アマゾンアテナ
事前の要件
デルタ表形式
อันดับแรกเราจะ convert dataset ของเราก่อน
# Import the packages
from delta import *
from pyspark.sql.session import SparkSession
# Initialize Spark Session along with configs for Delta Lake
spark = SparkSession \
.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Read Source
inputDF = spark.read.format("csv").option("header", "true").load('s3://delta-lake-aws-glue-demo/raw/')
# Write data as a DELTA TABLE
inputDF.write.format("delta").mode("overwrite").save("s3a://delta-lake-aws-glue-demo/current/")
# Read Source
updatesDF = spark.read.format("csv").option("header", "true").load('s3://delta-lake-aws-glue-demo/updates/')
# Write data as a DELTA TABLE
updatesDF.write.format("delta").mode("overwrite").save("s3a://delta-lake-aws-glue-demo/updates_delta/")
# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(spark, "s3a://delta-lake-aws-glue-demo/current/")
deltaTable.generate("symlink_format_manifest")
หลักๆคือการ 変換ตัว データเป็น デルタ形式ซึ่งทำ 2データセットที่เป็นข้อมูลตั้งต้นกับข้อมูลที่จะใช้ 更新.หลังจาก 変換เราจะมีการสร้าง symlinkマニフェストファイルซึ่งเราจะใช้ในการดูผ่าน アテナลองสร้าง テーブルใน Athena บน デルタ湖貯留
CREATE EXTERNAL TABLE IF NOT EXISTS superstore (
row_id STRING,
order_id STRING,
order_date STRING,
ship_date STRING,
ship_mode STRING,
customer_id STRING,
customer_name STRING,
segment STRING,
country STRING,
city STRING,
state STRING,
postal_code STRING,
region STRING,
product_id STRING,
category STRING,
sub_category STRING,
product_name STRING,
sales STRING,
quantity STRING,
discount STRING,
profit STRING
) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://delta-lake-aws-glue-demo/current/_symlink_format_manifest/'
ลอง クエリผ่าน Athena select * from superstore
上昇する
Upsert คือกระบวนการในการ inserts rows เข้าไปใน db table ถ้าข้อมูลนั้นยังไม่มี หรือ update ข้อมูลถ้าข้อมูลนั้นมีแล้ว ในตัวอย่างนี้เราจะ update ค่าบาง rows ของ ship_mode, customer_name, sales, and profit columns
# Import as always
from delta import *
from pyspark.sql.session import SparkSession
# Initialize Spark Session along with configs for Delta Lake
spark = SparkSession \
.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
updateDF = spark.sql("""
MERGE INTO delta.`s3a://delta-lake-aws-glue-demo/current/` as superstore
USING delta.`s3a://delta-lake-aws-glue-demo/updates_delta/` as updates
ON superstore.row_id = updates.row_id
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
""")
# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(spark, "s3a://delta-lake-aws-glue-demo/current/")
deltaTable.generate("symlink_format_manifest")
คำสั่งด้านบนนั้นเราะทำการ 更新ข้อมูลบน テーブルหลัก (スーパー店)ถ้ามีการพบข้อมูลใน 更新テーブル(更新)จากการ マッチングค่าใน ローワンIDถ้า ローワンIDตรงกันจะทำการ すべて更新.ถ้าไม่มีจะทำการ 全て挿入するสามารถดูรายละเอียดได้ ที่นี่
削除
ลองทำการลบข้อมูลกันมั่ง! Delete จะค่อนข้างตรงไปตรงมา..
from delta import *
from pyspark.sql.session import SparkSession
spark = SparkSession \
.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
deleteDF = spark.sql("""
DELETE
FROM delta.`s3a://delta-lake-aws-glue-demo/current/` as superstore
WHERE CAST(superstore.row_id as integer) <= 20
""")
# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(
spark, "s3a://delta-lake-aws-glue-demo/current/")
deltaTable.generate("symlink_format_manifest")
หลักๆ コードด้านบนคือลบข้อมูลโดย ローワンIDลอง クエリผ่าน Athena
SELECT *
FROM "default"."superstore"
-- Need to CAST hehe bec it is currently a STRING
ORDER BY CAST(row_id as integer);
เราจะเห็น レコードหายไปละ データレイクกลายเป็น デルタจริงๆ!
挿入
คล้ายๆ Deletes, Inserts ค่อนข้างตรงไปตรงมา มาลองดู
from delta import *
from pyspark.sql.session import SparkSession
spark = SparkSession \
.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
insertDF = spark.sql("""
INSERT INTO delta.`s3a://delta-lake-aws-glue-demo/current/`
SELECT *
FROM delta.`s3a://delta-lake-aws-glue-demo/updates_delta/`
WHERE CAST(row_id as integer) <= 20
""")
# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(
spark, "s3a://delta-lake-aws-glue-demo/current/")
deltaTable.generate("symlink_format_manifest")
分割データ
ทำไปหมดเกือบทุกกระบวนท่า แต่นี่เป็นแค่ dataset ง่ายๆ ซึ่งชีวิตจริงเราต้องคำนึงถึงการทำ parition ด้วยนะ! ทำไมต้องทำหละ คิดดูข้อมูลเราใหญ่นะ เราต้องหั่นไปชิ้นๆเพื่อเลือกดึงเฉพาะข้อมูลที่ใช้สิ, การทำ parition นี่จะช่วยเรา 2 เรื่องใหญ่ๆเลยคือ Performance กับ Cost กับบาง services ที่ใช้ในการ Scan data อย่าง Athena
เรามีการแก้ไขบางอย่างใน Spark code ในการ write data เป็น parition ทำให้โครงสร้างการเก็บข้อมูลใน S3 เป็นแบบนี้
แนวคิดของ デルタ湖ขึ้นกับ ログ履歴โดยตัว デルタ湖จะมีการสร้าง デルタログทุกๆการ コミット取引
โดย デルタログนั้นจะมี デルタファイルซึ่งเก็บเป็น JSONซึ่งในนั้นเก็บข้อมูลของการทำที่เกิดขึ้นและรายละเอียดของ snapshptファイルล่าสุด และมีการเก็บข้อมูลเกี่ยวกับ 統計ของข้อมูลเช่นกัน
RAW DATERAND PART = 2014 - 08 - 27/
現在のDateRound部分
ถ้าเราลองเปิด parquetファイルดู
เราจะเห็นว่า コードเราทำให้เกิดการเขียน parquetファイルใหม่ระหว่างการ 削除โดย ローワンIDตั้งแต่ 21ขึ้นไปไม่ได้ถูกลบตามเงื่อนไข หลังจากนั้น JSONファイルก็จะทำการ マップเข้ากับ ファイルที่เพิ่งถูกสร้างขึ้นมาใหม่ในการทำ versioning
เราสามารถสร้าง テーブルที่มีการทำ パーティションแล้วใน Athena ตามคำสั่งด้านล่าง
CREATE EXTERNAL TABLE IF NOT EXISTS superstore (
row_id STRING,
order_id STRING,
order_date STRING,
ship_date STRING,
ship_mode STRING,
customer_id STRING,
customer_name STRING,
segment STRING,
country STRING,
city STRING,
state STRING,
postal_code STRING,
region STRING,
product_id STRING,
category STRING,
sub_category STRING,
product_name STRING,
sales STRING,
quantity STRING,
discount STRING,
profit STRING,
date_part STRING
)
-- Add PARTITIONED BY option
PARTITIONED BY (date_part STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://delta-lake-aws-glue-demo/current/_symlink_format_manifest/'
หลังจากนั้นให้รันคำสั่งด้านล่างเพื่อทำการ メタデータの更新ใน カタログเพื่อให้เราสามารถ クエリข้อมูลใน 教義ได้MSCK修理
パーティションを追加します.
✅ 結論 จบแล้ว!
Reference
この問題について(เริ่มใช้งาน SQLベースの挿入、削除、およびupsertsโดยใช้ AWS接着剤3.0และ デルタ湖), 我々は、より多くの情報をここで見つけました https://dev.to/awscommunity-asean/sql-based-inserts-deletes-and-upserts-in-s3-aws-glue-3-0-delta-lake-4dafテキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol