『Java同時プログラミング』の5:スレッドのキャンセルとクローズ

15498 ワード

詳細
Javaはスレッドを安全に終了するメカニズムを提供していませんが、Thread.stopやsuspendなどの方法はこのようなメカニズムを提供しているが,これらの方法の使用は避けるべき深刻な欠陥がある.しかしJavaは、あるスレッドが別のスレッドの現在の作業を終了させることができるコラボレーションメカニズムであるInterruptionを中断するメカニズムを提供しています.
このコラボレーション方式は必要であり、共有されたデータ構造が一致しない状態にあるため、タスクスレッドまたはサービスが直ちに停止することを望んでいません.逆に、タスクとサービスを作成するときにコラボレーションを使用できます.停止が必要な場合は、現在実行中の作業を消去してから終了します.
 
7.1タスクのキャンセル
外部コードが正常に完了する前に完了状態にすることができる場合、この操作はキャンセル可能なCancelableと呼ぶことができる.
コラボレーションメカニズムの1つは、キャンセルフラグCancellation Requestedフラグを設定し、タスクは定期的にフラグを表示することです.
@ThreadSafe
public class PrimeGenerator implements Runnable {
    private static ExecutorService exec = Executors.newCachedThreadPool();

    @GuardedBy("this")
    private final List primes = new ArrayList();
    private volatile boolean cancelled;

    public void run() {
        BigInteger p = BigInteger.ONE;
        while (!cancelled) {
            p = p.nextProbablePrime();
            synchronized (this) {
                primes.add(p);
            }
        }
    }

    public void cancel() {
        cancelled = true;
    }

    public synchronized List get() {
        return new ArrayList(primes);
    }

    static List aSecondOfPrimes() throws InterruptedException {
        PrimeGenerator generator = new PrimeGenerator();
        exec.execute(generator);
        try {
            SECONDS.sleep(1);
        } finally {
            generator.cancel();
        }
        return generator.get();
    }
}

JavaのAPIや言語仕様では、割り込みはいかなる意味にも関連付けられていないが、実際には、キャンセル以外の他の操作で割り込みを使用すると、不適切であり、より大きなアプリケーションをサポートすることは困難である.
 
各スレッドにはbooleanタイプの割り込み状態があります.しかし、スレッドを中断すると、このスレッドの中断状態はtrueに設定されます.
 
Threadの3つの方法:
public void interrupt()スレッドを中断
public boolean isInterrupted()割り込みフラグを取得し、割り込みの有無を判断する
public static boolean interrupted()は、割り込み状態を明確にし、その前の状態値を返します.
 
スレッドがブロック状態で中断するとInterruptedException(例えばThread.sleep(), Thread.wait(), Thread.join()などの方法.
スレッドが非ブロック状態で中断されると、その中断状態が設定され、割り込み状態のチェックに基づいて割り込みの有無が判断されます.
 
interruptを呼び出すことは、ターゲットスレッドが進行中である作業を直ちに停止することを意味するものではなく、要求中断のメッセージを伝えるだけであり、言い換えれば、スレッドのisInterruptedフラグフィールドのみが変更される.
 
通常、中断時にキャンセルを実現する最も合理的な方法です.
public class PrimeProducer extends Thread {
    private final BlockingQueue queue;

    PrimeProducer(BlockingQueue queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            BigInteger p = BigInteger.ONE;
            while (!Thread.currentThread().isInterrupted())
                queue.put(p = p.nextProbablePrime());
        } catch (InterruptedException consumed) {
            /* Allow thread to exit */
        }
    }

    public void cancel() {
        interrupt();
    }
}

 
7.1.3応答割り込み
スレッド割り込みポリシーを実装したコードのみが割り込み要求をブロックすることができ、通常のタスクおよびライブラリコードでは割り込み要求をブロックするべきではありません.
2つの方法は、割り込みに応答します.
*転送異常InterruptedException
*割り込み状態を回復し、呼び出しスタックの上位レベルのコードを処理できるようにする
キャンセル不可能なタスク終了前に割り込みフラグを復元
public class NoncancelableTask {
    public Task getNextTask(BlockingQueue queue) {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    return queue.take();
                } catch (InterruptedException e) {
                    interrupted = true;
                    // fall through and retry
                }
            }
        } finally {
            if (interrupted)
                Thread.currentThread().interrupt();
        }
    }

    interface Task {
    }
}
 
7.1.5タイミングタスク、Futureによるキャンセル:
スレッドの割り込みポリシーが分からない限り、スレッドを割り込まないでください.神馬の場合にcancelを呼び出すと、パラメータをtrueに指定できます.
タスクのスレッドが標準のExecutorによって作成されている場合は、mayInterruptIfRunningを設定できます.
public class TimedRun {
    private static final ExecutorService taskExec = Executors.newCachedThreadPool();

