RabbitMQ自動拡張消費者ソース分析

10613 ワード

1はじめに
RabbitMQ異常監視およびキュー消費を動的に制御するソリューションでは、オンラインで消費者数を動的に変更する方法が提供されているが、この方法を使用するには、偶発的なメッセージ蓄積シーンに対応するために、キューメッセージの蓄積状況をタイムリーに監視する必要があり、消費者数を自動的に拡張(増加または減少)することはできない.ソースコードを読むことによって、RabbitMQはすでに消費者数を自動的に拡張する方法を提供していることが分かった.
2 SimpleMessageListenerContainerソース分析
SimpleMessageListenerContainerは、メッセージキューをリスニングするコンテナであり、メッセージキューの各リスニング者はAsyncMessageProcessingConsumerインスタンスである.RabbitMQ自動拡張消費者数の構成およびコアロジックはいずれもこのクラスにあり、具体的なソースコード分析は以下の通りである.
public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer
		implements ApplicationEventPublisherAware {
		
	private static final long DEFAULT_START_CONSUMER_MIN_INTERVAL = 10000;
	private static final long DEFAULT_STOP_CONSUMER_MIN_INTERVAL = 60000;
	private static final int DEFAULT_CONSECUTIVE_ACTIVE_TRIGGER = 10;
	private static final int DEFAULT_CONSECUTIVE_IDLE_TRIGGER = 10;
	public static final long DEFAULT_RECEIVE_TIMEOUT = 1000;
	public static final int DEFAULT_PREFETCH_COUNT = 1;
	public static final long DEFAULT_SHUTDOWN_TIMEOUT = 5000;

	//              ,  10s,   
	private volatile long startConsumerMinInterval = DEFAULT_START_CONSUMER_MIN_INTERVAL;
	//              ,  60s,   
	private volatile long stopConsumerMinInterval = DEFAULT_STOP_CONSUMER_MIN_INTERVAL;
	//                 considerAddingAConsumer,  10 ,   
	private volatile int consecutiveActiveTrigger = DEFAULT_CONSECUTIVE_ACTIVE_TRIGGER;
	//             considerStoppingAConsumer,  10 ,   
	private volatile int consecutiveIdleTrigger = DEFAULT_CONSECUTIVE_IDLE_TRIGGER;
	//   (   )     ,  1,volatile  ,       
	private volatile int concurrentConsumers = 1;
	//        ,   
	private volatile Integer maxConcurrentConsumers;
	//              
	private volatile long lastConsumerStarted;
	//             
	private volatile long lastConsumerStopped;
	//                ,  1s,   
	private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
	//           ,          
	private Set consumers;
	//       ,              ,  、         ,       ,    
	private final Object consumersMonitor = new Object();
	//            ,     :            ,   
	private volatile Executor taskExecutor = new SimpleAsyncTaskExecutor();

	/**
	*          。                  。
	*       ,               ,               
	*/
	public void setConcurrentConsumers(final int concurrentConsumers) {
		Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)");
		Assert.isTrue(!this.exclusive || concurrentConsumers == 1,
				"When the consumer is exclusive, the concurrency must be 1");
		if (this.maxConcurrentConsumers != null) {
			Assert.isTrue(concurrentConsumers <= this.maxConcurrentConsumers,
					"'concurrentConsumers' cannot be more than 'maxConcurrentConsumers'");
		}
		synchronized (this.consumersMonitor) {
			if (logger.isDebugEnabled()) {
				logger.debug("Changing consumers from " + this.concurrentConsumers + " to " + concurrentConsumers);
			}
			//     ( )        
			int delta = this.concurrentConsumers - concurrentConsumers;
			this.concurrentConsumers = concurrentConsumers;
			if (isActive() && this.consumers != null) {
				if (delta > 0) {
					//   delta    
					Iterator consumerIterator = this.consumers.iterator();
					while (consumerIterator.hasNext() && delta > 0) {
						BlockingQueueConsumer consumer = consumerIterator.next();
						consumer.basicCancel(true);
						consumerIterator.remove();
						delta--;
					}
				}
				else {
					//   delta    
					addAndStartConsumers(-delta);
				}
			}
		}
	}

	/**
	*         。                。
	*/
	protected void addAndStartConsumers(int delta) {
		synchronized (this.consumersMonitor) {
			if (this.consumers != null) {
				for (int i = 0; i < delta; i++) {
					BlockingQueueConsumer consumer = createBlockingQueueConsumer();
					this.consumers.add(consumer);
					AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
					if (logger.isDebugEnabled()) {
						logger.debug("Starting a new consumer: " + consumer);
					}
					this.taskExecutor.execute(processor);
					if (this.applicationEventPublisher != null) {
						this.applicationEventPublisher.publishEvent(new AsyncConsumerStartedEvent(this, consumer));
					}
					try {
						FatalListenerStartupException startupException = processor.getStartupException();
						if (startupException != null) {
							this.consumers.remove(consumer);
							throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
						}
					}
					catch (InterruptedException ie) {
						Thread.currentThread().interrupt();
					}
					catch (Exception e) {
						consumer.stop();
						logger.error("Error starting new consumer", e);
						this.cancellationLock.release(consumer);
						this.consumers.remove(consumer);
					}
				}
			}
		}
	}

	/**
	*          。
	*       consumers  Container  stop,      。
	*/
	private boolean isActive(BlockingQueueConsumer consumer) {
		boolean consumerActive;
		synchronized (this.consumersMonitor) {
			consumerActive = this.consumers != null && this.consumers.contains(consumer);
		}
		return consumerActive && this.isActive();
	}

	/**
	*          。              。
	*      (          )      consecutiveActiveTrigger(  10)    ,       。
	*                 ,   stopConsumerMinInterval        ,        。
	*/
	private void considerStoppingAConsumer(BlockingQueueConsumer consumer) {
		synchronized (this.consumersMonitor) {
			if (this.consumers != null && this.consumers.size() > this.concurrentConsumers) {
				long now = System.currentTimeMillis();
				if (this.lastConsumerStopped + this.stopConsumerMinInterval < now) {
					consumer.basicCancel(true);
					this.consumers.remove(consumer);
					if (logger.isDebugEnabled()) {
						logger.debug("Idle consumer terminating: " + consumer);
					}
					this.lastConsumerStopped = now;
				}
			}
		}
	}


}


