Cloud Composer(Airflow)でAutoMLを使った推論パイプラインを組んでみた


Overview.

GCPのCloud ComposerでAirflow環境を立ち上げAutoMLOperatorを使ってBigQueryからテーブルデータを抽出して、予測するフローを組んでみました。
普通にやれば行けるかと思ったら結構躓きました。

躓いたポイント

  • AutoMLOperatorはAirflow1.10.6からだが、CloudComposerのAirflowイメージは1.10.3までしか対応していない
  • AutoMLOperatorのリファレンスがスカスカすぎてgoogle-cloud-automlのライブラリのソースまで漁りました

どうやって解決したか

  • 依存関係のあるパッケージをCloud Composerの環境にインストール
  • AirflowのパッケージはCloud Composer環境にインストールできなかったので、ソースをとってきてpluginとして使った

前提

  • GCPプロジェクト課金済み(支払いアカウント紐づき済み)
    • モデルの学習・デプロイには課金が発生します
  • AutoMLモデルデプロイ済み
    • 適当なデータセットで適当なモデルを作成してデプロイしておきます
    • 予測に用いるテストデータをBigQueryテーブルにセットしておきます

Cloud ComposerでAirflow環境を構築する

環境設定

基本設定は以下のようにしました。

  • ゾーン: us-central1-c
  • Google API scopes: https://www.googleapis.com/auth/cloud-platform
  • Image version: composer-1.8.3-airflow-1.10.3
  • Python version: 3
  • ネットワーク タグ: なし
  • Node count: 3
  • Disk size (GB): 100
  • マシンタイプ: n1-standard-1

ここからがAirflow1.10.3未対応のAutoMLOperatorに対応するためのちょっとした工夫。
CloudComposerのコンソールのPyPl PACKAGESタブで、cached_propertygoogle-cloud-automl >=0.9.0パッケージを追加しておく。(この環境の変更には20~30分くらいかかる)

Airflow1.10.3ではデフォルトのAirflowパッケージの中にgcpモジュールがないのでAirflow1.10.6のGitHubリポジトリからAutoMLのOperatorだけ抽出してpluginとして置いてつかう。
リポジトリをcloneします。その中のAutoMLOperatorと依存関係のあるファイル
- airflow/airflow/gcp/operators/automl.py
- airflow/airflow/gcp/hooks/automl.py
- airflow/airflow/gcp/hooks/base.py

だけ抽出します。

airflow
 ┗ gcp
   ┣ operators
   ┃  ┗ automl.py
   ┗ hooks
      ┣ automl.py
      ┗ base.py

という感じにまとめておきます。
これをairflowフォルダごとCloudComposer環境用のGCSバケットのpluginsフォルダに置いておく。

DAGを準備

とりあえずBigQueryからCloud Storageにテストデータセットを抽出してAutoMLのpredictにかけ、結果をGCSに吐き出すところまでをワークフロー化してみます。
データフローとしては、
BigQuery -> Cloud Storage -> AutoML -> Cloud Storage
という感じ。
(AutoMLデータセット作成&学習&デプロイ部分は時間がなかったので割愛)

automl_pipeline.py
# coding: utf-8

from datetime import datetime, timedelta
from os import path 

import airflow
from airflow import DAG
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
# from plugins/
from airflow.gcp.operators.automl import AutoMLBatchPredictOperator
#set paths
gcp_project = 'YOUR_PROJECT_NAME'
gcs_bucket = 'YOUR_BUCKET'
bq_dataset = 'YOUR_DATASET_NAME'
bq_table = 'YOUR_TABLE_NAME'
gcs_prefix = path.join('gs://', gcs_bucket, '')
gcp_location = 'us-central1'

# set AutoML model ID
model_id = 'TBL386XXXXXXXXXXX12' # AutoML model ID
# test dataset: `YOUR_PROJECT_NAME:YOUR_DATASET_NAME.YOUR_TABLE_NAME`
dataset_table = f'{gcp_project}:{bq_dataset}.{bq_table}'
# gcs path 
storage_uri_path = 'automl_data/input'
# set default conn id
gcp_conn_id = 'google_cloud_default'

default_args = {
    'owner': 'OWNER_NAME',
    'retries': 1,
    'retry_delay': timedelta(seconds=10),
    'start_date': airflow.utils.dates.days_ago(1)
    }

with DAG(
    dag_id='automl_predict',
    default_args=default_args,
    schedule_interval=None
    ) as dag:

    # extract dataset from bq to gcs
    bq_to_gcs_op = BigQueryToCloudStorageOperator(
        task_id='bq_to_gcs',
        bigquery_conn_id=gcp_conn_id,
        source_project_dataset_table=dataset_table,
        destination_cloud_storage_uris=[path.join(gcs_prefix, storage_uri_path, 'test.csv')],
        dag=dag
        )

    # set input data path
    input_data = f'gs://{gcs_bucket}/{storage_uri_path}/test.csv'
    input_config = {'gcs_source': {'input_uris': [input_data]}}
    # set output data path prefix
    output_data = f'gs://{gcs_bucket}/{storage_uri_path}/'.replace('input', 'output')
    output_config = {'gcs_destination': {'output_uri_prefix': outputdata}}

    # AutoML predict
    automl_predict_op = AutoMLBatchPredictOperator(
        task_id='predict',
        model_id=model_id,
        input_config=input_config,
        output_config=output_config,
        location=gcp_location,
        project_id=gcp_project,
        dag=dag
        )

    bq_to_gcs_op >> automl_predict_op

if __name__ == '__main__':
    dag.cli()

全体はこんな感じ。

これでDAGをトリガーすればGCSのgs://YOUR_BUCKET/automl_dataset/output/に予測結果がCSVで吐き出される。