RabbitMQ三四事


データの永続化
非常に堅牢で安定したバックグラウンドシステムでは、物理的なダウンタイム、アプリケーション自体のエラーのクラッシュなど、さまざまなダウンタイムを考慮する必要がありますが、このとき、アプリケーションは再起動してもデータが失われないようにする必要があります.この問題は、データの持続化、つまりデータのディスクへの持続化です.RabbitMQでは、メッセージがbrokerに送信されることを保証するには、まず3点を達成する必要があります.
  • 永続化されたexchange(スイッチ):宣言時にdurableオプション
  • をオンにします.
  • 永続化されたqueue(キュー):宣言時にdurableオプション
  • をオンにします.
  • 永続化されたmessage:delivery_modeは2(php,pythonのようなライブラリ、2はより友好的な定数に置き換えることができる)に設定、nodeのamqp.nodeライブラリにはpersistenttrue
  • に設定.
    注意すべき点は、永続化はパフォーマンス損失(書き込みディスク操作)をもたらすことですが、本番環境のデータ整合性を保証するためには、そうしなければなりません.
    メッセージを送信するconfirmメカニズム
    実は光は以上の3点をやり遂げて、データは依然として失う可能性があります.クライアントがapiを呼び出してメッセージを保存することに成功した後、RabbitMQはまだしばらくの時間(短いですが、無視できません)を必要とします.RabbitMQはすべてのメッセージのためにfsyncの処理をするのではなく、cacheに保存するだけで物理ディスクではなく、その間にRabbitMQ brokerでcrashが発生し、cacheに保存されていますが、まだドロップする時間がないと、これらのメッセージは失われます.以上の問題を解決するために,RabbitMQの生産者確認モードを用いる必要がある.確認モードをオンにするには、生産者がchannelをconfirmモードに設定する必要があり、channelがconfirmモードに入ると、チャネル上で発行されたすべてのメッセージに一意のID(1から)が割り当てられ、メッセージがすべての一致するキューに送信されると、brokerは生産者(メッセージを含む一意のID)に確認を送信し、これにより、生産者は、メッセージが目的のキューに正しく到着したことを認識し、メッセージとキューが持続可能である場合、確認メッセージは、メッセージをディスクに書き込んだ後に発行され、brokerは、生産者に返信された確認メッセージのdelivery-tagドメインに確認メッセージのシーケンス番号が含まれている.
    confirmモードの最大の利点は、彼が非同期であることであり、メッセージを発行すると、生産者アプリケーションはチャネルが確認を返すのを待っている間に次のメッセージを送信し続けることができ、メッセージが最終的に確認された後、生産者アプリケーションはコールバック方法で確認メッセージを処理することができ、RabbitMQが自身の内部エラーでメッセージが失われた場合、nackメッセージが送信され、プロバイダアプリケーションはコールバックメソッドでこのnackメッセージを処理することができます(1参照).
    単純confirmの例
    サンプルコードはNodeJSを使用して実装され、RabbitMQサービスは前編RabbitMQ二三事docker-compose.ymlを使用して迅速に起動することができる.
    const QUEUE_NAME = 'test_queue'
    const config = require("./config")
    const amqp = require('amqplib')
    
    async function getMQConnection() {
        return await amqp.connect({
            protocol: 'amqp',
            hostname: config.MQ.host,
            port: config.MQ.port,
            username: config.MQ.user,
            password: config.MQ.pass,
            locale: 'en_US',
            frameMax: 0,
            heartbeat: 5, //   
            vhost: config.MQ.vhost,
        })
    }
    
    async function run(rmqConn, msgArr) {
        try {
            const channel = await rmqConn.createConfirmChannel() //   confirm
            const exchangeName = `${QUEUE_NAME}_exchange`
            await channel.assertExchange(exchangeName, 'direct', { durable: true, autoDelete: false }) //    exchange   exchange
            await channel.assertQueue(QUEUE_NAME, {durable: true, autoDelete: false}) //    queue   
            await channel.bindQueue(QUEUE_NAME, exchangeName, QUEUE_NAME) //      
    
            // queue name routing key
            msgArr.forEach(str => {
                channel.publish(exchangeName, QUEUE_NAME, Buffer.from(str), { persistent: true, mandatory: true })
            })
            await channel.waitForConfirms()
            console.log('        ')
            await channel.close()
        } catch(err) {
            // do something with err
            console.log('        :' + err.message)
        }
    }
    
    async function testSendBatchMsg() {
        const conn = await getMQConnection()
        await run(conn, [
            'cat',
            'dog',
            'pig',
            'mouse',
            'mouse',
            'penguin'
        ])
        await conn.close()
    }
    testSendBatchMsg()

    説明assertExchangeおよびassertQueueは、スイッチおよびキューが必ず存在することを保証するものであり、ここでのexchangeは、単純なdirectスイッチConfirmChannel#publishメソッドがpromiseを返さない
    消費メッセージのackメカニズム
    今私たちは私たちの消費者を考慮する必要があります.消費者もプログラムのエラーや物理的なダウンタイムの問題に直面します.RabbitMQの公式も解決策を提供しています.confirmメカニズムと似ています.ackメカニズムです.ackメカニズムでは、消費者がビジネスロジックを自分で処理した後、ackメッセージを送信する必要があり、brokerはこのメッセージが正しく消費されたと判断し、メモリとディスクから削除し、消費者のacknowledgmentを受信しない限り、brokerはこのメッセージを保存し続ける.ある消費者がクラッシュした(接続が切断された)のにackを送信しなかった場合、brokerはこのメッセージが完全に処理されていないと理解し、別の消費者に再処理される.このようなメカニズムの下で、消費者が崩壊してもメッセージを失うことはありません.
    単純ack例
    const QUEUE_NAME = 'test_queue'
    const config = require("./config")
    const amqp = require('amqplib')
    
    async function getMQConnection() {
        return await amqp.connect({
            protocol: 'amqp',
            hostname: config.MQ.host,
            port: config.MQ.port,
            username: config.MQ.user,
            password: config.MQ.pass,
            locale: 'en_US',
            frameMax: 0,
            heartbeat: 5, //   
            vhost: config.MQ.vhost,
        })
    }
    
    async function sleep(ms) {
        return new Promise(resolve => 
            setTimeout(resolve, ms))
    }
    
    async function start() {
        const mqConn = await getMQConnection()
        console.log('connecting RabbitMQ successfully!')
        const channel = await mqConn.createChannel()
        const exchangeName = `${QUEUE_NAME}_exchange`
        await channel.assertExchange(exchangeName, 'direct', { durable: true, autoDelete: false })
        await channel.assertQueue(QUEUE_NAME, {durable: true, autoDelete: false})
        await channel.bindQueue(QUEUE_NAME, exchangeName, QUEUE_NAME)
    
        channel.consume(QUEUE_NAME, async function(msg) {
            console.log("Received msg: %s from %s", QUEUE_NAME, msg.content.toString())
            console.log('consuming message...')
            try {
                await sleep(500) //       
                console.log('consuming ends')
                channel.ack(msg) //     ,  ack
            } catch(e) {
                console.log('consuming failed: ' + e.message)
                channel.nack(msg) //     ,  nack
            }
        }, {noAck: false}) // ack
    }
    
    start()

    に注意
    自動ackはデフォルトで開かれています.つまり、メッセージが消費者に送信されると自動的にackされますが、多くの場合、手動ackが必要になるため、autoAsk=falseを明示的に設定してこのメカニズムを閉じる必要があります(例ではnoAck: false).
    ackにはタイムアウトの制限はありません.brokerは、消費者が切断されたときにのみ再配達される.メッセージを処理するのに長い時間がかかります.
    いくつかの問題amqp.nodeこのライブラリは心拍検出機能(heartbeatオプション)を提供していますが、自動再接続は行われていません.heartbeatの値に対して、RabbitMQ公式サイトは説明あります
    Several years worth of feedback from the users and client library
    maintainers suggest that values lower than 5 seconds are fairly likely
    to cause false positives, and values of 1 second or lower are very
    likely to do so. Values within the 5 to 20 seconds range are optimal
    for most environments.
    したがって、心拍数は低すぎる(短いネットワーク混雑やストリーム制御のため)、低すぎると誤報を招きやすく、経験5 s-20 sによると合理的である.
    参考記事:
  • RabbitMQ(四):channelのconfirmモードを深く学ぶ
  • when-publishes-are-confirmed
  • Channel-oriented API reference