(二)rabbitmq python踏み込み記録

3924 ワード

(二)RabbitMQ python部分apiの使用
前に書いてあるのは日常的に作っているものが雑で、多くて、あまり書かないと忘れがちです.apiの説明をいくつか記録して、後で使用するのに便利です.必要なハサミたちの参考にもなります(間違いがあるかもしれませんが、間違いがあったらメッセージを残して、レンガを思う存分撮ってください)
direct
a.直結、publisherがメッセージを送信すると、同じexchangeにバインドされたすべてのqueueに送信する.メッセージルーティングはrouting_を介してkeyがルーティングし、queue_とnameには大きな関係要件はありません:consumerのexchangenameとrouting_keyはpublisherと一致しなければならない.そうしないと、メッセージは処理されない.b.メッセージは各consumerに順番に送信される.
durable
durable=Trueを使用してqueueが永続化されていることを宣言すると、MQがクラッシュして再起動してもqueueが存在し、メッセージの損失を防止できます.
delivery_mode
queueが永続化されていることを宣言するほか、messageが永続化されたbasic_であることを宣言する必要があります.publishのpropertiesパラメータはmessageの属性を指定するここpika.BasicPropertiesのdelivery_mode=2はメッセージが永続的であることを示し、これによりRabbitMQクラッシュ再起動後もqueueが存在するメッセージも依然として存在する.ただし、メッセージを永続的にマークすることは、RabbitMQからメッセージを受信してdiskに格納するまで時間がかかるため、メッセージが失われないことを完全に保証することはできません.RabbitMQがクラッシュするとメッセージは失われます.それにRabbitMQはメッセージごとにfsync動作をしません.この場合、publisher confirmsにより、より強力な持続性保証を実現することができる.a.exchange永続化、宣言時にdurable=>1 b.queue永続化、宣言時にdurable=>1 c.メッセージ永続化、配達時にdelivery_を指定mode=>2(1非永続化)
channel.basic_publish(exchange=’test_exchange’, routing_key=’standard_key’, body=’queue:group’, properties=pika.BasicProperties(content_type=’text/plain’, delivery_mode=1))
basic_ack
メッセージの処理が完了すると、応答状態が返されます.黙認してつけて、忘れないで!!!channel.basic_ack(delivery_tag=method.delivery_tag)応答状態に戻らなければ、このconsumer処理が現在のタスクを完了しても、タスクプールからタスクを外すことはなく、ずっと待っています(これは合理的ではありません)、処理が終わったらpublishに知らせません.mqに言って、私はこのタスクを処理し終わったので、もう一つのタスクをください.さもないとmqは、こいつがそんなに忙しくて、私は他の人を探して仕事をして、彼に任務を与えないと思っています.
no_ack
channel.basic_consume(on_message, ‘queuename’,no_ack=False);basic_consumeのパラメータにno_ackの意味はno_ack=Falseは、basic_を呼び出すために手動で設定する必要があります.ackは、メッセージ処理が完了するか否かを決定する.この状態でpublishが送信したタスクはタスクプールに送られ,consumeを追加すると,タスクプールから直接タスクを取り出すbasic_を呼び出す.consume(...no_ack=True...)では、すべてのタスクが現在実行中のconsumeに割り当てられます.mqは現在のconsumeをアイドル状態としているため、consumeはタスクを受け入れた後、すぐにmqに戻って完了しました(perfetch_countと衝突しません).(例:現在3つのconsumeを実行中であり、4番目のconsumeが来るとタスクプールにタスクがないことが検索される.タスクはすべてconsumeによって実行されているため、publishは4回再実行して4番目のconsumeに発見される)
prefetch_count:
channel.basic_qos(prefetch_count=1):これはトラフィック制御です.例えば、私は5つのconsumerを同時に消費しています.publisherがタスクを送信した後、consumerは自動的にタスクプールに行ってタスクを取得し、処理が終わった後、再びタスクを取りに行って、タスクが完全に終わるまで取りに行きます.各consumerには最大1つのタスクが割り当てられます.6番目のconsumerが来た後も、直接任務を取ってやります.prefetch_count=2では、consumerごとに最大2つのタスクが収容されます.たとえば、タスクプールには10個のタスク(タスクが時間がかかると仮定し、処理が完了するまで時間がかかる)があり、現在は3個のconsumer(a,b,c)しか実行されていません.タスク割り当ての場合:a->1,b->2,c->3,a->4,b->5,c->6.(最初は任務を奪うのですが、consumerごとに1つを奪うのですが、ラウンドが終わった後に任務があることを発見して、更に第2ラウンドを奪うのです.この時aの第2ラウンドは任務5、任務6で、その効率によって決まります)任務7、8、9、10は待たなければなりません.この時4番目のconsumerが到着し、7,8を手に入れます.通俗的な例を挙げます:学校は10枚の机を1クラスに買って、1テーブルごとに最大2人の学生をすることができます.学生が10人いれば、先生は机ごとに学生を1人座ることにしました.ちょうど分け終わった.15人の学生がいれば、一部の机は2人で、残りは1人です.この时、また1枚の机が来て、また5人の学生が来て、先生は言って、新しい机は使ってはいけません、古い机はいついっぱい座って、いつまた新しい机を使います.では11枚目のテーブルは21人目の学生が来るまで待つしかありません(テーブル=consumer、学生=publisher、先生=ルール)
passive
channel.exchange_declare(exchange="test_exchange", exchange_type="direct", passive=False, durable=True, auto_delete=False)

例えば、ユーザがキューが存在するか否かのみを問い合おうとする場合、存在しない場合、キューを確立したくない場合はqueueを呼び出すことができる.declareは、パラメータpassiveをtrueに設定queueに渡す必要がある.declareは、キューがすでに存在する場合、trueを返します.存在しない場合はErrorが返されますが、新しいキューは作成されません.
Exclusive
排他キュー.キューが排他キューとして宣言された場合、最初に宣言された接続のみが表示され、接続が切断されたときに自動的に削除されます.ここで注意すべき点は、1つは、排他キューが接続に基づいて表示され、同じ接続の異なるチャネルが同じ接続によって作成された排他キューに同時にアクセスできることである.2つ目は、「初回」で、接続が排他キューを宣言した場合、他の接続は同じ名前の排他キューを確立することができません.これは通常のキューとは異なります.3つ目は、キューが永続化されていても、接続が閉じたり、クライアントが終了したりすると、排他的なキューが自動的に削除されます.このキューは、1つのクライアントのみが読み出しメッセージを送信するアプリケーションシーンに適用される.
auto_delete
自動的に削除されます.キューにサブスクリプションの消費者がいない場合、キューは自動的に削除されます.このキューは一時キューに適用されます