.NET 6 と Daprを使った分散サービス開発 その7 Rabbit MQ と Input Binding


Input Binding (入力バインディング)による連携

今まで、以下の流れで作業を行ってきました。今回はDaprに備わっているInput Binding (入力バインディング)の中で、Rabbit MQにenqueueしたらトリガされる実行に触れていきたいと思います。

今までの前提の上で入力バインディングの機能を追加しますので、このページから読まれている方は、前提として以下を読んできてください。


Input Bindingについて

前回の記事の再掲となりますが、Input Binding (入力バインディング)は、例えば5分毎(イベント)でサービスをコール(トリガ)してほしい、Azure Storage Queue / Amazon SQS / Apache Kafkaなど所謂キューに、enqueue(イベント)されたら、サービスをコール(トリガ)してほしいなどのケースで用いる事ができます。

イベントに対して、トリガされた処理を行う、Azure Functions や AWS Lambdaなどが、馴染み深いと思いますが、これらと同じような感じでイベントに対して処理を行う仕組みを構成する事ができます。

バインディングの一覧は、Input Binding (入力バインディング)、Output Binding (出力バインディング)共に以下に記載があります。

Rabbit MQ バインディングを試す

これらのInput Binding (入力バインディング)で、今回はキューにメッセージがenqueueされたら、指定されたAPIがトリガリングされる仕組みを試していきます。

Rabbit MQの準備

まず、ローカルの環境でRabbit MQが動作する環境を準備します。以下のようなコマンドでロードしました。TCPポート 5672,5673,15672をEXPOSEしています。

docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 5673:5673 -p 15672:15672 rabbitmq:3-management

以下のようにDokcerでロードされていれば、大丈夫です。今回は管理コンソール付きでロードしています。

$ docker ps
CONTAINER ID   IMAGE                   COMMAND                  CREATED         STATUS                  PORTS                                                                                                                   NAMES
097a36fc518b   rabbitmq:3-management   "docker-entrypoint.s…"   9 minutes ago   Up 9 minutes            4369/tcp, 5671/tcp, 0.0.0.0:5672-5673->5672-5673/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp   some-rabbit
ba8a9bdb62ab   daprio/dapr:1.6.0       "./placement"            2 weeks ago     Up 12 hours             0.0.0.0:6050->50005/tcp                                                                                                 dapr_placement
808b6358b4e7   openzipkin/zipkin       "start-zipkin"           2 weeks ago     Up 12 hours (healthy)   9410/tcp, 0.0.0.0:9411->9411/tcp                                                                                        dapr_zipkin
bed085e54a77   redis                   "docker-entrypoint.s…"   2 weeks ago     Up 12 hours             0.0.0.0:6379->6379/tcp                                                                                                  dapr_redis

Rabbit MQ管理コンソールへのアクセスとQueueの準備

ローディングされたら、 http://localhost:15672/ の管理コンソールにアクセスしてください。デフォルトユーザとパスワードは guest / guest でアクセスできます。

次にExchangeを作成します。確認したところ、現時点ではExchangeを作成する事をこのコンポーネントでは前提としているようです。はまりやすい所ですが、Queueでは無く、上部のタブからExchange を選択し、daprexchangeという名称で、新たにExchangeを作成します。

Exchange作成後、Queueタブからも作成したExchangeが確認できます。

componentフォルダにrabbitmq.yamlファイルを追加

statestoreを扱った際に、statestore.yamlを追加したと思いますが、同じようにrabbitmq.yamlを追加します。

rabbitmq.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: Rmq
spec:
  type: bindings.rabbitmq
  version: v1
  metadata:
  - name: queueName
    value: daprexchange
  - name: host
    value: amqp://guest:guest@localhost:5672
  - name: durable
    value: true
  - name: deleteWhenUnused
    value: false
  - name: ttlInSeconds
    value: 60
  - name: prefetchCount
    value: 0
  - name: exclusive
    value: false
  - name: maxPriority
    value: 5
  - name: contentType
    value: "application/json"
scopes:
- worker-service

また、仕様を確認してみた所、POSTメソッドでの呼び出し固定となっています。(おそらく変更方法がありますが、いったんデフォルト)つまり、この場合/Rmq へPOSTの実装をRmqControllerに行えばOKとなります。

前回用いたプロジェクトをそのまま利用します。

今回は、前回用いたWorkerServiceプロジェクトにコントローラーを追加します。
前回の記事はこちらから確認ください。

プロジェクトの変更

リクエストのモデルをプロジェクトに作成しておきます。RmqRequest.cs をプロジェクトに追加しておきます。

RmqRequest
namespace WorkerService;

public class RmqRequest
{
    public String Message { get; set; }

}

RmqController.cs を新規にControllersに追加します。POSTされたら、リクエストをコンソールに表示するだけのコードです。

RmqController.cs
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;

namespace WorkerService.Controllers;

[ApiController]
[Route("[controller]")]
public class RmqController : ControllerBase
{
    private readonly ILogger<RmqController> _logger;

    public RmqController(ILogger<RmqController> logger)
    {
        _logger = logger;
    }

    [HttpPost(Name = "PostRmqQueue")]
    public void Post(RmqRequest request)
    {
        Console.WriteLine((string)request.Message);

        return;
    }
}

起動と確認

編集を終えたら、tye runで起動しましょう。

$ tye run
Loading Application Details...
Launching Tye Host...

[14:44:48 INF] Executing application from tye.yaml
[14:44:48 INF] Dashboard running on http://127.0.0.1:8000
[14:44:48 INF] Build Watcher: Watching for builds...
...

今までと同じ要領で、Tye ダッシュボード http://127.0.0.1:8000 をブラウザで開きましょう。まず、worker-serviceのLogを開いておきます。

まずは、マッピングされたworker-serviceでSwaggerを起動して、プロジェクト単体で動作テストを行いましょう。

スクリーンショットの例では、 http://localhost:57810/swagger で起動しています。
ここから、POSTしてみて・・・・

Tyeのダッシュボードからログにもメッセージ string が表示されています。

次にRabbiq MQの管理コンソールからMessageを追加しましょう。管理コンソールでは、以下のようにDaprからのconsumer接続も確認できます。

以下のようなJsonをメッセージとして発行します。

Rabbit MQから送るメッセージ
{
  "message": "DaprDaprDaprDaprDaprDaprDaprDaprDaprDaprDapr"
}

Tyeのダッシュボードのworker-serviceのログにもメッセージが表示されています。