九、springbootはrabbiitMQを整合する.
springboot整合rabbiitMQ
概要ラビットMQは、最も広く展開されているオープンソースメッセージエージェントである. ラビットMQライト級で、内部と雲の中に配置しやすい.多くのメッセージ伝達プロトコルをサポートします.RabbiitMQは、大規模で利用可能性の高い要件を満たすために配置され得る. ラビットMQの取り付け
ブログではLinuxシステムとWindowsシステムのインストールを紹介しています. Linux Linuxにrabitmq 3.7.8をインストールします. win Windows下のRabbiitMQインストール及び注意事項 準備工作 1 pom.xml jar導入: appration.ymlファイル構成(ローカルインストールであれば、デフォルトのパスワードポートを使用して、この構成は無視できます) メッセージ受信者Receiver 4ラビットMqConfigファイルを書く メッセージ送信者Runner プロジェクトの運行は以下の結果を見ました.
遅延列とは何ですか?遅延列はどのような場面に適用されますか?
遅延列は名前の通り、すぐに消費する必要はないというメッセージではなく、しばらく待ってから消費を取り出す.なぜ消費を遅らせる必要がありますか?次のような場面を見に来ました. オンラインショッピングセンターで注文してから30分後に支払いが完了しませんでした.注文をキャンセルします. システムは予約を作成した後、予約時間の1時間前に予約された双方の参加を通知する必要があります. システムの業務が失敗したら、再試行が必要です.
Rabbitmq実施遅延行列は、一般的に2つの形態がある. 第1の方法:2つの特性を利用する:Time To Live(TTL)、Dead Letter Exchanges(DLX) 第2の方法:rabitmqにおけるプラグインx-delay-message を利用する.
TTL DLXを利用して遅延列を実現する方式
TTL DXとは何ですか TTL RabbirtMQは、キューに対してx-expiresを設定してもよく、タイムアウト(両方とも最初の満了時間を基準として設定しても良い)、またはメッセージに対してx-message-ttlを設定してもよく、メッセージの生存時間を制御してもよい. Dead Letter Exchange(DLX)RabbiitMQのQueは、x-dead-letter-exchangeとx-dead-letter-routing-keyの2つのパラメータを設定することができ、列内にdead letterがあれば、この2つのパラメータに従って指定されたキューに再ルーティングする.x-dead-letter-exchange:dead letterが現れたら、dead letterを指定されたexchange x-dead-letter-routing-keyに再送信します.dead letterが現れたら、dead letterを再度指定されたrouting-keyに従って を送ります.
第一の方式を実現して直接コードを貼ります.は、Rabbiit MqConfig(queueとexchangeとの間のbinding関係を初期化する) を作成する.生産者 消費者Receiver 試験類 起動項目はコンソールの印刷効果を見ます.
https://gitee.com/honghh/boot-demo.git
参考文献
https://spring.io/guides/gs/messaging-rabbitmq/
概要
ブログではLinuxシステムとWindowsシステムのインストールを紹介しています.
org.springframework.boot
spring-boot-starter-amqp
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
@Component
public class Receiver {
private CountDownLatch latch = new CountDownLatch(1);
public void receiveMessage(String message) {
System.out.println("Received ");
latch.countDown();
}
public CountDownLatch getLatch() {
return latch;
}
}
@Configuration
public class RabbitMqConfig {
final static String queueName = "spring-boot";
@Bean
Queue queue() {
return new Queue(queueName, false);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("spring-boot-exchange");
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(queueName);
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
}
@Component
public class Runner implements CommandLineRunner {
final static String queueName = "spring-boot";
private final RabbitTemplate rabbitTemplate;
private final Receiver receiver;
private final ConfigurableApplicationContext context;
public Runner(Receiver receiver, RabbitTemplate rabbitTemplate,
ConfigurableApplicationContext context) {
this.receiver = receiver;
this.rabbitTemplate = rabbitTemplate;
this.context = context;
}
@Override
public void run(String... args) throws Exception {
System.out.println("Sending message...");
rabbitTemplate.convertAndSend(queueName, "Hello from RabbitMQ!");
receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
context.close();
}
}
Sending message...
Received
これは簡単なspringboot rabbiitMQ統合です.もちろん、rabitMQのサポートはそれだけではなく、遅延行列も提供しています.遅延列とは何ですか?遅延列はどのような場面に適用されますか?
遅延列は名前の通り、すぐに消費する必要はないというメッセージではなく、しばらく待ってから消費を取り出す.なぜ消費を遅らせる必要がありますか?次のような場面を見に来ました.
Rabbitmq実施遅延行列は、一般的に2つの形態がある.
TTL DLXを利用して遅延列を実現する方式
TTL DXとは何ですか
第一の方式を実現して直接コードを貼ります.
@Configuration
public class RabbitMqConfig {
/**
*
* @return
*/
@Bean
public Queue immediateQueue() {
// queue ,
return new Queue(Constants.IMMEDIATE_QUEUE, true);
}
/**
*
* @return
*/
@Bean
public Queue delayQueue() {
Map params = new HashMap<>();
// x-dead-letter-exchange DLX ,
params.put("x-dead-letter-exchange", Constants.IMMEDIATE_EXCHANGE);
// x-dead-letter-routing-key routing-key 。
params.put("x-dead-letter-routing-key", Constants.IMMEDIATE_ROUTING_KEY);
return new Queue(Constants.DELAY_QUEUE, true, false, false, params);
}
@Bean
public DirectExchange immediateExchange() {
// :
// , exchange ,
// , exchange , , ,
// , Map,Map exchange
return new DirectExchange(Constants.IMMEDIATE_EXCHANGE, true, false);
}
@Bean
public DirectExchange deadLetterExchange() {
// , exchange , , exchange , , ,
// Map,Map exchange
return new DirectExchange(Constants.DEAD_LETTER_EXCHANGE, true, false);
}
/**
* exchange
* @return
*/
@Bean
public Binding immediateBinding() {
return BindingBuilder.bind(immediateQueue()).to(immediateExchange()).with(Constants.IMMEDIATE_ROUTING_KEY);
}
/**
* exchange
* @return
*/
@Bean
public Binding delayBinding() {
return BindingBuilder.bind(delayQueue()).to(deadLetterExchange()).with(Constants.DELAY_ROUTING_KEY);
}
}
@Component
public class Sender {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* Description :
* Group :
*
* @author honghh
* @date 2019/3/8 0008 18:03
* @param msg
* @param delayTime
*/
public void send(String msg, int delayTime) {
System.out.println("delayTime:[ms]" + delayTime);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
this.rabbitTemplate.convertAndSend(
Constants.DEAD_LETTER_EXCHANGE,
Constants.DELAY_ROUTING_KEY,
msg,
message -> {
message.getMessageProperties().setExpiration(delayTime + "");
System.out.println(sdf.format(new Date()) + " .");
return message;
});
}
}
@Component
@EnableRabbit
@Configuration
public class Receiver {
@RabbitListener(queues = Constants.IMMEDIATE_QUEUE)
@RabbitHandler
public void get(String msg) {
System.out.println(new Date() + " : " + msg);
}
}
@Autowired
Sender sender;
@Test
public void contextLoads() {
sender.send("msg--send", 10000);
}
delayTime:[ms]10000
2019-03-08 18:30:40 .
Fri Mar 08 18:30:50 CST 2019 : msg--send
##
コード取得https://gitee.com/honghh/boot-demo.git
参考文献
https://spring.io/guides/gs/messaging-rabbitmq/