pythonでrabbimqを扱う


はじめに

RabbitMqはソフトウェア間でメッセージのやり取りやメッセージのキューイングなど(AMQP)ができるものです。使用方法については多くのサイトで書かれていますが実際にメッセージの管理については書かれているところが少ないため、RabbitMqやpythonのpikaの用意からメッセージの確認までをまとめました。

環境

  • python:3.6.5
  • イメージ:rabbitmq:3-management

RabbitMqを使うのに必要なもの

  1. RabbitMqサーバ:メッセージを受けたり送ったりするもの(メッセージをためる場所)
  2. producer:メッセージを送るもの(クライアント)
  3. consumer:メッセージを受け取るもの(ホスト)

1. RabbitMqの用意

dockerのRabbitMqイメージを取得

RabbitMqのイメージは標準のものと管理プラグインが有効なものの2つがあります。今回はキューの状況を見たいため管理プラグインのものをpullします。rabbitmqという名前でコンテナが提供されているのでイメージをpullします。

docker pull rabbitmq:3-management

普通のrabbitmqの場合は、rabbitmq:3-managementではなくrabbitmqをpullしてください。

RabbitMqの起動

RabbitMqを起動します。今回はバックグラウンドで動き続けてほしいため-dオプションをつけています。また、ポートは管理画面を見たいため、コンテナ内の15672をホストの8080に、実際にキューのやり取りをするポートをコンテナ内の5672とホストの5672に紐づけています。さらにキューを特定するために--hostnameを分かる名前にしています。

docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 8080:15672 rabbitmq:3-management

普通のrabbitmqの場合は、rabbitmq:3-managementではなくrabbitmqをrunして、-p 8080:15672も不要です。

RabbitMqの起動確認

今回は、管理プラグインが有効なコンテナを起動したのでlocalhost:8080にブラウザからアクセスして中身を見てみます。アクセスするとログイン画面になるため、デフォルトのguestで入ります。現在の状態がみれます。Nodesのところに先ほど--hostnameで指定した名前が入っています。

2. producer:メッセージを送るもの(クライアント)の用意

producerの作成にはpythonでキューのやり取りを行うため、pikaというライブラリを使用してrabbitmqにアクセスします。

pikaのインストール

pikaのインストールは普通にpipでインストールします。

pip install pika

producerの実装

メッセージの送信には次の手順が必要です。これらの手順はすべてrabbitmqに対して行われるため、consumerは必要ありません。

  • コネクション作成
  • チャンネル作成・取得
  • キュー作成・取得
  • メッセージの送信

コネクション作成

pythonからRabbitMqに向けて接続を行います。作成するパラメータにホスト名(IPアドレス)やポート番号、タイムアウトなどの設定を与えます。RabbitMqコンテナはホストの5672ポートに紐づいているため、パラメータにlocalhostを与えて、ポートはデフォルトの5672のままなので指定はしていません。その後、パラメータを与えてコネクションを生成して接続完了になります。

client_main.py

import pika

pika_param = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(pika_param)

チャンネル作成・取得

接続が完了したら次はチャンネルの作成を行います。チャンネルとはRabbitMqへの道のようなイメージです。チャンネルが同じproducerとconsumerがメッセージのやり取りの対象になります。

client_main.py

import pika

pika_param = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()

最後の1行だけが追加されています。必要であればこの引数にチャンネルの識別番号を入力します。

キュー作成・取得

チャンネルを作成できたらキューの作成を行います。このキューとはメッセージをためる場所のようなイメージになります。そのため、キューの名前が同じでなければメッセージのやり取りは行えません。チャンネルとは違い必ず指定が必要になります。

client_main.py
import pika

pika_param = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()

channel.queue_declare(queue='hello')

最後の1行だけが追加されています。必要であればこの引数にキューの設定を入力します。

メッセージの送信

事前の用意ができたため、メッセージの送信を行います。basic_publish()routing_keyにキューの名前を指定してbodyに送信したいメッセージを指定します。

client_main.py
import pika

pika_param = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')

connection.close()

最後に送信が完了したらコネクションを閉じます。

実際に送信してみる

RabbitMqへの送信用ソースができたため、実行してみます。コマンドはすぐに帰ってきます。

PS C:\Users\xxxx\program\python\pika> python .\client_main.py

RabbitMqの管理画面でも確認してみます。Queuesタブを見てみるとMessageがReady:1になっているのがわかります。

3. consumer:メッセージを受け取るもの(ホスト)の用意

consumerも、pythonでキューのやり取りを行うため、pikaというライブラリを使用してRabbitMqにアクセスします。

consumerの実装

メッセージの送信には次の手順が必要です。これらの手順はすべてRabbitMqに対して行われるため、起動にproducerは必要ありません。コネクション作成からキュー作成まではproducerと同じです。

  • コネクション作成
  • チャンネル作成・取得
  • キュー作成・取得
  • コールバック(受信時処理)の作成
  • queueメッセージ受付開始

コールバック(受信時処理)の作成

メッセージを受信したときに処理させたい関数を記載します。関数の最後に受信したからキューからメッセージを削除するために応答関数basic_ack()を指定します。今回は例として受信したメッセージを表示する関数callbackを記載します。

host_main.py
import pika

pika_param = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()

channel.queue_declare(queue='hello')
![queue_receive.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/481713/d78bd155-4b9a-9093-998d-c85ff4af2cc7.png)
![queue_receive.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/481713/f6fbd5ef-322d-77d0-8e85-68626ac91783.png)

def callback(ch, method, properties, body):
    print("{} Received".format(body))
    ch.basic_ack(delivery_tag = method.delivery_tag)

queueメッセージ受付開始

作成したチャンネルのbasic_consume()にキュー名とコールバック関数を指定します。その後にstart_consuming()関数でメッセージの受信を開始します。この関数を開始すると関数内で延々とメッセージ待ちをするため終了するときは強制終了かコールバック関数に終了の契機を入れておく必要があります。

host_main.py
import pika

pika_param = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print("{} Received".format(body))
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(
    queue='hello', on_message_callback=callback)

channel.start_consuming()

実際に受信してみる

RabbitMqからの受信用ソースができたため、実行してみます。

PS C:\Users\xxxx\program\python\pika> python .\host_main.py
b'Hello World!' Received

producerで送信したメッセージが受信できて標準出力されていることが確認できます。

RabbitMqの管理画面でも確認してみます。Queuesタブを見てみるとMessageがReady:0になっているのがわかります。

おわりに

RabbitMqを使用してpythonでメッセージのやり取りを行う方法をまとめました。とはいえほとんど公式の内容と同じになってしまいました。これを使用することで容易に非同期な処理やキューに関する処理を実施することができそうです。