Spring Cloud Gateway 6限流

9688 ワード

Spring Cloud Gateway限流
ストリーム制限の目的は、同時アクセス/リクエストを制限するか、または時間ウィンドウ内のリクエストを制限することによってシステムを保護することであり、制限速度に達すると、サービスを拒否することによって、エラーページまたは友好的な展示ページに指向し、キューまたは待機することができる.
Gateway内蔵フィルタ工場限流
Spring Cloud Gatewayは、RequestRateLimiterGatewayFilterFactoryというクラスを公式に提供しており、Redisとluaスクリプトを適用してトークンバケツを実現しています.具体的なインプリメンテーションロジック
pomファイルにgatewayの開始依存とredisのreactive依存を導入

    org.springframework.cloud
    spring-cloud-starter-gateway



    org.springframework.boot
    spring-boot-starter-data-redis-reactive

コンフィギュレーション
server:
  port: 8081
spring:
  cloud:
    gateway:
      routes:
      - id: limit_route
        uri: http://httpbin.org:80/get
        predicates:
        - After=2017-01-20T17:42:47.789-07:00[America/Denver]
        filters:
        - name: RequestRateLimiter
          args:
            key-resolver: '#{@hostAddrKeyResolver}'
            redis-rate-limiter.replenishRate: 1
            redis-rate-limiter.burstCapacity: 3
  application:
    name: gateway-limiter
  redis:
    host: localhost
    port: 6379
    database: 0

