redisツールパッケージ開発——限流モジュール(スライドウィンドウ、漏斗、トークンバケツ)の実現

15131 ワード

限流モジュールは主に3種類の限流のアルゴリズム+aop実現である.
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({RedisBloomFilterRegistar.class, RedisLimiterRegistar.class})
public @interface EnableRedisAux {
    String[] bloomFilterPath() default "";
    boolean enableLimit() default false;
    boolean transaction() default false;

}

次にspringは@Importのクラスをロードし、注入されたクラスは注釈の情報を取得して接面を有効にするかどうかを決定します.
public class RedisLimiterRegistar implements ImportBeanDefinitionRegistrar {
    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        Map attributes = importingClassMetadata
                .getAnnotationAttributes(EnableRedisAux.class.getCanonicalName());
        Boolean enableLimit = (Boolean) attributes.get("enableLimit");
        //      ,     、            
        if(enableLimit){
            ClassPathBeanDefinitionScanner scanConfigure =
                    new ClassPathBeanDefinitionScanner(registry, true);
            scanConfigure.scan("com.opensource.redisaux.limiter.autoconfigure");
        }
    }

}

次に、構成クラスに移動します.主に3つのスクリプトをロードし、対応するストッパをキャッシュして接面クラスをロードします.
@Configuration
@AutoConfigureAfter(RedisAutoConfiguration.class)
@ConditionalOnBean(RedisTemplate.class)
public class RedisLimiterAutoConfiguration {

    @Autowired
    @Qualifier(BloomFilterConsts.INNERTEMPLATE)
    private RedisTemplate redisTemplate;

    /**
     *      lua  ,  :
     * 1.       
     * 2.   (     -          ) key  
     * 3.           
     * @return
     */
    @Bean
    public DefaultRedisScript windowLimitScript() {
        DefaultRedisScript script = new DefaultRedisScript();
        script.setResultType(Boolean.class);
        script.setScriptText("redis.call('zadd',KEYS[1],ARGV[1],ARGV[1]) redis.call('zremrangebyscore',KEYS[1],0,ARGV[2]) return redis.call('zcard',KEYS[1]) <= tonumber(ARGV[3])");
        return script;
    }

    /**
     *      lua    
     * @return
     */
    @Bean
    public DefaultRedisScript tokenLimitScript() {
        DefaultRedisScript script = new DefaultRedisScript();
        script.setResultType(Long.class);
        script.setLocation(new ClassPathResource("TokenRateLimit.lua"));
        return script;
    }
    /**
     *      lua    
     * @return
     */
    @Bean
    public DefaultRedisScript funnelLimitScript() {
        DefaultRedisScript script = new DefaultRedisScript();
        script.setResultType(Boolean.class);
        script.setLocation(new ClassPathResource("FunnelRateLimit.lua"));
        return script;
    }
    /**
     *   
     * @return
     */
    @Bean
    public LimiterAspect limiterAspect(){
        Map map = new HashMap();
        map.put(BaseRateLimiter.WINDOW_LIMITER, new WindowRateLimiter(redisTemplate, windowLimitScript()));
        map.put(BaseRateLimiter.TOKEN_LIMITER, new TokenRateLimiter(redisTemplate, tokenLimitScript()));
        map.put(BaseRateLimiter.FUNNEL_LIMITER, new FunnelRateLimiter(redisTemplate, funnelLimitScript()));
        return new LimiterAspect(map);
    }



}

切面類は、主に現在の方法の注釈に対応するタイプがどれかを確認し、mapの中に行って対応する実体類を探し、限流操作を行い、操作が通らないと同類の失敗方法を呼び出し、デフォルトでは元の方法のパラメータを伝達しない.もし伝送パラメータが有効になったら、元の方法のパラメータを失敗方法のところに運んでください.
@SuppressWarnings("unchecked")
@Aspect
public class LimiterAspect  {


    private final Map rateLimiterMap;

    private final Map annotationMap;


    public LimiterAspect(Map rateLimiterMap
    ) {
        this.rateLimiterMap = rateLimiterMap;
        this.annotationMap = new ConcurrentHashMap();
    }


