RabbitMQ学習のspring統合送信非同期メッセージ(注釈実装)

5972 ワード

実装で使用するExchangeタイプがDirectExchange.routingkeyの名前は、デフォルトでQueueの名前です.注記非同期送信メッセージを実装します.
1.生産者構成ProducerConfiguration.java
package cn.slimsmart.rabbitmq.demo.spring.async;

import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;

import com.rabbitmq.client.AMQP;

@Configuration
public class ProducerConfiguration {

	//        routingkey      Queue   ,  Exchange   DirectExchange
	protected final String helloWorldQueueName = "spring-queue-async";

	//     
	@Bean
	public ConnectionFactory connectionFactory() {
		CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.36.102");
		connectionFactory.setUsername("admin");
		connectionFactory.setPassword("admin");
		connectionFactory.setPort(AMQP.PROTOCOL.PORT);
		return connectionFactory;
	}

	//   rabbitTemplate      
	@Bean
	public RabbitTemplate rabbitTemplate() {
		RabbitTemplate template = new RabbitTemplate(connectionFactory());
		template.setRoutingKey(this.helloWorldQueueName);
		return template;
	}

	//      
	@Bean
	public ScheduledProducer scheduledProducer() {
		return new ScheduledProducer();
	}

	@Bean
	public BeanPostProcessor postProcessor() {
		return new ScheduledAnnotationBeanPostProcessor();
	}

	
	static class ScheduledProducer {

		@Autowired
		private volatile RabbitTemplate rabbitTemplate;

		//    
		private final AtomicInteger counter = new AtomicInteger();
		/**
		 *  3       
		 * 
		 * Spring3         ,            ,                   :
		    Java ,             ,     @Scheduled      ;
		 Spring         <task:**** />  ;
		  :http://zywang.iteye.com/blog/949123
		 */
		@Scheduled(fixedRate = 3000)
		public void sendMessage() {
			rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
		}
	}

}
2.生産者起動クラスProducer,java
package cn.slimsmart.rabbitmq.demo.spring.async;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class Producer {

	public static void main(String[] args) {
		new AnnotationConfigApplicationContext(ProducerConfiguration.class);
	}
}
3.受信メッセージ処理クラスReceiveMsgHandler.java
package cn.slimsmart.rabbitmq.demo.spring.async;

public class ReceiveMsgHandler {

	public void handleMessage(String text) {
		System.out.println("Received: " + text);
	}
}
4.コンシューマコンフィギュレーション
package cn.slimsmart.rabbitmq.demo.spring.async;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.rabbitmq.client.AMQP;

@Configuration
public class ConsumerConfiguration {

	//        routingkey      Queue   ,  Exchange   DirectExchange
	protected String springQueueDemo = "spring-queue-async";

	//     
	@Bean
	public ConnectionFactory connectionFactory() {
		CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
				"192.168.36.102");
		connectionFactory.setUsername("admin");
		connectionFactory.setPassword("admin");
		connectionFactory.setPort(AMQP.PROTOCOL.PORT);
		return connectionFactory;
	}

	//   rabbitAdmin    
	@Bean
	public AmqpAdmin amqpAdmin() {
		return new RabbitAdmin(connectionFactory());
	}

	//   rabbitTemplate      
	@Bean
	public RabbitTemplate rabbitTemplate() {
		RabbitTemplate template = new RabbitTemplate(connectionFactory());
		// The routing key is set to the name of the queue by the broker for the
		// default exchange.
		template.setRoutingKey(this.springQueueDemo);
		// Where we will synchronously receive messages from
		template.setQueue(this.springQueueDemo);
		return template;
	}

	//
	// Every queue is bound to the default direct exchange
	public Queue helloWorldQueue() {
		return new Queue(this.springQueueDemo);
	}

	@Bean
	public SimpleMessageListenerContainer listenerContainer() {
		SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
		container.setConnectionFactory(connectionFactory());
		container.setQueueNames(this.springQueueDemo);
		container.setMessageListener(new MessageListenerAdapter(
				new ReceiveMsgHandler()));
		return container;
	}

}
5.消費者起動クラスConsumer.java
package cn.slimsmart.rabbitmq.demo.spring.async;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class Consumer {
	public static void main(String[] args) {
		new AnnotationConfigApplicationContext(ConsumerConfiguration.class);
	}
}

受信メッセージを開始し、メッセージを送信
Received: Hello World 1
Received: Hello World 2
Received: Hello World 3
Received: Hello World 4
Received: Hello World 5
Received: Hello World 6
Received: Hello World 7
......

spring-queue-asyncメッセージキューが存在しないと報告された場合は、コンソールに追加します.