Spring BootはRabbiitMQとRabbiitMQを使って紹介します.
12845 ワード
title: SpringBoot MQ
date: 2019-06-24
author: maxzhao
tags:
- SpringBoot
- MQ
categories:
- DevelopTools
- SpringBoot
-
SpringBootはRabbiitMQを使用しています.ラビットMQは
作用:主にアプリケーションの非同期と結合を実現するために用いられ、同時にメッセージバッファ、メッセージ配信の役割も果たすことができる.
プロセス:
一般的なメッセージ・キューは、生産者がメッセージをキューに送り、消費者がモニターして消費するものです.
rabbiitmqのうちの一つの仮想ホスト(vhostデフォルト/)は、一つ以上のスイッチ(Exchange)を持っている.ユーザーは仮想ホストの粒度のみで権限制御を行うことができ、スイッチは、あるポリシーに従ってキューにバインディング(Binding)し、このように生産者とキューは直接に連絡せずに、メッセージを送信するスイッチであり、スイッチは、メッセージをバインディングされたキューに転送する.
スイッチ(Exchange)は
rabbitmq
独特の概念で、最もよく使われるのは4つのタイプです.例をあげて説明する
2つのスイッチ
directExchange
、fanoutExchange
、3つのキューqueueA
、queueB
、queueC
.キュー
directExchange
は、ポイントとして送信され、キューA Bを含む.キュー
fanoutExchange
は、ブロードキャストとして送信され、キューA B Cを含む.@Configuration
public class RabbitConfig {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* Direct .
*
* @return the exchange
*/
@Bean("directExchange")
public Exchange directExchange() {
return ExchangeBuilder.directExchange("DIRECT_EXCHANGE").durable(true).build();
}
/**
* fanout .
*
* @return the exchange
*/
@Bean("fanoutExchange")
public FanoutExchange fanoutExchange() {
return (FanoutExchange) ExchangeBuilder.fanoutExchange("FANOUT_EXCHANGE").durable(true).build();
}
/**
* .
*
* @return the queue
*/
@Bean("queueA")
public Queue directQueue() {
return QueueBuilder.durable("QUEUE_A").build();
}
@Bean("queueB")
public Queue directQueue() {
return QueueBuilder.durable("QUEUE_B").build();
}
@Bean("queueC")
public Queue directQueue() {
return QueueBuilder.durable("QUEUE_C").build();
}
/**
* A direct .
*
* @param queue the queue
* @param exchange the exchange
* @return the binding
*/
@Bean
public Binding bindingA(@Qualifier("queueA") Queue queue,
@Qualifier("directExchange") exchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange).with("DIRECT_ROUTING_KEY_A").noargs();
}
@Bean
public Binding bindingA(@Qualifier("queueB") Queue queue,
@Qualifier("directExchange") exchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange).with("DIRECT_ROUTING_KEY_B").noargs();
}
/**
* A fanout .
*
* @param queue the queue
* @param exchange the exchange
* @return the binding
*/
@Bean
public Binding bindingA(@Qualifier("queueA") Queue queue,
@Qualifier("fanoutExchange") exchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
@Bean
public Binding bindingA(@Qualifier("queueB") Queue queue,
@Qualifier("fanoutExchange") exchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange).;
}
@Bean
public Binding bindingA(@Qualifier("queueC") Queue queue,
@Qualifier("fanoutExchange") exchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange).;
}
}
メッセージ送信クラス@Service
public class SenderService {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Resource
private RabbitTemplate rabbitTemplate;
/**
* .
*
* @param p the p
* @return the response entity
*/
public void broadcast(String p) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("FANOUT_EXCHANGE", "", p, correlationData);
}
/**
* Direct .
*
* @param p the p
* @return the response entity
*/
public void directA(String p) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY_A", p, correlationData);
}
public void directB(String p) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY_B", p, correlationData);
}
public void directNull(String p) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "", p, correlationData);
}
}
メッセージ受信クラス@Component
public class Receiver {
private static final Logger log = LoggerFactory.getLogger(Receiver.class);
/**
* FANOUT .
*
* @param message the message
* @param channel the channel
* @throws IOException the io exception
*/
@RabbitListener(queues = {"QUEUE_A"})
public void on(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
log.debug("FANOUT_QUEUE_A " + new String(message.getBody()));
}
@RabbitListener(queues = {"QUEUE_B"})
public void t(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
log.debug("FANOUT_QUEUE_B " + new String(message.getBody()));
}
@RabbitListener(queues = {"QUEUE_C"})
public void t(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
log.debug("FANOUT_QUEUE_C " + new String(message.getBody()));
}
/**
* DIRECT .
*
* @param message the message
* @param channel the channel
* @throws IOException the io exception
*/
@RabbitListener(queues = {"DIRECT_QUEUE"})
public void message(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
log.debug("DIRECT_QUEUE " + new String(message.getBody()));
}
}
テストクラス@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class SenderServiceTest {
@Autowired
private SenderService senderService;
@Test
public void testCache() throws InterruptedException {
//
senderService.broadcast(" Test !");
// Direct
senderService.directA(" Test A ");
senderService.directB(" Test B ");
senderService.directNull(" Test null key ");
Thread.sleep(5000L);
}
}
結果DIRECT_QUEUE_A " Test !"
FANOUT_QUEUE_B " Test !"
FANOUT_QUEUE_C " Test !"
DIRECT_QUEUE_A" Test A "
DIRECT_QUEUE_B" Test B "
null keyは現れていないので、directExchange
にはブロードキャスト可能なキューがありません.Maven依存
org.springframework.boot
spring-boot-starter-amqp
設定spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: maxzhao
password: maxzhao
#
publisher-confirms: true
#
publisher-returns: true
# /
virtual-host: maxzhao_vhost
listener:
simple:
acknowledge-mode: manual #
concurrency: 1 #
max-concurrency: 1 #
retry:
enabled: true #
その他の参考JMS紹介と使用シーン
プロフィール:
ニュースキューとJMSの基礎知識と使用シーンを説明します.
専門用語
RabbiitMQは優れたメッセージングエージェントです.メッセージを受け取り、転送します.これを郵便局と見なしてもいいです.自分の手紙を宛先の住所に書いてポストに入れると大丈夫です.郵便局が責任を持って目的地に送ります.
一、据え付ける
ここで使っているのは
ArchLInux
です.sudo pacman -S rabbitmq
# socat erlang-nox rabbitmq
RPM実装# erlang
rpm -Uvh https://mirrors.ustc.edu.cn/epel/7/x86_64/Packages/e/epel-release-7-11.noarch.rpm
yum install erlang
# RabbitMQ
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.15/rabbitmq-server-3.6.15-1.el7.noarch.rpm
wget ftp://195.220.108.108/linux/centos/7.4.1708/os/x86_64/Packages/socat-1.7.3.2-2.el7.x86_64.rpm
yum localinstall -C -y --disablerepo=* *.rpm
web管理画面のインストールsudo rabbitmq-plugins enable rabbitmq_management
#
sudo rabbitmq-plugins list
起動#
systemctl enable rabbitmq-server
# RabbitMQ
systemctl start rabbitmq-server
#
systemctl restart rabbitmq-server
設定vim /etc/rabbitmq/rabbitmq.config
設定ポート、備考:メッセージポート5672
であれば、ウェブアクセスポートは15672
である.[
{rabbit,
[
{loopback_users, []},
{tcp_listeners, [5672]}
]
}
]
ユーザー管理# guest
sudo rabbitmqctl list_users
sudo rabbitmqctl change_password guest guest
# test/test:
sudo rabbitmqctl add_user maxzhao maxzhao
#
sudo rabbitmqctl set_user_tags maxzhao administrator
# / vhost Configure regexp Write regexp Read regexp
sudo rabbitmqctl set_permissions -p / maxzhao ".*" ".*" ".*"
# Sets user topic permissions for an exchange, AMQP default exchange
# sudo rabbitmqctl set_topic_permissions
vhostを追加#
sudo rabbitmqctl --help
# vhost
sudo rabbitmqctl add_vhost --help
#
sudo rabbitmqctl add_vhost maxzhao_vhost
#
sudo rabbitmqctl list_vhosts
#
sudo rabbitmqctl set_permissions -p /maxzhao_vhost maxzhao ".*" ".*" ".*"
vhostを削除sudo rabbitmqctl add_vhost maxzhaoTest
sudo rabbitmqctl delete_vhost maxzhaoTest
二、タスクのキューなどの参照本論文の住所:SpringBootはRabbiitMQ及びRabbiitMQを使用して紹介する.
おすすめ
公式の例
下の2017年に書いたのですが、比較的全面的な新米教材です.
RabbiitMQ簡易教程-タスクキュー
RabbiitMQ簡易教程-購読を発表します.
RabbiitMQ簡易教程-ルート
RabbiitMQ簡易教程-テーマ
RabbiitMQ簡易教程-RPC
RabbitMQ簡易教程-Websocket
RabbiitMQ簡易教程-併発スケジュール