Spring Boot入門のニュース中間件の使用
はじめに
メッセージ中間には2つの重要な概念がある。メッセージエージェントと宛先。メッセージ送信者がメッセージを送信すると、メッセージはメッセージエージェントによって引き継がれ、メッセージエージェント保証メッセージは指定された宛先に伝達される。
私たちがよく使うメッセージエージェントはJMSとAMQP仕様があります。対応して、それらの一般的な実装は、それぞれActiveMQおよびRabbiitMQである。
二、ActiveMQの整合
2.1依存関係を追加
2.3エンコーディング
クラスを設定:
メッセージ生産者:
2.4試験
キューメッセージを受信:hello activemq queue 1
キューメッセージを受信:hello activemq queue 2
キューメッセージ受信:ハローactivemq queue 3
キューメッセージを受信:hello activemq queue 4
キューメッセージの受信:hello activemq queue 5
リリース/購読モードをテストする時、spring.jms.pb-sub-doman=trueを設定します。
主題メッセージを受信:ハローactivemq topic 1
主題メッセージを受信する:hello activemq topic 2
主題メッセージを受信:ハローactivemq topic 3
主題メッセージを受信する:hello activemq topic 4
主題メッセージを受信する:hello activemq topic 5
三、ラビットMQを整合する。
3.1依存を追加
クラスを設定:
メッセージ生産者:
3.4テスト
ピットを踏んで注意する1:ACCESS_REFSED C Login was refused using authentication mechange ism PLAN
ソリューション:
1)ユーザー名とパスワードが正しいかどうかを確認してください。ユーザ名とパスワードの値にスペースやタブが含まれているかどうかは注意してください。
2)テストアカウントがgustである場合、rabitmq.comファイルを修正する必要があります。このファイルに「loopback_」を追加します。users=none」配置。
ステップリマインド2:Canot prepare queue for listener.Either the queue doesn't exist or the breoker will not allow us to use it
ソリューション:
私たちはRabbiitMQの管理画面にログインして、Queオプションに手動で対応するキューを追加できます。
以上が本文の全部です。皆さんの勉強に役に立つように、私たちを応援してください。
メッセージ中間には2つの重要な概念がある。メッセージエージェントと宛先。メッセージ送信者がメッセージを送信すると、メッセージはメッセージエージェントによって引き継がれ、メッセージエージェント保証メッセージは指定された宛先に伝達される。
私たちがよく使うメッセージエージェントはJMSとAMQP仕様があります。対応して、それらの一般的な実装は、それぞれActiveMQおよびRabbiitMQである。
二、ActiveMQの整合
2.1依存関係を追加
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- , -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
2.2プロファイルを追加
# activemq
spring.activemq.broker-url=tcp://192.168.2.12:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.pool.enabled=false
spring.activemq.pool.max-connections=50
# / , true
spring.jms.pub-sub-domain=false
ここのspring.activemq.pool.enabled=falseは接続池を閉じるという意味です。2.3エンコーディング
クラスを設定:
@Configuration
public class JmsConfirguration {
public static final String QUEUE_NAME = "activemq_queue";
public static final String TOPIC_NAME = "activemq_topic";
@Bean
public Queue queue() {
return new ActiveMQQueue(QUEUE_NAME);
}
@Bean
public Topic topic() {
return new ActiveMQTopic(TOPIC_NAME);
}
}
キューとテーマの作成を担当します。メッセージ生産者:
@Component
public class JmsSender {
@Autowired
private Queue queue;
@Autowired
private Topic topic;
@Autowired
private JmsMessagingTemplate jmsTemplate;
public void sendByQueue(String message) {
this.jmsTemplate.convertAndSend(queue, message);
}
public void sendByTopic(String message) {
this.jmsTemplate.convertAndSend(topic, message);
}
}
メッセージ消費者:
@Component
public class JmsReceiver {
@JmsListener(destination = JmsConfirguration.QUEUE_NAME)
public void receiveByQueue(String message) {
System.out.println(" :" + message);
}
@JmsListener(destination = JmsConfirguration.TOPIC_NAME)
public void receiveByTopic(String message) {
System.out.println(" :" + message);
}
}
ニュース消費者は@Jms Listenerを使って、ニュースを監督する。2.4試験
@RunWith(SpringRunner.class)
@SpringBootTest
public class JmsTest {
@Autowired
private JmsSender sender;
@Test
public void testSendByQueue() {
for (int i = 1; i < 6; i++) {
this.sender.sendByQueue("hello activemq queue " + i);
}
}
@Test
public void testSendByTopic() {
for (int i = 1; i < 6; i++) {
this.sender.sendByTopic("hello activemq topic " + i);
}
}
}
印刷結果:キューメッセージを受信:hello activemq queue 1
キューメッセージを受信:hello activemq queue 2
キューメッセージ受信:ハローactivemq queue 3
キューメッセージを受信:hello activemq queue 4
キューメッセージの受信:hello activemq queue 5
リリース/購読モードをテストする時、spring.jms.pb-sub-doman=trueを設定します。
主題メッセージを受信:ハローactivemq topic 1
主題メッセージを受信する:hello activemq topic 2
主題メッセージを受信:ハローactivemq topic 3
主題メッセージを受信する:hello activemq topic 4
主題メッセージを受信する:hello activemq topic 5
三、ラビットMQを整合する。
3.1依存を追加
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.2プロファイルを追加
spring.rabbitmq.host=192.168.2.30
spring.rabbitmq.port=5672
spring.rabbitmq.username=light
spring.rabbitmq.password=light
spring.rabbitmq.virtual-host=/test
3.3符号化クラスを設定:
@Configuration
public class AmqpConfirguration {
//============= 、 ===============
public static final String SIMPLE_QUEUE = "simple_queue";
@Bean
public Queue queue() {
return new Queue(SIMPLE_QUEUE, true);
}
//=============== / ============
public static final String PS_QUEUE_1 = "ps_queue_1";
public static final String PS_QUEUE_2 = "ps_queue_2";
public static final String FANOUT_EXCHANGE = "fanout_exchange";
@Bean
public Queue psQueue1() {
return new Queue(PS_QUEUE_1, true);
}
@Bean
public Queue psQueue2() {
return new Queue(PS_QUEUE_2, true);
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE);
}
@Bean
public Binding fanoutBinding1() {
return BindingBuilder.bind(psQueue1()).to(fanoutExchange());
}
@Bean
public Binding fanoutBinding2() {
return BindingBuilder.bind(psQueue2()).to(fanoutExchange());
}
//=============== ============
public static final String ROUTING_QUEUE_1 = "routing_queue_1";
public static final String ROUTING_QUEUE_2 = "routing_queue_2";
public static final String DIRECT_EXCHANGE = "direct_exchange";
@Bean
public Queue routingQueue1() {
return new Queue(ROUTING_QUEUE_1, true);
}
@Bean
public Queue routingQueue2() {
return new Queue(ROUTING_QUEUE_2, true);
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange(DIRECT_EXCHANGE);
}
@Bean
public Binding directBinding1() {
return BindingBuilder.bind(routingQueue1()).to(directExchange()).with("user");
}
@Bean
public Binding directBinding2() {
return BindingBuilder.bind(routingQueue2()).to(directExchange()).with("order");
}
//=============== ============
public static final String TOPIC_QUEUE_1 = "topic_queue_1";
public static final String TOPIC_QUEUE_2 = "topic_queue_2";
public static final String TOPIC_EXCHANGE = "topic_exchange";
@Bean
public Queue topicQueue1() {
return new Queue(TOPIC_QUEUE_1, true);
}
@Bean
public Queue topicQueue2() {
return new Queue(TOPIC_QUEUE_2, true);
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE);
}
@Bean
public Binding topicBinding1() {
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("user.add");
}
@Bean
public Binding topicBinding2() {
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("user.#");
}
}
RabbiitMQは複数の動作モードがあるので、配置が多いです。関連内容を知りたい方は、「RabbiitMQ仕事パターン紹介」または自分のBaidu関連資料を閲覧してください。メッセージ生産者:
@Component
public class AmqpSender {
@Autowired
private AmqpTemplate amqpTemplate;
/**
*
*
* @param message
*/
public void simpleSend(String message) {
this.amqpTemplate.convertAndSend(AmqpConfirguration.SIMPLE_QUEUE, message);
}
/**
* /
*
* @param message
*/
public void psSend(String message) {
this.amqpTemplate.convertAndSend(AmqpConfirguration.FANOUT_EXCHANGE, "", message);
}
/**
*
*
* @param message
*/
public void routingSend(String routingKey, String message) {
this.amqpTemplate.convertAndSend(AmqpConfirguration.DIRECT_EXCHANGE, routingKey, message);
}
/**
*
*
* @param routingKey
* @param message
*/
public void topicSend(String routingKey, String message) {
this.amqpTemplate.convertAndSend(AmqpConfirguration.TOPIC_EXCHANGE, routingKey, message);
}
}
メッセージ消費者:
@Component
public class AmqpReceiver {
/**
*
*
* @param message
*/
@RabbitListener(queues = AmqpConfirguration.SIMPLE_QUEUE)
public void simpleReceive(String message) {
System.out.println(" :" + message);
}
/**
* /
*
* @param message
*/
@RabbitListener(queues = AmqpConfirguration.PS_QUEUE_1)
public void psReceive1(String message) {
System.out.println(AmqpConfirguration.PS_QUEUE_1 + " :" + message);
}
@RabbitListener(queues = AmqpConfirguration.PS_QUEUE_2)
public void psReceive2(String message) {
System.out.println(AmqpConfirguration.PS_QUEUE_2 + " :" + message);
}
/**
*
*
* @param message
*/
@RabbitListener(queues = AmqpConfirguration.ROUTING_QUEUE_1)
public void routingReceive1(String message) {
System.out.println(AmqpConfirguration.ROUTING_QUEUE_1 + " :" + message);
}
@RabbitListener(queues = AmqpConfirguration.ROUTING_QUEUE_2)
public void routingReceive2(String message) {
System.out.println(AmqpConfirguration.ROUTING_QUEUE_2 + " :" + message);
}
/**
*
*
* @param message
*/
@RabbitListener(queues = AmqpConfirguration.TOPIC_QUEUE_1)
public void topicReceive1(String message) {
System.out.println(AmqpConfirguration.TOPIC_QUEUE_1 + " :" + message);
}
@RabbitListener(queues = AmqpConfirguration.TOPIC_QUEUE_2)
public void topicReceive2(String message) {
System.out.println(AmqpConfirguration.TOPIC_QUEUE_2 + " :" + message);
}
}
メッセージ消費者は@Rabbit Listenerを使ってメッセージを傍受する。3.4テスト
@RunWith(SpringRunner.class)
@SpringBootTest
public class AmqpTest {
@Autowired
private AmqpSender sender;
@Test
public void testSimpleSend() {
for (int i = 1; i < 6; i++) {
this.sender.simpleSend("test simpleSend " + i);
}
}
@Test
public void testPsSend() {
for (int i = 1; i < 6; i++) {
this.sender.psSend("test psSend " + i);
}
}
@Test
public void testRoutingSend() {
for (int i = 1; i < 6; i++) {
this.sender.routingSend("order", "test routingSend " + i);
}
}
@Test
public void testTopicSend() {
for (int i = 1; i < 6; i++) {
this.sender.topicSend("user.add", "test topicSend " + i);
}
}
}
テストの結果は省略しました。ピットを踏んで注意する1:ACCESS_REFSED C Login was refused using authentication mechange ism PLAN
ソリューション:
1)ユーザー名とパスワードが正しいかどうかを確認してください。ユーザ名とパスワードの値にスペースやタブが含まれているかどうかは注意してください。
2)テストアカウントがgustである場合、rabitmq.comファイルを修正する必要があります。このファイルに「loopback_」を追加します。users=none」配置。
ステップリマインド2:Canot prepare queue for listener.Either the queue doesn't exist or the breoker will not allow us to use it
ソリューション:
私たちはRabbiitMQの管理画面にログインして、Queオプションに手動で対応するキューを追加できます。
以上が本文の全部です。皆さんの勉強に役に立つように、私たちを応援してください。