RubyとMySQLで作られたジョブキューPerfectQueueを試してみる


TreasureData製の分散キュー PerfectQueueを使ってみます。
redisを利用したSidekiqやAWS SQSを利用したShoryukenのようにRubyで使える
ジョブキューはPerfectQueue以外にも存在していますが、PerfectQueueはキューの管理にMySQLを使っているところが特徴的です。

インストール

PerfectQueueはRubyで作られているので、通常のRubyのプロダクトと同じようにbundlerを用いてインストールを行います。

mkdir perfectqueue
cd ./perfectqueue
bundle init

Gemfileを作成した後に、以下の様にGemファイルを編集します。

Gemfile
# frozen_string_literal: true
source "https://rubygems.org"

gem "perfectqueue"
gem "json"
gem "mysql2"

Gemfileを編集が終わったあとはbundlerでインストールを実行します。

bundle install --path=./vendor/bundle

PerfectQueueはMySQLにキューを入れていくので、PerfectQueueで使用する
MySQLのデータベースを作成します。

mysql> create database perfectqueue;
Query OK, 1 row affected (0.00 sec)

PerfectQueueで使用するDBの設定は、config/perfectqueue.ymlに記述します。

config/perfectqueue.yml
development:
  type: rdb_compat
  url: mysql2://root:@localhost:3306/perfectqueue
  table: queues
  processors: 1

DBの設定が完了したら、perfectqueueのinitコマンドでMySQLにqueue用のテーブル
を作成します。

$ bundle exec perfectqueue init

MySQL上でテーブルの確認を行うと、queue用のテーブルは以下のようになっています。

mysql> use perfectqueue;
mysql> desc queues;
+-------------+---------------------+------+-----+---------+-------+
| Field       | Type                | Null | Key | Default | Extra |
+-------------+---------------------+------+-----+---------+-------+
| id          | varchar(255)        | NO   | PRI | NULL    |       |
| timeout     | int(11)             | NO   | MUL | NULL    |       |
| data        | longblob            | NO   |     | NULL    |       |
| created_at  | int(11)             | YES  |     | NULL    |       |
| resource    | varchar(255)        | YES  |     | NULL    |       |
| max_running | int(11)             | YES  |     | NULL    |       |
| owner       | bigint(21) unsigned | NO   |     | 0       |       |
+-------------+---------------------+------+-----+---------+-------+

CLIでのキューの投入

PerfectQueueのインストールが完了したので実際のキューを導入してみます。

$ bundle exec perfectqueue submit k1 my_task '{"uid":1}' -u user_1

perfectqueue submitがキューを投入するためのコマンドになります。
それ以降の引数はid、タスクのタイプ、データの順に記述します。
キューの投入が完了したら、MySQLでqueueテーブルを検索してみます。

mysql> select * from queues;
+----+------------+------------------------------+------------+----------+-------------+-------+
| id | timeout    | data                         | created_at | resource | max_running | owner |
+----+------------+------------------------------+------------+----------+-------------+-------+
| k1 | 1479699578 | {"uid":1,"type":"my_task"} | 1479699578 | user_1   |        NULL |     0 |
+----+------------+------------------------------+------------+----------+-------------+-------+

perfectqueueのコマンドでもキューの中身を確認することが出来ます。
簡単なコマンドなので、ruby以外のプログラムからもシステムコールでキューを投入する
ことは容易だと思います。
その場合は、gem install perfectqueueを行って、bundlerなしで、perfectqueueを実行できる
環境にしておいたほうがいいでしょう。

$ bundle exec perfectqueue list
                           key            type               user             status                   created_at                      timeout   data
                            k1         my_task             user_1            waiting    2016-11-21 12:39:38 +0900    2016-11-21 12:39:38 +0900   {"uid"=>1}
1 entries.

Rubyでのキューの投入

キューの投入をrubyからでも試してみます。

enqueue.rb
require 'bundler'
Bundler.require
require 'yaml'

environment = "development"

configration = YAML.load_file(File.expand_path(File.join(File.dirname(__FILE__),"..","config","perfectqueue.yml")))[environment]
queue = PerfectQueue.open(configration)

queue.submit("k2", "my_task", {uid:'2'},{user:"user_2"})

