[AOP]4.Spring AOPで提供される様々なAsppects-非同期実行
Aync Execution Interceptor
まず、このクラスの関連カテゴリとの関係を見てみましょう.
このクラスの文書から:
AOP Alliance that processes method invocations asyncronously、using a given{@link org.springframe ork.co.task.AyncTask Exector}.Tyliallyused with the@link org.springframe ork.Asprigframe.Astork.AsyncTascduction.
二つの事柄を引き継いだ:
そこで、先にAyncTaskExectorとその親インターフェースの定義を見てみます.
Aync TaskExector執行者の抽象的なレベル
Exector
階層構造上の最上階は同時パッケージ内に位置するExectorインターフェースである.
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
一つのexecute方法を定義し、一つのRunnableを実行すべきタスクとして定義する.タスクとタスクを実行するインフラストラクチャ(新しく作成されたスレッド/スレッドの池にあるスレッド/現在の呼び出しスレッド)を結合することを目的としています.つまり、任務はそれ自身が誰によって実行されるか分かりません.Task Exector
org.springframework.core.task
カバンの中のインターフェースにあります.これはExectorインターフェースにおけるexecuteアプローチの定義を直接カバーしている.そして元のパラメータ名をcommandからtaskに変えました.実はこれもRunnableインターフェースの本質的な意味をより明確に表しています.これは実行可能な命令を表しています.これらの実行可能な命令は一つのタスクを構成しています.public interface TaskExecutor extends Executor {
/**
* Execute the given {@code task}.
* The call might return immediately if the implementation uses
* an asynchronous execution strategy, or might block in the case
* of synchronous execution.
* @param task the {@code Runnable} to execute (never {@code null})
* @throws TaskRejectedException if the given task was not accepted
*/
@Override
void execute(Runnable task);
}
ここでは、この方法はすぐに戻るかもしれない(非同期が実行される場合)、ブロックされるかもしれない(同期が実行される場合、すなわち現在の呼出スレッドを使用して実行される可能性がある).Aync TaskExector
public interface AsyncTaskExecutor extends TaskExecutor {
/** Constant that indicates immediate execution */
long TIMEOUT_IMMEDIATE = 0;
/** Constant that indicates no time limit */
long TIMEOUT_INDEFINITE = Long.MAX_VALUE;
/**
* Execute the given {@code task}.
* @throws TaskRejectedException if the given task was not accepted
*/
void execute(Runnable task, long startTimeout);
/**
* Submit a Runnable task for execution, receiving a Future representing that task.
* @since 3.0
*/
Future> submit(Runnable task);
/**
* Submit a Callable task for execution, receiving a Future
* @since 3.0
*/
Future submit(Callable task);
}
このインターフェースは三つの方法を定義している.一つの方法はexecute方法の再負荷として、実行すべきタスクの緊急度を指定するためのstartTimeoutパラメータを追加した.インターフェースで定義された2つの定数は、この情報を記述するものである.たとえばTIMEOUTを使うIMMEDIATEは現在の任務が緊急であることを意味し、直ちに実行しなければならない.しかし、このパラメータは下のタスク実行者に提示されたもので、参照するかどうかは具体的な実装を見てください.デフォルトではstartTimeoutパラメータを使用しないリロードはTIMEOUTを使用します.INDEFINITEはデフォルトとして、タスクの優先度が低いと定義されます.他の二つのsubmit方法は非同期タスクのために追加され、タスクを提出した後にFutureオブジェクトを取得結果の証拠として得られます.このFutureはJavaScriptのPromiseとして理解できます.両者は本質的に同じ概念です.また、Callableインターフェースに基づくタスクもサポートしており、CallableとRunnableの最大の違いは前者が返却値という概念を持っているため、Taskの概念にもより近い.
Aync Execution Interceptorの階層構造
親について:
/**
* Base class for asynchronous method execution aspects, such as
* {@code org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor}
* or {@code org.springframework.scheduling.aspectj.AnnotationAsyncExecutionAspect}.
*
* Provides support for executor qualification on a method-by-method basis.
* {@code AsyncExecutionAspectSupport} objects must be constructed with a default {@code
* Executor}, but each individual method may further qualify a specific {@code Executor}
* bean to be used when executing it, e.g. through an annotation attribute.
*
* @author Chris Beams
* @author Juergen Hoeller
* @author Stephane Nicoll
* @since 3.1.2
*/
public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {
//
}
Javadocから見ると、このクラスは非同期的な方法でAsppectsを実行するために必要ないくつかの基礎的なサポートを提供しています.完全な実現部分は貼り出さないで、この種類の主要な機能を話します.
private final Map executors = new ConcurrentHashMap(16);
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
Executor targetExecutor;
String qualifier = getExecutorQualifier(method);
if (StringUtils.hasLength(qualifier)) {
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
} else {
targetExecutor = this.defaultExecutor;
if (targetExecutor == null) {
synchronized (this.executors) {
if (this.defaultExecutor == null) {
this.defaultExecutor = getDefaultExecutor(this.beanFactory);
}
targetExecutor = this.defaultExecutor;
}
}
}
if (targetExecutor == null) {
return null;
}
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
this.executors.put(method, executor);
}
return executor;
}
protected abstract String getExecutorQualifier(Method method);
したがって、実行時の一つの方法オブジェクトの実行とその実行を具体的に行うExectorは動的な関連関係である.このExectorインスタンスを取得するために、get Exector rQualfierの戻り値が使用される.qualiferを提供しないなら、デフォルトのExectorを使用します.protected Executor getDefaultExecutor(BeanFactory beanFactory) {
if (beanFactory != null) {
try {
// Search for TaskExecutor bean... not plain Executor since that would
// match with ScheduledExecutorService as well, which is unusable for
// our purposes here. TaskExecutor is more clearly designed for it.
return beanFactory.getBean(TaskExecutor.class);
} catch (NoUniqueBeanDefinitionException ex) {
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
} catch (NoSuchBeanDefinitionException ex2) {
if (logger.isInfoEnabled()) {
logger
.info("More than one TaskExecutor bean found within the context, and none is named "
+ "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly "
+ "as an alias) in order to use it for async processing: "
+ ex.getBeanNamesFound());
}
}
} catch (NoSuchBeanDefinitionException ex) {
logger.debug("Could not find default TaskExecutor bean", ex);
// Giving up -> either using local default executor or none at all...
logger.info("No TaskExecutor bean found for async processing");
}
}
return null;
}
すなわち最終的なデフォルトExectorはTaskExectorインターフェースの実現クラスである.実験をして、デフォルトのExectorが何なのかを見てみます.@EnableAsync
@ComponentScan(basePackages = "com.rxjiang")
public class SystemConfiguration {
// ......
}
@Service
public class PlainService
@Async
public void doSomethingAsync() {
System.out.println(Thread.currentThread().getName() + ": Async Biz...");
}
}
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = {SystemConfiguration.class})
public class AsyncTest {
@Autowired
private PlainService service;
@Test
public void testAsync() {
service.doSomethingAsync();
}
}
印刷された結果:SimpleAsyncTaskExecutor-1: Async Biz...
したがって、最終的に非同期タスクを実行するのは、SimpleAsyncTaskExectorであり、具体的にはクラスで作成作業を完了する:@Override
protected Executor getDefaultExecutor(BeanFactory beanFactory) {
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}
親クラスのAync Execution AspectionSupportのget Default Exectorを通じてExectorを取得してみましたが、未遂(nullに戻りました)、つまりSpringはTaskExectorインターフェースの実現クラスをデフォルトExectorとして定義してくれませんでした.SimpleAsync TaskExector-デフォルト非同期Exector
SimpleAsyncTaskExectorはどういうことですか?
/**
* {@link TaskExecutor} implementation that fires up a new Thread for each task,
* executing it asynchronously.
*
* Supports limiting concurrent threads through the "concurrencyLimit"
* bean property. By default, the number of concurrent threads is unlimited.
*
*
NOTE: This implementation does not reuse threads! Consider a
* thread-pooling TaskExecutor implementation instead, in particular for
* executing a large number of short-lived tasks.
*
* @author Juergen Hoeller
* @since 2.0
* @see #setConcurrencyLimit
* @see SyncTaskExecutor
* @see org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
* @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor
*/
@SuppressWarnings("serial")
public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implements AsyncListenableTaskExecutor, Serializable {
// ...
}
この簡単な非同期Exectorの戦略は文書で説明されているように、各タスクのために新たにスレッドを作成します.コンカレントという属性でコンカレントを制御します.また、大量のショートタスクを実行するには、それを推奨しません.このような場面ではスレッドベースのExectorを使うのがより適切である.いわゆる制御合併度については、次の文章でConcerencyThrottleInterceptorを紹介する際に一緒に検討しましょう.
彼の父親のCustomizable ThreadCreator機能も簡単で、作成したスレッドの各種属性を設定します.例えば、ThreadGroup、Thread Nameなどです.
ちなみに、このExectorはAync Listenable TaskExectorインターフェースも実現しています.
public interface AsyncListenableTaskExecutor extends AsyncTaskExecutor {
/**
* Submit a {@code Runnable} task for execution, receiving a {@code ListenableFuture}
* representing that task. The Future will return a {@code null} result upon completion.
* @param task the {@code Runnable} to execute (never {@code null})
* @return a {@code ListenableFuture} representing pending completion of the task
* @throws TaskRejectedException if the given task was not accepted
*/
ListenableFuture> submitListenable(Runnable task);
/**
* Submit a {@code Callable} task for execution, receiving a {@code ListenableFuture}
* representing that task. The Future will return the Callable's result upon
* completion.
* @param task the {@code Callable} to execute (never {@code null})
* @return a {@code ListenableFuture} representing pending completion of the task
* @throws TaskRejectedException if the given task was not accepted
*/
ListenableFuture submitListenable(Callable task);
}
このインターフェースで定義された方法は依然としてRunnableまたはCallableをパラメータとして受け入れるが、戻り値はListenable Futureであり、Futureインターフェースの拡張である.public interface ListenableFuture<T> extends Future<T> {
/**
* Register the given {@code ListenableFutureCallback}.
* @param callback the callback to register
*/
void addCallback(ListenableFutureCallback super T> callback);
/**
* Java 8 lambda-friendly alternative with success and failure callbacks.
* @param successCallback the success callback
* @param failureCallback the failure callback
* @since 4.1
*/
void addCallback(SuccessCallback super T> successCallback, FailureCallback failureCallback);
}
Futureインターフェースで定義されている機能の他に、コールバックを追加する機能も提供されています.これらのコールバックはFutureで完了した時点ですぐに呼び出されます.Aync Resoultを使って結果を返したら、addCallbackを実行する時に呼び出されます.// Service Async , ListenableFuture 。
@Async
public ListenableFuture doSomethingListenableAsync() {
AsyncResult asyncResult = new AsyncResult(42);
asyncResult.addCallback(new ListenableFutureCallback() {
public void onSuccess(Integer result) {
System.out.println(Thread.currentThread().getName() + " Callback, Result is: " + result);
}
public void onFailure(Throwable ex) {
System.err.println(ex.getMessage());
}
});
return asyncResult;
}
テストコード:@Test
public void testListenableAsync() throws InterruptedException, ExecutionException {
ListenableFuture future = service.doSomethingListenableAsync();
System.out.println(Thread.currentThread().getName() + " Result is: " + future.get());
}
最終的な出力はこうです.SimpleAsyncTaskExecutor-1 Callback, Result is: 42
main Result is: 42
デフォルトではなく、必要なExectorをどのように指定しますか?前に論じたqualiferという概念を通じて.関連コードを見てみます.
Aync Execution Interceptor
このクラスで:
/**
* This implementation is a no-op for compatibility in Spring 3.1.2. Subclasses may override to
* provide support for extracting qualifier information, e.g. via an annotation on the given
* method.
*
* @return always {@code null}
* @since 3.1.2
* @see #determineAsyncExecutor(Method)
*/
@Override
protected String getExecutorQualifier(Method method) {
return null;
}
互換性のため、このクラスはExector Qualfierの機能を得ることができませんでした.サブクラスの実現を希望するのではなく、AnnotationAsync ExectionInterceptorが担当して実現します.AnnotationAsync ExectionInterceptor
/**
* Return the qualifier or bean name of the executor to be used when executing the given method,
* specified via {@link Async#value} at the method or declaring class level. If {@code @Async} is
* specified at both the method and class level, the method's {@code #value} takes precedence
* (even if empty string, indicating that the default executor should be used preferentially).
*
* @param method the method to inspect for executor qualifier metadata
* @return the qualifier if specified, otherwise empty string indicating that the
* {@linkplain #setExecutor(Executor) default executor} should be used
* @see #determineAsyncExecutor(Method)
*/
@Override
protected String getExecutorQualifier(Method method) {
// Maintainer's note: changes made here should also be made in
// AnnotationAsyncExecutionAspect#getExecutorQualifier
Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);
if (async == null) {
async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);
}
return (async != null ? async.value() : null);
}
したがって、Qualferを決定するロジックは@Asyncへの注釈の中でそのvalue属性の値を探すことです.ThreadPoolによるExectorを使用します.
では、どのようにデフォルトとは異なるExectorを定義しますか?それともJavaConfigを通しますか?
@Configuration
@EnableAspectJAutoProxy
@EnableAsync
public class SystemConfiguration {
// ...
@Bean(name = "tpExecutor")
public Executor getThreadPoolExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(2);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("TPExecutor-");
executor.initialize();
return executor;
}
}
その後、該当する方法で属性値を持つ@Ayncを使用すれば良いです.@Async("tpExecutor")
public void doSomethingAsyncInTP() {
System.out.println(Thread.currentThread().getName() + ": Async Biz in TP...");
}
// :
@Test
public void testAsyncInTP() {
service.doSomethingAsyncInTP();
}
従って、サービスの必要に応じて一連のスレッド池を配置し、方法のトラフィックタイプに応じて、具体的な実行作業を対応するスレッド池に割り当てることができる.したがって、メールの送信、ショートメッセージの検証コードの送信、リモートAPIの呼び出しなど、いくつかの時間のかかるタスクを完了します.おわりに
本論文では、以下の図のタイプとそれらの間の階層関係を紹介します.
Aync ExectionInterceptorとそのサブタイプAnnotationAcExectionInterceptorの機能と使い方を紹介しました.@Ayncに協力して解決方法の実行の非同期化を注釈します.
次の文章では、もう一つのInterceptor:ConcerencyThrottleInterceptorを紹介します.