Cloud DataflowでBigQueryにストリーミングインサートする時のちょっとした注意
問題
Dataflowで、Build-in I/Oのライブラリを使って、BigQueryにストリーミングインサートでデータを挿入しようとすると思わぬエラーに見舞われた。
こういうコード
pubsub2bigquery.py
import argparse
import logging
import json
from past.builtins import unicode
import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
class JSONStringToDictFn(beam.DoFn):
def process(self, element):
items = json.loads(element)
yield items
def run(argv=None, save_main_session=True):
"""Main entry point; defines and runs the wordcount pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument(
'--input_subscription',
required=True,
help=(
'Input PubSub subscription '
'"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>".'))
parser.add_argument(
'--output_dataset',
required=True,
help=(
'Output BigQuery dataset '
'"<PROJECT>.<DATASET>"'))
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args, streaming=True)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
with beam.Pipeline(options=pipeline_options) as p:
subscription = known_args.input_subscription
rides = (
p
| 'Read' >> ReadFromPubSub(subscription=subscription).with_output_types(bytes)
| 'ToDict' >> beam.ParDo(JSONStringToDictFn())
)
(bigquery_project, dataset) = known_args.output_dataset.split('.')
rides | 'Write rides to BigQuery' >> WriteToBigQuery(table,
dataset=dataset,
project=bigquery_project)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
import argparse
import logging
import json
from past.builtins import unicode
import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
class JSONStringToDictFn(beam.DoFn):
def process(self, element):
items = json.loads(element)
yield items
def run(argv=None, save_main_session=True):
"""Main entry point; defines and runs the wordcount pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument(
'--input_subscription',
required=True,
help=(
'Input PubSub subscription '
'"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>".'))
parser.add_argument(
'--output_dataset',
required=True,
help=(
'Output BigQuery dataset '
'"<PROJECT>.<DATASET>"'))
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args, streaming=True)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
with beam.Pipeline(options=pipeline_options) as p:
subscription = known_args.input_subscription
rides = (
p
| 'Read' >> ReadFromPubSub(subscription=subscription).with_output_types(bytes)
| 'ToDict' >> beam.ParDo(JSONStringToDictFn())
)
(bigquery_project, dataset) = known_args.output_dataset.split('.')
rides | 'Write rides to BigQuery' >> WriteToBigQuery(table,
dataset=dataset,
project=bigquery_project)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
こういうプログラムを以下のように実行するとエラーが発生。
python pubsub2bigquery.py \
--project <PROJECT ID> \
--region='us-central1' \
--runner DataflowRunner \
--autoscaling_algorithm=THROUGHPUT_BASED \
--max_num_workers=50 \
--input_subscription <SUBSCRIPTION> \
--output_dataset <DATASET ID> \
--temp_location=<GCP PATH for temp> \
--staging_location=<GCP PATH for staging>
手元の環境では、自動スケーリングの以下のオプションを抜くと、つまりワーカー1で実行するとエラーは発生せず。
--autoscaling_algorithm=THROUGHPUT_BASED \
--max_num_workers=50 \
エラー
全文は抜粋しませんが、以下のような内容を含んだエラーがログに出ます。
"/usr/local/lib/python3.7/site-packages/apitools/base/py/base_api.py", line 604, in __ProcessHttpResponse http_response, method_config=method_config, request=request) RuntimeError: apitools.base.py.exceptions.HttpForbiddenError: HttpError accessing https://bigquery.googleapis.com/bigquery/v2/projects/shua-gcp-book/datasets/nyc_taxi_trip/tables/realtime_rides?alt=json: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Fri, 28 Aug 2020 03:40:42 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'status': '403', 'content-length': '560', '-content-encoding': 'gzip'}>, content <{ "error": { "code": 403, "message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors", "errors": [ { "message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors", "domain": "usageLimits", "reason": "rateLimitExceeded" } ], "status": "PERMISSION_DENIED" } } > [while running 'generatedPtransform-121843']
下の方に "code": 403
とか "message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors"
とか "reason": "rateLimitExceeded"
とかあるので、APIコールの上限に引っかかったようです。
うーん、ストリーミングインサートで書き込みすぎか。。。
と思ったのですが、公式ドキュメントの以下の通り、クォータはないはず。
もうちょっとよくログをみてみると。。。
"/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1126, in process bigquery_tools.parse_table_reference(destination), schema) File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1112, in _create_table_if_needed additional_create_parameters=self.additional_bq_parameters)
(省略)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 534, in get_table response = self.client.tables.Get(request)
_create_table_if_needed
? get_table
? どうやらテーブル情報の取得次のAPI操作引っかかっている模様。BigQueryに書き込みするところの apache_beam.io.gcp.bigquery.WriteToBigQuery
で create_disposition
パラメータがあってこれがデフォルトで CREATE_IF_NEEDED
になっているようで、おそらくストリーミングインサート実行時にテーブルの存在確認のために参照している模様。
テーブルは基本あるし、こんな動きは不要なのでこのパラメータを以下のように CREATE_NEVER
にしてしまう。
rides | 'Write rides to BigQuery' >> WriteToBigQuery('realtime_rides',
dataset=dataset,
project=bigquery_project,
create_disposition=BigQueryDisposition.CREATE_NEVER)
importも忘れずに。
from apache_beam.io.gcp.bigquery import WriteToBigQuery, BigQueryDisposition
これでエラーは出なくなりました!みなさまお気をつけください。
参考
Apache Beamのコミュニティにもバグチケットがきられていました。
ちなみに、Stackoverflowにも同じ悩みの方がいました。
Author And Source
この問題について(Cloud DataflowでBigQueryにストリーミングインサートする時のちょっとした注意), 我々は、より多くの情報をここで見つけました https://qiita.com/ShuA/items/1b2ffed37a024c3dae07著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .