AWS Lambdaを直列に動かす Step Functions編


はじめに

この記事は、社会人2年目 AWS、python初心者が勉強したことをまとめたものです。
間違っている点、改善点等ございましたご指摘いただけると大変ありがたいです。

この記事について

2つのLambdaを直列に動かす方法について記載します。
直列に動かすとは、Lambda1が正常終了した後Lambda2を動かすことを指しています。

Lambdaを直列に動かす方法として以下の二つが浮かびました。

  • Step Functionsを利用
  • SQSを利用

Step Functionsを利用する利点は、Lambda関数を一切変更する必要がない点です。
SQSを利用する場合は、一つ目のLambdaの返り値でSQSを呼び出す必要があるため、場合によっては変更が必要です。

本記事ではStep Functionsを利用した方法について説明します。

構築するシステム

Function1が正常終了したらFunction2を開始させるという極めてシンプルなワークフローです。

それぞれのLambda Functionはテーブルのレコードを全件取得して、テーブル定義が同一の別のテーブルに全件INSERTする関数です。

このシステムをStep Functionsにより構築します。

構築手順

Lambda Function作成

以下二つのLambda Function func1とfunc2を作成します。

func1はオリジナルテーブルの全レコードをコピー1テーブルにINSERTする関数です。
func2はコピー1テーブルの全レコードをコピー2テーブルにINSERTする関数です。

  • オリジナルテーブル:SushiInfo
  • コピー1テーブル:SushiInfo_copy1
  • コピー2テーブル:SushiInfo_copy2

※テーブル名が変な名前で申し訳ありません。。。

func1.py

import pymysql

def lambda_handler(event, context):

    connection = pymysql.connect(host = 'host',
                                 user = 'user',
                                 password = 'pass',
                                 db = 'sushi',
                                 charset = 'utf8mb4',
                                 cursorclass=pymysql.cursors.DictCursor,
                                 autocommit=False)

    with connection.cursor() as cursor:
        sql = "select * from SushiInfo"
        cursor.execute(sql)
        record_list = cursor.fetchall()

        sql = """
        INSERT INTO SushiInfo_copy1(
        id, price, memo, created_at, updated_at)
        VALUES(%s, %s, %s, %s, %s)
        """
        cursor.executemany(sql,[list(record.values()) for record in record_list])

        connection.commit()
func2.py

import pymysql

def lambda_handler(event, context):

    connection = pymysql.connect(host = 'host',
                                 user = 'user',
                                 password = 'pass',
                                 db = 'sushi',
                                 charset = 'utf8mb4',
                                 cursorclass=pymysql.cursors.DictCursor,
                                 autocommit=False)

    with connection.cursor() as cursor:
        sql = "select * from SushiInfo_copy1"
        cursor.execute(sql)
        record_list = cursor.fetchall()

        sql = """
        INSERT INTO SushiInfo_copy2(
        id, price, memo, created_at, updated_at)
        VALUES(%s, %s, %s, %s, %s)
        """
        cursor.executemany(sql,[list(record.values()) for record in record_list])

        connection.commit()

Step Functions作成

コードスニペットを以下の通り設定します。
以下の要領でつなげてあげれば2つ以上のLambdaを直列に動かすことも可能です。

{
  "Comment": "series functions!!",
  "StartAt": "func1",
  "States": {
    "func1": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:************:function:func1",
      "Next": "func2"
    },
    "func2": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:************:function:func2",
      "End": true
    }
  }
}

結果

実行できました。

レコードがコピーされていることも確認されました。

最後に

Step Functionsを利用することで簡単にLambdaを直列に動かすことが可能です。
次はSQSを利用した直列実行も実装してみたいと思います。