Lambdaから指定したS3に入出力するGlueを起動させる話


背景

仕事の中でAWSのGlueを使う機会があり、その際に指定したS3からデータを読み込み、Glueでデータを加工して別の指定したS3バケットにデータを出力することを行ったので、その流れと方法をメモがてら残しておこうと思います。

構築内容

S3(Input)->Glue->S3(Output)

引数(パラメータ)を取得

# パラメータを取得
args = getResolvedOptions(sys.argv, [
                          'JOB_NAME', 'source_path', 'out_path'])
# 各パラメータを変数におく
source_path = args['source_path']
out_path = args['out_path']

今回はLambdaから2つの引数の'source_path', 'out_path'を受け取る.

S3からデータを受け取り

# 指定したS3のバケット配下のファイルをすべて読み込む
DataSource0 = glueContext.create_dynamic_frame_from_options(
    connection_type = "s3",
    connection_options = {'paths': source_path, 'recurse': True},
    format="json")

引数で受け取ったsource_pathを入れ、指定したバケットのパスの配下にあるファイルを全て読み込む

別のS3バケットに出力する

# 指定したS3のバケットにファイルを出力する
df.write.options(header=True, codec="gzip")\
    .mode('overwrite').csv(out_path)

指定したバケットのパスへcsvの形式でgzipに圧縮してから出力する

Lambdaで引数をつけてGlueを起動する

from logging import getLogger, INFO
from datetime import datetime, timedelta, timezone
from app.libs.sns.sns_client import SnsClient
import boto3
import os
import json


logger = getLogger(__name__)
logger.setLevel(INFO)

# 起動するglueのジョブ名
# JOB_NAMEは環境変数
JOB_NAME = os.environ['JOB_NAME']
# エラー通知のSNSトピック
ERROR_PUBLISH_SNS_ARN = os.environ['ERROR_PUBLISH_SNS_ARN']


class TriggerJob:
    def __init__(self):
        self.sns = SnsClient()

    def handler(self, event):
        start_arg = {}
        try:

            # Glueのジョブを実行する
            start_arg = {
                '--out_path': "<指定のアウトプットするs3のパス>"
                '--source_path': "<指定のインプットするs3のパス>"
            }

            glue = boto3.client('glue')
            # glueJob起動
            response = glue.start_job_run(
                JobName=JOB_NAME,
                AllocatedCapacity=2,
                Arguments=start_arg)
            logger.info("response:{}".format(response))
            return response
        except Exception as err:
            logger.error("Glueジョブの起動でエラーが発生しました。err: {}".format(err))

最後に

今回はlambdaでGlueを指定のS3から入力と出力をするコードを載せました。
実際は他の処理とかもあり、関係ない部分は削除しておりますが、不要なものがまだあるかもしれないですがご了承ください。
ご参考になれば幸いです。