    @Pointcut("@annotation(com.opensource.redisaux.limiter.annonations.TokenLimiter)||@annotation(com.opensource.redisaux.limiter.annonations.WindowLimiter)||@annotation(com.opensource.redisaux.limiter.annonations.FunnelLimiter)")
    public void limitPoint() {

    }


    @Around("limitPoint()")
    public Object doAroundAdvice(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
        MethodSignature signature = (MethodSignature) proceedingJoinPoint.getSignature();
        Class> beanClass = proceedingJoinPoint.getTarget().getClass();
        //      
        String targetName = beanClass.getName();
        //       
        Method method = signature.getMethod();
        String methodKey = CommonUtil.getMethodKey(targetName, method);
        //            
        LimiterType baseLimiter = null;
        Annotation target = null;
        if ((target = annotationMap.get(methodKey)) == null) {
            //               map  
            Annotation[] annotations = signature.getMethod().getAnnotations();
            for (Annotation annotation : annotations) {
                if (annotation.annotationType().isAnnotationPresent(LimiterType.class)) {
                    target = annotation;
                    annotationMap.put(methodKey, target);
                    break;
                }
            }
        }
        baseLimiter = target.annotationType().getAnnotation(LimiterType.class);
        BaseRateLimiter rateLimiter = rateLimiterMap.get(baseLimiter.mode());
        if (rateLimiter.canExecute(target, methodKey)) {
            return proceedingJoinPoint.proceed();
        } else {
            //        
            Object bean =proceedingJoinPoint.getTarget();
            BaseRateLimiter.KeyInfoNode keyInfoNode = BaseRateLimiter.keyInfoMap.get(methodKey);
            String fallBackMethodStr = keyInfoNode.getFallBackMethod();
            if ("".equals(fallBackMethodStr)) {
                return "too much request";
            }

           Method fallBackMethod= keyInfoNode.isPassArgs()?
                   beanClass.getMethod(fallBackMethodStr,method.getParameterTypes()):
                   beanClass.getMethod(fallBackMethodStr);
            fallBackMethod.setAccessible(true);
           return keyInfoNode.isPassArgs()?fallBackMethod.invoke(bean,proceedingJoinPoint.getArgs()):fallBackMethod.invoke(bean);
        }
    }



}

次に、ストリーム制限設計セクション、4つの注記に移動します.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface LimiterType {
    /**
     *   ,      map       limiter
     * @return
     */
    int mode();
}
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@LimiterType(mode = BaseRateLimiter.TOKEN_LIMITER)
public @interface TokenLimiter {

    /**
     *      
     *
     * @return
     */
    double capacity();

    /**
     *       
     *
     * @return
     */
    double rate();

    /**
     *       ,   
     *
     * @return
     */
    TimeUnit rateUnit() default TimeUnit.SECONDS;

    /**
     *            
     *
     * @return
     */
    double need();

    /**
     *       
     *
     * @return
     */
    boolean isAbort() default false;

    /**
     *       
     *
     * @return
     */
    int timeout() default -1;

    /**
     *   ,    
     *
     * @return
     */
    TimeUnit timeoutUnit() default TimeUnit.MILLISECONDS;

    String fallback() default "";

    boolean passArgs() default false;

}
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@LimiterType(mode = BaseRateLimiter.FUNNEL_LIMITER)
public @interface FunnelLimiter {

    /**
     *     
     * @return
     */
    double capacity();
    /**
     *       
     * @return
     */
    double passRate();
    /**
     *    
     * @return
     */
    TimeUnit timeUnit() default TimeUnit.SECONDS;
    /**
     *          
     * @return
     */
    double addWater();

    String fallback() default "";

    boolean passArgs() default false;

}

