CarreraProducerのsendDelayについて

15078 ワード

シーケンス
本文は主にCarreraProducerのsendDelayを研究する.
ProducerInterface
DDMQ/carrera-sdk/producer/java/carrera-producer-sdk/src/main/java/com/xiaojukeji/carrera/producer/ProducerInterface.java
public interface ProducerInterface {

    void start() throws Exception;

    void shutdown();

    Result sendMessage(Message message);

    Result send(String topic, byte[] body);

    Result send(String topic, String body);

    Result sendByCharset(String topic, String body, String charsetName);

    Result send(String topic, String body, String key, String... tags);

    Result send(String topic, byte[] body, String key, String... tags);

    Result sendByCharset(String topic, String body, String charsetName, String key, String... tags);

    Result sendWithHashId(String topic, long hashId, String body, String key, String... tags);

    Result sendWithHashId(String topic, long hashId, byte[] body, String key, String... tags);

    Result sendWithHashIdByCharset(String topic, long hashId, String body, String charsetName, String key, String[] tags);

    Result sendWithPartition(String topic, int partitionId, long hashId, byte[] body, String key, String... tags);

    Result sendWithPartition(String topic, int partitionId, long hashId, String body, String key, String... tags);

    Result sendWithPartitionByCharset(String topic, int partitionId, long hashId, String body, String charsetName, String key, String[] tags);

    Result sendBatchConcurrently(List messages);

    Result sendBatchOrderly(List messages);

    DelayResult sendDelay(String topic, byte[] body, DelayMeta delayMeta);

    DelayResult sendDelay(String topic, String body, DelayMeta delayMeta);

    DelayResult sendDelayByCharset(String topic, String body, DelayMeta delayMeta, String charsetName);

    DelayResult sendDelay(String topic, String body, DelayMeta delayMeta, String... tags);

    DelayResult sendDelay(String topic, byte[] body, DelayMeta delayMeta, String... tags);

    DelayResult sendDelayByCharset(String topic, String body, DelayMeta delayMeta, String charsetName, String... tags);

    DelayResult cancelDelay(String topic, String uniqDelayMsgId);

    DelayResult cancelDelay(String topic, String uniqDelayMsgId, String... tags);

