AWS Athenaで複数のクエリを投げてみた


Athenaでクエリを投げる時に色々と苦労したので、まとめてみる。

利用環境はMacです。

【背景】

  • Athenaがクエリを並列で処理できるのが20並列と書いてあったが(AWSのドキュメント)実際に確かめたい
  • athenaに大量のリクエストを送った場合、エラーが出るのか、queueを管理してくれるのか、してくれるならどのようにしてくれるのか知りたい
  • ただ、実際にどう実現すればいいかわからない

こんな課題感で手を動かし始めました。

【実際にやったこと】

  1. Athenaのconsoleからクエリを投げてみた
  2. cliからクエリを大量に投げてみた

これらをどのようにしていったかまとめます。(いろんな記事を参考にしました)

AthenaのConsoleからクエリを投げてみた

以下簡単に、手順をまとめます。

  • S3にデータを配置する
  • AthenaにDBとTableを作成する
  • Athenaからクエリを叩く

S3にデータを配置する

athenaから具体的にクエリを叩きたい元になるファイルをS3にアップロードします。
任意のディレクトリを作成し、その中に保存しました。
今回ファイル形式としてcsvファイル(bzip圧縮)とparquetファイルに対してクエリを叩きました。

AthenaにDBとTableを作成する

DBを作成する際には公式のドキュメントの手段通りに行いました。

Table作成には
CREATE EXTERNAL TABLE構文を用いました。


CREATE EXTERNAL TABLE IF NOT EXISTS DB_name.table_name (
  `e_1` int,
  `e_2` int,
  `e_3` int,
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
) LOCATION 'S3 URI'
TBLPROPERTIES ('has_encrypted_data'='false');

もしくはここのCreate tableからも作成することができます。

こちらの記事がとても参考になります。AWS Athena で CREATE TABLE する

Athenaからクエリを叩く


select 
var_pop(CAST(e_1 AS DECIMAL(4, 3))) as var_pop_qe1,
var_pop(CAST(e_2 AS DECIMAL(4, 3))) as var_pop_qe2,
var_pop(CAST(e_3 AS DECIMAL(4, 3))) as var_pop_qe3

from DB_name.table_name
;

今回はe_1, e_2, e_3カラムのデータをDECIMALに変換したのちに分散を計算しています。
このように、Athenaでサポートされている関数や演算子もまとまっていますので、公式ドキュメントをご参照ください

以上が、Amazon Athena Management Consoleからのクエリの叩き方でした。

cliからクエリを大量に投げてみた

athenaのmanagement consoleから大量のクエリを投げるのがどのように行えばいいのかがわからなかったので、cliからlambdaを経由して、大量のクエリを送りました。
以下参考文献です。

今回は上記の参考文献を参考にしてpythonで以下のように記述しました。

import time
import boto3

# athena constant
DATABASE = 'athenaでアクセスしたいDB名'
TABLE = 'athenaでアクセスしたいtable名'

# S3 constant
S3_OUTPUT = '結果を出力したいS3のURI'
S3_BUCKET = 'バケット名'

# number of retries
RETRY_COUNT = 10

def lambda_handler(event, context):
    # created query
    query = (
        """
        SELECT
        AVG(CAST(e_1 AS DECIMAL(4, 3))) as avg_qe1,
        AVG(CAST(e_2 AS DECIMAL(4, 3))) as avg_qe2,
        AVG(CAST(e_3 AS DECIMAL(4, 3))) as avg_qe3
        FROM %s.%s;
        """
    )  % (DATABASE, TABLE)

    # athena client
    client = boto3.client('athena')

    # Execution
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': DATABASE
        },
        ResultConfiguration={
            'OutputLocation': S3_OUTPUT,
        }
    )

    # get query execution id
    query_execution_id = response['QueryExecutionId']
    print(query_execution_id)
    # get execution status
    for i in range(1, 1 + RETRY_COUNT):

        # get query execution
        query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
        query_execution_status = query_status['QueryExecution']['Status']['State']

        if query_execution_status == 'SUCCEEDED':
            print("STATUS:" + query_execution_status)
            break

        if query_execution_status == 'FAILED':
            raise Exception("STATUS:" + query_execution_status)

        else:
            print("STATUS:" + query_execution_status)
            time.sleep(i)
    else:
        client.stop_query_execution(QueryExecutionId=query_execution_id)
        raise Exception('TIME OVER')

    # get query results
    result = client.get_query_results(QueryExecutionId=query_execution_id)
    print(result)

    # get data
    result_output = result['ResultSet']['Rows'][1]['Data'][1]['VarCharValue']
    return result

今回、工夫した点は、for文を用いて繰り返し処理を行った点です。

実際に30件ほど送ったらAthenaでqueueされつつ全て実行されました。
一方で、数千件送った場合、cliに以下のようなエラーが返ってきました。

An error occurred (TooManyRequestsException) when calling the StartQueryExecution operation: You have exceeded the limit for the number of queries you can run concurrently. Please reduce the number of concurrent queries submitted by this account. Contact customer support to request a concurrent query limit increase.

一クエリ15秒程度かかる大きなクエリを投げた時に、1分間で実行されたクエリの数(athenaのhistoryで確認)と、実際のクエリにかかった秒数(athenaのhistoryで確認)でおおよその並列数を計算したところ3~4件程度でした。
Athenaはリクエストが投げられてからAWS側でリソースを確保するため、常に最大パフォーマンスが担保されるわけではないそうです。

まとめ

今回明らかになったこと
- Athenaで並列に同時実行するために、cliからlambdaを経由することで実現できた。
- 大量のリクエストを送った場合、athena側で一定queue管理してくれるものの、一定以上のリクエストを送るとToo Many Requestというresponseが返ってきて実行されない

また、何か学びがあれば共有していきたいです