Spring websocketプッシュ

10505 ワード

Spring WebSocket の使い方については、Spring 公式サイトに対応するデモがあります。ここでは主に、特定のブラウザクライアントへメッセージをプッシュする方法を説明します。
  • 基本的なハンドシェイクインターセプターの設定
  • @Component
    public class MessageHandshakeInterceptor implements HandshakeInterceptor {
    
        public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
                Map attributes) throws Exception {
            if (request instanceof ServletServerHttpRequest) {
                ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
                HttpSession session = servletRequest.getServletRequest().getSession(false);
                if (session != null) {
                    User user = (User) session.getAttribute("user");
                    Integer userId = (Integer) session.getAttribute("userId");
                    if(null !=user){
                        attributes.put("user", user);
                        attributes.put("userId", userId);
                    }
                }
            }
            return true;
        }
        public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
                Exception exception) {
    
        }
    }
    
  • サーバー側の設定:リクエストのパスに応じて、対応する handler で処理するように構成する。
    /**
     * Spring configuration that enables WebSocket support and registers the push
     * handler on two endpoints: a plain WebSocket endpoint and a SockJS endpoint.
     */
    @Configuration
    @EnableWebMvc
    @EnableWebSocket
    public class NotifyWebSocketConfig extends WebMvcConfigurerAdapter implements WebSocketConfigurer {

        /** Handler that receives and pushes the text messages. */
        @Autowired
        private NotifyHandlerNew notifyHandler;

        /** Interceptor applied to the handshake of both endpoints. */
        @Autowired
        private MessageHandshakeInterceptor notifyHandshakeInterceptor;

        /**
         * Provides the handshake handler used by both registered endpoints.
         *
         * @return a default handshake handler
         */
        @Bean
        public DefaultHandshakeHandler handshakeHandler() {
            DefaultHandshakeHandler handler = new DefaultHandshakeHandler();
            return handler;
        }

        /**
         * Configures the message buffer limits of the servlet WebSocket container:
         * 8192 for text messages and 5 * 1024 * 1024 for binary messages.
         *
         * @return the container factory bean carrying those limits
         */
        @Bean
        public ServletServerContainerFactoryBean createWebSocketContainer() {
            ServletServerContainerFactoryBean factory = new ServletServerContainerFactoryBean();
            factory.setMaxBinaryMessageBufferSize(5 * 1024 * 1024);
            factory.setMaxTextMessageBufferSize(8192);
            return factory;
        }

        /**
         * Maps the push handler to the plain WebSocket path and to the SockJS path.
         * Both registrations allow every origin and share the same interceptor and
         * handshake handler.
         *
         * @param registry registry the handler mappings are added to
         */
        @Override
        public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
            // Plain WebSocket endpoint.
            registry.addHandler(this.notifyHandler, "pushMsg")
                    .setAllowedOrigins("*")
                    .addInterceptors(this.notifyHandshakeInterceptor)
                    .setHandshakeHandler(handshakeHandler());

            // SockJS endpoint for the same handler.
            registry.addHandler(this.notifyHandler, "/sockjs/pushMsg")
                    .setAllowedOrigins("*")
                    .addInterceptors(this.notifyHandshakeInterceptor)
                    .setHandshakeHandler(handshakeHandler())
                    .withSockJS();
        }
    }
    
  • 対応する handler の処理
  • @Component
    public class NotifyHandlerNew extends TextWebSocketHandler {
        //       
        private static final Map users = new ConcurrentHashMap<>();
        private static Logger logger = Logger.getLogger(NotifyHandlerNew.class);
        //     
        private static final String CLIENT_ID = "userId";
        @Resource
        private PushMsgDao pushMsgDao;
        @Resource
        private  EnvMapper envMapper;
        
        @Override
        public void afterConnectionEstablished(WebSocketSession session) throws Exception {
            Integer userId = getClientId(session);
            logger.info(userId+":      ");
            if (userId != null) {
                users.put(userId, session);
    //          session.sendMessage(new TextMessage("    socket  "));
            }
        }
    
    //       ,    
        @Override
        public void handleTextMessage(WebSocketSession session, TextMessage message) {
            try {
                String receiver = message.getPayload();
                        Map result = new HashMap<>();
                        result.put("all", "hello");
                        TextMessage mes = new TextMessage(JSONObject.toJSONString(result));
                        session.sendMessage(mes);
                }
            } catch (IOException | SQLException e) {
                e.printStackTrace();
            }
        }
    
        /**
         *          
         * 
         * @param clientId
         * @param message
         * @return
         */
        public boolean sendMessageToUser(Integer clientId, TextMessage message) {
            if (users.get(clientId) == null)
                return false;
            WebSocketSession session = users.get(clientId);
            if (!session.isOpen())
                return false;
            try {
                //       
                List msgs = pushMsgDao.getPushMsg(new PushMsg(clientId));
                Map> result = new HashMap<>();
                result.put("all", msgs);
                result.put("new", new ArrayList<>());
                TextMessage mes = new TextMessage(JSONObject.toJSONString(result));
                session.sendMessage(mes);
            } catch (IOException e) {
                e.printStackTrace();
                return false;
            } catch (SQLException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            return true;
        }
    
        /**
         *     
         * 
         * @param message
         * @return
         */
        public boolean sendMessageToAllUsers(TextMessage message) {
            boolean allSendSuccess = true;
            Set clientIds = users.keySet();
            WebSocketSession session = null;
            for (Integer clientId : clientIds) {
                try {
                    session = users.get(clientId);
                    if (session.isOpen()) {
                        session.sendMessage(message);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    allSendSuccess = false;
                }
            }
    
            return allSendSuccess;
        }
    
        @Override
        public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
            if (session.isOpen()) {
                session.close();
            }
            logger.error("    ");
            users.remove(getClientId(session));
        }
    
        @Override
        public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
            logger.warn("     :" + status);
            if( null != getClientId(session))
                users.remove(getClientId(session));
        }
    
        @Override
        public boolean supportsPartialMessages() {
            return false;
        }
    
        /**
         *       
         * 
         * @param session
         * @return
         */
        private Integer getClientId(WebSocketSession session) {
            try {
                Integer clientId = (Integer) session.getAttributes().get(CLIENT_ID);
                return clientId;
            } catch (Exception e) {
                return null;
            }
        }
    }
    

    Servlet 3.0 以降を使用する場合は、サーバー側で WebSocket をサポートする jar パッケージを依存関係に追加する必要があります。
    
            <dependency>
                <groupId>org.apache.tomcat.embed</groupId>
                <artifactId>tomcat-embed-websocket</artifactId>
                <version>8.5.23</version>
            </dependency>
            
    

    ネット上で見つけた使用例:
    /**
     * WebSocket endpoint that echoes text and binary messages back to the client and,
     * while the connection is open, periodically pushes a random number to it.
     */
    @ServerEndpoint(value="/myHandler",configurator = SpringConfigurator.class)
    public class Progress
    {
        private Session session;
        private static final Random random = new Random();
        private Timer timer = null;
        /** Binary payload that stops the periodic push when it is received. */
        private static final ByteBuffer stopbuffer  = ByteBuffer.wrap(new byte[]{1, 9, 2, 0, 1, 5, 1, 6});

        /**
         * Stores the session and starts the periodic push: first run after 1000 ms,
         * then every 2000 ms.
         *
         * @param session the newly opened session
         */
        @OnOpen
        public void start(Session session) {
            this.session = session;
            try {
                System.out.println("open");
                if (session.isOpen()) {
                    timer = new Timer(true);
                    timer.schedule(task, 1000, 2000);
                }
            } catch (Exception e) {
                // Do not leave a timer behind when the push could not be scheduled.
                cancelTimer();
                closeQuietly(session);
            }
        }

        /**
         * Echoes a (possibly partial) text message back to the sender.
         *
         * @param session the session the message arrived on
         * @param msg     the received text
         * @param last    whether this is the last part of the message
         */
        @OnMessage
        public void echoTextMessage(Session session, String msg, boolean last) {
            try {
                if (session.isOpen()) {
                    System.out.println("string:" + msg);
                    session.getBasicRemote().sendText(msg, last);
                }
            } catch (IOException e) {
                closeQuietly(session);
            }
        }

        /**
         * Echoes a (possibly partial) binary message back to the sender, unless it equals
         * the stop payload, in which case the periodic push is cancelled instead.
         *
         * @param session the session the message arrived on
         * @param bb      the received bytes
         * @param last    whether this is the last part of the message
         */
        @OnMessage
        public void echoBinaryMessage(Session session, ByteBuffer bb, boolean last) {
            try {
                if (session.isOpen()) {
                    if (bb.compareTo(stopbuffer) == 0) {
                        cancelTimer();
                    } else {
                        session.getBasicRemote().sendBinary(bb, last);
                    }
                }
            } catch (IOException e) {
                closeQuietly(session);
            }
        }

        /**
         * Receives pong messages and deliberately ignores them.
         *
         * @param pm the pong message (ignored)
         */
        @OnMessage
        public void echoPongMessage(PongMessage pm) {
            // no-op
        }

        /**
         * Stops the periodic push when the connection is closed.
         *
         * @param session the session being closed
         */
        @OnClose
        public void end(Session session) {
            System.out.println("close");
            cancelTimer();
        }

        /**
         * Sends the given number as text to the stored session, if it is open.
         *
         * @param param the value to send
         */
        public void sendLong(long param) {
            Session current = this.session;
            // The field is only set once the connection has been opened.
            if (current == null) {
                return;
            }
            try {
                if (current.isOpen()) {
                    current.getBasicRemote().sendText(String.valueOf(param));
                }
            } catch (IOException e) {
                closeQuietly(current);
            }
        }

        /**
         * Periodic task: sends a random number in the range [0, 100) to the client.
         */
        TimerTask task = new TimerTask() {
            public void run() {
                long param = random.nextInt(100);
                sendLong(param);
            }
        };

        /** Cancels the push timer if one was created. */
        private void cancelTimer() {
            if (timer != null) {
                timer.cancel();
            }
        }

        /** Closes the session, ignoring a failure to close it. */
        private static void closeQuietly(Session target) {
            try {
                target.close();
            } catch (IOException ignored) {
                // Best effort: the connection is already failing, nothing more can be done.
            }
        }

    }
    
  • 上記の3つのステップで、ブラウザとサーバー側の間で基本的な接続が確立されます。メッセージミドルウェアを使用する場合は、「Spring Boot と RabbitMQ の統合」を参照してください。