APIゲートウェイ監視の簡単な実現
記事の目次:一.要件 二.考え方 三.実現 1. filter&factory 2. service
一.要件
APIゲートウェイの統計:呼び出し回数、応答時間、応答コードなど。時間粒度は分(Alibaba Cloud APIゲートウェイのいくつかのパラメータを参考にした)
二.考え方
まず、フレームワークはSpring Cloud Gatewayを使用する。各ホストがリクエストのデータをローカルメモリ上で集計し(LongAdderとConcurrentHashMapを使用)、ネットワークI/Oがインタフェースの応答に影響しないようにする。定時タスク:毎分、各ホストが同期してredisに加算する(全urlを保持するredis setを1つ使用し、インタフェースurlごとにredis hashを1個使用)。各ホストは分散ロックで同期し、分散キャッシュに保存する。定時タスク:1時間ごとにredisからデータベースへ永続化する。
三.実現
依存:
一.要件
APIゲートウェイの統計:呼び出し回数、応答時間、応答コードなど。時間粒度は分(Alibaba Cloud APIゲートウェイのいくつかのパラメータを参考にした)
二.考え方
Spring Cloud Gateway:ルーティングのfilterで統計する
LongAdder:カウントに使用
ConcurrentHashMap:各インタフェースの統計を保存
redis set:全てのインタフェースurlを格納
redis hash:統計データを格納(keyは時間(分)、valueは統計)
依存:Redisson、spring-cloud-gateway、spring-webflux
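1. filter&factory
/**
 * Gateway filter that collects per-URL statistics:
 * 1. request count
 * 2. total response time
 * 3. 5xx responses (server errors)
 * 4. errors (API exceptions)
 */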
@Slf4j
@Getter
@Component
public class MonitorFilter implements GatewayFilter, Ordered {

    // Gateway filter that updates the in-memory counters kept by MonitorService for the
    // path of every request that passes through it.

    /** Exchange attribute under which the request start time (epoch millis) is stored. */
    private static final String START_TIME = "MonitorStartTime";

    @Autowired
    MonitorService monitorService;

    /**
     * Counts the request, forwards it down the chain and records the outcome.
     *
     * <p>The request counter is incremented immediately. When the chain signals an error
     * the error counter is incremented. When the chain completes normally the elapsed time
     * is added to the response-time counter and, if the response status is a 5xx, the 5xx
     * counter is incremented.
     *
     * @param exchange the current exchange; its request path selects the counter set
     * @param chain    the remaining filter chain
     * @return the chain's completion signal
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        String path = exchange.getRequest().getURI().getPath();
        MonitorService.MonitorParams counters = monitorService.getMonitorParams(path);
        LongAdder reqNum = counters.getReqNum();
        LongAdder res5xxNum = counters.getRes5xxNum();
        LongAdder errorNum = counters.getErrorNum();
        LongAdder resTime = counters.getResTime();

        reqNum.increment();
        exchange.getAttributes().put(START_TIME, System.currentTimeMillis());

        return chain.filter(exchange)
                .doOnError(e -> errorNum.increment())
                .then(Mono.fromRunnable(() -> {
                    Long startTime = exchange.getAttribute(START_TIME);
                    if (startTime == null) {
                        return;
                    }
                    resTime.add(System.currentTimeMillis() - startTime);
                    // getStatusCode() may return null (status not set or not resolvable).
                    // Dereferencing it unguarded would throw here and turn a completed
                    // exchange into an error, so the status is checked for null first.
                    if (exchange.getResponse().getStatusCode() != null
                            && exchange.getResponse().getStatusCode().is5xxServerError()) {
                        res5xxNum.increment();
                    }
                }));
    }

    /**
     * @return {@link Ordered#HIGHEST_PRECEDENCE}
     */
    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }
}
@Component
public class MonitorGatewayFilterFactory extends AbstractGatewayFilterFactory<Object> {

    // Filter factory that always hands out the single injected MonitorFilter bean.
    // The superclass is parameterized with Object (instead of being used as a raw type)
    // because this factory ignores its configuration.

    @Autowired
    MonitorFilter monitorFilter;

    /**
     * Convenience overload for callers that have no configuration to supply.
     *
     * @return the result of {@code apply} with a no-op configuration consumer
     */
    public GatewayFilter apply() {
        return apply(o -> {
        });
    }

    /**
     * @param config ignored
     * @return the injected {@link MonitorFilter}
     */
    @Override
    public GatewayFilter apply(Object config) {
        return monitorFilter;
    }
}
2. service
@Service
@Slf4j
public class MonitorService {
private Map<String, MonitorParams> map = new ConcurrentHashMap<>();
@Autowired
RedissonClient redissonClient;
@Autowired
MonitorRecordMapper monitorRecordMapper;
@Autowired
MonitorUrlMapper monitorUrlMapper;
public static final String LOCK_MONITOR_FLUSH = "lock_monitor_flush_";
public static final String LOCK_MONITOR_SAVE = "lock_monitor_save_";
public static final String CACHE_MONITOR = "cache_monitor";
public MonitorParams getMonitorParams(String url) {
MonitorParams params = map.getOrDefault(url, new MonitorParams());
map.putIfAbsent(url, params);
return params;
}
/**
*
*
* @param time
*/
public void flush(String time) {
log.info(" :{}", time);
Flux.fromIterable(map.entrySet())
.publishOn(Schedulers.parallel())
.subscribe(entry -> {
log.info("flush");
//url , (/)
String url = entry.getKey().replaceAll("/$", "");
// url
String lockKey = LOCK_MONITOR_FLUSH + url;
RLock lock = redissonClient.getLock(lockKey);
try {
lock.lock();
TimeUnit.SECONDS.sleep(1);
//
MonitorParams params = entry.getValue();
MonitorData data = new MonitorData(url, time,
params.getReqNum().sumThenReset(),
params.getRes5xxNum().sumThenReset(),
params.getErrorNum().sumThenReset(),
params.getResTime().sumThenReset());
//
if (!isMonitorUrl(url)) {
return;
}
// ,
if (data.getReqNum() == 0) {
return;
}
// url
String cacheKey = String.format("%s_%s", CACHE_MONITOR, url);
// key
RSet<String> cacheList = redissonClient.getSet(CACHE_MONITOR);
cacheList.add(cacheKey);
// redis
RMap<String, MonitorData> cacheMap = redissonClient.getMap(cacheKey);
MonitorData result = cacheMap.putIfAbsent(time, data);
if (result != null) {
//
cacheMap.put(time, data.add(result));
}
} catch (Exception e) {
log.error(" :" + url, e);
} finally {
lock.unlock();
}
});
log.info(" :{}", time);
}
/**
*
*
* @param time
*/
public void save(String time) {
RLock lock = redissonClient.getLock(LOCK_MONITOR_SAVE + time);
//
if (!lock.tryLock()) {
log.info(" ");
return;
}
log.info(" :{}", time);
try {
RSet<String> cacheList = redissonClient.getSet(CACHE_MONITOR);
if (cacheList == null) {
return;
}
Flux.fromIterable(cacheList)
.publishOn(Schedulers.newParallel("monitor_save"))
.subscribe(cacheKey -> {
RMap<String, MonitorData> map = redissonClient.getMap(cacheKey);
map.entrySet()
.forEach(entry -> {
String key = entry.getKey();
try {
MonitorData value = entry.getValue();
log.info("{}->{}", key, value);
if (value != null) {
//
insertData(value);
map.remove(key);
}
} catch (Exception e) {
log.error(String.format(" :%s-%s", cacheKey, key), e);
}
});
});
} finally {
lock.unlock();
}
log.info(" :{}", time);
}
private void insertData(MonitorData data) {
MonitorRecord s = new MonitorRecord();
s.setTime(Long.parseLong(data.getTime()));
s.setUrl(data.getUrl());
s.setReqNum(data.getReqNum());
s.setErrorNum(data.getErrorNum());
s.setRes5xxNum(data.getRes5xxNum());
s.setResTime(data.getResTime());
monitorRecordMapper.insertSelective(s);
}
/**
* url
*
* @param url
* @return
*/
private boolean isMonitorUrl(String url) {
List<MonitorUrl> list = getProxy().selectAllUrl();
return list == null ? false : list.contains(url);
}
@Cacheable(value = "fulin-squeezy-cache", key = "allMonitorUrl")
public List<MonitorUrl> selectAllUrl() {
log.info(" url ");
return monitorUrlMapper.selectAll();
}
@CacheEvict(value = "fulin-squeezy-cache", key = "allMonitorUrl")
public void delUrlCache() {
log.info(" url ");
}
private MonitorService getProxy() {
try {
return (MonitorService) AopContext.currentProxy();
} catch (IllegalStateException e) {
log.error(" ", e);
return this;
}
}
public Mono<MonitorUrlInfo> create(Mono<MonitorUrlReq> just) {
return just.flatMap(
r -> {
MonitorUrl param = new MonitorUrl();
param.setUrl(r.getUrl());
if (monitorUrlMapper.selectOne(param) != null) {
throw new IllegalArgumentException(" url ");
}
monitorUrlMapper.insertSelective(param);
getProxy().delUrlCache();
MonitorUrlInfo info = BeanCopyUtil.objConvert(param, MonitorUrlInfo.class);
return Mono.just(info);
}
);
}
public Mono<Long> del(Mono<Long> id) {
return id.flatMap(
i -> {
MonitorUrl monitorUrl = monitorUrlMapper.selectByPrimaryKey(id);
if (monitorUrl == null) {
throw new IllegalArgumentException(" url ");
}
monitorUrlMapper.deleteByPrimaryKey(i);
getProxy().delUrlCache();
return Mono.just(i);
}
);
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class MonitorData {
String url;
String time;
Long reqNum;
Long res5xxNum;
Long errorNum;
Long resTime;
public MonitorData add(MonitorData data) {
reqNum += data.reqNum;
res5xxNum += data.res5xxNum;
errorNum += data.errorNum;
resTime += data.resTime;
return this;
}
}
@Data
public static class MonitorParams {
//
private LongAdder reqNum = new LongAdder();
//5xx
private LongAdder res5xxNum = new LongAdder();
// (Api )
private LongAdder errorNum = new LongAdder();
//
private LongAdder resTime = new LongAdder();
}
}