Digdagワークフローエンジンを利用したデータ監視ワークフロー作成


Digdagワークフローエンジンを利用したデータ監視ワークフロー作成

この記事はオールアバウトアドベントカレンダー18日目の記事です(投稿のお休みの日もあるので、事実14日目?)。

データエンジニアとして普段からトレジャーデータやBigQueryはもちろん、既存のRDBMSも業務で使っていますが、今回はオールアバウトで提供しているアドネットワーク「プライムアド」の広告配信監視をワークフローで実装してみたので、その概要と失敗談をまとめてみました。
もちろん、アドネットワークの監視はここで紹介するワークフロー以外にも色々の監視体制をとっています。
むしろワークフローによる監視は副次的なもので、データエンジニアの観点で、データからの監視はできないか?という問いから試してみたことになります。

Digdagについて

みなさん、Digdagはご存知でしょうか?オールアバウトではトレジャーデータを利用しているため、自分はトレジャーデータのイベントからDigdagの存在を知ることができ、今回のデータ監視で実際に使ってみることにしました。
Digdagはトレジャーデータがオープンソースとして公開したワークフローエンジンです。
Digdagを選んだ理由としては、プラグインを使うことで、監視結果をslackの指定のチャネルに通知することができるメリットが大きかったです。
メール通知だけだと他のメールにも埋もれがちなので、できるだけslackを活用したかったので。

やりたかったこと

アドネットワークではユーザに対して価値ある広告を配信する必要があるため、正常に広告配信が行われているのかを監視する必要があります。
今回はサーバーの死活監視等、インフラ周りからの監視ではなく、データからではの監視をやってみることにしました。
※サーバーは生きているが、何故か広告配信が行われていない等の予期せぬことへのいち早く気付くのが目的です。

今回の監視方針及び設定は下記のようになります。

監視方針

一定期間の広告Impデータを、同一期間の一週間の平均値と比較し、閾値を超えた場合にはアラートを出す

監視設定

  • 監視感覚 : 5分毎
  • 監視対象 : 15分前~10分前の広告インプレッション(以下、広告Imp)データ
  • 監視ツール : Digdagワークフロー
  • アラート通知 : slackの指定のチャネル

監視方法

  • ファーストステップ : 監視対象のデータを抽出し、監視用テーブルへ格納する
  • セカンドステップ : 監視用テーブルへ今格納したデータと、同じ期間の1週間の平均値と比較する 比較結果が閾値を超えた場合には、アラートを出す 例) 起動時間 : 2018/12/17 10:00:00 監視対象 : 2018/12/17 09:45:00 ~ 2018/12/17 09:50:00 比較対象期間 : 2018/12/10 ~ 2018/12/16の09:45:00 ~ 09:50:00

補足

  • 5分おきにした背景 : ある程度まとまったデータで比較したかったため
  • 起動時間の15分前から10分前を対象にした背景 : ログ収集にfluentdを使っているが、fluentdの仕様上、5分のバッファを持ってるため、直近5分のデータだとまだデータが格納されていない可能性があり、「すでに格納されている最近のデータ」の意味で。
  • データ比較を1日単位にしてない背景 : 広告Impはページビューが発生した際に発生するため、1日のページビューも時間ごとに差が激しく、1日平均と5分間の数値との比較は粒度も違うし、データ的にも比較対象としては適切ではないため ※ちなみに、ページビューの時間ごとの差は下記の図のように山と谷があります。

図時間ごとのページビュー

実装

やりたかったことの説明だけでも長くなりましたが、これからは実装した内容をまとめてみます。

テーブル設計

5分おきで取得するデータを格納するテーブルの設計を行いました。
※DB名やテーブル名は仮になります。

DWH : トレジャーデータ
DB : hogehoge
Table : watch_5minutes_each

Columns

カラム名 概要 データ型
time unixtime int
target_date 集計対象日 string
target_hour 集計対象時 string
target_minute 集計対象分 string
ad_imp 集計対象期間の広告Imp数 int

Digdag導入(テスト環境)

下記サイトを参照して下記の手順でDigdagおよび関連ツールをインストールしました。
Treasure Data社のOSSワークフローエンジン『Digdag』を試してみた

Digdag入手

