AWS接着剤3.0とデルタ湖を使用しているS 3のSQLベースの挿入物、削除とupserts
24837 ワード
このブログ記事の目的は、どのように使用できるかを示すことです
今月始め、ブログ投稿をしました
今月、AWSリリースGlue version 3.0 ! AWS接着剤3.0はバッチおよびストリーム処理のためにパフォーマンス最適化されたApache Surf 3.1ランタイムを導入する.新しいエンジンは、データの摂取、処理および統合を高速化し、データの湖を水和させることができますし、迅速にデータから洞察力を抽出します.
しかし、これは大きな取引ですか?
さて、火花エンジンの一般的なパフォーマンスの向上の多くは別として、それはまた、現在の2010年の最新バージョンをサポートすることができますDelta Lake . 最も顕著なものはSupport for SQL Insert, Delete, Update and Merge .
あなたがデルタ湖が何であるかについて、わからないならば、あなたはそれが何であるかという一般的な考えを持つために上記の私のブログ柱をチェックすることができます.
デモを続けましょう!
Architecture Diagram Format to Delta Upsert Delete Insert Partitioned Data Conclusion
これは基本的に私たちが何をしているかの簡単なプロセスの流れです.私たちは
しかし、我々がそれに着く前に、我々は若干の前仕事をする必要があります. デルタ湖パッケージhere - 少しハードスポットが、探して AWSアカウント❗ 接着剤ETLはフリー層に含まれていません
サンプルデータのダウンロードhere - しかし、あなたは私自身を使うことができます、しかし、私はこれを使います コードは、私のもので見つかりますGitHub Repository
✅ デルタ表形式
まず最初に、それぞれのデータセットをデルタ形式に変換する必要があります.以下はこれを行うコードです.
生成後
upsertは、
この例では、2行分の値を更新します
あなたが完全な操作意味論をチェックアウトするなら
その後、我々は
今すぐAthenaの更新テーブルを参照してくださいする必要があります.
デルタ湖を介して削除は非常に簡単です.
削除のように、挿入も非常に簡単です.
簡単なデータセットのupsert、delete、insert操作を行いました.しかし、IRLはめったに起こりません.それで、我々がスパイスを上にして、分割されたデータにそれをするならば、どうですか?
私は前に行ってスパークを介していくつかの分割を行った
❗ あなたはどう思いますか.
答えは:はい!また、分割されたデータでこれを行うこともできます.
デルタ湖のコンセプトは
デルタ湖は、各々のコミットされた取引のためにデルタ・ログを生成します.
デルタログには
デルタファイルは、順番に
からData Floq
以下の例でこれを見ることができます
RAW DATERAND PART = 2014 - 08 - 27/
現在の日付
我々が開くならば
上の例から、我々のコードが新しい
さらに、Athenaで、テーブルが分割されている場合、スキーマの作成中にクエリで指定する必要があります
これらの手順をしないとエラーが出ます.
それだ!これは、SQL開発者になる素晴らしい時間です!を読んでいただきありがとうございます!あなたがこのポストで新しい何かを学んだという望み.
デルタ湖を食べましたか.どのようなヒント、トリックやベストプラクティスは、コミュニティと共有できますか?下のコメントにあなたの考えを聞くのが大好きだ!
ハッピーコーディング!
Spark SQL Engine
為すUPSERTS
, DELETES
, and INSERTS
. 基本的に、更新.今月始め、ブログ投稿をしました
PySpark
. 以下をチェックしてください.AWS接着剤とデルタ湖を用いたアップセットと削除
AWSコミュニティASEANのためのカイルEscosia・ Jul 21・ 10分読む
#aws
#tutorial
#bigdata
#analytics
But, what if we want it to make it more simple and familiar?
今月、AWSリリースGlue version 3.0 ! AWS接着剤3.0はバッチおよびストリーム処理のためにパフォーマンス最適化されたApache Surf 3.1ランタイムを導入する.新しいエンジンは、データの摂取、処理および統合を高速化し、データの湖を水和させることができますし、迅速にデータから洞察力を抽出します.
しかし、これは大きな取引ですか?
さて、火花エンジンの一般的なパフォーマンスの向上の多くは別として、それはまた、現在の2010年の最新バージョンをサポートすることができますDelta Lake . 最も顕著なものはSupport for SQL Insert, Delete, Update and Merge .
あなたがデルタ湖が何であるかについて、わからないならば、あなたはそれが何であるかという一般的な考えを持つために上記の私のブログ柱をチェックすることができます.
デモを続けましょう!
目次
✅ アーキテクチャ図
これは基本的に私たちが何をしているかの簡単なプロセスの流れです.私たちは
sample csv
ファイルを読み込みますS3 Bucket
次に、Glue
. (オプション)その後、お気に入りのBIツールに接続することができます.❗ 事前の要件
しかし、我々がそれに着く前に、我々は若干の前仕事をする必要があります.
Files
テーブルの中でjar
まず最初に、それぞれのデータセットをデルタ形式に変換する必要があります.以下はこれを行うコードです.
# Import the packages
from delta import *
from pyspark.sql.session import SparkSession
# Initialize Spark Session along with configs for Delta Lake
spark = SparkSession \
.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Read Source
inputDF = spark.read.format("csv").option("header", "true").load('s3://delta-lake-aws-glue-demo/raw/')
# Write data as a DELTA TABLE
inputDF.write.format("delta").mode("overwrite").save("s3a://delta-lake-aws-glue-demo/current/")
# Read Source
updatesDF = spark.read.format("csv").option("header", "true").load('s3://delta-lake-aws-glue-demo/updates/')
# Write data as a DELTA TABLE
updatesDF.write.format("delta").mode("overwrite").save("s3a://delta-lake-aws-glue-demo/updates_delta/")
# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(spark, "s3a://delta-lake-aws-glue-demo/current/")
deltaTable.generate("symlink_format_manifest")
### OPTIONAL, UNCOMMENT IF YOU WANT TO VIEW ALSO THE DATA FOR UPDATES IN ATHENA
###
# Generate MANIFEST file for Updates
# updatesDeltaTable = DeltaTable.forPath(spark, "s3a://delta-lake-aws-glue-demo/updates_delta/")
# updatesDeltaTable.generate("symlink_format_manifest")
このコードは、データセットをdelta
形式.これは、両方のソースデータだけでなく、更新プログラムで行われます.生成後
SYMLINK MANIFEST
ファイルは、我々はアテナを介して表示することができます.SQLコードもリポジトリに含まれています🔀 茄子
upsertは、
inserts
データベーステーブルへの行do not already exist
, or updates
彼らがそうするならば、彼ら.この例では、2行分の値を更新します
ship_mode
, customer_name
, sales
, and profit
. 私はちょうどランダムな文字スパムをしました、そして、私はそれを通してそれを考えませんでした😅.
# Import as always
from delta import *
from pyspark.sql.session import SparkSession
# Initialize Spark Session along with configs for Delta Lake
spark = SparkSession \
.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
updateDF = spark.sql("""
MERGE INTO delta.`s3a://delta-lake-aws-glue-demo/current/` as superstore
USING delta.`s3a://delta-lake-aws-glue-demo/updates_delta/` as updates
ON superstore.row_id = updates.row_id
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
""")
# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(spark, "s3a://delta-lake-aws-glue-demo/current/")
deltaTable.generate("symlink_format_manifest")
### OPTIONAL
## SQL-BASED GENERATION OF SYMLINK
# spark.sql("""
# GENERATE symlink_format_manifest
# FOR TABLE delta.`s3a://delta-lake-aws-glue-demo/current/`
# """)
上のSQLコードupdates
に基づいて更新テーブルにある現在のテーブルrow_id
. そして、その条件を評価する.If
row_id
is matched, thenUPDATE ALL
the data. If not, then do anINSERT ALL
.
あなたが完全な操作意味論をチェックアウトするなら
MERGE
を通して読むことができますthis その後、我々は
MANIFEST
ファイル再び.この世代のMANIFEST
ファイルを自動的に以下のクエリを実行して更新するように設定することができます.ALTER TABLE delta.`<path-to-delta-table>`
SET TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true)
より多くの情報を見つけることができますhere 今すぐAthenaの更新テーブルを参照してくださいする必要があります.
❌ 削除する
デルタ湖を介して削除は非常に簡単です.
from delta import *
from pyspark.sql.session import SparkSession
spark = SparkSession \
.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
deleteDF = spark.sql("""
DELETE
FROM delta.`s3a://delta-lake-aws-glue-demo/current/` as superstore
WHERE CAST(superstore.row_id as integer) <= 20
""")
# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(
spark, "s3a://delta-lake-aws-glue-demo/current/")
deltaTable.generate("symlink_format_manifest")
### OPTIONAL
## SQL-BASED GENERATION OF SYMLINK MANIFEST
# spark.sql("""
# GENERATE symlink_format_manifest
# FOR TABLE delta.`s3a://delta-lake-aws-glue-demo/current/`
# """)
この操作は、row_id
.SELECT *
FROM "default"."superstore"
-- Need to CAST hehe bec it is currently a STRING
ORDER BY CAST(row_id as integer);
⤴ インサート
削除のように、挿入も非常に簡単です.
from delta import *
from pyspark.sql.session import SparkSession
spark = SparkSession \
.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
insertDF = spark.sql("""
INSERT INTO delta.`s3a://delta-lake-aws-glue-demo/current/`
SELECT *
FROM delta.`s3a://delta-lake-aws-glue-demo/updates_delta/`
WHERE CAST(row_id as integer) <= 20
""")
# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(
spark, "s3a://delta-lake-aws-glue-demo/current/")
deltaTable.generate("symlink_format_manifest")
### OPTIONAL
## SQL-BASED GENERATION OF SYMLINK MANIFEST
# spark.sql("""
# GENERATE symlink_format_manifest
# FOR TABLE delta.`s3a://delta-lake-aws-glue-demo/current/`
# """)
❗ 分割データ
簡単なデータセットのupsert、delete、insert操作を行いました.しかし、IRLはめったに起こりません.それで、我々がスパイスを上にして、分割されたデータにそれをするならば、どうですか?
私は前に行ってスパークを介していくつかの分割を行った
partitioned
このバージョンを使うorder_date
パーティションキーとして.S 3構造は次のようになります.❗ あなたはどう思いますか.
答えは:はい!また、分割されたデータでこれを行うこともできます.
デルタ湖のコンセプトは
log history
.デルタ湖は、各々のコミットされた取引のためにデルタ・ログを生成します.
デルタログには
JSON
ファイルの最新のスナップショットに関する操作や詳細についての情報があり、また、データの統計情報に関する情報も含まれます.デルタファイルは、順番に
JSON
ファイルと一緒にテーブルに起こったすべての変更のログを構成します.からData Floq
以下の例でこれを見ることができます
RAW DATERAND PART = 2014 - 08 - 27/
現在の日付
DELETED ROWS
我々が開くならば
parquet
ファイル上の例から、我々のコードが新しい
parquet
削除中のファイルexcluding
我々から濾過されるものdelete
操作.その後、JSON
ファイルは、新しく生成されたparquet
.さらに、Athenaで、テーブルが分割されている場合、スキーマの作成中にクエリで指定する必要があります
CREATE EXTERNAL TABLE IF NOT EXISTS superstore (
row_id STRING,
order_id STRING,
order_date STRING,
ship_date STRING,
ship_mode STRING,
customer_id STRING,
customer_name STRING,
segment STRING,
country STRING,
city STRING,
state STRING,
postal_code STRING,
region STRING,
product_id STRING,
category STRING,
sub_category STRING,
product_name STRING,
sales STRING,
quantity STRING,
discount STRING,
profit STRING,
date_part STRING
)
-- Add PARTITIONED BY option
PARTITIONED BY (date_part STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://delta-lake-aws-glue-demo/current/_symlink_format_manifest/'
その後、MSCK REPAIR <table>
to add the partitions .これらの手順をしないとエラーが出ます.
✅ 結論
それだ!これは、SQL開発者になる素晴らしい時間です!を読んでいただきありがとうございます!あなたがこのポストで新しい何かを学んだという望み.
デルタ湖を食べましたか.どのようなヒント、トリックやベストプラクティスは、コミュニティと共有できますか?下のコメントにあなたの考えを聞くのが大好きだ!
ハッピーコーディング!
Reference
この問題について(AWS接着剤3.0とデルタ湖を使用しているS 3のSQLベースの挿入物、削除とupserts), 我々は、より多くの情報をここで見つけました https://dev.to/awscommunity-asean/sql-based-inserts-deletes-and-upserts-in-s3-using-aws-glue-3-0-and-delta-lake-42f0テキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol