Delta Lakeインポート

19269 ワード

DataBricksは、データ処理フレームワークApacheとファイルフォーマットApache Parquetを開発し、クラウド環境に適したData Lakeとは何かを考えています.
  • データ
  • の分割方法
  • 理想的なファイルサイズ
  • を設定する方法
  • モード
  • の拡張方法
  • 圧縮プログラム
  • の作成方法
  • 失敗したETLジョブ
  • を復元する方法
  • オリジナルデータストリームをデータ湖
  • に送る方法
    これらの問題を解決する過程で,Delta Lakeという優れたオープンソースストレージを発見し,Spark Submit上でDelta Lakeを使用する5つの理由を発表した.[ リファレンス ]
    最も重要なのは、Delta Lakeが変化するトランザクション・ログを保持しているため、ACIDに基づいて読み取り/書き込み操作を行うことができます.
    私がデルタ湖に導入した環境は:
  • AWS EMR + structured streaming + pyspark
  • AWS S3
  • AWS Athena
  • Delta Lakeを導入するメリットは次のとおりです.
  • Structured streamingはs 3にデータをロードすると、小さなファイルが多すぎるため、読み取り性能が非常に悪い.幸いなことに、デルタ湖の圧縮機能を利用して、ファイルサイズを適切に調整することができます.
  • Delta Lakeのmerge機能により、過去のデータをすばやく埋め戻すことができます.
  • Appendonlyログを使用すると、データ・レプリケーションの問題を解決できます.
  • では、Delta Lakeを導入するために必要なコードレベルの作業についてお話しします.

    1. Download Delta Core


    まず、Mavenから正しいバージョンのDelta-coreをダウンロードします.
    curl "https://repo1.maven.org/maven2/io/delta/delta-core_2.12/1.0.0/delta-core_2.12-1.0.0.jar" -O

    2. Spark Submit with Jars


    spark-コミット時にダウンロードしたjarファイルがコミットされます.
    spark-submit --master yarn --deploy-mode cluster \
    	--jars "delta-core_2.12-1.0.0.jar" \
    	...
  • Pythonでdelta-sparkライブラリを使用する場合は、jarファイルを-py-filesオプションとして渡します.
  • またはアプリケーションでは、jarファイルパスをspark.SparkContext.addPyFile()に入れるとインポートできます.
  • builder = SparkSession.builder \
        .config(conf=SparkConf()) \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .enableHiveSupport()
    
    spark = builder.getOrCreate()
    
    spark.sparkContext.addPyFile("s3://ridi-emr/jars/delta-core_2.12-1.0.0.jar")

    3. Spark SQL Extension


    Spark SQL DDL拡張子を追加してDelta Tableを処理します.
    from pyspark.sql import SparkSession
    
    builder = SparkSession.builder \
    	.config(conf=SparkConf()) \
    	.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .enableHiveSupport()
    
    spark = builder.getOrCreate()
    sc = spark.sparkContext

    4. Create Delta Table

  • Delta Tableを作成し、初期オプションを指定します.
  • テーブル名でhive metastoreまたは
  • に登録
  • delta. s3://your_locationなどの位置識別子を使用できます.
  • 2delta.compatibility.symlinkFormatManifest.enabled=trueに設定されている場合、後述するインベントリファイルは、データ挿入が発生するたびに更新され得る.
  • CREATE TABLE IF NOT EXISTS delta.`s3://your_location` (
    	`key` STRING,
    	`value` STRING,
    	`topic` STRING,
    	`timestamp` TIMESTAMP,
        `date` STRING
    )
    USING DELTA
    PARTITIONED BY (date)
    LOCATION 's3://your_location/'
    TBLPROPERTIES (
        'delta.compatibility.symlinkFormatManifest.enabled'='true'
    )

    5. Generate Manifest

  • Athenaから外部テーブルを読み込むにはmanifestファイルが必要です.
  • の最初の1番を実行する必要があります.
  • GENERATE symlink_format_manifest FOR TABLE **delta.`s3://your_location`**

    6. Create Athena Table

  • Athenaにリアルタイムに関連付けられた外部テーブルを作成します.
  • Athenaのほか、Spark Extended SQLはDDLもサポートしています.
  • このようにして作成されたテーブルはインベントリファイルを読み込むので、Spark SQLを使用してDQLを行うことはできません.
  • より前に作成されたインベントリファイルのパスを指定します.
  • CREATE EXTERNAL TABLE IF NOT EXISTS **database.table** (
        key STRING,
    	value STRING,
    	topic STRING,
    	timestamp TIMESTAMP
    )
    PARTITIONED BY (date STRING)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    STORED AS
    INPUTFORMAT
        'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
    OUTPUTFORMAT
        'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION 's3://your_location/_symlink_format_manifest/'

    7. Repair Athena Table

  • AthenaスキャンS 3は、新しいパーティションをテーブルメタデータに反映する.
  • データは
  • の新しいパーティションに入り、インベントリファイルのパーティションが更新された後に実行する必要があります.
  • 日のバッチ・ユニット.
  • は、後述するパーティション投影法を用いることが好ましい.
  • MSCK REPAIR TABLE database.table

    8. Partition Projection

  • MSCK REPAIR法には以下のような欠点がある.
    データは、正常に動作するには、
  • パーティションに1回書き込みする必要があります.
  • フルスキャン
  • テーブル.
  • 日のバッチスケジュールに注意してください.
  • は、MSCK REPAIRではなく、より効率的なパーティション更新方法を提供する.
  • ADD PARTIONクエリー
  • MSCK REPARDと異なり、パーティションを追加する際にデータを入力する必要はありません.
  • は、クエリを1日単位で送信する必要があります.
  • AWS GLUEのパーティションスキーム
  • Hive Metastoreでテーブル属性のパーティション投影オプションを指定すると、Metastoreからメタデータ情報を取得することなく、Athenaでパーティションを作成できます.
  • Delta Lake + Partition Projection
    オプションは、UIによって
  • AWS GLUEに設定されてもよいし、6.Create Athena Tableからテーブル属性に渡されてもよい.
  • AthenaはDelta Lakeのmanifestを参照するため、パネルスタックのパスではなくmanifest更新のパスとして位置を指定する必要があります.
  • CREATE EXTERNAL TABLE IF NOT EXISTS **database.table** (
        key STRING,
    	value STRING,
    	topic STRING,
    	timestamp TIMESTAMP
    )
    PARTITIONED BY (date STRING)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    STORED AS
    INPUTFORMAT
        'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
    OUTPUTFORMAT
        'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION 's3://your_location/_symlink_format_manifest/'
    TBLPROPERTIES (
        'projection.enabled'='true',
        'projection.date.type'='date',
        'projection.date.format'='yyyy-MM-dd',
        'projection.date.range'=NOW-2YEARS,NOW+1DAY',
        'projection.date.interval'='1',
        'projection.date.interval.unit'='DAYS'
    )

    9. Read and Write


    Sparkでテーブルを保存またはロードする場合は、位置ベースの識別子を使用する必要があります.
  • Read
  • spark.read.format("delta") \
        .load("s3://your_location") \
        .where("date='2022-04-17'")
    SELECT *
    FROM delta.`s3://your_location`
    WHERE date='2021-11-25'
  • Write
  • df.write \
        .format("delta") \
        .mode("overwrite") \
        .partitionBy("date") \
        .save("s3://your_location")

    10. Compact Files

  • オープンソースデルタでは自動最適化はサポートされていません.
  • https://github.com/delta-io/delta/issues/815
  • 圧縮操作により、ファイル数が減少し、
  • ではなく読み取り性能が向上します.
  • partition_clause = "date='2022-04-17'"
    num_partitions = 10
    
    spark.read.format("delta") \
        .load("s3://your_location") \
        .where(partition_clause) \
        .repartition(num_partitions) \
        .write \
        .format("delta") \
        .option("dataChange", "false") \
        .option("replaceWhere", partition_clause) \
        .mode("overwrite") \
        .save("s3://your_location")

    11. Vacuum

  • Deltaテーブルに関連付けられた場所で参照されなくなったファイルを消去します.
  • nを超えるデータを削除します.削除条件は、ファイルの最後の変更時間ではなく、トランザクション・ログが切断された時間から開始されます.
  • の現在のパフォーマンスは非常に遅い.
  • 2spark.conf.set("spark.databricks.delta.vacuum.parallelDelete.enabled", "true")の設定を追加することで、並列削除を行うことができます.
  • VACUUM delta.`s3://your_location` RETAIN 72 HOURS