RabbitMQ実戦経験共有
38066 ワード
前言
最近1つの大学入試の項目に忙しくて、システムが順調に今回の大学入試を完成したのを見て、やっとほっとすることができます.大学入試を受ける学生を見て、その年の高校3年生の自分を思い出した.
RabbitMQの実戦経験を共有し、皆さんの役に立つことを望んでいます.
一、生産情報
RabbitMQの基本使用についてはここでは紹介しませんが、プロジェクトではExchangeでのtopicモードを使用しています.
メッセージを先に送信するコード
メッセージを送るのは特にありません.ニュースの持続化についての紹介はここでも紹介しません.分からないのは前の文章を読むことができます.メッセージを送信する上で注意しなければならないのは、複数のメッセージを一緒に送信することを選択して、最後にメッセージの送信に成功したことを確定することができて、このように効率が高いことです.また、各メッセージの長さをできるだけ短くする必要があります(スレ主はここで損をしたことがあります).そうしないと、メッセージが長すぎるため、送信時間が長くなります.実際のプロジェクトでは一度に4万件以上のデータを送ったが、問題はなかった.
二、受信メッセージ
次に、メッセージを消費するプロセスについてお話しします.私は単一の複数のchannelを接続し、各channelは毎回1つのメッセージメソッドしか取りません.単一のTCP接続で、複数のchannelが通信効率に影響するかどうかを聞く人がいます.この理論にはきっと影響があるに違いない.影響は大きくないと思う.私が運転しているchannelの数は普通30ぐらいまで行って、効率に影響を与えていないのは、私がchannelごとにメッセージを持っているからかもしれません.複数のchannelを単一に接続する方法で、多くの接続を少なくすることができます.私がなぜチャンネルごとに1つのメッセージしか取らないのかについては、外部の要因が制限されており、具体的には自分のニーズを見ています.
次にメッセージを受信するプロセスでは、まずRabbitMQHelperクラスを定義します.グローバルなconn接続変数が含まれています.また、接続の作成、接続のクローズ、接続が開いているかどうかを確認する方法もあります.プログラムはタイマーを実行し、
接続が開いていないことが検出された場合、接続処理メッセージをアクティブに作成します.
まずrabbitインスタンスを初期化し、RabbitConnectionメソッドでRabbitMQ接続を作成します.
接続が開くと、スレッドプールでメッセージを受信する方法が実行されます.ここで開いているスレッドは、開いているchannelの数と一致しなければならないことに注意してください.そうしないと問題が発生します(具体的な問題は、RabbitMQ接続タイムアウト時間を10秒に設定し、使用しない場合があります.原因は不明です.RabbitMQ接続のデフォルトタイムアウト時間は30秒で、この時間内に作成を呼び出すと、2倍のchannelが得られる可能性があります.)
次にメッセージを受信する方法であり,メッセージを処理する過程は省略される.
三、エラーメッセージの処理
処理に失敗したメッセージを「エラーキュー」に配置し、その後、元のキューのメッセージを削除します(ここで主な問題は、処理に失敗したり処理できないメッセージが複数ある場合、これらのメッセージを元のキューに戻すと、他のスレッドのchannelに配布され続けますが、結果的に処理できず、デッドサイクルをもたらし、後のメッセージが処理できなくなります).最初に処理できなかったメッセージを「エラーキュー」に配置した後、新しい接続を再開して「エラーキュー」のメッセージを処理します.
まとめ:RabbitMQ自体は安定しており、性能もよく、すべての不安定な要素がメッセージを処理する過程にあるので、安心して使用できます.
Demoソースアドレス:https://github.com/Bingjian-Zhu/RabbitMQHelper
最近1つの大学入試の項目に忙しくて、システムが順調に今回の大学入試を完成したのを見て、やっとほっとすることができます.大学入試を受ける学生を見て、その年の高校3年生の自分を思い出した.
RabbitMQの実戦経験を共有し、皆さんの役に立つことを望んでいます.
一、生産情報
RabbitMQの基本使用についてはここでは紹介しませんが、プロジェクトではExchangeでのtopicモードを使用しています.
メッセージを先に送信するコード
private bool MarkErrorSend(string[] lstMsg)
{
try
{
var factory = new ConnectionFactory()
{
UserName = "guest",//
Password = "guest",//
HostName = "localhost",//ConfigurationManager.AppSettings["sHostName"],
};
//
var connection = factory.CreateConnection();
//
var channel = connection.CreateModel();
try
{
// Direct
channel.ExchangeDeclare(
exchange: "TestTopicChange", //exchange
type: ExchangeType.Topic, //Topic ,
durable: true,//exchange
autoDelete: false,// , false
arguments: null// , :alternate-exchange
);
//
channel.QueueDeclare(
queue: "Test_Queue", //
durable: true, // ( )
exclusive: false,// ,false。 , ,
autoDelete: false,// , false
arguments: null
);
//
string routeKey = "TestRouteKey.*";//*
channel.QueueBind(
queue: "Test_Queue",
exchange: "TestTopicChange",
routingKey: routeKey,
arguments: null
);
// , DeliveryMode 2( )
IBasicProperties properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2;
channel.ConfirmSelect();//
foreach (var itemMsg in lstMsg)
{
byte[] sendBytes = Encoding.UTF8.GetBytes(itemMsg);
//
channel.BasicPublish(
exchange: "TestTopicChange",
routingKey: "TestRouteKey.one",
basicProperties: properties,
body: sendBytes
);
}
bool isAllPublished = channel.WaitForConfirms();// (channel) true
return isAllPublished;
}
catch (Exception ex)
{
//
return false;
}
finally
{
channel.Close();
connection.Close();
}
}
catch
{
//RabbitMQ.Client.Exceptions.BrokerUnreachableException:
//When the configured hostname was not reachable.
return false;
}
}
メッセージを送るのは特にありません.ニュースの持続化についての紹介はここでも紹介しません.分からないのは前の文章を読むことができます.メッセージを送信する上で注意しなければならないのは、複数のメッセージを一緒に送信することを選択して、最後にメッセージの送信に成功したことを確定することができて、このように効率が高いことです.また、各メッセージの長さをできるだけ短くする必要があります(スレ主はここで損をしたことがあります).そうしないと、メッセージが長すぎるため、送信時間が長くなります.実際のプロジェクトでは一度に4万件以上のデータを送ったが、問題はなかった.
二、受信メッセージ
次に、メッセージを消費するプロセスについてお話しします.私は単一の複数のchannelを接続し、各channelは毎回1つのメッセージメソッドしか取りません.単一のTCP接続で、複数のchannelが通信効率に影響するかどうかを聞く人がいます.この理論にはきっと影響があるに違いない.影響は大きくないと思う.私が運転しているchannelの数は普通30ぐらいまで行って、効率に影響を与えていないのは、私がchannelごとにメッセージを持っているからかもしれません.複数のchannelを単一に接続する方法で、多くの接続を少なくすることができます.私がなぜチャンネルごとに1つのメッセージしか取らないのかについては、外部の要因が制限されており、具体的には自分のニーズを見ています.
次にメッセージを受信するプロセスでは、まずRabbitMQHelperクラスを定義します.グローバルなconn接続変数が含まれています.また、接続の作成、接続のクローズ、接続が開いているかどうかを確認する方法もあります.プログラムはタイマーを実行し、
接続が開いていないことが検出された場合、接続処理メッセージをアクティブに作成します.
public class RabbitMQHelper { public IConnection conn = null; ///
/// RabbitMQ /// ///
public IConnection RabbitConnection(string sHostName, ushort nChannelMax) { try { if (conn == null) { var factory = new ConnectionFactory() { UserName = "guest",// Password = "guest",// HostName = sHostName,//ConfigurationManager.AppSettings["MQIP"], AutomaticRecoveryEnabled = false,// , RequestedConnectionTimeout = 10000,// 10 , 30 RequestedChannelMax = nChannelMax// }; // conn = factory.CreateConnection(); Console.WriteLine("RabbitMQ !"); } return conn; } catch { Console.WriteLine(" , RabbitMQ !"); return null; } } /// /// RabbitMQ /// public void Close() { try { if (conn != null) { if (conn.IsOpen) conn.Close(); conn = null; Console.WriteLine("RabbitMQ !"); } } catch { } } ///
/// RabbitMQ /// ///
public bool IsOpen() { try { if (conn != null) { if (conn.IsOpen) return true; } return false; } catch { return false; } } }
接下来我们看具体如何接收消息。
private static AutoResetEvent myEvent = new AutoResetEvent(false);
private RabbitMQHelper rabbit = new RabbitMQHelper();
private ushort nChannel = 10;//
まずrabbitインスタンスを初期化し、RabbitConnectionメソッドでRabbitMQ接続を作成します.
接続が開くと、スレッドプールでメッセージを受信する方法が実行されます.ここで開いているスレッドは、開いているchannelの数と一致しなければならないことに注意してください.そうしないと問題が発生します(具体的な問題は、RabbitMQ接続タイムアウト時間を10秒に設定し、使用しない場合があります.原因は不明です.RabbitMQ接続のデフォルトタイムアウト時間は30秒で、この時間内に作成を呼び出すと、2倍のchannelが得られる可能性があります.)
///
/// RabbitMQ , channel
///
private void CreateConnecttion()
{
try
{
rabbit.RabbitConnection("localhost", nChannel);
if (rabbit.conn != null)
{
ThreadPool.SetMinThreads(1, 1);
ThreadPool.SetMaxThreads(100, 100);
for (int i = 1; i <= nChannel; i++)
{
ThreadPool.QueueUserWorkItem(new WaitCallback(ReceiveMsg), "");
}
myEvent.WaitOne();// ,
rabbit.Close();
}
}
catch (Exception ex)
{
rabbit.Close();
Console.WriteLine(ex.Message);
}
}
次にメッセージを受信する方法であり,メッセージを処理する過程は省略される.
///
/// , (channel),
///
/// RabbitMQ
private void ReceiveMsg(object obj)
{
IModel channel = null;
try
{
#region ,
channel = rabbit.conn.CreateModel();
channel.ExchangeDeclare(
exchange: "TestTopicChange", //exchange
type: ExchangeType.Topic, //Topic ,
durable: true,//exchange
autoDelete: false,// , false
arguments: null// , :alternate-exchange
);
//
channel.QueueDeclare(
queue: "Test_Queue", //
durable: true, // ( )
exclusive: false,// ,false。 , ,
autoDelete: false,
arguments: null
);
#endregion
channel.BasicQos(0, 1, false);//
channel.QueueBind(queue: "Test_Queue",
exchange: "TestTopicChange",
routingKey: "TestRouteKey.*");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
//
try
{
bool isMark = AutoMark(message);
if (isMark)
{
//Function.writeMarkLog(message);
// , RabbitMQ
channel.BasicAck(ea.DeliveryTag, false);
}
else
{
if (MarkErrorSend(message))//
channel.BasicReject(ea.DeliveryTag, false);
else
// , , ,
channel.BasicReject(ea.DeliveryTag, true);
}
}
catch (Exception ex)
{
try
{
Console.WriteLine(ex.Message);
if (channel != null && channel.IsOpen)// RabbitMQ
{
// , , ,
channel.BasicReject(ea.DeliveryTag, true);
}
}
catch { }
}
};
//
channel.BasicConsume(queue: "Test_Queue",
autoAck: false,
consumer: consumer);
}
catch (Exception ex)
{
try
{
Console.WriteLine(" :" + ex.Message);
if (channel != null && channel.IsOpen)//
channel.Close();
if (rabbit.conn != null)// RabbitMQ
rabbit.Close();
}
catch { }
}
}
三、エラーメッセージの処理
処理に失敗したメッセージを「エラーキュー」に配置し、その後、元のキューのメッセージを削除します(ここで主な問題は、処理に失敗したり処理できないメッセージが複数ある場合、これらのメッセージを元のキューに戻すと、他のスレッドのchannelに配布され続けますが、結果的に処理できず、デッドサイクルをもたらし、後のメッセージが処理できなくなります).最初に処理できなかったメッセージを「エラーキュー」に配置した後、新しい接続を再開して「エラーキュー」のメッセージを処理します.
///
/// “ ”
///
///
///
private bool MarkErrorSend(string msg)
{
RabbitMQHelper MQ = new RabbitMQHelper();
MQ.RabbitConnection("localhost",1);
//
var channel = MQ.conn.CreateModel();
try
{
// Direct
channel.ExchangeDeclare(
exchange: "ErrorTopicChange", //exchange
type: ExchangeType.Topic, //Topic ,
durable: true,//exchange
autoDelete: false,// , false
arguments: null// , :alternate-exchange
);
//
channel.QueueDeclare(
queue: "Error_Queue", //
durable: true, // ( )
exclusive: false,// ,false。 , ,
autoDelete: false,// , false
arguments: null
);
//
string routeKey = "ErrorRouteKey.*";//*
channel.QueueBind(
queue: "Error_Queue",
exchange: "ErrorTopicChange",
routingKey: routeKey,
arguments: null
);
// , DeliveryMode 2( )
IBasicProperties properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2;
channel.ConfirmSelect();//
byte[] sendBytes = Encoding.UTF8.GetBytes(msg);
//
channel.BasicPublish(
exchange: "ErrorTopicChange",
routingKey: "ErrorRouteKey.one",
basicProperties: properties,
body: sendBytes
);
bool isAllPublished = channel.WaitForConfirms();// (channel) true
return isAllPublished;
}
catch (Exception ex)
{
//
return false;
}
finally
{
channel.Close();
MQ.conn.Close();
}
}
まとめ:RabbitMQ自体は安定しており、性能もよく、すべての不安定な要素がメッセージを処理する過程にあるので、安心して使用できます.
Demoソースアドレス:https://github.com/Bingjian-Zhu/RabbitMQHelper