Delta Lakeインポート
19269 ワード
DataBricksは、データ処理フレームワークApacheとファイルフォーマットApache Parquetを開発し、クラウド環境に適したData Lakeとは何かを考えています.データ の分割方法理想的なファイルサイズ を設定する方法モード の拡張方法圧縮プログラム の作成方法失敗したETLジョブ を復元する方法オリジナルデータストリームをデータ湖 に送る方法
これらの問題を解決する過程で,Delta Lakeという優れたオープンソースストレージを発見し,Spark Submit上でDelta Lakeを使用する5つの理由を発表した.[ リファレンス ]
最も重要なのは、Delta Lakeが変化するトランザクション・ログを保持しているため、ACIDに基づいて読み取り/書き込み操作を行うことができます.
私がデルタ湖に導入した環境は: AWS EMR + structured streaming + pyspark AWS S3 AWS Athena Delta Lakeを導入するメリットは次のとおりです. Structured streamingはs 3にデータをロードすると、小さなファイルが多すぎるため、読み取り性能が非常に悪い.幸いなことに、デルタ湖の圧縮機能を利用して、ファイルサイズを適切に調整することができます. Delta Lakeのmerge機能により、過去のデータをすばやく埋め戻すことができます. Appendonlyログを使用すると、データ・レプリケーションの問題を解決できます. では、Delta Lakeを導入するために必要なコードレベルの作業についてお話しします.
まず、Mavenから正しいバージョンのDelta-coreをダウンロードします.
spark-コミット時にダウンロードしたjarファイルがコミットされます. Pythonで またはアプリケーションでは、jarファイルパスを
Spark SQL DDL拡張子を追加してDelta Tableを処理します. Delta Tableを作成し、初期オプションを指定します. テーブル名でhive metastoreまたは に登録 delta. 2 Athenaから外部テーブルを読み込むにはmanifestファイルが必要です. の最初の1番を実行する必要があります. Athenaにリアルタイムに関連付けられた外部テーブルを作成します. Athenaのほか、Spark Extended SQLはDDLもサポートしています. このようにして作成されたテーブルはインベントリファイルを読み込むので、Spark SQLを使用してDQLを行うことはできません. より前に作成されたインベントリファイルのパスを指定します. AthenaスキャンS 3は、新しいパーティションをテーブルメタデータに反映する. データはの新しいパーティションに入り、インベントリファイルのパーティションが更新された後に実行する必要があります. 日のバッチ・ユニット. は、後述するパーティション投影法を用いることが好ましい. MSCK REPAIR法には以下のような欠点がある.
データは、正常に動作するには、 パーティションに1回書き込みする必要があります. フルスキャンテーブル. 日のバッチスケジュールに注意してください. は、MSCK REPAIRではなく、より効率的なパーティション更新方法を提供する. ADD PARTIONクエリー MSCK REPARDと異なり、パーティションを追加する際にデータを入力する必要はありません. は、クエリを1日単位で送信する必要があります. AWS GLUEのパーティションスキーム Hive Metastoreでテーブル属性のパーティション投影オプションを指定すると、Metastoreからメタデータ情報を取得することなく、Athenaでパーティションを作成できます. Delta Lake + Partition Projection
オプションは、UIによって AWS GLUEに設定されてもよいし、 AthenaはDelta Lakeのmanifestを参照するため、パネルスタックのパスではなくmanifest更新のパスとして位置を指定する必要があります.
Sparkでテーブルを保存またはロードする場合は、位置ベースの識別子を使用する必要があります. Read Write オープンソースデルタでは自動最適化はサポートされていません. https://github.com/delta-io/delta/issues/815 圧縮操作により、ファイル数が減少し、ではなく読み取り性能が向上します. Deltaテーブルに関連付けられた場所で参照されなくなったファイルを消去します. nを超えるデータを削除します.削除条件は、ファイルの最後の変更時間ではなく、トランザクション・ログが切断された時間から開始されます. の現在のパフォーマンスは非常に遅い. 2
これらの問題を解決する過程で,Delta Lakeという優れたオープンソースストレージを発見し,Spark Submit上でDelta Lakeを使用する5つの理由を発表した.[ リファレンス ]
最も重要なのは、Delta Lakeが変化するトランザクション・ログを保持しているため、ACIDに基づいて読み取り/書き込み操作を行うことができます.
私がデルタ湖に導入した環境は:
1. Download Delta Core
まず、Mavenから正しいバージョンのDelta-coreをダウンロードします.
curl "https://repo1.maven.org/maven2/io/delta/delta-core_2.12/1.0.0/delta-core_2.12-1.0.0.jar" -O
2. Spark Submit with Jars
spark-コミット時にダウンロードしたjarファイルがコミットされます.
spark-submit --master yarn --deploy-mode cluster \
--jars "delta-core_2.12-1.0.0.jar" \
...
delta-spark
ライブラリを使用する場合は、jarファイルを-py-filesオプションとして渡します.spark.SparkContext.addPyFile()
に入れるとインポートできます.builder = SparkSession.builder \
.config(conf=SparkConf()) \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.enableHiveSupport()
spark = builder.getOrCreate()
spark.sparkContext.addPyFile("s3://ridi-emr/jars/delta-core_2.12-1.0.0.jar")
3. Spark SQL Extension
Spark SQL DDL拡張子を追加してDelta Tableを処理します.
from pyspark.sql import SparkSession
builder = SparkSession.builder \
.config(conf=SparkConf()) \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.enableHiveSupport()
spark = builder.getOrCreate()
sc = spark.sparkContext
4. Create Delta Table
s3://your_location
などの位置識別子を使用できます.delta.compatibility.symlinkFormatManifest.enabled=true
に設定されている場合、後述するインベントリファイルは、データ挿入が発生するたびに更新され得る.CREATE TABLE IF NOT EXISTS delta.`s3://your_location` (
`key` STRING,
`value` STRING,
`topic` STRING,
`timestamp` TIMESTAMP,
`date` STRING
)
USING DELTA
PARTITIONED BY (date)
LOCATION 's3://your_location/'
TBLPROPERTIES (
'delta.compatibility.symlinkFormatManifest.enabled'='true'
)
5. Generate Manifest
GENERATE symlink_format_manifest FOR TABLE **delta.`s3://your_location`**
6. Create Athena Table
CREATE EXTERNAL TABLE IF NOT EXISTS **database.table** (
key STRING,
value STRING,
topic STRING,
timestamp TIMESTAMP
)
PARTITIONED BY (date STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT
'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://your_location/_symlink_format_manifest/'
7. Repair Athena Table
MSCK REPAIR TABLE database.table
8. Partition Projection
データは、正常に動作するには、
オプションは、UIによって
6.Create Athena Table
からテーブル属性に渡されてもよい.CREATE EXTERNAL TABLE IF NOT EXISTS **database.table** (
key STRING,
value STRING,
topic STRING,
timestamp TIMESTAMP
)
PARTITIONED BY (date STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT
'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://your_location/_symlink_format_manifest/'
TBLPROPERTIES (
'projection.enabled'='true',
'projection.date.type'='date',
'projection.date.format'='yyyy-MM-dd',
'projection.date.range'=NOW-2YEARS,NOW+1DAY',
'projection.date.interval'='1',
'projection.date.interval.unit'='DAYS'
)
9. Read and Write
Sparkでテーブルを保存またはロードする場合は、位置ベースの識別子を使用する必要があります.
spark.read.format("delta") \
.load("s3://your_location") \
.where("date='2022-04-17'")
SELECT *
FROM delta.`s3://your_location`
WHERE date='2021-11-25'
df.write \
.format("delta") \
.mode("overwrite") \
.partitionBy("date") \
.save("s3://your_location")
10. Compact Files
partition_clause = "date='2022-04-17'"
num_partitions = 10
spark.read.format("delta") \
.load("s3://your_location") \
.where(partition_clause) \
.repartition(num_partitions) \
.write \
.format("delta") \
.option("dataChange", "false") \
.option("replaceWhere", partition_clause) \
.mode("overwrite") \
.save("s3://your_location")
11. Vacuum
spark.conf.set("spark.databricks.delta.vacuum.parallelDelete.enabled", "true")
の設定を追加することで、並列削除を行うことができます.VACUUM delta.`s3://your_location` RETAIN 72 HOURS
Reference
この問題について(Delta Lakeインポート), 我々は、より多くの情報をここで見つけました https://velog.io/@ivoryrabbit/Delta-Lake-도입기テキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol