Cloud Composerを使ってデータ分析基盤を作ったときの話


本記事は、VISITS Advent Calendar 2019の10日目の記事です。
昨日は、@muraokaz によるクラスタリング+二次元マッピングによる可視化を定量評価する でした。

VISITS Technologiesでインフラエンジニアをしている@syogunです。
本日は、Cloud Composerを使用してVISITSでデータ分析基盤を構築(in progress)した時の話をさせて頂ければと思います。
主にCloudSQL(MySQL)→GCS→BigQueryの部分について書きます。
人様の情報がベースで恐縮ですが、こちらの記事を参考にして構築を進めました。
Cloud ComposerがGKE上で動いているため、ちょっとGKEの話も出てきます。
では参りましょう。

1. 構成図

今後変更はあるかもしれませんが、想定している構成図はこんな感じです。(GAEのログ転送は含まない)

データマスキングの部分は、GCPのDLPを使用してメールアドレスや電話番号を判別して、
その情報を元にデータをマスキングする方法を検討中です。
現在進行形なので、仕組みができたらまた書こうと思います。

2. 背景

ユーザーや弊社サービス利用企業のActivity等を分析し、プロダクトの改善に活かしていきたいというニーズがあり、データ分析基盤を作ろう!という話になりました。
具体的には、アプリケーションログ(GAE)、Google Analytics、DB(MySQL)をDataSourceとし、
そのDataSourceに必要なマスキング処理を行い、DWHにロードするといった構成を考えました。
弊社のクラウドは主にGCPを使っているため、DWHは文句なしでBigQueryを使うことにしました。

3. いろいろ試しました

そもそも「何を使えばいいだ?」というレベルから始まったのいろいろ試行錯誤しました。
おそらく一番の課題は、データを抽出、加工するところだと考え、
最初は、Cloud DataPrepCloud DataFusionを使用して、ノンプログラミングで楽することを考えました。
しかしその淡い夢は、以下の理由で絶たれれました。。

Cloud DataPrepについて

DataPrepとは、公式ドキュメントを見ると

ブラウザ環境の簡単なドラッグ&ドロップ操作で、分析用の多様なデータセットを視覚的に探索、クリーニング、準備できるインテリジェントなデータ準備およびクレンジング サービスです。

とあります。
DataSourceのマスキングにこのDataPrepを使うことを検討したんですが、

  • そもそも対応しているDataSourceがGCSとBigQueryしかなく、Cloud SQLに対応していなかった。
  • Cloud SQLからのデータ抽出は、別途CircleCI等のCIツールからgcloud sql export csvを叩いてGCSにCloud SQLのデータを出力することも検討したが、負けた感が強くて途中でやめた。

という理由で無念の撤退。

Cloud DataFusionについて

DataFusionとは、公式ドキュメントを見ると

ETL および ELT のデータ パイプラインを効率的に構築して管理できる、フルマネージドかつクラウド ネイティブなデータ統合サービスです。

とあります。
次にDataPrepと比較して様々なDataSourceに対応しているDataFusionを検討したんですが、

  • DataFusionからCloud SQLへの接続がどうしてもできなかった。。涙

という理由で無念の撤退。
余談ですが、DataFusionは、DataSourceとしてS3とかにも対応しており、
S3->BigQueryが実現できそうでした。これはAWSユーザにとっては嬉しいですね。

4. 最終的に何を使ったのか?

そんなこんなで最終的にCloud Composerにたどり着きました。
理由としては、

  • ワークフローを定義できるのでマスキング等の処理を任意のタイミングで挟める
  • MySqlToGoogleCloudStorageOperatorGoogleCloudStorageToBigQueryOperatorといったOperator(特定のプログラムを実行するためのテンプレートのようなもの)が用意されている
  • すでにCloudSQL(MySQL)→GCS→BigQueryをやっている人がおり、サンプルのDAGファイルもあった。
    • Airflowは、個人的になかなか学習コストが高かったので助かった

が挙げられます。

5. Cloud Composerセットアップな大まかな流れ

以下のような流れでセットアップできます。

  1. Cloud Composer環境を構築
  2. Cloud SQL ProxyをGKE上にデプロイ
  3. Airflow connectionsの設定
  4. DAGファイルの作成、設置

6. Cloud ComposerでCloudSQL(MySQL)→GCS→BigQueryを実現するためのポイント

ここでは、私がハマった箇所をご紹介します。

1. Cloud Composer環境を構築

当初は、以下のgcloudコマンドでCloud Composer環境を構築しました。

$ gcloud composer environments create <your env name> --location asia-northeast1 

しかしながら、Composer Backend timed out. なるエラーが出てコマンドがこけます。
原因は、Cloud Composer環境作成の際、サービスアカウントを明示的に指定しないと
デフォルトでは、<your project number>[email protected] というサービスアカウントが使用されます。
このサービスアカウントにComposer ワーカー権限がないため、Composerの作成にこけるようです。(エラーメッセージはなんとかしてほしい)

デフォルトのサービスアカウントは使いたくなかったので、
今回は別途cloud-composer@<your project name>.iam.gserviceaccount.comを作成し、そちらでComposerを作成しました。以下、コマンド例。

$ gcloud composer environments create <your env name> --service-account=cloud-composer@<your project name>.iam.gserviceaccount.com --location asia-northeast1 

サービスアカウント権限は、

  • Composer ワーカー
  • BigQuery 管理者
  • クラウドSQL編集者
  • ストレージ オブジェクト管理者

をつけました。

2. Cloud SQL ProxyをGKE上にデプロイ

