利用するstop完了メソッド実行タイムアウト割り込み

20486 ワード

サンプルコードはgithubから取得できますhttps://github.com/git-simm/simm-framework.git
上のブログ「FutureTaskサブスレッド実行をキャンセルした状態判断」に続きます
一、業務シーン:
システムには、トランザクションの実行時に複数のシーンが同時にロックされ、タスクが蓄積され、システムがクラッシュします.各シーンビジネスのパフォーマンス調整を先に行いましたが、同時インターロックは避けられません.そこで,頻繁に呼び出される同期機能をデッドロックの犠牲者として選択し,実行をキャンセルし,ロックを解放することを考え始めた.
 
二、処理方案:
なぜならcancelスキームでは、データベースのデッドロックの問題を解決できません.そこで次にシナリオを切り替える、Threadに変更する.stopは実現し,サブスレッドを強引に殺す.核心思想はモニタリングが必要な操作をサブスレッドに包装し、起動後、メインスレッドは運行時間モニタリングを開き、サブスレッドが期限を超えたと判断した場合、stop方法を呼び出し、強引に中断する(説明する必要があり、stop操作は知らない異常状態をもたらす可能性があり、jdkはすでにそれを時代遅れとマークし、使用を提案しないが、ここでは個人が自分で方案の影響を評価する必要がある). 
では、stopサブスレッドを1つだけ操作すると、データベースへのロックを同時に解決できますか?これは不可能であることが確認されました.データベースのロックは、トランザクションのコミットまたはロールバック時に解放する必要があります.トランザクションはspringフレームワークによって自己管理され、デッドロックが発生し、サブスレッドが終了していない場合、トランザクションはタイムアウト時間になっても、新しいsql実行文がタイムアウトチェックをトリガーしないため、springトランザクションは自己終了できません.終了ポイントは、データベース・トランザクション・ロックの待機タイムアウトです.したがって、トランザクションを早期にロールバックするには、@Transactionalのtimeout時間を設定し、短い時間に指定する必要があります.サブスレッドがstopされると、springは自動的にトランザクションをロールバックし、データベースのロック解除の目的を達成します.
 
三、コード実現:
3.1.FTaskEndFlagのスレッド同期フラグを作成する.親スレッドは、サブスレッドが実行結果をフィードバックするのを待ってから、後続の論理を実行します.
package simm.framework.threadutils.interrupt;

import java.util.concurrent.TimeoutException;
/**
 * futuretask 
 * 2018.09.22 by simm
 */
public class FTaskEndFlag {
    private volatile boolean isNormaled = false;
    private volatile boolean fired = false;
    private Exception exception =null;

    public boolean isNormaled() {
        return isNormaled;
    }

    /**
     *  
     * @return
     */
    public Exception getException() {
        return exception;
    }

    /**
     *  
     * @param result
     * @param result
     */
    public synchronized void notifyEnd(boolean result){
        isNormaled = result;
        fired = true;
        notifyAll();
    }

    /**
     *  
     * @param result
     * @param result
     */
    public synchronized void notifyEnd(boolean result,Exception ex){
        isNormaled = result;
        exception = ex;
        fired = true;
        notifyAll();
    }