3 A syncMessageProcessingConsumerソース分析
AsyncMessageProcessingConsumerは、SimpleMessageListenerContainerのプライベート内部クラスであり、メッセージキュー消費メッセージをリスニングするコアクラスである.消費者数を自動的に拡張するコアロジックもこのクラスにあり、具体的なソースコード分析は以下の通りである.
	private final class AsyncMessageProcessingConsumer implements Runnable {

		private final BlockingQueueConsumer consumer;

		private final CountDownLatch start;

		private volatile FatalListenerStartupException startupException;

		private AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) {
			this.consumer = consumer;
			this.start = new CountDownLatch(1);
		}

		@Override
		public void run() {

			boolean aborted = false;
			int consecutiveIdles = 0;
			int consecutiveMessages = 0;
			
			// some other code
			
			try {
				// some other code
				//                     
				while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
					try {
						boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
						if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
							//                 
							if (receivedOk) {
								if (isActive(this.consumer)) {
									consecutiveIdles = 0;
									if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
										//          N    ,  considerAddingAConsumer
										considerAddingAConsumer();
										consecutiveMessages = 0;
									}
								}
							}
							else {
								consecutiveMessages = 0;
								if (consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {
									//        N  ,  considerStoppingAConsumer
									considerStoppingAConsumer(this.consumer);
									consecutiveIdles = 0;
								}
							}
						}
						if (SimpleMessageListenerContainer.this.idleEventInterval != null) {
							//      publishIdleContainerEvent  
						}
					}
					catch (ListenerExecutionFailedException ex) {
						// Continue to process, otherwise re-throw
						if (ex.getCause() instanceof NoSuchMethodException) {
							throw new FatalListenerExecutionException("Invalid listener", ex);
						}
					}
					catch (AmqpRejectAndDontRequeueException rejectEx) {
						// do nothing
					}
				}
			}
			catch (Exception e) {
				//         ,        ,        
			}
			finally {
				if (SimpleMessageListenerContainer.this.transactionManager != null) {
					ConsumerChannelRegistry.unRegisterConsumerChannel();
				}
			}

			this.start.countDown();
		
			/*
			*        ,                  。      :
			* (1)aborted=true,   Error               ;
			* (2)isActive=false,   considerStoppingAConsumer,         。
			*/
			if (!isActive(this.consumer) || aborted) {
				logger.debug("Cancelling " + this.consumer);
				try {
					this.consumer.stop();
					SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
					if (SimpleMessageListenerContainer.this.applicationEventPublisher != null) {
						SimpleMessageListenerContainer.this.applicationEventPublisher.publishEvent(
								new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
					}
				}
				catch (AmqpException e) {
					logger.info("Could not cancel message consumer", e);
				}
				if (aborted) {
					logger.error("Stopping container from aborted consumer");
					//        
					stop();
				}
			}
			else {
				logger.info("Restarting " + this.consumer);
				//      ,            ,         
				restart(this.consumer);
			}

			if (routingLookupKey != null) {
				SimpleResourceHolder.unbind(getRoutingConnectionFactory());
			}

		}

	}

4消費者構成の自動拡張
アプリケーションMQ構成クラスに次の構成を追加すればよい
		@Bean
        public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory(
                ConnectionFactory myConnectionFactory, SimpleRabbitListenerContainerFactoryConfigurer configurer) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            //        fetch       
            factory.setPrefetchCount(2);
            //          
            factory.setConcurrentConsumers(2);
            //          ,             
            factory.setMaxConcurrentConsumers(5);
            //     ACK
            factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            configurer.configure(factory, myConnectionFactory);
            return factory;
        }