curl -o /usr/local/bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-latest"

実行権限付与

chmod +x /usr/local/bin/digdag

Java入手

wget --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/8u181-b13/96a7b8442fe848ef90c96a2fad6ed6d1/jdk-8u181-linux-x64.rpm

Javaインストール

rpm -ivh jdk-8u181-linux-x64.rpm

スクリプト作成

監視方法のファーストステップとセカンドステップに該当する、監視対象データの抽出や格納、その後の比較を行う処理はBashスクリプトを利用したので、詳細については割愛しますが、それぞれのステップごとに1個ずつのスクリプトを用意しました。
※スクリプト名も仮です。

  • ファーストステップ : get_data.sh
  • セカンドステップ : diff_check.sh

ワークフローのディレクトリ構成及び関連ファイル

ワークフローのディレクトリ構成は下記のようにしています(もちろん、名称は仮です)。

/既存バッチシステムのどこか/workflow/ <- ワークフローのルート
/既存バッチシステムのどこか/workflow/config/ <- ワークフローの環境設定ファイル格納
/既存バッチシステムのどこか/workflow/shells/ <- bashスクリプトファイル格納
/既存バッチシステムのどこか/workflow/yml/ <- slack通知用のYMLファイル格納

ルート

実際にDigdag Schedulerで起動されるファイルが格納されています。
※このSchedulerについては、色々トライアル・アンド・エラーを繰り返したので、後程改めてまとめてみます。

ad_watch.dig
# タイムゾーンを指定する
timezone: Asia/Tokyo

_export:
  # Digdagの環境ファイルをインポートする
  !include : config/ad_watch_slack.dig

# 5分おきにスケジューラーを起動する
schedule:
  minutes_interval>: 5

# 15分前~10分前の広告Impデータを取得し、監視用テーブルへ登録する
+data_get_before_15minutes:
  sh>: shells/get_data.sh

# 上記で取得したデータをベースに、1週間の同じ時間帯の広告Impと比較し、ALERT_CRITERIA 以下の場合は通知する
# ALERT_CRITERIA : diff_check.shの内部変数
+diff_check:
  sh>: shells/diff_check.sh

  # diff_checkの結果がエラーの場合、YMLファイルを参照してslackへ通知する
  _error:
    slack>: yml/diff_check_danger.yml

環境設定ファイル

プラグインなど、Digdagの環境変数になります。
ちなみに、本番とステージング環境ではそれぞれの設定値を持っているため、環境ごとの設定ファイルを用意し、環境ごとにシンボリックリンクを使っています。

ad_watch_slack.dig
# slackエラー通知用プラグイン
plugin:
  repositories:
    - https://jitpack.io
  dependencies:
    - com.github.szyn:digdag-slack:0.1.2
# 通知対象となるslackのwebhook url
webhook_url: https://hooks.slack.com/services/hogehoge/fugafuga
# ワークフロー名
workflow_name: ad_watch
# 既存バッチを利用するため、バッチを起動するユーザーのAPP HOME
USER_BATCH_HOME: /hogehoge/fugafuga
# 環境情報
ENV: develop

slack通知用のYML

これが今回使いたかったslack通知用の情報ファイルです

diff_check_danger.yml
# slackユーザー名
username: Digdag
# slack通知用絵文字
icon_emoji: ':ghost:'
# slack通知用チャネル (ad_watch_slack.digのwebhook urlと同じチャネル)
channel: '#hogehoge'
# .digファイルで指定した環境変数を使える(${ENV}等)
attachments:
- fallback: '[FALSE] ${workflow_name} workflow'
  color: "bad"
  text: '*[FALSE]* `${workflow_name}` Workflow'
  mrkdwn_in:
  - text
  - pretext
  - fields
  fields:
  - title: Task Name
    value: "${task_name}"
    short: false
  - title: Session Date
    value: "${session_local_time}"
    short: true
  - title: Environment
    value: "${ENV}"
    short: true

Digdag Scheduler導入

Digdagのワークフローファイル(上記の場合、ad_watch.dig)を作成したら、digdag schedulerを起動することで.digファイルの「schedule」ブロックが設定した間隔で自動で起動されます(ad_watch.digだと5分間隔)。