    /**
     *  
     */
    public synchronized void waitForEnd() throws InterruptedException {
        while (!fired) {
            // , synchronized 
            wait();
        }
    }
    /**
     *  
     */
    public void waitForEnd(Thread thread,Long timeout) throws TimeoutException {
        long begin = System.currentTimeMillis();
        while(System.currentTimeMillis()-begin <= timeout){
            waitFunc(10);
            //
            if(fired) return;
        }
        //
        try{
            thread.stop();
        }catch(Exception e){
            e.printStackTrace();
            throw e;
        }
        throw new TimeoutException(" :"+timeout);
    }
    /**
     *  
     */
    private void waitFunc(long millis){
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

3.2、BaseThreadの抽象クラスを作成し、FTaskEndFlagスレッド同期フラグを内蔵する.
package com.zhaogang.ii.biz.threads.future;

/**
 *  
 */
public abstract class BaseThread extends Thread {
    /**
     * futuretask  
     */
    private FTaskEndFlag flag = new FTaskEndFlag();

    public FTaskEndFlag getFlag() {
        return flag;
    }
}

3.3、タイムアウト再試行のツールクラスを作成し、FutureTaskの結果に対してタイムアウト時間の設定を取得する;
package com.zhaogang.ii.biz.threads.future;

import java.lang.reflect.Constructor;
import java.util.List;
import java.util.concurrent.*;
/**
 *  
 * 2018.09.20  by simm
 */
public class RetryUtil {/**
     *  (3 , 3 )
     * @param syncThread
     * @param params
     * @param 
     * @return
     * @throws Exception
     */
    public static extends BaseThread> Boolean execute(Class syncThread, List params) throws Exception {
        return execute(syncThread,params,3000,1000,3);
    }

    /**
     *  
     * @param syncThread  
     * @param params  
     * @param timeout
     * @param interval
     * @param retryTimes
     * @param 
     * @return
     * @throws Exception
     */
    public static extends BaseThread> Boolean execute(Class syncThread, List params, long timeout, long interval, int retryTimes) throws Exception {
        Boolean result = false;
        try{
            // 
            Class[] parameterTypeArrs = new Class[params.size()];
            for(int i=0;i){
                Class c =  params.get(i).getClass();
                if(c.getName().indexOf("$$")>0){
                    String clsName = c.getName().substring(0,c.getName().indexOf("$$"));
                    parameterTypeArrs[i] = Class.forName(clsName);
                }else{
                    parameterTypeArrs[i] = c;
                }
            }
            // 
            Constructor constructor= syncThread.getConstructor(parameterTypeArrs);
            // 
            Object[] parameters= params.toArray();
            //
            BaseThread processor = (BaseThread) constructor.newInstance(parameters);
            processor.start();
            // 
            processor.getFlag().waitForEnd(processor,timeout);
            boolean r = processor.getFlag().isNormaled();
            if(!r){
                throw processor.getFlag().getException();
            }
            return processor.getFlag().isNormaled();
        }catch (TimeoutException e) {
            // 
            retryTimes--;
            if(retryTimes > 0){
                System.out.println(" :"+retryTimes);
                Thread.sleep(interval);
                execute(syncThread,params,timeout,interval,retryTimes);
            }else{
                throw e;
            }
        }
        return result;
    }
}

3.4、サブスレッドトランザクションのタイムアウト時間を設定する.
    @Transactional(timeout = 3)
    public void syncProduct(ProductDetailinfo productInfo) {

四、呼び出しコードを与える.BaseFutureTaskから継承されたFutureTaskタスクを実装します.サブスレッドはspringのコンポーネントに関連しており、パラメータはプライマリスレッドからサブスレッドに注入されることが望ましい.ダイナミックエージェントオブジェクトが渡されることを保証します.
package interrupt;

import simm.framework.threadutils.interrupt.BaseThread;

public class SyncProductThread extends BaseThread {
    private ProductBiz productBiz;
    private ProductDetailinfo productInfo;
    /**
     *  
     * @param productBiz
     * @param productInfo
     */
    public SyncProductThread(ProductBiz productBiz, ProductDetailinfo productInfo){
        this.productBiz = productBiz;
        this.productInfo = productInfo;
    }
    @Override
    public void run() {
        boolean isNormaled = false;
        Exception exception = null;
        try{
            productBiz.syncProduct(productInfo);
            isNormaled = true;
        }catch(Exception e){
            e.printStackTrace();
            exception = e;
        }finally {
            // 
            super.getFlag().notifyEnd(isNormaled,exception);
        }
    }
}
/**
     *  
     * @param productInfo
     * @throws Exception
     */
    public void syncProductTimeout(final ProductDetailinfo productInfo) throws Exception {
        Long timeout = 3000L;
        Long interval = 1000L;
        RetryUtil.execute(SyncProductThread.class,Arrays.asList(productBiz,productInfo),timeout,interval,3);
        //RetryUtil.execute(new SyncProductTask(productBiz,productInfo),timeout,interval,3);
    }

参考記事
https://www.jianshu.com/p/55221d045f39