Springboot Websocket Stopニュース購読プッシュ


背景が必要です
雑談は抜きにしてテーマに直行する。ウェブフロントエンドとの長いリンクが必要で、互いにリアルタイムで通信するため、必要に応じてWeb socketを思い出しました。後はユーザーがテーマを購読し、メッセージの正確なプッシュを実現し、購読などを行う必要があります。STOMP(Simple Text-Orintated Messaging Protocol)がメッセージに向かう簡単なテキストプロトコルを思い出しました。
websocketプロトコル
前に書いたwebsocketの長いリンクのdemoを思い出しました。コードも貼って参考にしてください。
pomファイル
spring-book-starter-websocketを直接導入すればいいです。

    	<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

声明websocket endpoint

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @ClassName WebSocketConfig
 * @Author scott
 * @Date 2021/6/16
 * @Version V1.0
 **/
@Configuration
public class WebSocketConfig {

    /**
     *     ServerEndpointExporter, Bean       @ServerEndpoint     websocket endpoint
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}
websocket実現クラスは、注釈によって各種イベントを傍受し、プッシュメッセージなどの関連ロジックを実現しています。

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.ruoyi.common.core.domain.AjaxResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @ClassName: DataTypePushWebSocket
 * @Author: scott
 * @Date: 2021/6/16
**/
@ServerEndpoint(value = "/ws/dataType/push/{token}")
@Component
public class DataTypePushWebSocket {

    private static final Logger log = LoggerFactory.getLogger(DataTypePushWebSocket.class);

    /**
     *          
     */
    private static AtomicInteger onlineCount = new AtomicInteger(0);

    private static Cache<String, Session> SESSION_CACHE = CacheBuilder.newBuilder()
            .initialCapacity(10)
            .maximumSize(300)
            .expireAfterWrite(10, TimeUnit.MINUTES)
            .build();

    /**
     *            
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("token")String token) {
        String sessionId = session.getId();
        onlineCount.incrementAndGet(); //     1
        this.sendMessage("sessionId:" + sessionId +",   server    ", session);
        SESSION_CACHE.put(sessionId,session);
        log.info("      :{},        :{}", session.getId(), onlineCount.get());
    }

    /**
     *          
     */
    @OnClose
    public void onClose(Session session,@PathParam("token")String token) {
        onlineCount.decrementAndGet(); //     1
        SESSION_CACHE.invalidate(session.getId());
        log.info("      :{},        :{}", session.getId(), onlineCount.get());
    }

    /**
     *              
     *
     * @param message           
     */
    @OnMessage
    public void onMessage(String message, Session session,@PathParam("token")String token) {
        log.info("        [{}]   :{}", session.getId(), message);
        this.sendMessage("          :" + message, session);
    }

    @OnError
    public void onError(Session session, Throwable error) {
        log.error("    ");
        error.printStackTrace();
    }

    /**
     *            
     */
    private static void sendMessage(String message, Session toSession) {
        try {
            log.info("       [{}]    {}", toSession.getId(), message);
            toSession.getBasicRemote().sendText(message);
        } catch (Exception e) {
            log.error("             :{}", e);
        }
    }

    public static AjaxResult sendMessage(String message, String sessionId){
        Session session = SESSION_CACHE.getIfPresent(sessionId);
        if(Objects.isNull(session)){
            return AjaxResult.error("token   ");
        }
        sendMessage(message,session);
        return AjaxResult.success();
    }

    public static AjaxResult sendBroadcast(String message){
        long size = SESSION_CACHE.size();
        if(size <=0){
            return AjaxResult.error("         ,      ");
        }
        ConcurrentMap<String, Session> sessionConcurrentMap = SESSION_CACHE.asMap();
        Set<String> keys = sessionConcurrentMap.keySet();
        for (String key : keys) {
            Session session = SESSION_CACHE.getIfPresent(key);
            DataTypePushWebSocket.sendMessage(message,session);
        }

        return AjaxResult.success();

    }

}
これでwebsocketのサービスコードはすでに完成しました。
stompプロトコル
フロントエンドコードです。これはあるvueプロジェクトに書かれたjsです。皆さんは自分で直します。ここでSettings.wsPathはバックエンド定義のwsアドレスであり、例えば、ws:/local host:9003/ws

import Stomp from 'stompjs'
import Settings from '@/settings.js'

export default {
  //            
  debug:true,
  //        
  stompClient:{},
  //    
  init(callBack){
    this.stompClient = Stomp.client(Settings.wsPath)
    this.stompClient.hasDebug = this.debug
    this.stompClient.connect({},suce =>{
      this.console("    ,     ↓")
      this.console(this.stompClient)
      if(callBack){
        callBack()
      }
    },err => {
      if(err) {
        this.console("    ,     ↓")
        this.console(err)
      }
    })
  },
  //   
  sub(address,callBack){
    if(!this.stompClient.connected){
      this.console("    ,    ")
      return
    }
    //    id
    let timestamp= new Date().getTime() + address
    this.console("     -> "+address)
    this.stompClient.subscribe(address,message => {
      this.console(address+"       ,     ↓")
      this.console(message)
      let data = message.body
      callBack(data)
    },{
      id: timestamp
    })
  },
  unSub(address){
    if(!this.stompClient.connected){
      this.console("    ,       -> "+address)
      return
    }
    let id = ""
    for(let item in this.stompClient.subscriptions){
      if(item.endsWith(address)){
        id = item
        break
      }
    }
    this.stompClient.unsubscribe(id)
    this.console("       -> id:"+ id + " address:"+address)
  },
  //     
  disconnect(callBack){
    if(!this.stompClient.connected){
      this.console("    ,      ")
      return
    }
    this.stompClient.disconnect(() =>{
      console.log("    ")
      if(callBack){
        callBack()
      }
    })
  },
  //     
  reconnect(time){
    setInterval(() =>{
      if(!this.stompClient.connected){
        this.console("     ...")
        this.init()
      }
    },time * 1000)
  },
  console(msg){
    if(this.debug){
      console.log(msg)
    }
  },
  //        
  send(address,msg) {
    this.stompClient.send(address,{},msg)
  }
}
バックエンドはstomp configで、中にはコメントがあります。詳しく書いてあります。そして、私はフロントエンドとのドキドキping pongを入れました。

package com.cn.scott.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

/**
 * @ClassName: WebSocketStompConfig
 * @Author: scott
 * @Date: 2021/7/8
**/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {

    private static long HEART_BEAT=10000;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        //    socketJs    ,    webSocket,    
        //               
        //ws://127.0.0.1:port/ws      WebSocket  
        registry.addEndpoint("/ws").setAllowedOrigins("*");
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        ThreadPoolTaskScheduler te = new ThreadPoolTaskScheduler();
        te.setPoolSize(1);
        te.setThreadNamePrefix("wss-heartbeat-thread-");
        te.initialize();
        //     STOMP       mq     
        //  Broker  ,/user           ,/topic         
        //setHeartbeatValue          
        registry.enableSimpleBroker("/user", "/topic").setHeartbeatValue(new long[]{HEART_BEAT,HEART_BEAT}).setTaskScheduler(te);
        //          ,     ,    /user/
        registry.setUserDestinationPrefix("/user/");
    }
}
バックエンドstompプロトコル受信、購読などの動作通知

