redis、quartzに基づく再試行可能な非同期通知実装
1、データベース表の設計
2、コードロジック:
2.1再試行機構の分散ロック
2.2スキャンステータスは、初期化または実行中であり、送信時間が現在の時間より小さい100個のレコードであり、優先順位でソートされる.
2.3一括更新状態が処理中
2.4メッセージ送信方法を呼び出し、通知レコードの最終状態を更新する.残りの回数があればさらに処理され、なければ不明に更新され、成功すれば成功に更新されます.
2.5タスク全体を5分ごとに実行し、quartz式で構成できます.
CREATE TABLE `crl_notify_record` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT ' ',
`notice_owner` varchar(30) NOT NULL COMMENT ' ',
`notice_owner_serial` varchar(80) NOT NULL COMMENT ' ',
`notice_accepter` varchar(30) NOT NULL COMMENT ' ',
`notify_url` varchar(300) NOT NULL COMMENT ' ,http(s)://……',
`notify_content` text COMMENT ' : json ',
`notify_send_content` text COMMENT ' , ',
`retry_count` int NOT NULL DEFAULT '20' COMMENT ' , :20',
`notify_count` int NOT NULL DEFAULT '0' COMMENT ' ',
`last_notify_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT ' , “ ” , 。( )',
`next_notify_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT ' , : + (n+5)*n^2.(n : )',
`delay_time` int NOT NULL DEFAULT '0' COMMENT ' , : , , “ - ”, ',
`send_status` varchar(20) NOT NULL DEFAULT 'INIT' COMMENT ' (http 200 ). :INIT; :PROCESSING; :SUCCESS; :UNKNOWN; :PRE_PROCESSING',
`ret_type` varchar(10) NOT NULL DEFAULT 'RETCON' COMMENT ' . ( ):NONE; ( http 200):SENDED; ( ):RETCON',
`ret_content` varchar(512) DEFAULT NULL COMMENT ' ',
`notice_priority` int(11) NOT NULL DEFAULT '0' COMMENT ' :0, ',
`created_date` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT ' ',
`modified_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT ' ',
PRIMARY KEY (`id`),
UNIQUE KEY `uniq_notice_owner` (`notice_owner`,`notice_owner_serial`) USING BTREE,
KEY `idx__query` (`send_status`,`next_notify_time`,`notice_priority`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT=' ';
2、コードロジック:
2.1再試行機構の分散ロック
2.2スキャンステータスは、初期化または実行中であり、送信時間が現在の時間より小さい100個のレコードであり、優先順位でソートされる.
2.3一括更新状態が処理中
2.4メッセージ送信方法を呼び出し、通知レコードの最終状態を更新する.残りの回数があればさらに処理され、なければ不明に更新され、成功すれば成功に更新されます.
2.5タスク全体を5分ごとに実行し、quartz式で構成できます.
import com.alibaba.fastjson.JSON;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
/**
*
*/
public class CrlNotifyDispatcherJob {
private static final Logger logger = LoggerFactory.getLogger(CrlNotifyDispatcherJob.class);
@Resource
private CrlNotifyRecordDao crlNotifyRecordDao;
@Resource
private CacheBizService cacheClient;
//
private static int max_sleep_count = 10;
// ms
private static int max_sleep_time = 1 * 1000;
//
private static int select_num = 100;
//
private static int expire_time = 30;
@Resource(name = "crlTaskQuartzRulesService")
private CrlTaskQuartzRulesService crlTaskQuartzRulesService;
@Resource
private CrlHttpPostFacade crlHttpPostFacade;
public synchronized void doDispatcherJob() {
logger.info("CrlNotifyDispatcherJob|doDispatcherJob| |start...");
boolean isSetnx = false;
try {
//1.redis // ... !
if (!setnx(1)) {
logger.info("CrlNotifyDispatcherJob|doDispatcherJob| |redis ");
return;
}
isSetnx = true;
logger.info("CrlNotifyDispatcherJob|doDispatcherJob| | |jobCode:" + JobTypeEnum.CrlNotifyDispatcherJob.getJobCode());
CrlTaskQuartzRules crlTaskQuartzRules = crlTaskQuartzRulesService.selectByTaskCode(JobTypeEnum.CrlNotifyDispatcherJob.getJobCode());
if (crlTaskQuartzRules == null || SwitchStatusEnum.OFF.getCode().equals(crlTaskQuartzRules.getSwitchStatus())) {
logger.info("CrlNotifyDispatcherJob|doDispatcherJob| | ");
return;
}
logger.info("CrlNotifyDispatcherJob|doDispatcherJob| | |jobCode:" + JobTypeEnum.CrlNotifyDispatcherJob.getJobCode() + crlTaskQuartzRules);
//2. init/ PRE_PROCESSING ,
List statusList = Lists.newArrayList();
statusList.add(NotifySendStatusEnum.INIT.getCode());
statusList.add(NotifySendStatusEnum.PRE_PROCESSING.getCode());
List notifyRecordList = crlNotifyRecordDao.selectHandleList(statusList, select_num);
logger.info("CrlNotifyDispatcherJob|doDispatcherJob| |notifyRecordList" + JSON.toJSONString(notifyRecordList));
if (CollectionUtils.isEmpty(notifyRecordList)) {
logger.info("CrlNotifyDispatcherJob|doDispatcherJob| | |end...");
return;
}
//3.
List notifyRecordNoList = Lists.transform(notifyRecordList, new Function() {
@Override
public Long apply(CrlNotifyRecord notifyRecord) {
return notifyRecord.getId();
}
});
crlNotifyRecordDao.updateStatusByIds(notifyRecordNoList, NotifySendStatusEnum.PROCESSING.getCode());
logger.info("CrlNotifyDispatcherJob|doDispatcherJob| | |notifyRecordNoList:" + JSON.toJSONString(notifyRecordNoList));
//4.
for (CrlNotifyRecord notifyRecord : notifyRecordList) {
String respStatus = "";
// NotifyRecord ,
CrlNotifyRecord upNotifyRecord = new CrlNotifyRecord();
upNotifyRecord.setId(notifyRecord.getId());
upNotifyRecord.setNotifyCount(notifyRecord.getNotifyCount() + 1);
upNotifyRecord.setLastNotifyTime(new Date());
upNotifyRecord.setNextNotifyTime(new Date(System.currentTimeMillis() + ((notifyRecord.getNotifyCount() + 5) * notifyRecord.getNotifyCount() * notifyRecord.getNotifyCount() * 1000)));
upNotifyRecord.setDelayTime(new Long(System.currentTimeMillis() - notifyRecord.getNextNotifyTime().getTime()).intValue() / 1000);
upNotifyRecord.setModifiedDate(new Date());
try {
CrlCommonRequest crlCommonRequest=new CrlCommonRequest();
CrlHttpPostRequestDto crlHttpPostRequestDto=new CrlHttpPostRequestDto();
crlHttpPostRequestDto.setHttpUrl(notifyRecord.getNotifyUrl());
crlHttpPostRequestDto.setJsonStr(notifyRecord.getNotifySendContent());
crlCommonRequest.setRequestData(crlHttpPostRequestDto);
logger.info("CrlNotifyDispatcherJob|doDispatcherJob| | |crlCommonRequest:" + crlCommonRequest);
CrlCommonResponse crlCommonResponse= crlHttpPostFacade.doHttpPost(crlCommonRequest);
if(!crlCommonResponse.isSuccess()){
logger.warn("CrlNotifyDispatcherJob|doDispatcherJob| | |crlCommonRequest:" + crlCommonRequest);
continue;
}
logger.info("CrlNotifyDispatcherJob|doDispatcherJob| | |crlCommonRequest:" + crlCommonRequest+"|crlCommonResponse:"+crlCommonResponse);
CrlBizCommonResponse crlBizCommonResponse=crlCommonResponse.getResData();
if(RespCodeEnum.HTTP_POST_SUCCESS.getCode().equals(crlBizCommonResponse.getResBizCode())){
respStatus = NotifySendStatusEnum.SUCCESS.getCode();
logger.info("CrlNotifyDispatcherJob|doDispatcherJob| | |crlCommonRequest:" + crlCommonRequest+"|crlBizCommonResponse:" + crlBizCommonResponse);
if (NotifyRetTypeEnum.SENDED.getCode().equals(notifyRecord.getRetType())) {
logger.info("CrlNotifyDispatcherJob|doDispatcherJob| | ( http 200)|crlCommonRequest:" + crlCommonRequest+"|crlBizCommonResponse:" + crlBizCommonResponse);
upNotifyRecord.setRetContent(RespCodeEnum.HTTP_POST_SUCCESS.getDesc());
} else if (NotifyRetTypeEnum.RETCON.getCode().equals(notifyRecord.getRetType())) {
// , upNotifyRecord ,
upNotifyRecord.setRetContent(crlBizCommonResponse.getResBizMsg());
logger.info("CrlNotifyDispatcherJob|doDispatcherJob| | ( )|crlCommonRequest:" + crlCommonRequest+"|crlBizCommonResponse:" + crlBizCommonResponse);
} else {
//
logger.info("CrlNotifyDispatcherJob|doDispatcherJob| | ( )|crlCommonRequest:" + crlCommonRequest+"|crlBizCommonResponse:" + crlBizCommonResponse);
}
}else if(notifyRecord.getNotifyCount() > notifyRecord.getRetryCount()){
respStatus = NotifySendStatusEnum.UNKNOWN.getCode();
}else{
respStatus = NotifySendStatusEnum.PRE_PROCESSING.getCode();
}
} catch (Exception e) {
logger.warn("CrlNotifyDispatcherJob|doDispatcherJob| " + LogConstant.SYS_SCHEDULE_EXCEPTION + "|Exception:" + ExceptionUtils.getStackTrace(e));
respStatus = NotifySendStatusEnum.PRE_PROCESSING.getCode();
}
//
upNotifyRecord.setSendStatus(respStatus);
int upCount = crlNotifyRecordDao.updateByPrimaryKeySelective(upNotifyRecord);
logger.info("DispatcherService|doDispatcherJob| | |upNotifyRecord:" + JSON.toJSONString(upNotifyRecord) + "|upCount" + upCount);
if (notifyRecord.getNotifyCount() > notifyRecord.getRetryCount()) {
logger.info("CrlNotifyDispatcherJob|doDispatcherJob| | |NotifyCount:" + notifyRecord.getNotifyCount() + "|RetryCount:" + notifyRecord.getRetryCount());
}
}
} catch (Exception e) {
logger.error("CrlNotifyDispatcherJob|doDispatcherJob| " + LogConstant.SYS_SCHEDULE_EXCEPTION + "|Exception:" + ExceptionUtils.getStackTrace(e));
} finally {
logger.info("CrlNotifyDispatcherJob|doDispatcherJob| | ");
if (isSetnx) {
cacheClient.del(CacheConstant.NOTIFY_DISPATCHER_CAS_KEY);
}
}
}
//
private boolean setnx(int count) throws Exception {
if (count > max_sleep_count) {
logger.info("CrlNotifyDispatcherJob|doDispatcherJob|setnx| | " + count);
return false;
}
Long redisCAS = cacheClient.setnx(CacheConstant.NOTIFY_DISPATCHER_CAS_KEY, "1");
if (redisCAS == null || (!redisCAS.equals(new Long(1)))) {//
logger.info("CrlNotifyDispatcherJob|doDispatcherJob|setnx| | count |redisCAS:" + redisCAS + "|count:" + count);
Thread.sleep(max_sleep_time);
return setnx(count + 1);
} else {
cacheClient.expire(CacheConstant.NOTIFY_DISPATCHER_CAS_KEY, expire_time);
logger.info("CrlNotifyDispatcherJob|doDispatcherJob|setnx| | |redisCAS:" + redisCAS + "|count:" + count);
return true;
}
}
}