@LimiterTypeは、現在のメソッドを決定するために必要なストリーム制限タイプです.
他の注釈の中の情報は後のビジネスロジック処理に使用されます.
ここには抽象的なストッパがあります.中には共通情報mapと対応する方法があります.このKeyInfoNodeのkeyListはインタフェースがredisにマッピングされたkeyNameです.スクリプト実行パラメータはlistです.passArgsとfallBackは失敗した方法情報です.通過できるかどうかを判断する方法があります.
public abstract class BaseRateLimiter {
    public final static int WINDOW_LIMITER =1;
    public final static int TOKEN_LIMITER =2;
    public final static int FUNNEL_LIMITER=3;
    //    keyNameList、    ,     
    public static Map keyInfoMap =new ConcurrentHashMap();



    
   static List getKey(String methodKey,String method,boolean passArgs){
        KeyInfoNode keyInfoNode;
        if((keyInfoNode= keyInfoMap.get(methodKey))==null){
            keyInfoNode= new KeyInfoNode();
            keyInfoNode.fallBackMethod=method;
            keyInfoNode.passArgs=passArgs;
            keyInfoNode.keyNameList= Collections.singletonList(methodKey);
            keyInfoMap.put(methodKey, keyInfoNode);
        }
        return keyInfoNode.getKeyNameList();
    }

    /**
     *      ,        
     * @param redisLimiter
     * @param methodKey
     * @return
     */
    public   Boolean canExecute(Annotation redisLimiter, String methodKey){return null;};


    public static class KeyInfoNode{

        private  List keyNameList;
        private  boolean passArgs;
        private String fallBackMethod;


        public List getKeyNameList() {
            return keyNameList;
        }

        public boolean isPassArgs() {
            return passArgs;
        }

        public String getFallBackMethod() {
            return fallBackMethod;
        }
    }

}

漏斗制限流を見て、ここでaop遮断方法で取得した注釈情報によって制限流方法の実行パラメータを決定する
public class FunnelRateLimiter extends BaseRateLimiter {
    private RedisTemplate redisTemplate;
    private DefaultRedisScript redisScript;


    public FunnelRateLimiter(RedisTemplate redisTemplate, DefaultRedisScript redisScript) {
        this.redisScript = redisScript;
        this.redisTemplate = redisTemplate;

    }

    @Override
    public Boolean canExecute(Annotation baseLimiter, String methodKey) {
        FunnelLimiter funnelLimiter = (FunnelLimiter) baseLimiter;
        TimeUnit timeUnit = funnelLimiter.timeUnit();
        double capacity = funnelLimiter.capacity();
        double need = funnelLimiter.addWater();
        double rate = funnelLimiter.passRate();
        long l = timeUnit.toMillis(1);
        double millRate = rate / l;
        String methodName=funnelLimiter.fallback();
        boolean passArgs=funnelLimiter.passArgs();
        List keyList = BaseRateLimiter.getKey(methodKey,methodName,passArgs);
        return (Boolean) redisTemplate.execute(redisScript, keyList, new Object[]{capacity, millRate, need, Double.valueOf(System.currentTimeMillis())});
    }


}

実行するスクリプトは次のとおりです.
redisのhashテーブルによって漏斗器オブジェクトが構築され、その属性は、漏斗容量、漏水速度、一度に要求された水、最後に要求された時間、現在の水量である.
要求が来た場合、前回要求した時間と今回の時間に基づいてこの時間に流出した水を計算し、現在の水量を設定して加水可能か否かを判断し、できれば最後の要求時間と現在の水量を更新することで、漏斗の単位時間当たりの水量が一定であるため、通過要求量が安定することを保証することができる.
--    ,key[1]          ,argv1 capacity,argv2     ,argv3          ,argv4    
local limitInfo = redis.call('hmget', KEYS[1], 'capacity', 'passRate', 'addWater','water', 'lastTs')
local capacity = limitInfo[1]
local passRate = limitInfo[2]
local addWater= limitInfo[3]
local water = limitInfo[4]
local lastTs = limitInfo[5]

--     
if capacity == false then
    capacity = tonumber(ARGV[1])
    passRate = tonumber(ARGV[2])
    --          
    addWater=tonumber(ARGV[3])
    --    
    water = 0
    lastTs = tonumber(ARGV[4])
    redis.call('hmset', KEYS[1], 'capacity', capacity, 'passRate', passRate,'addWater',addWater,'water', water, 'lastTs', lastTs)
    return true
