FutureTaskサブタスク実行をキャンセルした状態判断

13801 ワード

サンプルコードはgithubから取得できますhttps://github.com/git-simm/simm-framework.git
一、業務シーン:
システムには、トランザクションの実行時に複数のシーンが同時にロックされ、タスクが蓄積され、システムがクラッシュします.各シーンビジネスのパフォーマンス調整を先に行いましたが、同時インターロックは避けられません.そこで,頻繁に呼び出される同期機能をデッドロックの犠牲者として選択し,実行をキャンセルし,ロックを解放することを考え始めた.
 
二、処理方案:
ここではFutureTaskを優先する.cancelスキーム.FutureTaskのgetメソッドを呼び出すと,タイムアウト時間が長くなるというのが核心思想である.タイムアウト異常を受信したらcancelメソッドを呼び出し、スレッドを中断します.もちろん、実際に見ると、この案も私のビジネスニーズを満たすことはできません.次の2つの制限があります.
  • cancelメソッドは、サブスレッドに割り込み要求を開始するだけで、割り込むことができるかどうかは、サブスレッド自体に依存し、サブスレッドがどのステップで終了するかは判断できません.有効なトランザクションが追加されます.このトランザクションはロールバックされたり、コミットに成功したりする可能性があります.したがって、synchronized機能を借りて、親子スレッドを通信させ、サブスレッドの実行状態を明確にする必要があります.
  • サブスレッドでデータベース操作が実行され、デッドロック待機が発生します.この場合、cancle操作はタスクをキャンセルすることはできません.トランザクションがタイムアウトするまで待つしかありません.この問題はcancelではスレッドを強制的に閉じることができないため、FutureTaskスキームは使用できません.

  • 以下のインプリメンテーションは依然としてFutureTaskというスキームをめぐって行われ,親子スレッド通信を追加するだけで,サブスレッド状態を明確に取得するインプリメンテーションである.
    三、コード実現:
    3.1.FTaskEndFlagのスレッド同期フラグを作成する.親スレッドは、サブスレッドが実行結果をフィードバックするのを待ってから、後続の論理を実行します.
    package simm.framework.threadutils.interrupt;
    
    import java.util.concurrent.TimeoutException;
    /**
     * futuretask 
     * 2018.09.22 by simm
     */
    public class FTaskEndFlag {
        private volatile boolean isNormaled = false;
        private volatile boolean fired = false;
        private Exception exception =null;
    
        public boolean isNormaled() {
            return isNormaled;
        }
    
        /**
         *  
         * @return
         */
        public Exception getException() {
            return exception;
        }
    
        /**
         *  
         * @param result
         * @param result
         */
        public synchronized void notifyEnd(boolean result){
            isNormaled = result;
            fired = true;
            notifyAll();
        }
    
        /**
         *  
         * @param result
         * @param result
         */
        public synchronized void notifyEnd(boolean result,Exception ex){
            isNormaled = result;
            exception = ex;
            fired = true;
            notifyAll();
        }
    
        /**
         *  
         */
        public synchronized void waitForEnd() throws InterruptedException {
            while (!fired) {
                // , synchronized 
                wait();
            }
        }
       /**
         *  
         */
        private void waitFunc(long millis){
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    3.2、BaseFutureTaskの抽象クラスを作成し、FTaskEndFlagスレッド同期フラグを内蔵する.
    package simm.framework.threadutils.interrupt;
    
    import java.util.concurrent.Callable;
    
    /**
     *  
     * 2018.09.22 by simm
     */
    public abstract class BaseFutureTask implements Callable {
        /**
         * futuretask  
         */
        private FTaskEndFlag flag = new FTaskEndFlag();
    
        public FTaskEndFlag getFlag() {
            return flag;
        }
    }

    3.3、タイムアウト再試行のツールクラスを作成し、FutureTaskの結果に対してタイムアウト時間の設定を取得する;
    package simm.framework.threadutils.interrupt;
    
    import java.lang.reflect.Constructor;
    import java.util.List;
    import java.util.concurrent.*;
    /**
     *  
     * 2018.09.20  by simm
     */
    public class RetryUtil {
        /**
         *  ( jvm )
         */
        private static ExecutorService executorService = Executors.newCachedThreadPool();
    
        /**
         *  (3 , 3 )
         * @param callable
         * @return
         * @throws InterruptedException
         * @throws ExecutionException
         * @throws TimeoutException
         */
        public static Boolean execute(BaseFutureTask callable) throws InterruptedException, ExecutionException, TimeoutException {
            return execute(callable,3000,1000,3);
        }
    
        /**
         *  
         * @param callable  
         * @param timeout  
         * @param interval  
         * @param retryTimes  
         * @return
         * @throws ExecutionException
         * @throws InterruptedException
         * @throws TimeoutException
         */
        public static Boolean execute(BaseFutureTask callable, long timeout,long interval, int retryTimes) throws ExecutionException, InterruptedException, TimeoutException {
            Boolean result = false;
            FutureTask futureTask = new FutureTask<>(callable);
            executorService.execute(futureTask);
            try {
                result = futureTask.get(timeout, TimeUnit.MILLISECONDS);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
                futureTask.cancel(true);
                throw e;
            }catch(TimeoutException e){
                futureTask.cancel(true);
                callable.getFlag().waitForEnd();
                if(callable.getFlag().isNormaled()){
                    return true;
                }
                e.printStackTrace();
                // 
                retryTimes--;
                if(retryTimes > 0){
                    Thread.sleep(interval);
                    execute(callable,timeout,interval,retryTimes);
                }else{
                    throw e;
                }
            }
            return result;
        }
    }

    四、呼び出しコードを与える.BaseFutureTaskから継承されたFutureTaskタスクを実装します.サブスレッドはspringのコンポーネントに関連しており、パラメータはプライマリスレッドからサブスレッドに注入されることが望ましい.
    RetryUtil.execute(new SyncProductTask(productBiz,productInfo),timeout,interval,3);

     
    参考記事
    https://www.jianshu.com/p/55221d045f39