Springboot統合websocketおよびtomcatクラスタ環境のwebsocket共有問題の解決
5033 ワード
ソリューション:redisメッセージパブリケーションサブスクリプションを使用して、複数のtomcatアプリケーションサーバの下で、接続が共有されない問題を解決します.具体的には
WebSocketServerの実装
リスニングクラスの設定
//傍受後の実行方法の実現
詳細はGitHubアドレスを参照
@Configuration
public class WebSocketConfig {
//TODEO tomcat, , , springboot tomcat
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
WebSocketServerの実装
private final static Logger log = LoggerFactory.getLogger(WebSocketServer.class);
// , 。 。
private static int onlineCount = 0;
//concurrent Set, MyWebSocket 。
private static CopyOnWriteArraySet webSocketSet = new CopyOnWriteArraySet();
// ,
private Session session;
// userId
private String userId = "";
/**
*
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
this.session = session;
webSocketSet.add(this); // set
addOnlineCount(); // 1
log.info(" :" + userId + ", " + getOnlineCount());
this.userId = userId;
try {
sendMessage(" ");
} catch (IOException e) {
log.error("websocket IO ");
}
}
/**
*
*/
@OnClose
public void onClose() {
boolean flag = webSocketSet.remove(this); // set
if (flag) {
subOnlineCount(); // 1
log.info(" ! " + getOnlineCount());
}
}
/**
*
*
* @param message
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info(" " + userId + " :" + message);
//
// for (WebSocketServer item : webSocketSet) {
// try {
// item.sendMessage(message);
// } catch (IOException e) {
// e.printStackTrace();
// }
// }
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error(" :" + error.getMessage());
// error.printStackTrace();
}
/**
*
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
*
*/
public static boolean sendInfo(String message, @PathParam("userId") String userId) {
log.info(" " + userId + ", :" + message);
for (WebSocketServer item : webSocketSet) {
try {
// userId , null
// if (userId == null) {
// item.sendMessage(message);
// } else if (item.userId.equals(userId)) {
if (item.userId.equals(userId)) {
item.sendMessage(message);
return true;
}
} catch (IOException e) {
continue;
}
}
return false;
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}
リスニングクラスの設定
@Configuration
public class RedisSubListenerConfig {
//
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, new PatternTopic(RedisKeyConstants.TOPIC_CHANNEL_SENDWEBSOCKET));
return container;
}
//
@Bean
MessageListenerAdapter listenerAdapter(OutpatientRedisDaoImpl redisReceiver) {
return new MessageListenerAdapter(redisReceiver, "sendMessageByOpen");
}
}
//傍受後の実行方法の実現
@Component("outpatientRedisDao")
public class OutpatientRedisDaoImpl {
private Logger logger = LoggerFactory.getLogger(this.getClass());
public void sendMessageByOpen(String message) {
logger.info("reids message:{}", message);
message = message.substring(1, message.length()-1);
String [] info = message.split(";");
JSONObject content = JSONObject.fromObject(info[1].replaceAll("\\\\\"", "\""));
WebSocketServer.sendInfo(content.toString(), info[0]);
}
}
詳細はGitHubアドレスを参照