ただ、単純に下記のようにした場合は、ユーザーがサーバーからログアウトした場合には、schedulerが起動しなくなるため、schedulerをバックエンドで動かす必要があります。

ng_sample.cmd
digdag scheduler
ok_sample.cmd
digdag scheduler &

ここからがトライアル・アンド・エラーになりますが、どのような問題があったのかを見ていきましょう。

トライアル・アンド・エラー

権限関連トラブル

よくある話で、みなさんもステージングユーザーと本番環境ユーザーは別ユーザーにしていることが多いと思います。
今回のワークフローを導入したシステムでも、ステージング環境と本番環境のユーザーが異なるため、下記のような問題が発生していました。
(説明のため、ステージングユーザーは「user_stg」、本番ユーザーは「user_pro」とします。)

1.Digdagを起動する専用ユーザーを試してみた
digdag関連サービスを起動するための「digdag」ユーザーを作成し、そのユーザー権限で「digdag scheduler」を起動してみましたが、
(当たり前ですが)digdagユーザーでは既存バッチスクリプトが置いてある「user_stg」や「user_pro」のホームディレクトリが参照できないため、digdag schedulerは動いても、既存バッチ内の「ad_watch.dig」が参照できず、何のスケジュールも動かすことができませんでした。
-> digdagユーザーを利用することを諦めた(専用ユーザーである以外のメリットは特になかったため)。

2.user_proの場合、まれではあるがターミナル経由で本番調査をする際に使ってるため、digdag schedulerを起動するには不適切。
ヒューマンエラーで、user_proで本番環境のdigdag schedulerをkillしてしまったら、せっかく作った監視機能が台無しになるため、user_proでdigdag schedulerを起動するのはNG。
-> itdユーザーでdigdag schedulerを起動することは原則NG。

3.rootユーザーでdigdag schedulerを起動してみた
digdag schedulerの場合、起動時にオプション指定をしないと、各ユーザーの「~/.config/digdag/config」が基本設定パスとなる。
rootユーザーの場合、digdagに関する設定ファイルなど一切持ってないし、digdagに関する設定ファイルはバッチ内部に埋め込んだので、このままではNG。
->digdag schedulerを起動する際に、--projectオプションを追加して「ad_watch.dig」ファイルが参照されるようにした。

ok_sample2.cmd(rootユーザーで実行)
digdag scheduler --project user_proのホームディレクトリ/既存バッチシステムのどこか/workflow --task-log ログを書き出したいディレクトリ &

ちなみに、digdag schedulerの起動ログも確認するために、上記には「--task-log」も追加しています。

既存バッチへ埋め込んだことによるトラブル

既存バッチの豊富な設定ファイルをそのまま利用したかったため、今回のワークフローを既存バッチに埋め込んだが、それによる副作用もあったので、まとめます。

1.ユーザーごとのバッチホームディレクトリが異なる
開発者(dev_jeon)と本番での起動ユーザー(user_pro)の場合、バッチホームディレクトリが異なります。
そのため、開発環境でバッチホームを指す変数「USER_BATCH_HOME」(仮)の値を、ユーザー毎に設定していますが、本番のdigdagを起動するのはrootであるため、そのような変数を持ってません。
どうすればいいのか悩んで結果、digdagの変数設定ファイル「ad_watch_slack.dig」に同じ名前の変数名と値を登録することで解決できました。
これによって追加開発が発生する場合にも、各開発環境の値を変更することで、本番環境への影響なく開発することができます。
例)
dev_jeonの場合
USER_BATCH_HOME: /dev_jeon/batch
dev_hogehogeの場合
USER_BATCH_HOME: /dev_hogehoge/batch

さいごに

普段の業務でデータ関連処理を行うことは多いですが、インフラ周りはそこまで機会がなかったため、インフラに強い方から見ると、「え?こんなミスするの?」と思われったかもしれません。
現状はオールアバウトの素晴らしいSREグループの力を借りて、上記の問題はすべて解決済みで、現時点でも5分おきにデータを利用した監視が行われています。
せっかくDigdagを使えるようになりましたので、今後も色々トライアルしてみたいと思います。
みなさんの方で使われてるワークフローの参考になる使い方がありましたら、ぜひご意見ください!