RabbitMQの一般的な方法pythonの紹介
15042 ワード
紹介:
RabbitMQはメッセージエージェントであり、メッセージシステムのメディアである.それはあなたのアプリケーションに共通のメッセージ送信と受信プラットフォームを提供し、メッセージの伝送中の安全を保証することができます.RabbitMQの長所、特徴を詳しく紹介するには、中国語ドキュメントポイントのChinese Documentを参照してください.
スイッチとスイッチのタイプ
Name(スイッチタイプ)Default pre-declared names(事前に宣言されたデフォルト名)Direct exchange(ダイレクトスイッチ)(Empty string)and amq.义齿テーマスイッチamq.topic Headers exchange(ヘッドスイッチ)amq.match (and amq.headers in RabbitMQ)
CentoOS RabbitMQをインストールするサーバサービス
MacOS
pythonクライアントのインストール
1.Hello world acknowledgment no-ack=False、消費者が状況に遭遇した場合(its channel is closed,connection is closed,or TCP connection is lost)、RabbitMQは、タスクをキューに追加し直します.チャンネルを設定するだけで
これで端末で私たちのプログラムを実行できます.まずsend.pyメッセージを再送信します.
生産者(producer)プログラムsend.pyは実行するたびに停止します.メッセージを受信します.
成功した!私たちはRabbitMQを通じて最初のメッセージを送信しました.気づいたかもしれないpyプログラムは終了していません.メッセージを取得する準備ができており、Ctrl-Cで中止することができます.
2.メッセージ永続化durable RabbitMQをわざわざ伝えなかった場合、終了またはクラッシュしたときに、すべてのキューとメッセージが失われます.情報が失われないようにするには、キューとメッセージを永続化する必要があります.まず、キューが消えないように、キューを永続化(durable)と宣言する必要があります.
この行のコード自体は正しいが、正しく動作しない.helloという非永続化キューを定義したからです.RabbitMqでは、異なるパラメータを使用してキューを再定義することはできません.エラーが返されます.しかし、taskのような異なる名前で迅速な解決策を使用しています.queue. このqueue_declareは、生産者(producer)と消費者(consumer)に対応するコードで変更する必要があります.この場合、RabbitMqが再起動した後にqueue_declareキューは失われません.また、私たちのメッセージも永続化する必要があります.delivery_modeのプロパティを2に設定します.
3.サブスクリプションのパブリケーションサブスクリプションと簡単なメッセージキューの違いは、パブリケーションサブスクリプションがすべてのサブスクリプションにメッセージを送信し、メッセージキュー内のデータが消費されると一度に消失することです.したがって、RabbitMQがパブリケーションとサブスクリプションを実装すると、各サブスクリプションにキューが作成され、パブリケーションがメッセージをパブリッシュすると、すべての関連キューにメッセージが配置されます.exchange type = fanout
4.キーワード送信exchange type=direct
以前の例では、メッセージを送信する際にキューを明示的に指定してメッセージを送信していたが、RabbitMQはキーワードに基づいて送信することをサポートしていた.すなわち、キューはキーワードをバインドし、送信者はキーワードに基づいてメッセージexchangeにデータを送信し、exchangeはキーワードに基づいてデータを指定キューに送信すべきだと判定した.
6.ファジイマッチングexchange type=topic
topicタイプでは、キューにいくつかのあいまいなキーワードをバインドし、送信者がexchangeにデータを送信し、exchangeが「ルーティング値」と「キーワード」を入力してマッチングし、マッチングに成功すると、指定したキューにデータを送信します.
RabbitMQはメッセージエージェントであり、メッセージシステムのメディアである.それはあなたのアプリケーションに共通のメッセージ送信と受信プラットフォームを提供し、メッセージの伝送中の安全を保証することができます.RabbitMQの長所、特徴を詳しく紹介するには、中国語ドキュメントポイントのChinese Documentを参照してください.
スイッチとスイッチのタイプ
Name(スイッチタイプ)Default pre-declared names(事前に宣言されたデフォルト名)Direct exchange(ダイレクトスイッチ)(Empty string)and amq.义齿テーマスイッチamq.topic Headers exchange(ヘッドスイッチ)amq.match (and amq.headers in RabbitMQ)
CentoOS RabbitMQをインストールするサーバサービス
epel
$ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
erlang
$ yum -y install erlang
RabbitMQ
$ yum -y install rabbitmq-server
MacOS
brew install rabbitmq
pythonクライアントのインストール
pip install pika
1.Hello world
#!/usr/bin/env python
import pika
# ######################### #########################
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
''' , , 。 ,RabbitMQ 。 hello , 。'''
channel.queue_declare(queue='hello')
''' RabbitMQ , , (exchange) , 。 ,routing_key '''
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
'''
, 、 RabbitMQ。 ( )
'''
connection.close()
#!/usr/bin/env python
import pika
# ########################## ##########################
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(callback,
queue='hello',
no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
channel.basic_consume(callback, queue='hello', no_ack=False)
これで端末で私たちのプログラムを実行できます.まずsend.pyメッセージを再送信します.
$ python send.py
[x] Sent 'Hello World!'
生産者(producer)プログラムsend.pyは実行するたびに停止します.メッセージを受信します.
$ python receive.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Hello World!'
成功した!私たちはRabbitMQを通じて最初のメッセージを送信しました.気づいたかもしれないpyプログラムは終了していません.メッセージを取得する準備ができており、Ctrl-Cで中止することができます.
2.メッセージ永続化durable RabbitMQをわざわざ伝えなかった場合、終了またはクラッシュしたときに、すべてのキューとメッセージが失われます.情報が失われないようにするには、キューとメッセージを永続化する必要があります.まず、キューが消えないように、キューを永続化(durable)と宣言する必要があります.
channel.queue_declare(queue='hello', durable=True)
この行のコード自体は正しいが、正しく動作しない.helloという非永続化キューを定義したからです.RabbitMqでは、異なるパラメータを使用してキューを再定義することはできません.エラーが返されます.しかし、taskのような異なる名前で迅速な解決策を使用しています.queue. このqueue_declareは、生産者(producer)と消費者(consumer)に対応するコードで変更する必要があります.この場合、RabbitMqが再起動した後にqueue_declareキューは失われません.また、私たちのメッセージも永続化する必要があります.delivery_modeのプロパティを2に設定します.
channel.basic_publish(exchange='',
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
3.サブスクリプションのパブリケーションサブスクリプションと簡単なメッセージキューの違いは、パブリケーションサブスクリプションがすべてのサブスクリプションにメッセージを送信し、メッセージキュー内のデータが消費されると一度に消失することです.したがって、RabbitMQがパブリケーションとサブスクリプションを実装すると、各サブスクリプションにキューが作成され、パブリケーションがメッセージをパブリッシュすると、すべての関連キューにメッセージが配置されます.exchange type = fanout
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print(" [x] Sent %r" % message)
connection.close()
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs',
queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
4.キーワード送信exchange type=direct
以前の例では、メッセージを送信する際にキューを明示的に指定してメッセージを送信していたが、RabbitMQはキーワードに基づいて送信することをサポートしていた.すなわち、キューはキーワードをバインドし、送信者はキーワードに基づいてメッセージexchangeにデータを送信し、exchangeはキーワードに基づいてデータを指定キューに送信すべきだと判定した.
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]
" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
6.ファジイマッチングexchange type=topic
topicタイプでは、キューにいくつかのあいまいなキーワードをバインドし、送信者がexchangeにデータを送信し、exchangeが「ルーティング値」と「キーワード」を入力してマッチングし、マッチングに成功すると、指定したキューにデータを送信します.
# 0
*
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...
" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()