Javaスレッド、スレッドプールでキャプチャされた異常は、私に速く現れます.

5365 ワード

前言
Java開発の仲間たちは、開発、デバッグ、ユニットテストの過程でスレッドプールの異常処理メカニズムに頭を痛めているに違いない(私は異常が全然見えず、スレッドプールは直接異常を捕獲し、このスレッドに死刑を言い渡した).同時タスクでは、スレッドプールを導入する必要がありますが、オンラインになる前にコードの完璧さを保証することはできません.テストを続ける必要があります.では、スレッドプールでキャプチャされた異常を避けることはできません.
通常スレッドの例外
一般的なスレッドといえば,スレッドがタスクを実行するrun()メソッドに置かれるコードを考慮する.
public interface Runnable {
    public abstract void run();
}

ではrun()メソッドにRuntimeExceptionが投げ出されたらどうなりますか?通常java.lang.Threadオブジェクトは、デフォルトの例外処理方法を設定します.
java.lang.Thread.setDefaultUncaughtExceptionHandler(UncaughtExceptionHandler)

このデフォルトの静的グローバルの例外キャプチャメソッドは、スタックを出力します.もちろん、このデフォルトのインプリメンテーションは上書きできます.カスタムjava.lang.Thread.UncaughtExceptionHandlerインタフェースでインプリメンテーションするだけです.
public interface UncaughtExceptionHandler {
    void uncaughtException(Thread t, Throwable e);
}

スレッドプールの例外
スレッドプールでは特殊です.デフォルトでは、スレッドプールjava.util.concurrent.ThreadPoolExecutorはすべての例外をキャッチし、タスクの実行が完了すると(java.util.concurrent.ExecutorServices.submit(Callable))結果が取得されます(java.util.concurrent.Future.get()はこのRuntimeExceptionを放出します.
/**
 * Waits if necessary for the computation to complete, and then
 * retrieves its result.
 *
 * @return the computed result
 * @throws CancellationException if the computation was cancelled
 * @throws ExecutionException if the computation threw an exception
 * @throws InterruptedException if the current thread was interrupted while waiting
 */
V get() throws InterruptedException, ExecutionException;

ここでExecutionException異常はjava.lang.Runnableまたはjava.util.concurrent.Callableが投げ出した異常です.すなわち、スレッドプールは、タスクの実行中にすべての例外をキャプチャし、その例外を結果に追加します.これにより、スレッドプール内のすべてのスレッドは、放出された例外をキャプチャできません.スレッドのデフォルトのキャプチャメソッドを設定することでブロックできないエラー例外.カスタムスレッドによって例外のブロックを完了することも異なります.幸いjava.util.concurrent.ThreadPoolExecutorは、タスクの実行後に拡張するためのメソッドを予約しています(もちろん、protectedメソッドbeforeExecute(Thread t,Runnable r)も予約しています):
protected void afterExecute(Runnable r, Throwable t) { } 

このメソッドのデフォルト実装は空です.これにより、ThreadPoolExecutorを継承または上書きすることで、カスタムエラー処理を達成できます.解決策は次のとおりです.
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(11, 100, 1, TimeUnit.MINUTES, //
        new ArrayBlockingQueue(10000),//
        new DefaultThreadFactory()) {

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        printException(r, t);
    }
};

private static void printException(Runnable r, Throwable t) {
    if (t == null && r instanceof Future>) {
        try {
            Future> future = (Future>) r;
            if (future.isDone())
                future.get();
        } catch (CancellationException ce) {
            t = ce;
        } catch (ExecutionException ee) {
            t = ee.getCause();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // ignore/reset
        }
    }
    if (t != null)
        log.error(t.getMessage(), t);
}

この方法の鍵は、afterExecuteが常に異常Throwable tを投げ出すわけではないことであり、ソースコードを表示することによって、異常はこのときのFutureオブジェクトにカプセル化されていることがわかります.このFutureオブジェクトはjava.util.concurrent.FutureTaskの実装であり、デフォルトのrunメソッドが実際に呼び出されたjava.util.concurrent.FutureTask.Sync.innerRun()です.
void innerRun() {
    if (!compareAndSetState(0, RUNNING))
        return;
    try {
        runner = Thread.currentThread();
        if (getState() == RUNNING) // recheck after setting thread
            innerSet(callable.call());
        else
            releaseShared(0); // cancel
    } catch (Throwable ex) {
        innerSetException(ex);
    }
}

void innerSetException(Throwable t) {
    for (;;) {
        int s = getState();
        if (s == RAN)
            return;
        if (s == CANCELLED) {
            // aggressively release to set runner to null,
            // in case we are racing with a cancel request
            // that will try to interrupt runner
            releaseShared(0);
            return;
        }
        if (compareAndSetState(s, RAN)) {
            exception = t;
            result = null;
            releaseShared(0);
            done();
            return;
        }
    }
}

ここでは、java.util.concurrent.FutureTask.Syncのexceptionフィールドに異常が格納されていることがわかります.
/** The exception to throw from get() */
private Throwable exception;

非同期実行の結果を取得するとjava.util.concurrent.FutureTask.get()
public V get() throws InterruptedException, ExecutionException {
    return sync.innerGet();
}

java.util.concurrent.FutureTask.Sync.innerGet()
V innerGet() throws InterruptedException, ExecutionException {
    acquireSharedInterruptibly(0);
    if (getState() == CANCELLED)
        throw new CancellationException();
    if (exception != null)
        throw new ExecutionException(exception);
    return result;
}

異常はExecutionException異常放出としてパッケージされます.つまり、スレッドプールThreadPoolExecutor(java.util.concurrent.ExecutorService)がタスクをコミットしたい場合、タスク結果(Feture.get()を無視すると、この異常はスレッドプールに食べられます.
 Future submit(Callable task);
Future> submit(Runnable task);

java.util.concurrent.ScheduledThreadPoolExecutorはThreadPoolExecutorを継承しているので、状況は似ています.
まとめ
ThreadPoolExecuter.afterExecuteメソッドを上書きすることで、タスクの例外(RuntimeException)をキャプチャできます.