TreasureDataで一定以上の時間がかかっているjobをGASを使ってSlackに通知する


背景

弊社では複数のサービスでTreasureDataを利用しており、ユーザ数も100以上でエンジニアからデータサイエンティスト、ビジネスサイドの人間とさまざまです。

アドホックなクエリを投げた際に、 TD_TIME_RANGE の設定をし忘れて、リソースを大量に食ってしまい、他のクエリに影響を与えてしまうなんてことがよくあります。
もちろん、priorityを使って運用するようにしていますが、実際のサービスで利用しているクエリに影響することもあります。

そこで、一定以上の時間動いているクエリをAPI経由で取得し、それをSlackに通知するGoogleAppsScriptを書きました。
しきい値以上のクエリでも、単にS3などへの外部の書き出しに時間がかかっていることもあるので、リソース状況(Mapper,Reducerの数)も一緒に通知してみました。

Google Apps Script

あまりきれいなものではないですが、動いているのでお許し下さい。
今回はHiveクエリだけを対象にしています。


var THRESHOLD = 1800; // しきい値30分以上
var TDAPIKEY  = '(TreasureData APIKEY)';
var WEBHOOKURL = 'https://(Slack Webhook URL)';


function postSlack(message) {
  var payload = {
    "text" : message,
    "username" : "The Machine", // "Person of Interest"が好きなので
    "channel" : "#treasuredata_alert"
  };
  var params = {
    "method" : "POST",
    "payload" : JSON.stringify(payload)
  };
  var response = UrlFetchApp.fetch(WEBHOOKURL, params);
}

function tdJobList() {
  var options = {
    "method": "GET",
    "contentType" : "application/json",
    "headers" : {
      "AUTHORIZATION" : "TD1 " + TDAPIKEY
    }
  };
  var response = UrlFetchApp.fetch('https://api.treasuredata.com/v3/job/list?status=running&from=0&to=99', options);
  return JSON.parse(response);
}

function tdJobDetail(jobid) {
  var options = {
    "method": "GET",
    "contentType" : "application/json",
    "headers" : {
      "AUTHORIZATION" : "TD1 " + TDAPIKEY
    }
  };
  var response = UrlFetchApp.fetch('https://api.treasuredata.com/v3/job/show/' + jobid, options);
  return JSON.parse(response);
}

function getCpuUsage(jobid) {
  var job = tdJobDetail(jobid);
  var logs = job.debug.stderr.split('\n');
  var cpuUsage = [];
  logs.forEach(function(line) {
    if(line.match(/^Hadoop job information for /)) {
      cpuUsage.push(line)
    }
  });
  return cpuUsage;
}

function createAlertMsg(userName, url, database, query, cpuUsage) {
  var msg =  THRESHOLD / 60 + '分以上実行されているクエリがあります! :dizzy_face: \n';
  msg += 'User: ' + userName + '\n';
  msg += 'Database: ' + database + '\n';
  msg += 'Query: ' + query.split('\n')[0] + ' .....\n';
  msg += '<' + url + '>\n';
  msg += 'Resource Usage:\n';
  msg += '```\n' + cpuUsage.join('\n') + '\n```\n';

  return msg;
}

function parseDate(str) {
  return new Date(str.replace(/-/g, "/").replace("UTC", "+00:00"));
}

function checkTdJobs() {
  var now = new Date();
  var response = tdJobList();
  var jobs = response.jobs
  jobs.forEach(function(job) {
    if (job.type !== 'hive') return;
    var startAt = parseDate(job.start_at);
    var duration = (now - startAt) / 1000;
    if (duration > THRESHOLD) {
      var cpuUsage = getCpuUsage(job.job_id);
      postSlack(createAlertMsg(job.user_name, job.url, job.database, job.query, cpuUsage));
    }
  })
}

通知結果

こんな感じで通知されます。