AWS LambdaからAmazon Elasticsearch Serviceにつないでみる


この投稿は、AWS Lambda Advent Calendar 2017の初日の投稿になります。

初日なので簡単なのをば!

Elasticsearchは今までローカルやプライベートネットワーク上にインストールして使うことが多く、マネージドサービスを使ったことがなかったので、今回Amazon Elsaticsearch Serviceを使ってみました。

で、色々なサイトを参考させてもらい、設定〜インデックス作成まで実施したので、その備忘録を載せます。

準備

準備ですが、クラスタを作成するだけです。当然t2.smallで作成。Elasticsearchのバージョンは5.5を選択しました。他はデフォルトの設定です。

Lambdaからアクセスしてインデックスを作成してみる

AWS Lambdaに適当な関数を作成して、とりあえずアクセスしてみます。こちらを参考にさせてもらいました。

Lambda から elasticsearch service に何かする [cloudpack OSAKA blog]

このコードでアクセスするためには、STSのAssumeRolests:AssumeRoleがIAM Roleに付与されている必要があります。ポイントとなるポリシーの設定はこんな感じです。(もちろんテスト的な設定なので、本来はきちんとアクセス権限を設計しましょう)

        {
            "Action": [
                "sts:AssumeRole"
            ],
            "Resource": [
                "*"
            ],
            "Effect": "Allow"
        }

ソースコードはこんな感じで動きました。ENDPOINT、REGION、ROLE_ARN、S3_BUCKET、S3_OBJECTは環境変数からの定義となります。

import os
import sys

import boto3

sys.path.append(os.path.join(
    os.path.abspath(os.path.dirname(__file__)), 'lib'))
from elasticsearch import Elasticsearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

ENDPOINT = os.environ['ES_ENDPOINT']
REGION = os.environ['REGION']
ROLE_ARN = os.environ['ROLE_ARN']
S3_BUCKET = os.environ['S3_BUCKET']
S3_OBJECT = os.environ['S3_OBJECT']


def run(event, context):
    es_client = connect_es(ENDPOINT)

    if event['method'] == "create-index":
        s3 = boto3.client('s3')
        index_doc = s3.get_object(Bucket=S3_BUCKET, Key=S3_OBJECT)['Body'].read()
        print(index_doc)
        create_index(es_client, event['index'], index_doc)
        return "Success"
    return es_client.info()

def connect_es(es_endpoint):

    print('Connecting to the ES Endpoint {0}'.format(es_endpoint))
    credentials = get_credential()
    awsauth = AWS4Auth(credentials['access_key'], credentials['secret_key'], REGION, 'es', session_token=credentials['token'])

    try:
        es_client = Elasticsearch(
            hosts=[{'host': es_endpoint, 'port': 443}],
            http_auth=awsauth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection)
        return es_client
    except Exception as E:
        print("Unable to connect to {0}".format(es_endpoint))
        print(E)
        exit(3)

def create_index(es_client, index_name, index_doc):
 try:
  res = es_client.indices.exists(index_name)
  print("Index Exists ... {}".format(res))
  if res is False:
   es_client.indices.create(index_name, body=index_doc)
 except Exception as E:
  print("Unable to Create Index {0}".format("metadata-store"))
  print(E)
  exit(4)

def get_credential():
    client = boto3.client('sts')
    assumedRoleObject = client.assume_role(
        RoleArn=ROLE_ARN,
        RoleSessionName="Access_to_ES_from_lambda"
    )
    credentials = assumedRoleObject['Credentials']
    return { 'access_key': credentials['AccessKeyId'],
             'secret_key': credentials['SecretAccessKey'],
             'token': credentials['SessionToken'] }

コードを見れば分かる通り、S3にアクセスするため、権限の設定が必要になります。

S3に配置するJSONデータの準備

今回はS3にインデックスの設定を保持します。こんな感じのdata.jsonを配置します。この辺はAWS Solution Architectのブログを参考にします。

【AWS Database Blog】AWS Lambda と Pythonを使ってメタデータをAmazon Elasticsearch Serviceにインデクシング

{
    "dataRecord": {
        "properties": {
            "createdDate": {
                "type": "date",
                "format": "dateOptionalTime"
            },
            "objectKey": {
                "type": "string",
                "format": "dateOptionalTime"
            },
            "content_type": {
                "type": "string"
            },
            "content_length": {
                "type": "long"
            },
            "metadata": {
                "type": "string"
            }
        }
    },
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0
    }
}

Lambdaを実行

以下のJSONをパラメータとして実行します。JSONパラメータを送らなければ、Elasticsearch Clientの情報をReturnするようになってます。

{
  "method": "create-index",
  "index": "sample-index"
}

アクセスしてみる

アクセスしたところ、無事インデックスが作成されました。sample-indexが増えているのがわかります。

まとめ

今回初めてAmazon Elasticsearch Serviceを利用しましたが、準備段階の手間が省けているのはありがたいですね。またAWS上のサービスなので、Lambdaなどの別サービスからのアクセスも簡単でした。

おまけ

ちなみに最初、受け取ったElasticsearch ClientをそのままReturnしていたのですが、その時のエラーメッセージがこんな感じ。一見するとJSONで保持できない=パラメータが異なった?と受け取れます。

{
  "errorMessage": "<Elasticsearch([{'host': 'HOSTNAME.us-east-1.es.amazonaws.com', 'port': 443}])> is not JSON serializable",
  "errorType": "TypeError",
  "stackTrace": [
    [
      "/var/lang/lib/python3.6/json/__init__.py",
      238,
      "dumps",
      "**kw).encode(obj)"
    ],
    [
      "/var/lang/lib/python3.6/json/encoder.py",
      199,
      "encode",
      "chunks = self.iterencode(o, _one_shot=True)"
    ],
    [
      "/var/lang/lib/python3.6/json/encoder.py",
      257,
      "iterencode",
      "return _iterencode(o, 0)"
    ],
    [
      "/var/runtime/awslambda/bootstrap.py",
      110,
      "decimal_serializer",
      "raise TypeError(repr(o) + \" is not JSON serializable\")"
    ]
  ]
}

これ、原因が

    return es_client

es_clientをそのまま返していたから。

    return es_client.info()

とすれば動くのですが、こんなの気づけんって。。。せめてピンポイントでエラー発生行番号が出力されれば話は別ですが、不親切すぎなエラーメッセージ(^^;どこかで改善されることを期待っす。