RabbitMQ三四事
データの永続化
非常に堅牢で安定したバックグラウンドシステムでは、物理的なダウンタイム、アプリケーション自体のエラーのクラッシュなど、さまざまなダウンタイムを考慮する必要がありますが、このとき、アプリケーションは再起動してもデータが失われないようにする必要があります.この問題は、データの持続化、つまりデータのディスクへの持続化です.RabbitMQでは、メッセージが永続化された をオンにします.永続化された をオンにします.永続化された に設定.
注意すべき点は、永続化はパフォーマンス損失(書き込みディスク操作)をもたらすことですが、本番環境のデータ整合性を保証するためには、そうしなければなりません.
メッセージを送信するconfirmメカニズム
実は光は以上の3点をやり遂げて、データは依然として失う可能性があります.クライアントがapiを呼び出してメッセージを保存することに成功した後、RabbitMQはまだしばらくの時間(短いですが、無視できません)を必要とします.RabbitMQはすべてのメッセージのためにfsyncの処理をするのではなく、cacheに保存するだけで物理ディスクではなく、その間にRabbitMQ brokerでcrashが発生し、cacheに保存されていますが、まだドロップする時間がないと、これらのメッセージは失われます.以上の問題を解決するために,RabbitMQの生産者確認モードを用いる必要がある.確認モードをオンにするには、生産者がchannelをconfirmモードに設定する必要があり、channelがconfirmモードに入ると、チャネル上で発行されたすべてのメッセージに一意のID(1から)が割り当てられ、メッセージがすべての一致するキューに送信されると、brokerは生産者(メッセージを含む一意のID)に確認を送信し、これにより、生産者は、メッセージが目的のキューに正しく到着したことを認識し、メッセージとキューが持続可能である場合、確認メッセージは、メッセージをディスクに書き込んだ後に発行され、brokerは、生産者に返信された確認メッセージの
confirmモードの最大の利点は、彼が非同期であることであり、メッセージを発行すると、生産者アプリケーションはチャネルが確認を返すのを待っている間に次のメッセージを送信し続けることができ、メッセージが最終的に確認された後、生産者アプリケーションはコールバック方法で確認メッセージを処理することができ、RabbitMQが自身の内部エラーでメッセージが失われた場合、nackメッセージが送信され、プロバイダアプリケーションはコールバックメソッドでこのnackメッセージを処理することができます(1参照).
単純confirmの例
サンプルコードはNodeJSを使用して実装され、RabbitMQサービスは前編RabbitMQ二三事の
説明
消費メッセージのackメカニズム
今私たちは私たちの消費者を考慮する必要があります.消費者もプログラムのエラーや物理的なダウンタイムの問題に直面します.RabbitMQの公式も解決策を提供しています.confirmメカニズムと似ています.ackメカニズムです.ackメカニズムでは、消費者がビジネスロジックを自分で処理した後、ackメッセージを送信する必要があり、brokerはこのメッセージが正しく消費されたと判断し、メモリとディスクから削除し、消費者のacknowledgmentを受信しない限り、brokerはこのメッセージを保存し続ける.ある消費者がクラッシュした(接続が切断された)のにackを送信しなかった場合、brokerはこのメッセージが完全に処理されていないと理解し、別の消費者に再処理される.このようなメカニズムの下で、消費者が崩壊してもメッセージを失うことはありません.
単純ack例
に注意
自動ackはデフォルトで開かれています.つまり、メッセージが消費者に送信されると自動的にackされますが、多くの場合、手動ackが必要になるため、
ackにはタイムアウトの制限はありません.
いくつかの問題
Several years worth of feedback from the users and client library
maintainers suggest that values lower than 5 seconds are fairly likely
to cause false positives, and values of 1 second or lower are very
likely to do so. Values within the 5 to 20 seconds range are optimal
for most environments.
したがって、心拍数は低すぎる(短いネットワーク混雑やストリーム制御のため)、低すぎると誤報を招きやすく、経験5 s-20 sによると合理的である.
参考記事: RabbitMQ(四):channelのconfirmモードを深く学ぶ when-publishes-are-confirmed Channel-oriented API reference
非常に堅牢で安定したバックグラウンドシステムでは、物理的なダウンタイム、アプリケーション自体のエラーのクラッシュなど、さまざまなダウンタイムを考慮する必要がありますが、このとき、アプリケーションは再起動してもデータが失われないようにする必要があります.この問題は、データの持続化、つまりデータのディスクへの持続化です.RabbitMQでは、メッセージが
broker
に送信されることを保証するには、まず3点を達成する必要があります.exchange
(スイッチ):宣言時にdurable
オプションqueue
(キュー):宣言時にdurable
オプションmessage
:delivery_mode
は2(php,pythonのようなライブラリ、2はより友好的な定数に置き換えることができる)に設定、nodeのamqp.node
ライブラリにはpersistent
がtrue
注意すべき点は、永続化はパフォーマンス損失(書き込みディスク操作)をもたらすことですが、本番環境のデータ整合性を保証するためには、そうしなければなりません.
メッセージを送信するconfirmメカニズム
実は光は以上の3点をやり遂げて、データは依然として失う可能性があります.クライアントがapiを呼び出してメッセージを保存することに成功した後、RabbitMQはまだしばらくの時間(短いですが、無視できません)を必要とします.RabbitMQはすべてのメッセージのためにfsyncの処理をするのではなく、cacheに保存するだけで物理ディスクではなく、その間にRabbitMQ brokerでcrashが発生し、cacheに保存されていますが、まだドロップする時間がないと、これらのメッセージは失われます.以上の問題を解決するために,RabbitMQの生産者確認モードを用いる必要がある.確認モードをオンにするには、生産者がchannelをconfirmモードに設定する必要があり、channelがconfirmモードに入ると、チャネル上で発行されたすべてのメッセージに一意のID(1から)が割り当てられ、メッセージがすべての一致するキューに送信されると、brokerは生産者(メッセージを含む一意のID)に確認を送信し、これにより、生産者は、メッセージが目的のキューに正しく到着したことを認識し、メッセージとキューが持続可能である場合、確認メッセージは、メッセージをディスクに書き込んだ後に発行され、brokerは、生産者に返信された確認メッセージの
delivery-tag
ドメインに確認メッセージのシーケンス番号が含まれている.confirmモードの最大の利点は、彼が非同期であることであり、メッセージを発行すると、生産者アプリケーションはチャネルが確認を返すのを待っている間に次のメッセージを送信し続けることができ、メッセージが最終的に確認された後、生産者アプリケーションはコールバック方法で確認メッセージを処理することができ、RabbitMQが自身の内部エラーでメッセージが失われた場合、nackメッセージが送信され、プロバイダアプリケーションはコールバックメソッドでこのnackメッセージを処理することができます(1参照).
単純confirmの例
サンプルコードはNodeJSを使用して実装され、RabbitMQサービスは前編RabbitMQ二三事の
docker-compose.yml
を使用して迅速に起動することができる.const QUEUE_NAME = 'test_queue'
const config = require("./config")
const amqp = require('amqplib')
async function getMQConnection() {
return await amqp.connect({
protocol: 'amqp',
hostname: config.MQ.host,
port: config.MQ.port,
username: config.MQ.user,
password: config.MQ.pass,
locale: 'en_US',
frameMax: 0,
heartbeat: 5, //
vhost: config.MQ.vhost,
})
}
async function run(rmqConn, msgArr) {
try {
const channel = await rmqConn.createConfirmChannel() // confirm
const exchangeName = `${QUEUE_NAME}_exchange`
await channel.assertExchange(exchangeName, 'direct', { durable: true, autoDelete: false }) // exchange exchange
await channel.assertQueue(QUEUE_NAME, {durable: true, autoDelete: false}) // queue
await channel.bindQueue(QUEUE_NAME, exchangeName, QUEUE_NAME) //
// queue name routing key
msgArr.forEach(str => {
channel.publish(exchangeName, QUEUE_NAME, Buffer.from(str), { persistent: true, mandatory: true })
})
await channel.waitForConfirms()
console.log(' ')
await channel.close()
} catch(err) {
// do something with err
console.log(' :' + err.message)
}
}
async function testSendBatchMsg() {
const conn = await getMQConnection()
await run(conn, [
'cat',
'dog',
'pig',
'mouse',
'mouse',
'penguin'
])
await conn.close()
}
testSendBatchMsg()
説明
assertExchange
およびassertQueue
は、スイッチおよびキューが必ず存在することを保証するものであり、ここでのexchangeは、単純なdirect
スイッチConfirmChannel#publish
メソッドがpromise
を返さない消費メッセージのackメカニズム
今私たちは私たちの消費者を考慮する必要があります.消費者もプログラムのエラーや物理的なダウンタイムの問題に直面します.RabbitMQの公式も解決策を提供しています.confirmメカニズムと似ています.ackメカニズムです.ackメカニズムでは、消費者がビジネスロジックを自分で処理した後、ackメッセージを送信する必要があり、brokerはこのメッセージが正しく消費されたと判断し、メモリとディスクから削除し、消費者のacknowledgmentを受信しない限り、brokerはこのメッセージを保存し続ける.ある消費者がクラッシュした(接続が切断された)のにackを送信しなかった場合、brokerはこのメッセージが完全に処理されていないと理解し、別の消費者に再処理される.このようなメカニズムの下で、消費者が崩壊してもメッセージを失うことはありません.
単純ack例
const QUEUE_NAME = 'test_queue'
const config = require("./config")
const amqp = require('amqplib')
async function getMQConnection() {
return await amqp.connect({
protocol: 'amqp',
hostname: config.MQ.host,
port: config.MQ.port,
username: config.MQ.user,
password: config.MQ.pass,
locale: 'en_US',
frameMax: 0,
heartbeat: 5, //
vhost: config.MQ.vhost,
})
}
async function sleep(ms) {
return new Promise(resolve =>
setTimeout(resolve, ms))
}
async function start() {
const mqConn = await getMQConnection()
console.log('connecting RabbitMQ successfully!')
const channel = await mqConn.createChannel()
const exchangeName = `${QUEUE_NAME}_exchange`
await channel.assertExchange(exchangeName, 'direct', { durable: true, autoDelete: false })
await channel.assertQueue(QUEUE_NAME, {durable: true, autoDelete: false})
await channel.bindQueue(QUEUE_NAME, exchangeName, QUEUE_NAME)
channel.consume(QUEUE_NAME, async function(msg) {
console.log("Received msg: %s from %s", QUEUE_NAME, msg.content.toString())
console.log('consuming message...')
try {
await sleep(500) //
console.log('consuming ends')
channel.ack(msg) // , ack
} catch(e) {
console.log('consuming failed: ' + e.message)
channel.nack(msg) // , nack
}
}, {noAck: false}) // ack
}
start()
に注意
自動ackはデフォルトで開かれています.つまり、メッセージが消費者に送信されると自動的にackされますが、多くの場合、手動ackが必要になるため、
autoAsk=false
を明示的に設定してこのメカニズムを閉じる必要があります(例ではnoAck: false
).ackにはタイムアウトの制限はありません.
broker
は、消費者が切断されたときにのみ再配達される.メッセージを処理するのに長い時間がかかります.いくつかの問題
amqp.node
このライブラリは心拍検出機能(heartbeat
オプション)を提供していますが、自動再接続は行われていません.heartbeat
の値に対して、RabbitMQ公式サイトは説明ありますSeveral years worth of feedback from the users and client library
maintainers suggest that values lower than 5 seconds are fairly likely
to cause false positives, and values of 1 second or lower are very
likely to do so. Values within the 5 to 20 seconds range are optimal
for most environments.
したがって、心拍数は低すぎる(短いネットワーク混雑やストリーム制御のため)、低すぎると誤報を招きやすく、経験5 s-20 sによると合理的である.
参考記事: