Redisを使って雑に分散処理をする


みなさん雑に分散処理を実装したくなることありますよね?私はあります。

複数台のサーバに処理を分散して最後にそれらの結果をまとめて受け取ってごにょごにょするみたいなよくあるパターンです。

この手のものを実装しようとなると大抵KafkaみたいなメッセージングキューだったりStorm, Flinkみたいな分散処理エンジンを導入するわけですが、データは小さいしExactly Onceもフォールトトレラントもいらないみたいなケースでは解決したい問題に対して構成が重くなりがちです。

そんな時、Redis一つで雑に解決できたりするのでその方法を紹介します。

Redisとは

RedisとはオープンソースのインメモリNoSQLで一般にKVSとしてキャッシュなどに使われることが多いです。

Redis is an open source (BSD licensed), in-memory data structure store, used as a database, cache and message broker.

しかし、公式にもあるようにキャッシュだけでなく実は多様なデータ構造をサポートするデータベースやメッセージブローカーとして使うこともできます。

今回はそのサポートするデータ構造の一つであるList型とメッセージブローカーとしての機能のPub/Subを用いることで分散処理を実装していきます。

アーキテクチャ

                                             rpop
                              +-------+   ---------->   +----------+
                   lpush      |       |     publish     | Worker 1 |
   +--------+   ---------->   |       |   <----------   +----------+
   | Master |    subscribe    | Redis |       rpop
   +--------+   <----------   |       |   ---------->   +----------+
                              |       |     publish     | Worker 2 |
                              +-------+   <----------   +----------+

List型をlpush, rpopでFIFOすることによってキューを実現し、Pub/Subで処理結果をMaster側へと戻しています。
それぞれはキーによって一意に識別されるため処理ごとにキーを分けることで複数の分散処理を繋ぎ合わせることも可能ですし、本末転倒ですがRedis Clusterを用いることでRedis自体をスケールさせることができます。
ただ雑な分散処理ゆえrpop後にWorkerが落ちてしまうと結果が欠損してしまうため、Master側でpublishされた結果を数えるなどしてそれを検知する実装は必須でしょう。

サンプルコード

これ以上特に書くことがないのでRubyのサンプルコードを置いておきます。

Master側では reduce キーでsubscribeして分散数の結果を受け取ったら終了するようにしつつ、 mapper キーに分散処理したいジョブをlpushします。

# master.rb
require 'redis'

redis = Redis.new
queue = Redis.new

targets = 1..100

t = Thread.new do
  count = 0
  redis.subscribe('reduce') do |on|
    on.message do |channel, message|
      puts message
      count += 1
      redis.unsubscribe if count == targets.size
    end
  end
end

targets.each do |i|
  queue.lpush('mapper', i)
end

t.join

Worker側は先ほどの mapper キーのキューをひたすらポーリングすることでジョブを受け取り、それを処理して reduce キーにpublishして結果を返します。

# worker.rb
require 'redis'

redis = Redis.new

loop do
  job = redis.rpop('mapper')
  if job.nil?
    sleep 5
    next
  end
  dosomething = "mapped#{job}"
  redis.publish('reduce', dosomething)
end

これだけです。

最後に

実際に今回これを使って長らく直列実行されてきたrspecのテストを分散実行して高速化を実現しました。
テストの実行であればAt Least Onceでも許されますし求められる信頼性もそれほど高くありません。
既存のテスト分散処理フレームワークはいくつかありますが構成のシンプルさと汎用性を考えこれの採用に至りました。

求められる信頼性によってはこのくらい雑でも十分なケースもあると思うのでトレードオフを理解してうまく採用していけるといいのではないでしょうか。