    public static void timedRun(Runnable r, long timeout, TimeUnit unit)
            throws InterruptedException {
        Future> task = taskExec.submit(r);
        try {
            task.get(timeout, unit);
        } catch (TimeoutException e) {
            // task will be cancelled below
        } catch (ExecutionException e) {
            // exception thrown in task; rethrow
            throw launderThrowable(e.getCause());
        } finally {
            // Harmless if task already completed
            task.cancel(true); // interrupt if running
        }
    }
}
 
7.1.6中断不可能なブロックの処理
これらのスレッドの場合、割り込み要求はスレッドの割り込み状態を設定するしかなく、それ以外に何の役にも立たない.
これらのスレッドを停止するために同様の割り込み手段を用いることができるが,これはスレッドのブロックの原因を知る必要がある.
newTaskForを使用して、非標準のキャンセル操作を1つのタスクにカプセル化します.
 
public abstract class SocketUsingTask implements CancellableTask {
    @GuardedBy("this")
    private Socket socket;

    protected synchronized void setSocket(Socket s) {
        socket = s;
    }

    public synchronized void cancel() {
        try {
            if (socket != null)
                socket.close();
        } catch (IOException ignored) {
        }
    }

    public RunnableFuture newTask() {
        return new FutureTask(this) {
            public boolean cancel(boolean mayInterruptIfRunning) {
                try {
                    SocketUsingTask.this.cancel();
                } finally {
                    return super.cancel(mayInterruptIfRunning);
                }
            }
        };
    }
}


interface CancellableTask extends Callable {
    void cancel();

    RunnableFuture newTask();
}
 
7.2スレッドベースのサービスを停止する
アプリケーションは通常、複数のスレッドを持つサービスを作成します.アプリケーションが終了する準備ができている場合、javaはスレッドを停止するプリエンプトメソッドがないため、自分で終了する必要があります.
正しいカプセル化の原則は、スレッドが1つある場合を除き、スレッドを中断したり優先度を変更したりするなど、スレッドを操作しないことです.
 
スレッドには対応する所有者、すなわちスレッドのクラスが作成されるため、スレッドプールはその作業者スレッドの所有者であり、スレッドを中断する場合は、スレッドプールを使用して中断する必要があります.
スレッドの所有権は伝達できません.サービスは、ライフサイクルメソッドLifecycle Methodを提供して、独自のスレッドと所有するスレッドを閉じる必要があります.これにより、アプリケーションがサービスを閉じると、サービスはすべてのスレッドを閉じることができます.ExecutorServiceではshutdownやshutdown Nowなどの方法が提供されており、同様に他のスレッドを持つサービス方法でも同様のクローズメカニズムが提供されるべきである.
Tips:スレッドを持つサービスについて、サービスの存在時間がスレッドを作成する方法の存在時間より大きい限り、ライフサイクルメソッドを提供する必要があります.
 
7.2.1例:ログサービス
私たちは通常、アプリケーションにlog情報を追加します.一般的なフレームワークはlog 4 jです.しかし、このようなインライン・ログ機能は、高容量のHighvolumeアプリケーションにパフォーマンスのオーバーヘッドをもたらします.もう1つの代替方法は、logメソッドを呼び出してログメッセージをキューに入れ、他のスレッドで処理することである.
public class LogService {
    private final BlockingQueue queue;
    private final LoggerThread loggerThread;
    private final PrintWriter writer;
    @GuardedBy("this")
    private boolean isShutdown;
    @GuardedBy("this")
    private int reservations;

    public LogService(Writer writer) {
        this.queue = new LinkedBlockingQueue();
        this.loggerThread = new LoggerThread();
        this.writer = new PrintWriter(writer);
    }

    public void start() {
        loggerThread.start();
    }

    public void stop() {
        synchronized (this) {
            isShutdown = true;
        }
        loggerThread.interrupt();
    }

    public void log(String msg) throws InterruptedException {
        synchronized (this) {
            if (isShutdown)
                throw new IllegalStateException(/*...*/);
            ++reservations;
        }
        queue.put(msg);
    }

    private class LoggerThread extends Thread {
        public void run() {
            try {
                while (true) {
                    try {
                        synchronized (LogService.this) {
                            if (isShutdown && reservations == 0)
                                break;
                        }
                        String msg = queue.take();
                        synchronized (LogService.this) {
                            --reservations;
                        }
                        writer.println(msg);
                    } catch (InterruptedException e) { /* retry */
                    }
                }
            } finally {
                writer.close();
            }
        }
    }
}

 
7.2.2 ExecutorServiceでシャットダウン
単純なプログラムは、main関数でグローバルなExecutorServiceを直接起動および閉じることができますが、複雑なプログラムでは、通常、ExecutorServiceはより高いレベルのサービスにカプセル化され、そのサービスは独自のライフサイクルメソッドを提供します.次に、ExecutorServiceを使用して、上記のログ・サービスを再構築します.
public class LogService {
    public void stop() throws InterruptedException {
        try {
            exec.shutdown(); exec.awaitTermination(TIMEOUT, UNIT);
        }
    }
}

 
7.2.3 Poison PillオブジェクトによるProducer-Consumerサービスのクローズ
 
