RabbitMQ学習三

6518 ワード

次の記事に続きます.
http://john88wang.blog.51cto.com/2165294/1670904
Work Queues
python-two.png
前の記事ではsend.pyプログラムはhelloという名前のキューにメッセージを送信し、receive.pyプログラムはhelloという名前のキューにメッセージを受信します.このセクションでは、ワークキューを作成して、比較的時間のかかるタスクを複数のワークに分散します.
Work QueuesワークキューまたはTask Queuesと呼ばれるタスクキューの主な概念は、リソースを消費するタスクをすぐに実行し、完了を待たなければならないことを避けることです.代わりに、私たちはこの任務を後で実行するようにスケジューリングしました.
タスクをメッセージとしてカプセル化し、キューに送信します.ワークプロセスはバックグラウンドで実行され、このキューからタスクが最後に実行されます.複数のworkersを実行すると、タスクはこれらのworkers間で共有されます.
この概念は特に,短いHTTP要求ウィンドウ時間で複雑なタスクを処理する必要がある場合に有用である.
前のsend.pyをnew_に変更task.pyコードを追加
import sys
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
print " [x] Sent %r" % (message,)

コードが良くなると任意のパラメータを渡すことができ、パラメータがなければデフォルトでHello Worldを出力します!
前のreceive.pyをworkerに変更します.py
import time
def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"

Round-robin dispatching
タスクキューTask Queesを使用するメリットの1つは、paralleise workの並列作業を容易に実現できることです.積み重ねられた作業タスクを処理する必要がある場合は、より多くのworkerを追加して拡張できます.
2つのウィンドウを開いてworkerを実行します.py、ウィンドウを開いてnew_を実行task.py
shell1$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
shell2$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
shell3$ python new_task.py First message.
shell3$ python new_task.py Second message..
shell3$ python new_task.py Third message...
shell3$ python new_task.py Fourth message....
shell3$ python new_task.py Fifth message.....
shell1$ python worker.py 
 [*] Waiting for messages. To exit press CTRL+C 
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'
shell1$ python worker.py 
 [*] Waiting for messages. To exit press CTRL+C 
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'

デフォルトでは、RabbitMQは、各メッセージを次のconsumer消費者に順次送信し、平均して、各consumerが受信したメッセージの数は同じであり、このようなメッセージを分散する方法を輪訓、Round-robinと呼ぶ.
Message acknowledgment
タスクを実行するには時間がかかり、現在のコードRabbitMQを使用して消費者にメッセージを渡した後、すぐにメモリからメッセージを削除します.この場合、workerを殺すと、このworkerが処理しているメッセージが失われます.このworkerに配布されていないすべてのメッセージも失われます
しかし、私たちはタスクを失いたくありません.もしworkerが死んだら、私たちはこのタスクが別のworkerに転送されることを望んでいます.
メッセージが失われないように、RabbitMQはメッセージ確認メッセージをサポートする. 
特定のメッセージが消費者によって受信され処理された場合、消費者はRabbitMQにackを送信し、RabbitMQはこのメッセージを削除する.
消費者が死んだときにackに戻らなければ、RabbitMQは1つのメッセージが完全に処理されていないことを理解し、このメッセージを別の消費者に伝えます.このようにして、workerがたまに死んでも、メッセージが失われないことを確保することができます.
メッセージタイムアウトの設定はありません.RabbitMQはwoker接続が死んだときにメッセージを再送信するだけです.
メッセージ確認がデフォルトでオンになっています.
def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)
    
    
channel.basic_consume(callback,
                      queue='hello')

Message durability
consumerが死んでも、タスクが失われないようにする方法が明らかになった.しかしRabbitMQが停止すると、タスクは失われます.
RabbitMQがリリースまたはクラッシュすると、キューとメッセージを格納することを忘れてしまいます.メッセージが失われないようにするには、2つのことが必要です.タグキューとメッセージが永続的なdurableである必要がある.
まず、RabbitMQがキューを失わないことを確認する必要があります.
channel.queue_declare(queue='hello', durable=True)

以前のプログラムでhelloキューが定義されているため、RabbitMQでは同じキューを異なるパラメータで再定義することはできません.
名前を変えて定義し直す
new_task.pyとwokerpyは再定義が必要です
channel.queue_declare(queue='task_queue', durable=True)

次に、タグメッセージを永続化し、delivery_を追加します.mode=2
channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

Fair dispatch
上記の分析により、Round-robinのメッセージ配信方式はまだ私たちのニーズを満たすのに十分ではないことがわかりました.例を挙げると、1つの場合、私たちは2つのworkerを持っています.奇数番号のメッセージは重いタスクで、偶数番号のメッセージは軽いタスクです.そうすると、1つのworkerはずっと忙しくて、もう1つのworkerはほとんど重い任務を処理しません.RabbitMQはこれらを知らず、この2つのworkerに平均的にメッセージを配布し続けた.
これは、RabbitMQがメッセージがキューに入ったときにこのメッセージを配信するだけであるためである.消費者がまだ確認されていないメッセージの数を確認しません.
この問題を解決するにはbasic_を使用します.qosにprefetch_を付けるcount=1の設定で、RabbitMQに1つのworkerに1つのメッセージを一度に追加しないように伝えます.あるいは、別の言い方では、このworkerが処理して確認するまで、新しいメッセージをworkerに配布しないでください.
prefetch-count.png
channel.basic_qos(prefetch_count=1)

Note about the queue sizキューサイズに注意
すべてのworkerが忙しい場合、キューがいっぱいになりやすいので、キューの長さに注目したり、より多くのworkerを追加したり、他の最適化措置を取ったりする必要があります.
new_task.pyフルコード
#!/usr/bin/python

import pika
import sys

connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel=connection.channel()

channel.queue_declare(queue='task_queue',durable=True)

message=' '.join(sys.argv[1:]) or "Hello World!"

channel.basic_publish(exchange='',
                     routing_key='task_queue',
                     body=message,
                     properties=pika.BasicProperties(delivery_mode=2,))
print "[x] Sent %r" % (message,)
connection.close()

worker.pyフルコード
#!/usr/bin/python

import pika
import time

connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel=connection.channel()

channel.queue_declare(queue='task_queue',durable=True)

print '[*] Waiting for messages. To exit press CTRL+C'

def callback(ch,method,properties,body):
    print "[x] Received %r" % (body,)
    time.sleep(body.count('.'))
    print "[x] Done"
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback,queue='task_queue')

channel.start_consuming()

参考資料:
http://previous.rabbitmq.com/v3_3_x/tutorials/tutorial-two-python.html