QPSフロー制御

7333 ワード


バグがある場合は、使用するスロットのqpsを事前に0にリセットしておく必要があります。タイマーによるリセット、またはパートナーによるリセットが考えられます。
 
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;



/**
 * Counts calls per wall-clock second using a ring of 30 slots; the slot for a
 * given second is selected with {@code second % 30}.
 *
 * <p>Known limitation: a slot's timestamp and its counter are two separate
 * atomics, so a rollover (timestamp swap followed by counter reset) is not a
 * single atomic step. An increment performed by another thread between those
 * two operations is erased by the reset.
 */
public class QPS {
	private final SlotData[] slot = new SlotData[30];
	private final String name;

	/**
	 * Creates a counter and allocates every slot of the ring.
	 *
	 * @param name label of this counter, returned by {@link #getName()}
	 */
	public QPS(String name) {
		this.name = name;
		for (int i = 0; i < slot.length; i++) {
			slot[i] = new SlotData();
		}
	}

	/**
	 * Records one call and returns the running count of the current second.
	 *
	 * @return the value of the current slot's counter after this increment
	 */
	public int increAndGetQps() {
		long now = System.currentTimeMillis() / 1000;
		int index = (int) (now % slot.length);
		SlotData current = slot[index];
		AtomicLong stamp = current.getSecond();
		// Read the timestamp exactly once and CAS against that value. Reading it
		// a second time for the CAS could observe "now" (written by a thread that
		// already rolled the slot over), making the CAS succeed and wiping the
		// counts accumulated for the current second.
		long seen = stamp.get();
		if (seen != now && stamp.compareAndSet(seen, now)) {
			// This thread won the rollover: the slot still holds an older
			// second's count, so start again from zero.
			current.getQps().set(0);
		}
		return current.getQps().incrementAndGet();
	}

	/** Demo: calls the counter every 100 ms and prints the returned count. */
	public static void main(String[] args) throws InterruptedException {
		QPS qpsCtrl = new QPS("longji");
		for (int i = 0; i < 1000000; i++) {
			Thread.sleep(100);
			System.out.println(qpsCtrl.increAndGetQps());
		}
	}

	/** One ring slot: a call counter plus the epoch second it belongs to. */
	public class SlotData {
		AtomicInteger qps = new AtomicInteger(0);
		// Initialised to the construction time in epoch seconds.
		AtomicLong second = new AtomicLong(System.currentTimeMillis() / 1000);

		/** @return the call counter of this slot */
		public AtomicInteger getQps() {
			return qps;
		}

		/** @return the epoch second this slot currently represents */
		public AtomicLong getSecond() {
			return second;
		}

		/** @param second replacement timestamp holder */
		public void setSecond(AtomicLong second) {
			this.second = second;
		}

		/** @param qps replacement counter holder */
		public void setQps(AtomicInteger qps) {
			this.qps = qps;
		}
	}

	/** @return the label given at construction */
	public String getName() {
		return name;
	}
}

 
改良版
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.log4j.Logger;




/**
 * Counts calls per wall-clock second using a ring of 30 slots; the slot for a
 * given second is selected with {@code second % 30}.
 *
 * <p>A slot timestamp of 0 marks a slot that is unused or has been cleared by
 * {@link #reset()}.
 */
public class QPS {
    private SlotData[] slot=new SlotData[30];
    private String name;
    private static Logger logger=Logger.getLogger("qps");

    /**
     * Creates a counter and allocates every slot of the ring.
     *
     * @param name label of this counter, used in log lines and returned by
     *             {@link #getName()}
     */
    public QPS(String name){
        this.name=name;
        for (int i = 0; i < slot.length; i++) {
            slot[i]=new SlotData();
        }
    }

    /**
     * Logs every slot that carries a non-zero timestamp, then clears each slot
     * whose timestamp is more than 5 seconds behind the current second: the
     * timestamp is swapped to 0 and, if that swap succeeds, the counter is
     * zeroed.
     */
    public void reset(){
        logger.error("begin reset:"+name);
        long now=System.currentTimeMillis()/1000;
        for (int i = 0; i < slot.length; i++) {
            long time=slot[i].second.get();
            if (time!=0) {
                logger.error(name+"---"+new Date(time*1000)+"-qps--"+slot[i].getQps().get());
            }
            if (now-time>5) {
                // CAS so that a slot re-stamped by increAndGetQps() in the
                // meantime is left alone.
                if (slot[i].second.compareAndSet(time, 0)) {
                    slot[i].qps.set(0);
                }
            }
        }
        logger.error("end reset:"+name);
    }

    /**
     * Records one call and returns the running count of the current second.
     *
     * @return the value of the current slot's counter after this increment
     */
    public int increAndGetQps(){
        long now=System.currentTimeMillis()/1000;
        int index=(int) (now%slot.length);
        AtomicLong old=slot[index].getSecond();
        long oldTime=old.get();
        if (oldTime!=now) {
            if (old.compareAndSet(oldTime, now)) {
                // A stamp of 0 means the slot is fresh or was cleared by
                // reset(), which zeroes the counter itself; only a slot that
                // still carries an older second needs zeroing here.
                if (oldTime!=0) {
                    slot[index].getQps().set(0);
                }
            }
        }
        return slot[index].getQps().incrementAndGet();
    }

    /**
     * Returns the number of calls recorded so far in the current second.
     *
     * @return the current slot's counter, or 0 when that slot is not stamped
     *         with the current second (its counter then belongs to an earlier
     *         lap of the ring or has not been used)
     */
    public int getQps(){
        long now=System.currentTimeMillis()/1000;
        SlotData current=slot[(int) (now%slot.length)];
        // Without this check a count left over from a previous lap of the ring
        // would be reported as the current second's QPS.
        if (current.getSecond().get()!=now) {
            return 0;
        }
        return current.getQps().get();
    }

