ASP.NET Core 3.x同時制限

10237 ワード

前言


Microsoft.AspNetCore.ConcurrencyLimiter AspNetCore3.0以降増加する、入力要求に対してキュー処理を行い、スレッドプールの不足を回避する.我々は日常開発でよくあるウェブサーバに接続数を構成し、要求キューサイズを設定することができるが、今日はミドルウェア形式で同時量とキュー長の制限を実現する方法を見てみよう.

Queueポリシー


Nugetの追加Install-Package Microsoft.AspNetCore.ConcurrencyLimiter
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddQueuePolicy(options =>
            {
                //       
                options.MaxConcurrentRequests = 2;
                //        
                options.RequestQueueLimit = 1;
            });
            services.AddControllers();
        }
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            //         
            app.UseConcurrencyLimiter();
            app.Run(async context =>
            {
                Task.Delay(100).Wait(); // 100ms sync-over-async

                await context.Response.WriteAsync("Hello World!");
            });
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseHttpsRedirection();

            app.UseRouting();

            app.UseAuthorization();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });
        }     

上記の簡単な構成によって、私たちは彼を私たちのコードに導入することができて、それによって同時量の制限をして、キューの長さをすることができます.では問題が来て、彼はどのように実現したのでしょうか.
 public static IServiceCollection AddQueuePolicy(this IServiceCollection services, Action configure)
{
        services.Configure(configure);
        services.AddSingleton();
        return services;
}

QueuePolicyはSemaphoreSlim信号量設計を採用しており、SemaphoreSlim、Semaphore(信号量)は同時マルチスレッドが被保護コードに入ることをサポートし、オブジェクトは初期化時に最大タスク数を指定し、スレッドがリソースへのアクセスを要求すると信号量が減少し、彼らが解放されると信号量カウントが増加する.
      /// 
        ///         (   Queue  )
        /// 
        /// 
        public QueuePolicy(IOptions options)
        {
            _maxConcurrentRequests = options.Value.MaxConcurrentRequests;
            if (_maxConcurrentRequests <= 0)
            {
                throw new ArgumentException(nameof(_maxConcurrentRequests), "MaxConcurrentRequests must be a positive integer.");
            }

            _requestQueueLimit = options.Value.RequestQueueLimit;
            if (_requestQueueLimit < 0)
            {
                throw new ArgumentException(nameof(_requestQueueLimit), "The RequestQueueLimit cannot be a negative number.");
            }
            //  SemaphoreSlim         
            _serverSemaphore = new SemaphoreSlim(_maxConcurrentRequests);
        }

ConcurrencyLimiterMiddlewareミドルウェア
        /// 
        /// Invokes the logic of the middleware.
        /// 
        /// The .
        /// A  that completes when the request leaves.
        public async Task Invoke(HttpContext context)
        {
            var waitInQueueTask = _queuePolicy.TryEnterAsync();

            // Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets.
            bool result;

            if (waitInQueueTask.IsCompleted)
            {
                ConcurrencyLimiterEventSource.Log.QueueSkipped();
                result = waitInQueueTask.Result;
            }
            else
            {
                using (ConcurrencyLimiterEventSource.Log.QueueTimer())
                {
                    result = await waitInQueueTask;
                }
            }

            if (result)
            {
                try
                {
                    await _next(context);
                }
                finally
                {
                    _queuePolicy.OnExit();
                }
            }
            else
            {
                ConcurrencyLimiterEventSource.Log.RequestRejected();
                ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
                context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
                await _onRejected(context);
            }
        }

私たちが要求するたびに、まず_queuePolicy.TryEnterAsync()が呼び出され、この方法に入った後、まずプライベートロックを開き、次に総要求量が≧(要求キュー制限の大きさ+最大同時要求数)であるかどうかを判断し、現在の数が超えている場合、私は直接投げ出して、503状態を送ります.
  if (result)
  {
         try
         {
             await _next(context);
         }
         finally
        {
            _queuePolicy.OnExit();
        }
        }
        else
        {
            ConcurrencyLimiterEventSource.Log.RequestRejected();
            ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
            context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
            await _onRejected(context);
        }

