AI Platform Trainingで機械学習モデルの学習バッチを作った話


この記事は、MicroAd Advent Calendar 2020の12日目の記事です。

はじめに

GCP上でバッチ処理を行うにはCloud Functions、Cloud Run、AppEngineなど色々と選択肢がありますが、これらにはタスク完了まで10~15分以内という時間制限があり、数時間・数日のオーダーである機械学習モデルのトレーニングには適していません。

一方でAI Platform Trainingは計算リソースが柔軟に変更でき、時間制限がなかったりと機械学習をするには最高の環境ですが、スケジューリング機能がないためバッチ処理には不向きです。

そこでこの記事ではCloud SchedulerとAI Platform Trainingを連携させ、機械学習モデルの学習処理を定期実行する方法をご紹介しようと思います。

Cloud Schedulerの設定

とりあえず AI Platform Training のジョブ作成リクエストを投げるスケジューラーを組んでみます。

設定が必要なのは基本4項目。

  • 頻度:cronで指定するバッチ処理の実行タイミング
  • JobId: AI Platform Trainingに作成するジョブの名前
  • masterType: 学習に使用するマシンタイプ
  • masterConfig: 学習に使用するDockerImage(詳しくはこちら

しかしこれだけではうまく動きません。
この設定でも一度はAI Platformのジョブを作成できるのですが、2回目の実行では次のようなエラーが出てしまいます。

{
  "error": {
    "code": 409,
    "message": "Field: job.job_id Error: A job with this id already exists.",
    "status": "ALREADY_EXISTS",
    "details": [
      {
        "@type": "type.googleapis.com/google.rpc.BadRequest",
        "fieldViolations": [
          {
            "field": "job.job_id",
            "description": "A job with this id already exists."
          }
        ]
      }
    ]
  }

どうやらAI Platform TrainingのジョブIDは一意である必要があり、タスク実行毎に変更しなければいけないようです。

ですがCloud Scheduler単体ではリクエストを動的に生成する機能はないので、Cloud SchedulerだけでAI Platformをスケジューリングするのは無理なようです。

しょうがないので間にCloud Functionを経由してジョブIDにタイムスタンプを追加することで無理やりこの問題を解決しました。

実際に作ったCloud Functionはこんな感じです。Cloud Schedulerから送られてきたリクエストにタイムスタンプを追加するシンプルな構成になっています。

import time
import json
import base64
from googleapiclient import discovery, errors


def create_job(event, context):
    """
    Triggered from a message on a Cloud Pub/Sub topic.

    Args:
         event (dict): Event payload.
         context (google.cloud.functions.Context): Metadata for the event.
    """
    project_id = 'projects/test-project'
    ml = discovery.build('ml', 'v1')

    payload = json.loads(base64.b64decode(event['data']).decode('utf-8'))
    print(payload)

    timestamp = str(int(time.time()))
    payload['jobId'] += '_' + timestamp
    request = ml.projects().jobs().create(parent=project_id, body=payload)

    try:
        response = request.execute()
        print(response)
    except errors.HttpError as error:
        print(error._get_reason())

スケジューラーの方もHTTPリクエストを投げるのではなくPub/SubでCloud Functionを呼び出すよう合わせて修正。


新しいスケジューラを動かすと、、、

Cloud Function経由でジョブが作成されているのを確認できました。
期待どうりタイムスタンプが付けられているので、ジョブIDが重複せず何度でもジョブを作成できます。

ちなみに

今回利用した AI Platform Training のブラッシュアップ版である AI Platform(統合型)がリリースされました。まだプレビュー段階なのでリージョンで日本を選択できなかったりと制限が多く業務利用は厳しかったりもしますが、ディスク容量やOSなど計算リソースをより細かく指定できるようになっているようです。今後のアップデートに期待大ですね。