Amazon SQSを使ったPub/Subモデルにより、システム間の依存関係を逆転させる


こんにちは。最近はReact/Reduxでヒーヒー言ってる @unasaka と申します。
Livesense Advent Calendar 2016 その3 10日目です。
Advent Calendarに初参加で緊張します。

今回は、実際にLivesenseのサービスで設計した内容について紹介したいと思います。

はじめに

私が携わっているサービスは、モノリシックなアプリケーションから、徐々にマイクロサービス化の方法が進んでいます。
責務ごとにシステムを切り分けていくうちに、各システムの依存関係が問題となってきました。
具体的な例で言うと、下図の様に、システムAが会員情報を作成、加工していて、システムBが会員情報を参照するようなシステムに分かれた場合、
システムAはシステムBに会員情報の更新を伝える必要があります。
つまり、システムAはシステムBに通知するためのAPIなどの実装を知っている必要があるということになり、
情報を利用する側であるシステムBに対して、システムAが依存する関係になってしまいます。
また、情報を加工するシステムが増えた場合、それらすべてのシステムがシステムBに依存する必要があったり、
逆に、消費するシステムが増えた場合に、システムAは新たに実装を追加しなければいけないことになってしまいます。

システムBがAPIを変更してしまうと、追従する必要があり、大変です。

今回はAmazon SNS、SQSを用いてキューを作成し、Pub/Subモデルを構築することにより、この依存関係を逆転させようと思います。(必要なのはキューなのでSQSだけでも良いのですが、SNSをSQS前に挟むことで、より抽象化できます。)
Pub/Subモデルに付いては、別途色んな資料がゴロゴロしていると思うので省略しますが、下の図のような関係になります。


システムAがキューにメッセージを積む(Publisher)、システムBはそのキューを購読している(Subscriber)。
システムAはメッセージを発行する方法のみ知っており、システムBの実装は知っている必要はありません。
情報が必要なシステムBが依存する形になり、依存関係を逆転させたと言えます。
一度作ってしまえば、会員情報を加工するシステムが増えた場合や、逆に会員情報を消費する側のシステムが増えた場合に対応できます。

今回は、ruby on railsを使ってどのように実装したか紹介したいと思います。
バージョンは以下のとおりとします。

Rails 5.0.0.1
ruby 2.3.0

AWSの準備

SNS, SQSを使い始めるためのAWS諸々の準備は割愛します。

SNS Topicの作成

SNSはCreate Topicからtopic名を入力するだけでOKです。

SQSの作成

キューの作成よりキューを作ります。キュー作成時のオプション等は要件に合わせて変更します(後から変えられるので適当でいいです)

SNSとSQSの結合

SNS Topicの詳細画面からCreate Subscriptionを選択し、先ほど作成したSQSのARNをEndpointと設定すればOKです。

とてもかんたんです。AWS凄いです。
この時、Other subscription actions -> Edit subscription attributesからRaw message deliveryにチェックをつけておけば、SNSが自動で付与する情報を除くことが出来ます。

作成できたら、Publish to topicからメッセージがちゃんとSQSに送信されるかテストしてみましょう。
SQS側でメッセージをポーリングして、受信できればOKです。

Publisher

AWSの準備も終わったので、メッセージを送信してみましょう。
AWSのアレコレとやり取りするには、AWS SDK for Rubyを使います。

Gemfile
gem 'aws-sdk'

いまはSNSに対してメッセージを送信したいので、Aws::SNS::Clientを使います。
APIの詳細は、ドキュメントを参照してください。
といっても、メッセージを送信するコードは以下のように簡単に書けます。

Publisher.rb
class Publisher
  def initialize
    @client = Aws::SNS::Client.new
    @arn = "arn::sns::foo::bar" # 上で作成したSNSのARN
  end

  def publish(message)
    @client.publish({ topic_arn: @arn, message: message })
  end
end

今回は、awsのregionや、アクセスキーなどは環境変数にしまっているので、publishの引数は、宛先とメッセージ本体だけでいいですが、直接指定する場合は、Client.newの引数に詰めてください。
あとはメッセージを送信したいタイミングで、Publisher.new.publish("お気持ち")すればオッケーです。
上記のコードはシングルトンにすべきか、悩みどころです。

実際に動作させてみてSQSへメッセージがたまったことを確認できたら、デキュー側を作ります。

Subscriber

便宜上Subscriberと呼びますが、やっている内容はキューのポーリングです。
SQSのポーリングも、同様に、AWS SDKを使って実装できます。
AWSの諸々の設定はPublisherと同じく、環境変数にしまっているとします。

Subscriber.rb
class Subscriber
  def initialize(sqs_url)
    @poller = Aws::SQS::QueuePoller.new(sqs_url)
  end

  def polling
    @poller.poll(skip_delete: true, max_number_of_messages: 10) do |messages|
      actions = parse_messages(messages)
      yield actions
      @poller.delete_messages(messages) # エラーがなければメッセージをSQSから削除
    end
  end

  private 

  def parse_messages(messages)
    # エラーチェックとか、よしなにメッセージをパースしてやりたいことを取り出したりとかする
  end
end

デキューした際に、エラーが発生したときのことを考えてskip: trueにして、手動で削除するようにしています。
メッセージを受け取った後の動きは、少し特殊かもしれないですが、メッセージをよしなに解析して、実行したいアクションをyieldして呼び出し側で使いやすいようにしています。以下のように使うことが出来ます。

Subscriber.new(sqs_url).polling do |actions|
  actions.each do |action|
    execute(action)
  end
end

私はSubscriberをrakeタスクとして動作させています。
キューからメッセージを受信できて、actionすることが出来たら成功です。
今回構築したシステムは一つ一つ動作確認がしやすくて、比較的AWSの中でも構築しやすいんじゃないかなぁと勝手に思っています。

最後に

このようにすることで、PublisherはSubscriber側の実装変更を気にする必要がなくなり、メンテナンスから開放されます。また、Pub/Subどちら側のシステムが増えたときにも対応しやすくなります。
うちではつい最近になって運用し始めたところで、Pub/Subともに1台ずつで、まだまだ経験値が浅いですが、どちら側のシステム増える計画があるため、このシステムを使って快適に開発を進めていこうと思います。

明日は11日目! @hiro_koba さんよろしくお願いします!