いい香りですね.PySpark統合Apache Hudi実戦

6818 ワード

1.準備
HudiはSpark-2.xバージョンをサポートしています.次のリンクをクリックしてSparkをインストールし、pysparkを使用して起動できます.
# pyspark
export PYSPARK_PYTHON=$(which python3)
spark-2.4.4-bin-hadoop2.7/bin/pyspark \
  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
  • spark-avroモジュールは--packages表示指定
  • spark-avroとsparkのバージョンは
  • に一致する必要があります.
  • 本例では、spark-avro_に依存する2.11なのでscala 2.11を使用してhudi-spark-bundleを構築し、spark-avro_を使用すると2.12、hudi-spark-bundle_を使用する必要があります.2.12

  • いくつかの前変数の初期化を行います
    # pyspark
    tableName = "hudi_trips_cow"
    basePath = "file:///tmp/hudi_trips_cow"
    dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
    

    ここで、DataGeneratorは、ストロークschemaに基づいて挿入および削除されたサンプルデータを生成するために使用することができる.
    2.データの挿入
    いくつかの新しいストロークデータを生成し、DataFrameにロードし、DataFrameをHudiテーブルに書き込む
    # pyspark
    inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
    df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
    
    hudi_options = {
      'hoodie.table.name': tableName,
      'hoodie.datasource.write.recordkey.field': 'uuid',
      'hoodie.datasource.write.partitionpath.field': 'partitionpath',
      'hoodie.datasource.write.table.name': tableName,
      'hoodie.datasource.write.operation': 'insert',
      'hoodie.datasource.write.precombine.field': 'ts',
      'hoodie.upsert.shuffle.parallelism': 2, 
      'hoodie.insert.shuffle.parallelism': 2
    }
    
    df.write.format("hudi"). \
      options(**hudi_options). \
      mode("overwrite"). \
      save(basePath)
    
    mode(Overwrite)は、データセットを上書きして再作成します.例では、各パーティションにおいてストローク記録が一意であることを保証するために、プライマリ・キー(schemaのuuid)、パーティション・フィールド(region/county/city)、および組合せフィールド(schemaのts)が提供される.
    3.クエリーデータ
    データ・フレームへのデータのロード
    # pyspark
    tripsSnapshotDF = spark. \
      read. \
      format("hudi"). \
      load(basePath + "/*/*/*/*")
    
    tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
    
    spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
    spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()
    

    このクエリは、パーティションパスフォーマットがregion/country/cityであるため、基本パス(basepath)からload(basePath + "/*/*/*/*")を使用してデータをロードする読み取り最適化ビューを提供します.
    4.データの更新
    新しいデータの挿入と同様に、DataGeneratorを使用して更新データを生成し、DataFrameを使用してHudiテーブルに書き込みます.
    # pyspark
    updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
    df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
    df.write.format("hudi"). \
      options(**hudi_options). \
      mode("append"). \
      save(basePath)
    

    現在の保存モードはappendです.通常、データセットの作成を最初に試みない限り、常に追加モードを使用します.各書き込み操作は、タイムスタンプで表される新しいcommitを生成します.
    5.インクリメンタルクエリ
    Hudiは、指定されたcommit時間以降の変更をプルできるインクリメンタルプル能力を提供し、終了時間を指定しないと最新の変更がプルされます.
    # pyspark
    # reload data
    spark. \
      read. \
      format("hudi"). \
      load(basePath + "/*/*/*/*"). \
      createOrReplaceTempView("hudi_trips_snapshot")
    
    commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").limit(50).collect()))
    beginTime = commits[len(commits) - 2] # commit time we are interested in
    
    # incrementally query data
    incremental_read_options = {
      'hoodie.datasource.query.type': 'incremental',
      'hoodie.datasource.read.begin.instanttime': beginTime,
    }
    
    tripsIncrementalDF = spark.read.format("hudi"). \
      options(**incremental_read_options). \
      load(basePath)
    tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
    
    spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()
    

    これは、クエリーが開始時間にコミットされた後のすべての変更を示します.このインクリメンタル・プル機能は、バッチ・データ上でフロー・パイプを構築できます.
    6.特定の時点でのクエリー
    すなわち、特定の時間のデータをどのように照会するかは、終了時間を特定のコミット時間に、開始時間を「000」(最も早いコミット時間を表す)に指定することによって、特定の時間を表すことができる.
    # pyspark
    beginTime = "000" # Represents all commits > this time.
    endTime = commits[len(commits) - 2]
    
    # query point in time data
    point_in_time_read_options = {
      'hoodie.datasource.query.type': 'incremental',
      'hoodie.datasource.read.end.instanttime': endTime,
      'hoodie.datasource.read.begin.instanttime': beginTime
    }
    
    tripsPointInTimeDF = spark.read.format("hudi"). \
      options(**point_in_time_read_options). \
      load(basePath)
    
    tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
    spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
    

    7.データの削除
    入力されたHoodieKeyセットを削除します.注意:削除操作はappendモードのみをサポートします.
    # pyspark
    # fetch total records count
    spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
    # fetch two records to be deleted
    ds = spark.sql("select uuid, partitionPath from hudi_trips_snapshot").limit(2)
    
    # issue deletes
    hudi_delete_options = {
      'hoodie.table.name': tableName,
      'hoodie.datasource.write.recordkey.field': 'uuid',
      'hoodie.datasource.write.partitionpath.field': 'partitionpath',
      'hoodie.datasource.write.table.name': tableName,
      'hoodie.datasource.write.operation': 'delete',
      'hoodie.datasource.write.precombine.field': 'ts',
      'hoodie.upsert.shuffle.parallelism': 2, 
      'hoodie.insert.shuffle.parallelism': 2
    }
    
    from pyspark.sql.functions import lit
    deletes = list(map(lambda row: (row[0], row[1]), ds.collect()))
    df = spark.sparkContext.parallelize(deletes).toDF(['partitionpath', 'uuid']).withColumn('ts', lit(0.0))
    df.write.format("hudi"). \
      options(**hudi_delete_options). \
      mode("append"). \
      save(basePath)
    
    # run the same read query as above.
    roAfterDeleteViewDF = spark. \
      read. \
      format("hudi"). \
      load(basePath + "/*/*/*/*") 
    roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
    # fetch should return (total - 2) records
    spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
    

    8.まとめ
    このブログでは、pysparkを使用してHudiテーブルを挿入、削除、更新する方法を示しています.pysparkとHudiのニーズがあるパートナーは試してみてください.