Cloud DataflowでBigQueryにストリーミングインサートする時のちょっとした注意


問題

Dataflowで、Build-in I/Oのライブラリを使って、BigQueryにストリーミングインサートでデータを挿入しようとすると思わぬエラーに見舞われた。

こういうコード

pubsub2bigquery.py
import argparse
import logging
import json

from past.builtins import unicode

import apache_beam as beam

from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

class JSONStringToDictFn(beam.DoFn):
    def process(self, element):
        items = json.loads(element)

        yield items

def run(argv=None, save_main_session=True):
    """Main entry point; defines and runs the wordcount pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
      '--input_subscription',
      required=True,
      help=(
          'Input PubSub subscription '
          '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>".'))
    parser.add_argument(
      '--output_dataset',
      required=True,
      help=(
          'Output BigQuery dataset '
          '"<PROJECT>.<DATASET>"'))
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args, streaming=True)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    with beam.Pipeline(options=pipeline_options) as p:

        subscription = known_args.input_subscription
        rides = (
            p
            | 'Read' >> ReadFromPubSub(subscription=subscription).with_output_types(bytes)
            | 'ToDict' >> beam.ParDo(JSONStringToDictFn())
        )

        (bigquery_project, dataset) = known_args.output_dataset.split('.')
        rides | 'Write rides to BigQuery' >> WriteToBigQuery(table,
                                                             dataset=dataset,
                                                             project=bigquery_project)

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

こういうプログラムを以下のように実行するとエラーが発生。

python pubsub2bigquery.py \
--project <PROJECT ID> \
--region='us-central1' \
--runner DataflowRunner \
--autoscaling_algorithm=THROUGHPUT_BASED \
--max_num_workers=50 \
--input_subscription <SUBSCRIPTION> \
--output_dataset <DATASET ID> \
--temp_location=<GCP PATH for temp> \
--staging_location=<GCP PATH for staging>

手元の環境では、自動スケーリングの以下のオプションを抜くと、つまりワーカー1で実行するとエラーは発生せず。

--autoscaling_algorithm=THROUGHPUT_BASED \
--max_num_workers=50 \

エラー

全文は抜粋しませんが、以下のような内容を含んだエラーがログに出ます。

"/usr/local/lib/python3.7/site-packages/apitools/base/py/base_api.py", line 604, in __ProcessHttpResponse http_response, method_config=method_config, request=request) RuntimeError: apitools.base.py.exceptions.HttpForbiddenError: HttpError accessing https://bigquery.googleapis.com/bigquery/v2/projects/shua-gcp-book/datasets/nyc_taxi_trip/tables/realtime_rides?alt=json: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Fri, 28 Aug 2020 03:40:42 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'status': '403', 'content-length': '560', '-content-encoding': 'gzip'}>, content <{ "error": { "code": 403, "message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors", "errors": [ { "message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors", "domain": "usageLimits", "reason": "rateLimitExceeded" } ], "status": "PERMISSION_DENIED" } } > [while running 'generatedPtransform-121843']

下の方に "code": 403 とか "message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors" とか "reason": "rateLimitExceeded" とかあるので、APIコールの上限に引っかかったようです。

うーん、ストリーミングインサートで書き込みすぎか。。。
と思ったのですが、公式ドキュメントの以下の通り、クォータはないはず。

もうちょっとよくログをみてみると。。。

"/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1126, in process bigquery_tools.parse_table_reference(destination), schema) File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1112, in _create_table_if_needed additional_create_parameters=self.additional_bq_parameters)
(省略)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 534, in get_table response = self.client.tables.Get(request)

_create_table_if_needed? get_table? どうやらテーブル情報の取得次のAPI操作引っかかっている模様。BigQueryに書き込みするところの apache_beam.io.gcp.bigquery.WriteToBigQuerycreate_disposition パラメータがあってこれがデフォルトで CREATE_IF_NEEDED になっているようで、おそらくストリーミングインサート実行時にテーブルの存在確認のために参照している模様。

テーブルは基本あるし、こんな動きは不要なのでこのパラメータを以下のように CREATE_NEVER にしてしまう。

fixed.py
        rides | 'Write rides to BigQuery' >> WriteToBigQuery('realtime_rides',
                                                             dataset=dataset,
                                                             project=bigquery_project,
                                                             create_disposition=BigQueryDisposition.CREATE_NEVER)

importも忘れずに。

fixed.py
from apache_beam.io.gcp.bigquery import WriteToBigQuery, BigQueryDisposition

これでエラーは出なくなりました!みなさまお気をつけください。

参考

Apache Beamのコミュニティにもバグチケットがきられていました。

ちなみに、Stackoverflowにも同じ悩みの方がいました。