RabbiitMQ-メッセージ確認機構(事務+confirm)
記事の目次 RabbiitMQ-メッセージ確認メカニズム(事務+confirm) 1.RabitMQ-メッセージ確認メカニズム(事務+confirm) .事務機構 生産者 消費者 .Confirmモード 3.1生産者端Confirmモードの実現原理 普通のシングルは を送信します。生産者 消費者 複数のバッチ 生産者 消費者 生産者 消費者 RabbiitMQ-メッセージ確認機構(事務+confirm)
1.RabbiitMQ-メッセージ確認メカニズム(事務+confirm)
ラジビットMQでは、サーバの異常によるデータ損失問題を解決します。生産者がメッセージを送信した後、メッセージはラジビットMQサーバですか? デフォルトでは知ることのできない には二つの方法があります。 AMQPプロトコル:AMQPは、トランザクション機構 を実現する。 Confirmモード 2.事務の仕組み
TxSelect TxComit TxRollBack TxSelect:現在のチャンネルをtranseモードに設定するための TxComit:トランザクションを提出するための TxRollBack:ロールバックトランザクション用 生産者
3.1生産者端Confirmモードの実現原理
生産者はチャネルをConfirmモードに設定し、一旦チャネルがconfirmモードに入ると、このチャネル上で発行されたメッセージには一意のIDが割り当てられます。(1から開始します。)メッセージが全てのマッチするキューに送られると、brook erは生産者に確認を送ります。これにより、生産者は、メッセージが正しく目的のキューに到達していることを知っています。メッセージとキューが持続可能であれば、確認メッセージはディスクに書き込まれて発行され、生産者にbrookフィードバックされる確認メッセージには、deliver-deliver-が確認メッセージのシリアル番号が含まれています。このシーケンス番号にマークされる前のメッセージはすべて処理されました。
Confirmモードの最大の利点は、非同期処理であり、Nack(Negative ACKnowledment、否定応答)がconfrimモードをオンすることである。普通はwaitForConfirmsを呼び出すために送信します。() 一括でwaitForConfirmsを呼び出すバッチを送ります。 非同期confirmモード:フィードバックを提供する 普通郵便で送る
生産者
を送ってから確認します。
生産者
Channelオブジェクトから提供されるConfriimListener()コールバック方法は、deliveryTag(現在Chanelから送信されているメッセージ番号)のみを含み、サブセットは各Chanelのためにunconfirmのメッセージシーケンス番号を維持し、各publishのデータに、セットの要素を1つ追加して、一回ごとにhandleAck方法を変更します。(multiple=true)記録。プログラムの動作効率から見ると、このunconfirmセットは、順序の良いセットSortedert記憶構造を採用したほうがいいです。
生産者
1.RabbiitMQ-メッセージ確認メカニズム(事務+confirm)
ラジビットMQでは、サーバの異常によるデータ損失問題を解決します。生産者がメッセージを送信した後、メッセージはラジビットMQサーバですか? デフォルトでは知ることのできない には二つの方法があります。
TxSelect TxComit TxRollBack
package com.ithzk.rabbitmq.transation;
import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import javax.sound.midi.Soundbank;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author hzk
* @date 2018/3/10
*/
public class TxSend {
private final static String QUEUE_NAME = "test_queue_tx";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String msg = "hello tx";
try {
channel.txSelect();
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
int ex = 1/0;
channel.txCommit();
} catch (Exception e) {
e.printStackTrace();
channel.txRollback();
System.out.println("Send msg rollback");
}
System.out.println("Send msg"+msg);
channel.close();
connection.close();
}
}
消費者package com.ithzk.rabbitmq.transation;
import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author hzk
* @date 2018/3/10
*/
public class TxRecv1 {
private final static String QUEUE_NAME = "test_queue_tx";
public static void main(String[] args) throws IOException, TimeoutException {
//
Connection connection = RabbitMQConnectionUtils.getConnection();
//
final Channel channel = connection.createChannel();
//
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//
channel.basicQos(1);
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME,autoAck,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[1] Recv tx msg:" + msg);
}
});
System.out.println("[Consumer 1 start]");
}
}
3.Confirmモード3.1生産者端Confirmモードの実現原理
生産者はチャネルをConfirmモードに設定し、一旦チャネルがconfirmモードに入ると、このチャネル上で発行されたメッセージには一意のIDが割り当てられます。(1から開始します。)メッセージが全てのマッチするキューに送られると、brook erは生産者に確認を送ります。これにより、生産者は、メッセージが正しく目的のキューに到達していることを知っています。メッセージとキューが持続可能であれば、確認メッセージはディスクに書き込まれて発行され、生産者にbrookフィードバックされる確認メッセージには、deliver-deliver-が確認メッセージのシリアル番号が含まれています。このシーケンス番号にマークされる前のメッセージはすべて処理されました。
Confirmモードの最大の利点は、非同期処理であり、Nack(Negative ACKnowledment、否定応答)がconfrimモードをオンすることである。
channel.confirmSelect()
プログラミングモード:生産者
package com.ithzk.rabbitmq.confirm;
import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
*
* @author hzk
* @date 2018/3/10
*/
public class ConfrimSend {
private final static String QUEUE_NAME = "test_queue_confrim_one";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = RabbitMQConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// confirmSelect() channel confirm
channel.confirmSelect();
String msg = "hello confirm one";
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
if (!channel.waitForConfirms()){
System.out.println("message send fail!");
}else{
System.out.println("Send msg"+msg);
}
channel.close();
connection.close();
}
}
消費者package com.ithzk.rabbitmq.confirm;
import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author hzk
* @date 2018/3/10
*/
public class ConfirmRecv1 {
private final static String QUEUE_NAME = "test_queue_confrim_one";
public static void main(String[] args) throws IOException, TimeoutException {
//
Connection connection = RabbitMQConnectionUtils.getConnection();
//
final Channel channel = connection.createChannel();
//
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME,autoAck,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[1] Recv tx msg:" + msg);
}
});
System.out.println("[Consumer 1 start]");
}
}
複数のロットを送ってから確認します。
生産者
package com.ithzk.rabbitmq.confirm;
import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
*
* @author hzk
* @date 2018/3/10
*/
public class ConfrimSend2 {
private final static String QUEUE_NAME = "test_queue_confrim_one";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = RabbitMQConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// confirmSelect() channel confirm
channel.confirmSelect();
String msg = "hello confirm batch";
//
for (int i = 0 ;i < 50 ; i++){
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
}
//
if (!channel.waitForConfirms()){
System.out.println("message send fail!");
}else{
System.out.println("Send msg"+msg);
}
channel.close();
connection.close();
}
}
消費者package com.ithzk.rabbitmq.confirm;
import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author hzk
* @date 2018/3/10
*/
public class ConfirmRecv1 {
private final static String QUEUE_NAME = "test_queue_confrim_one";
public static void main(String[] args) throws IOException, TimeoutException {
//
Connection connection = RabbitMQConnectionUtils.getConnection();
//
final Channel channel = connection.createChannel();
//
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME,autoAck,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[1] Recv tx msg:" + msg);
}
});
System.out.println("[Consumer 1 start]");
}
}
非同期コンバージョンモードChannelオブジェクトから提供されるConfriimListener()コールバック方法は、deliveryTag(現在Chanelから送信されているメッセージ番号)のみを含み、サブセットは各Chanelのためにunconfirmのメッセージシーケンス番号を維持し、各publishのデータに、セットの要素を1つ追加して、一回ごとにhandleAck方法を変更します。(multiple=true)記録。プログラムの動作効率から見ると、このunconfirmセットは、順序の良いセットSortedert記憶構造を採用したほうがいいです。
生産者
package com.ithzk.rabbitmq.confirm;
import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;
/**
*
* @author hzk
* @date 2018/3/10
*/
public class ConfrimSend3 {
private final static String QUEUE_NAME = "test_queue_confrim_one";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = RabbitMQConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// confirmSelect() channel confirm
channel.confirmSelect();
//
final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
//
channel.addConfirmListener(new ConfirmListener() {
// handleAck
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if(multiple){
System.out.println("handleAck multiple");
confirmSet.headSet(deliveryTag+1).clear();
}else{
System.out.println("handleAck multiple false");
confirmSet.remove(deliveryTag);
}
}
//handleNack 3s 10s xxx..
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if(multiple){
System.out.println("handleNack multiple");
confirmSet.headSet(deliveryTag+1).clear();
}else{
System.out.println("handleNack multiple false");
confirmSet.remove(deliveryTag);
}
}
});
String msg = "hello confirm async";
while (true){
long seqNo = channel.getNextPublishSeqNo();
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
confirmSet.add(seqNo);
}
}
}
消費者package com.ithzk.rabbitmq.confirm;
import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author hzk
* @date 2018/3/10
*/
public class ConfirmRecv1 {
private final static String QUEUE_NAME = "test_queue_confrim_one";
public static void main(String[] args) throws IOException, TimeoutException {
//
Connection connection = RabbitMQConnectionUtils.getConnection();
//
final Channel channel = connection.createChannel();
//
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME,autoAck,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[1] Recv tx msg:" + msg);
}
});
System.out.println("[Consumer 1 start]");
}
}
ps:txモードの効率が悪いので、使用後の2つを勧めます。