package com.cn.scott.ws;

import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @ClassName StompSocketHandler
 * @Author scott
 * @Date 2021/6/30
 * @Version V1.0
 **/
@RestController
public class StompSocketHandler {

    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    /**
    * @MethodName: subscribeMapping
     * @Description:       
     * @Param: [id]
     * @Return: void
     * @Author: scott
     * @Date: 2021/6/30
    **/
    @SubscribeMapping("/user/{id}/listener")
    public void subscribeMapping(@DestinationVariable("id") final long id) {
        System.out.println(">>>>>>  :"+id +",   ");
        SubscribeMsg param = new SubscribeMsg(id,String.format("  【%s】     ", id));
        sendToUser(param);
    }


    /**
    * @MethodName: test
     * @Description:     topic  
     * @Param: [id, msg]
     * @Return: void
     * @Author: scott
     * @Date: 2021/6/30
    **/
    @MessageMapping(value = "/user/{id}/listener")
    public void UserSubListener(@DestinationVariable long  id, String msg) {
        System.out.println("     :" +id+",   ");
        SubscribeMsg param = new SubscribeMsg(id,String.format("     【%s】    【%s】", id,msg));
        sendToUser(param);
    }
    
     @GetMapping("/refresh/{userId}")
    public void refresh(@PathVariable Long userId, String msg) {
        StompSocketHandler.SubscribeMsg param = new StompSocketHandler.SubscribeMsg(userId,String.format("      【%s】    【%s】", userId,msg));
        sendToUser(param);
    }

    /**
    * @MethodName: sendToUser
     * @Description:          
     * @Param: [userId]
     * @Return: void
     * @Author: scott
     * @Date: 2021/6/30
    **/
    public void sendToUser(SubscribeMsg screenChangeMsg){
        //         。。。
        simpMessagingTemplate.convertAndSendToUser(screenChangeMsg.getUserId().toString(),"/listener", JSON.toJSONString(screenChangeMsg));
    }

    /**
    * @MethodName: sendBroadCast
     * @Description:     ,          
     * @Param: [topic, msg]
     * @Return: void
     * @Author: scott
     * @Date: 2021/6/30
    **/
    public void sendBroadCast(String topic,String msg){
        simpMessagingTemplate.convertAndSend(topic,msg);
    }


    /**
     * @ClassName: SubMsg
     * @Author: scott
     * @Date: 2021/6/30
    **/
    public static class SubscribeMsg {
        private Long userId;
        private String msg;
        public SubscribeMsg(Long UserId, String msg){
            this.userId = UserId;
            this.msg = msg;
        }
        public Long getUserId() {
            return userId;
        }
        public String getMsg() {
            return msg;
        }
    }
}
接続展示
接続の確立に成功しました。ここではwebsocketプロトコルに基づいていると見られます。
在这里插入图片描述
接続情報
在这里插入图片描述
ping pong
在这里插入图片描述
呼び出しインターフェースは、購読者1にメッセージを送信し、http://localhost:9003/refresh/1?msg=Hello Stopは、クライアントコンソールで受信したメッセージを確認することができます。この時、ユーザーは自分のuserIdを通じて購読のテーマを区別できます。userIdを通じて正確にクライアントにメッセージを送ることができます。
在这里插入图片描述
バックエンドの設定時にラジオの購読テーマ/topicを指定したことを覚えています。この時、私達のフロントエンドはjsを通してこのテーマを購読している限り、バックエンドはこのテーマのようにメッセージを送る時に、購読したクライアントは全部受け取ります。興味のある仲間は自分で試してみてもいいです。apiは全部書きました。
在这里插入图片描述
これで実戦が終わり、好きな仲間にも注目してもらいたいです。
springboot+stompバックエンドのソースコードの住所:https://gitee.com/ErGouGeSiBaKe/stomp-server
ここで、Springboot Websocket Stopに関する記事を紹介します。Sprigbook Websocket Stopに関する記事をもっと多く紹介します。以前の記事を検索してください。または次の関連記事を見てください。これからも応援してください。