    /** Demo: calls the counter every 100 ms and prints the returned count. */
    public static void main(String[] args) throws InterruptedException {
        QPS qpsCtrl=new QPS("longji");
        for (int i = 0; i < 1000000; i++) {
            Thread.sleep(100);
            System.out.println(qpsCtrl.increAndGetQps());
        }
    }

    /** One ring slot: a call counter plus the epoch second it belongs to. */
    public class SlotData{
        AtomicInteger qps=new AtomicInteger(0);
        // 0 = slot not stamped with any second yet.
        AtomicLong second=new AtomicLong(0);

        /** @return the call counter of this slot */
        public AtomicInteger getQps(){
            return qps;
        }

        /** @return the epoch second this slot currently represents (0 if none) */
        public AtomicLong getSecond() {
            return second;
        }

        /** @param second replacement timestamp holder */
        public void setSecond(AtomicLong second) {
            this.second = second;
        }

        /** @param qps replacement counter holder */
        public void setQps(AtomicInteger qps) {
            this.qps = qps;
        }
    }

    /** @return the label given at construction */
    public String getName() {
        return name;
    }
}


 
 
/**
 * Registry of named {@link QPS} counters. A daemon thread started from the
 * static initialiser calls {@code reset()} on every registered counter every
 * 5 seconds.
 */
public class QPSFactory {
    // Guarded by the QPSFactory.class monitor (held by buildQPS and by the
    // reset thread while it copies the map).
    private static final Map<String, QPS> qpsMap = new HashMap<String, QPS>();
    private static final Logger logger = LoggerFactory.getLogger("qpsLog");
    static {
        Thread resetTask = new Thread(new Runnable() {

            public void run() {
                while (true) {
                    try {
                        // Every 5 seconds, let each counter clear its expired slots.
                        Thread.sleep(5000);
                        // Copy under the same monitor buildQPS() uses, then
                        // iterate the copy: iterating the live HashMap while
                        // buildQPS() inserts is a data race and can throw
                        // ConcurrentModificationException.
                        Map<String, QPS> snapshot;
                        synchronized (QPSFactory.class) {
                            snapshot = new HashMap<String, QPS>(qpsMap);
                        }
                        for (QPS qps : snapshot.values()) {
                            qps.reset();
                        }
                    } catch (InterruptedException e) {
                        // Honour the interruption instead of looping forever.
                        Thread.currentThread().interrupt();
                        return;
                    } catch (Exception e) {
                        logger.error("reset error", e);
                    }
                }
            }
        });
        resetTask.setDaemon(true);
        resetTask.start();
    }

    /**
     * Returns the counter registered under {@code name}, creating and
     * registering it on first use.
     *
     * @param name key of the counter
     * @return the single {@link QPS} instance associated with {@code name}
     */
    synchronized public static QPS buildQPS(String name) {
        QPS existing = qpsMap.get(name);
        if (existing != null) {
            return existing;
        }
        QPS qps = new QPS(name);
        qpsMap.put(name, qps);
        return qps;
    }

}


 
 
 
 
リング型ロックフリーキューに基づくQPSフロー制御
/**
 * Rate limiter backed by a ring of {@code maxQps} timestamps. Each call takes
 * the next ring position; the call is admitted only when the timestamp stored
 * there is more than 1000 ms old, and an admitted call overwrites it with the
 * current time.
 */
public class QPSCtrl {
    // Millisecond timestamps of the last admitted call per ring position.
    // An atomic array (rather than a plain long[]) so that concurrent callers
    // see each other's writes and the check-then-write is a single CAS.
    private final java.util.concurrent.atomic.AtomicLongArray ringData;
    // Monotonic cursor; its value modulo the ring size picks the position.
    private final AtomicLong count = new AtomicLong(0);

    /**
     * @param maxQps size of the ring, i.e. the number of timestamps tracked
     * @throws IllegalArgumentException if {@code maxQps} is not positive
     */
    public QPSCtrl(int maxQps) {
        if (maxQps <= 0) {
            // A zero-length ring would otherwise fail later with "% 0".
            throw new IllegalArgumentException("maxQps must be positive: " + maxQps);
        }
        // Elements start at 0, so every position is initially free.
        ringData = new java.util.concurrent.atomic.AtomicLongArray(maxQps);
    }

    /**
     * Decides whether the current call must be throttled.
     *
     * @return {@code false} when the call is admitted (its ring position was
     *         more than 1000 ms old and has been stamped with the current
     *         time), {@code true} when it must be throttled
     */
    public boolean needCtrl(){
        long now=System.currentTimeMillis();
        int slot=nextSlot();
        long last=ringData.get(slot);
        if (now-last>1000) {
            // Losing the CAS means another caller claimed this position first,
            // so this call is throttled.
            return !ringData.compareAndSet(slot, last, now);
        }
        return true;
    }

    /** Unconditionally stamps the next ring position with the current time. */
    public void incre(){
        int slot=nextSlot();
        long now=System.currentTimeMillis();
        ringData.set(slot, now);
    }

    /** Advances the cursor and maps it onto a ring position. */
    private int nextSlot(){
        return (int) (count.incrementAndGet()%ringData.length());
    }

    /** Demo: issues 1,000,000,000 calls and prints how many were admitted. */
    public static void main(String[] args) throws InterruptedException {
        QPSCtrl qps = new QPSCtrl(1000);
        int admitted=0;
        for (int i = 0; i < 1000000000; i++) {
            if (!qps.needCtrl()) {
                admitted++;
            }
        }
        System.out.println(admitted);
    }

}