queue.submitがキューの投入を実行するメソッドになります。
引数の順番はperfectqueueコマンドのときと同じです。
第4引数のオプションにはuserの他に、now,run_at,max_runnning,compression
を使うことが出来ます。
nowとrun_atはqueueテーブルのtimeoutに値が入り、max_runningはmax_runningに値が入ります。
簡単にキューの投入を行えますが、注意するべきこととしては、先ほどのqueueテーブルの
構造でもわかる通りidがユニークになっていることです。そのために同じkeyのキューを
投入しようとするとエラーになってしまいます。

$ bundle exec ruby ./app/enqueue.rb 
disconnects current connection: task key=k2 already exists

そのためにキューを投入するためのidは重複しないように実行するhostの名前で
あったり、導入する時刻、あるいはSecureRandom.hexなどの関数をつかって
idをユニークにする必要があります。

ワーカーの実装

PerfectQueueでは、投入されたキューを処理ためには、キューの取り出しを行う
dispatcherとキューに応じた処理を行うhandlerの2つの仕組みでキューを処理してきます。
まずはdispatcherの記述をおこないます。

app/workers/dispatch.rb
Dir[File.join(File.dirname(__FILE__), 'dispatch', '*.rb')].each {|file| require file }

class Dispatch < PerfectQueue::Application::Dispatch
  # describe routing
  route "type1" => TestHandler
  route /^regexp-.*$/ => :TestHandler
  route "my_task" => MyTaskHandler
end

routeで先ほどキューを投入するときに指定したキューのタイプとそれに処理する
handlerを指定します。
handlerは以下の様に記述します。

app/workers/dispatch/my_task_handler.rb
class MyTaskHandler < PerfectQueue::Application::Base
  # implement run method
  def run
    # do something ...
    puts "acquired task: #{task.inspect}"

    # call task.finish!, task.retry! or task.release!
    task.finish!
  end
end

キューの投入の際に指定したデータはtask.dataメソッドで取り出すことが出来ます。

PerfectQueueの実行

キューの実行は以下のコマンドで実行します。

bundle exec perfectqueue run -I. -rapp/workers/dispatch Dispatch

キューの実行が完了した状態のmysqlのテーブルです。

mysql> select * from queues;
+---------+------------+------------------------------------+------------+----------+-------------+-------+
| id      | timeout    | data                               | created_at | resource | max_running | owner |
+---------+------------+------------------------------------+------------+----------+-------------+-------+
| k1      |  479709237 | {"uid":1,"type":"my_task"}         |       NULL | NULL     |        NULL |     0 |
| k2      |  479709237 | {"uid":"2","type":"my_task"}       |       NULL | NULL     |        NULL |     0 |
+---------+------------+------------------------------------+------------+----------+-------------+-------+

created_atやreourceがNULLに更新され、timeoutも更新されています。
workerプロセスを複数起動したい場合は、perfectqueue.ymlにprocessors
の項目を追加します。

config/perfectqueue.yml
development:
  type: rdb_compat
  url: mysql2://root:@localhost:3306/perfectqueue
  table: queues
  processors: 2

この状態でPerfectQueueを起動してプロセスを確認すると以下のように
2つのプロセスが動いていることが確認できます。

$ ps aux | grep perfectqueue
ec2-user  9478  5.8  2.9 297856 30392 pts/2    Sl+  15:58   0:00 ruby2.2 /home/ec2-user/projects/perfectqueue/vendor/bundle/ruby/2.2/bin/perfectqueue run -I. -rapp/workers/dispatch Dispatch
ec2-user  9481  0.0  2.5 298020 26324 pts/2    Sl+  15:58   0:00 perfectqueue-supervisor:Dispatch                                                                                            
ec2-user  9486  0.2  2.9 385320 30548 pts/2    Sl+  15:58   0:00 perfectqueue:Dispatch 1                                                                                                     
ec2-user  9491  0.2  2.9 385188 30532 pts/2    Sl+  15:58   0:00 perfectqueue:Dispatch 2 

まとめ

駆け足ではありましたが、PerfectQueueの使い方を見てきました。
Amazon RDSのようなMySQLのマネージドサービスを使えばMySQLもすぐに準備できるので、
Railsは使っていないけど、Rubyが使える環境で、ジョブキューの仕組みを使いたいといった
状況ではPerfectQueueは最適ではないかと思います。
またキューを投入した状態のMySQLをdumpしておけば、キューを処理した後でもリストアして再度
キューを処理することができるので、handlerの実装と動作確認も簡単に行うことができます。