いい香りですね.PySpark統合Apache Hudi実戦
6818 ワード
1.準備
HudiはSpark-2.xバージョンをサポートしています.次のリンクをクリックしてSparkをインストールし、pysparkを使用して起動できます. spark-avroモジュールは--packages表示指定 spark-avroとsparkのバージョンは に一致する必要があります.本例では、spark-avro_に依存する2.11なのでscala 2.11を使用してhudi-spark-bundleを構築し、spark-avro_を使用すると2.12、hudi-spark-bundle_を使用する必要があります.2.12
いくつかの前変数の初期化を行います
ここで、DataGeneratorは、ストロークschemaに基づいて挿入および削除されたサンプルデータを生成するために使用することができる.
2.データの挿入
いくつかの新しいストロークデータを生成し、DataFrameにロードし、DataFrameをHudiテーブルに書き込む
3.クエリーデータ
データ・フレームへのデータのロード
このクエリは、パーティションパスフォーマットが
4.データの更新
新しいデータの挿入と同様に、DataGeneratorを使用して更新データを生成し、DataFrameを使用してHudiテーブルに書き込みます.
現在の保存モードは
5.インクリメンタルクエリ
Hudiは、指定されたcommit時間以降の変更をプルできるインクリメンタルプル能力を提供し、終了時間を指定しないと最新の変更がプルされます.
これは、クエリーが開始時間にコミットされた後のすべての変更を示します.このインクリメンタル・プル機能は、バッチ・データ上でフロー・パイプを構築できます.
6.特定の時点でのクエリー
すなわち、特定の時間のデータをどのように照会するかは、終了時間を特定のコミット時間に、開始時間を「000」(最も早いコミット時間を表す)に指定することによって、特定の時間を表すことができる.
7.データの削除
入力されたHoodieKeyセットを削除します.注意:削除操作はappendモードのみをサポートします.
8.まとめ
このブログでは、pysparkを使用してHudiテーブルを挿入、削除、更新する方法を示しています.pysparkとHudiのニーズがあるパートナーは試してみてください.
HudiはSpark-2.xバージョンをサポートしています.次のリンクをクリックしてSparkをインストールし、pysparkを使用して起動できます.
# pyspark
export PYSPARK_PYTHON=$(which python3)
spark-2.4.4-bin-hadoop2.7/bin/pyspark \
--packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
いくつかの前変数の初期化を行います
# pyspark
tableName = "hudi_trips_cow"
basePath = "file:///tmp/hudi_trips_cow"
dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
ここで、DataGeneratorは、ストロークschemaに基づいて挿入および削除されたサンプルデータを生成するために使用することができる.
2.データの挿入
いくつかの新しいストロークデータを生成し、DataFrameにロードし、DataFrameをHudiテーブルに書き込む
# pyspark
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
hudi_options = {
'hoodie.table.name': tableName,
'hoodie.datasource.write.recordkey.field': 'uuid',
'hoodie.datasource.write.partitionpath.field': 'partitionpath',
'hoodie.datasource.write.table.name': tableName,
'hoodie.datasource.write.operation': 'insert',
'hoodie.datasource.write.precombine.field': 'ts',
'hoodie.upsert.shuffle.parallelism': 2,
'hoodie.insert.shuffle.parallelism': 2
}
df.write.format("hudi"). \
options(**hudi_options). \
mode("overwrite"). \
save(basePath)
mode(Overwrite)
は、データセットを上書きして再作成します.例では、各パーティションにおいてストローク記録が一意であることを保証するために、プライマリ・キー(schemaのuuid
)、パーティション・フィールド(region/county/city
)、および組合せフィールド(schemaのts
)が提供される.3.クエリーデータ
データ・フレームへのデータのロード
# pyspark
tripsSnapshotDF = spark. \
read. \
format("hudi"). \
load(basePath + "/*/*/*/*")
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
このクエリは、パーティションパスフォーマットが
region/country/city
であるため、基本パス(basepath)からload(basePath + "/*/*/*/*")
を使用してデータをロードする読み取り最適化ビューを提供します.4.データの更新
新しいデータの挿入と同様に、DataGeneratorを使用して更新データを生成し、DataFrameを使用してHudiテーブルに書き込みます.
# pyspark
updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi"). \
options(**hudi_options). \
mode("append"). \
save(basePath)
現在の保存モードは
append
です.通常、データセットの作成を最初に試みない限り、常に追加モードを使用します.各書き込み操作は、タイムスタンプで表される新しいcommitを生成します.5.インクリメンタルクエリ
Hudiは、指定されたcommit時間以降の変更をプルできるインクリメンタルプル能力を提供し、終了時間を指定しないと最新の変更がプルされます.
# pyspark
# reload data
spark. \
read. \
format("hudi"). \
load(basePath + "/*/*/*/*"). \
createOrReplaceTempView("hudi_trips_snapshot")
commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").limit(50).collect()))
beginTime = commits[len(commits) - 2] # commit time we are interested in
# incrementally query data
incremental_read_options = {
'hoodie.datasource.query.type': 'incremental',
'hoodie.datasource.read.begin.instanttime': beginTime,
}
tripsIncrementalDF = spark.read.format("hudi"). \
options(**incremental_read_options). \
load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
これは、クエリーが開始時間にコミットされた後のすべての変更を示します.このインクリメンタル・プル機能は、バッチ・データ上でフロー・パイプを構築できます.
6.特定の時点でのクエリー
すなわち、特定の時間のデータをどのように照会するかは、終了時間を特定のコミット時間に、開始時間を「000」(最も早いコミット時間を表す)に指定することによって、特定の時間を表すことができる.
# pyspark
beginTime = "000" # Represents all commits > this time.
endTime = commits[len(commits) - 2]
# query point in time data
point_in_time_read_options = {
'hoodie.datasource.query.type': 'incremental',
'hoodie.datasource.read.end.instanttime': endTime,
'hoodie.datasource.read.begin.instanttime': beginTime
}
tripsPointInTimeDF = spark.read.format("hudi"). \
options(**point_in_time_read_options). \
load(basePath)
tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
7.データの削除
入力されたHoodieKeyセットを削除します.注意:削除操作はappendモードのみをサポートします.
# pyspark
# fetch total records count
spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
# fetch two records to be deleted
ds = spark.sql("select uuid, partitionPath from hudi_trips_snapshot").limit(2)
# issue deletes
hudi_delete_options = {
'hoodie.table.name': tableName,
'hoodie.datasource.write.recordkey.field': 'uuid',
'hoodie.datasource.write.partitionpath.field': 'partitionpath',
'hoodie.datasource.write.table.name': tableName,
'hoodie.datasource.write.operation': 'delete',
'hoodie.datasource.write.precombine.field': 'ts',
'hoodie.upsert.shuffle.parallelism': 2,
'hoodie.insert.shuffle.parallelism': 2
}
from pyspark.sql.functions import lit
deletes = list(map(lambda row: (row[0], row[1]), ds.collect()))
df = spark.sparkContext.parallelize(deletes).toDF(['partitionpath', 'uuid']).withColumn('ts', lit(0.0))
df.write.format("hudi"). \
options(**hudi_delete_options). \
mode("append"). \
save(basePath)
# run the same read query as above.
roAfterDeleteViewDF = spark. \
read. \
format("hudi"). \
load(basePath + "/*/*/*/*")
roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
# fetch should return (total - 2) records
spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
8.まとめ
このブログでは、pysparkを使用してHudiテーブルを挿入、削除、更新する方法を示しています.pysparkとHudiのニーズがあるパートナーは試してみてください.