JAVAカスタムEVENTBOS簡単実現
7653 ワード
package com.wosai.constant;
import com.wosai.data.util.CollectionUtil;
import com.wosai.eventbus.Event;
import com.wosai.eventbus.EventListener;
import com.wosai.eventbus.SimpleEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Description: eventBus
* @Author jerry
* Date 2020/1/2 5:43
**/
public class EventBus {
private static final Logger logger = LoggerFactory.getLogger(com.wosai.eventbus.EventBus.class);
private Map> listeners;
private com.wosai.eventbus.EventBus.EventLoop[] eventLoops;
private int maxQueueSizeForWarning = 5;
private int maxWarningCount = 10 ;
private AtomicInteger remainingWarningCount = new AtomicInteger(maxWarningCount) ;
private int concurrency = 0;
public EventBus(int concurrency) {
this.listeners = new HashMap>();
this.eventLoops = new com.wosai.eventbus.EventBus.EventLoop[concurrency];
for(int i=0; i listenerSet = listeners.get(event);
if (listenerSet == null) {
listenerSet = new LinkedHashSet();
listeners.put(event, listenerSet);
}
listenerSet.add(listener);
}
public void post(Event event) {
eventLoops[event.getKey()].postEvent(event);
}
private void dispatchEvent(final Event event) {
Set listenerSet = listeners.get(event.getName());
if (listenerSet != null) {
for(final EventListener listener: listenerSet) {
try {
listener.handle(event);
}catch(Throwable e){
// WARN: exception in event handler
logger.warn("Unexpected exception in event handler.", e);
}
}
}else{
// WARN: no event listener
logger.warn("Received event {} w/o listeners.", event.getName());
}
}
public void shutdown() {
for(com.wosai.eventbus.EventBus.EventLoop loop: eventLoops) {
loop.stop();
}
}
public Map getInfo(){
List list = new ArrayList();
if(eventLoops != null){
for (int i = 0; i < eventLoops.length; i++) {
com.wosai.eventbus.EventBus.EventLoop eventLoop = eventLoops[i];
int queueSize = eventLoop.eventQueue.size();
list.add(CollectionUtil.hashMap(
"index", i,
"queueSize", queueSize
));
}
}
return CollectionUtil.hashMap(
"eventLoops", list
);
}
class EventLoop {
private LinkedBlockingQueue eventQueue = new LinkedBlockingQueue();
private volatile boolean stopped = true;
private Thread _t;
public void start() {
if (!stopped) {
return;
}
stopped = false;
_t = new Thread(new Runnable() {
@Override
public void run() {
while(!stopped) {
try {
Event event = eventQueue.take();
dispatchEvent(event);
}
catch (InterruptedException e) {
// Continue next iteration.
}
}
logger.info("event loop stopped");
}
});
_t.setDaemon(true);
_t.setName("eventloop " + _t.getName());
_t.start();
}
public void stop() {
if (stopped)
return;
stopped = true;
_t.interrupt();
}
public void postEvent(Event e) {
int queueSize = eventQueue.size();
if(queueSize <= maxQueueSizeForWarning){
remainingWarningCount.set(maxWarningCount);
}else{
if(remainingWarningCount.getAndDecrement() > 0){
logger.warn("too many events in queue! current size: {} warning max: {}", queueSize, maxQueueSizeForWarning);
}
}
for (int i = 0; i<3; ++i) {
try {
eventQueue.put(e);
return;
} catch (InterruptedException ex) {
}
}
logger.error("lost event {}", e.getName());
}
}
//eventloop
public static class EventLoopMultiple{
private int start; // eventloop , 0
private int count; // eventloop
private int end; // eventloop
public EventLoopMultiple(int start, int count, int end) {
this.start = start;
this.count = count;
this.end = end;
}
public int getStart() {
return start;
}
public int getEnd() {
return end;
}
public int getCount() {
return count;
}
@Override
public String toString() {
return "EventLoopMultiple [start=" + start + ", end=" + end + ", count=" + count + "]";
}
}
public int getConcurrency() {
return concurrency;
}
public static void main(String[] args) throws Exception {
final com.wosai.eventbus.EventBus bus = new com.wosai.eventbus.EventBus();
bus.subscribe("success", new EventListener() {
@Override
public void handle(Event event) {
System.out.println("on event " + event + " " + Thread.currentThread().getId());
bus.post(new SimpleEvent("postSuccess", null));
bus.post(new SimpleEvent("final", 1, "first final"));
}
});
bus.subscribe("postSuccess", new EventListener() {
@Override
public void handle(Event event) {
System.out.println("on event " + event + " " + Thread.currentThread().getId());
bus.post(new SimpleEvent("final", "second final"));
}
});
bus.subscribe("final", new EventListener() {
@Override
public void handle(Event event) {
System.out.println("handler 1 on event " + event + " " + Thread.currentThread().getId());
}
});
bus.subscribe("final", new EventListener() {
@Override
public void handle(Event event) {
System.out.println("handler 2 on event " + event + " " + Thread.currentThread().getId());
}
});
bus.start();
bus.post(new SimpleEvent("success"));
Thread.sleep(2000);
bus.shutdown();
}
}