サーバレスにお手軽なデータBotを作ってみよう〜♪ on GCP


この記事は、Classi Advent Calendar 2019 の7日目の記事です。

こんにちは、ClassiのデータAI部で、日々、データと格闘している@yuu_kimyです。
今年も、引き続き、GCP環境でデータ基盤の構築やデータの民主化に関わっています。

今年の8月ぐらいから、正式に、社内のデータ基盤を稼働しています。
名前は、「ソクラテス」と名付けています。データから学びを探り、探究するという意味を込めています。

それに合わせて、社内のデータ活用をより高めていくために、「SQL勉強会」を始めました。こちらの勉強会の内容は、別途、リポートできたらと思っています。

一方で、データの活用を仕組みの面でも後押ししていきたいと思っています。例えば、ちょっとしたデータ抽出でも、定期的にしていくと、それなりの手間になりますが、簡単なBotを作って、自動化にトライしてみたいと思います。

今回の要件

  • BigQueryにあるモニタリング用のテーブルを集計して、必要なデータを抽出する
  • 抽出した結果をSlackの特定チャンネルに配信する
  • 定期的に配信する(週1、月1など)

データBotの構成 on GCP

上記の要件を満たすために、GCPの幾つかのサービスを利用して、こんな風な構成で作ってみたいと思います。

利用する材料(サービス)

  • Cloud Scheduler
    • cronジョブスケジューラ。定期スケジューラとして利用。
  • Cloud Pub/Sub
    • 上記のCloud Schedulerが、ターゲットのPub/Subのトピックへメッセージをパブリッシュ。
  • Cloud Functions
    • Pub/Subをトリガーに、BigQueryへのクエリ実行とSlackへ通知。
  • BigQuery
    • 言わずもがなのDWH環境。
  • Slack
    • 今回は、Incoming Webhookを利用。

レシピ(実装の流れ)

  • 必要なクエリを書く(ここは適宜)
  • Cloud Pub/Subのトピックを作成
  • Cloud Schedulerの設定(ターゲットはPub/Subを選択し、上記で作成したトピックを指定)
  • Cloud Functionsで関数の処理を書く(自分はPythonで書きましたが、お好みで、Node.jsやGoで)
    • BigQueryへのデータを取得する
    • データの取得結果の整形
    • Slackへの通知処理

ポイント

もっと、シンプルに作成するのであれば、Cloud Schedulerのターゲットを「http」にして、Cloud Functionsのエンドポイントを指定する構成がありますが、それだと、特定のクエリしか呼び出せず、他のデータBotを作りたい場合は、全く同じような構成を作る羽目になってしまいます。
今回のように、Pub/SubとSchedulerを組み合わせることで、スケジュールごとにトピックのパブリッシュができるので、基本は、それに対応するクエリを用意すれば、簡易的なBotができるというわけです。

  • Sceduler側の設定(ペイロードの定義)
sample.json
{"botId": "X", "sqlCode": "yyy"}

以下は、Cloud Schedulerの画面のサンプルです。

Cloud Functionsの中身は、こんな風に書いてみました。(一部抜粋)

main.py
from google.cloud import bigquery
import json
import requests
import base64
import sys
from datetime import datetime,timedelta
from pytz import timezone

def send_to_slack(data, context):
    # Pub/Subのメッセージの取得
    pubsub = base64.b64decode(data['data']).decode('utf-8')
    message = json.loads(pubsub)

    if message["botId"] is not None:
        sql_code = message["sqlCode"]
        result = get_bq_data(sql_code)
    else:
        sys.exit(1)

    # Slack通知の実装(ここでは省略)
    post_slack(result)

def get_bq_data(sql_code):
    # 実際は、slql_codeとクエリを対応づけた辞書を作成しておくのが楽そう
    if sql_code  == 'yyy':
        query = "select from yyy ..."
    elif sql_code == 'zzz':
        query = "select from zzz ..."
    else:
        query = "select from other ..."

    # BigQueryへのクエリ実行
    bigquery_client = bigquery.Client()
    rows = bigquery_client.query(query).result()

    # クエリ結果の整形(ここは見せたい内容次第で、結構変わってくるかと思います)
    result = []
    for row in rows:
        result.append("{},\t{},\t{:,}".format(row[0], row[1], row[2]))

    return result

def post_slack(result):

    url = "web_hook_url"

    dt_now = datetime.now(timezone('Asia/Tokyo')) + timedelta(days=-1)
    target_message = "{0:18-04〜%Y-%m-%d時点までの結果}".format(dt_now)

    # Slackへの通知の見せ方次第で変わってくるかと思います
    requests.post(url, data=json.dumps({
        'text': 'サンプルBot; to: @sample-directors',
        'username': 'サンプルBot',
        'icon_emoji': ':robot_face:',
        'link_names': 1,
        "attachments": [
          {
            "fallback": "this is a bot by gcf",
            "text": "{}\n{}".format(target_message ,"\n".join(result)),
            "color": "#3399FF"
          }
        ]
    }))

上記の設定と関数のデプロイをすることで、下記のような簡易的なデータBotが出来上がりました!

※実際に、社内のチーム向けに運用をし始めました。

これで、定期的なデータ抽出のタスクが自動化できました!めでたしめでたし🎉

おわりに

簡単な構成で、サーバレスに実装できるのは素晴らしいですね。
他にも、組み合わせ方次第で、通知の仕組みなども作れるなーと思っています。

明日は、@hailongさんです。どうぞお楽しみに〜♪