Cloud Composer Tips


皆さんこんにちは。@best_not_bestです。

今年はCloud Composerを利用することが多かったため、そこで得たTipsをまとめたいと思います。
なお、以下記載のコードはPython 3での動作を想定しております。

サポート対象のPythonバージョン

2.7.15と3.6.6となります。
https://cloud.google.com/composer/docs/concepts/python-version

タスク失敗時のロギング/Monitoring検知

タスクが失敗した時のロギングへのログの落ち方がなかなか分かりにくいです。
AirflowExceptionraiseするタスクを用意し、これを各タスクの後にone_failedで実行させ、この時のメッセージを検知するのがシンプルかなと思います。

サンプルプログラム

sample_1.py
#!/usr/bin/env python
# -*- coding: UTF-8 -*-

"""sample."""

from airflow import AirflowException
from airflow.operators.python_operator import PythonOperator

def output_error():
    """output error."""
    raise AirflowException('task failed.')

# (省略)

with models.DAG(
    dag_id='sample_001',
    # (省略)
) as dag:
    task_1 = # (省略)
    task_2 = # (省略)
    error_task = PythonOperator(
        task_id='error_task',
        python_callable=output_error,
        retries=0,
        trigger_rule='one_failed',
        dag=dag,
    )

    task_1 >> error_task
    task_2 >> error_task
    task_1 >> task_2

ロギングでの指標

以下で指標を作成します。

resource.type="cloud_composer_environment"
logName=projects/<プロジェクトID>/logs/airflow-worker
labels.workflow:sample_001
("task failed.")

Monitoringでのアラートポリシー

上記指標をターゲットとしてアラートポリシーを作成します。


エラーログ

ロギングでは少々分かりにくいので、Composerバケット配下のログが分かりやすいかと思います。

タスクの考え方

PythonOperatorである程度自由に処理を記述できますが、用意されているOperatorを極力使った方が処理速度が速いです。
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/index.html
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/contrib/operators/index.html

例えばBigQuery内のデータをCloud StorageにCSVファイル出力する処理であれば、

  1. PythonOperatorでBigQueryのデータをクエリで取得
  2. PythonOperatorで取得したデータをCSVファイルにしてローカルに保存
  3. PythonOperatorでCSVファイルをCloud Storageにコピー

ではなく

  1. BigQueryOperatorでクエリ実行結果をBigQuery上にテーブル保存
  2. BigQueryToCloudStorageOperatorでテーブル内のデータをCloud Storageに出力

の方が、BigQueryやCloud Storage側で処理が行われるため実行時間が速くなります。
(前者はデータ数によっては処理が終わらないことも・・・。)

VPC Service Controls

境界内に置くことは可能ですが、Pythonパッケージのインストールや外部システムとの連携の際の設定が複雑になる、等の理由から個人的には利用していません。
BigQueryやCloud Storageだけでも境界内に置きたいという場合は、

  • BigQueryやCloud Storageを扱うプロジェクトA(境界内)
  • Cloud Composerを扱うプロジェクトB(境界外)

のようにプロジェクトを分け、Cloud Composerを実行しているサービスアカウントからのアクセスをプロジェクトAで許可するようにすると後々楽かと思います。

また余談ですが、セキュリティ向上のためとはいえFWルールで下りを全遮断してしまうと、スケジューラ等が動かなくなるのでご注意ください。(Tenant Projectと通信しているため?)
可能な限り下りは開けたままの方が良いかと思います。

ヘルスチェック

以下ご参考ください。
https://cloud.google.com/composer/docs/tutorials/health-check

まとめ

  • タスク失敗検知は検知用のタスクを用意する
  • 用意されているOperatorを極力使いましょう
  • 過度なVPC Service ControlsやFWルールでのアクセス制限は、保守運用を複雑にする場合もある

以上、ご参考になれば幸いです!