redis、quartzに基づく再試行可能な非同期通知実装


1、データベース表の設計
-- Notification record table: one row per outbound notification, including its
-- retry bookkeeping (attempt count, next due time) and the final delivery result.
-- The dispatcher job picks up due rows by (send_status, next_notify_time) and
-- processes them in priority order, which is what idx__query supports.
CREATE TABLE `crl_notify_record` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'Primary key',
  `notice_owner` varchar(30) NOT NULL COMMENT 'Notification owner; unique together with notice_owner_serial',
  `notice_owner_serial` varchar(80) NOT NULL COMMENT 'Serial number of the notification within its owner',
  `notice_accepter` varchar(30) NOT NULL COMMENT 'Notification recipient',
  `notify_url` varchar(300) NOT NULL COMMENT 'Notification URL, http(s)://...',
  `notify_content` text COMMENT 'Notification content: JSON format',
  `notify_send_content` text COMMENT 'Content actually sent as the request body',
  `retry_count` int NOT NULL DEFAULT 20 COMMENT 'Maximum number of retries, default: 20',
  `notify_count` int NOT NULL DEFAULT 0 COMMENT 'Number of notification attempts made so far',
  `last_notify_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Time of the most recent notification attempt',
  `next_notify_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Next notification time: current time + (n+5)*n^2 seconds, where n is the number of attempts already made',
  `delay_time` int NOT NULL DEFAULT 0 COMMENT 'Dispatch delay in seconds: actual send time minus next_notify_time',
  `send_status` varchar(20) NOT NULL DEFAULT 'INIT' COMMENT 'Send status (an HTTP 200 response is treated as success). Initial: INIT; processing: PROCESSING; success: SUCCESS; unknown (retries exhausted): UNKNOWN; waiting for retry: PRE_PROCESSING',
  `ret_type` varchar(10) NOT NULL DEFAULT 'RETCON' COMMENT 'How the result is recorded in ret_content. Nothing recorded: NONE; delivery result only (HTTP 200 received): SENDED; content returned by the recipient: RETCON',
  `ret_content` varchar(512) DEFAULT NULL COMMENT 'Result content recorded according to ret_type',
  `notice_priority` int(11) NOT NULL DEFAULT 0 COMMENT 'Notification priority, default: 0',
  `created_date` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
  `modified_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Last modification time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uniq_notice_owner` (`notice_owner`,`notice_owner_serial`) USING BTREE,
  KEY `idx__query` (`send_status`,`next_notify_time`,`notice_priority`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='Notification record table';

2、コードロジック:
2.1 Redisの分散ロックを取得する(取得できない場合は一定時間待機して再試行する).
2.2 ステータスが「初期化(INIT)」または「再処理待ち(PRE_PROCESSING)」で、次回通知時刻が現在時刻より前のレコードを、優先度順に最大100件取得する.
2.3 取得したレコードのステータスを「処理中(PROCESSING)」に一括更新する.
2.4 メッセージ送信メソッドを呼び出し、その結果に応じて通知レコードの最終ステータスを更新する.送信に成功した場合は「成功(SUCCESS)」に、失敗しても再試行回数が残っていれば「再処理待ち(PRE_PROCESSING)」に、残っていなければ「不明(UNKNOWN)」に更新する.
2.5 タスク全体は5分ごとに実行され、実行間隔はQuartzのcron式で設定できる.
import com.alibaba.fastjson.JSON;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;

/**
 * Scheduled job that dispatches pending asynchronous notifications.
 *
 * <p>One run of {@link #doDispatcherJob()}:
 * <ol>
 *   <li>takes a Redis lock so that only one instance dispatches at a time;</li>
 *   <li>checks the task switch configured for this job;</li>
 *   <li>loads a batch of records in status INIT / PRE_PROCESSING;</li>
 *   <li>marks the whole batch as PROCESSING;</li>
 *   <li>posts every record to its notify URL and stores the outcome
 *       (SUCCESS, PRE_PROCESSING for another retry, or UNKNOWN once the retry
 *       limit is exceeded) together with the retry bookkeeping.</li>
 * </ol>
 *
 * <p>NOTE(review): the lock is taken with SETNX followed by a separate EXPIRE, so the
 * two steps are not atomic, and the lock may expire while a long batch is still
 * running (the job then deletes a lock it may no longer own). Fixing that needs an
 * atomic set-with-expiry / owner token on the cache client, which is not visible here.
 */
public class CrlNotifyDispatcherJob {
    private static final Logger logger = LoggerFactory.getLogger(CrlNotifyDispatcherJob.class);

    @Resource
    private CrlNotifyRecordDao crlNotifyRecordDao;

    @Resource
    private CacheBizService cacheClient;

    // Maximum number of attempts to acquire the lock before giving up.
    private static final int max_sleep_count = 10;

    // Pause between two lock acquisition attempts, in milliseconds.
    private static final int max_sleep_time = 1 * 1000;

    // Maximum number of records fetched per run.
    private static final int select_num = 100;

    // Lock expiry passed to the cache client (presumably seconds -- confirm against CacheBizService).
    private static final int expire_time = 30;

    @Resource(name = "crlTaskQuartzRulesService")
    private CrlTaskQuartzRulesService crlTaskQuartzRulesService;

    @Resource
    private CrlHttpPostFacade crlHttpPostFacade;

    /**
     * Runs one dispatch cycle. Never throws for failures inside the cycle: they are
     * logged and the lock (if it was acquired) is released in the {@code finally} block.
     */
    public synchronized void doDispatcherJob() {
        logger.info("CrlNotifyDispatcherJob|doDispatcherJob|        |start...");
        boolean isSetnx = false;
        try {
            // 1. Acquire the Redis lock; skip this run when another instance holds it.
            if (!setnx(1)) {
                logger.info("CrlNotifyDispatcherJob|doDispatcherJob|        |redis    ");
                return;
            }
            isSetnx = true;
            logger.info("CrlNotifyDispatcherJob|doDispatcherJob|        |        |jobCode:" + JobTypeEnum.CrlNotifyDispatcherJob.getJobCode());
            CrlTaskQuartzRules crlTaskQuartzRules = crlTaskQuartzRulesService.selectByTaskCode(JobTypeEnum.CrlNotifyDispatcherJob.getJobCode());
            // The job is disabled when it has no rule row or its switch is OFF.
            if (crlTaskQuartzRules == null || SwitchStatusEnum.OFF.getCode().equals(crlTaskQuartzRules.getSwitchStatus())) {
                logger.info("CrlNotifyDispatcherJob|doDispatcherJob|        |        ");
                return;
            }
            logger.info("CrlNotifyDispatcherJob|doDispatcherJob|        |        |jobCode:" + JobTypeEnum.CrlNotifyDispatcherJob.getJobCode() + crlTaskQuartzRules);

            // 2. Load the records that are new (INIT) or waiting for a retry (PRE_PROCESSING).
            List<String> statusList = Lists.newArrayList();
            statusList.add(NotifySendStatusEnum.INIT.getCode());
            statusList.add(NotifySendStatusEnum.PRE_PROCESSING.getCode());
            List<CrlNotifyRecord> notifyRecordList = crlNotifyRecordDao.selectHandleList(statusList, select_num);
            logger.info("CrlNotifyDispatcherJob|doDispatcherJob|        |notifyRecordList" + JSON.toJSONString(notifyRecordList));
            if (CollectionUtils.isEmpty(notifyRecordList)) {
                logger.info("CrlNotifyDispatcherJob|doDispatcherJob|        |    |end...");
                return;
            }

            // 3. Mark the whole batch as PROCESSING in one statement.
            List<Long> notifyRecordNoList = Lists.transform(notifyRecordList, new Function<CrlNotifyRecord, Long>() {
                @Override
                public Long apply(CrlNotifyRecord notifyRecord) {
                    return notifyRecord.getId();
                }
            });
            crlNotifyRecordDao.updateStatusByIds(notifyRecordNoList, NotifySendStatusEnum.PROCESSING.getCode());
            logger.info("CrlNotifyDispatcherJob|doDispatcherJob|        |        |notifyRecordNoList:" + JSON.toJSONString(notifyRecordNoList));

            // 4. Send each record and persist its outcome.
            for (CrlNotifyRecord notifyRecord : notifyRecordList) {
                String respStatus;
                // Separate object holding only the columns to update, so that
                // updateByPrimaryKeySelective leaves every other column untouched.
                CrlNotifyRecord upNotifyRecord = new CrlNotifyRecord();
                long now = System.currentTimeMillis();
                // Attempts made before this one; widened to long so the back-off cannot overflow int.
                long attemptsSoFar = notifyRecord.getNotifyCount();
                upNotifyRecord.setId(notifyRecord.getId());
                upNotifyRecord.setNotifyCount(notifyRecord.getNotifyCount() + 1);
                upNotifyRecord.setLastNotifyTime(new Date());
                // Back-off: next attempt is due (n + 5) * n^2 seconds from now.
                upNotifyRecord.setNextNotifyTime(new Date(now + (attemptsSoFar + 5) * attemptsSoFar * attemptsSoFar * 1000L));
                // Delay in seconds between the scheduled and the actual send time.
                // Divide before narrowing: narrowing the millisecond value first overflows after ~24.8 days.
                upNotifyRecord.setDelayTime((int) ((now - notifyRecord.getNextNotifyTime().getTime()) / 1000));
                upNotifyRecord.setModifiedDate(new Date());
                try {
                    // NOTE(review): CrlCommonRequest / CrlCommonResponse may be generic types whose
                    // type arguments were lost; they are left exactly as written in the original.
                    CrlCommonRequest crlCommonRequest = new CrlCommonRequest();
                    CrlHttpPostRequestDto crlHttpPostRequestDto = new CrlHttpPostRequestDto();
                    crlHttpPostRequestDto.setHttpUrl(notifyRecord.getNotifyUrl());
                    crlHttpPostRequestDto.setJsonStr(notifyRecord.getNotifySendContent());
                    crlCommonRequest.setRequestData(crlHttpPostRequestDto);
                    logger.info("CrlNotifyDispatcherJob|doDispatcherJob|        |        |crlCommonRequest:" + crlCommonRequest);
                    CrlCommonResponse crlCommonResponse = crlHttpPostFacade.doHttpPost(crlCommonRequest);
                    if (!crlCommonResponse.isSuccess()) {
                        logger.warn("CrlNotifyDispatcherJob|doDispatcherJob|        |          |crlCommonRequest:" + crlCommonRequest);
                        // The record is already PROCESSING; it must be moved to a status the next
                        // scan picks up, otherwise it would never be retried.
                        respStatus = resolveFailureStatus(notifyRecord);
                    } else {
                        logger.info("CrlNotifyDispatcherJob|doDispatcherJob|        |        |crlCommonRequest:" + crlCommonRequest+"|crlCommonResponse:"+crlCommonResponse);
                        CrlBizCommonResponse crlBizCommonResponse = crlCommonResponse.getResData();
                        if (RespCodeEnum.HTTP_POST_SUCCESS.getCode().equals(crlBizCommonResponse.getResBizCode())) {
                            respStatus = NotifySendStatusEnum.SUCCESS.getCode();
                            logger.info("CrlNotifyDispatcherJob|doDispatcherJob|        |              |crlCommonRequest:" + crlCommonRequest+"|crlBizCommonResponse:" + crlBizCommonResponse);
                            if (NotifyRetTypeEnum.SENDED.getCode().equals(notifyRecord.getRetType())) {
                                // SENDED: only record that the post succeeded.
                                logger.info("CrlNotifyDispatcherJob|doDispatcherJob|        |      (   http  200)|crlCommonRequest:" + crlCommonRequest+"|crlBizCommonResponse:" + crlBizCommonResponse);
                                upNotifyRecord.setRetContent(RespCodeEnum.HTTP_POST_SUCCESS.getDesc());
                            } else if (NotifyRetTypeEnum.RETCON.getCode().equals(notifyRecord.getRetType())) {
                                // RETCON: store the message returned by the recipient.
                                upNotifyRecord.setRetContent(crlBizCommonResponse.getResBizMsg());
                                logger.info("CrlNotifyDispatcherJob|doDispatcherJob|        |      (        )|crlCommonRequest:" + crlCommonRequest+"|crlBizCommonResponse:" + crlBizCommonResponse);
                            } else {
                                // Any other return type: nothing is recorded in ret_content.
                                logger.info("CrlNotifyDispatcherJob|doDispatcherJob|        |    (  )|crlCommonRequest:" + crlCommonRequest+"|crlBizCommonResponse:" + crlBizCommonResponse);
                            }
                        } else {
                            respStatus = resolveFailureStatus(notifyRecord);
                        }
                    }
                } catch (Exception e) {
                    logger.warn("CrlNotifyDispatcherJob|doDispatcherJob|          " + LogConstant.SYS_SCHEDULE_EXCEPTION + "|Exception:" + ExceptionUtils.getStackTrace(e));
                    // A send that throws counts as a failed attempt and honours the retry limit too.
                    respStatus = resolveFailureStatus(notifyRecord);
                }
                // Persist the outcome and the retry bookkeeping for this record.
                upNotifyRecord.setSendStatus(respStatus);
                int upCount = crlNotifyRecordDao.updateByPrimaryKeySelective(upNotifyRecord);
                logger.info("DispatcherService|doDispatcherJob|        |        |upNotifyRecord:" + JSON.toJSONString(upNotifyRecord) + "|upCount" + upCount);
                if (notifyRecord.getNotifyCount() > notifyRecord.getRetryCount()) {
                    logger.info("CrlNotifyDispatcherJob|doDispatcherJob|        |            |NotifyCount:" + notifyRecord.getNotifyCount() + "|RetryCount:" + notifyRecord.getRetryCount());
                }
            }
        } catch (Exception e) {
            logger.error("CrlNotifyDispatcherJob|doDispatcherJob|          " + LogConstant.SYS_SCHEDULE_EXCEPTION + "|Exception:" + ExceptionUtils.getStackTrace(e));
        } finally {

            logger.info("CrlNotifyDispatcherJob|doDispatcherJob|          |    ");
            // Only release the lock when this run actually acquired it.
            if (isSetnx) {
                cacheClient.del(CacheConstant.NOTIFY_DISPATCHER_CAS_KEY);
            }
        }
    }

    /**
     * Status to store after a failed attempt: UNKNOWN once the attempts already made
     * exceed the record's retry limit, otherwise PRE_PROCESSING so that the next scan
     * retries it.
     *
     * @param notifyRecord the record as loaded before this attempt
     * @return the status code to persist
     */
    private String resolveFailureStatus(CrlNotifyRecord notifyRecord) {
        if (notifyRecord.getNotifyCount() > notifyRecord.getRetryCount()) {
            return NotifySendStatusEnum.UNKNOWN.getCode();
        }
        return NotifySendStatusEnum.PRE_PROCESSING.getCode();
    }

    /**
     * Tries to acquire the dispatcher lock, sleeping {@code max_sleep_time} ms after
     * every failed attempt, until attempt number {@code max_sleep_count} has been used.
     *
     * @param count number of the first attempt (callers pass 1)
     * @return {@code true} when the lock was acquired and given an expiry,
     *         {@code false} when all attempts were used up
     * @throws Exception if the cache client fails or the wait is interrupted
     */
    private boolean setnx(int count) throws Exception {
        int attempt = count;
        while (attempt <= max_sleep_count) {
            Long redisCAS = cacheClient.setnx(CacheConstant.NOTIFY_DISPATCHER_CAS_KEY, "1");
            if (Long.valueOf(1L).equals(redisCAS)) {
                try {
                    cacheClient.expire(CacheConstant.NOTIFY_DISPATCHER_CAS_KEY, expire_time);
                } catch (Exception e) {
                    // The key exists without an expiry and the caller will not know it owns the
                    // lock, so release it here instead of leaving it held forever.
                    cacheClient.del(CacheConstant.NOTIFY_DISPATCHER_CAS_KEY);
                    throw e;
                }
                logger.info("CrlNotifyDispatcherJob|doDispatcherJob|setnx|        |        |redisCAS:" + redisCAS + "|count:" + attempt);
                return true;
            }
            // Lock is held elsewhere (or the client returned null): wait and try again.
            logger.info("CrlNotifyDispatcherJob|doDispatcherJob|setnx|        |       count   |redisCAS:" + redisCAS + "|count:" + attempt);
            Thread.sleep(max_sleep_time);
            attempt++;
        }
        logger.info("CrlNotifyDispatcherJob|doDispatcherJob|setnx|        |        " + attempt);
        return false;
    }

}