.Net Core商城マイクロサービスプロジェクトシリーズ(十一):MQ消費者は独立してWindowサービス+メッセージ処理サービス

18834 ワード

以前はMQを使用していたときにdllパブリケーションNugetパッケージにカプセル化して使用していましたが、メッセージのパブリケーションと消費は使用するサイトとサービスに結合されており、2つの問題が発生します.
1.メッセージの消費のたびにインタフェースの呼び出しを意味するため、サービスとサイトの圧力を増加させる.この部分の圧力は、使用するサイトとサービスのマシンに加えられる.
2.変更の複雑さを増し、2つの消費ログを追加する必要がある場合は、dll参照を再発行する必要があります.
 
次の2つの作業が必要です
1.MQの受信はWindowsサービスに分割され、zokeerperによってプライマリスレーブが単一の障害を防止する.
2.MQの消費はここで単独のWebApiサービスを作ります.
 
このようなメリットには、次のようなものがあります.
1.デカップリング.MQの消費は利用するサイトやサービスから切り離され,サービスストレスを軽減する.
2.プログラムの保守性とデバッグ性を高める.
3.単独配置でスループットを向上させる.
 
まず、MQの消費者サービス側を見てみましょう.実は、前にインタフェースを調整する方法をWebApiに単独で配置することで、サーバーの圧力を軽減することができます.
        /// 
        /// MQ          
        /// 
        [HttpPost]
        public async Task ConsumerProcessEventAsync([FromBody]ConsumerProcessEventRequest request)
        {
            ConsumerProcessEventResponse response = new ConsumerProcessEventResponse();
            try
            {
                _logger.LogInformation($"MQ    ConsumerProcessEvent  ,RoutingKey:{request.RoutingKey} Message:{request.MQBodyMessage}");
                using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
                {
                    //     routingKey       
                    var subscriptions = await StackRedis.Current.GetAllList(request.RoutingKey);
                    if (!subscriptions.Any())
                    {
                        //  Redis                Redis 
                        var queryRoutingKeyApiUrlResponse = _apiHelperService.PostAsync(ServiceAddress.QueryRoutingKeyApiUrlAsync, new QueryRoutingKeyApiUrlRequest { RoutingKey = request.RoutingKey });
                        if (queryRoutingKeyApiUrlResponse.Result != null && queryRoutingKeyApiUrlResponse.Result.ApiUrlList.Any())
                        {
                            subscriptions = queryRoutingKeyApiUrlResponse.Result.ApiUrlList;
                            Task.Run(() =>
                            {
                                StackRedis.Current.SetLists(request.RoutingKey, queryRoutingKeyApiUrlResponse.Result.ApiUrlList);
                            });
                        }
                    }
                    if(subscriptions!=null && subscriptions.Any())
                    {
                        foreach (var apiUrl in subscriptions)
                        {
                            Task.Run(() =>
                            {
                                _logger.LogInformation(request.MQBodyMessage);
                            });

                            //          MQ                                            
//
await _apiHelperService.PostAsync(apiUrl, request.MQBodyMessage); } _logger.LogInformation($"MQ ProcessEvent ,RoutingKey:{request.RoutingKey} Message:{request.MQBodyMessage}"); } } } catch(Exception ex) { response.Successful = false; response.Message = ex.Message; _logger.LogError(ex, $"MQ RoutingKey:{request.RoutingKey} Message:{request.MQBodyMessage}"); } return response; }

 

这个WebApi只有这一个方法,就是根据RoutingKey查找对应的MQ配置,然后根据配置的接口地址调用指定的接口,比较简单哈,之前也写过,就不细说了。

 

我们来看接收MQ消息的Windows服务端,MQ首次使用都需要重新绑定Routingkey、队列和交换器,所以我在Monitor服务里写了一个绑定的方法,在Windows服务端启动的时候调用一次:

