Whyを用いたAWS S 3からの信頼できる摂食
11231 ワード
これはオリジナルHudiブログhereの再投稿です.
このポストでは、新しいデータファイルを確実に効率的に処理する新しいdeltastreamerソースについて話します.
今日現在、S 3からHudiにデータを摂取するために、ユーザーは最大の修正時間に基づく最後のチェックポイントから修正されたソースファイルをpath selectorが識別するDFSソースを活用します.
このアプローチに関する問題は、修正時間精度がS 3でupto秒であるということです.おそらく2番目のファイルには、設定可能なソースの制限を許可するファイルを超えて変更される可能性がありますいくつかのファイルがスキップされる可能性があります.詳細はHUDI-1723を参照.
回避策は、ソースの制限を無視し、読み続けることですが、問題は、ユーザーが確実にS 3から摂取できるように再設計する動機となりました.
デザイン
秒の粒度が十分でないユースケースの場合は、ログベースのアプローチを使用してDeltaStreamに新しいソースがあります.
新しいS3 events sourceは、S 3から摂取するために、変化通知と増分処理に頼ります.
アーキテクチャは以下の図に示す通りです.
このアプローチでは、ユーザーはenable S3 event notificationsにする必要があります.
以下の詳細については2種類のdeltastreamersがあります.
S3EventsSource : Hudi S 3メタデータテーブルを作成します.
このソースは、ソースバケットからファイルイベントを購読するAWS SNSとSQSサービスを活用します.
SQSからのイベントはこのテーブルに書かれます.そして、それは後続のインクリメンタル引き数のためのchangelogとして機能します. イベントがS 3メタデータテーブルにコミットされるとき、はSQSから削除されます.
S3EventsHoodieIncrSourceとS 3 EventsSourceによって書かれたメタデータテーブルを使用します. S 3メタデータテーブルを読み込み、追加または変更されたオブジェクトを取得します.これらのオブジェクトには、追加または変更されたソースファイルのS 3パスが含まれます. は、S 3バケツのソースファイルに対応するソースデータでHudiテーブルに書きます.
利点
デカップリング:パイプラインのあらゆるステップは、切り離されます.つのソースは互いに独立して起動することができます.私たちは、ほとんどのユーザーが与えられたバケツのすべての変化を得るために一つのdeltastreamerを実行して、それから複数のテーブルを外へ出すことができると想像します.
パフォーマンスとスケール:以前のアプローチは、すべてのファイルをリスト化するために使用され、変更時間でソートし、チェックポイントに基づいてフィルタリングします.パーティションのパスをプルダウンしていたが、ディレクトリのリストはボトルネックになった.変更通知とネイティブの雲APIに頼ることによって、新しいアプローチは、ディレクトリのリストとスケールを回避するファイルの数を摂取している.
信頼性:最大修正時刻とメタデータテーブルにS 3イベントが記録されているという事実にもはや依存しないので、ユーザーはすべてのイベントが最終的に処理されることを安心することができます.
フォールトトレランス:この設計では2つのレベルの故障toeranceがあります.まず、いくつかのメッセージがS 3メタデータテーブルにコミットされていない場合、それらのメッセージはキューに残ります.次に、インクリメンタル引き数が失敗した場合、ユーザーは最後のコミットポイントに対してS 3メタデータテーブルを問い合わせることができ、そのポイントからインクリメンタル引き数を再開することができます(Kafkaの消費者がどのようにオフセットをリセットするかのように).
非同期バックフィル:ログベースのアプローチでは、はるかに簡単にバックフィルをトリガするようになります.詳細は、「結論と将来の仕事」セクションを見てください.
設定とセットアップ
ユーザーは、S 3 EventsSource(メタデータソース)を起動するには、SQSキューURLと領域名を指定する必要があります.
パイプラインを設定するには、まずenable S3 event notificationsを設定します.
ダウンロード.
次に、S 3 EventsSourceとS 3 EventScreeneInRourceを起動します.
まとめと今後の課題
このポストは、S 3からHudiテーブルへのデータを確実かつ効率的に摂取するためのログベースのアプローチを導入しました.我々は積極的に以下の方向に沿ってこれを改善している.仕事の1つのストリームは、Googleのクラウドストレージ、Azure Blobストレージなどのような他のクラウドベースのオブジェクトストレージのサポートを追加することです. 仕事のもう一つの流れは、ユーザーがセットアップ通知をして、もはや必要でないとき、リソースを削除するのを許す資源マネージャーを加えることです. 別の興味深い作品は非同期バックフィルをサポートすることです.通知システムは、常に一貫していて、一般的にすぐにすべてのファイルの完全な配達を保証しません.ログベースのアプローチでは、設定可能な間隔で自動バックフィルをトリガするのに十分な柔軟性を提供しています一日一日や週に一度. この問題についての活発な発展についてもっと知るために、この
に従ってください.
我々は、コミュニティからの貢献を楽しみにしています.このポストを楽しんでください.
あなたのhudiを入れて、ストリーミングを維持!
このポストでは、新しいデータファイルを確実に効率的に処理する新しいdeltastreamerソースについて話します.
今日現在、S 3からHudiにデータを摂取するために、ユーザーは最大の修正時間に基づく最後のチェックポイントから修正されたソースファイルをpath selectorが識別するDFSソースを活用します.
このアプローチに関する問題は、修正時間精度がS 3でupto秒であるということです.おそらく2番目のファイルには、設定可能なソースの制限を許可するファイルを超えて変更される可能性がありますいくつかのファイルがスキップされる可能性があります.詳細はHUDI-1723を参照.
回避策は、ソースの制限を無視し、読み続けることですが、問題は、ユーザーが確実にS 3から摂取できるように再設計する動機となりました.
デザイン
秒の粒度が十分でないユースケースの場合は、ログベースのアプローチを使用してDeltaStreamに新しいソースがあります.
新しいS3 events sourceは、S 3から摂取するために、変化通知と増分処理に頼ります.
アーキテクチャは以下の図に示す通りです.
このアプローチでは、ユーザーはenable S3 event notificationsにする必要があります.
以下の詳細については2種類のdeltastreamersがあります.
S3EventsSource : Hudi S 3メタデータテーブルを作成します.
このソースは、ソースバケットからファイルイベントを購読するAWS SNSとSQSサービスを活用します.
SQSからの
S3EventsHoodieIncrSourceとS 3 EventsSourceによって書かれたメタデータテーブルを使用します.
利点
デカップリング:パイプラインのあらゆるステップは、切り離されます.つのソースは互いに独立して起動することができます.私たちは、ほとんどのユーザーが与えられたバケツのすべての変化を得るために一つのdeltastreamerを実行して、それから複数のテーブルを外へ出すことができると想像します.
パフォーマンスとスケール:以前のアプローチは、すべてのファイルをリスト化するために使用され、変更時間でソートし、チェックポイントに基づいてフィルタリングします.パーティションのパスをプルダウンしていたが、ディレクトリのリストはボトルネックになった.変更通知とネイティブの雲APIに頼ることによって、新しいアプローチは、ディレクトリのリストとスケールを回避するファイルの数を摂取している.
信頼性:最大修正時刻とメタデータテーブルにS 3イベントが記録されているという事実にもはや依存しないので、ユーザーはすべてのイベントが最終的に処理されることを安心することができます.
フォールトトレランス:この設計では2つのレベルの故障toeranceがあります.まず、いくつかのメッセージがS 3メタデータテーブルにコミットされていない場合、それらのメッセージはキューに残ります.次に、インクリメンタル引き数が失敗した場合、ユーザーは最後のコミットポイントに対してS 3メタデータテーブルを問い合わせることができ、そのポイントからインクリメンタル引き数を再開することができます(Kafkaの消費者がどのようにオフセットをリセットするかのように).
非同期バックフィル:ログベースのアプローチでは、はるかに簡単にバックフィルをトリガするようになります.詳細は、「結論と将来の仕事」セクションを見てください.
設定とセットアップ
ユーザーは、S 3 EventsSource(メタデータソース)を起動するには、SQSキューURLと領域名を指定する必要があります.
hoodie.deltastreamer.s3.source.queue.url=https://sqs.us-west-2.amazonaws.com/queue/url
hoodie.deltastreamer.s3.source.queue.region=us-west-2
特定の要件に合うように調整できるメタデータソースには、他にいくつかの設定があります.hoodie.deltastreamer.s3.source.queue.long.poll.wait
:値は[ 0 , 20 ]秒の範囲でできます.0に設定された場合、メタデータソースは短いポーリングを使用してSQSからメッセージを消費します.偽の空の応答を減らして、SQSを使うコストを減らすので、長いポーリングを使うことを勧められます.デフォルトでは、この値は20秒に設定されます.hoodie.deltastreamer.s3.source.queue.visibility.timeout
:値は[ 0 , 43200 ]秒(すなわち最大12時間)の範囲でできます.SQSは自動的に消費されるメッセージを削除しません.コミット後にメッセージを削除するのはメタデータソースの責任です.SQSは、それが設定されたタイムアウト期間の間、見えなくなる間、消費されたメッセージを機内の状態に動かします.デフォルトでは、この値は30秒に設定されます.hoodie.deltastreamer.s3.source.queue.max.messages.per.batch
:メタデータソースの1ラウンドのバッチでのメッセージの最大数.デフォルトでは、この値は5に設定されます.ダウンロード.
次に、S 3 EventsSourceとS 3 EventScreeneInRourceを起動します.
# To start S3EventsSource
spark-submit \
--jars "/home/hadoop/hudi-utilities-bundle_2.11-0.9.0.jar,/usr/lib/spark/external/lib/spark-avro.jar,/home/hadoop/aws-java-sdk-sqs-1.12.22.jar" \
--master yarn --deploy-mode client \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /home/hadoop/hudi-packages/hudi-utilities-bundle_2.11-0.9.0-SNAPSHOT.jar \
--table-type COPY_ON_WRITE --source-ordering-field eventTime \
--target-base-path s3://bucket_name/path/for/s3_meta_table \
--target-table s3_meta_table --continuous \
--min-sync-interval-seconds 10 \
--hoodie-conf hoodie.datasource.write.recordkey.field="s3.object.key,eventName" \
--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
--hoodie-conf hoodie.datasource.write.partitionpath.field=s3.bucket.name --enable-hive-sync \
--hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor \
--hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
--hoodie-conf hoodie.datasource.hive_sync.database=default \
--hoodie-conf hoodie.datasource.hive_sync.table=s3_meta_table \
--hoodie-conf hoodie.datasource.hive_sync.partition_fields=bucket \
--source-class org.apache.hudi.utilities.sources.S3EventsSource \
--hoodie-conf hoodie.deltastreamer.source.queue.url=https://sqs.us-west-2.amazonaws.com/queue/url
--hoodie-conf hoodie.deltastreamer.s3.source.queue.region=us-west-2
# To start S3EventsHoodieIncrSource
spark-submit \
--jars "/home/hadoop/hudi-utilities-bundle_2.11-0.9.0.jar,/usr/lib/spark/external/lib/spark-avro.jar,/home/hadoop/aws-java-sdk-sqs-1.12.22.jar" \
--master yarn --deploy-mode client \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /home/hadoop/hudi-packages/hudi-utilities-bundle_2.11-0.9.0-SNAPSHOT.jar \
--table-type COPY_ON_WRITE \
--source-ordering-field eventTime --target-base-path s3://bucket_name/path/for/s3_hudi_table \
--target-table s3_hudi_table --continuous --min-sync-interval-seconds 10 \
--hoodie-conf hoodie.datasource.write.recordkey.field="pull_request_id" \
--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator \
--hoodie-conf hoodie.datasource.write.partitionpath.field=s3.bucket.name --enable-hive-sync \
--hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor \
--hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
--hoodie-conf hoodie.datasource.hive_sync.database=default \
--hoodie-conf hoodie.datasource.hive_sync.table=s3_hudi_v6 \
--hoodie-conf hoodie.datasource.hive_sync.partition_fields=bucket \
--source-class org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource \
--hoodie-conf hoodie.deltastreamer.source.hoodieincr.path=s3://bucket_name/path/for/s3_meta_table \
--hoodie-conf hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=true
aws-java-sdk-sqsまとめと今後の課題
このポストは、S 3からHudiテーブルへのデータを確実かつ効率的に摂取するためのログベースのアプローチを導入しました.我々は積極的に以下の方向に沿ってこれを改善している.
我々は、コミュニティからの貢献を楽しみにしています.このポストを楽しんでください.
あなたのhudiを入れて、ストリーミングを維持!
Reference
この問題について(Whyを用いたAWS S 3からの信頼できる摂食), 我々は、より多くの情報をここで見つけました https://dev.to/bytearray/reliable-ingestion-from-aws-s3-using-hudi-20gkテキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol