RabbitMQ実戦経験共有

38066 ワード

前言
最近1つの大学入試の項目に忙しくて、システムが順調に今回の大学入試を完成したのを見て、やっとほっとすることができます.大学入試を受ける学生を見て、その年の高校3年生の自分を思い出した.
RabbitMQの実戦経験を共有し、皆さんの役に立つことを望んでいます.
 
 
一、生産情報
RabbitMQの基本使用についてはここでは紹介しませんが、プロジェクトではExchangeでのtopicモードを使用しています.
メッセージを先に送信するコード
private bool MarkErrorSend(string[] lstMsg)
        {
            try
            {
                var factory = new ConnectionFactory()
                {
                    UserName = "guest",//   
                    Password = "guest",//  
                    HostName = "localhost",//ConfigurationManager.AppSettings["sHostName"],
                };
                //    
                var connection = factory.CreateConnection();
                //    
                var channel = connection.CreateModel();
                try
                {
                    //    Direct     
                    channel.ExchangeDeclare(
                        exchange: "TestTopicChange", //exchange  
                        type: ExchangeType.Topic, //Topic  ,      
                        durable: true,//exchange   
                        autoDelete: false,//      ,    false
                        arguments: null//       ,  :alternate-exchange
                        );

                    //      
                    channel.QueueDeclare(
                        queue: "Test_Queue", //    
                        durable: true, //       (              )
                        exclusive: false,//     ,false。             ,             ,           
                        autoDelete: false,//      ,    false
                        arguments: null
                        );

                    //         
                    string routeKey = "TestRouteKey.*";//*      
                    channel.QueueBind(
                        queue: "Test_Queue",
                        exchange: "TestTopicChange",
                        routingKey: routeKey,
                        arguments: null
                        );

                    //       , DeliveryMode  2(              )
                    IBasicProperties properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = 2;

                    channel.ConfirmSelect();//      
                    foreach (var itemMsg in lstMsg)
                    {
                        byte[] sendBytes = Encoding.UTF8.GetBytes(itemMsg);
                        //    
                        channel.BasicPublish(
                            exchange: "TestTopicChange",
                            routingKey: "TestRouteKey.one",
                            basicProperties: properties,
                            body: sendBytes
                            );
                    }
                    bool isAllPublished = channel.WaitForConfirms();//  (channel)           true
                    return isAllPublished;
                }
                catch (Exception ex)
                {
                    //     
                    return false;
                }
                finally
                {
                    channel.Close();
                    connection.Close();
                }
            }
            catch
            {
                //RabbitMQ.Client.Exceptions.BrokerUnreachableException:
                //When the configured hostname was not reachable.
                return false;
            }
        }

メッセージを送るのは特にありません.ニュースの持続化についての紹介はここでも紹介しません.分からないのは前の文章を読むことができます.メッセージを送信する上で注意しなければならないのは、複数のメッセージを一緒に送信することを選択して、最後にメッセージの送信に成功したことを確定することができて、このように効率が高いことです.また、各メッセージの長さをできるだけ短くする必要があります(スレ主はここで損をしたことがあります).そうしないと、メッセージが長すぎるため、送信時間が長くなります.実際のプロジェクトでは一度に4万件以上のデータを送ったが、問題はなかった.
 
 
二、受信メッセージ
次に、メッセージを消費するプロセスについてお話しします.私は単一の複数のchannelを接続し、各channelは毎回1つのメッセージメソッドしか取りません.単一のTCP接続で、複数のchannelが通信効率に影響するかどうかを聞く人がいます.この理論にはきっと影響があるに違いない.影響は大きくないと思う.私が運転しているchannelの数は普通30ぐらいまで行って、効率に影響を与えていないのは、私がchannelごとにメッセージを持っているからかもしれません.複数のchannelを単一に接続する方法で、多くの接続を少なくすることができます.私がなぜチャンネルごとに1つのメッセージしか取らないのかについては、外部の要因が制限されており、具体的には自分のニーズを見ています.
次にメッセージを受信するプロセスでは、まずRabbitMQHelperクラスを定義します.グローバルなconn接続変数が含まれています.また、接続の作成、接続のクローズ、接続が開いているかどうかを確認する方法もあります.プログラムはタイマーを実行し、
接続が開いていないことが検出された場合、接続処理メッセージをアクティブに作成します.
 public class RabbitMQHelper
    {
        public IConnection conn = null;

        /// 
        ///   RabbitMQ       
        /// 
        ///       
        public IConnection RabbitConnection(string sHostName, ushort nChannelMax)
        {
            try
            {
                if (conn == null)
                {
                    var factory = new ConnectionFactory()
                    {
                        UserName = "guest",//   
                        Password = "guest",//  
                        HostName = sHostName,//ConfigurationManager.AppSettings["MQIP"],
                        AutomaticRecoveryEnabled = false,//
                        RequestedConnectionTimeout = 10000,//        10 ,    30 
                        RequestedChannelMax = nChannelMax//          
                    };
                    //    
                    conn = factory.CreateConnection();
                    Console.WriteLine("RabbitMQ     !");
                }

                return conn;
            }
            catch
            {
                Console.WriteLine("      ,   RabbitMQ      !");
                return null;
            }
        }

        /// 
        ///   RabbitMQ  
        /// 
        public void Close()
        {
            try
            {
                if (conn != null)
                {
                    if (conn.IsOpen)
                        conn.Close();
                    conn = null;
                    Console.WriteLine("RabbitMQ     !");
                }
            }
            catch { }
        }

        /// 
        ///   RabbitMQ      
        /// 
        /// 
        public bool IsOpen()
        {
            try
            {
                if (conn != null)
                {
                    if (conn.IsOpen)
                        return true;
                }
                return false;
            }
            catch
            {
                return false;
            }
        }
    }

 

       接下来我们看具体如何接收消息。

