AWS MWAA (Amazon Managed Workflows for Apache Airflow) からSecrets Managerを使用してBigQueryに接続する


はじめに

最近、業務でデータ基盤の構築を行なっており、その中で色々検討した結果ワークフロー構築にMWAAを利用することになりました。
AirflowをAWSのマネージドサービスとして扱えるので、主要リソースをAWS上に構築している場合非常に便利ですよね!

ただ今回、Data Lake, Data WarehouseとしてBigQueryを利用したく、その接続周りで少々困ったのでまとめておきます。

実行準備

以下は作成済みである前提で進めて行きます。

  • GCP側
    • プロジェクト
    • BigQueryについて操作可能なGCPのサービスアカウント
    • BigQueryのテーブル (テーブル間でデータのやりとりを行うので、2テーブル用意をしてください)

BigQueryテーブル

テーブルについては以下の2つのテーブルを用意しました。

  • data_sources.users

  • warehouses.user_statuses

処理については、data_sources.usersから必要なデータを取得し、warehouses.user_statusesに格納します。warehouses.user_statuses.created_atには、格納日を入れることにします。
実際は色々加工をしたのちにwarehouseに格納するかと思いますが、今回はシンプルなデータのやりとりのみ実装していきます。

やってみる

今回は、以下の手順で進めて行きます。

  1. MWAAでBigQueryのデータを扱うオペレーターを含むDAGを構築
  2. Secrets ManagerにGCPサービスアカウントの秘匿情報を格納
  3. 実行確認

MWAAでBigQueryのデータを扱うオペレーターを含むDAGを構築

それでは早速やっていきましょう!

CloudFormationスタック

CloudFormationでリソースを作成していきますが、BigQueryとの接続部分が本題なので、細かい値が公式のクイックスタートチュートリアルの通りにします。
ただし、以下の編集を加えてください。

  
  
  
  #####################################################################################################################
  # CREATE MWAA
  #####################################################################################################################

  MwaaEnvironment:
    Type: AWS::MWAA::Environment
    DependsOn: MwaaExecutionPolicy
    Properties:
      AirflowConfigurationOptions:  # 追記
        secrets.backend: airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend  # 追記
        secrets.backend_kwargs:  # 追記
          connections_prefix: "airflow/connections"  # 追記
          variables_prefix: "airflow/variables"  # 追記
      RequirementsS3Path: requirements.txt  # 追記
  
  
  

GCPのサービスアカウントのクレデンシャルをSecrets Managerに配置して読み込むので、これらを読み込めるようにする設定と、requirementsを読み込む設定を追記してあります。

追記したら、以下のチュートリアルの2番目のステップまで実行し、スタックを作成してください。

DAG

オペレーターについては、以下のページにあるbigquery_operatorを使用します。BigQueryに対してクエリをかけ、その結果を別テーブルにインサートするなどの操作が簡単に行えます。

DAGファイルは以下のようにしました。

import os
from datetime import datetime, timedelta, timezone
from textwrap import dedent

from airflow import DAG

from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow.contrib.operators import bigquery_operator

PROJECT_ID = 'xxxxxx'
LOCATION = 'xxxxxxx'

yesterday = (datetime.now()-timedelta(days=1))
_date = (datetime.now()-timedelta(days=1)).strftime('%Y-%m-%d')

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'start_date': yesterday
}

with DAG(
    'bq-test',
    default_args=default_args,
    description='bq-test',
    params={
        'project_id': PROJECT_ID,
        'date': _date,
    }
) as dag:

    get_users = """
    select
        user_id,
        name,
        date('{date}') as created_at
    from {project_id}.{dataset}.{table}
    ;
    """

    t1 = bigquery_operator.BigQueryOperator(
        task_id='query_and_migrate_data',
        gcp_conn_id='google_cloud_default',
        sql=get_users.format(
            date="{{params.date}}",
            project_id="{{params.project_id}}",
            dataset='data_sources',
            table='users',
        ),
        destination_dataset_table="{{params.project_id}}.warehouses.user_statuses",
        use_legacy_sql=False,
        location=LOCATION,
        dag=dag,
    )

    t2 = BashOperator(
        task_id='echo_finish',
        bash_command='echo finish',
    )

    t1 >> t2

t1のオペレーターが本題部分で、data_sources.usersテーブルに対してget_usersでしたいしたクエリを実行し、その結果をdestination_dataset_tableで指定したwarehouses.user_statusesにインサートしています。
接続設定は、google_cloud_defaultを使用するようにしていしてあります。
作成したDAGを指定のS3の/dags/フォルダに配置します。

requirements.txt

MWAAからGCPのリソースを扱うために以下のrequirements.txtを指定のS3に配置します

requirements.txt
apache-airflow-providers-google

Secrets ManagerにGCPサービスアカウントの秘匿情報を格納

ここまでで実行準備は整いましたが、これではGCPに対するアクセスができず弾かれてしまうので、どこかに認証情報を配置する必要があります。

AirflowUIを立ち上げた後にconnectionsの項目からgoogle_cloud_defaultの設定値にGCPのサービスアカウントのクレデンシャル情報を直接書き込むこともできますが、少々具合が悪いので、今回はSecretManagerを使用していきます。

Secrets Managerに配置したキー情報の読み込みについては、以下の公式ページが参考になります。

以下のコードをローカルのPythonで実行して、Airflow用の接続URLを作成します。

import json
from airflow.models.connection import Connection

cred = {
    "type": "service_account",
    "project_id": "xxxxxx",
    "private_key_id": "xxxxxx",
    "private_key": "-----BEGIN PRIVATE KEY-----\nxxxxxx\n-----END PRIVATE KEY-----\n",
    "client_email": "xxxxxx",
    "client_id": "xxxxxx",
    "auth_uri": "xxxxxx",
    "token_uri": "xxxxxx",
    "auth_provider_x509_cert_url": "xxxxxx",
    "client_x509_cert_url": "xxxxxx"
}

extra=json.dumps({
    'extra__google_cloud_platform__keyfile_dict': json.dumps(cred),
    'extra__google_cloud_platform__scope': 'https://www.googleapis.com/auth/cloud-platform',
    'extra__google_cloud_platform__project': 'xxxxxx',  # プロジェクトID
    'extra__google_cloud_platform__num_retries': '1',
})

myconn = Connection(
    conn_id='google_cloud_default',
    conn_type='google_cloud_platform',
    extra=extra,
)

print(myconn.get_uri())

※ 設定値の詳細は以下のページから確認できます。

上記のコードを実行すると、エンクリプトされたURIが出力されます。これをSecrets Managerにプレーンテキストとして配置していきます。
このとき、公式ページの通りにシークレットの名前をairflow/connections/{コネクションの名前}とします。今回はconn_idgoogle_cloud_defaultとしたので、これを使用します。

ここまでで準備が完了しました!
AirflowUIからDAGを実行して確認すると、全てのフローがグリーンになり、BigQueryのwarehouses.user_statusesにデータがインサートされることが確認できると思います。

最後に

これで無事にBigQueryに接続できました。
マルチクラウドはこういった認証周りがめんどうで、公式に情報はあるっちゃあるんですが、ピンポイントで欲しい情報が少なく少々手間取りました。。。

もし今回の記事がどなたかの役に立てば幸いです!
それでは今回はここまで。