[AOP]5.Spring AOPで提供される様々なAsppects-合併制御

20344 ワード

本論文では引き続きConcerencyThrottleInterceptor(Springによる4.3.7)について議論します。前の記事で残したSimpleAsyncTaskExectorクラスのプロパティconcurrencyLimitに関する問題。
これらは合併制御と関連している。しかし、ここで先に説明しなければならないのは、これらの種類と実現の年代は比較的に古くなりました。例えば、ConcerencyThrottleInterceptorは2004年のSpring 1.xの中に存在しています。その年代はJDKのjava.util.co ncurrentがまだありません。したがって、ここではもっと多くのことを学んで、特定の問題を解決するための思想を討論して、みんなにそれを使うように励ますのではありません。同時制御の問題については、パッケージ内の関連クラスを利用して、より良く解決できます。
まず、恒例のキータイプの間の概略図を描きます。
同時制御はどうやって実現されますか?
ConcerencyThrottleInterceptor
/**
 * Interceptor that throttles concurrent access, blocking invocations if a specified concurrency
 * limit is reached.
 *
 * 

* Can be applied to methods of local services that involve heavy use of system resources, in a * scenario where it is more efficient to throttle concurrency for a specific service rather than * restricting the entire thread pool (e.g. the web container's thread pool). * *

* The default concurrency limit of this interceptor is 1. Specify the "concurrencyLimit" bean * property to change this value. * * @author Juergen Hoeller * @since 11.02.2004 * @see #setConcurrencyLimit */

