aws-sdk-goを使ってAthenaの実行結果を取得する


別の記事で、aws-sdk-goからAthenaのクエリを実行しました。
その際結果取得はaws-sdk-goのathena使わずに、s3経由でDLしちゃったほうがいいと書いたんですが、やはり使おうかなと思ったので、メモ代わりに簡単な解説とサンプルを残しておきます。

前提

aws-sdk-go (1.12.44)

GetQueryResults

公式ドキュメントの通り、GetQueryResultsInputを作って呼び出します。
最大取得件数を設定できて、かつページングが実行できるためなかなか便利です。

GetQueryResultsInput(引数のstruct)

QueryExecutionId, NextToken, MaxResultsを設定します。
NextTokenに前回のAPI呼び出しで返却されたtokenを渡すと、自動でページングして次の結果をMaxResults分返却してくれます。

GetQueryResultsOutput(返り値のstruct)

NextToken, ResultSetが含まれます。
ResultSetは前回の記事で記載したとおり、なかなか厄介な形になっています。

公式ドキュメントより
type ResultSet struct {

    // The metadata that describes the column structure and data types of a table
    // of query results.
    ResultSetMetadata *ResultSetMetadata `type:"structure"`

    // The rows in the table.
    Rows []*Row `type:"list"`
    // contains filtered or unexported fields
}

type Row struct {

    // The data that populates a row in a query result table.
    Data []*Datum `type:"list"`
    // contains filtered or unexported fields
}

type Datum struct {

    // The value of the datum.
    VarCharValue *string `type:"string"`
    // contains filtered or unexported fields
}

下記のような関係です。

struct property 中身
ResultSet Rows: []Row CSVのカラム数 x MaxResults分のデータ
Row Data: []Datum 1行分のデータ
Datum VarCharValue: *string 1カラムのデータ

注意点

1.Datumがnilの場合がある

下記のように、[]stringに入れ替えようとするとpanic起こす場合があります。

ダメな例
// rowはRow
sl := make([]string, len(row.Data))
for i, val := range row.Data {
    sl[i] = *va.VarCharValue
}
安全策
sl := make([]string, len(row.Data))
for i, val := range row.Data {
    if val == nil || val.VarCharValue == nil {
        sl[i] = ""
    } else {
        sl[i] = *val.VarCharValue
    }
}

2.一番最初のレスポンスの一行目がheaderになる

tokenなしでの初回リクエスト時、最初のRowはCSVのheaderが返ってきます。
https://stackoverflow.com/questions/45075754/amazon-athena-how-to-store-results-after-querying-with-skipping-column-headers

将来的にはきっとAPIのパラメータで制御できるようになるはず、、

サンプルまとめ

packageの全体は前記事を参照してください。

var GetAthenaQueryResult = func(id *string, token *string, hits int64) ([][]string, *string, error) {
    input := &athena.GetQueryResultsInput{
        QueryExecutionId: id,
        NextToken:        token,
        MaxResults:       &hits,
    }

    output, err := athenaClient.GetQueryResults(input)
    if err != nil {
        return nil, nil, err
    }

    ret := make([][]string, hits)
    index := 0

    for i, row := range output.ResultSet.Rows {
        if i == 0 && token == nil {
            // tokenなし(初回)リクエストの場合、
            // csvのheaderが最初に返ってくるため無視する
            continue
        }
        line := make([]string, len(row.Data))

        for j, val := range row.Data {
            if val == nil || val.VarCharValue == nil {
                line[j] = ""
            } else {
                line[j] = *val.VarCharValue
            }
        }

        ret[index] = line
        index++
    }

    return ret, output.NextToken, nil
}

呼び出し元はこんな感じです。

var rows [][]string
var token *string
var err error

queryId := "hogehoge"
hits := 100

for {
    rows, token, err = client.GetAthenaQueryResult(&queryId, token, hits)
    if err != nil {
        return err
    }

    fmt.Println(rows)

    // dataがもうなければ終了
    if token == nil {
        return nil
    }
}