AWS MWAA (Amazon Managed Workflows for Apache Airflow) からSecrets Managerを使用してBigQueryに接続する
はじめに
最近、業務でデータ基盤の構築を行なっており、その中で色々検討した結果ワークフロー構築にMWAAを利用することになりました。
AirflowをAWSのマネージドサービスとして扱えるので、主要リソースをAWS上に構築している場合非常に便利ですよね!
ただ今回、Data Lake, Data WarehouseとしてBigQueryを利用したく、その接続周りで少々困ったのでまとめておきます。
実行準備
以下は作成済みである前提で進めて行きます。
- GCP側
- プロジェクト
- BigQueryについて操作可能なGCPのサービスアカウント
- BigQueryのテーブル (テーブル間でデータのやりとりを行うので、2テーブル用意をしてください)
BigQueryテーブル
テーブルについては以下の2つのテーブルを用意しました。
処理については、data_sources.users
から必要なデータを取得し、warehouses.user_statuses
に格納します。warehouses.user_statuses.created_at
には、格納日を入れることにします。
実際は色々加工をしたのちにwarehouseに格納するかと思いますが、今回はシンプルなデータのやりとりのみ実装していきます。
やってみる
今回は、以下の手順で進めて行きます。
- MWAAでBigQueryのデータを扱うオペレーターを含むDAGを構築
- Secrets ManagerにGCPサービスアカウントの秘匿情報を格納
- 実行確認
MWAAでBigQueryのデータを扱うオペレーターを含むDAGを構築
それでは早速やっていきましょう!
CloudFormationスタック
CloudFormationでリソースを作成していきますが、BigQueryとの接続部分が本題なので、細かい値が公式のクイックスタートチュートリアルの通りにします。
ただし、以下の編集を加えてください。
・
・
・
#####################################################################################################################
# CREATE MWAA
#####################################################################################################################
MwaaEnvironment:
Type: AWS::MWAA::Environment
DependsOn: MwaaExecutionPolicy
Properties:
AirflowConfigurationOptions: # 追記
secrets.backend: airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend # 追記
secrets.backend_kwargs: # 追記
connections_prefix: "airflow/connections" # 追記
variables_prefix: "airflow/variables" # 追記
RequirementsS3Path: requirements.txt # 追記
・
・
・
GCPのサービスアカウントのクレデンシャルをSecrets Managerに配置して読み込むので、これらを読み込めるようにする設定と、requirementsを読み込む設定を追記してあります。
追記したら、以下のチュートリアルの2番目のステップまで実行し、スタックを作成してください。
DAG
オペレーターについては、以下のページにあるbigquery_operator
を使用します。BigQueryに対してクエリをかけ、その結果を別テーブルにインサートするなどの操作が簡単に行えます。
DAGファイルは以下のようにしました。
import os
from datetime import datetime, timedelta, timezone
from textwrap import dedent
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow.contrib.operators import bigquery_operator
PROJECT_ID = 'xxxxxx'
LOCATION = 'xxxxxxx'
yesterday = (datetime.now()-timedelta(days=1))
_date = (datetime.now()-timedelta(days=1)).strftime('%Y-%m-%d')
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'start_date': yesterday
}
with DAG(
'bq-test',
default_args=default_args,
description='bq-test',
params={
'project_id': PROJECT_ID,
'date': _date,
}
) as dag:
get_users = """
select
user_id,
name,
date('{date}') as created_at
from {project_id}.{dataset}.{table}
;
"""
t1 = bigquery_operator.BigQueryOperator(
task_id='query_and_migrate_data',
gcp_conn_id='google_cloud_default',
sql=get_users.format(
date="{{params.date}}",
project_id="{{params.project_id}}",
dataset='data_sources',
table='users',
),
destination_dataset_table="{{params.project_id}}.warehouses.user_statuses",
use_legacy_sql=False,
location=LOCATION,
dag=dag,
)
t2 = BashOperator(
task_id='echo_finish',
bash_command='echo finish',
)
t1 >> t2
t1のオペレーターが本題部分で、data_sources.users
テーブルに対してget_users
でしたいしたクエリを実行し、その結果をdestination_dataset_table
で指定したwarehouses.user_statuses
にインサートしています。
接続設定は、google_cloud_default
を使用するようにしていしてあります。
作成したDAGを指定のS3の/dags/フォルダに配置します。
requirements.txt
MWAAからGCPのリソースを扱うために以下のrequirements.txtを指定のS3に配置します
apache-airflow-providers-google
Secrets ManagerにGCPサービスアカウントの秘匿情報を格納
ここまでで実行準備は整いましたが、これではGCPに対するアクセスができず弾かれてしまうので、どこかに認証情報を配置する必要があります。
AirflowUIを立ち上げた後にconnectionsの項目からgoogle_cloud_default
の設定値にGCPのサービスアカウントのクレデンシャル情報を直接書き込むこともできますが、少々具合が悪いので、今回はSecretManagerを使用していきます。
Secrets Managerに配置したキー情報の読み込みについては、以下の公式ページが参考になります。
以下のコードをローカルのPythonで実行して、Airflow用の接続URLを作成します。
import json
from airflow.models.connection import Connection
cred = {
"type": "service_account",
"project_id": "xxxxxx",
"private_key_id": "xxxxxx",
"private_key": "-----BEGIN PRIVATE KEY-----\nxxxxxx\n-----END PRIVATE KEY-----\n",
"client_email": "xxxxxx",
"client_id": "xxxxxx",
"auth_uri": "xxxxxx",
"token_uri": "xxxxxx",
"auth_provider_x509_cert_url": "xxxxxx",
"client_x509_cert_url": "xxxxxx"
}
extra=json.dumps({
'extra__google_cloud_platform__keyfile_dict': json.dumps(cred),
'extra__google_cloud_platform__scope': 'https://www.googleapis.com/auth/cloud-platform',
'extra__google_cloud_platform__project': 'xxxxxx', # プロジェクトID
'extra__google_cloud_platform__num_retries': '1',
})
myconn = Connection(
conn_id='google_cloud_default',
conn_type='google_cloud_platform',
extra=extra,
)
print(myconn.get_uri())
※ 設定値の詳細は以下のページから確認できます。
上記のコードを実行すると、エンクリプトされたURIが出力されます。これをSecrets Managerにプレーンテキストとして配置していきます。
このとき、公式ページの通りにシークレットの名前をairflow/connections/{コネクションの名前}
とします。今回はconn_id
をgoogle_cloud_default
としたので、これを使用します。
ここまでで準備が完了しました!
AirflowUIからDAGを実行して確認すると、全てのフローがグリーンになり、BigQueryのwarehouses.user_statuses
にデータがインサートされることが確認できると思います。
最後に
これで無事にBigQueryに接続できました。
マルチクラウドはこういった認証周りがめんどうで、公式に情報はあるっちゃあるんですが、ピンポイントで欲しい情報が少なく少々手間取りました。。。
もし今回の記事がどなたかの役に立てば幸いです!
それでは今回はここまで。
Author And Source
この問題について(AWS MWAA (Amazon Managed Workflows for Apache Airflow) からSecrets Managerを使用してBigQueryに接続する), 我々は、より多くの情報をここで見つけました https://qiita.com/Hisaaki-Kato/items/aec0e110164cfde55ead著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .