コレオグラフィがワカランので実装してソースコード全公開してみる。


コレオグラフィとオーケストレーション

中央の制御ポイントに依存するのではなく、システムの各コンポーネントが、ビジネス トランザクションのワークフローに関する意思決定プロセスに参加するようにします。

参考:コレオグラフィ

一般的なアーキテクチャだと、

参考:コレオグラフィ

クライアント要求があり、それを1つのサーバーが受け取り、サービスA、サービスB、サービスCといったサーバーに指令を送ります。例えば、ユーザー登録をした場合、ユーザーの情報をDBへ保存する。ユーザーのメールアドレスにWelcomeメールを送る。といった作業を行います。このようなこういったシステムをオーケストレーションと言い、最初にクライアントを受け取るシステムをオーケストレーターと言います。音楽のオーケストラを思い浮かべると想像がつきやすいのですが、1人の指揮者が居て、それが、いろんな楽器に指示し、音を鳴らす。といったイメージです。

参考:コレオグラフィ

一方でコレオグラフィは、メッセージブローカーがあり、そこにイベント情報が入っています。サービスA,サービスB,サービスCは、それぞれが興味のあるイベントのみを購読しており、そのイベントが発行されると、動作する。といったアーキテクチャになっています。例えば、「ユーザー登録」というイベントが来ると、ユーザー登録サービスはユーザーの情報をDBへ保存します。また、同様に「ユーザー登録」のイベントが来ると、メール登録サービスは、Welcomeメールをユーザーへ送ります。

ここまで読んでみて、コレオグラフィの実装イメージってつきました?僕はつきませんでした。言葉だけはちょくちょく見るんですが、参考の実装っぽいものがあんまり見られませんでした。そこで、自分で作ってみた。というのがこの記事です。

コレオグラフィの疑問

  1. 1つのメッセージで複数のイベントを発火できるの?
  2. メッセージキュー(MQ)って1つで出来るの?
  3. レスポンスってどうするの?

 前述したとおり、「ユーザー登録」というイベントに対して、「DBに永続化する。」「Welcomeメールを送る。」という2つの動作をします。この場合、イベントを発行する側としては、イベントの発行は1つで済ませたい。という要望があります。例えば、新たに「登録者限定クーポン発行システム」をつなげたい。と考えます。そういったメッセージ購読側が増えても、発行側には変更が無いほうがベストです。これが、1の内容です。
 先ほどの「ユーザー登録」を拡張して、「ユーザー管理システム」というものを作ると考えます。そうしたときに、「ユーザー削除」も同様のシステムで管理したいと思います。ということは、同一のMQに「ユーザー登録」「ユーザー削除」の2つのメッセージが流れることになります。そうしたときでも適切にハンドルできるか。ということが気になります。そういったとき、2つのMQが必要?それとも1つのMQで済む?といった疑問が2になります。
 ユーザー登録をした場合、ユーザーIDや名前、メールアドレスを組にしたjsonを返したりすると思います。しかし、コレオグラフィのシステムを見ると、間にMQが挟まっています。これは、値を返すことが出来ないのでは?と思いました。一般的に、MQにメッセージを投げる場合は、メッセージは投げっぱなしになって、それがMQに永続化出来たか、出来なかったか。ぐらいしか分かりません。そのため、処理した結果を受け取るのは難しい。と思います。では、それは実際無理なのか?出来るのか?疑問に思ったのが3になります。

サンプルとなるシステム

とても簡単なシステムで、先ほどから例に出てる「ユーザー管理」のシステムです。

  • ユーザー登録するとユーザー情報が保存される
  • ユーザー削除するとユーザー情報が破棄される

という2つの機能しかもっていません。

しかし、ある日、

  • ユーザー登録するとWelcomeメールを送る

という仕様が追加されました。そのため、コレオグラフィをアーキテクチャに導入し、以下のような構成にしようと考えました。

external_user_apiはユーザーからの情報を一次受けし、その情報をメッセージキューへ流します。user_apiやmailerはそのメッセージの内容に反応して、Welcomeメールを送ったり、ユーザーを削除したりする。ということを想定しました。

実際に作ったシステム

コードはこちら
https://github.com/kotauchisunsun/choreography_sample

実装はPythonでAPIのフレームワークはFastAPI。MQはRabbitMQ。MQへの接続するライブラリはRabbitMQ公式がお勧めしてるpika。あと、メールの送信の確認のために、MailCatcher

基本的には、

$ git checkout https://github.com/kotauchisunsun/choreography_sample.git
$ cd choreography_sample
$ docker-compose up

で動くハズです。

ExchangeとRouting

一般的に、MQを用いたシステムは3つのコンポーネントがあります。

引用:HelloWorld!

メッセージを発行するPublisher、メッセージを購読するConsumber、そして、メッセージを管理するメッセージキューです。
RabbitMQには、それに加えて、ExchangeとroutingKeyという機能が存在してます。

引用:Routing

ここで、Xに示されるのがExchangeで、orange,black,greenで示されるのが、routingKeyにあたります。
Publisherは、メッセージとともにroutingKeyを指定して、メッセージを送ります。例えば、orangeというroutingKeyを付けて、XというExchangeにメッセージをPublishすると、ExchangeがメッセージをQ1へルーティングします。同様に、blackというroutingKeyを指定してメッセージをPublishすると、Q2へメッセージを送ります。今回、コレオグラフィを実現するにあたって、create_userとdelete_userをroutingKeyにすることで、1つのExchangeにPublishすることで、メッセージをそれぞれのキューへルーティングすることを実現しています。

引用:Routing

また、上図のように、Q1にblack、Q2にもblackというroutingKeyを指定しています。この場合は、Exchangeに送られた1つのメッセージが、それぞれのキューに対し、コピーされます。例えば、publisherが"hello"というメッセージをblackというroutingKeyを付けてpublishすると、Q1に"hello"というメッセージが入り、Q2にも"hello"というメッセージが入ります。今回のシステムでは、create_userというroutingKeyを2つのキューに指定することで、「ユーザー登録」と「Welcomメールの送信」という2つの動作を1度のPublishで実現しています。

レスポンス問題

コレオグラフィを用いたときに、どうやってレスポンスを受け取るのか?というと、近いものがRabbitMQのRPCに書かれています。

クライアントからサーバーへキューを介して、メッセージをサーバーへ送ります。その後、レスポンス用のキューを用意し、サーバーはレスポンスの内容をキューへ送り、クライアントはメッセージを読み取る。という仕組みです。当たり前と言えば当たり前ですが、実際できるの?という感じです。

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

クライアント側を一部抜粋して、説明すると、重要な部分はbasic_publishの部分です。このpropertiesの内容に、reply_toという属性があり、ここに、callback_queueが仕込まれています。これがレスポンス用のキューです。その後、self.connection.process_data_events()がループで呼び出されます。メッセージが来ると、on_responseself.responseが書き換わり、ループを抜け、intにパースされて返す。という処理です。MySQLなどのDBはテーブルそのものを作ったり、削除したり、ということはあまり動的に行いませんが、RabbitMQは簡単にキューを作ったり、消したり出来るので、こういう所業が出来るようです。

def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

サーバー側も簡単です。ここでもキーとなるのはbasic_publishで、ここでrouting_keyprops.reply_toが指定されています。これは、publish側で指定したreply_toです。このように、メッセージを送る際に、propertiesとして、レスポンスを返す先のキューを指定することで、RPCを実現しているようです。
 と、ここまで書きましたが、あんまり使わなさそうです。そもそもRPCの要求だと、gRPCなどの代替技術があるので、あまりメリットがなさそうな印象です。そもそもなぜMQを使うか。というところに立ち返ると、1つ1つの処理に時間がかかるからです。例えば、ユーザーからREST APIで受け取っており、裏側をMQのRPCにしたとします。MQで処理に時間がかかるのであれば、REST APIとして、ユーザーに提供できる代物ではなく、タイムアウトが頻発するAPIになります。では、MQで処理に時間がかからないのであれば、それは裏側もREST APIで良くない?となります。そうなると、直接ユーザーに提供する機能ではなく、サーバー間通信に使うものとするならば、それは、gRPCで良くない?となります。
 コレオグラフィは、イベント発行側が一方的にイベントを発行することで、他のサービスが協調動作するのが良い点なんだと思います。そうすることで、協調動作するサーバーが増えても、イベント発行側の変更なしに機能追加が出来るのが便利だと思います。そこを返り値を受け取るようにしてしまうと、協調サービスが増える度に、レスポンスの数も変わってしまい、発行側と協調側が密結合する形になります。そのようなことから、コレオグラフィでは、イベント発行側は返り値を受けないようにする。というのがベターな選択肢と思いました。

顧客が本当に欲しかったもの

コレオグラフィの問題は、障害時の回復です。私が作ったシステムだと、メールのシステムがダウンすると、ユーザー登録はされるが、Welcomeメールが送られない。という障害が起こりえます。また、user_apiがダウンすると、ユーザー登録は失敗しているのに、Welcomeメールが送られる。というよく分からない状態が起こりえます。そして、純なコレオグラフィに傾倒しすぎており、レスポンスもないので、クライアントからすると、ユーザー登録が成功したのかすら分からないです。

最終的にこんなアーキテクチャの方が良かったかも。と思いました。user_apiは責務として、データベースへの永続化と、イベントの発行を担います。こうすることでレスポンスを返すことができます。また、MQにもイベントを発行しているので、拡張性もあります。例えば、MQがダウンした場合、DB側はトランザクションを張っておき、その内部でMQにPublishするようにしておけば、DBに書き込まれることはありません。逆にDBがcommitに失敗したにも関わらず、MQにメッセージが発行されてしまった場合は、consumer側がuser_apiにユーザーの存在確認を走らせ、存在すれば送る。という方式にすると、ユーザー登録に失敗したユーザーにWelcomeメッセージを送る挙動を防げます。

疑問への回答

  1. 1つのメッセージで複数のイベントを発火できるの?
    -> できる。複数のキューに同一のroutingKeyを指定することで、1回のメッセージ発行で複数種類のイベント発火が可能(create_userによるユーザー登録とWelcomeメール送信)

  2. メッセージキュー(MQ)って1つで出来るの?
    -> キューは複数必要。ただし、メッセージ発行側は1つのExchangeに対し、routingKeyを変えて、Publishすることで、別々のイベントを発火させることができる。(create_userとdelete_user)

  3. レスポンスってどうするの?
    -> メッセージを発行するときにプロパティとして、レスポンスを入れるキューを指定することで、返り値を得ることができる。ただ、あまり使わなさそう。

感想

圧倒的にめんどくさい

 今回、docker-composeで環境を整えましたが、コンテナが7つあります。そのうち、RabbitMQとMailCatcherは既存のものなので、自作したコンテナは5つあります。こんな検証ごときに、Dockerfile書きたくないんですよ。できれば、既存のものだけで済ませたい。
 最初は、user_apiのみを作っていました。そこから、external_user_apiはMQへpublishをするように作って、create_userを購読するようにuser_apiを改造しようか。と思いました。しかし、「実際、プロダクション運用することを考えると、user_apiとして稼働してたapiが、MQを購読する変更することはないぞ?」と気づき、create_user_consumerが生まれました。そうすると、create_user_consumerがuser_apiのAPIを叩く必要があり、そうすると、swaggerのツールチェインを入れて、swaggerからclientを生成し、それを結合して、ホスト名を設定して・・・という風に工数が雪だるま式に増えていきました。また、疑問点を検証するため、Consumerが3台必要でした。システム全体を疎結合にするために、docker-composeから、環境変数への引き渡しなども必要になったので、疎結合の糊のコストが無駄に高かったです。
 良かったポイントとしては、わかりやすいミドルウェアを選定したことです。RabbitMQはWebUIがついており、キューやExchange、メッセージの内容を見ることができます。また、MailCatcherもWebUIがついており、どんなメールが送られたかを確認することができます。また、FastAPIでAPIを構築していたので、swaggerのWebUIがついていて、ここから、手軽にAPIの検証ができたため、検証のループが非常に回しやすかったです。
 今回、コレオグラフィの検証でしたが、実質はRabbitMQのお勉強という感じでした。今は、こういう用途なのはRedisの方が優勢ですかね?ただこれらをクラウドで動かすとなると結構厄介な話もあって、最近のサーバーはステートレスだったり、そもそもリクエストが来ないと起きてない場合(Herokuとか)も多いので、こういうコンシューマって立てにくいんじゃないかな?と思ってたりもします。そういったコンシューマが立てられるインフラ回りもちょっと追っておかないとなぁ。と思った、検証でした。
 これ、コレオグラフィじゃないよ!もっといいサンプルあるよ!とご存じの方がいらっしゃったら教えてください。