APIゲートウェイ監視の簡単な実現


記事の目次
  • 一.需要
  • 二.考え方
  • 三.実現
  • 1. filter&factory
  • 2. service

  • 一.需要
    APIゲートウェイの統計:呼び出し回数、応答時間、応答コードなどを分単位の粒度で集計する(Alibaba Cloud APIゲートウェイのいくつかの監視パラメータを参考にした)
    二.考え方
  • まず、フレームワークにはSpring Cloud Gatewayを使用し、ルートに設定したfilterで統計を取る
  • 各ホストはリクエストのデータをローカルメモリ上で集計し、ネットワークI/Oがインタフェースの応答に影響しないようにする
  • カウントにはLongAdderを使用する
  • 各インタフェースの統計はConcurrentHashMapに保存する
  • 定期タスク:毎分、各ホストがローカルの統計をredisに同期して加算する
  • まず、redisのsetを1つ使用して、すべてのインタフェースurlを格納する
  • インタフェースurlごとにredisのhashを1つ使用して統計データを格納する。keyは時間(分)、valueは統計値
  • 各ホストは分散ロックで同期を取り、分散キャッシュに保存する
  • 定期タスク:1時間ごとにredisからデータベースへ永続化する
  • 三.実現
    依存:Redisson、spring-cloud-gateway、spring-webflux
    1. filter&factory
    /**
     * Gateway filter that collects per-URL call statistics in local memory.
     * <p>
     * For the path of every request it updates the counters obtained from
     * {@code MonitorService.getMonitorParams(url)}:
     * <ol>
     *   <li>request count - incremented each time {@code filter} is invoked;</li>
     *   <li>error count - incremented when the downstream chain signals an error;</li>
     *   <li>response time - elapsed milliseconds, added when the chain completes normally;</li>
     *   <li>5xx count - incremented when the chain completes normally with a 5xx status.</li>
     * </ol>
     */
    @Slf4j
    @Getter
    @Component
    public class MonitorFilter implements GatewayFilter, Ordered {

        /** Exchange attribute under which the request start time (epoch millis) is stored. */
        private static final String START_TIME = "MonitorStartTime";

        @Autowired
        MonitorService monitorService;

        /**
         * Updates the counters of the request path and delegates to the rest of the chain.
         *
         * @param exchange the current server exchange
         * @param chain    the remaining filter chain
         * @return a {@code Mono} that completes after the chain and the bookkeeping have run
         */
        @Override
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

            String url = exchange.getRequest().getURI().getPath();

            MonitorService.MonitorParams params = monitorService.getMonitorParams(url);
            LongAdder reqNum = params.getReqNum();
            LongAdder res5xxNum = params.getRes5xxNum();
            LongAdder errorNum = params.getErrorNum();
            LongAdder resTime = params.getResTime();

            reqNum.increment();

            exchange.getAttributes().put(START_TIME, System.currentTimeMillis());
            return chain.filter(exchange).doOnError(e -> {
                // The downstream chain failed: count it as an error.
                errorNum.increment();
            }).then(
                    Mono.fromRunnable(() -> {
                        Long startTime = exchange.getAttribute(START_TIME);
                        if (startTime != null) {
                            // Accumulate the elapsed time of this call.
                            resTime.add(System.currentTimeMillis() - startTime);
                            // Count 5xx responses. The status code may be null when no status
                            // has been set on the response, so guard against it before use.
                            if (exchange.getResponse().getStatusCode() != null
                                    && exchange.getResponse().getStatusCode().is5xxServerError()) {
                                res5xxNum.increment();
                            }
                        }
                    })
            );
        }

        /**
         * @return {@code Ordered.HIGHEST_PRECEDENCE}, so this filter is ordered before other filters
         */
        @Override
        public int getOrder() {
            return Ordered.HIGHEST_PRECEDENCE;
        }
    }
    
    /**
     * Gateway filter factory that always hands out the shared {@code MonitorFilter} bean.
     * The configuration object is ignored, hence the {@code Object} config type.
     */
    @Component
    public class MonitorGatewayFilterFactory extends AbstractGatewayFilterFactory<Object> {

        @Autowired
        MonitorFilter monitorFilter;

        /**
         * Convenience overload that needs no configuration.
         *
         * @return the shared monitor filter
         */
        public GatewayFilter apply() {
            return apply(o -> {
            });
        }

        /**
         * @param config ignored
         * @return the shared monitor filter
         */
        @Override
        public GatewayFilter apply(Object config) {
            return monitorFilter;
        }
    }
    
    2. service
    @Service
    @Slf4j
    public class MonitorService {
    
    
        // Local, in-memory counters keyed by request URL (filled through getMonitorParams).
        private Map<String, MonitorParams> map = new ConcurrentHashMap<>();
    
        // Redisson client used for the distributed locks, the key set and the per-URL maps.
        @Autowired
        RedissonClient redissonClient;
    
        // Mapper used by insertData to write MonitorRecord rows.
        @Autowired
        MonitorRecordMapper monitorRecordMapper;
    
        // Mapper for the MonitorUrl entries (select / insert / delete).
        @Autowired
        MonitorUrlMapper monitorUrlMapper;
    
    
        // Prefix of the per-URL lock name used by flush.
        public static final String LOCK_MONITOR_FLUSH = "lock_monitor_flush_";
        // Prefix of the per-time lock name used by save.
        public static final String LOCK_MONITOR_SAVE = "lock_monitor_save_";
        // Name of the redis set holding all cache keys; also the prefix of each per-URL cache key.
        public static final String CACHE_MONITOR = "cache_monitor";
    
        /**
         * Returns the counters registered for the given URL, creating them on first use.
         * <p>
         * The lookup-or-create step is a single atomic {@code computeIfAbsent}, so concurrent
         * callers for the same URL always receive the one instance stored in the map (a separate
         * get followed by put-if-absent could hand a caller an instance that lost the race and
         * is never flushed).
         *
         * @param url request path used as the map key
         * @return the shared counters for {@code url}
         */
        public MonitorParams getMonitorParams(String url) {
            return map.computeIfAbsent(url, key -> new MonitorParams());
        }
    
    
        /**
         * Pushes the locally accumulated counters of every URL to redis.
         * <p>
         * The entries are processed asynchronously on the Reactor parallel scheduler, so this
         * method returns (and writes its second log line) before the work has finished.
         *
         * @param time key under which the counters are stored in the per-URL redis map
         *             (presumably a minute timestamp; insertData later parses it as a long)
         */
        public void flush(String time) {

            log.info("        :{}", time);

            Flux.fromIterable(map.entrySet())
                    .publishOn(Schedulers.parallel())
                    .subscribe(entry -> flushEntry(time, entry));

            log.info("        :{}", time);

        }

        /**
         * Flushes the counters of one URL into redis while holding that URL's distributed lock.
         */
        private void flushEntry(String time, Map.Entry<String, MonitorParams> entry) {
            log.info("flush");

            // Normalize the URL by dropping one trailing slash.
            String url = entry.getKey().replaceAll("/$", "");

            // One lock per URL.
            RLock lock = redissonClient.getLock(LOCK_MONITOR_FLUSH + url);
            try {
                // Acquire outside the guarded section: if acquisition fails there is nothing to
                // unlock, and unlocking a lock that is not held would throw and mask the cause.
                lock.lock();
            } catch (Exception e) {
                log.error("        :" + url, e);
                return;
            }

            try {
                // NOTE(review): one-second pause while holding the lock, kept from the original
                // code; its purpose is not visible here - confirm whether it is still needed.
                TimeUnit.SECONDS.sleep(1);

                // Read and reset the local counters in one step.
                MonitorParams params = entry.getValue();
                MonitorData data = new MonitorData(url, time,
                        params.getReqNum().sumThenReset(),
                        params.getRes5xxNum().sumThenReset(),
                        params.getErrorNum().sumThenReset(),
                        params.getResTime().sumThenReset());

                // Counters of URLs that are not monitored are discarded.
                if (!isMonitorUrl(url)) {
                    return;
                }

                // Nothing to report when there were no requests.
                if (data.getReqNum() == 0) {
                    return;
                }

                // Name of the redis map holding this URL's data points.
                String cacheKey = String.format("%s_%s", CACHE_MONITOR, url);

                // Register the map name in the set of all cache keys.
                RSet<String> cacheList = redissonClient.getSet(CACHE_MONITOR);
                cacheList.add(cacheKey);

                // Store the data point, merging with what is already stored for this time.
                RMap<String, MonitorData> cacheMap = redissonClient.getMap(cacheKey);
                MonitorData result = cacheMap.putIfAbsent(time, data);
                if (result != null) {
                    cacheMap.put(time, data.add(result));
                }

            } catch (InterruptedException e) {
                // Restore the interrupt flag so the owning scheduler can observe it.
                Thread.currentThread().interrupt();
                log.error("        :" + url, e);
            } catch (Exception e) {
                log.error("        :" + url, e);
            } finally {
                lock.unlock();
            }
        }
    
    
        /**
         * Persists every data point cached in redis to the database and removes it from redis.
         * <p>
         * Only the caller that obtains the lock for {@code time} does the work; others return
         * immediately. The work runs synchronously on the calling thread so that the lock is
         * really held while data is moved (previously the work was handed to a freshly created
         * scheduler that was never disposed, and the lock was released before the work ran).
         *
         * @param time value appended to the lock name; all cached data points are persisted
         *             regardless of their own time key
         */
        public void save(String time) {

            RLock lock = redissonClient.getLock(LOCK_MONITOR_SAVE + time);
            // Somebody else is already persisting for this time value.
            if (!lock.tryLock()) {
                log.info("            ");
                return;
            }

            log.info("        :{}", time);
            try {

                RSet<String> cacheList = redissonClient.getSet(CACHE_MONITOR);
                if (cacheList == null) {
                    return;
                }

                for (String cacheKey : cacheList) {
                    try {
                        saveCacheKey(cacheKey);
                    } catch (Exception e) {
                        // A failure for one URL must not prevent the others from being saved.
                        log.error("monitor save failed for cache key: " + cacheKey, e);
                    }
                }

            } finally {
                lock.unlock();
            }
            log.info("        :{}", time);

        }

        /**
         * Inserts every data point of one per-URL redis map into the database and removes it.
         */
        private void saveCacheKey(String cacheKey) {
            RMap<String, MonitorData> cacheMap = redissonClient.getMap(cacheKey);
            cacheMap.entrySet()
                    .forEach(entry -> {
                        String key = entry.getKey();
                        try {
                            MonitorData value = entry.getValue();
                            log.info("{}->{}", key, value);
                            if (value != null) {
                                // Insert first, then drop the cached copy.
                                insertData(value);
                                cacheMap.remove(key);
                            }
                        } catch (Exception e) {
                            log.error(String.format("          :%s-%s", cacheKey, key), e);
                        }
                    });
        }
    
        /**
         * Copies one aggregated data point into a new {@code MonitorRecord} and inserts it
         * through the record mapper. The time string is parsed as a long.
         */
        private void insertData(MonitorData data) {
            long timestamp = Long.parseLong(data.getTime());

            MonitorRecord row = new MonitorRecord();
            row.setTime(timestamp);
            row.setUrl(data.getUrl());
            row.setReqNum(data.getReqNum());
            row.setErrorNum(data.getErrorNum());
            row.setRes5xxNum(data.getRes5xxNum());
            row.setResTime(data.getResTime());

            monitorRecordMapper.insertSelective(row);
        }
    
        /**
         * Tells whether the given URL is one of the configured monitor URLs.
         * <p>
         * The comparison is made against each entry's URL string; comparing the
         * {@code MonitorUrl} objects themselves with a {@code String} can never match.
         *
         * @param url the (normalized) request path
         * @return {@code true} if an entry with exactly this URL exists
         */
        private boolean isMonitorUrl(String url) {
            List<MonitorUrl> list = getProxy().selectAllUrl();
            if (list == null || url == null) {
                return false;
            }
            for (MonitorUrl monitorUrl : list) {
                if (monitorUrl != null && url.equals(monitorUrl.getUrl())) {
                    return true;
                }
            }
            return false;
        }
    
        /**
         * Loads all monitor URL entries through the mapper; the result is cached under the
         * constant key {@code allMonitorUrl}.
         * <p>
         * The cache key attribute is a SpEL expression, so the constant has to be written as a
         * quoted string literal; unquoted it would be resolved as a property name.
         *
         * @return all configured monitor URLs
         */
        @Cacheable(value = "fulin-squeezy-cache", key = "'allMonitorUrl'")
        public List<MonitorUrl> selectAllUrl() {
            log.info("    url  ");
            return monitorUrlMapper.selectAll();
        }

        /**
         * Evicts the cached monitor URL list (same cache and key as {@code selectAllUrl}).
         */
        @CacheEvict(value = "fulin-squeezy-cache", key = "'allMonitorUrl'")
        public void delUrlCache() {
            log.info("    url  ");
        }
    
    
        /**
         * Resolves the instance on which the cache-annotated methods should be invoked: the
         * current AOP proxy when one is exposed, otherwise this object itself (the failure to
         * obtain the proxy is logged).
         */
        private MonitorService getProxy() {
            MonitorService self;
            try {
                self = (MonitorService) AopContext.currentProxy();
            } catch (IllegalStateException e) {
                log.error("     ", e);
                self = this;
            }
            return self;
        }
    
        /**
         * Registers a new monitor URL.
         * <p>
         * Fails with {@code IllegalArgumentException} when an entry with the same URL already
         * exists; otherwise inserts the entry, evicts the cached URL list and emits the entry
         * converted to {@code MonitorUrlInfo}.
         *
         * @param just the request carrying the URL to register
         * @return the created entry
         */
        public Mono<MonitorUrlInfo> create(Mono<MonitorUrlReq> just) {
            return just.flatMap(req -> {
                MonitorUrl entity = new MonitorUrl();
                entity.setUrl(req.getUrl());

                boolean alreadyRegistered = monitorUrlMapper.selectOne(entity) != null;
                if (alreadyRegistered) {
                    throw new IllegalArgumentException("  url   ");
                }

                monitorUrlMapper.insertSelective(entity);
                getProxy().delUrlCache();

                return Mono.just(BeanCopyUtil.objConvert(entity, MonitorUrlInfo.class));
            });
        }
    
        /**
         * Deletes the monitor URL with the given primary key and evicts the cached URL list.
         * <p>
         * Fails with {@code IllegalArgumentException} when no entry with that key exists.
         *
         * @param id emits the primary key of the entry to delete
         * @return the deleted primary key
         */
        public Mono<Long> del(Mono<Long> id) {
            return id.flatMap(
                    i -> {
                        // Look up by the emitted key value, not by the Mono wrapper itself.
                        MonitorUrl monitorUrl = monitorUrlMapper.selectByPrimaryKey(i);
                        if (monitorUrl == null) {
                            throw new IllegalArgumentException("  url   ");
                        }

                        monitorUrlMapper.deleteByPrimaryKey(i);
                        getProxy().delUrlCache();
                        return Mono.just(i);
                    }
            );
        }
    
    
        /**
         * One aggregated data point: the counters of a URL for a given time key.
         */
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class MonitorData {

            String url;
            String time;
            Long reqNum;
            Long res5xxNum;
            Long errorNum;
            Long resTime;

            /**
             * Adds the counters of {@code data} to this instance in place.
             * <p>
             * Counters that are {@code null} (for example on an instance created through the
             * no-args constructor) are treated as zero instead of failing on unboxing.
             *
             * @param data the data point to add; {@code null} leaves this instance unchanged
             * @return this instance
             */
            public MonitorData add(MonitorData data) {
                if (data == null) {
                    return this;
                }
                reqNum = sum(reqNum, data.reqNum);
                res5xxNum = sum(res5xxNum, data.res5xxNum);
                errorNum = sum(errorNum, data.errorNum);
                resTime = sum(resTime, data.resTime);
                return this;
            }

            /** Null-tolerant addition of two boxed counters. */
            private static long sum(Long left, Long right) {
                long l = left == null ? 0L : left;
                long r = right == null ? 0L : right;
                return l + r;
            }
        }
    
        // Local in-memory counters kept for one URL; read and reset by flush.
        @Data
        public static class MonitorParams {
            // Number of requests (incremented by MonitorFilter on every call).
            private LongAdder reqNum = new LongAdder();
            // Number of responses with a 5xx status.
            private LongAdder res5xxNum = new LongAdder();
            // Number of calls whose filter chain signalled an error.
            private LongAdder errorNum = new LongAdder();
            // Accumulated response time in milliseconds.
            private LongAdder resTime = new LongAdder();
        }
    }