public class MQConsumerService
    {
        private readonly IApiHelperService _apiHelperService;
        private ILog _logger;

        public MQConsumerService(IApiHelperService apiHelperService,ILog logger)
        {
            _apiHelperService = apiHelperService;
            _logger = logger;
        }

        /// 
        ///   MQ MQ     
        /// 
        /// 
        /// 
        public void ProcessEvent(string routingKey, string message)
        {
            try
            {
                _logger.Info($"MQ    ProcessEvent  ,RoutingKey:{routingKey} Message:{message}");
                _apiHelperService.PostAsync(ServiceUrls.ConsumerProcessEvent,new ConsumerProcessEventRequest { RoutingKey=routingKey,MQBodyMessage=message});
            }
            catch(Exception ex)
            {
                _logger.Error($"MQ          RoutingKey:{routingKey} Message:{message}",ex);
            }
        }

        /// 
        /// MQ               
        /// 
        /// 
        public async Task MQSubscribeAsync()
        {
            try
            {
                var response= await _apiHelperService.PostAsync(ServiceUrls.MQSubscribe, new MQSubscribeRequest());
                if(!response.Successful)
                {
                    _logger.Error($"MQ  RoutingKey    : {response.Message}");
                }
            }
            catch(Exception ex)
            {
                _logger.Error($"MQ  RoutingKey    ",ex);
            }            
        }
    }

 

这里为了简单起见,交换器和队列使用的都是同一个,路由方式是“direct”模式,之后会继续修改的,先跑起来再说:

static void Main(string[] args)
        {
            //   (Exchange)
            const string BROKER_NAME = "mi_event_bus";
            //  (Queue)
            var SubscriptionClientName = "RabbitMQ_Bus_MI";
            //log4net    
            ILoggerRepository repository = LogManager.CreateRepository("MI.WinService.MQConsumer");
            XmlConfigurator.Configure(repository, new FileInfo("log4net.config"));
            ILog log = LogManager.GetLogger(repository.Name, "MI.WinService.MQConsumer");
            //      
            IServiceCollection serviceCollection = new ServiceCollection();
            //WebApi   
            serviceCollection.AddTransient();
            var serviceProvider = serviceCollection.AddHttpClient().BuildServiceProvider();
            serviceProvider.GetService();
            var apiHelperService = serviceProvider.GetService();
            //MQ   (  MQ      、       )
            MQConsumerService consumerService = new MQConsumerService(apiHelperService,log);

            //MQ   
            ConnectionFactory factory = new ConnectionFactory
            {
                UserName = "",
                Password = "",
                HostName = ""
            };

            var connection = factory.CreateConnection();
            var channel = connection.CreateModel();

            channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");

            channel.QueueDeclare(queue: SubscriptionClientName, durable: true, exclusive: false, autoDelete: false, arguments: null);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (ch, ea) =>
            {
                //   MQ     
                var message = Encoding.UTF8.GetString(ea.Body);
                log.Info($"MQ       RoutingKey:{ea.RoutingKey} Message:{message}");

                //   MQ     MQStationServer
                Task result= Task.Run(() =>
                {
                    consumerService.ProcessEvent(ea.RoutingKey, message);
                });
                if(!result.IsFaulted)
                {
                    //  ack
                    channel.BasicAck(ea.DeliveryTag, false);
                }
            };
            channel.BasicConsume(SubscriptionClientName, false, consumer);
            Console.WriteLine("");

            //    RoutingKey
            Task taskResult= Task.Run(async() =>
            {
                await consumerService.MQSubscribeAsync();
            });

            taskResult.Wait();


            Console.WriteLine("  RoutingKey    !");

            Console.ReadKey();
            channel.Dispose();
            connection.Close();
        }

 
最後に消費者側消費MQプロセスを整理する.
MQがリリースされると、Windowsサービス側はMQメッセージを受け取り、呼び出しインタフェースを通じてMQコンシューマサービス側にメッセージを送信し、RoutingKeyを通じてデータベースから対応するMQとインタフェース構成を検索し、指定インタフェースを呼び出す.健康診断およびサーバレベルのクラスタは、単一の障害を防止します.