設定ファイル、指定されたプログラムのポートが8081で、redisの情報が構成され、RequestRateLimiterのストリーム制限フィルタが構成されています.このフィルタには3つのパラメータが必要です.
  • burstCapacity、トークンバケツの総容量.
  • replenishRate、トークンバケツ毎秒充填平均速度.
  • key-resolver、ストリームを制限するキーの解析器に使用されるBeanオブジェクトの名前.SpEL式を使用して、#{@beanName}に従ってSpringコンテナからBeanオブジェクトを取得します.

  • KeyResolverはresolveメソッドを実装する必要があり,たとえばHostnameによるストリーム制限はhostAddressで判断する必要がある.KeyResolverを実装した後、このクラスのBeanをIocコンテナに登録する必要があります.
    public class HostAddrKeyResolver implements KeyResolver {
    
        @Override
        public Mono resolve(ServerWebExchange exchange) {
            return Mono.just(exchange.getRequest().getRemoteAddress().getAddress().getHostAddress());
        }
    
    }
         @Bean
        public HostAddrKeyResolver hostAddrKeyResolver() {
            return new HostAddrKeyResolver();
        }

    uriによってストリームを制限すると、KeyResolverコードは次のようになります.
    public class UriKeyResolver  implements KeyResolver {
    
        @Override
        public Mono resolve(ServerWebExchange exchange) {
            return Mono.just(exchange.getRequest().getURI().getPath());
        }
    
    }
         @Bean
        public UriKeyResolver uriKeyResolver() {
            return new UriKeyResolver();
        }

    ユーザーの次元でストリームを制限する
    @Bean
        KeyResolver userKeyResolver() {
            return exchange -> Mono.just(exchange.getRequest().getQueryParams().getFirst("user"));
        }

    カスタム制限フロー
    Spring Cloud Gatewayはカスタムストリーム制限を実現し、フィルタを作成する必要があります.GuavaにおけるRateLimiter,Bucket 4 j,RateLimitJ限流はいずれもトークンバケツに基づいて実現される.
    以下、Bucket 4 jを用いてストリーム制限を行う.
    pom
    
            
                org.springframework.cloud
                spring-cloud-starter-gateway
            
            
            
                com.github.vladimir-bukhtoyarov
                bucket4j-core
                4.0.0
            

    カスタムフィルタはGatewayFilter,Orderedインタフェースを実現し,ipに対するストリーム制限を実現する必要がある
    import io.github.bucket4j.Bandwidth;
    import io.github.bucket4j.Bucket;
    import io.github.bucket4j.Bucket4j;
    import io.github.bucket4j.Refill;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.cloud.gateway.filter.GatewayFilter;
    import org.springframework.cloud.gateway.filter.GatewayFilterChain;
    import org.springframework.core.Ordered;
    import org.springframework.http.HttpStatus;
    import org.springframework.web.server.ServerWebExchange;
    import reactor.core.publisher.Mono;
    
    import java.time.Duration;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     *         ip  
     */
    public class GatewayRateLimitFilterByIp implements GatewayFilter, Ordered {
    
        private final Logger log = LoggerFactory.getLogger(GatewayRateLimitFilterByIp.class);
    
        /**
         *          ConcurrentHashMap    bucket,
         *             ,     Redis        
         */
        private static final Map LOCAL_CACHE = new ConcurrentHashMap<>();
    
        /**
         *       ,     Token      
         */
        int capacity;
        /**
         *    Token    
         */
        int refillTokens;
        /**
         *   Token      
         */
        Duration refillDuration;
    
        public GatewayRateLimitFilterByIp() {
        }
    
        public GatewayRateLimitFilterByIp(int capacity, int refillTokens, Duration refillDuration) {
            this.capacity = capacity;
            this.refillTokens = refillTokens;
            this.refillDuration = refillDuration;
        }
    
        private Bucket createNewBucket() {
            Refill refill = Refill.of(refillTokens, refillDuration);
            Bandwidth limit = Bandwidth.classic(capacity, refill);
            return Bucket4j.builder().addLimit(limit).build();
        }
    
        @Override
        public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            String ip = exchange.getRequest().getRemoteAddress().getAddress().getHostAddress();
            Bucket bucket = LOCAL_CACHE.computeIfAbsent(ip, k -> createNewBucket());
            log.debug("IP:{} ,      Token  :{} " ,ip,bucket.getAvailableTokens());
            if (bucket.tryConsume(1)) {
                return chain.filter(exchange);
            } else {
               //        0 ,      429   
                exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
                return exchange.getResponse().setComplete();
            }
        }
    
        @Override
        public int getOrder() {
            return -1000;
        }
    
        public static Map getLocalCache() {
            return LOCAL_CACHE;
        }
    
        public int getCapacity() {
            return capacity;
        }
    
        public void setCapacity(int capacity) {
            this.capacity = capacity;
        }
    
        public int getRefillTokens() {
            return refillTokens;
        }
    
        public void setRefillTokens(int refillTokens) {
            this.refillTokens = refillTokens;
        }
    
        public Duration getRefillDuration() {
            return refillDuration;
        }
    
        public void setRefillDuration(Duration refillDuration) {
            this.refillDuration = refillDuration;
        }
    }

    コード構成
    @Bean
        public RouteLocator customerRouteLocator(RouteLocatorBuilder builder) {
            return builder.routes()
                    .route(r -> r.path("/test/rateLimit")
                            .filters(f -> f.filter(new GatewayRateLimitFilterByIp(10,1,Duration.ofSeconds(1))))
                            .uri("http://localhost:8000/hello/rateLimit")
                            .id("rateLimit_route")
                    ).build();
        }

    CPU使用率によるストリーム制限
    Spring Boot Actuatorが提供するMetricsで現在のCPUの使用状況を取得し、ストリーム制限を行う
    
        org.springframework.boot
        spring-boot-starter-actuator
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.actuate.metrics.MetricsEndpoint;
    import org.springframework.cloud.gateway.filter.GatewayFilter;
    import org.springframework.cloud.gateway.filter.GatewayFilterChain;
    import org.springframework.core.Ordered;
    import org.springframework.http.HttpStatus;
    import org.springframework.stereotype.Component;
    import org.springframework.web.server.ServerWebExchange;
    import reactor.core.publisher.Mono;
    
    import java.util.Objects;
    
    /**
     *   CPU       
     **/
    @Component
    public class GatewayRateLimitFilterByCpu implements GatewayFilter, Ordered {
    
        private final Logger log = LoggerFactory.getLogger(GatewayRateLimitFilterByCpu.class);
    
        @Autowired
        private MetricsEndpoint metricsEndpoint;
    
        private static final String METRIC_NAME = "system.cpu.usage";
    
        private static final double MAX_USAGE = 0.50D;
    
        @Override
        public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            //         CPU    
            Double systemCpuUsage = metricsEndpoint.metric(METRIC_NAME, null)
                    .getMeasurements()
                    .stream()
                    .filter(Objects::nonNull)
                    .findFirst()
                    .map(MetricsEndpoint.Sample::getValue)
                    .filter(Double::isFinite)
                    .orElse(0.0D);
    
            boolean isOpenRateLimit = systemCpuUsage >MAX_USAGE;
            log.debug("system.cpu.usage: {}, isOpenRateLimit:{} ",systemCpuUsage , isOpenRateLimit);
            if (isOpenRateLimit) {
                // CPU                
                exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
                return exchange.getResponse().setComplete();
            } else {
                return chain.filter(exchange);
            }
        }
    
        @Override
        public int getOrder() {
            return 0;
        }
    
    }

    コード構成
    @Autowired
        private GatewayRateLimitFilterByCpu gatewayRateLimitFilterByCpu;
    
        @Bean
        public RouteLocator customerRouteLocator(RouteLocatorBuilder builder) {
            return builder.routes()
                    .route(r -> r.path("/test/rateLimit")
                            .filters(f -> f.filter(gatewayRateLimitFilterByCpu))
                            .uri("http://localhost:8000/hello/rateLimit")
                            .id("rateLimit_route")
                    ).build();
        }