Cloud ComposerからCloud SQLへの接続は、Cloud SQL Proxyを使用します。
Cloud Composer環境を作成すると、裏でGCEが起動し、その上でKubernetesが起動することになります。
よって、Cloud SQL Proxy用のDeployment,Serviceを作成し、DBの接続設定をCloud SQL ProxyのEndpointに向けることでCloud SQLへの接続が可能となります。

まず、kubectlコマンドでサービウアカウントのsecretを作成します。

$ mv <your service account file> credentials.json
$ kubectl create secret generic <your secret name> --from-file=key.json=credentials.json

次にこちらのManifest Fileを参考にCloud SQL Proxy用のDeploymentを作成します。
こちらのManifest Fileに少し手を加えて、さきほど作成したsecret(サービスアカウントキー)を埋め込んでます。
参考までに手を加えた部分を抜粋します。


    spec:
      volumes:
        - name: ssl-certs
          hostPath:
            path: /etc/ssl/certs
        - name: google-cloud-key
          secret:
            secretName:  <your secret name> # secret名を指定
      containers:
        - image: gcr.io/cloudsql-docker/gce-proxy:1.11
          volumeMounts:
            - name: ssl-certs
              mountPath: /etc/ssl/certs
            - name: google-cloud-key # secretをmount
              mountPath: /var/secrets/google
          env:
            - name: GOOGLE_APPLICATION_CREDENTIALS # GOOGLE_APPLICATION_CREDENTIALSにサービスアカウントを設定(不要かも?)
              value: /var/secrets/google/key.json
          command:
            [
              "/cloud_sql_proxy",
              "-instances=visits-for-innovators-prd:asia-northeast1:innovators-production-replica=tcp:0.0.0.0:3306",
              "-credential_file=/var/secrets/google/key.json", # credentialを明示的に指定
            ]

次に、こちらのManifest Filekubectl applyしてServiceリソースを作成します。
ここで作成したServiceがCloud SQL ProxyのEndpointになります。

3. Airflow connectionsの設定

GCPのCloud ComposerからAirflowのGUIに接続できます。
このAirflowのGUIからCloud SQLのConnections設定を行います。
ここでは文字コードを指定しないとExportしたjsonが文字化けすることがあったんですが、以下のようにutf8を明示的に指定することで回避できました。

なお、Conn IDには、上記で作成したServiceリソース名.default(例 mysql-dvh-sqlproxy-service.default)を指定すればOKです。
あとは、KubernetsのKube-DNSが名前解決してくれるはずです。

4. DAGファイルの作成、設置

Cloud Composerを作成すると自動でDAGファイルを配置するGCSバケットが作成されます。
このGCSバケットにDAGファイルを配置すれば、ワークフローが読み込まれます。
ここでのハマりポイントは、DAGファイルでSchedule設定しているのに、当該時刻が過ぎてもワークフローが走らないことがありました。
AirflowのSchedule設定のポイントは以下2つでした。

スケジュール実行

start_dateを設定する

まず、start_date、ワークフローの開始日付を設定しないと動きません。


# 設定例
default_args = {
    'owner': '<your name>',
    'depends_on_past': False,
    # Change this to a literal date to make scheduler work.
    'start_date': datetime(2019, 11, 25), # ここをちゃんと設定する
    'email': ['<your email address>'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=1),
}
スケジュール実行時間の計算方式を理解する

Scheduleを設定してもジョブが動かないず、少しハマりました。
詳細は、こちらの記事の解説がすごく分かりやすかったですが、ポイントは¥

start_date + schedule_interval が経過したらスケジュールされる
スケジュールされるタイミングはschedule_intervalの経過後

とのことでした。
このあたり把握せず、デバッグしようと思うとハマりますのでお気をつけを。

BigQueryOperatorを使ったデータマートの作成

BigQuery上のTableをjoinして、データマート的なものを作ろうと考えました。
これには、AirflowのBigQueryOperatorを使用することを考えています。
具体的には、既存のBigqueryのTableに対しsql='sql/template.sql',で実行したSQLの結果を、destination_dataset_tableで指定したDatasetのTableに出力するというものです。
ところが、上記のQuery実行結果が、destination_dataset_tableで指定したDatasetのTableにコピーされない事象がありました。
これは、create_disposition='CREATE_IF_NEEDED', というパラメータをつけることで解決しました。
以下サンプルDAGです。
なお、ここでしているSQLは、DAGファイルと同じGCS上のディレクトリに配置すればOKです。


def gen_transform_table_task(table_config):
    transform_task = BigQueryOperator(
        task_id='transform_of_{}'.format(table_config.params['export_table']),
        sql='sql/template.sql',
        destination_dataset_table="{}.{}.{}".format(table_config.params['gcp_project'],
                                                    table_config.params['stage_dataset'] +
                                                    "_summary",
                                                    table_config.params['stage_table'] + "_{{ ds_nodash }}"),
        params={"project_nm": "<your gcp project name>",
                "dataset_nm": "innovators",
                "table_nm": "{}".format(table_config.params['export_table'])},
        write_disposition='WRITE_TRUNCATE',
        create_disposition='CREATE_IF_NEEDED', # ここ
        bigquery_conn_id='<your bigquery conn id>',
        location='US',
        use_legacy_sql=False,
        dag=dag)

    transform_task.doc_md = """\
        # Import table from storage to bigquery
        task documentation
        """
    return transform_task

7. まとめ

Cloud Composerは、いろんなOperatorが用意してあるので、使い方をマスターできればかなり便利そうな感じですね。
これからいろいろキャッチアップしていきたいです。

明日は、@ken_hikita による「kintoneの承認経路を作ってみたときのお話し」です!

8. 参考情報

以下の文献を参考にさせていただきました。