Python + responderで非同期処理して、ついでに処理完了をSlackで通知する方法


この記事は全部俺 Advent Calendar 2018の5日目の記事です。
以前紹介した構成でfire and forgetをやるよりも圧倒的に簡単に非同期処理が実装できるresponderを使った方法を紹介します。

responder is 何?

公式ドキュメントによると、Responder is a web service framework, written for human beings.
つまり、人間のために書かれたWebサービスフレームワークです。
人間のための、というのはPythonを使ってるとよく目にするフレーズですね。
(作者のkennethreitzさんがよく使うフレーズです。)

筆者の環境

name version
macOS 10.14.1
Python 3.7.1
Pipenv 2018.11.26
responsder 1.1.2

インストールとバグフィックスとHello, world!

作者が人間のためのPython開発ワークフローpipenvを作った人なので、敬意を表してpipenvを使っていきます。

pipenv install responder --preを行い、responderをインストールした後、公式ドキュメントに記載されている以下のソースコードを実行してみます。
(That async declaration is optional.と記載されているので、greet_world関数のasyncは削除しています。)

~/app.py
import responder

api = responder.API()


@api.route("/{greeting}")
def greet_world(req, resp, *, greeting):
    resp.text = f"{greeting}, world!"


if __name__ == '__main__':
    api.run()

どうでしょう?
import responderした時点で、ModuleNotFoundError: No module named 'starlette.lifespan'というエラーが出ましたね?
実はこれは最新版のresponderのバグです。
これはIssueにも上がっているように、pipenv install starlette==0.8を行ってstarletteのバージョンを0.8にしてやれば開発します。

2019/1/18追記:修正されました!
※余談ですが、過去にもstarletteのバージョン起因のバグが起きていたようなので、今後もこのエラーが出たら一度starletteのバージョンを変更してみると良いかもしれません。

気を取り直して、もう一度ソースコードを実行し、curl http://127.0.0.1:5042/Helloを実行、あるいはブラウザでhttp://127.0.0.1:5042/Helloにアクセスすると、Hello, world!という文言が帰ってくるはずです。
これでチュートリアルが完了したので、次に非同期処理によるBackground処理を試してみます。

非同期処理(Background処理)

ここにあるソースコードを書いてみます。
全体のソースコードは以下のようになります。

~/app.py
import time

import responder

api = responder.API()


@api.route("/")
def hello(req, resp):

    @api.background.task
    def sleep(s=10):
        time.sleep(s)
        print("slept!")

    sleep()
    resp.content = "processing"


if __name__ == '__main__':
    api.run()

そして、このソースコードを実行してcurl http://127.0.0.1:5042を実行してみると、レスポンスのprocessingという文字列はすぐ返ってくる一方で、標準出力へのslept!は10秒後に出力されるので、非同期実行されていることがわかります!
このままだと結果がわかりにくいので、標準出力にタイムスタンプを出力するようにファイルを書き換えて再度実行してみます。

~/app.py
import time
from datetime import datetime

import responder

api = responder.API()


@api.route("/")
def hello(req, resp):

    @api.background.task
    def sleep(s=10):
        print(f"[sleep] start at {datetime.now()}")
        time.sleep(s)
        print("slept!")
        print(f"[sleep] end   at {datetime.now()}")

    sleep()

    print(f"[hello] start at {datetime.now()}")
    resp.content = f"{datetime.now()} processing"
    print(f"[hello] end   at {datetime.now()}")


if __name__ == '__main__':
    api.run()

そして、同様にこのソースコードを実行してcurl http://127.0.0.1:5042を実行してみると以下のように結果が出力されます。

INFO: ('127.0.0.1', 64969) - "GET / HTTP/1.1" 200
[sleep] start at 2018-12-05 23:01:18.900378
[hello] start at 2018-12-05 23:01:18.900466
[hello] end   at 2018-12-05 23:01:18.900490
slept!
[sleep] end   at 2018-12-05 23:01:28.905147

上記の結果を見ると、はじめにsleep()が呼ばれ、その処理と同時にhello()の処理が行われていることがわかります。
ちなみに、resp.contentの結果は2018-12-05 23:01:18.900485 processingとなっていました。
sleep()開始→hello開始→helloのレスポンス返却→hello終了→slept!出力→sleep終了という順序になっていることが確認できますね。

ここまでで前回の記事と同様のことがサクッと実装できてしまいました。
ここからは、実際の業務を想定して処理完了後にSlack通知を行ってくれるように拡張することを考えていきます。

処理完了後にSlack通知

まず、Slack側の設定から行っていきます。

  1. Slackワークスペース内に、通知を行いたいチャンネルを作成しておく
  2. Slackアプリから、「Apps」の+ボタンを押して、Incoming WebHooksを検索して選択
  3. ブラウザに飛ぶので、画面左の緑色の「Add Configuration」ボタンを押す
  4. 「Post to Channel」にて通知を行うチャンネルまたは個人を選択して、「Add Incoming WebHooks integration」ボタンを押す
  5. 「Webhook URL」に出てくるURLをコピーしておく

次に、python側の処理を行っていきます。
といっても、実質POSTリクエストを飛ばすだけです。

最終的に、以下のような形式になります。

~/app.py
import json
import time
from datetime import datetime

import responder
import requests

api = responder.API()


@api.route("/")
def hello(req, resp):
    @api.background.task
    def sleep(s=10):
        print(f"[sleep] start at {datetime.now()}")
        time.sleep(s)
        print("slept!")
        print(f"[sleep] end   at {datetime.now()}")
        push_slack(f"[{datetime.now()}] background task is complete!!")

    sleep()

    print(f"[hello] start at {datetime.now()}")
    resp.content = f"{datetime.now()} processing"
    print(f"[hello] end   at {datetime.now()}")


def push_slack(msg):
    requests.post(
        # TODO: ここのurlを修正する必要があります
        url="[Incoming Webhook URL]",
        data=json.dumps({
            "text": f"<!here> {msg}",
            "username": "responder bot",
            "icon_emoji": ":ok:",
        }))


if __name__ == '__main__':
    api.run()

こうすると、標準出力とレスポンスとして、先ほどと同様の結果が得られます。
また、Slackには以下のように通知が来ていることが確認できました。

今回は実装しませんでしたが、try: ~ except:と合わせてエラー時には別メッセージで通知したり、開始時にも通知をしたりするとより実用的になると思います。

まとめ

responder、すっごく簡単。
このページに、api.run()コマンドでThis will spin up a production web server on port 5042, ready for incoming HTTP requests.と記載されているので、Flaskにあるような開発サーバ限定ではなく、productionサーバとして使用することも可能です。
GraphQLのサポートもあるので、それについても近いうちに触ってまとめてみたいと思います。