問題が来て、私の側はまだあなたの設置の大きさに着いていないと言って、私のこの要求はあなたのサーバーに圧力を作ることができなくて、それではあなたは私に処理してください.await _serverSemaphore.WaitAsync();非同期待機アクセス信号量は、スレッドが信号量へのアクセス権限を付与されていない場合、実行保護コードに入る.信号量が解放されるまで、このスレッドはここで待機します.
 lock (_totalRequestsLock)
    {
        if (TotalRequests >= _requestQueueLimit + _maxConcurrentRequests)
        {
             return false;
        }
            TotalRequests++;
        }
        //         ,                  ,         ;           ,          
        await _serverSemaphore.WaitAsync();
        return true;
    }

成功に戻るとミドルウェア側は処理を行い、_queuePolicy.OnExit();はこの呼び出しにより_serverSemaphore.Release();を呼び出して信号灯を解放し、総要求数を減算する

Stackポリシー


もう一つの方法、スタック戦略を見てみましょう.彼はどうやってやったのですか.一緒に見てみましょう.どのように使うかのコードを添付します.
     public void ConfigureServices(IServiceCollection services)
        {
            services.AddStackPolicy(options =>
            {
                //       
                options.MaxConcurrentRequests = 2;
                //        
                options.RequestQueueLimit = 1;
            });
            services.AddControllers();
        }

上記の構成により、アプリケーションに対して対応するポリシーを実行できます.どうやって実現したのか見てみましょう
  public static IServiceCollection AddStackPolicy(this IServiceCollection services, Action configure)
        {
            services.Configure(configure);
            services.AddSingleton();
            return services;
        }

今回はStackPolicy類による戦略が見られる.主な方法を見てみましょう
        /// 
        ///         (     )
        /// 
        /// 
        public StackPolicy(IOptions options)
        {
            //   
            _buffer = new List();
            //    
            _maxQueueCapacity = options.Value.RequestQueueLimit;
            //       
            _maxConcurrentRequests = options.Value.MaxConcurrentRequests;
            //      
            _freeServerSpots = options.Value.MaxConcurrentRequests;
        }

ミドルウェアリクエスト呼び出し、_queuePolicy.TryEnterAsync()を通過すると、まずアクセスリクエスト回数があるかどうかを判断します.freeServerSpots>0であれば、trueを直接返して、ミドルウェアに次のステップを直接実行させます.現在のキュー=私たちが設定したキューサイズであれば、以前のリクエストをキャンセルする必要があります.キャンセルのたびに、先にキャンセルする前の保留後の要求です.
    public ValueTask TryEnterAsync()
        {
            lock (_bufferLock)
            {
                if (_freeServerSpots > 0)
                {
                    _freeServerSpots--;
                    return _trueTask;
                }
                //       ,       
                if (_queueLength == _maxQueueCapacity)
                {
                    _hasReachedCapacity = true;
                    _buffer[_head].Complete(false);
                    _queueLength--;
                }
                var tcs = _cachedResettableTCS ??= new ResettableBooleanCompletionSource(this);
                _cachedResettableTCS = null;
                if (_hasReachedCapacity || _queueLength < _buffer.Count)
                {
                    _buffer[_head] = tcs;
                }
                else
                {
                    _buffer.Add(tcs);
                }
                _queueLength++;
                // increment _head for next time
                _head++;
                if (_head == _maxQueueCapacity)
                {
                    _head = 0;
                }
                return tcs.GetValueTask();
            }
        }

リクエスト後に_queuePolicy.OnExit();スタックを呼び出し、リクエスト長を減算します.
    public void OnExit()
        {
            lock (_bufferLock)
            {
                if (_queueLength == 0)
                {
                    _freeServerSpots++;

                    if (_freeServerSpots > _maxConcurrentRequests)
                    {
                        _freeServerSpots--;
                        throw new InvalidOperationException("OnExit must only be called once per successful call to TryEnterAsync");
                    }

                    return;
                }

                // step backwards and launch a new task
                if (_head == 0)
                {
                    _head = _maxQueueCapacity - 1;
                }
                else
                {
                    _head--;
                }
                //  ,  
                _buffer[_head].Complete(true);
                _queueLength--;
            }
        }

まとめ


スタック構造の特徴に基づいて、実際の応用では、通常、スタックに対して以下の2つの操作しか実行されません.
  • はスタックに要素を追加し、このプロセスを「スタックイン」(スタックインまたはスタックイン)と呼ぶ.
  • は、スタックから指定された要素を抽出し、このプロセスを「スタック外」(またはスタック外)と呼ぶ.

  • キューストレージ構造の実装には、次の2つの方法があります.
  • シーケンスキュー:シーケンステーブルに基づいて実現されるキュー構造;
  • チェーンキュー:チェーンテーブルに基づいて実現されるキュー構造;