GoogleAppEngineで作る、SQLをGitHubにpushするとBigQueryを定期実行するマン


この記事はリクルートライフスタイル Advent Calendar 2018の4日目の記事です。

こんばんは

CETというチーム@mihirat です。
最近ではいくつかの新規サービス開発で、ちょこちょこフロントやサーバー書かせてもらったり、ちょっとしたSRE的な役回りをしていたりします。
また、現在「Jupyterだけで機械学習が実サービス展開できる基盤」(slideshareblog)の開発などをやってます。ぜひご一読ください。

今回は、社内で活用しているBQ定期実行アプリのご紹介です。

背景

弊社では分析や可視化にTableauを利用することが多いのですが、激重なクエリが描画のたびに発行されて待ち時間が長い・BQの計算資源を使いまくるという問題がありました。

そこで、「プログラムを書かなくても、よしなにBQに定期実行してテーブルを自動更新してくれるアプリケーションがあればなぁ」という話になったので突貫で作ってみたのですが、だんだん利用が増え、現在では100クエリ以上がこのアプリケーション上に登録されています。

「いやいや、スケジュールクエリ機能ってBQにありますよ?」と思われた方もいると思いますが、次のようなメリットがあります。

  • クエリのレビュー・共有
    • 強い分析官やデータサイエンティストの方から直接レビューを受けることができます。溜まりにくいSQLの知見が共有でき、チームのSQL力が向上します。また、「この前処理ってどう書いたらいいのかな?」な場合のリファレンスになります。
  • 野良クエリの撲滅
    • BQのスケジュールクエリは大変便利ですが、野良定期実行クエリとは管理側からすると恐ろしい機能です。このアプリがあれば、簡単に定期実行ができかつ管理下におけるので、懸念が減ります。

作ったもの

全体像はこちら。

処理の流れの概要としては、

  1. GAEを定期的にkick
  2. kickされたGAEは、GCSに配置されたSQLや設定ファイルを取得し、キューに積む
  3. キューのConsumerとなるGAEが別に存在し、積まれたキューに従ってクエリを実行し、もろもろ処理する

というシンプルなものになっています。

ポイント1. TaskQueue

TaskQueueとは、GAE付属のキューの機能です(doc)。適当なyamlを書くだけで、キューの最大待ち長さ、消化速度、リトライ回数などが指定できます。

queue:
- name: bqcultivator
  rate: 1/m # 消化速度
  bucket_size: 100 # キューをためておける数
  retry_parameters: 
    task_retry_limit: 1
  max_concurrent_requests: 5 # 同時実行数

これにより、クエリ発行の速度を絞れるほか、リトライの自動化などもできます。
キューに積む処理は非常に簡単で、

task := taskqueue.NewPOSTTask("/bqcultivator/maketable",
                              url.Values{
                              "sql_file_name": {sc.SqlFileName},
                              "project_id": {sc.ProjectID},
                              "target_project_id":{sc.TargetProjectID},
                              "dataset_id": {sc.DatasetID},
                              })
taskqueue.Add(ctx, task, "bqcultivator")

のように書くだけでキューに積まれます。キューの管理がいらないのでとても楽です。

ポイント2. CronJob

CronJobは最近CloudSchedulerとしてスピンアウト?しましたが、GAEのエンドポイントを定期的に実行してくれる機能です(doc)。こちらもyamlを書くだけで設定できます。

- description: hourly check
  url: /sqlenqueuer/task/hourly # app endpoint
  schedule: every 1 hours from 00:00 to 23:00
  timezone: Asia/Tokyo

これだけで、該当するGAEのエンドポイントを定期的に実行してくれます。hourly, daily, weeklyなどを用意しています。

ポイント3. dispatch

GAEは1プロジェクトあたり複数のGAEアプリケーションをデプロイでき、それらはURLベースで振り分けができます(doc)。

dispatch:
  - url: "*/sqlenqueuer/*"
    service: sqlenqueuer
  - url: "*/bqcultivator/*"
    service: bqcultivator

これで、CronJobがkickするエンドポイントで、該当するGAEを結びつけます。

アプリケーション部分

sqlenqueuer
CronJobで叩かれるエンドポイントを持ち、叩かれるとGCSにあるファイルをキューに積みます。
設定ファイルには、クエリのオプション(テーブル名、実行時間、Truncateするかどうか)が記載されてます。

bqcultivator
キューを消化して、BQにクエリを発行します。キューは勝手にPOSTされるので、特にキューについての処理を書く必要はなく、POSTに対しての処理だけ書いてあればOKです。
実行結果はslackに連携され、失敗していた場合はリトライリンクもセットにし、メンション付きで以下のようにユーザーに通知します。

2週間くらいで作ったときの超初期バージョンのコードです、もし興味があればご参照ください。クオリティ。。。

このアプリの使い方

  1. 利用者は、SQLと設定ファイルのyamlをGithub(Enterprise)にpushし、レビューを受ける
  2. マージされると、GCSにSQLと設定ファイルがCI/CDでアップロードされる
  3. GAEによって、GCSに配置されたSQLを定期実行
  4. 設定ファイルに従ってデータマートが作成されたり、結果をS3にファイルをおいたりする(後述)

追加機能:BQへのクエリの結果をそのままAPI化

CETチームでは、汎用APIと呼ばれる基盤を構築・運用しています。
2カラムのCSVを指定のS3にアップロードすると、そのCSVの1カラム目がkey, 2カラム目がvalueとなって、HTTPリクエストでkeyで引くとvalueが返却されるAPIを構築してくれる基盤です。valueには文字列はもちろんのこと、配列やJSONも埋め込み可能です。
詳しくはこちら。より詳細についてはTechBlogに書く予定です。(放置しててすみません)

今回のアプリは当初データマート作成だけしていたのですが、数ヶ月前にこのAPI基盤に連携する機能をリリースしました。
例えばBigQueryにしか置いてないデータを集計した結果や、BQ MLを使った簡単な機械学習の結果をS3に配置することで、そのままAPIにすることができます。
イメージとしては

SELECT user_id, some_json FROM `awesome_data.awesome_mart.awesome_table`

と書くと、 https://endpoint/key/を叩くと some_jsonが返却されるAPIが自動で作成されるものです。
awesome_tableを定期的に見に行くので、APIのレスポンスも日次などで更新されます。主にデータサイエンティストの方から好評の機能で、ABテストの高速化に貢献しています。

終わりに

話は変わり、最近Argoというプロダクトを検証しています。

が全部お互いによしなに云々してくれる(まだ何もわかっていない)らしく、バッチ処理など全てをこれに寄せるととてもハッピーな気がしています。
argo化すると、Helmよりも複雑なシステム全体をパッケージ化して共有・展開できそうなので、今回紹介したような基盤もargo化していけると良さそう?と期待しています。検証記事もそのうち書けるよう頑張ります。

よいお年を!