S3に画像がアップされるとそれを画像分類推論エンドポイントに投げるLambdaの作成


S3バケットに画像がアップロードされるとその画像をLambdaが取りに行ってあらかじめ立ててあるSageMaker推論エンドポイントに投げ、結果を取得するという仕組みを作ります。

結果をAWS SNSに送信させるやり方と、LambdaをCRONを使って定期実行させるやり方もさわりだけ紹介します。

前準備

S3バケット作成
SageMaker推論エンドポイントの作成

inference-endpointとしましょう。
作り方は以下の記事を参考にしてください。

SNSトピックの作成

以下を参考にして作りました(日本語対応はまだみたいです)。

1.Lambdaの作成

トリガーを追加をクリック。

トリガーにはS3を選択。

以下の画像のようにイベントタイプは全てのオブジェクトCreateイベント
PrefixはPictures/を(これで他のフォルダにアップロードされたときは無視できる)
Suffixは.jpgを入れる(これで例えばテキストファイルがアップされたときは無視できる)

inference.py
import logging
import boto3
import json
import urllib.parse

# Initialize logger
customer_logger = logging.getLogger(__name__)

client = boto3.client('sagemaker-runtime')

# バケット名
AWS_S3_BUCKET_NAME = 'gazou-ageru-bucket'
s3 = boto3.resource('s3')
bucket = s3.Bucket(AWS_S3_BUCKET_NAME)


def handler(event, context):

    # S3にアップロードされたファイルをオブジェクトとしてGETしたいとき
    # トリガーイベントから渡された情報からS3のバケット名とアップロードされたオブジェクト名を取得する
    bucketname = event['Records'][0]['s3']['bucket']['name']
    bucket = s3.Bucket(bucketname)
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    obj = bucket.Object(key)
    objbody = obj.get()['Body'].read()

    response = client.invoke_endpoint(
        EndpointName='inference-endpoint',
        Body= objbody,
        ContentType='image/jpeg',
        Accept='application/json'
    )

    body = response['Body']
    result=json.load(body)
    return(result)



# S3バケット内の全てのオブジェクトに対して実行したい場合
'''
for obj in bucket.objects.all():
    key = obj.key
    body = obj.get()['Body'].read()
'''

RuntimeはPython3.7, ハンドラーはinference.handlerとしました。

2.結果をAWS SNSへプッシュ

画像のようにdestinationの追加を押します。

以下のようにあらかじめ作成したAWS SNSトピックに対して非同期(Asynchronous)かつ成功時(On success)の設定で保存します。

SNSのメッセージを整形したい場合

この設定だと推論結果等がJSON形式でSNSにプッシュされます。メッセージを整形したい場合はもう一つLambdaをかませます。以下を参照してください。

3.実行テスト

以下のどちらかの方法でPCやラズパイから画像をアップロードします。

python(boto3)でやる場合の雛形
S3Upload.py
import boto3

s3= boto3.resource('s3')

def function():
    # 画像ファイルの場合
    data = open(filePath, mode='rb')
    s3.Bucket(bucketName).put_object(Key = s3Key, Body = data)
    # テキストファイルの場合
    data = open(filePath, mode='r')
    s3.Bucket(bucketName).put_object(Key = s3Key, Body = data.read())
    return

# または
def function():
    s3 = boto3.client('s3')
    bucket_name = 'バケット名'
    file_name = 'ローカルのファイルパス'
    key_name = 'S3バケットのキー名'

    s3.upload_file(file_name, bucket_name, key_name)

AWS CLIでやる場合
example.sh
aws s3 cp filename.jpg s3://bucket-name

参考:

CloudWatchlogsに保存してあるLambdaのlogか、SNSにプッシュしている場合はSNSに登録しているメールアドレスやSMS等をチェックします。メッセージ整形のLambdaを噛ませていないと長文のJSONが届くはずです。

CRONによる定期実行

※これは今回の例だと直接使えないのでおまけです。CRONでトリガーするとeventとしてS3バケットの名前やオブジェクト名を引いて来れなくなるのでコードを変える必要があります。

トリガーを追加をクリックしてEventBridge(CloudWatch Events)を選びます。

画像のように新しいルールの作成を選びSchedule Expressionの箇所にCRON形式で実行日時を打ち込みます(画像だと毎日午前3時30分に実行する)。

参考:

以上です。