PHP + AWS SDKでAthenaでクエリ実行から結果の取得まで。2018-08-13


概要

PHPでAWS Athenaからデータを取得します

ポイント

SQLを発行して結果を取得する、という流れですが、MySQLなどの場合と違うのが、

  1. クエリの発行
  2. クエリが終わるのを待つ
  3. 結果の取得

という最低3回はリクエストしなければいけないところです。

(MySQLなどの場合は「クエリを発行してその結果を得る」という1回のリクエストで済む)

準備

aws-sdk

aws-sdk-php を使います

% composer require aws/aws-sdk-php
composer.json
{
    "require": {
        "aws/aws-sdk-php": "^3.64"
    }
}

それ以外

  • Athenaでselectできる準備はできている
  • composer自体の導入などは割愛

コード例

example.php
<?php

require './vendor/autoload.php';

$options = [
    'region' => 'us-east-1',
    'version' => 'latest'
];
$athenaClient = new Aws\Athena\AthenaClient($options);

$databaseName = 'DATABASE_NAME';
$sql = 'select account_id from TABLE_NAME limit 3';
$outputS3Location = 's3://BUCKET_NAME/';

// 1. クエリの発行
// * データベースの指定
// * Athenaは結果をS3に出力するので出力先の指定
// * SQL指定
$startQueryResponse = $athenaClient->startQueryExecution([
    'QueryExecutionContext' => [
        'Database' => $databaseName
    ],
    'QueryString' => $sql,
    'ResultConfiguration'   => [
        'OutputLocation' => $outputS3Location
    ]
]);

$queryExecutionId = $startQueryResponse->get('QueryExecutionId');
var_dump($queryExecutionId);

// 2. クエリが終わるのを待つ
$waitForSucceeded = function () use ($athenaClient, $queryExecutionId, &$waitForSucceeded) {
    $getQueryExecutionResponse = $athenaClient->getQueryExecution([
        'QueryExecutionId' => $queryExecutionId
    ]);
    $status = $getQueryExecutionResponse->get('QueryExecution')['Status']['State'];
    print("[waitForSucceeded] State=$status\n");
    return $status === 'SUCCEEDED' || $waitForSucceeded();
};
$waitForSucceeded();

// 3. 結果の取得
$getQueryResultsResponse = $athenaClient->getQueryResults([
    'QueryExecutionId' => $queryExecutionId
]);
var_dump($getQueryResultsResponse->get('ResultSet'));

実行結果

(IDなど一部結果をmaskしています)

% php example.php
string(36) "QueryExecutionId-XXXXX"
[waitForSucceeded] State=SUCCEEDED
array(2) {
  ["Rows"]=>
  array(4) {
    [0]=>
    array(1) {
      ["Data"]=>
      array(1) {
        [0]=>
        array(1) {
          ["VarCharValue"]=>
          string(10) "account_id"
        }
      }
    }
    [1]=>
    array(1) {
      ["Data"]=>
      array(1) {
        [0]=>
        array(1) {
          ["VarCharValue"]=>
          string(15) "id1"
        }
      }
    }
    [2]=>
    array(1) {
      ["Data"]=>
      array(1) {
        [0]=>
        array(1) {
          ["VarCharValue"]=>
          string(15) "id2"
        }
      }
    }
    [3]=>
    array(1) {
      ["Data"]=>
      array(1) {
        [0]=>
        array(1) {
          ["VarCharValue"]=>
          string(15) "id3"
        }
      }
    }
  }
  ["ResultSetMetadata"]=>
  array(1) {
    ["ColumnInfo"]=>
    array(1) {
      [0]=>
      array(10) {
        ["CatalogName"]=>
        string(4) "hive"
        ["SchemaName"]=>
        string(0) ""
        ["TableName"]=>
        string(0) ""
        ["Name"]=>
        string(10) "account_id"
        ["Label"]=>
        string(10) "account_id"
        ["Type"]=>
        string(7) "varchar"
        ["Precision"]=>
        int(*)
        ["Scale"]=>
        int(0)
        ["Nullable"]=>
        string(7) "UNKNOWN"
        ["CaseSensitive"]=>
        bool(true)
      }
    }
  }
}

まとめ

PHPでAthenaから結果が得られました

補足

getQueryResults で得られるarrayの内容が少々扱いづらい。
AthenaはS3にCSVファイルとして結果を書き出すのでそちらを利用しても良い。

{BUCKET_NAME}/{QueryExecutionId}.csv

というKeyで出力される.

上述の結果例のCSVは以下の通り

BUCKET_NAME/QueryExecutionId-XXXXX.csv
account_id
id1
id2
id3

(カラムが一個しかなくてわかりづらいですがCSVファイルです)

追記補足

[PHP + AWS SDKでAthena] getQueryResultsの結果が扱いづらいので扱いやすくする
getQueryResults で得られるarrayの内容が少々扱いづらいので、変換についてまとめました