PythonはRabbitMQサーバを操作してメッセージキューのルーティング機能を実現する

5590 ワード

PythonはPikaライブラリ(インストール:sudo pip install pika)を使用してRabbitMQメッセージキューサーバ(インストール:sudo apt-get install rabbitmq-server)を操作できます.ここではMQ関連のルーティング機能を見てみましょう.
ルーティングキーの実装
例えば、すべての受信側にメッセージを送信する必要があるシーンがありますが、自由にカスタマイズする必要がある場合は、一部の受信側にメッセージを送信したり、他の受信側にメッセージを送信したりしますが、どうすればいいですか?この場合はルーティングキーが使われます.
ルーティングキーの動作原理:各受信側のメッセージキューは、スイッチをバインドするときに、対応するルーティングキーを設定することができます.送信側がスイッチを介して情報を送信する場合、ルーティングキーを指定することができ、交換機会はルーティングキーに基づいてメッセージを対応するメッセージキューに送信し、受信側がメッセージを受信することができる.
こちらは前回に続きsend.pyとreceive.pyはルーティングキーを実現する機能をシミュレートする.send.pyは送信側、receive.pyは受信側を表します.インスタンスの機能はinfo,warning,errorの3つのレベルの情報を異なる受信側に送信することである.
send.pyコード解析

#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
#     ,     direct
channel.exchange_declare(exchange='messages', type='direct')
 
#       
routings = ['info', 'warning', 'error']
 
#           ,      
for routing in routings:
  message = '%s message.' % routing
  channel.basic_publish(exchange='messages',
             routing_key=routing,
             body=message)
  print message
 
connection.close()


receive.pyコード解析

#!/usr/bin/env python
#coding=utf8
import pika, sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
#     ,     direct
channel.exchange_declare(exchange='messages', type='direct')
 
#           ,    ,    info
routings = sys.argv[1:]
if not routings:
  routings = ['info']
 
#      ,        ,     
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
for routing in routings:
  channel.queue_bind(exchange='messages',
            queue=queue_name,
            routing_key=routing)
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
 
channel.basic_consume(callback, queue=queue_name, no_ack=True)
 
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()

2つの端末を開き、1つの実行コードpython receive.py info warningは、infoとwarningのメッセージのみを受信することを示す.別の端末はsendを実行する.py,受信端末はinfoとwarningのメッセージのみを受信していることが観察される.複数の端末を開くとreceiveを実行する.pyは、異なるルーティングキーパラメータを入力し、より顕著な効果を見ることができます.
受信側が実行中の場合、rabbitmqctl list_を使用できます.bindingsでバインド状況を表示します.
ルーティングキーファジイマッチングルーティングキーファジイマッチングは、正規表現を使用することができ、一般的な正規表現とは異なり、ここでは「#」はすべて、すべての意味を表す.「*」は1つの単語にのみ一致します.例を見て分かるようになりました.
ここでは上の例に続いてsendを使います.pyとreceive.pyは、ルーティングキーのファジイマッチング機能を実現します.send.pyは送信側、receive.pyは受信側を表します.例の機能は大体このようにします:例えばあなたは心の良い友达がいて、楽しくて、悲しくて、仕事の上でやはり生活の上の事はすべて彼女に言うことができます;楽しいことを分かち合う友达もいます.いくつかの友达がいて、あなたは不機嫌なことを彼女に言うことができます.
send.pyコード解析
ルーティングキーのブラーマッチングを行うため、スイッチのタイプをtopicに設定し、topicに設定すると、#,*のマッチング記号が使用できます.

#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
#     ,     topic
channel.exchange_declare(exchange='messages', type='topic')
 
#     
routings = ['happy.work', 'happy.life', 'sad.work', 'sad.life']
 
#           ,      
for routing in routings:
  message = '%s message.' % routing
  channel.basic_publish(exchange='messages',
             routing_key=routing,
             body=message)
  print message
 
connection.close()

前例では4種類のメッセージを定義しており,分かりやすく説明せずに順次送信する.
receive.pyコード解析
同様に、スイッチのタイプはtopicに設定すればよい.コマンドラインからパラメータを受信する機能を少し調整しましたが、パラメータタイムズエラーがなく終了しました.

#!/usr/bin/env python
#coding=utf8
import pika, sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
#     ,     topic
channel.exchange_declare(exchange='messages', type='topic')
 
#          ,    ,     
routings = sys.argv[1:]
if not routings:
  print >> sys.stderr, "Usage: %s [routing_key]..." % (sys.argv[0],)
  exit()
 
#      ,        ,     
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
for routing in routings:
  channel.queue_bind(exchange='messages',
            queue=queue_name,
            routing_key=routing)
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
 
channel.basic_consume(callback, queue=queue_name, no_ack=True)
 
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()

4つの端末を開き、1つは以下のように動作し、何事も彼女に話すことができることを示します.

python receive.py "#"

もう一つの端末は、彼女と楽しいことを共有できることを示しています.

python receive.py "happy.*"

3つ目は、仕事上のことを彼女と共有できることを示しています.

python receive.py "*.work"

最後にpython sendを実行します.py.結果は想像に難くなく、貼らなくなりました.