amqplibの使用に対する心得
7064 ワード
最近はnodejsでamqplib-rabitmqのnodejsクライアントを使用しています.expressに封入しました.まずコードをください.
enqueueはメッセージがキューに入ると接続がすぐに閉じられます.リソースを占用しないようにします.
転載先:https://www.cnblogs.com/ryansecreat/p/6030246.html
1 var amqp = require('amqplib/callback_api');
2 var config=require('../config/config');
3 var log=require('../util/loghelp');
4 function fail(err, conn) {
5 log.error(err);
6 if (conn) conn.close();
7 }
8 exports.StartConsumer=function (action,qname) {
9 function on_connect(err, conn) {
10 if (err !== null) return fail(err);
11 function on_channel_open(err, ch) {
12 ch.assertQueue(qname, {durable: true}, function(err, ok) {
13 if (err !== null) return bail(err, conn);
14 ch.consume(qname, function(msg) {
15
16 log.info(`Received ${msg.content.toString()},start process`);
17 action(JSON.parse(msg.content))
18 .then(d=> {
19 log.info("mq , ");ch.ack(msg)
20 }
21 )
22 .catch(err=>
23 ch.nack(msg));
24 }, {noAck: false} );
25 });
26 }
27 conn.createChannel(on_channel_open);
28 }
29 amqp.connect(config.amqp.url,on_connect);
30 };
31
32 exports.enqueue=function (data,qname) {
33 function on_connect(err, conn) {
34 if (err !== null) return bail(err);
35
36 function on_channel_open(err, ch) {
37 if (err !== null) return bail(err, conn);
38 ch.assertQueue(qname, {durable: true}, function(err, ok) {
39 if (err !== null) return bail(err, conn);
40 var msg=JSON.stringify(data);
41 ch.sendToQueue(qname, new Buffer(msg));
42 log.info(`mq send ${msg}`);
43 ch.close(function() { conn.close(); });
44 });
45 }
46 conn.createChannel(on_channel_open);
47 }
48 amqp.connect(config.amqp.url,on_connect);
49 };
このうちStartConsmerはプロジェクト起動時に起動し、ライフサイクル全体でモニター状態を維持し、プログラム終了時にmqのリンクが閉じられます.注意したいのは noAckというパラメータは、falseがメッセージがチームから離れると自動的に削除されません.trueに設定されているなら、メッセージ処理が成功するかどうかは関係なく、このメッセージは削除されます.メッセージが成功していないことに気づいたのは、ch.nack(msg)を呼び出して、この方法はメッセージを再入力することです.enqueueはメッセージがキューに入ると接続がすぐに閉じられます.リソースを占用しないようにします.
転載先:https://www.cnblogs.com/ryansecreat/p/6030246.html