Cloud DataflowでBigQueryから取得したデータを加工してS3に出力する


はじめに

この記事は、Google Cloud Platform(2) Advent Calendar 2016 4日目の記事です。
DataflowからS3へファイルを出力する方法について紹介します。

概要

Cloud DataflowではGoogle Cloud StorageやBigQuery、Cloud PubSubなどに対してI/Oを行うライブラリが提供されているが、S3などAWS系のI/Oは現在のところ提供されていない。
そこで今回はS3へ出力するCustom Sink(S3Sink)の作成を行い、Dataflowを使ってBigQueryから取得したデータを加工して、その結果をS3へ出力するパイプラインを作成した。

今回作成するパイプラインの概要は下記の表のとおりである。

デザイン 備考
入力データ BigQuery (マスタテーブルとマッピングテーブルの2つ)
出力データ S3
ファイルフォーマット csv
変換 idの中でマッピングテーブルに該当するidを対応する値に変換する
開発環境 Dataflow Python SDK

S3Sinkの作成

S3Sinkは以下のような感じで作成した。

  • FileSinkを継承 
  • initialize_write: GCSの一時ディレクトリを設定
  • write_record: FileSinkのwrite_recordをコール
  • write_encode_record: レコードを書き込み、改行を入れる
  • open_writer:GCSにファイルを出力
  • finalize_writeでGCSからファイルを取得し、一つのファイルにマージする。botoを使用してS3にマージしたファイルをアップロード

open_writerでS3に書き込んでいたが処理速度があまりに遅かったため、一度GCSに書き込んでからマージして、finalize_writeでアップロードするようにした。

カスタムシンクを利用してDataflowを実行するのに必要な準備

以下のようなフォルダ構造でs3sink.pyを配置し、"python setup.py sdist"を実行して、s3sink.pyをパッケージングする。dataflowを実行するとき--extra_packageオプションでパッケージファイルを指定する。

s3_custom_lib
    |-dist
    |-s3package
         |-__init__.py
         |-s3sink.py
    |-setup.py

Dataflow

1.BigQueryに対してクエリをかけて、その結果をPCollectionで受け取る

rows = p | 'readBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=custom_options.query))

2.Mappging用のテーブルに対してもクエリをかけてデータを取得する

mapping = p | 'readBigQuery(MappingTable)' >> beam.io.Read(beam.io.BigQuerySource(query=custom_options.mappingquery))

3.CoGroupByKeyで2つのテーブルをJoinする。BigQueryでクエリをかけた結果はdictなので、tupleに変換してからCoGroupByKeyにかける。

join_data = {'rows': convert_tuple_main(rows), 'mapping': convert_tuple_mapping(mapping)} | beam.CoGroupByKey()

4.ParDoでIDを変換する。
この段階では各要素は以下のような形式になっている。

{"id xxx",{{"row":""},{"mapping":"mapped_id xxx"}}}

mapped_idにある文字列が入ってるIDだけ変換する処理をDoFnで記載する。

result = join_data | 'Replace ID' >> beam.ParDo(IdReplaceFn())

5.処理結果をS3にアップロードする

awscredentialfile = gcsio.GcsIO().open(filename=segment_options.awscredential)
        awscredential = awscredentialfile.read()
        result | 'writeS3' >> beam.io.Write(s3sink.S3Sink(awscredential, custom_options))

6.パイプラインを実行する

python run_dataflow --project {PJ_NAME} --job_name {JOB_NAME} --runner BlockingDataflowPipelineRunner --staging_location gs://xxx/stagin g --temp_location gs://xxx/temp --extra_package s3_custom_lib/dist/s3package-1.0.tar.gz

実行するとGCPのWebコンソールで以下のような図が出力されることを確認できる。

360万件のデータを処理するのに16分程度要した。処理中3vcpuしか使用されてなかったので、もう少し検討が必要。

最後に

Dataflow のPython SDKはβ版なので現段階ではストリーミング処理機能が実装されてないなど、Prodocutionで利用するには推奨されてませんので、注意が必要です。
明日12/5はsoundTrickerによるGAEに関する発表です。