[Airflow]PythonOperatorでBigQueryへの接続


はじめに

CloudComposerを利用せずにGCE内のDockerにてAirflowを運用しております。
BigQueryをAirflowで制御する必要があり、接続に少し苦労しましたので書き残しておきます。

BigQueryOperatorで制御する方法はよくあるのですが、
PythonOperatorでGoogleSDKを使ってBigQueryを制御する方法はあまりありませんでした。
(それぞれのOperatorで制御しろよって話ですが、Pythonで自由に作りたい、、、)

TL;DR

<エラー内容>

client = bigquery.Client()
tables = client.list_tables(dataset_id)

↓ エラー出力

google.api_core.exceptions.Forbidden: 403 GET https://bigquery.googleapis.com/bigquery/v2/projects/[project]/datasets/[dataset]/tables?prettyPrint=false: Request had insufficient authentication scopes.

<はまった原因>

単純にサービスアカウントを読み込んでいなかっただけです。

<解消方法>

サービス アカウント キー ファイルを使用した認証に記載の内容を実施するだけです。

Detail

1. サービスアカウントキーの発行

サービスアカウントキーを発行します。
発行手順は他のサイトを参照下さい。

2. 出力されたjsonをディレクトリに配置します。

インスタンスのdockerとリンクしているディレクトリにjsonファイルを配置します。

リンクしているディレクトリは、docker-compose.yamlのvolumesになります。

Docker内のAirflowのリンクディレクトリについては、設定にもよりますがairflow.cfgの中に記載があると思います。
cfgを読むのが面倒くさい、、、なんか長い、、って方は、実際にdocker内に入って確認すると良いと思います。

docker ps
・・・ Docker名とかが出てくる
docker exec -it [web_serverのDocker名] bash

初期インストール方法は、別の方の記事を参考いただければと思います。

3. 実際にコードを書いてみる

普通にサービス アカウント キー ファイルを使用した認証を参考としただけですが、マスクした形で実際のコードを載せておきます。
oO(テストコードですし自分のためにも)

内容は、データセットのテーブル一覧の出力です。
認証の部分は、credentialsに設定するservice_accountにて、key.jsonを呼び出して設定しております。
その内容をbigquery.Clientの引数にて渡しております。

import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta, timezone
import pendulum

from google.oauth2 import service_account
from google.cloud import bigquery


JST = timezone(timedelta(hours=+9), 'JST')
batch_date = (datetime.now(JST)).strftime('%Y%m%d')
project = "projectName"
detaset = "datasetName"
credentials = service_account.Credentials.from_service_account_file(
    "/opt/airflow/credential/key.json", scopes=["https://www.googleapis.com/auth/cloud-platform"],
)


def start(**kwargs):
    print("start job")
    print(batch_date)


def getTables(dataset_id):
    datasetId = project + "." + detaset
    print(datasetId)  # projectName.datasetName
    client = bigquery.Client(credentials=credentials,
                             project=credentials.project_id,)
    tables = client.list_tables(datasetId)

    for table in tables:
        print("{}.{}.{}".format(table.project, table.dataset_id, table.table_id))
        ## projectName.datasetName.tableName


with DAG(
    'viewTables',
    catchup=False,
    start_date=datetime(2021, 4, 25, 00, 00,
                        tzinfo=pendulum.timezone('Asia/Tokyo')),
    schedule_interval=timedelta(minutes=60),
    default_args={'owner': 'airflow'},
    tags=['tag'],
) as dag:

    startTask = PythonOperator(
        task_id='start',
        provide_context=True,
        python_callable=start,
    )

    getTablesTask = PythonOperator(
        task_id='get_tables',
        provide_context=True,
        python_callable=getTables,
    )

startTask >> getTablesTask

4. ついでにBigQueryOperaterでも使えるようにしておく

PythonOperatorとは関係ないですが、BigQueryOperaterを使う場合もkey情報は必要です。
Cloud Composerは、この辺を自動でやってくれる?と思いますが、自前で立ち上げた場合は、UI上で以下の設定が必要です。
bigquery_defaultは変更可能ですが、Cloud Composerでの標準の名前でしょうか??


このように利用します。

t2 = BigQueryOperator(
    task_id='copy_table',
    sql="""
    select * from `[source_projectName].[source_datasetName].[source_tableName]`
    """,
    destination_dataset_table='dest_projectName.dest_databaseName.dest_tableName',
    use_legacy_sql=False,
    bigquery_conn_id='bigquery_default',
    dag=dag,
)

最後に

airflowは色んな所で苦労します。
Cloud Composerのマネージドにすればこの辺が解消されるのでしょうか??