7.2.5スレッドプールを閉じると、まだ開始していないタスクデータと開始後にキャンセルされたタスクデータを保存して、後で再処理する準備をします.次はウェブページの爬虫プログラムです.爬虫サービスを閉じると、開始していないページとキャンセルしたページのURLがすべて記録されます.
public abstract class WebCrawler {
    private volatile TrackingExecutor exec;
    @GuardedBy("this")
    private final Set urlsToCrawl = new HashSet();

    private final ConcurrentMap seen = new ConcurrentHashMap();
    private static final long TIMEOUT = 500;
    private static final TimeUnit UNIT = MILLISECONDS;

    public WebCrawler(URL startUrl) {
        urlsToCrawl.add(startUrl);
    }

    public synchronized void start() {
        exec = new TrackingExecutor(Executors.newCachedThreadPool());
        for (URL url : urlsToCrawl) submitCrawlTask(url);
        urlsToCrawl.clear();
    }

    public synchronized void stop() throws InterruptedException {
        try {
            saveUncrawled(exec.shutdownNow());
            if (exec.awaitTermination(TIMEOUT, UNIT))
                saveUncrawled(exec.getCancelledTasks());
        } finally {
            exec = null;
        }
    }

    protected abstract List processPage(URL url);

    private void saveUncrawled(List uncrawled) {
        for (Runnable task : uncrawled)
            urlsToCrawl.add(((CrawlTask) task).getPage());
    }

    private void submitCrawlTask(URL u) {
        exec.execute(new CrawlTask(u));
    }

    private class CrawlTask implements Runnable {
        private final URL url;

        CrawlTask(URL url) {
            this.url = url;
        }

        private int count = 1;

        boolean alreadyCrawled() {
            return seen.putIfAbsent(url, true) != null;
        }

        void markUncrawled() {
            seen.remove(url);
            System.out.printf("marking %s uncrawled%n", url);
        }

        public void run() {
            for (URL link : processPage(url)) {
                if (Thread.currentThread().isInterrupted())
                    return;
                submitCrawlTask(link);
            }
        }

        public URL getPage() {
            return url;
        }
    }
}

 
public class TrackingExecutor extends AbstractExecutorService {
    private final ExecutorService exec;
    private final Set tasksCancelledAtShutdown =
            Collections.synchronizedSet(new HashSet());

    public TrackingExecutor(ExecutorService exec) {
        this.exec = exec;
    }

    public void shutdown() {
        exec.shutdown();
    }

    public List shutdownNow() {
        return exec.shutdownNow();
    }

    public boolean isShutdown() {
        return exec.isShutdown();
    }

    public boolean isTerminated() {
        return exec.isTerminated();
    }

    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        return exec.awaitTermination(timeout, unit);
    }

    public List getCancelledTasks() {
        if (!exec.isTerminated())
            throw new IllegalStateException(/*...*/);
        return new ArrayList(tasksCancelledAtShutdown);
    }

    public void execute(final Runnable runnable) {
        exec.execute(new Runnable() {
            public void run() {
                try {
                    runnable.run();
                } finally {
                    if (isShutdown()
                            && Thread.currentThread().isInterrupted())
                        tasksCancelledAtShutdown.add(runnable);
                }
            }
        });
    }
}

 
7.3処理が正常でないスレッドの終了
未取得の例外を処理するには、アプリケーションにUncaughtExceptionHandler例外プロセッサを指定します.
public class UEHLogger implements Thread.UncaughtExceptionHandler {
    public void uncaughtException(Thread t, Throwable e) {
        Logger logger = Logger.getAnonymousLogger();
        logger.log(Level.SEVERE, "Thread terminated with exception: " + t.getName(), e);
    }
}

放出された例外を未キャプチャ例外プロセッサに渡すには、executeでコミットされたタスクのみが必要です.サブミットによってコミットされたタスクは、未検査の例外を投げ出しても検査された例外を投げ出しても、タスクの戻り状態の一部とみなされます.
 
7.4 JVMが閉じる時に閉じるフックを提供する
JVMが正常にシャットダウンされると、JVMはまず登録されているすべてのシャットダウンフックShutdown Hookを呼び出します.フックを閉じると、一時ファイルの削除やオペレーティングシステムで自動的に消去できないリソースの消去など、サービスまたはアプリケーションのクリーンアップ作業を実現できます.
ベストプラクティスは、すべてのサービスに対して同じクローズフックを使用し、このクローズフックで一連のクローズ操作を実行することである.これにより、クローズ操作が単一スレッドでシリアルに実行されることを保証し、競合条件の発生やデッドロックの問題を回避します.
Runtime.getRuntime().addShutdownHook(new Thread() {
    public void run() {
        try{LogService.this.stop();} catch(InterruptedException) {..}
    }
})

 
まとめ:タスク、スレッド、サービス、アプリケーションなどのモジュールでのライフサイクル終了の問題は、設計と実装の複雑さを増す可能性があります.FutureTaskとExecutorのフレームワークを活用することで、キャンセル可能なタスクとサービスの構築を支援します.
 
ブログの新しいアドレス:http://yidao620c.github.io