Springboot+netty+redisフレームワークサービス側の実現(二)------業務処理クラスと心拍数パケットタイムアウト処理、redisツールクラス、netty起動クラスおよび残留問題
132112 ワード
四、業務処理類とハートビートパックのタイムアウト処理
業務処理クラスは
すべてのhandlerに
2つの実装クラスの例では、1つは心拍数を返すことに成功し、1つは送信されたデータを返すことです.
五、redisツール類
六、netty起動クラス
現在、tomcatの
nettyサービスポートが占有されているため、閉じることができません
killコマンドを使用する場合、ログを表示します.
github: https://github.com/gavinL93/springboot_netty
業務処理クラスは
ChannelInboundHandlerAdapter
クラスを継承し、userEventTriggered
メソッドを再ロードすることで、心拍タイムアウトを実現できる設定コードは以下の通りである.public class ServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(ServerHandler.class);
private ChannelCache channelCache = SpringUtil.getBean(ChannelCache.class);
private static ConcurrentHashMap<ChannelId, Integer> channelIdleTime = new ConcurrentHashMap<ChannelId, Integer>();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Message message = (Message) msg;
Result result = new Result();
// ,
if (message.getModule() != 1) {
if (channelCache.getChannel(ctx.channel()) == null) {
result = new Result(0, "need auth");
ctx.writeAndFlush(result);
return;
}
}
channelCache.addChannel(ctx.channel(), message.getUid());
result = MyAnnotionUtil.process(ctx, message);
log.info("result: " + result.toString());
ctx.writeAndFlush(result);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.READER_IDLE) {
log.warn("---READER_IDLE---" + dateFormat.format(new Date()));
ChannelId channelId = ctx.channel().id();
Integer times = channelIdleTime.get(channelId);
if (times == null) {
channelIdleTime.put(channelId, 1);
} else {
int num = times.intValue() + 1;
if (num >= Const.TIME_OUT_NUM) {
log.error("--- TIME OUT ---");
channelIdleTime.remove(channelId);
channelCache.removeChannel(ctx.channel());
ctx.close();
} else {
channelIdleTime.put(channelId, num);
}
}
}
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("exceptionCaught:" + cause.getMessage());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("====== channelInactive ======");
channelCache.removeChannel(ctx.channel());
ctx.close();
log.info("====== Channel close ======");
}
}
ServerHandler
クラスはspringによって管理されるのではなくnewによってServerChannelInitializer
によって構成されるため、SpringUtil
ツールクラスをカスタマイズしてspringによって管理されるbeanを取得する@Component
public class SpringUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext = null;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if (SpringUtil.applicationContext == null) {
SpringUtil.applicationContext = applicationContext;
}
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
public static void setAppCtx(ApplicationContext webAppCtx) {
if (webAppCtx != null) {
applicationContext = webAppCtx;
}
}
/**
* ApplicationContext Bean
*/
public static <T> T getBean(Class<T> clazz) {
return getApplicationContext().getBean(clazz);
}
public static <T> T getBean(String name, Class<T> clazz) throws ClassNotFoundException {
return getApplicationContext().getBean(name, clazz);
}
public static final Object getBean(String beanName) {
return getApplicationContext().getBean(beanName);
}
public static final Object getBean(String beanName, String className) throws ClassNotFoundException {
Class<?> clz = Class.forName(className);
return getApplicationContext().getBean(beanName, clz.getClass());
}
public static boolean containsBean(String name) {
return getApplicationContext().containsBean(name);
}
public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {
return getApplicationContext().isSingleton(name);
}
public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {
return getApplicationContext().getType(name);
}
public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {
return getApplicationContext().getAliases(name);
}
}
MyAnnotionUtil
ツールクラスは、注釈で対応するクラスにデータを転送して処理します.@SuppressWarnings({ "rawtypes", "unchecked" })
@Component
@DependsOn("springUtil")
public class MyAnnotionUtil {
//
private static Map<Integer, Object> controllerClasses = new HashMap<>();
static {
System.out.println("MyAnnotionUtil static");
// , Controller
List<Class> clzes = ClassUtil.parseAllController("cn.ybt.netty.handler");
//
for (Class clz : clzes) {
//
if (clz.isAnnotationPresent(Module.class)) {
try {
//
Module annotation = (Module) clz.getAnnotation(Module.class);
// value
int value = annotation.module();
Object obj = controllerClasses.get(value);
if (obj == null) {
// obj = applicationContext.getBean(clz);
// obj = clz.newInstance();
obj = SpringUtil.getBean(clz);
controllerClasses.put(value, obj);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
public static Result process(ChannelHandlerContext ctx, Message msg) {
try {
Object obj = controllerClasses.get(msg.getModule());
if (obj == null) {
return new Result(0, "fail", msg.toString());
}
Method method = obj.getClass().getMethod("process", ChannelHandlerContext.class, Message.class);
Object message = method.invoke(obj, ctx, msg);
return (Result) message;
} catch (Exception e) {
return new Result(0, "fail", msg.toString());
}
}
}
ClassUtil
は、すべてのhandlerパスの下にあるクラスを取得するためです.@SuppressWarnings("rawtypes")
public class ClassUtil {
public static List<Class> parseAllController(String basePackage) {
List<Class> clzes = new ArrayList<>();
String path = basePackage.replace(".", "/");
//
URL url = Thread.currentThread().getContextClassLoader().getResource(path);
File file = new File(url.getPath());
getClass(file, clzes, basePackage);
return clzes;
}
private static void getClass(File file, List<Class> clzes, String packAgeName) {
//
if (file.exists()) {
//
if (file.isFile()) {
try {
String className = null;
if (packAgeName.contains(".class")) {
className = packAgeName.replace(".class", "");
} else {
className = (packAgeName + "." + file.getName()).replace(".class", "");
}
clzes.add(Class.forName(className));
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
//
if (file.isDirectory()) {
File[] files = file.listFiles();
for (File f : files) {
String packAge = packAgeName + "." + f.getName();
getClass(f, clzes, packAge);
}
}
}
}
}
Module
注記は簡単で、モジュール番号を表すパラメータを定義します.@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Module {
//
int module();
}
すべてのhandlerに
process
メソッドがあり、パラメータがChannelHandlerContext ctx
およびMessage message
であるように、抽象クラスが定義されています.public abstract class BaseHandler {
@Autowired
protected MyRedisService jxRedisService;
@Autowired
protected LoginRedisService loginRedisService;
public abstract Result process(ChannelHandlerContext ctx, Message message);
}
2つの実装クラスの例では、1つは心拍数を返すことに成功し、1つは送信されたデータを返すことです.
@Module(module = 0)
@Component
public class HeartbeatHandler extends BaseHandler {
private static final Logger log = LoggerFactory.getLogger(HeartbeatHandler.class);
@Override
public Result process(ChannelHandlerContext ctx, Message message) {
log.info("heartbeat...");
return new Result(1, "heartbeat success...");
}
}
@Module(module = 99)
@Component
public class EchoHandler extends BaseHandler {
@Override
public Result process(ChannelHandlerContext ctx, Message message) {
return new Result(1, "success...", JSON.toJSONString(message));
}
}
五、redisツール類
MyRedisService
ユーザとチャネルの関係を処理するために使用されるLoginRedisService
は、ユーザ登録用のtokenの読み取りのみを担当するpublic class MyRedisService {
@Resource(name = "myRedisTemplate")
private RedisTemplate<String, Object> redisTemplate;
@Value(value = "${redis.cache.key.prefix}")
private String userKeyPrefix;
@Value(value = "${redis.cache.expireSeconds}")
private long expireSeconds;
public String getUserKeyPrefix() {
return userKeyPrefix;
}
public long getExpireSeconds() {
return expireSeconds;
}
public String flushDb(){
return redisTemplate.execute(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection connection) throws DataAccessException {
connection.flushDb();
return "ok";
}
});
}
/**
*
*
* @param key
* @param time ( )
* @return
*/
public boolean expire(String key, long time) {
try {
if (time > 0) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* key
*
* @param key null
* @return ( ) 0
*/
public long getExpire(String key) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}
/**
* key
*
* @param key
* @return true false
*/
public boolean hasKey(String key) {
try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
*
*
* @param key
*/
@SuppressWarnings("unchecked")
public void del(String... key) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete(CollectionUtils.arrayToList(key));
}
}
}
// ============================String=============================
/**
*
*
* @param key
* @return
*/
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}
/**
*
*
* @param key
* @param value
* @return true false
*/
public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
*
*
* @param key
* @param value
* @param time ( ) time 0 time 0
* @return true false
*/
public boolean set(String key, Object value, long time) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else {
set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
*
*
* @param key
* @param by ( 0)
* @return
*/
public long incr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException(" 0");
}
return redisTemplate.opsForValue().increment(key, delta);
}
/**
*
*
* @param key
* @param by ( 0)
* @return
*/
public long decr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException(" 0");
}
return redisTemplate.opsForValue().increment(key, -delta);
}
// ================================Map=================================
/**
* HashGet
*
* @param key null
* @param item null
* @return
*/
public Object hget(String key, String item) {
return redisTemplate.opsForHash().get(key, item);
}
/**
* hashKey
*
* @param key
* @return
*/
public Map<Object, Object> hmget(String key) {
return redisTemplate.opsForHash().entries(key);
}
/**
* HashSet
*
* @param key
* @param map
* @return true false
*/
public boolean hmset(String key, Map<String, Object> map) {
try {
redisTemplate.opsForHash().putAll(key, map);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* HashSet
*
* @param key
* @param map
* @param time ( )
* @return true false
*/
public boolean hmset(String key, Map<String, Object> map, long time) {
try {
redisTemplate.opsForHash().putAll(key, map);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* hash ,
*
* @param key
* @param item
* @param value
* @return true false
*/
public boolean hset(String key, String item, Object value) {
try {
redisTemplate.opsForHash().put(key, item, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* hash ,
*
* @param key
* @param item
* @param value
* @param time ( ) : hash ,
* @return true false
*/
public boolean hset(String key, String item, Object value, long time) {
try {
redisTemplate.opsForHash().put(key, item, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* hash
*
* @param key null
* @param item null
*/
public void hdel(String key, Object... item) {
redisTemplate.opsForHash().delete(key, item);
}
/**
* hash
*
* @param key null
* @param item null
* @return true false
*/
public boolean hHasKey(String key, String item) {
return redisTemplate.opsForHash().hasKey(key, item);
}
/**
* hash ,
*
* @param key
* @param item
* @param by ( 0)
* @return
*/
public double hincr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, by);
}
/**
* hash
*
* @param key
* @param item
* @param by ( 0)
* @return
*/
public double hdecr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, -by);
}
// ============================set=============================
/**
* key Set
*
* @param key
* @return
*/
public Set<Object> sGet(String key) {
try {
return redisTemplate.opsForSet().members(key);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* value set ,
*
* @param key
* @param value
* @return true false
*/
public boolean sHasKey(String key, Object value) {
try {
return redisTemplate.opsForSet().isMember(key, value);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* set
*
* @param key
* @param values
* @return
*/
public long sSet(String key, Object... values) {
try {
return redisTemplate.opsForSet().add(key, values);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* set
*
* @param key
* @param time ( )
* @param values
* @return
*/
public long sSetAndTime(String key, long time, Object... values) {
try {
Long count = redisTemplate.opsForSet().add(key, values);
if (time > 0)
expire(key, time);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* set
*
* @param key
* @return
*/
public long sGetSetSize(String key) {
try {
return redisTemplate.opsForSet().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* value
*
* @param key
* @param values
* @return
*/
public long setRemove(String key, Object... values) {
try {
Long count = redisTemplate.opsForSet().remove(key, values);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
// ===============================list=================================
/**
* list
*
* @param key
* @param start
* @param end 0 -1
* @return
*/
public List<Object> lGet(String key, long start, long end) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* list
*
* @param key
* @return
*/
public long lGetListSize(String key) {
try {
return redisTemplate.opsForList().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* list
*
* @param key
* @param index index>=0 , 0 ,1 , ;index<0 ,-1, ,-2 ,
* @return
*/
public Object lGetIndex(String key, long index) {
try {
return redisTemplate.opsForList().index(key, index);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* list
*
* @param key
* @param value
* @param time ( )
* @return
*/
public boolean lSet(String key, Object value) {
try {
redisTemplate.opsForList().rightPush(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* list
*
* @param key
* @param value
* @param time ( )
* @return
*/
public boolean lSet(String key, Object value, long time) {
try {
redisTemplate.opsForList().rightPush(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* list
*
* @param key
* @param value
* @param time ( )
* @return
*/
public boolean lSet(String key, List<Object> value) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* list
*
* @param key
* @param value
* @param time ( )
* @return
*/
public boolean lSet(String key, List<Object> value, long time) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* list
*
* @param key
* @param index
* @param value
* @return
*/
public boolean lUpdateIndex(String key, long index, Object value) {
try {
redisTemplate.opsForList().set(key, index, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* N value
*
* @param key
* @param count
* @param value
* @return
*/
public long lRemove(String key, long count, Object value) {
try {
Long remove = redisTemplate.opsForList().remove(key, count, value);
return remove;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
}
@Configuration
public class LoginRedisService {
@Resource(name = "loginRedisTemplate")
private RedisTemplate<String, Object> redisTemplate;
@Value("${redis.login.prefix}")
String tokenPre;
// ============================String=============================
/**
* token
*
* @param key
* @return
*/
public Object getToken(int uid) {
String key = tokenPre + uid;
return key == null ? null : redisTemplate.opsForValue().get(key);
}
}
六、netty起動クラス
@WebListener
public class NettyRunServletContextListener implements ServletContextListener {
private static final Logger logger = LoggerFactory.getLogger(NettyRunServletContextListener.class);
@Value("${netty.port}")
private int port;
@Value("${netty.url}")
private String url;
@Autowired
private NettyConfig nettyConfig;
@Override
public void contextDestroyed(ServletContextEvent sce) {
System.out.println("====== springboot netty destroy ======");
nettyConfig.destroy();
System.out.println("---test contextDestroyed method---");
}
@Override
public void contextInitialized(ServletContextEvent sce) {
WebApplicationContextUtils.getRequiredWebApplicationContext(sce.getServletContext())
.getAutowireCapableBeanFactory().autowireBean(this);
try {
InetSocketAddress address = new InetSocketAddress(url, port);
ChannelFuture future = nettyConfig.run(address);
logger.info("====== springboot netty start ======");
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
logger.info("---nettyConfig destroy---");
nettyConfig.destroy();
}
});
future.channel().closeFuture().syncUninterruptibly();
} catch (Exception e) {
logger.error("---springboot netty server start error : ", e.getMessage() + "---");
}
}
}
現在、tomcatの
shutdown.sh
命令を使用すると、tomcatを正常に閉じることができず、ヒントが表示されます.Using CATALINA_BASE: /usr/local/tomcat
Using CATALINA_HOME: /usr/local/tomcat
Using CATALINA_TMPDIR: /usr/local/tomcat/temp
Using JRE_HOME: /usr/local/jdk1.8.0_66
Using CLASSPATH: /usr/local/tomcat/bin/bootstrap.jar:/usr/local/tomcat/bin/tomcat-juli.jar
Jul 12, 2019 6:51:22 PM org.apache.catalina.startup.Catalina stopServer
SEVERE: Could not contact [localhost:[8080]]. Tomcat may not be running.
Jul 12, 2019 6:51:22 PM org.apache.catalina.startup.Catalina stopServer
SEVERE: Catalina.stop:
java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at java.net.Socket.connect(Socket.java:538)
at java.net.Socket.(Socket.java:434)
at java.net.Socket.(Socket.java:211)
at org.apache.catalina.startup.Catalina.stopServer(Catalina.java:504)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.catalina.startup.Bootstrap.stopServer(Bootstrap.java:406)
at org.apache.catalina.startup.Bootstrap.main(Bootstrap.java:498)
nettyサービスポートが占有されているため、閉じることができません
killコマンドを使用する場合、ログを表示します.
[2019-07-12 18:58:34.615][INFO ][][ org.my.netty.NettyRunServletContextListener$1.run(NettyRunServletContextListener.java:51)
] ==> ---nettyConfig destroy---
[2019-07-12 18:58:34.617][INFO ][][ org.my.netty.config.NettyConfig.destroy(NettyConfig.java:60)
] ==> Shutdown Netty Server...
[2019-07-12 18:58:34.669][INFO ][][ org.springframework.scheduling.concurrent.ExecutorConfigurationSupport.shutdown(ExecutorConfigurationSupport.java:208)
] ==> Shutting down ExecutorService 'applicationTaskExecutor'
[2019-07-12 18:58:34.790][INFO ][][ io.lettuce.core.EpollProvider.(EpollProvider.java:64)
] ==> Starting with epoll library
[2019-07-12 18:58:34.794][INFO ][][ io.lettuce.core.KqueueProvider.(KqueueProvider.java:70)
] ==> Starting without optional kqueue library
[2019-07-12 18:58:34.951][INFO ][][ org.my.netty.config.NettyConfig.destroy(NettyConfig.java:67)
] ==> Shutdown Netty Server Success!
contextDestroyed
メソッドは実行されず、フックメソッドが実行されただけで、nettyを正常に閉じる方法を検討する必要があります.github: https://github.com/gavinL93/springboot_netty