private static AutoResetEvent myEvent = new AutoResetEvent(false);
private RabbitMQHelper rabbit = new RabbitMQHelper();
private ushort nChannel = 10;//                   

まずrabbitインスタンスを初期化し、RabbitConnectionメソッドでRabbitMQ接続を作成します.
接続が開くと、スレッドプールでメッセージを受信する方法が実行されます.ここで開いているスレッドは、開いているchannelの数と一致しなければならないことに注意してください.そうしないと問題が発生します(具体的な問題は、RabbitMQ接続タイムアウト時間を10秒に設定し、使用しない場合があります.原因は不明です.RabbitMQ接続のデフォルトタイムアウト時間は30秒で、この時間内に作成を呼び出すと、2倍のchannelが得られる可能性があります.)/// /// RabbitMQ , channel /// private void CreateConnecttion() { try { rabbit.RabbitConnection("localhost", nChannel); if (rabbit.conn != null) { ThreadPool.SetMinThreads(1, 1); ThreadPool.SetMaxThreads(100, 100); for (int i = 1; i <= nChannel; i++) { ThreadPool.QueueUserWorkItem(new WaitCallback(ReceiveMsg), ""); } myEvent.WaitOne();// rabbit.Close(); } } catch (Exception ex) { rabbit.Close(); Console.WriteLine(ex.Message); } }
 
次にメッセージを受信する方法であり,メッセージを処理する過程は省略される. /// /// , (channel), /// /// RabbitMQ private void ReceiveMsg(object obj) { IModel channel = null; try { #region channel = rabbit.conn.CreateModel(); channel.ExchangeDeclare( exchange: "TestTopicChange", //exchange type: ExchangeType.Topic, //Topic , durable: true,//exchange autoDelete: false,// , false arguments: null// , :alternate-exchange ); // channel.QueueDeclare( queue: "Test_Queue", // durable: true, // ( ) exclusive: false,// ,false。 , , autoDelete: false, arguments: null ); #endregion channel.BasicQos(0, 1, false);// channel.QueueBind(queue: "Test_Queue", exchange: "TestTopicChange", routingKey: "TestRouteKey.*"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; // try { bool isMark = AutoMark(message); if (isMark) { //Function.writeMarkLog(message); // , RabbitMQ channel.BasicAck(ea.DeliveryTag, false); } else { if (MarkErrorSend(message))// channel.BasicReject(ea.DeliveryTag, false); else // , , , channel.BasicReject(ea.DeliveryTag, true); } } catch (Exception ex) { try { Console.WriteLine(ex.Message); if (channel != null && channel.IsOpen)// RabbitMQ { // , , , channel.BasicReject(ea.DeliveryTag, true); } } catch { } } }; // channel.BasicConsume(queue: "Test_Queue", autoAck: false, consumer: consumer); } catch (Exception ex) { try { Console.WriteLine("" + ex.Message); if (channel != null && channel.IsOpen)// channel.Close(); if (rabbit.conn != null)// RabbitMQ rabbit.Close(); } catch { } } }
 
 
三、エラーメッセージの処理
処理に失敗したメッセージを「エラーキュー」に配置し、その後、元のキューのメッセージを削除します(ここで主な問題は、処理に失敗したり処理できないメッセージが複数ある場合、これらのメッセージを元のキューに戻すと、他のスレッドのchannelに配布され続けますが、結果的に処理できず、デッドサイクルをもたらし、後のメッセージが処理できなくなります).最初に処理できなかったメッセージを「エラーキュー」に配置した後、新しい接続を再開して「エラーキュー」のメッセージを処理します./// /// “ ” /// /// /// private bool MarkErrorSend(string msg) { RabbitMQHelper MQ = new RabbitMQHelper(); MQ.RabbitConnection("localhost",1); // var channel = MQ.conn.CreateModel(); try { // Direct channel.ExchangeDeclare( exchange: "ErrorTopicChange", //exchange type: ExchangeType.Topic, //Topic , durable: true,//exchange autoDelete: false,// , false arguments: null// , :alternate-exchange ); // channel.QueueDeclare( queue: "Error_Queue", // durable: true, // ( ) exclusive: false,// ,false。 , , autoDelete: false,// , false arguments: null ); // string routeKey = "ErrorRouteKey.*";//* channel.QueueBind( queue: "Error_Queue", exchange: "ErrorTopicChange", routingKey: routeKey, arguments: null ); // , DeliveryMode 2( ) IBasicProperties properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; channel.ConfirmSelect();// byte[] sendBytes = Encoding.UTF8.GetBytes(msg); // channel.BasicPublish( exchange: "ErrorTopicChange", routingKey: "ErrorRouteKey.one", basicProperties: properties, body: sendBytes ); bool isAllPublished = channel.WaitForConfirms();// (channel) true return isAllPublished; } catch (Exception ex) { // return false; } finally { channel.Close(); MQ.conn.Close(); } }
 
まとめ:RabbitMQ自体は安定しており、性能もよく、すべての不安定な要素がメッセージを処理する過程にあるので、安心して使用できます.
Demoソースアドレス:https://github.com/Bingjian-Zhu/RabbitMQHelper