digdagでtd>の結果をJavaで取得する


前のタスクの結果を次のタスクに受け渡したいとき、
digdag変数は環境変数として渡されるので、Javaの中から取得できる(Java以外でも、環境変数を読めば同じことができる)。
たとえば、td>がセットするlast_job_idなどを取得できる。

xxx/XXX.dig
# Run the query in XXX.sql with the td> operator.
+step1:
  td>: XXX.sql
# Run the jar with the sh> operator; the Java program reads the "td" environment variable.
+java:
  sh>: java -jar xxx.jar
xxx_java/src/main/java/xxx/XXX.java
package xxx;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.supercsv.io.CsvListReader;
import org.supercsv.io.ICsvListReader;
import org.supercsv.prefs.CsvPreference;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.treasuredata.client.TDClient;
import com.treasuredata.client.model.TDResultFormat;

public class XXX {
    public static void main(String[] args) throws Throwable {
        TDClient client = TDClient.newClient(); // get config from ~/.td/td.conf
        String tdJson = System.getenv("td"); // 直前のtd>がdigdag変数にセットしたものを環境変数から取得
        System.out.println(tdJson); // {"last_job_id":"9999999", "xxx":"xxx"}
        String jobId = new ObjectMapper().readValue(tdJson, Td.class).last_job_id;
        client.jobResult(jobId, TDResultFormat.CSV, new Function<InputStream, String>() {
            public String apply(InputStream is) {
                try {
                    ICsvListReader reader = new CsvListReader(new InputStreamReader(is), CsvPreference.STANDARD_PREFERENCE);
                    List<String> list;
                    while ((list = reader.read()) != null) {
                        System.out.printf("XXX:[%s] XXX:[%s]%n", list.get(0), list.get(1));
                    }
                    reader.close();
                    return null;
                } catch (Throwable e) {
                    throw new RuntimeException(e);
                }
            }
        });
    }
}

/**
 * JSON binding for the value of the {@code td} environment variable.
 *
 * <p>Only {@code last_job_id} is mapped; any other property in the JSON is ignored.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
class Td {
    /** Job id as it appears in the JSON, e.g. {@code "9999999"}. The name must match the JSON key. */
    public String last_job_id;
}
xxx_java/build.gradle
// Build script classpath: provides the fatjar plugin applied below.
buildscript {
    repositories {
        jcenter()
    }
    dependencies {
        classpath 'eu.appsatori:gradle-fatjar-plugin:0.3'
    }
}

apply plugin: "java"
apply plugin: "application"
apply plugin: "eclipse"
apply plugin: 'eu.appsatori.fatjar'

repositories {
    mavenCentral()
}

// Libraries used by xxx.XXX: TD client, logging backend, CSV reader and JSON binding.
dependencies {
    compile 'com.treasuredata.client:td-client:0.7.28',
            'ch.qos.logback:logback-classic:1.1.7',
            'net.sf.supercsv:super-csv:2.4.0',
            'com.fasterxml.jackson.core:jackson-databind:2.8.4'
}

// Compile the sources as UTF-8.
compileJava {
    options.encoding = 'UTF-8'
}

// Entry point used by the application plugin.
mainClassName = 'xxx.XXX'

// Fat jar settings: base name 'xxx' and a Main-Class manifest entry so it can be run with `java -jar`.
fatJar {
    baseName = 'xxx'
    manifest {
        attributes 'Main-Class': 'xxx.XXX'
    }
}

// After fatJar has run, copy build/libs/xxx.jar into ../xxx.
task copyJar (type: Copy, dependsOn: fatJar) {
    from 'build/libs'
    include 'xxx.jar'
    into '../xxx'
}

// Gradle wrapper pinned to version 3.1.
task wrapper(type: Wrapper) {
    gradleVersion = '3.1'
}
xxx_java/src/main/resources/logback.xml
<!-- Empty logback configuration: no appenders or loggers are declared. -->
<configuration/>