分散メッセージ-RocketMQ


一、中間件の紹介:
  • プロジェクトの住所:http://rocketmq.apache.org/
  • プロジェクト展開(windows環境):
  • > git clone -b develop https://github.com/apache/incubator-rocketmq.git
    > mvn clean install -Prelease-all -DskipTests -U
    > cd distribution/target/apache-rocketmq
    修正/bin/runserver.shおよび/bin/runbroker.sh起動パラメータ
    #===========================================================================================
    # JVM Configuration
    #===========================================================================================
    JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m -XX:MaxPermSize=320m"
    git bashでコマンドを実行します.
    > nohup sh bin/mqnamesrv &
    [1] 8596
    nohup: ignoring input and appending output to 'nohup.out'
    > tail nohup.out
    The Name Server boot success. serializeType=JSON
    >  nohup sh bin/mqbroker -n localhost:9876 &
    [2] 10420
    nohup: ignoring input and appending output to 'nohup.out'
    > export NAMESRV_ADDR=localhost:9876
    
    二、全体構成図
    分布式消息-RocketMQ_第1张图片は上の図に示すように、RocketMQの配備構造は以下の特徴がある.*Name Serverはほとんど無状態ノードであり、クラスタ化が可能であり、ノード間にはいかなる情報も同期されていない*Brokerの配置は比較的複雑で、多Master多Slaveモードを採用して、非同期でコピーすることを提案します.各マスターはSlaveを配置し、複数のMaster-Selaveがあり、HAは非同期コピー方式を採用しており、主に短いメッセージ遅延、ミリ秒レベルを備えている.利点:磁気ディスクが壊れても、メッセージが非常に少なくなり、メッセージのリアルタイム性は影響を受けない.マスターズマシンの後、消費者は依然としてSlaveから消費でき、このプロセスは透明である.手管はいらない.性能はマルチマスターモードとほぼ同じです.短所:Masterあたご、ディスクが破損した場合、少量のメッセージが失われます.Name Serverクラスタのうちの一つのノードと長い接続を確立して、定期的にName ServerからTopicルート情報を取って、Topicサービスを提供するMasterに長い接続を確立して、Masterに心拍を送ります.Producerは完全に無状態で、クラスタ配置が可能です.*ConsmerとName Serverクラスタのうちの一つのノード(ランダム選択)との長い接続を確立し、定期的にName ServerからTopicルーティング情報を取得し、Topicサービスを提供するMaster、Slaveに長い接続を確立し、タイマーでMaster、Slaveに心拍を送る.ConsmerはMasterからメッセージを購読することもできますし、Slaveからメッセージを購読することもできます.購読規則はBrokerによって構成されます.
    三、RMQ特性
    1、高効率スループット性
  • 独特のデータ構造であり、Rocketmqのメッセージの格納は、consume queueとcomitLogとの協働によってなされた
  • である.
  • consume queueは、メッセージのディレクトリに相当します.comitLogはメッセージが実際に保存されている物理的な位置です.メッセージの持続性は、ファイルシステム、kv型データベース、関係データベースとすることができます.システム非同期スレッドによってcommtLogOffsetと書くのは、このメッセージがcomitLogファイルに実際に保存されているオフセット量を指します.sizeはメッセージのサイズです.メッセージのhash値
  • 2、順番の問題
  • RocketMQは、すべてのキューをポーリングすることによって、メッセージがどのキューに送信されたかを判定する(負荷等化ポリシー)
  • .
  • 多くのサービスは、シーケンスメッセージの必要性があります.RocketMQはグローバルおよびローカルの順序をサポートしています.一般的には、ローカル順序を使用することを推奨しています.順序が要求されるようなメッセージhashを同じ列に並べて維持することができます.分布式消息-RocketMQ_第2张图片欠点:
  • 消費順序メッセージの並列度は、キュー数
  • に依存する.
  • 列のホットスポット問題は、個々のキューがハッシュの不均一によってメッセージが多すぎ、消費速度が追いつけなくなり、メッセージ蓄積問題が発生する
  • .
  • メッセージ失敗のメッセージがあって、スキップできませんでした.現在のキュー消費が一時停止されました.
    //producer.java
      SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List mqs, Message msg, Object arg) {
                            Integer id = (Integer) arg;
                            int index = id % mqs.size();
                            return mqs.get(index);
                        }
                    }, orderId);
    //consumer.java
     consumer.registerMessageListener(new MessageListenerOrderly() {
                AtomicLong consumeTimes = new AtomicLong(0);
    
                @Override
                public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
                    context.setAutoCommit(false);
                    System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                    this.consumeTimes.incrementAndGet();
                    if ((this.consumeTimes.get() % 2) == 0) {
                        return ConsumeOrderlyStatus.SUCCESS;
                    } else if ((this.consumeTimes.get() % 3) == 0) {
                        return ConsumeOrderlyStatus.ROLLBACK;
                    } else if ((this.consumeTimes.get() % 4) == 0) {
                        return ConsumeOrderlyStatus.COMMIT;
                    } else if ((this.consumeTimes.get() % 5) == 0) {
                        context.setSuspendCurrentQueueTimeMillis(3000);
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
    
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
    分布式消息-RocketMQ_第3张图片
    3、メッセージ重複性の問題
    RocketMQメッセージが重複していないことを保証していません.もしあなたの業務が厳格な重複メッセージを保証する必要があるならば、あなた自身が業務上重いものを持っていく必要があります.消費者側はメッセージを処理する業務ロジックを累乗性に保つ*各メッセージに固有の番号があることを保証し、メッセージ処理の成功とデバッグのログが同時に表示されることを保証する.
    4、メッセージフィルタリング機構
  • は、リリース/購読メッセージモードにおいて、Broker側でMessage Tagペアを行い、まずConsme Queを巡回し、記憶されているMessage Tagが購読されているMessage Tagと一致しない場合は、スキップして、次の対に続き、該当する場合はConsmerに送信する.注意:Message Tagは文字列形式であり、Conssume Queにはその対応するhashcodeが格納されています.マッチング時もhashcodeに対してです.Consmerはフィルタリングされたメッセージを受け取ってからもBroker側での操作を行いますが、Hashcodeではなく本物のMessage Tag文字列が正しいです.
  • フィルタリング中にCommt Logデータにアクセスしません.積み上げ時にも効率的にフィルタリングできます.
    5、タイミングメッセージ
  • アプリケーションシーン、注文書の自動確認、注文書のクローズなどのタイミングタスク分布式消息-RocketMQ_第4张图片
  • 6、事務情報
  • は方向MQサーバにメッセージを送信する.
  • MQ Serverはメッセージを永続化した後、送信者にACK確認メッセージを送信しました.このときメッセージは半メッセージです.
  • 送信者はローカルトランザクション論理の実行を開始する.
  • 送信者は、ローカル事務の実行結果に基づいてMQ Serverに二次確認(ComitまたはRollback)を提出し、MQ ServerはComit状態を受信すると半メッセージを配達可能としてマークし、購読者は最終的にこのメッセージを受信する.MQ ServerはRollback状態を受信すると半メッセージを削除し、購読者はこのメッセージを受け付けません.
  • は、ネットワークを切断したり、アプリケーションを再起動したりする特殊な場合に、上記のステップ4によって提出された2回の確認は最終的にMQ Serverに到達していない.固定時間が経過すると、MQ Serverはメッセージを再確認する.
  • 送信者は、メッセージを受信した後、対応するメッセージのローカルトランザクションの最終結果を確認する必要がある.
  • 送信者は、検査によって得られたローカル・トランザクションの最終状態に基づいて再度二次確認を提出し、MQ Serverは依然としてステップ4に従って半メッセージを動作させる.トランザクションメッセージは、対応するステップ1、2、3、4を送信し、トランザクションメッセージは、対応するステップ5、6、7を調べる.
  • 四、メッセージ送信実現の詳細
    分布式消息-RocketMQ_第5张图片
    五、業界主流MQ対比
    分布式消息-RocketMQ_第6张图片 分布式消息-RocketMQ_第7张图片
    六、応用及び開発注意事項
  • 一般的なシーン:非同期処理、アプリケーションの結合、トラフィックカットフロントとメッセージ通信の4つのシーン1.異なるアプリケーション間のサービス呼び出しは、一般的に中間hsf、dubboなどのサービス、二乗のパケット依存、http方式およびwsサービス方式があり、内部アプリケーション間の呼び出しが頻繁であり、例えば、トラフィック設定パラメータは、メッセージ2.注文システム、在庫システム3.秒殺サービス4.ログ収集
  • 開発上の注意事項:
  • 一つのアプリケーションは、できるだけ一つのTopicを使って、Topicはコンサートコンソールを通じて申請します.メッセージのサブタイプはtagsで識別され、自分で定義できます.tagsを定義し、consumer端末でメッセージを購読する場合にのみ、tagsに基づいてbrook端末でメッセージをフィルタリングすることができます.
  • 各メッセージは、トラフィックレベルにおける一意の識別コードであり、将来のロケーションメッセージの損失問題を容易にするために、keysフィールドに設定される.サーバは、メッセージごとにインデックス(ハッシュインデックス)を作成し、アプリケーションは、topic、keyを介してこのメッセージの内容を照会し、メッセージが誰によって消費されるかを調べることができる.ハッシュインデックスですので、可能な限り唯一のkeyが、潜在的なハッシュ衝突を回避できるようにしてください.
  • メッセージの送信に成功したり失敗したりした場合、メッセージログを印刷するには、必ずsendresultとkeyフィールドを印刷します.
  • メッセージの送信に失敗し、アプリケーションレベルでスレッドタイミング再送信によって処理される
  • .
  • consumer端末は重複メッセージを処理し、アプリケーションレベルは累乗などの処理を行うべきであり、メッセージのビジネスレベルの識別コードkeysフィールドを使用して
  • を重くすることを提案する.