nodeからTreasureDataに接続する


nodeからTreasureDataに接続する方法をメモ

コード

var TD = require('td');

var tdSelectData = (function() {
    var td_databases = 'test_db'; // DB名
    var td_api_key = '123/123456789987654321'; // APIキー
    var client = new TD(td_api_key); // TDクライアント
    var td_query = 'select * from test_table'; // 実行クエリ
    client.prestoQuery(td_databases, td_query, function(err, results) {

        console.log('=============TD results = ' + JSON.stringify(results) + '\r\n');

        const MAX_TIME = 600; 
        var time = 0;
        (function loop() {

            if (time < MAX_TIME) {

                client.showJob(results.job_id, function(err, r) {

                    console.log('TD JobStatus = ' + r.status + '\r\n');

                    if (r.status == 'error') {
                        console.log('TD Job error\r\n');
                        return false;
                    }

                    if (r && r.status == 'success') {
                        client.jobResult(results.job_id, 'csv', function(err, tdData) {
                            return tdData;
                        });
                    } else {
                        console.log('============this job is still processing...  waiting 5 seconds\r\n');
                        sl.sleep(5);
                        time += 5;
                        loop();
                    }
                });


            } else {
                console.log('timeout error\r\n');
                return false;
            }

        }());

    });
}());

解説

client.prestoQuery(td_databases, td_query, function(err, results){

prestoのjobを実行します。
callbackのresultsには以下のような値がJSON形式で返ってきます。

{ job: '123456789',
  database: 'test_db',
  job_id: '123456789' }

(function loop() {

こちらではTDのJOBの結果がSuccessになるまで、ループしています。
JOBの結果待ち時間は600秒としています。

client.showJob(results.job_id,function(err, r){

jobの結果を返します。
callbackのrには以下のような値がJSON形式で返ってきます。
実行中はstatusがrunning、完了後はsuccess、失敗はerrorとなります。

{ query: 'select * from test_table',
  type: 'presto',
  priority: 0,
  retry_limit: 0,
  duration: null,
  status: 'running',
  cpu_time: null,
  result_size: null,
  job_id: '123456789',
  created_at: '2017-12-12 02:22:03 UTC',
  updated_at: '2017-12-12 02:22:03 UTC',
  start_at: '2017-12-12 02:22:03 UTC',
  end_at: '',
  num_records: null,
  database: 'test_db',
  user_name: 'test_user',
  result: '',
  url: 'https://console.treasuredata.com/jobs/123456789',
  hive_result_schema: null,
  organization: null,
  debug: {省略}

client.jobResult(results.job_id, 'csv', function(err, tdData) {

jobの結果を出力します。
callbackのtdDataにはcsv形式でクエリの結果が出力されます。
各行ループする際は以下のようなコードを記載すればよいと思います。

client.jobResult(results.job_id, 'csv', function(err, tdData) {
   csv.parse(tdData, function(err, rawData) {
       console.log(rawData);
   });
});

参考