RocketMQ:メッセージプルとメッセージキュー負荷分散メカニズム
69493 ワード
一、前言
Consumerメッセージ消費プロセスは複雑で、モジュールには、メッセージ検索、負荷等化、メッセージフィルタリング、メッセージ処理、再送確認、メッセージ進捗維持などが含まれています.紙面に限られ、本編では主にConsumer起動プロセスとメッセージ抽出実現メカニズムを紹介する.
メッセージ消費はグループのモードで展開され、1つの消費グループ内に複数の消費者を含むことができ、各消費グループは複数のTopicを購読することができ、消費グループの間にはクラスタモードと放送モードの2つの消費モードがある.クラスタモード:Topicの下の同じメッセージは、そのうちの1つの消費者によってのみ消費されることを許可する. ブロードキャストモード:Topicの下の同じメッセージはクラスタ内のすべての消費者に一度消費される.
メッセージングサーバと消費者間のメッセージングにも2つの方法があります.プルモード:消費者側が自発的にプルメッセージ要求を開始する. プッシュモード:メッセージがメッセージサーバに到着した後、メッセージ消費者にプッシュする.
なお、RocketMQメッセージプッシュモードの実現は、プルモードに基づいて、プルモードに1層包装され、1つのプルタスクが完了した後、次のプルタスクが開始される.
二、問題
Consumerの起動時にNamesrvアドレス、購読したTopic、メッセージリスナーを指定する必要があることは、公式サンプルを参照してください.
では、Consumerはどのように起動し、Brokerからメッセージ消費を引き出すのか.
では、クラスタモードでは、複数の消費者がメッセージキューをどのように負荷するのでしょうか.メッセージキュー負荷メカニズムは、1つのメッセージキューが同じ時間に1つの消費者によって消費されることを許可し、1つの消費者が複数のメッセージキューを消費することができるという共通の考え方に従う.
三、消費者起動プロセス
メッセージの消費と取得方法は2種類ありますが、ここでは日常開発でよく使われる方法:クラスタ消費、プッシュパターンについて分析し、D e f a u l t M QpushConsumerImpl#startメソッドのソースコードに入りましょう.
org.apache.rocketmq.client.impl.consumer.DefaultMQPShConsumerImplコードフラグメント
1.Topicの購読メッセージを構築し、Consumer起動前に設定したtopicテーマ名とフィルタ条件をカプセル化し、RebalanceImplに割り当て、後続の負荷均衡配分に遍歴条件を提供する.
2、MQClientインスタンスを作成し、いくつかのサービスインスタンスを初期化し、後続のメッセージ負荷分配、メッセージ引き抜きをサポートする.
3、Consuemrロードバランシングの例、割り当てポリシーを初期化し、ロードバランシングサービスによって定期的に呼び出し、消費キューを再割り当てする.
4、Consumemr消費進捗ストレージ、ブロードキャストモードは、ローカルクライアントで消費進捗を維持する.クラスタモードは、Brokerから消費の進捗を取得し、ローカル消費が成功すると、Brokerに進捗を更新します.
5、Comsuemrメッセージ消費サービス、メッセージ抽出サービスとのデカップリング、内部独立スレッドプール、抽出メッセージに対する非同期消費:成功、Brokerに消費の進度を更新する;失敗し、ブロードカーにメッセージを戻し、消費を遅らせる.
6、Consumemr登録リスト.
7、MQClientインスタンスを起動し、メッセージ引き出しサービス、メッセージ負荷分配サービスを起動し、Topicルーティング更新、Consumer消費進捗持続化などの定期的な実現などを起動する.
具体的なソース分析を行います.
org.apache.rocketmq.client.impl.factory.MQClientInstanceコードフラグメント
クラスタモードでは、同じ消費グループ内に複数のメッセージ消費者があり、同じTopicに複数の消費キューが存在する場合、消費者はどのようにメッセージキュー負荷の均衡を行うのか.
メッセージ・キューのロード・バランシングは、通常、1つの消費キューが同じ時間に1つのメッセージ・コンシューマによって消費されることを許可し、1つのメッセージ・コンシューマが複数のメッセージ・キューを同時に消費できるようにする方法ですが、RocketMQはどのように実現されますか?
上記の問題を踏まえて,RocketMQメッセージの消費メカニズムを検討し始めた.MQClientInstanceの起動プロセスから、RocketMQはメッセージのプルメッセージングを担当するために別のスレッドサービスPullMessageServiceを使用していることがわかります.
org.apache.rocketmq.client.impl.consumer.PullMessageServicerunコードフラグメント
PullMessageServiceメッセージはサービススレッドを引き、pullRequestQueueキューからPullRequestメッセージ引きタスクを取得し、pullRequestQueueが空の場合、スレッドは引きタスクが挿入されるまでブロックされ、pullMessageメソッドを呼び出してメッセージ引きを行います.
では、PullRequestはいつキューに入れられたのでしょうか.
org.apache.rocketmq.client.impl.consumer.PullMessageServiceコードフラグメント
追跡により、PullRequestは2つの場所で作成されます.
1、RocketMQはPullRequest引取タスクに従ってメッセージ引取を1回実行した後、PullRequestオブジェクトをpullRequestQueueに入れる.
org.apache.rocketmq.client.impl.consumer.DefaultMQPShConsumerImplコードフラグメント
2、RebalanceServiceサービススレッドによって登録された消費者がTopicを購読した情報をポーリングし、RebalanceImplで作成する.
org.apache.rocketmq.client.impl.consumer.RebalanceService
RebalanceServiceスレッドは、デフォルトで20 sごとにmqClientFactoryを実行します.doRevalance()メソッド
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalanceコードクリップ
これで,プルメッセージ要求はキューに入れられ,PullMessageServiceサービスによって消費される.
参照
『RocketMQテクノロジーの内幕』
Consumerメッセージ消費プロセスは複雑で、モジュールには、メッセージ検索、負荷等化、メッセージフィルタリング、メッセージ処理、再送確認、メッセージ進捗維持などが含まれています.紙面に限られ、本編では主にConsumer起動プロセスとメッセージ抽出実現メカニズムを紹介する.
メッセージ消費はグループのモードで展開され、1つの消費グループ内に複数の消費者を含むことができ、各消費グループは複数のTopicを購読することができ、消費グループの間にはクラスタモードと放送モードの2つの消費モードがある.
メッセージングサーバと消費者間のメッセージングにも2つの方法があります.
なお、RocketMQメッセージプッシュモードの実現は、プルモードに基づいて、プルモードに1層包装され、1つのプルタスクが完了した後、次のプルタスクが開始される.
二、問題
Consumerの起動時にNamesrvアドレス、購読したTopic、メッセージリスナーを指定する必要があることは、公式サンプルを参照してください.
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// Specify name server addresses.
consumer.setNamesrvAddr("localhost:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
では、Consumerはどのように起動し、Brokerからメッセージ消費を引き出すのか.
では、クラスタモードでは、複数の消費者がメッセージキューをどのように負荷するのでしょうか.メッセージキュー負荷メカニズムは、1つのメッセージキューが同じ時間に1つの消費者によって消費されることを許可し、1つの消費者が複数のメッセージキューを消費することができるという共通の考え方に従う.
三、消費者起動プロセス
メッセージの消費と取得方法は2種類ありますが、ここでは日常開発でよく使われる方法:クラスタ消費、プッシュパターンについて分析し、D e f a u l t M QpushConsumerImpl#startメソッドのソースコードに入りましょう.
org.apache.rocketmq.client.impl.consumer.DefaultMQPShConsumerImplコードフラグメント
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
// ...
//1. topic
this.copySubscription();
//2. MQClientInstance 、
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
//3
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
//4.
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
//5. , 、
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
//6. Consumer
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
//7. MQClientInstance
mQClientFactory.start();
this.serviceState = ServiceState.RUNNING;
break;
}
// , NameServer topic
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
}
1.Topicの購読メッセージを構築し、Consumer起動前に設定したtopicテーマ名とフィルタ条件をカプセル化し、RebalanceImplに割り当て、後続の負荷均衡配分に遍歴条件を提供する.
2、MQClientインスタンスを作成し、いくつかのサービスインスタンスを初期化し、後続のメッセージ負荷分配、メッセージ引き抜きをサポートする.
3、Consuemrロードバランシングの例、割り当てポリシーを初期化し、ロードバランシングサービスによって定期的に呼び出し、消費キューを再割り当てする.
4、Consumemr消費進捗ストレージ、ブロードキャストモードは、ローカルクライアントで消費進捗を維持する.クラスタモードは、Brokerから消費の進捗を取得し、ローカル消費が成功すると、Brokerに進捗を更新します.
5、Comsuemrメッセージ消費サービス、メッセージ抽出サービスとのデカップリング、内部独立スレッドプール、抽出メッセージに対する非同期消費:成功、Brokerに消費の進度を更新する;失敗し、ブロードカーにメッセージを戻し、消費を遅らせる.
6、Consumemr登録リスト.
7、MQClientインスタンスを起動し、メッセージ引き出しサービス、メッセージ負荷分配サービスを起動し、Topicルーティング更新、Consumer消費進捗持続化などの定期的な実現などを起動する.
具体的なソース分析を行います.
org.apache.rocketmq.client.impl.factory.MQClientInstanceコードフラグメント
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
// ...
//
this.mQClientAPIImpl.start();
// , namesrc
this.startScheduledTask();
//
this.pullMessageService.start();
// ,
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
this.serviceState = ServiceState.RUNNING;
break;
}
}
}
クラスタモードでは、同じ消費グループ内に複数のメッセージ消費者があり、同じTopicに複数の消費キューが存在する場合、消費者はどのようにメッセージキュー負荷の均衡を行うのか.
メッセージ・キューのロード・バランシングは、通常、1つの消費キューが同じ時間に1つのメッセージ・コンシューマによって消費されることを許可し、1つのメッセージ・コンシューマが複数のメッセージ・キューを同時に消費できるようにする方法ですが、RocketMQはどのように実現されますか?
上記の問題を踏まえて,RocketMQメッセージの消費メカニズムを検討し始めた.MQClientInstanceの起動プロセスから、RocketMQはメッセージのプルメッセージングを担当するために別のスレッドサービスPullMessageServiceを使用していることがわかります.
org.apache.rocketmq.client.impl.consumer.PullMessageServicerunコードフラグメント
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take(); // ,
this.pullMessage(pullRequest); //
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
PullMessageServiceメッセージはサービススレッドを引き、pullRequestQueueキューからPullRequestメッセージ引きタスクを取得し、pullRequestQueueが空の場合、スレッドは引きタスクが挿入されるまでブロックされ、pullMessageメソッドを呼び出してメッセージ引きを行います.
では、PullRequestはいつキューに入れられたのでしょうか.
org.apache.rocketmq.client.impl.consumer.PullMessageServiceコードフラグメント
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest); // ,
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
if (!isStopped()) {
this.scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
PullMessageService.this.executePullRequestImmediately(pullRequest);
}
}, timeDelay, TimeUnit.MILLISECONDS);
} else {
log.warn("PullMessageServiceScheduledThread has shutdown");
}
}
追跡により、PullRequestは2つの場所で作成されます.
1、RocketMQはPullRequest引取タスクに従ってメッセージ引取を1回実行した後、PullRequestオブジェクトをpullRequestQueueに入れる.
org.apache.rocketmq.client.impl.consumer.DefaultMQPShConsumerImplコードフラグメント
public void pullMessage(final PullRequest pullRequest) {
// ...
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
//
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
// , , pullRequest pullRequestQueue
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
// , pullRequestQueue,
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
// ...
break;
// ...
}
}
}
@Override
public void onException(Throwable e) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
};
// ...
try {
this.pullAPIWrapper.pullKernelImpl( //
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback // ,
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
}
2、RebalanceServiceサービススレッドによって登録された消費者がTopicを購読した情報をポーリングし、RebalanceImplで作成する.
org.apache.rocketmq.client.impl.consumer.RebalanceService
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance(); //
}
log.info(this.getServiceName() + " service end");
}
RebalanceServiceスレッドは、デフォルトで20 sごとにmqClientFactoryを実行します.doRevalance()メソッド
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalanceコードクリップ
public void doRebalance(final boolean isOrder) {
// 、 , DefaultMQPushConsumerImpl ,
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
this.rebalanceByTopic(topic, isOrder); // Topic ,
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic(); //
}
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case CLUSTERING: {
// Topic , Comsumer org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// Broker ID
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
// , ,
Collections.sort(mqAll);
Collections.sort(cidAll);
// , AllocateMessageQueueAveragely ,
// :8 ,3 , c1:q1、q2、q3,c2:q4、q5、q6,c3:q7、q8
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
// ,
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
}
}
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
for (PullRequest pullRequest : pullRequestList) {
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
}
}
これで,プルメッセージ要求はキューに入れられ,PullMessageServiceサービスによって消費される.
参照
『RocketMQテクノロジーの内幕』