[Spring]同期並列処理で@AsyncとDeferredResultを使ってみた


はじめに

業務内で同期並列処理を実現するためにSpringの@AsyncDeferredResultをさわる機会がありました。
具体的には、ブラウザ操作におけるイベント処理で複数を並行して行い、処理結果を同期的にクライアントへ返却するというものです。一般的にもHTTPリクエスト時やブラウザ操作時における同期/非同期での並列処理は需要が高く、注目されている分野ではないかと考えられます。
本記事では@AsyncDeferredResultの利用シーンを例に取り上げ、メリットやはまりポイントなどを説明していきます。

利用シーン

@​AsyncやDeferredResultを使用するにあたって、次のようなケースを想定します。

  • 言語はJavaで、Spring Frameworkを使用したい
  • 複数のクライアントから同時リクエストが来た場合に同時実行数を制御したい
    • 5多重まで許容したい
    • 多重度を超えるリクエストに対してはエラー応答したい
    • 制御時間は10ms以内に抑えたい
  • クライアントへのレスポンスは同期的に返却したい

@​Async・DeferredResultの利用メリット

例としてあげた利用シーンで@​Async・DeferredResultを活用するメリットを説明します。

@​Asyncの場合

  • SpringのThreadPoolTaskExecutorの仕組みを用いてスレッドプールを利用することができるため、
    同時実行数制御の実装が容易に行える。
  • 非同期処理の動作を個別にカスタマイズすることができるため、拡張性の高い実装が行える。
  • 非同期処理中に発生した例外は、ExceptionResolverによってハンドリングされるため、
    例外ハンドリングの実装が容易に行える。

DeferredResultの場合

  • 非同期処理スレッドからの戻り値を生成することができるため、
    レスポンスの際に@​Asyncでの非同期処理結果を呼び出し元へ返却することができる。
  • 非同期処理が終了するまで、HTTPリクエストに対するレスポンス処理を延期することができる。

同期並列処理を実装してみる...

@​AsyncとDeferredResultを用いた同期並列処理の実装例を次に示します。

Application.java
/**
 * アプリケーション起動クラス。
 */
@EnableAsync
@SpringBootApplication
public class Application {

    /**
     * アプリケーション起動用のmainメソッド。
     *
     * @param args 使用しません
     */
    public static void main(String[] args) {
        ApplicationContext ctx = SpringApplication.run(Application.class, args);
    }

    /**
     * Bean定義
     */
    @Bean
    @Qualifier("sleepTask")
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(5);
        executor.setQueueCapacity(0);
        return executor;
    }
}

setCorePoolSize()とsetMaxPoolSize()の違いはこちらをご参照下さい。
つまり、違いは次の通りです。

corePoolSize - アイドルであってもプール内に維持されるスレッドの数
maximumPoolSize - プール内で可能なスレッドの最大数

TaskRestController.java
/**
 * クライアントからのリクエストを処理するコントローラクラス。
 */
@Slf4j
@RestController
public class TaskRestController {
    // 非同期処理実行クラス
    @Autowired
    private TaskServiceImpl taskService;

    // 非同期処理結果用の変数
    private DeferredResult<String> result;

    /**
     * 非同期処理実行メソッド
     * @return タスク実行結果
     */
    @RequestMapping(value = "/tasks", method = RequestMethod.GET)
    public DeferredResult<String> executeTask() {
        final long start = System.currentTimeMillis();

        // 結果(タイムアウトは8秒)
        result = new DeferredResult<>(8000L);
        taskService.executeSleep(result, start);

        return result;
    }

    /**
     * タスク実行数オーバー時のエラーハンドル用メソッド
     * @return エラー時出力文字列
     */
    @ExceptionHandler(TaskRejectedException.class)
    @ResponseStatus(HttpStatus.SERVICE_UNAVAILABLE)
    public String handleTaskRejectedException() {
        log.error("task - " + (taskService.taskNum + 1) + " : error.");
        return "too busy";
    }

    /**
     * タスク実行でのタイムアウト時のエラーハンドル用メソッド
     */
    @ExceptionHandler(AsyncRequestTimeoutException.class)
    @ResponseStatus(HttpStatus.REQUEST_TIMEOUT)
    public String handleAsyncRequestTimeoutException(AsyncRequestTimeoutException e) {
        return "timeout";
    }
}

※実装内の例外ハンドリングはControllerに記述していますが、
実際には@​ControllerAdviceを利用する方が良いです。

TaskServiceImpl.java
/**
 * 非同期処理実行用サービスクラス
 */
public class TaskServiceImpl {
    public static int taskNum = 0;

