AWS LambdaでTwitterScraperを使おうとして動かなかったで調べてみた


アイカツ!についてのツイートを毎日収集したいという欲があり、手動でやるのも面倒なのでAWS Lambda上で毎日出力するように自動化を実施したいと思っている。

まずはAWS Lambda上で、Lambda LayersにTwitterScraperのライブラリ(1.4.0)を登録して、ざっくり動作検証レベルで以下のようなコードを実装してテスト実行してみた。

from twitterscraper import query_tweets
import datetime as dt

def lambda_handler(event, context):

    begin_date = dt.date(2020,6,5)
    end_date = dt.date(2020,6,6)
    pool_size = (end_date - begin_date).days

    tweets = query_tweets("アイカツ", begindate = begin_date, enddate = end_date, poolsize=pool_size, lang="ja")
    tuple_tweet=[(tweet.user_id, tweet.tweet_id, tweet.text.replace("\n","\t"), tweet.timestamp) for tweet in tweets]

    return True

そうすると以下のような「pool」がないですよ的なエラーがAWS Lambda上で出力されてしまう。

{
  "errorMessage": "name 'pool' is not defined",
  "errorType": "NameError",
  "stackTrace": [
    "  File \"/var/task/lambda_function.py\", line 14, in lambda_handler\n    tweets = query_tweets(\"アイカツ\", begindate = begin_date, enddate = end_date, poolsize=pool_size, lang=\"ja\")\n",
    "  File \"/opt/python/twitterscraper/query.py\", line 246, in query_tweets\n    pool.close()\n"
  ]
}

普通にJupyter Notebook上では動いているので、Lambda Layersの何かがミスっているのかもと思い、そもそも変数「pool」って何?ということを調べてみる。

どうやらTwitterScraperのquery.pyの変数らしい。

query.py
def query_tweets(query, limit=None, begindate=dt.date(2006, 3, 21), enddate=dt.date.today(), poolsize=20, lang=''):
    no_days = (enddate - begindate).days

    if(no_days < 0):
        sys.exit('Begin date must occur before end date.')

    if poolsize > no_days:
        # Since we are assigning each pool a range of dates to query,
        # the number of pools should not exceed the number of dates.
        poolsize = no_days
    dateranges = [begindate + dt.timedelta(days=elem) for elem in linspace(0, no_days, poolsize+1)]

    if limit and poolsize:
        limit_per_pool = (limit // poolsize)+1
    else:
        limit_per_pool = None

    queries = ['{} since:{} until:{}'.format(query, since, until)
               for since, until in zip(dateranges[:-1], dateranges[1:])]

    all_tweets = []
    try:
        pool = Pool(poolsize)
        logger.info('queries: {}'.format(queries))
        try:
            for new_tweets in pool.imap_unordered(partial(query_tweets_once, limit=limit_per_pool, lang=lang), queries):
                all_tweets.extend(new_tweets)
                logger.info('Got {} tweets ({} new).'.format(
                    len(all_tweets), len(new_tweets)))
        except KeyboardInterrupt:
            logger.info('Program interrupted by user. Returning all tweets '
                         'gathered so far.')
    finally:
        pool.close()
        pool.join()

    return all_tweets

おそらくpool = Pool(poolsize) なのかなーと思い、この変数をtry句から外してAWS Lambdaを実行してみる。

{
  "errorMessage": "[Errno 38] Function not implemented",
  "errorType": "OSError",
  "stackTrace": [
    "  File \"/var/task/lambda_function.py\", line 14, in lambda_handler\n    tweets = query_tweets(\"アイカツ\", begindate = begin_date, enddate = end_date, poolsize=pool_size, lang=\"ja\")\n",
    "  File \"/opt/python/twitterscraper/query.py\", line 233, in query_tweets\n    pool = Pool(poolsize)\n",
    "  File \"/opt/python/billiard/pool.py\", line 995, in __init__\n    self._setup_queues()\n",
    "  File \"/opt/python/billiard/pool.py\", line 1364, in _setup_queues\n    self._inqueue = self._ctx.SimpleQueue()\n",
    "  File \"/opt/python/billiard/context.py\", line 150, in SimpleQueue\n    return SimpleQueue(ctx=self.get_context())\n",
    "  File \"/opt/python/billiard/queues.py\", line 390, in __init__\n    self._rlock = ctx.Lock()\n",
    "  File \"/opt/python/billiard/context.py\", line 105, in Lock\n    return Lock(ctx=self.get_context())\n",
    "  File \"/opt/python/billiard/synchronize.py\", line 182, in __init__\n    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)\n",
    "  File \"/opt/python/billiard/synchronize.py\", line 71, in __init__\n    sl = self._semlock = _billiard.SemLock(\n"
  ]
}

エラー内容が「Function not implemented」なので、どうやらマルチプロセスのbilliardライブラリが原因。マルチプロセスはAWS Lambdaで使用できないからのよう。

対応

twitterscraperのissuesに同様の現象についての記載がある。

github上のpullrequestに回避方法の実装があるので、quert.pyをpullrequestの内容に置換すると回避できた。
https://github.com/taspinar/twitterscraper/pull/280/commits/685c5b4f601de58c2b2591219a805839011c5faf

関数「query_tweets」に渡すときの変数「poolsize」を使ってマルチプロセス数を設定しているので、明示的に0にした場合マルチプロセス化しないような実装にしている。