rabbitmqを用いてメッセージを手動で確認する場合、タイミング取得キューメッセージ実装
39675 ワード
問題の説明
最近のプロジェクトでは、データが増加し、サードパーティシステムと関連したインタラクションが必要なため、サードパーティシステムにプッシュする必要があるデータがあります.
関連業務
オンラインの稼働効率に影響を与えないという考え方に基づいて、私たちは増加したメッセージをrabbitmqに入れ、別のアプリケーションを使用して消費を取得します.データはプッシュするだけで、業務のデータは15分ほどの更新戦略があり、リアルタイム性はあまり高くありません.だから、rabbitをアクティブにリンクして消費し、データをネットワークで転送する必要があります.
そうかんぶんせき
ネットワーク上ではほぼ関連解決策が現れているが、関連データの紛失や処理、性能と効率などの関連基礎業務のワークロードを実現するため、後ずさりしている...
まだspringにはorgがあるspringframework.amqpキット、簡略化された大量の面倒>>始めましょう
rabbitに関するいくつかの概念を理解する Spring RabbitMQ Channel理解 ミドルウェアシリーズ2 RabbitMQのメッセージ持続性、確認メカニズム、拒否、プリフェッチ数、割当ポリシー topic
このいくつかの概念を理解した時、あなたはすでに私たちの今日のテーマに注目しているかもしれません.
S i m p l e M e s a g e ListenerContainerコンテナを使用して消費キューリスニングを設定し、次に特定のリスニングListenerを設定してメッセージ消費の具体的なロジックの作成を行います.
しかし、このコンテナを使用して大量に消費する方法については、公式には説明されていません.ネット上では、このS i m p l e M s s a g e ListenerContainerの大量メッセージ処理しか見つからないかもしれませんが、答えは簡単です.
次はこの質問の答えについてcodingをします.
解決策
まず失敗した再試行が必要なのでspringの
関連構成
次は私たちのテーマです.タイミングタスクはorgを使用しています.springframework.scheduling
に続く
もちろんタイミングタスクの起動は、rabbitコンテナの実装に関連することができますが、ここでは必要ありませんので、この小さな変更については、学生は自分で実現することができます
最近のプロジェクトでは、データが増加し、サードパーティシステムと関連したインタラクションが必要なため、サードパーティシステムにプッシュする必要があるデータがあります.
関連業務
オンラインの稼働効率に影響を与えないという考え方に基づいて、私たちは増加したメッセージをrabbitmqに入れ、別のアプリケーションを使用して消費を取得します.データはプッシュするだけで、業務のデータは15分ほどの更新戦略があり、リアルタイム性はあまり高くありません.だから、rabbitをアクティブにリンクして消費し、データをネットワークで転送する必要があります.
そうかんぶんせき
ネットワーク上ではほぼ関連解決策が現れているが、関連データの紛失や処理、性能と効率などの関連基礎業務のワークロードを実現するため、後ずさりしている...
まだspringにはorgがあるspringframework.amqpキット、簡略化された大量の面倒>>始めましょう
rabbitに関するいくつかの概念を理解する
このいくつかの概念を理解した時、あなたはすでに私たちの今日のテーマに注目しているかもしれません.
S i m p l e M e s a g e ListenerContainerコンテナを使用して消費キューリスニングを設定し、次に特定のリスニングListenerを設定してメッセージ消費の具体的なロジックの作成を行います.
しかし、このコンテナを使用して大量に消費する方法については、公式には説明されていません.ネット上では、このS i m p l e M s s a g e ListenerContainerの大量メッセージ処理しか見つからないかもしれませんが、答えは簡単です.
次はこの質問の答えについてcodingをします.
解決策
まず失敗した再試行が必要なのでspringの
RepublishMessageRecoverer , , 。
を使用します関連構成
1 @Bean
2 ObjectMapper objectMapper() {
3 ObjectMapper objectMapper = new ObjectMapper();
4 DateFormat dateFormat = objectMapper.getDateFormat();
5 JavaTimeModule javaTimeModule = new JavaTimeModule();
6
7 SimpleModule module = new SimpleModule();
8 module.addSerializer(new ToStringSerializer(Long.TYPE));
9 module.addSerializer(new ToStringSerializer(Long.class));
10 module.addSerializer(new ToStringSerializer(BigInteger.class));
11
12 javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
13 javaTimeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
14 javaTimeModule.addSerializer(LocalTime.class, new LocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss")));
15
16 objectMapper.registerModule(module);
17 objectMapper.registerModule(javaTimeModule);
18 objectMapper.setConfig(objectMapper.getDeserializationConfig().with(new ObjectMapperDateFormatExtend(dateFormat)));//
19 objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
20 objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
21 return objectMapper;
22 }
23
24
25
26 @Bean
27 RabbitAdmin admin (ConnectionFactory aConnectionFactory) {
28 return new RabbitAdmin(aConnectionFactory);
29 }
30
31 @Bean
32 MessageConverter jacksonAmqpMessageConverter( ) {
33 return new Jackson2JsonMessageConverter(objectMapper());
34 }
35
36
37 @Bean
38 Queue bcwPushControlQueue (RabbitAdmin rabbitAdmin) {
39 Queue queue = new Queue(Queues.QUEUE_BCW_PUSH);
40 rabbitAdmin.declareQueue(queue);
41 return queue;
42 }
43 @Bean
44 Queue bcwPayControlQueue (RabbitAdmin rabbitAdmin) {
45 Queue queue = new Queue(Queues.QUEUE_BCW_PAY);
46 rabbitAdmin.declareQueue(queue);
47 return queue;
48 }
49 @Bean
50 Queue bcwPullControlQueue (RabbitAdmin rabbitAdmin) {
51 Queue queue = new Queue(Queues.QUEUE_BCW_PULL);
52 rabbitAdmin.declareQueue(queue);
53 return queue;
54 }
55 /**
56 *
57 * @return
58 */
59 @Bean
60 TopicExchange controlExchange () {
61 return new TopicExchange(Exchanges.ExangeTOPIC);
62 }
63
64
65 /**
66 *
67 */
68 @Bean
69 public Queue bcwPayControlRetryQueue() {
70 Map arguments = new HashMap<>();
71 arguments.put("x-message-ttl", 10 * 1000);
72 arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC);
73 // some-routing-key some.exchange.name, routing key
74 arguments.put("x-dead-letter-routing-key", "queue_bcw.push");
75 return new Queue("queue_bcw@pay@retry", true, false, false, arguments);
76 }
77 /**
78 *
79 */
80 @Bean
81 public Queue bcwPushControlRetryQueue() {
82 Map arguments = new HashMap<>();
83 arguments.put("x-message-ttl", 10 * 1000);
84 arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC);
85 // some-routing-key some.exchange.name, routing key
86 arguments.put("x-dead-letter-routing-key", "queue_bcw.push");
87 return new Queue("queue_bcw@push@retry", true, false, false, arguments);
88 }
89 /**
90 *
91 */
92 @Bean
93 public Queue bcwPullControlRetryQueue() {
94 Map arguments = new HashMap<>();
95 arguments.put("x-message-ttl", 10 * 1000);
96 arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC);
97 // some-routing-key some.exchange.name, routing key
98 // arguments.put("x-dead-letter-routing-key", "queue_bcw");
99 return new Queue("queue_bcw@pull@retry", true, false, false, arguments);
100 }
101 @Bean
102 public Binding bcwPayControlRetryBinding() {
103 return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.pay.retry");
104 }
105 @Bean
106 public Binding bcwPushControlRetryBinding() {
107 return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.push.retry");
108 }
109 @Bean
110 public Binding bcwPullControlRetryBinding() {
111 return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.pull.retry");
112 }
113
114 /**
115 * RoutingKey
116 *
117 * @param queueMessages
118 * @param exchange
119 * @return
120 */
121 @Bean
122 Binding bcwPushBindingQueue(@Qualifier("bcwPushControlQueue") Queue queueMessages,@Qualifier("controlExchange") TopicExchange exchange) {
123 return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.push");
124 }
125 /**
126 * RoutingKey
127 *
128 * @param queueMessages
129 * @param exchange
130 * @return
131 */
132 @Bean
133 Binding bcwPayBindingQueue(@Qualifier("bcwPayControlQueue") Queue queueMessages, @Qualifier("controlExchange") TopicExchange exchange) {
134 return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.pay");
135 }
136 /**
137 * RoutingKey
138 *
139 * @param queueMessages
140 * @param exchange
141 * @return
142 */
143 @Bean
144 Binding bcwPullBindingQueue(@Qualifier("bcwPullControlQueue") Queue queueMessages,@Qualifier("controlExchange") TopicExchange exchange) {
145 return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.pull");
146 }
147
148 @Bean
149 @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
150 public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
151 SimpleRabbitListenerContainerFactoryConfigurer configurer,
152 ConnectionFactory connectionFactory) {
153 SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
154 configurer.configure(factory, connectionFactory);
155 factory.setMessageConverter(jacksonAmqpMessageConverter());
156 return factory;
157 }
次は私たちのテーマです.タイミングタスクはorgを使用しています.springframework.scheduling
1 /**
2 * ,
3 */
4 public abstract class QuartzSimpleMessageListenerContainer extends SimpleMessageListenerContainer {
5 protected final Logger logger = LoggerFactory.getLogger(getClass());
6 private List body = new LinkedList<>();
7 public long start_time;
8 private Channel channel;
9 @Autowired
10 private ObjectMapper objectMapper;
11 @Autowired
12 private RabbitTemplate rabbitTemplate;
13
14 public QuartzSimpleMessageListenerContainer() {
15 //
16 this.setAcknowledgeMode(AcknowledgeMode.MANUAL);
17
18 this.setMessageListener((ChannelAwareMessageListener) (message,channel) -> {
19 long current_time = System.currentTimeMillis();
20 int time = (int) ((current_time - start_time)/1000);
21 logger.info("==== {} =====",message.getMessageProperties().getConsumerQueue());
22 Long retryCount = getRetryCount(message.getMessageProperties());
23 if (retryCount > 3) {
24 logger.info("==== {} =====",message.getMessageProperties().getConsumerQueue());
25 try {
26 channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
27 } catch (IOException ex) {
28 ex.printStackTrace();
29 }
30 return;
31 }
32
33 this.body.add(message);
34 /**
35 * ,
36 * ,
37 */
38 if(body.size()>=3 || time>60){
39 this.channel = channel;
40 callback();
41 }
42 });
43
44
45
46 }
47 private void callback(){
48 // channel = getChannel(getTransactionalResourceHolder());
49 if(body.size()>0 && channel !=null && channel.isOpen()){
50 try {
51 callbackWork();
52 }catch (Exception e){
53 logger.error(" :{}",e.getMessage());
54
55 body.stream().forEach(message -> {
56 Long retryCount = getRetryCount(message.getMessageProperties());
57 if (retryCount <= 3) {
58 logger.info(" , :" + retryCount);
59 rabbitTemplate.convertAndSend(Exchanges.ExangeTOPIC, message.getMessageProperties().getReceivedRoutingKey()+".retry", message);
60 }
61 });
62
63 } finally{
64
65 logger.info("flsher too data");
66
67 body.stream().forEach(message -> {
68 // acknowledge
69 try {
70 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
71 } catch (IOException e) {
72 logger.error(" !");
73 e.printStackTrace();
74 }
75 });
76
77 body.clear();
78 this.stop();
79
80 }
81 }
82
83 }
84 abstract void callbackWork() throws Exception;
85 /**
86 *
87 * @param properties
88 * @return
89 */
90 private long getRetryCount(MessageProperties properties){
91 long retryCount = 0L;
92 Map header = properties.getHeaders();
93 if(header != null && header.containsKey("x-death")){
94 List
に続く
もちろんタイミングタスクの起動は、rabbitコンテナの実装に関連することができますが、ここでは必要ありませんので、この小さな変更については、学生は自分で実現することができます
@Scheduled(cron = "0 0/2 * * * ? ")
public void start()