    Result sendBatchSync(List messages);
}
  • ProducerInterfaceは、いくつかのsendDelayメソッドおよびcancelDelayメソッド
  • を定義します.
    CarreraProducer
    DDMQ/carrera-sdk/producer/java/carrera-producer-sdk/src/main/java/com/xiaojukeji/carrera/producer/CarreraProducer.java
    public class CarreraProducer implements ProducerInterface {
        private ProducerInterface producer;
        private CarreraConfig config;
    
        public CarreraProducer(CarreraConfig config) {
            producer = new LocalCarreraProducer(config);
            this.config = config;
        }
    
        public static CarreraProducer newCarreraProducer(CarreraConfig config) throws Exception {
            return new CarreraProducer(config);
        }
    
        public MessageBuilder messageBuilder() {
            return new MessageBuilder(this);
        }
    
        public AddDelayMessageBuilder addDelayMessageBuilder() {
            return new AddDelayMessageBuilder(this);
        }
    
        public CancelDelayMessageBuilder cancelDelayMessageBuilder() {
            return new CancelDelayMessageBuilder(this);
        }
    
        public AddTxMonitorMessageBuilder addTxMonitorMessageBuilder(AddDelayMessageBuilder addDelayMessageBuilder) {
            return new AddTxMonitorMessageBuilder(addDelayMessageBuilder);
        }
    
        public CancelTxMonitorMessageBuilder cancelTxMonitorMessageBuilder(CancelDelayMessageBuilder cancelDelayMessageBuilder) {
            return new CancelTxMonitorMessageBuilder(cancelDelayMessageBuilder);
        }
    
        public TxBusinessMessageBuilder txBusinessMessageBuilder(MessageBuilder messageBuilder) {
            return new TxBusinessMessageBuilder(messageBuilder);
        }
    
        //......
    
        @Override
        public DelayResult sendDelay(String topic, byte[] body, DelayMeta delayMeta) {
            return producer.sendDelay(topic, body, delayMeta);
        }
    
        @Override
        public DelayResult sendDelay(String topic, String body, DelayMeta delayMeta) {
            return producer.sendDelay(topic, body, delayMeta);
        }
    
        @Override
        public DelayResult sendDelayByCharset(String topic, String body, DelayMeta delayMeta, String charsetName) {
            return producer.sendDelayByCharset(topic, body, delayMeta, charsetName);
        }
    
        @Override
        public DelayResult sendDelay(String topic, String body, DelayMeta delayMeta, String... tags) {
            return producer.sendDelay(topic, body, delayMeta, tags);
        }
    
        @Override
        public DelayResult sendDelay(String topic, byte[] body, DelayMeta delayMeta, String... tags) {
            return producer.sendDelay(topic, body, delayMeta, tags);
        }
    
        @Override
        public DelayResult sendDelayByCharset(String topic, String body, DelayMeta delayMeta, String charsetName, String... tags) {
            return producer.sendDelayByCharset(topic, body, delayMeta, charsetName, tags);
        }
    
        @Override
        public DelayResult cancelDelay(String topic, String uniqDelayMsgId) {
            return producer.cancelDelay(topic, uniqDelayMsgId);
        }
    
        @Override
        public DelayResult cancelDelay(String topic, String uniqDelayMsgId, String... tags) {
            return producer.cancelDelay(topic, uniqDelayMsgId, tags);
        }
    
        //......    
    }
  • CarreraProducerはProducerInterfaceインタフェースを実現し、そのsendDelay、cancelDelayメソッドはLocalCarreraProducer
  • に委任された.
    LocalCarreraProducer
    DDMQ/carrera-sdk/producer/java/carrera-producer-sdk/src/main/java/com/xiaojukeji/carrera/producer/LocalCarreraProducer.java
    public class LocalCarreraProducer extends CarreraProducerBase implements ProducerInterface {
    
        public LocalCarreraProducer(CarreraConfig config) {
            super(config);
        }
    
        @Override
        protected void initNodeMgr() throws Exception {
            nodeMgr = NodeManager.newLocalNodeManager(config, config.getCarreraProxyList());
            nodeMgr.initConnectionPool();
        }
    }
  • LocalCarreraProducerはCarreraProducerBaseを継承し、ProducerInterfaceインタフェース
  • を実現した.
    CarreraProducerBase
    DDMQ/carrera-sdk/producer/java/carrera-producer-sdk/src/main/java/com/xiaojukeji/carrera/producer/CarreraProducerBase.java
    public abstract class CarreraProducerBase implements ProducerInterface {
        private static final Logger LOGGER = LoggerFactory.getLogger(CarreraProducerBase.class);
        private static final Logger DROP_LOGGER = LoggerFactory.getLogger("DROP_LOG");
    
        private static final int DELAY_ACTIONS_ADD = 1;
        private static final int DELAY_ACTIONS_CANCEL = 2;
        private static final String TAGS_SEPARATOR = "||";
        private volatile boolean isRunning = false;
        protected NodeManager nodeMgr;
        protected CarreraConfig config;
        private ExecutorService executor;
    
        public CarreraProducerBase(CarreraConfig config) {
            this.config = config;
        }
    
        //......
    
        public DelayResult sendDelay(String topic, byte[] body, DelayMeta delayMeta) {
            return sendDelayMessage(buildDelayMessage4Add(topic, body, delayMeta, randomKey()));
        }
    
        public DelayResult sendDelay(String topic, String body, DelayMeta delayMeta) {
            return sendDelayMessage(buildDelayMessage4Add(topic, body.getBytes(), delayMeta, randomKey()));
        }
    
        public DelayResult sendDelayByCharset(String topic, String body, DelayMeta delayMeta, String charsetName) {
            try {
                return sendDelayMessage(buildDelayMessage4Add(topic, body.getBytes(charsetName), delayMeta));
            } catch (UnsupportedEncodingException e) {
                return new DelayResult(CHARSET_ENCODING_EXCEPTION, e.getMessage(), "");
            }
        }
    
        public DelayResult sendDelay(String topic, String body, DelayMeta delayMeta, String... tags) {
            return sendDelayMessage(buildDelayMessage4Add(topic, body.getBytes(), delayMeta, tags));
        }
    
        public DelayResult sendDelay(String topic, byte[] body, DelayMeta delayMeta, String... tags) {
            return sendDelayMessage(buildDelayMessage4Add(topic, body, delayMeta, tags));
        }
    
        public DelayResult sendDelayByCharset(String topic, String body, DelayMeta delayMeta, String charsetName, String... tags) {
            try {
                return sendDelayMessage(buildDelayMessage4Add(topic, body.getBytes(charsetName), delayMeta, tags));
            } catch (UnsupportedEncodingException e) {
                return new DelayResult(CHARSET_ENCODING_EXCEPTION, e.getMessage(), "");
            }
        }
    
        public DelayResult cancelDelay(String topic, String uniqDelayMsgId) {
            return sendDelayMessage(buildDelayMessage4Cancel(topic, uniqDelayMsgId, randomKey()));
        }
    
        public DelayResult cancelDelay(String topic, String uniqDelayMsgId, String... tags) {
            return sendDelayMessage(buildDelayMessage4Cancel(topic, uniqDelayMsgId, tags));
        }
    
        private DelayMessage buildDelayMessage4Add(String topic, byte[] body, DelayMeta delayMeta, String... tags) {
            DelayMessage delayMessage = new DelayMessage();
            delayMessage.setTopic(topic);
            delayMessage.setBody(body);
            delayMessage.setAction(DELAY_ACTIONS_ADD);
            delayMessage.setTimestamp(delayMeta.getTimestamp());
            delayMessage.setDmsgtype(delayMeta.getDmsgtype());
            delayMessage.setInterval(delayMeta.getInterval());
            delayMessage.setExpire(delayMeta.getExpire());
            delayMessage.setTimes(delayMeta.getTimes());
            delayMessage.setUuid(new UUID().toString());
            delayMessage.setVersion(VersionUtils.getVersion());
            if (null != delayMeta.getProperties() && delayMeta.getProperties().size() > 0) {
                delayMessage.setProperties(delayMeta.getProperties());
            }
    
            if (ArrayUtils.isNotEmpty(tags)) {
                delayMessage.setTags(StringUtils.join(tags, TAGS_SEPARATOR));
            }
    
            return delayMessage;
        }
    
        private DelayMessage buildDelayMessage4Cancel(String topic, String uniqDelayMsgId, String... tags) {
            DelayMessage delayMessage = new DelayMessage();
            delayMessage.setTopic(topic);
            delayMessage.setUniqDelayMsgId(uniqDelayMsgId);
            delayMessage.setAction(DELAY_ACTIONS_CANCEL);
            delayMessage.setVersion(VersionUtils.getVersion());
            delayMessage.setBody("c".getBytes()); // if body is null, new String(message.getBody()) will throw NullPointerException
    
            if (ArrayUtils.isNotEmpty(tags)) {
                delayMessage.setTags(StringUtils.join(tags, TAGS_SEPARATOR));
            }
    
            return delayMessage;
        }
    
        private DelayResult sendDelayMessage(DelayMessage message) {
            DelayResult result = new DelayResult(UNKNOWN_EXCEPTION, "unknown exception", "");
            if (!isRunning) {
                result.setCode(CLIENT_EXCEPTION);
                result.setMsg("please execute the start() method before sending the message");
                return result;
            }
            int retryCnt = 0;
            long start, used = 0;
            long begin = TimeUtils.getCurTime();
            String proxyAddress = null;
            do {
                CarreraConnection connection = null;
                try {
                    connection = nodeMgr.borrowConnection(config.getCarreraClientTimeout());
                    if (connection == null) {
                        if (result.getCode() == UNKNOWN_EXCEPTION) {
                            result.setCode(NO_MORE_HEALTHY_NODE);
                            result.setMsg("no more healthy node");
                        }
                        delay(config.getCarreraClientTimeout());
                        continue;
                    }
                    proxyAddress = connection.getNode().toString();
                    start = TimeUtils.getCurTime();
                    result = connection.sendDelay(message, this.config.getCarreraProxyTimeout());
                    used = TimeUtils.getElapseTime(start);
    
                    if (result.getCode() > OK) {
                        switch (result.getCode()) {
                            case FAIL_ILLEGAL_MSG:
                            case FAIL_TOPIC_NOT_ALLOWED:
                            case FAIL_TOPIC_NOT_EXIST:
                            case FAIL_TIMEOUT:
                            case FAIL_REFUSED_BY_RATE_LIMITER:
                                delay(Math.max(this.config.getCarreraClientTimeout() - TimeUtils.getElapseTime(start), 0));
                                break;
                            default:
                                nodeMgr.unhealthyNode(connection.getNode());
                                delay(Math.max(this.config.getCarreraClientTimeout() - TimeUtils.getElapseTime(start), 0));
                                break; //break switch
                        }
                    } else {
                        break; //break loop
                    }
                } catch (Exception e) {
                    LOGGER.warn("sendMessage failed, retry count:" + retryCnt + ", topic:" + message.topic + ", key:" + message.uniqDelayMsgId, e);
                    result.setCode(CLIENT_EXCEPTION);
                    result.setMsg(e.toString());
                } finally {
                    if (connection != null) {
                        nodeMgr.returnConnection(connection);
                    }
                }
            } while (retryCnt++ < this.config.getCarreraClientRetry());
    
            if (result.getCode() > OK) {
                LOGGER.error("send delay msg result:{}; msg[ip:{},topic:{},uuid:{},uniqDelayMsgId:{},len:{},used:{},retryCount:{},ret.Code:{},ret.Msg:{}]",
                        resultToString(result), proxyAddress, message.getTopic(), message.getUuid(), message.getUniqDelayMsgId(),
                        StringUtils.length(new String(message.getBody())), TimeUtils.getElapseTime(begin), retryCnt, result.getCode(), result.getMsg());
            } else {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("send delay msg result:{}; msg[ip:{},topic:{},uniqDelayMsgId:{},len:{},used:{},retryCount:{}]",
                            resultToString(result), proxyAddress, message.getTopic(), result.getUniqDelayMsgId(),
                            StringUtils.length(new String(message.getBody())), used, retryCnt);
                }
            }
            return result;
        }
    
    }
  • sendDelayはbuildDelayMessage 4 AddによってDelayMessageを構築し、cancelDelayはbuildDelayMessage 4 CancelによってDelayMessageを構築し、最後にsendDelayMessageによってメッセージ
  • を送信する.
    小結
    ProducerInterfaceはいくつかのsendDelayとcancelDelay方法を定義した.CarreraProducerはProducerInterfaceインタフェースを実現し、sendDelay、cancelDelayメソッドがLocalCarreraProducerに委任された.LocalCarreraProducerはCarreraProducerBaseを継承し、ProducerInterfaceインタフェースを実現した.CarreraProducerBaseのsendDelayはbuildDelayMessage 4 AddでDelayMessageを構築し、cancelDelayはbuildDelayMessage 4 CancelでDelayMessageを構築し、最後にsendDelayMessageでメッセージを送信する
    doc
  • carrera-chronos