RabbitMQは浅入から門に深く入り込む全総括(二)
12952 ワード
いちばん前に書く
前回文章を出してからもうずいぶん経ちましたが、実はこの时間はずっとペンを止めていません.ただ、仕事を探したり、学校の授業を終えたりするのに忙しいだけです.ブログをやり直して、後で文章を最近更新します.この文章は少し長いので、2編の に分かれています. PS:あのGithubでJavaの知識問答の文章も止めず、最近は も続々と更新されています
6.ステップアップ補充
6.1有効期限設定(TTL)
有効期限(TTL)は、メッセージまたはキューに時効を設定し、消費者に受信されて取得されるのは時間範囲内であり、有効期限を超えると自動的にメッセージが削除される.
注意:主にメッセージの期限切れについて説明します.メッセージの期限切れの最初の方法では、キューの期限切れの設定方法についても言及します.キュー属性の設定により、キュー内のすべてのメッセージに同じ期限切れ がある.はメッセージを個別に設定し、各メッセージTTLは と異なることができる.
2つの方法が同時に使用される場合、両者の有効期限TTLが小さい値を基準とする.メッセージのキュー生存時間が設定されたTTL値を超えると、Dead Messageと呼ばれるデッドメッセージキューに送信され、消費者はメッセージを受信できなくなります(デッドメッセージキューは次の点です).
6.1.1すべてのメッセージに適用される有効期限構成クラス パラメータMapコンテナの作成:タイプはQueueパラメータで要求され、要求に従って作成されます. メッセージの有効期限を設定します.ここで設定したメッセージの有効期限は、すべてのメッセージに適用されます. キュー有効期限 を設定する.入力追加パラメータ:上記構成の有効期限設定をQueue入力で入力します. 生産者
消費者を構成しないで、Webマネージャで効果を見ることができます.
6.1.2個別メッセージに適用される有効期限の構成で最初のままで良いので、構成の有効期限が切れる は不要です.生産者における構成メッセージの個別の有効期限 .
6.2デッドメッセージキュー
デッドレター公式の原文はDead letterで、RabbitMQのメッセージメカニズムです.メッセージを消費している間に、キューとキューのメッセージに以下の状況が発生した場合、現在のメッセージが「デッドレター」になっていることを示しています.デッドレターキューを構成すると、これらのデータが転送され、構成がなければ直接破棄されます.メッセージが拒否された メッセージ失効 キュー最大長 しかし、死信キューは特別な存在ではありません.私たちはスイッチを構成するだけで、消費されたキューの中で構成する必要があります.死信が現れたら、さっき構成したスイッチに再送信し、スイッチにバインドされたキューにルーティングされます.このキュー、つまり死信キューなので、作成から見ると、普通のキューと変わらないです.
6.2.1シーンを適用
例えば、いくつかの重要なビジネス・キューでは、正しく消費されていないメッセージは、破棄後にこれらのデータを復元するには、メンテナンス担当者がログから元のメッセージを取得し、メッセージを再送信する必要があるため、デッドライン・キューを構成し、正しく消費されていないメッセージに一時的な位置を与え、後でリカバリが必要な場合に相当します.対応するコードを書くだけでいいです.
6.2.2実現方式は、デッドラインを処理するスイッチおよびキュー を定義する.通常の消費キューにおいてデッドメッセージキュー を指定する.
6.3メモリとディスクの監視
6.3.1メモリアラームおよび制御
サーバがメモリ不足でクラッシュしないようにするため、RabbitMQは閾値を設定し、メモリ使用量が閾値を超えると、RabbitMQはすべてのクライアントの接続を一時的にブロックし、新しいメッセージの受信を停止します.
このしきい値を変更するには2つの方法があります命令を通す(二択一でよい) コマンドは、Brokerが再起動すると に無効になります.
プロファイルrabbitmqを変更する.conf プロファイルは起動するたびにロードされ、永続的に有効な に属します.
6.3.2メモリスワップ
クライアント接続とプロバイダがブロックされる前に、キュー内のメッセージをディスクに変更しようとします.この考え方は、オペレーティングシステムでは、メッセージの正常な処理を最大限に満たすためによく見られます.
メモリのページングが発生すると、永続化または非永続化のメッセージはディスクに転送されますが、永続化されたメッセージはもともとディスクに永続化されたコピーがあるため、永続化されたメッセージは優先的に削除されます.
デフォルトでは、メモリがしきい値の50%に達すると、ページ変更処理が行われます.
vm_を設定することでmemory_high_watermark_paging_ratio修正
6.3.3ディスク・アラート
限りなくページを変更すると、ディスク容量が消費されてサーバがクラッシュする可能性が高いため、RabbitMQはディスクアラートのしきい値を提供し、この値を下回るとアラームが発生します.デフォルトは50 MBです.コマンドで変更できます.
6.4メッセージの確実な伝達
生産者がRabbitMQにメッセージを送信する場合、ネットワークなど様々な原因で送信に失敗する可能性があるため、RabbitMQはメッセージの確実な伝達を保証する一連のメカニズムを提供し、生産者と消費者の2つの部分の処理に大別することができる.
6.4.1生産者における仕組み
生産者はメッセージの送信者として、自分のメッセージの送信に成功することを保証する必要があります.RabbitMQはこの点を保証する2つの方法を提供しています.- confirm確認モード return返却モード 6.4.1.1 confirm確認モード
生産者がメッセージを送信すると、ack応答の受信を非同期で待機し、返されたack確認メッセージを受信した後、ackがtrueかfalseかに基づいてconfirmCallbackインタフェースを呼び出して処理する構成クラス ConfirmCallbackインタフェースを実現するconfirmメソッド キューとスイッチ を宣言生産者
6.4.1.2 return返却モード
ExchangeがQueueに送信されて失敗すると、returnsCallbackが呼び出され、このインタフェースを実装して失敗を処理することができます.プロファイルでコールバック の送信を開始する. ReturnsCallbackを実現するreturnedMessage法 キューとスイッチ(Directモード) を宣言生産者 異なるルーティングkeyを変更すると、結果がテストされます.
6.4.2消費者におけるメカニズム
6.4.2.1 ack確認メカニズム
ackは受信メッセージの確認を表し、デフォルトは自動確認ですが、3種類あります.
acknowledge-modeオプションの紹介 auto:自動確認、デフォルトオプション manual:手動確認(能力配分による手動確認が必要) none:確認せず、送信後自動破棄 自動確認とは、メッセージが消費者に受信されると、自動的に受信を確認し、そのメッセージをキューから削除することである.
しかし、実際の業務処理では、正しく受信メッセージは、業務上の問題により正しく処理されない可能性があるが、手動確認方式が設定されている場合は、業務処理に成功した後にchannelを呼び出す必要がある.basicAck()は、手動で署名し、異常が発生した場合はchannelを呼び出す.basicNack()メソッドは、メッセージを自動的に再送信させる.プロファイル 消費者
6.5クラスタ&6.6分散トランザクション(更新対象)
この二つの点も短くないので、簡単に書きたくないので、後ろに置いて単独の文章を書いて、発表します.
クラスタの構築については、https://blog.csdn.net/belongh...を参照してください.
前回文章を出してからもうずいぶん経ちましたが、実はこの时間はずっとペンを止めていません.ただ、仕事を探したり、学校の授業を終えたりするのに忙しいだけです.ブログをやり直して、後で文章を最近更新します.
6.ステップアップ補充
6.1有効期限設定(TTL)
有効期限(TTL)は、メッセージまたはキューに時効を設定し、消費者に受信されて取得されるのは時間範囲内であり、有効期限を超えると自動的にメッセージが削除される.
注意:主にメッセージの期限切れについて説明します.メッセージの期限切れの最初の方法では、キューの期限切れの設定方法についても言及します.
2つの方法が同時に使用される場合、両者の有効期限TTLが小さい値を基準とする.メッセージのキュー生存時間が設定されたTTL値を超えると、Dead Messageと呼ばれるデッドメッセージキューに送信され、消費者はメッセージを受信できなくなります(デッドメッセージキューは次の点です).
6.1.1すべてのメッセージに適用される有効期限
@Configuration
public class RabbitMqConfiguration {
public static final String TOPIC_EXCHANGE = "topic_order_exchange";
public static final String TOPIC_QUEUE_NAME_1 = "test_topic_queue_1";
public static final String TOPIC_ROUTINGKEY_1 = "test.*";
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE);
}
@Bean
public Queue topicQueue1() {
// Map
Map args = new HashMap<>();
// 5000
args.put("x-message-ttl", 5000);
//
args.put("x-expires", 8000);
//
return new Queue(TOPIC_QUEUE_NAME_1, true, false, false, args);
}
@Bean
public Binding bindingTopic1() {
return BindingBuilder.bind(topicQueue1())
.to(topicExchange())
.with(TOPIC_ROUTINGKEY_1);
}
}
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
/**
* RabbitTemplate
*/
@Autowired
@Test
public void testTopicSendMessage() {
rabbitTemplate.convertAndSend(RabbitMqConfiguration.TOPIC_EXCHANGE, "test.order.insert", "This is a message !");
}
}
消費者を構成しないで、Webマネージャで効果を見ることができます.
6.1.2個別メッセージに適用される有効期限
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
/**
* RabbitTemplate
*/
@Autowired
@Test
public void testTopicSendMessage2() {
MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){
public Message postProcessMessage(Message message){
// “5000”
message.getMessageProperties().setExpiration("5000");
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
}
};
rabbitTemplate.convertAndSend(RabbitMqConfiguration.TOPIC_EXCHANGE, "test.order",
"This is a message 002 !",messagePostProcessor);
}
}
6.2デッドメッセージキュー
デッドレター公式の原文はDead letterで、RabbitMQのメッセージメカニズムです.メッセージを消費している間に、キューとキューのメッセージに以下の状況が発生した場合、現在のメッセージが「デッドレター」になっていることを示しています.デッドレターキューを構成すると、これらのデータが転送され、構成がなければ直接破棄されます.
6.2.1シーンを適用
例えば、いくつかの重要なビジネス・キューでは、正しく消費されていないメッセージは、破棄後にこれらのデータを復元するには、メンテナンス担当者がログから元のメッセージを取得し、メッセージを再送信する必要があるため、デッドライン・キューを構成し、正しく消費されていないメッセージに一時的な位置を与え、後でリカバリが必要な場合に相当します.対応するコードを書くだけでいいです.
6.2.2実現方式
@Configuration
public class DeadRabbitMqConfiguration{
@Bean
public DirectExchange deadDirect(){
return new DirectExchange("dead_direct_exchange");}
@Bean
public Queue deadQueue(){
return new Queue("dead_direct_queue");}
@Bean
public Binding deadBinds(){
return BindingBuilder.bind(deadQueue()).to(deadDirect()).with("dead");
}
}
@Configuration
public class RabbitMqConfiguration {
public static final String TOPIC_EXCHANGE = "topic_order_exchange";
public static final String TOPIC_QUEUE_NAME_1 = "test_topic_queue_1";
public static final String TOPIC_ROUTINGKEY_1 = "test.*";
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE);
}
@Bean
public Queue topicQueue1() {
//
Map args = new HashMap<>();
args.put("x-message-ttl", 5000);
//
args.put("x-dead-letter-exchange","dead_direct_exchange");
// key fanout
args.put("x-dead-letter-routing-key","dead");
return new Queue(TOPIC_QUEUE_NAME_1, true, false, false, args);
}
@Bean
public Binding bindingTopic1() {
return BindingBuilder.bind(topicQueue1())
.to(topicExchange())
.with(TOPIC_ROUTINGKEY_1);
}
}
6.3メモリとディスクの監視
6.3.1メモリアラームおよび制御
サーバがメモリ不足でクラッシュしないようにするため、RabbitMQは閾値を設定し、メモリ使用量が閾値を超えると、RabbitMQはすべてのクライアントの接続を一時的にブロックし、新しいメッセージの受信を停止します.
このしきい値を変更するには2つの方法があります
# 0.6
rabbitmqctl set_vm_memory_high_watermark
# 700MB
rabbitmqctl set_vm_memory_high_watermark absolute
# 0.4 0.4-0.7
vm_memory_high_watermark.relative = 0.5
#
vm_memory_high_watermark.absolute = 2GB
6.3.2メモリスワップ
クライアント接続とプロバイダがブロックされる前に、キュー内のメッセージをディスクに変更しようとします.この考え方は、オペレーティングシステムでは、メッセージの正常な処理を最大限に満たすためによく見られます.
メモリのページングが発生すると、永続化または非永続化のメッセージはディスクに転送されますが、永続化されたメッセージはもともとディスクに永続化されたコピーがあるため、永続化されたメッセージは優先的に削除されます.
デフォルトでは、メモリがしきい値の50%に達すると、ページ変更処理が行われます.
vm_を設定することでmemory_high_watermark_paging_ratio修正
# 1, 1
vm_memory_high_watermark_paging_ratio = 0.6
6.3.3ディスク・アラート
限りなくページを変更すると、ディスク容量が消費されてサーバがクラッシュする可能性が高いため、RabbitMQはディスクアラートのしきい値を提供し、この値を下回るとアラームが発生します.デフォルトは50 MBです.コマンドで変更できます.
#
rabbitmqctl set_disk_free_limit
#
rabbitmqctl set_disk_free_limit memory_limit
6.4メッセージの確実な伝達
生産者がRabbitMQにメッセージを送信する場合、ネットワークなど様々な原因で送信に失敗する可能性があるため、RabbitMQはメッセージの確実な伝達を保証する一連のメカニズムを提供し、生産者と消費者の2つの部分の処理に大別することができる.
6.4.1生産者における仕組み
生産者はメッセージの送信者として、自分のメッセージの送信に成功することを保証する必要があります.RabbitMQはこの点を保証する2つの方法を提供しています.-
生産者がメッセージを送信すると、ack応答の受信を非同期で待機し、返されたack確認メッセージを受信した後、ackがtrueかfalseかに基づいてconfirmCallbackインタフェースを呼び出して処理する
spring:
rabbitmq:
#
publisher-confirm-type: correlated
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
/**
* @param correlationData
* @param ack exchange 。true ,false
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
//
System.out.println(" ");
} else {
//
System.out.println(" , : " + cause);
// TODO ,
}
}
}
@Configuration
public class RabbitMqConfig {
@Bean()
public Queue confirmTestQueue() {
return new Queue("confirm_test_queue", true, false, false);
}
@Bean()
public FanoutExchange confirmTestExchange() {
return new FanoutExchange("confirm_test_exchange");
}
@Bean
public Binding confirmTestFanoutExchangeAndQueue() {
return BindingBuilder.bind(confirmTestQueue()).to(confirmTestExchange());
}
}
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
/**
* RabbitTemplate
*/
@Autowired
/**
* ConfirmCallbackService
*/
@Autowired
private ConfirmCallbackService confirmCallbackService;
@Test
public void testConfirm() {
//
rabbitTemplate.setConfirmCallback(confirmCallbackService);
//
rabbitTemplate.convertAndSend("confirm_test_exchange", "", "ConfirmCallback !");
}
}
6.4.1.2 return返却モード
ExchangeがQueueに送信されて失敗すると、returnsCallbackが呼び出され、このインタフェースを実装して失敗を処理することができます.
spring:
rabbitmq:
#
publisher-returns: true
// public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey)
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnsCallback {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println(returned);
}
}
@Configuration
public class RabbitMqConfig {
@Bean()
public Queue returnsTestQueue() {
return new Queue("return_test_queue", true, false, false);
}
@Bean()
public DirectExchange returnsTestExchange() {
return new DirectExchange("returns_test_exchange");
}
@Bean
public Binding returnsTestDirectExchangeAndQueue() {
return BindingBuilder.bind(returnsTestQueue()).to(returnsTestExchange()).with("info");
}
}
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
/**
* RabbitTemplate
*/
@Autowired
/**
* ConfirmCallbackService
*/
@Autowired
private ConfirmCallbackService confirmCallbackService;
/**
* ReturnCallbackService
*/
@Autowired
private ReturnCallbackService returnCallbackService;
@Test
public void testReturn() {
//
rabbitTemplate.setMandatory(true);
//
rabbitTemplate.setReturnsCallback(returnCallbackService);
//
rabbitTemplate.setConfirmCallback(confirmCallbackService);
//
rabbitTemplate.convertAndSend("returns_test_exchange", "info", "ReturnsCallback !");
}
}
6.4.2消費者におけるメカニズム
6.4.2.1 ack確認メカニズム
ackは受信メッセージの確認を表し、デフォルトは自動確認ですが、3種類あります.
acknowledge-modeオプションの紹介
しかし、実際の業務処理では、正しく受信メッセージは、業務上の問題により正しく処理されない可能性があるが、手動確認方式が設定されている場合は、業務処理に成功した後にchannelを呼び出す必要がある.basicAck()は、手動で署名し、異常が発生した場合はchannelを呼び出す.basicNack()メソッドは、メッセージを自動的に再送信させる.
spring:
rabbitmq:
listener:
simple:
#
acknowledge-mode: manual
@Component
@RabbitListener(queues = "confirm_test_queue")
public class TestConsumer {
@RabbitHandler
public void processHandler(String msg, Channel channel, Message message) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println(" : " + new String(message.getBody()));
System.out.println(" :");
int i = 66 / 0;
// deliveryTag
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
//
channel.basicNack(deliveryTag, true, true);
}
}
}
6.5クラスタ&6.6分散トランザクション(更新対象)
この二つの点も短くないので、簡単に書きたくないので、後ろに置いて単独の文章を書いて、発表します.
クラスタの構築については、https://blog.csdn.net/belongh...を参照してください.