QPSフロー制御
7333 ワード
バグがある場合は、使用するslot qpsを事前に0にリセットする必要があります.タイマのリセット、またはパートナーのリセットを考慮できます.
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class QPS {
private SlotData[] slot=new SlotData[30];
private String name;
public QPS(String name){
this.name=name;
for (int i = 0; i < slot.length; i++) {
slot[i]=new SlotData();
}
}
public int increAndGetQps(){
long now=System.currentTimeMillis()/1000;
int index=(int) (now%slot.length);
AtomicLong old=slot[index].getSecond();
if (old.get()!=now) {
if (old.compareAndSet(old.get(), now)) {
slot[index].getQps().set(0);
}
}
return slot[index].getQps().incrementAndGet();
}
public static void main(String[] args) throws InterruptedException {
QPS qpsCtrl=new QPS("longji");
for (int i = 0; i < 1000000; i++) {
Thread.sleep(100);
System.out.println(qpsCtrl.increAndGetQps());
}
}
public class SlotData{
AtomicInteger qps=new AtomicInteger(0);
AtomicLong second=new AtomicLong(System.currentTimeMillis()/1000);
public AtomicInteger getQps(){
return qps;
}
public AtomicLong getSecond() {
return second;
}
public void setSecond(AtomicLong second) {
this.second = second;
}
public void setQps(AtomicInteger qps) {
this.qps = qps;
}
}
public String getName() {
return name;
}
}
改良版
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
public class QPS {
private SlotData[] slot=new SlotData[30];
private String name;
private static Logger logger=Logger.getLogger("qps");
public QPS(String name){
this.name=name;
for (int i = 0; i < slot.length; i++) {
slot[i]=new SlotData();
}
}
public void reset(){
logger.error("begin reset:"+name);
long now=System.currentTimeMillis()/1000;
for (int i = 0; i < slot.length; i++) {
long time=slot[i].second.get();
if (time!=0) {
logger.error(name+"---"+new Date(time*1000)+"-qps--"+slot[i].getQps().get());
}
if (now-time>5) {
if (slot[i].second.compareAndSet(time, 0)) {
slot[i].qps.set(0);
}
}
}
logger.error("end reset:"+name);
}
public int increAndGetQps(){
long now=System.currentTimeMillis()/1000;
int index=(int) (now%slot.length);
AtomicLong old=slot[index].getSecond();
long oldTime=old.get();
if (oldTime!=now) {
if (old.compareAndSet(oldTime, now)) {
// reset, qps
if (oldTime!=0) {
slot[index].getQps().set(0);
}
}
}
return slot[index].getQps().incrementAndGet();
}
public int getQps(){
long now=System.currentTimeMillis()/1000;
int index=(int) (now%slot.length);
return slot[index].getQps().get();
}
public static void main(String[] args) throws InterruptedException {
QPS qpsCtrl=new QPS("longji");
for (int i = 0; i < 1000000; i++) {
Thread.sleep(100);
System.out.println(qpsCtrl.increAndGetQps());
}
}
public class SlotData{
AtomicInteger qps=new AtomicInteger(0);
AtomicLong second=new AtomicLong(0);
public AtomicInteger getQps(){
return qps;
}
public AtomicLong getSecond() {
return second;
}
public void setSecond(AtomicLong second) {
this.second = second;
}
public void setQps(AtomicInteger qps) {
this.qps = qps;
}
}
public String getName() {
return name;
}
}
public class QPSFactory {
private static Map<String, QPS> qpsMap = new HashMap<String, QPS>();
private static Logger logger = LoggerFactory.getLogger("qpsLog");
static {
Thread resetTask = new Thread(new Runnable() {
public void run() {
while (true) {
try {
// ,qps slot , 。
Thread.sleep(5000);
for (Entry<String, QPS> entry : qpsMap.entrySet()) {
entry.getValue().reset();
}
} catch (Exception e) {
logger.error("reset error", e);
}
}
}
});
resetTask.setDaemon(true);
resetTask.start();
}
synchronized public static QPS buildQPS(String name) {
if (qpsMap.containsKey(name)) {
return qpsMap.get(name);
}
QPS qps = new QPS(name);
qpsMap.put(name, qps);
return qps;
}
}
リング無ロックキューに基づくqpsフロー制御
public class QPSCtrl {
private long[] ringData;
private AtomicLong count = new AtomicLong(0);
public QPSCtrl(int maxQps) {
ringData = new long[maxQps];
for (int i = 0; i < ringData.length; i++) {
ringData[i] = 0;
}
}
public boolean needCtrl(){
long now=System.currentTimeMillis();
int slot=(int) (count.incrementAndGet()%ringData.length);
if (now-ringData[slot]>1000) {
ringData[slot]=now;
return false;
}else {
return true;
}
}
public void incre(){
int slot=(int) (count.incrementAndGet()%ringData.length);
long now=System.currentTimeMillis();
ringData[slot]=now;
}
public static void main(String[] args) throws InterruptedException {
QPSCtrl qps = new QPSCtrl(1000);
int count=0;
for (int i = 0; i < 1000000000; i++) {
if (!qps.needCtrl()) {
count++;
}
}
System.out.println(count);
}
}