ASP.NET Core 3.x同時制限
10237 ワード
前言
Microsoft.AspNetCore.ConcurrencyLimiter AspNetCore3.0以降増加する、入力要求に対してキュー処理を行い、スレッドプールの不足を回避する.我々は日常開発でよくあるウェブサーバに接続数を構成し、要求キューサイズを設定することができるが、今日はミドルウェア形式で同時量とキュー長の制限を実現する方法を見てみよう.
Queueポリシー
Nugetの追加
Install-Package Microsoft.AspNetCore.ConcurrencyLimiter
public void ConfigureServices(IServiceCollection services)
{
services.AddQueuePolicy(options =>
{
//
options.MaxConcurrentRequests = 2;
//
options.RequestQueueLimit = 1;
});
services.AddControllers();
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
//
app.UseConcurrencyLimiter();
app.Run(async context =>
{
Task.Delay(100).Wait(); // 100ms sync-over-async
await context.Response.WriteAsync("Hello World!");
});
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseHttpsRedirection();
app.UseRouting();
app.UseAuthorization();
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
});
}
上記の簡単な構成によって、私たちは彼を私たちのコードに導入することができて、それによって同時量の制限をして、キューの長さをすることができます.では問題が来て、彼はどのように実現したのでしょうか.
public static IServiceCollection AddQueuePolicy(this IServiceCollection services, Action configure)
{
services.Configure(configure);
services.AddSingleton();
return services;
}
QueuePolicyはSemaphoreSlim信号量設計を採用しており、SemaphoreSlim、Semaphore(信号量)は同時マルチスレッドが被保護コードに入ることをサポートし、オブジェクトは初期化時に最大タスク数を指定し、スレッドがリソースへのアクセスを要求すると信号量が減少し、彼らが解放されると信号量カウントが増加する.
///
/// ( Queue )
///
///
public QueuePolicy(IOptions options)
{
_maxConcurrentRequests = options.Value.MaxConcurrentRequests;
if (_maxConcurrentRequests <= 0)
{
throw new ArgumentException(nameof(_maxConcurrentRequests), "MaxConcurrentRequests must be a positive integer.");
}
_requestQueueLimit = options.Value.RequestQueueLimit;
if (_requestQueueLimit < 0)
{
throw new ArgumentException(nameof(_requestQueueLimit), "The RequestQueueLimit cannot be a negative number.");
}
// SemaphoreSlim
_serverSemaphore = new SemaphoreSlim(_maxConcurrentRequests);
}
ConcurrencyLimiterMiddlewareミドルウェア
///
/// Invokes the logic of the middleware.
///
/// The .
/// A that completes when the request leaves.
public async Task Invoke(HttpContext context)
{
var waitInQueueTask = _queuePolicy.TryEnterAsync();
// Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets.
bool result;
if (waitInQueueTask.IsCompleted)
{
ConcurrencyLimiterEventSource.Log.QueueSkipped();
result = waitInQueueTask.Result;
}
else
{
using (ConcurrencyLimiterEventSource.Log.QueueTimer())
{
result = await waitInQueueTask;
}
}
if (result)
{
try
{
await _next(context);
}
finally
{
_queuePolicy.OnExit();
}
}
else
{
ConcurrencyLimiterEventSource.Log.RequestRejected();
ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
await _onRejected(context);
}
}
私たちが要求するたびに、まず
_queuePolicy.TryEnterAsync()
が呼び出され、この方法に入った後、まずプライベートロックを開き、次に総要求量が≧(要求キュー制限の大きさ+最大同時要求数)であるかどうかを判断し、現在の数が超えている場合、私は直接投げ出して、503状態を送ります. if (result)
{
try
{
await _next(context);
}
finally
{
_queuePolicy.OnExit();
}
}
else
{
ConcurrencyLimiterEventSource.Log.RequestRejected();
ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
await _onRejected(context);
}
問題が来て、私の側はまだあなたの設置の大きさに着いていないと言って、私のこの要求はあなたのサーバーに圧力を作ることができなくて、それではあなたは私に処理してください.
await _serverSemaphore.WaitAsync();
非同期待機アクセス信号量は、スレッドが信号量へのアクセス権限を付与されていない場合、実行保護コードに入る.信号量が解放されるまで、このスレッドはここで待機します. lock (_totalRequestsLock)
{
if (TotalRequests >= _requestQueueLimit + _maxConcurrentRequests)
{
return false;
}
TotalRequests++;
}
// , , ; ,
await _serverSemaphore.WaitAsync();
return true;
}
成功に戻るとミドルウェア側は処理を行い、
_queuePolicy.OnExit();
はこの呼び出しにより_serverSemaphore.Release();
を呼び出して信号灯を解放し、総要求数を減算するStackポリシー
もう一つの方法、スタック戦略を見てみましょう.彼はどうやってやったのですか.一緒に見てみましょう.どのように使うかのコードを添付します.
public void ConfigureServices(IServiceCollection services)
{
services.AddStackPolicy(options =>
{
//
options.MaxConcurrentRequests = 2;
//
options.RequestQueueLimit = 1;
});
services.AddControllers();
}
上記の構成により、アプリケーションに対して対応するポリシーを実行できます.どうやって実現したのか見てみましょう
public static IServiceCollection AddStackPolicy(this IServiceCollection services, Action configure)
{
services.Configure(configure);
services.AddSingleton();
return services;
}
今回は
StackPolicy
類による戦略が見られる.主な方法を見てみましょう ///
/// ( )
///
///
public StackPolicy(IOptions options)
{
//
_buffer = new List();
//
_maxQueueCapacity = options.Value.RequestQueueLimit;
//
_maxConcurrentRequests = options.Value.MaxConcurrentRequests;
//
_freeServerSpots = options.Value.MaxConcurrentRequests;
}
ミドルウェアリクエスト呼び出し、
_queuePolicy.TryEnterAsync()
を通過すると、まずアクセスリクエスト回数があるかどうかを判断します.freeServerSpots>0であれば、trueを直接返して、ミドルウェアに次のステップを直接実行させます.現在のキュー=私たちが設定したキューサイズであれば、以前のリクエストをキャンセルする必要があります.キャンセルのたびに、先にキャンセルする前の保留後の要求です. public ValueTask TryEnterAsync()
{
lock (_bufferLock)
{
if (_freeServerSpots > 0)
{
_freeServerSpots--;
return _trueTask;
}
// ,
if (_queueLength == _maxQueueCapacity)
{
_hasReachedCapacity = true;
_buffer[_head].Complete(false);
_queueLength--;
}
var tcs = _cachedResettableTCS ??= new ResettableBooleanCompletionSource(this);
_cachedResettableTCS = null;
if (_hasReachedCapacity || _queueLength < _buffer.Count)
{
_buffer[_head] = tcs;
}
else
{
_buffer.Add(tcs);
}
_queueLength++;
// increment _head for next time
_head++;
if (_head == _maxQueueCapacity)
{
_head = 0;
}
return tcs.GetValueTask();
}
}
リクエスト後に
_queuePolicy.OnExit();
スタックを呼び出し、リクエスト長を減算します. public void OnExit()
{
lock (_bufferLock)
{
if (_queueLength == 0)
{
_freeServerSpots++;
if (_freeServerSpots > _maxConcurrentRequests)
{
_freeServerSpots--;
throw new InvalidOperationException("OnExit must only be called once per successful call to TryEnterAsync");
}
return;
}
// step backwards and launch a new task
if (_head == 0)
{
_head = _maxQueueCapacity - 1;
}
else
{
_head--;
}
// ,
_buffer[_head].Complete(true);
_queueLength--;
}
}
まとめ
スタック構造の特徴に基づいて、実際の応用では、通常、スタックに対して以下の2つの操作しか実行されません.
キューストレージ構造の実装には、次の2つの方法があります.