JAVAスレッドプール:タイムアウト待ちスレッド呼び出し

13366 ワード

Executors.newSingleThreadExecutor()

Executors.newFixedThreadPool()

Executors.newCachedThreadPool()
            ,       (   OOM),    60     

Executors.newScheduledThreadPool()
  ScheduledExecutorService  

newFixedThreadPool()の適用
/**
 * Created by hurf on 2016/8/24.
 *       
 */
public class ThreadsUtil {
    public static ExecutorService executor = Executors.newFixedThreadPool(12);//          
        /**
     *         
     * @param callable
     * @return 
     *         :
     *     String result = ThreadsUtil.execInTime(new CallableImpl("  "));
     *     class CallableImpl implements Callable { private String key = ""; public CallableImpl(String key) {this.key = key;} ...}
     */
    public static  T execInTime(Callable callable) {
        Future future = executor.submit(callable);
        T t = null;
        try {
            t = future.get(CallAtomUtil.timeOutLen, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            String cuase = ObjectUtils.toString(e.getCause()).replace(toReplaceStr,",");
            String eMsg = "            ,      :【"+cuase+"】";
            LogUtil.error(eMsg);
            future.cancel(true);
            throw new AtomException(-6787,eMsg, FrameworkConstants.BIZ_LVL);
        } catch (ExecutionException e) {
            String cuase = ObjectUtils.toString(e.getCause()).replace(toReplaceStr,",");
            String eMsg = "      ,      :【"+cuase+"】";
            LogUtil.error(eMsg);
            future.cancel(true);
            throw new AtomException(-6787,eMsg, FrameworkConstants.BIZ_LVL);
        } catch (TimeoutException e) {
            String eMsg = "          ";
            LogUtil.error(eMsg);
            future.cancel(true);
            throw new AtomException(-6789, eMsg, FrameworkConstants.BIZ_LVL);
        }
        return t;
    }

    /**
     *       
     * @param callable
     * @param timeOut     30000   30 
     * @param 
     * @return
     */
    public static  T execInTime(Callable callable,long timeOut){
        Future future = executor.submit(callable);
        T t = null;
        try {
            t = future.get(timeOut, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            String cuase = ObjectUtils.toString(e.getCause()).replace(toReplaceStr,",");
            String eMsg = "            ,      :【"+cuase+"】";
            LogUtil.error(eMsg);
            future.cancel(true);
            throw new AtomException(-6787,eMsg, FrameworkConstants.BIZ_LVL);
        } catch (ExecutionException e) {
            String cuase = ObjectUtils.toString(e.getCause()).replace(toReplaceStr,",");
            String eMsg = "      ,      :【"+cuase+"】";
            LogUtil.error(eMsg);
            future.cancel(true);
            throw new AtomException(-6787,eMsg, FrameworkConstants.BIZ_LVL);
        } catch (TimeoutException e) {
            String eMsg = "          ";
            LogUtil.error(eMsg);
            future.cancel(true);
            throw new AtomException(-6789, eMsg, FrameworkConstants.BIZ_LVL);
        }
        return t;
    }

    /**
     *      
     * @param callable
     * @param 
     * @return
     */
    public static  T exec(Callable callable){
        Future future = executor.submit(callable);
        T t = null;
        try {
            t = future.get();
        } catch (InterruptedException e) {
            String cuase = ObjectUtils.toString(e.getCause()).replace(toReplaceStr,",");
            String eMsg = "            ,      :【"+cuase+"】";
            LogUtil.error(eMsg);
            future.cancel(true);
            throw new AtomException(-6787,eMsg, FrameworkConstants.BIZ_LVL);
        } catch (ExecutionException e) {
            String cuase = ObjectUtils.toString(e.getCause()).replace(toReplaceStr,",");
            String eMsg = "      ,      :【"+cuase+"】";
            LogUtil.error(eMsg);
            future.cancel(true);
            throw new AtomException(-6787,eMsg, FrameworkConstants.BIZ_LVL);
        }
        return t;
    }

    /**
     *               
     * @param callable
     * @param 
     * @return
     */
    public static  T lookDb(Callable callable){
        if("1".equals(CallAtomUtil.remoteCallTimeOutFlag)){
            long timeOut = CallAtomUtil.timeOutLen+10000;
            return execInTime(callable,timeOut);
        }else {
            return exec(callable);
        }
    }
}

よびだし
List cs = ThreadsUtil.lookDb(new DbLook( maxCountFeild,  nowCountFeild,result, pMap, commParams, gathsn, cs));

//   
private class DbLook implements Callable>{
        private Uresult result;
        private Map pMap;
        private Map commParams;
        private String gathsn;
        private List cs;
        private String maxCountFeild;
        private String nowCountFeild;

        public DbLook(String maxCountFeild, String nowCountFeild,Uresult result, Map pMap, Map commParams, String gathsn, List cs) {
            this.maxCountFeild = maxCountFeild;
            this.nowCountFeild = nowCountFeild;
            this.result = result;
            this.pMap = pMap;
            this.commParams = commParams;
            this.gathsn = gathsn;
            this.cs = cs;
        }

        public List invoke() {
            while (cs.isEmpty()||!cs.get(0).get("GATH_RST").equals("2")){//  0.5        
                String sql = "SELECT GATH_RST,GATH_SN,SOUR_REC_CNT,DEST_REC_CNT FROM DATAGATHLOG WHERE GATH_DATE = (SELECT REPORT_DATE FROM SUBSYS_PVT_CFG WHERE SUBSYS_SN=0) AND GATH_SN="+gathsn;
                cs= sqlExecutor.execQrySql(sql,pMap,commParams,result,
                        BusinessPrompt.QUERY_DATAGATHLOG_FAIL_CODE,BusinessPrompt.QUERY_DATAGATHLOG_FAIL_MSG).getDataList();
                if(!cs.isEmpty()){
                    Map fm = cs.get(0);
                    if(fm!=null&&!fm.isEmpty()){
                        String src = ObjectUtils.toString(fm.get("SOUR_REC_CNT"));
                        String trc = ObjectUtils.toString(fm.get("DEST_REC_CNT"));
                        src=src.isEmpty()?"0":src;
                        trc=trc.isEmpty()?"0":trc;
                        int sourRecCnt = Integer.parseInt(src);
                        int tarRecCnt = Integer.parseInt(trc);
                        ReflectionUtil.setFileVal(ppp,nowCountFeild,tarRecCnt);
                        ReflectionUtil.setFileVal(ppp,maxCountFeild,sourRecCnt);
                    }
                }
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    LogUtil.error(e.fillInStackTrace());
                    throw new AtomException(BusinessPrompt.INTERRUPTED_EXCEPTION_ERROR_CODE, BusinessPrompt.INTERRUPTED_EXCEPTION_ERROR_MSG+"【"+e.getMessage()+"】", FrameworkConstants.ATOM_LVL);
                }
            }
            return cs;
        }

        @Override
        public List call() throws Exception {
            return invoke();
        }
    }