    /**
     * 5秒スリープする非同期処理実行メソッド
     * @param deferredResult    非同期処理結果の格納用変数
     * @param start             実行時間計測開始時間
     */
    @Async("sleepTask")
    @Transactional
    public void executeSleep(final DeferredResult<String> deferredResult, final long start) {
        try {
            taskNum++;
            final int taskNumber = taskNum;

            log.info("start task - " + taskNumber);

            TimeUnit.SECONDS.sleep(5);
            deferredResult.setResult("task - " + taskNumber + " finished. ");

            log.info("end task - " + taskNumber);
            log.info("task - " + taskNumber + " : " + (System.currentTimeMillis() - start - 5000L) + "ms");
        } catch (Exception e) {
            deferredResult.setErrorResult(e);
        }
    }

}

今回は簡易版のため実装していませんが、本来であればdeferredResult.setResult()deferredResult.setErrorResult()は戻り値がtrueかfalseかを確認する必要があります。
対象のdeferredResult変数に対して2回以上setResult()またはsetErrorResult()が行われた場合にfalseとなります。

はまった箇所

実装したものを実行していく中で不思議に感じた箇所がありました。
それは上記TaskRestControllerexecuteTask()メソッドでの処理は完了しているのにも関わらず、
クライアント側へは結果が返却されておらず、TaskServiceImplexecuteSleep()メソッドでの処理が完了した後に、クライアント側へ結果が返却されるという点です。
コントローラからクライアントへはどのように結果が返されたのでしょうか?という疑問です。
疑問を解決するためにはDeferredResultに関する理解を深める必要があります。

調査① DeferredResultに関するAPIリファレンスを読んでみる

APIリファレンスはこちらです。
DeferredResultを使用することによって内部的にどのような処理が行われているのかAPIリファレンスだけでは不明です。

調査② 実行時のログを再度確認してみる

どうやらクライアントからのリクエスト時に次のようなログが発生していることがわかります。

実行ログ
[nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : FrameworkServlet 'dispatcherServlet': initialization started
[nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : FrameworkServlet 'dispatcherServlet': initialization completed in 15 ms

DispatcherServletとはなにものなのでしょうか

調査③ Springの公式ドキュメントでdispatcherServletを調査

DispatcherServletに関して説明が記載されていました。
参考にしたページはこちらです。
この文書にも記載の通り、SpringはユーザからのリクエストやユーザへのレスポンスをDispatcherServlet(= Front controller)を介して行っているわけですね。つまりアプリケーション側のコントローラがDeferredResultを返却した際にdispatcherServletが何かしらの処理を行っていると予想できます。

調査④ 世の中の文書をあさります

DeferredResult使用時のSpring MVCにおける内部処理フローについて記載されている文書がありました。
次のシーケンス図が非常にわかりやすいです。

引用:http://callistaenterprise.se/blogg/teknik/2014/04/22/c10k-developing-non-blocking-rest-services-with-spring-mvc/

アプリケーションのコントローラからDeferredResultが返却されると、
DispatcherServletは呼び出し元へのレスポンスをオープンしたままリクエスト処理スレッドを中断します。
そして、DeferredResultに結果が設定される(setResult()メソッドの処理が行われる)と、
アイドル状態だったDispatcherServletは後続処理を再開するという処理フローであることが分かります。
解決!

(まとめ)@​Async・DeferredResult利用による恩恵

ここまでを読んでて頂くと、わざわざ@​AsyncやDeferredResultを使用しないで同期処理の実装を行えばいいのではないかと思われるかもしれません。もう一度、利用シーンの内容を思い出して頂きたいのですが、利用シーンではクライアントへのレスポンスは同期的に返却したいとありました。それに対して、DispatcherServletやサーブレットコンテナはできる限りリソースを有効活用したいため、処理は非同期で行って欲しいと考えます。
そのような場合に@​AsyncやDeferredResultを使用することで、非同期で処理が行われるためリソースを有効活用することができ、呼び出し元のクライアントへは同期的に結果を返却することができるようになります。

番外編

上記実装での実行時間を計測した結果はこちらです。(アプリケーション起動時引数には-serverを指定)
平均して10ms以内に抑えられていることがわかります。

- 単体実行の場合 : 
FrameworkServlet 'dispatcherServlet': initialization completed in 21 ms
start task - 1
end task - 1
task - 1 : 22ms
start task - 2
end task - 2
task - 2 : 1ms
start task - 3
end task - 3
task - 3 : 2ms
start task - 4
end task - 4
task - 4 : 3ms
start task - 5
end task - 5
task - 5 : 2ms

dispatcherServlet起動時間を含む場合 : 6(ms)/1回実行
dispatcherServlet起動時間を含まない場合 : 1.8(ms)/1回実行

- 5連続実行の場合:

FrameworkServlet 'dispatcherServlet': initialization completed in 19 ms
start task - 1
start task - 2
start task - 3
start task - 4
start task - 5
end task - 1
task - 1 : 26ms
end task - 2
task - 2 : 3ms
end task - 3
task - 3 : 3ms
end task - 4
task - 4 : 2ms
end task - 5
task - 5 : 3ms

dispatcherServlet起動時間を含む場合 : 7.4(ms)/1回実行
dispatcherServlet起動時間を含まない場合 : 3.6(ms)/1回実行