else
    local nowTs = tonumber(ARGV[4])
    --                
    local waterPass = tonumber((nowTs - lastTs)* passRate)
    --      ,     
    water=math.max(0,water-waterPass)
    --         
    lastTs = nowTs
    --        
    addWater=tonumber(addWater)
    if capacity-water >= addWater then
        --  
        water=water+addWater
        --          
        redis.call('hmset', KEYS[1], 'water', water, 'lastTs', lastTs)
        return true
    end
    return false
end

トークンバケツ限流
ここではスクリプトを直接貼り付けます.注釈を取得することでスクリプトにパラメータを与えるので、hashテーブルでトークンバケツオブジェクトを構築します.トークンバケツ数、トークン生成レート、リクエストごとに必要なトークン、前回リクエストした時間
多分原理は実際に漏斗と似ているが,要求が置かれている役割は異なり,ここでは消費者と見なすことができ,漏斗アルゴリズムの要求は生産者と見なすことができる.
プロセス:現在の時間と前回の時間の時間帯で生成されたトークンを計算し、今回の要求に十分かどうかを計算し、対応する情報を更新することにより、トークンバケツ限流は定期的にトークンを生産するため、ある時点、トークンバケツに10個のトークンがあるなどの瞬時のバースト要求に応答することができ、1秒以上の時間で10個の要求に対応してもエラーはないが、漏斗限流は一定の流出水量であるため、1 s以内に10個のリクエストが発生した場合、流速が必ずしも追いつくとは限らず、満タンになったらサービスを拒否するしかありません.
--    ,key[1]          ,argv1 capacity,argv2       ,argv3         ,argv4      
local limitInfo = redis.call('hmget', KEYS[1], 'capacity', 'passRate', 'leftToken', 'lastTs')
local capacity = limitInfo[1]
local rate = limitInfo[2]
local leftToken = limitInfo[3]
local lastTs = limitInfo[4]

--      
if capacity == false then
    capacity = tonumber(ARGV[1])
    rate = tonumber(ARGV[2])
    leftToken = tonumber(ARGV[1])
    lastTs = tonumber(ARGV[4])
    redis.call('hmset', KEYS[1], 'capacity', capacity, 'passRate', rate, 'leftToken', leftToken, 'lastTs', lastTs)
    return -1
else
    local nowTs = tonumber(ARGV[4])
    --                 
    local genTokenNum = tonumber((nowTs - lastTs)* rate)
    --           
    leftToken = genTokenNum + leftToken
    --      
    leftToken = math.min(capacity, leftToken)
    --         
    lastTs = nowTs
    local need = tonumber(ARGV[3])
    --          ,-1     
    if leftToken >= need then
        --       
        leftToken = leftToken - need
        --                
        redis.call('hmset', KEYS[1], 'leftToken', leftToken, 'lastTs', lastTs)
        return -1
    end
    return (need-leftToken)/rate
end

次にスライドウィンドウへ、主にsorted set構造が用いられ、ARGV[1]は現在のリクエストタイムスタンプであり、その後ウィンドウ外のタイムスタンプのリクエスト数をすべて削除し、現在のリクエスト数がスライドウィンドウ単位時間通過のリクエスト数より小さいか否かを返す
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@LimiterType(mode = BaseRateLimiter.WINDOW_LIMITER)
public @interface WindowLimiter {
    /**
     *     ,    
     * @return
     */
    int during();

    TimeUnit timeUnit() default TimeUnit.SECONDS;

    /**
     *       
     * @return
     */
    long value();

    String fallback() default "";

    boolean passArgs() default false;



}
redis.call('zadd',KEYS[1],ARGV[1],ARGV[1])
 redis.call('zremrangebyscore',KEYS[1],0,ARGV[2]) 
return redis.call('zcard',KEYS[1]) <= tonumber(ARGV[3])

現在のストリーム制限機能には改善できる点がたくさんありますが、後で更新する時間があります.
大まかな実装の構想はこのようにして、詳細はgithubの上のコードを見ることができます