.Net Core商城マイクロサービスプロジェクトシリーズ(十一):MQ消費者は独立してWindowサービス+メッセージ処理サービス
18834 ワード
以前はMQを使用していたときにdllパブリケーションNugetパッケージにカプセル化して使用していましたが、メッセージのパブリケーションと消費は使用するサイトとサービスに結合されており、2つの問題が発生します.
1.メッセージの消費のたびにインタフェースの呼び出しを意味するため、サービスとサイトの圧力を増加させる.この部分の圧力は、使用するサイトとサービスのマシンに加えられる.
2.変更の複雑さを増し、2つの消費ログを追加する必要がある場合は、dll参照を再発行する必要があります.
次の2つの作業が必要です
1.MQの受信はWindowsサービスに分割され、zokeerperによってプライマリスレーブが単一の障害を防止する.
2.MQの消費はここで単独のWebApiサービスを作ります.
このようなメリットには、次のようなものがあります.
1.デカップリング.MQの消費は利用するサイトやサービスから切り離され,サービスストレスを軽減する.
2.プログラムの保守性とデバッグ性を高める.
3.単独配置でスループットを向上させる.
まず、MQの消費者サービス側を見てみましょう.実は、前にインタフェースを調整する方法をWebApiに単独で配置することで、サーバーの圧力を軽減することができます.
最後に消費者側消費MQプロセスを整理する.
MQがリリースされると、Windowsサービス側はMQメッセージを受け取り、呼び出しインタフェースを通じてMQコンシューマサービス側にメッセージを送信し、RoutingKeyを通じてデータベースから対応するMQとインタフェース構成を検索し、指定インタフェースを呼び出す.健康診断およびサーバレベルのクラスタは、単一の障害を防止します.
1.メッセージの消費のたびにインタフェースの呼び出しを意味するため、サービスとサイトの圧力を増加させる.この部分の圧力は、使用するサイトとサービスのマシンに加えられる.
2.変更の複雑さを増し、2つの消費ログを追加する必要がある場合は、dll参照を再発行する必要があります.
次の2つの作業が必要です
1.MQの受信はWindowsサービスに分割され、zokeerperによってプライマリスレーブが単一の障害を防止する.
2.MQの消費はここで単独のWebApiサービスを作ります.
このようなメリットには、次のようなものがあります.
1.デカップリング.MQの消費は利用するサイトやサービスから切り離され,サービスストレスを軽減する.
2.プログラムの保守性とデバッグ性を高める.
3.単独配置でスループットを向上させる.
まず、MQの消費者サービス側を見てみましょう.実は、前にインタフェースを調整する方法をWebApiに単独で配置することで、サーバーの圧力を軽減することができます.
///
/// MQ /// [HttpPost] public async Task
ConsumerProcessEventAsync([FromBody]ConsumerProcessEventRequest request) { ConsumerProcessEventResponse response = new ConsumerProcessEventResponse(); try { _logger.LogInformation($"MQ ConsumerProcessEvent ,RoutingKey:{request.RoutingKey} Message:{request.MQBodyMessage}"); using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) { // routingKey var subscriptions = await StackRedis.Current.GetAllList(request.RoutingKey); if (!subscriptions.Any()) { // Redis Redis var queryRoutingKeyApiUrlResponse = _apiHelperService.PostAsync (ServiceAddress.QueryRoutingKeyApiUrlAsync, new QueryRoutingKeyApiUrlRequest { RoutingKey = request.RoutingKey }); if (queryRoutingKeyApiUrlResponse.Result != null && queryRoutingKeyApiUrlResponse.Result.ApiUrlList.Any()) { subscriptions = queryRoutingKeyApiUrlResponse.Result.ApiUrlList; Task.Run(() => { StackRedis.Current.SetLists(request.RoutingKey, queryRoutingKeyApiUrlResponse.Result.ApiUrlList); }); } } if(subscriptions!=null && subscriptions.Any()) { foreach (var apiUrl in subscriptions) { Task.Run(() => { _logger.LogInformation(request.MQBodyMessage); }); // MQ
//
await _apiHelperService.PostAsync(apiUrl, request.MQBodyMessage); } _logger.LogInformation($"MQ ProcessEvent ,RoutingKey:{request.RoutingKey} Message:{request.MQBodyMessage}"); } } } catch(Exception ex) { response.Successful = false; response.Message = ex.Message; _logger.LogError(ex, $"MQ RoutingKey:{request.RoutingKey} Message:{request.MQBodyMessage}"); } return response; }
这个WebApi只有这一个方法,就是根据RoutingKey查找对应的MQ配置,然后根据配置的接口地址调用指定的接口,比较简单哈,之前也写过,就不细说了。
我们来看接收MQ消息的Windows服务端,MQ首次使用都需要重新绑定Routingkey、队列和交换器,所以我在Monitor服务里写了一个绑定的方法,在Windows服务端启动的时候调用一次:
public class MQConsumerService { private readonly IApiHelperService _apiHelperService; private ILog _logger; public MQConsumerService(IApiHelperService apiHelperService,ILog logger) { _apiHelperService = apiHelperService; _logger = logger; } ///
/// MQ MQ /// /// /// public void ProcessEvent(string routingKey, string message) { try { _logger.Info($"MQ ProcessEvent ,RoutingKey:{routingKey} Message:{message}"); _apiHelperService.PostAsync
(ServiceUrls.ConsumerProcessEvent,new ConsumerProcessEventRequest { RoutingKey=routingKey,MQBodyMessage=message}); } catch(Exception ex) { _logger.Error($"MQ RoutingKey:{routingKey} Message:{message}",ex); } } /// /// MQ /// ///
public async Task MQSubscribeAsync() { try { var response= await _apiHelperService.PostAsync (ServiceUrls.MQSubscribe, new MQSubscribeRequest()); if(!response.Successful) { _logger.Error($"MQ RoutingKey : {response.Message}"); } } catch(Exception ex) { _logger.Error($"MQ RoutingKey ",ex); } } }
这里为了简单起见,交换器和队列使用的都是同一个,路由方式是“direct”模式,之后会继续修改的,先跑起来再说:
static void Main(string[] args)
{
// (Exchange)
const string BROKER_NAME = "mi_event_bus";
// (Queue)
var SubscriptionClientName = "RabbitMQ_Bus_MI";
//log4net
ILoggerRepository repository = LogManager.CreateRepository("MI.WinService.MQConsumer");
XmlConfigurator.Configure(repository, new FileInfo("log4net.config"));
ILog log = LogManager.GetLogger(repository.Name, "MI.WinService.MQConsumer");
//
IServiceCollection serviceCollection = new ServiceCollection();
//WebApi
serviceCollection.AddTransient();
var serviceProvider = serviceCollection.AddHttpClient().BuildServiceProvider();
serviceProvider.GetService();
var apiHelperService = serviceProvider.GetService();
//MQ ( MQ 、 )
MQConsumerService consumerService = new MQConsumerService(apiHelperService,log);
//MQ
ConnectionFactory factory = new ConnectionFactory
{
UserName = "",
Password = "",
HostName = ""
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");
channel.QueueDeclare(queue: SubscriptionClientName, durable: true, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
// MQ
var message = Encoding.UTF8.GetString(ea.Body);
log.Info($"MQ RoutingKey:{ea.RoutingKey} Message:{message}");
// MQ MQStationServer
Task result= Task.Run(() =>
{
consumerService.ProcessEvent(ea.RoutingKey, message);
});
if(!result.IsFaulted)
{
// ack
channel.BasicAck(ea.DeliveryTag, false);
}
};
channel.BasicConsume(SubscriptionClientName, false, consumer);
Console.WriteLine(" !");
// RoutingKey
Task taskResult= Task.Run(async() =>
{
await consumerService.MQSubscribeAsync();
});
taskResult.Wait();
Console.WriteLine(" RoutingKey !");
Console.ReadKey();
channel.Dispose();
connection.Close();
}
最後に消費者側消費MQプロセスを整理する.
MQがリリースされると、Windowsサービス側はMQメッセージを受け取り、呼び出しインタフェースを通じてMQコンシューマサービス側にメッセージを送信し、RoutingKeyを通じてデータベースから対応するMQとインタフェース構成を検索し、指定インタフェースを呼び出す.健康診断およびサーバレベルのクラスタは、単一の障害を防止します.