@SuppressWarnings("serial") public class ConcurrencyThrottleInterceptor extends ConcurrencyThrottleSupport implements MethodInterceptor, Serializable { public ConcurrencyThrottleInterceptor() { setConcurrencyLimit(1); } @Override public Object invoke(MethodInvocation methodInvocation) throws Throwable { beforeAccess(); try { return methodInvocation.proceed(); } finally { afterAccess(); } } }
このタイプの実現もとても簡潔で、ConcerencyThrottleSupportを継承しています。MethodInterceptorとSerializableのインターフェースを実現しました。その中のinvoke方法は主にMethodIntercepterインターフェースを実現するためです。
これは、ターゲットメソッドの呼び出しに対して、同時制御を実現し、concurrencyLimitによってマージンを定義するという機能的な注釈でも説明されています。
上記のinvoke方法の実現から、主な制御ロジックはすべてbeforeAccessという方法の実現にあるはずです。これは親タイプのConcerencyThrottleSupportで定義されています。
protected void beforeAccess() {
  if (this.concurrencyLimit == NO_CONCURRENCY) {
    throw new IllegalStateException(
        "Currently no invocations allowed - concurrency limit set to NO_CONCURRENCY");
  }
  if (this.concurrencyLimit > 0) {
    boolean debug = logger.isDebugEnabled();
    synchronized (this.monitor) {
      boolean interrupted = false;
      while (this.concurrencyCount >= this.concurrencyLimit) {
        if (interrupted) {
          throw new IllegalStateException(
              "Thread was interrupted while waiting for invocation access, "
                  + "but concurrency limit still does not allow for entering");
        }
        if (debug) {
          logger.debug("Concurrency count " + this.concurrencyCount + " has reached limit "
              + this.concurrencyLimit + " - blocking");
        }
        try {
          this.monitor.wait();
        } catch (InterruptedException ex) {
          // Re-interrupt current thread, to allow other threads to react.
          Thread.currentThread().interrupt();
          interrupted = true;
        }
      }
      if (debug) {
        logger.debug("Entering throttle at concurrency count " + this.concurrencyCount);
      }
      this.concurrencyCount++;
    }
  }
}
まず合併度がNO_に設定されているかどうかを判断します。CONCURRENCY(0)は、すべての実行を許可しない。そうでしたら、直接に異常を投げて提示します。
concurrencyLimitが0より大きい場合、monitorというオブジェクトに同期コードブロックを設定します。この同期コードブロックに適用されるのは、最下層のwait/nofity機構が合併制御を実現するためのものであり、wait/notify工作機構の参考例とも言えるでしょう。
  • は一つのwhileサイクルの中でwait操作を行う必要があります。もちろんこれは必要ではありません。一番いい実践です。whileの循環条件は、具体的な業務によって異なり、この条件の役割は、現在閉塞されているスレッドが本当に再実行されることを保証するものであり、上の例においては、現在の合併量が閾値以下である場合のみ、ブロックされているスレッドを起動することができる。したがって、マルチスレッド環境では、すべてが可能であり、ある起動後、他のスレッドの影響で、本来満たされていた条件が再び満たされない状態になります。
  • は、同期コードブロックでwait動作を行う。上の実装から見れば、whileサイクルも確かに同期コードブロックの中にある。このようにする目的は、メッセージを呼び覚ます正確さを保証するためである。
  • 同期コードブロックのモニタオブジェクトは、上記コードのmonitorオブジェクトのようなwaitメソッドの呼び出しオブジェクトと一致するはずである。
  • 方法の最後に、カウンタconcurrencyCountの値を増加させて、現在のマージン量を表す。
    before Accessメソッドの呼び出しが完了したら、目的方法を実行します。実行後にfinallyコードブロックでafterAccessメソッドを呼び出します。
    protected void afterAccess() {
      if (this.concurrencyLimit >= 0) {
        synchronized (this.monitor) {
          this.concurrencyCount--;
          if (logger.isDebugEnabled()) {
            logger.debug("Returning from throttle at concurrency count " + this.concurrencyCount);
          }
          this.monitor.notify();
        }
      }
    }
    この方法の役割は簡単であり,ターゲット方法の実行が完了した後に現在の同時カウンタを低減し,遮断されたスレッドを起動することである。ここで注意すべきことは、ブロックされたスレッドを起動するnotify動作も同じモニタオブジェクトの同期コードブロックで実現されることである。
    重要な方法の実現を知ると、このInterceptorの役割も非常に明確になります。例えば、3つのマージンを設定すると、ターゲット方法は最大3つのスレッドしか同時にアクセスできなくなり、4番目のスレッドにアクセスしようとすると、waitでブロックされ、前の3つのスレッドのうち1つが実行されてから、ブロックされたスレッドが起動される。
    アプリケーションのインスタンス
    Advisorの定義
    簡単な例を書いてConcerencyThrottleInterceptrを適用します。まずPointcutとInterceptor自身を定義します。
    @Bean
    public Advisor throttleAdvisor() {
      AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut();
      pointcut.setExpression(
          "execution(* (@org.springframework.stereotype.Service *).doThrottleThings(..))");
    
      return new DefaultPointcutAdvisor(pointcut, concurrencyThrottleInterceptor());
    }
    
    @Bean
    public ConcurrencyThrottleInterceptor concurrencyThrottleInterceptor() {
      ConcurrencyThrottleInterceptor interceptor = new ConcurrencyThrottleInterceptor();
      interceptor.setConcurrencyLimit(3);
      return interceptor;
    }
    ここではJavaConfigを完全に使って配置しています。最初の方法はthrottleAdvisorによって宣言されたもので、実際には完全なAspectであり、二つの部分が含まれています。
  • Pointcut
  • Intercepter
  • 目標方法の定義
    public void doThrottleThings() throws InterruptedException {
        System.out.println(Thread.currentThread().getName() + ": is doing something, needing 5s");
        Thread.sleep(5000);
      }
    併発制御が有効かどうかを検証するために、まず現在のスレッドの名前をプリントしてから5秒眠る。
    起動方法の定義
    JUnitヘルプを直接使う方法の起動:
    @Test
    public void testConcurrencyThrottle() throws InterruptedException {
      IntStream.range(0, 5).forEach(i -> {
        new Thread(() -> {
          try {
            service.doThrottleThings();
          } catch (Exception e) {
            e.printStackTrace();
          }
        }).start();
      });
    
      Thread.sleep(10000);
    }
    最後に10 s眠ったのは、このテスト方法が終了しないようにするためです。これはあくまでもメインメソッドではないので、JUnitはテスト方法が完了したら全てのスレッドを削除し、JVMをオフにします。
    最後の印刷結果はこうです。
    [Thread-4] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Entering throttle at concurrency count 0
    [Thread-5] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Entering throttle at concurrency count 1
    [Thread-6] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Entering throttle at concurrency count 2
    [Thread-3] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Concurrency count 3 has reached limit 3 - blocking
    [Thread-2] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Concurrency count 3 has reached limit 3 - blocking
    Thread-4: is doing something, needing 5s
    Thread-5: is doing something, needing 5s
    Thread-6: is doing something, needing 5s
    [Thread-4] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Returning from throttle at concurrency count 2
    [Thread-3] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Entering throttle at concurrency count 2
    Thread-3: is doing something, needing 5s
    [Thread-6] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Returning from throttle at concurrency count 2
    [Thread-5] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Returning from throttle at concurrency count 1
    [Thread-2] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Entering throttle at concurrency count 1
    Thread-2: is doing something, needing 5s
    合併量が3に達した後、残りの2つのスレッドがブロックされることが明確に分かった。5 s後にこの二つのスレッドが呼び覚まされます。
    SimpleAsync TaskExectorにおける同時制御
    同時制御をどのように処理しているのかが分かりました。SimpleAsyncTaskExectorの同時制御を見てみましょう。このクラスにはこのようなメンバーオブジェクトと2つの定数があります。
    /**
     * Permit any number of concurrent invocations: that is, don't throttle concurrency.
     */
    public static final int UNBOUNDED_CONCURRENCY = ConcurrencyThrottleSupport.UNBOUNDED_CONCURRENCY;
    
    /**
     * Switch concurrency 'off': that is, don't allow any concurrent invocations.
     */
    public static final int NO_CONCURRENCY = ConcurrencyThrottleSupport.NO_CONCURRENCY;
    
    /** Internal concurrency throttle used by this executor */
    private final ConcurrencyThrottleAdapter concurrencyThrottle = new ConcurrencyThrottleAdapter();
    つまり内部では、ConcerencyThrottleAdapterという合併制御オブジェクトを使用しており、ConcerencyThrottleSupportで定義されている2つの定数を多重化して、合併度を制限しないということと、完全に実行できないということを表しています。
    /**
     * Subclass of the general ConcurrencyThrottleSupport class, making {@code beforeAccess()} and
     * {@code afterAccess()} visible to the surrounding class.
     */
    private static class ConcurrencyThrottleAdapter extends ConcurrencyThrottleSupport {
    
      @Override
      protected void beforeAccess() {
        super.beforeAccess();
      }
    
      @Override
      protected void afterAccess() {
        super.afterAccess();
      }
    }
    このクラスの目的は、SimpleAyncTaskExectorがConcerencyThrottleSupportに定義されているbeforeAccessとafterAccessの2つの方法にアクセスできるようにすることです。
    いつこのアプリを使いますか?
    @Override
    public void execute(Runnable task, long startTimeout) {
      Assert.notNull(task, "Runnable must not be null");
      Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
      if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
        this.concurrencyThrottle.beforeAccess();
        doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
      } else {
        doExecute(taskToUse);
      }
    }
    上記のコードは、SimpleAsyncTaskExectorに定義されるexecute方法である。この中には三つの問題があります。
  • task Decoratorは何ですか?
  • いつ合併制御を実行しますか?また、どのようなタスクが同時に制御されますか?
  • ConccurrencyThrottlingRunnableは何のために使いますか?
  • Task Decorator
    まず最初の問題:
    Task Decoratorはインターフェースで、装飾器モードの概念を抽象化しています。
    public interface TaskDecorator {
    
        /**
         * Decorate the given {@code Runnable}, returning a potentially wrapped
         * {@code Runnable} for actual execution.
         * @param runnable the original {@code Runnable}
         * @return the decorated {@code Runnable}
         */
        Runnable decorate(Runnable runnable);
    
    }
    Springにはこのインターフェースの実現はありませんので、この想像空間は開発者の皆様に残しておきましたが、Javadocの説明からその意図が分かります。
  • タスク実行のコンテキスト環境を設定する
  • いくつかの監視と統計の仕事を行います。
    同時制御の実行タイミング
    まず満足すべき条件はisThrottleActive()がtrueに戻ることです。
    public boolean isThrottleActive() {
      return (this.concurrencyLimit > 0);
    }
    つまり設定されたマージンが0より大きい場合は、Throttling機能がオンされていると考えられます。
    また、満足すべき条件はreturn methodInvocation.proceed();である。後の定数の値は0です。つまりどのようなタスクのtimeout値がTIMEOUT_uに設定されていますか?IMMEDIATEでは、このようなタスクは同時制御の対象外です。(また、このティムアウトはTIMEOUT MouIMMEDIATEと比較した以外に、他の用途がないということは、ちょっとおかしいと思います。)
    ConcerencyThrottlingRunnable
    最終的には、ConcerencyThrottlingRunnableオブジェクトにタスクをカプセル化して、wrapperオブジェクトを実行します。
    /**
     * This Runnable calls {@code afterAccess()} after the target Runnable has finished its execution.
     */
    private class ConcurrencyThrottlingRunnable implements Runnable {
    
      private final Runnable target;
    
      public ConcurrencyThrottlingRunnable(Runnable target) {
        this.target = target;
      }
    
      @Override
      public void run() {
        try {
          this.target.run();
        } finally {
          concurrencyThrottle.afterAccess();
        }
      }
    }
    このようにしても、fterAccessと連携してコントロールするリソースのリリースをfinallyで実行するためだけです。
    締め括りをつける
    本論文では、2つの態様の内容を議論する。
  • ConccurrencyThrottleInterceptorの実現原理
  • SimpleAsyncTaskExectorは、どうやって同時制御を実現しますか?
    ここでSpring AOPのいくつかの実用的なAsppects(Interceptors)が紹介されました。