RabbitMQ自動拡張消費者ソース分析
10613 ワード
1はじめに
RabbitMQ異常監視およびキュー消費を動的に制御するソリューションでは、オンラインで消費者数を動的に変更する方法が提供されているが、この方法を使用するには、偶発的なメッセージ蓄積シーンに対応するために、キューメッセージの蓄積状況をタイムリーに監視する必要があり、消費者数を自動的に拡張(増加または減少)することはできない.ソースコードを読むことによって、RabbitMQはすでに消費者数を自動的に拡張する方法を提供していることが分かった.
2 SimpleMessageListenerContainerソース分析
SimpleMessageListenerContainerは、メッセージキューをリスニングするコンテナであり、メッセージキューの各リスニング者はAsyncMessageProcessingConsumerインスタンスである.RabbitMQ自動拡張消費者数の構成およびコアロジックはいずれもこのクラスにあり、具体的なソースコード分析は以下の通りである.
3 A syncMessageProcessingConsumerソース分析
AsyncMessageProcessingConsumerは、SimpleMessageListenerContainerのプライベート内部クラスであり、メッセージキュー消費メッセージをリスニングするコアクラスである.消費者数を自動的に拡張するコアロジックもこのクラスにあり、具体的なソースコード分析は以下の通りである.
4消費者構成の自動拡張
アプリケーションMQ構成クラスに次の構成を追加すればよい
RabbitMQ異常監視およびキュー消費を動的に制御するソリューションでは、オンラインで消費者数を動的に変更する方法が提供されているが、この方法を使用するには、偶発的なメッセージ蓄積シーンに対応するために、キューメッセージの蓄積状況をタイムリーに監視する必要があり、消費者数を自動的に拡張(増加または減少)することはできない.ソースコードを読むことによって、RabbitMQはすでに消費者数を自動的に拡張する方法を提供していることが分かった.
2 SimpleMessageListenerContainerソース分析
SimpleMessageListenerContainerは、メッセージキューをリスニングするコンテナであり、メッセージキューの各リスニング者はAsyncMessageProcessingConsumerインスタンスである.RabbitMQ自動拡張消費者数の構成およびコアロジックはいずれもこのクラスにあり、具体的なソースコード分析は以下の通りである.
public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer
implements ApplicationEventPublisherAware {
private static final long DEFAULT_START_CONSUMER_MIN_INTERVAL = 10000;
private static final long DEFAULT_STOP_CONSUMER_MIN_INTERVAL = 60000;
private static final int DEFAULT_CONSECUTIVE_ACTIVE_TRIGGER = 10;
private static final int DEFAULT_CONSECUTIVE_IDLE_TRIGGER = 10;
public static final long DEFAULT_RECEIVE_TIMEOUT = 1000;
public static final int DEFAULT_PREFETCH_COUNT = 1;
public static final long DEFAULT_SHUTDOWN_TIMEOUT = 5000;
// , 10s,
private volatile long startConsumerMinInterval = DEFAULT_START_CONSUMER_MIN_INTERVAL;
// , 60s,
private volatile long stopConsumerMinInterval = DEFAULT_STOP_CONSUMER_MIN_INTERVAL;
// considerAddingAConsumer, 10 ,
private volatile int consecutiveActiveTrigger = DEFAULT_CONSECUTIVE_ACTIVE_TRIGGER;
// considerStoppingAConsumer, 10 ,
private volatile int consecutiveIdleTrigger = DEFAULT_CONSECUTIVE_IDLE_TRIGGER;
// ( ) , 1,volatile ,
private volatile int concurrentConsumers = 1;
// ,
private volatile Integer maxConcurrentConsumers;
//
private volatile long lastConsumerStarted;
//
private volatile long lastConsumerStopped;
// , 1s,
private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
// ,
private Set consumers;
// , , 、 , ,
private final Object consumersMonitor = new Object();
// , : ,
private volatile Executor taskExecutor = new SimpleAsyncTaskExecutor();
/**
* 。 。
* , ,
*/
public void setConcurrentConsumers(final int concurrentConsumers) {
Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)");
Assert.isTrue(!this.exclusive || concurrentConsumers == 1,
"When the consumer is exclusive, the concurrency must be 1");
if (this.maxConcurrentConsumers != null) {
Assert.isTrue(concurrentConsumers <= this.maxConcurrentConsumers,
"'concurrentConsumers' cannot be more than 'maxConcurrentConsumers'");
}
synchronized (this.consumersMonitor) {
if (logger.isDebugEnabled()) {
logger.debug("Changing consumers from " + this.concurrentConsumers + " to " + concurrentConsumers);
}
// ( )
int delta = this.concurrentConsumers - concurrentConsumers;
this.concurrentConsumers = concurrentConsumers;
if (isActive() && this.consumers != null) {
if (delta > 0) {
// delta
Iterator consumerIterator = this.consumers.iterator();
while (consumerIterator.hasNext() && delta > 0) {
BlockingQueueConsumer consumer = consumerIterator.next();
consumer.basicCancel(true);
consumerIterator.remove();
delta--;
}
}
else {
// delta
addAndStartConsumers(-delta);
}
}
}
}
/**
* 。 。
*/
protected void addAndStartConsumers(int delta) {
synchronized (this.consumersMonitor) {
if (this.consumers != null) {
for (int i = 0; i < delta; i++) {
BlockingQueueConsumer consumer = createBlockingQueueConsumer();
this.consumers.add(consumer);
AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
if (logger.isDebugEnabled()) {
logger.debug("Starting a new consumer: " + consumer);
}
this.taskExecutor.execute(processor);
if (this.applicationEventPublisher != null) {
this.applicationEventPublisher.publishEvent(new AsyncConsumerStartedEvent(this, consumer));
}
try {
FatalListenerStartupException startupException = processor.getStartupException();
if (startupException != null) {
this.consumers.remove(consumer);
throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
}
}
catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
catch (Exception e) {
consumer.stop();
logger.error("Error starting new consumer", e);
this.cancellationLock.release(consumer);
this.consumers.remove(consumer);
}
}
}
}
}
/**
* 。
* consumers Container stop, 。
*/
private boolean isActive(BlockingQueueConsumer consumer) {
boolean consumerActive;
synchronized (this.consumersMonitor) {
consumerActive = this.consumers != null && this.consumers.contains(consumer);
}
return consumerActive && this.isActive();
}
/**
* 。 。
* ( ) consecutiveActiveTrigger( 10) , 。
* , stopConsumerMinInterval , 。
*/
private void considerStoppingAConsumer(BlockingQueueConsumer consumer) {
synchronized (this.consumersMonitor) {
if (this.consumers != null && this.consumers.size() > this.concurrentConsumers) {
long now = System.currentTimeMillis();
if (this.lastConsumerStopped + this.stopConsumerMinInterval < now) {
consumer.basicCancel(true);
this.consumers.remove(consumer);
if (logger.isDebugEnabled()) {
logger.debug("Idle consumer terminating: " + consumer);
}
this.lastConsumerStopped = now;
}
}
}
}
}
3 A syncMessageProcessingConsumerソース分析
AsyncMessageProcessingConsumerは、SimpleMessageListenerContainerのプライベート内部クラスであり、メッセージキュー消費メッセージをリスニングするコアクラスである.消費者数を自動的に拡張するコアロジックもこのクラスにあり、具体的なソースコード分析は以下の通りである.
private final class AsyncMessageProcessingConsumer implements Runnable {
private final BlockingQueueConsumer consumer;
private final CountDownLatch start;
private volatile FatalListenerStartupException startupException;
private AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) {
this.consumer = consumer;
this.start = new CountDownLatch(1);
}
@Override
public void run() {
boolean aborted = false;
int consecutiveIdles = 0;
int consecutiveMessages = 0;
// some other code
try {
// some other code
//
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
try {
boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
//
if (receivedOk) {
if (isActive(this.consumer)) {
consecutiveIdles = 0;
if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
// N , considerAddingAConsumer
considerAddingAConsumer();
consecutiveMessages = 0;
}
}
}
else {
consecutiveMessages = 0;
if (consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {
// N , considerStoppingAConsumer
considerStoppingAConsumer(this.consumer);
consecutiveIdles = 0;
}
}
}
if (SimpleMessageListenerContainer.this.idleEventInterval != null) {
// publishIdleContainerEvent
}
}
catch (ListenerExecutionFailedException ex) {
// Continue to process, otherwise re-throw
if (ex.getCause() instanceof NoSuchMethodException) {
throw new FatalListenerExecutionException("Invalid listener", ex);
}
}
catch (AmqpRejectAndDontRequeueException rejectEx) {
// do nothing
}
}
}
catch (Exception e) {
// , ,
}
finally {
if (SimpleMessageListenerContainer.this.transactionManager != null) {
ConsumerChannelRegistry.unRegisterConsumerChannel();
}
}
this.start.countDown();
/*
* , 。 :
* (1)aborted=true, Error ;
* (2)isActive=false, considerStoppingAConsumer, 。
*/
if (!isActive(this.consumer) || aborted) {
logger.debug("Cancelling " + this.consumer);
try {
this.consumer.stop();
SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
if (SimpleMessageListenerContainer.this.applicationEventPublisher != null) {
SimpleMessageListenerContainer.this.applicationEventPublisher.publishEvent(
new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
}
}
catch (AmqpException e) {
logger.info("Could not cancel message consumer", e);
}
if (aborted) {
logger.error("Stopping container from aborted consumer");
//
stop();
}
}
else {
logger.info("Restarting " + this.consumer);
// , ,
restart(this.consumer);
}
if (routingLookupKey != null) {
SimpleResourceHolder.unbind(getRoutingConnectionFactory());
}
}
}
4消費者構成の自動拡張
アプリケーションMQ構成クラスに次の構成を追加すればよい
@Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory(
ConnectionFactory myConnectionFactory, SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
// fetch
factory.setPrefetchCount(2);
//
factory.setConcurrentConsumers(2);
// ,
factory.setMaxConcurrentConsumers(5);
// ACK
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
configurer.configure(factory, myConnectionFactory);
return factory;
}