Lambdaを使ってTwitter定期投稿botを作ろう


1.基本構成

  • ツイートデータはAmazon S3に格納
  • Lambda→S3へのアクセス権限はIAMで設定
  • Cloud Watch Eventsで1時間ごとにLambda Functionを発火
  • Lambdaを使ってS3のテキストを検索し、内容をTwitter REST APIへ連携

2.S3に格納するデータ様式

csv形式でツイートする時間とツイート内容を記載する。
即席なのでテキストファイル使ったけど、ここはNoSQL使ったりしてもよいかも……。
Example)
[ツイート時間],[ツイート内容]

3.IAM Roleの設定

下記の通りに,S3アクセスを許可するIAMポリシーを作成し、IAM Roleにアタッチする。


{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": "s3:*",
            "Resource": "*"
        }
    ]
}

4.Lambda Function の作成

Lambda Functionを作成し、トリガー(一時間ごとにテキストファイルを検索)は
Cloud Watchを使用する。

作成するLambda関数のプログラムは下記の通り。
仕組みとしては、「テキストファイルに記載されている時間」と「現在時刻」が合致したら
Twitter REST APIにツイート情報を連携するというもの。


import json
import boto3
import re
import datetime
from requests_oauthlib import OAuth1Session
S3 = boto3.client('s3')

def lambda_handler(event, context):
    # TODO implement
    #バケットファイルからテキスト一覧を取得
    BUCKET = S3.list_objects_v2(Bucket="daily-tweets",Prefix="DAILY_TWEETS")
    for content in BUCKET['Contents']:
    #~~.txtファイルを検索
        if re.match(r'^[^\.]+\.txt',content['Key']):
            obj=S3.get_object(Bucket="daily-tweets",Key=content['Key'])
            l=obj['Body'].read().decode('utf-8').split(",")
            time=datetime.datetime.utcnow() + datetime.timedelta(hours=9)
            print (time.strftime("%H") )
            #時間データと照合して合致したらツイート
            if time.strftime("%H") == l[0]:
                print (l[1])
                CK = '****' # * enter your Consumer Key *
                CS = '****' # * enter your Consumer Secret *
                AT = '****' # * enter your Access Token *
                AS = '****' # * enter your Accesss Token Secert *
                UPDATE_URL = 'https://api.twitter.com/1.1/statuses/update.json'
                twitter = OAuth1Session(CK, CS, AT, AS)
                req = twitter.post(UPDATE_URL,params={"status":l[1]})
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }

これだけでは、「一回だけ」起動するプログラムなので、定期実行のためにCloud Watch Eventsにcronを設定する。
設定値は下記にすると、1時間ごとにツイートされる。(cronを変えれば10分毎・この曜日だけ、とかも可能です。)
cron(0 * * * ? *)

注)
from requests_oauthlib import OAuth1Session →こちらはAWSの標準では使えないのでlayerを作る必要があります。詳しくは
下記記事が参考になります。
https://www.yamamanx.com/aws-lambdapython-twitter-post/

5.イカしてないところ

  • APIのキーを直接書くな! →secret managerとかで何とかできるかも…
  • S3である必要なくない? →簡単さを重視してS3使ったけどNoSQLとか使えるかもね…?
  • API Gateway使ってtweet登録できるようにしたほうよいかも…(ただWEBに晒すサービスとなるとセキュリティ面少し心配…)

6.おわりに

ものすごくザックリ、作成したツイートボットの解説していきました。なので「こっちの構成の方がいいよ!」とか
あったらどしどしツッコミ入れてください。