WebSocket实战详解【Springboot+Vue+sockjs+webstomp】

论坛 期权论坛 脚本     
匿名技术用户   2021-1-7 21:07   404   0

WebScoket是什么?

WebSocket是一种在单个TCP连接上进行全双工通信的协议。(来自百度百科) 也就是说,WebSocket使得浏览器和服务器可以主动发送给对方。

同样是基于TCP连接,与以往的HTTP的协议不同,HTTP采用请求/响应模式,三次握手建立一次连接,只能由客户端发给服务端。而WebSocket只通过一次握手,建立一个持久的连接,客户端和服务端都可以主动发消息,更实时的通讯。

一次握手建立WebSocket连接

浏览器先向服务器发送个url以ws://开头的http的GET请求,服务器根据请求头中Upgrade:websocket客户端的请求切换到对应的协议,即websocket协议。

同时,响应头中也包含了内容Upgrade:websocket,表示升级成WebSocket协议。

响应101,握手成功,http协议切换成websocket协议了,连接建立成功,浏览器和服务器可以随时主动发送消息给对方了,并且这个连接一直持续到客户端或服务器一方主动关闭连接。

具体的WebSocket到底跟Socket、HTTP的关系与区别,请移步我的另一篇博客《常见网络协议知识集》:

https://blog.csdn.net/qq_38345296/article/details/104043710

WebSocket客户端实现 (SockJS+Stomp)

由于WebSocket是一个相对比较新的规范,在Web浏览器和应用服务器上没有得到一致的支持。所以我们需要一种WebSocket的备选方案。而这恰恰是SockJS所擅长的。

SockJS是WebSocket技术的一种模拟,在表面上,它尽可能对应WebSocket API,但是在底层非常智能。如果WebSocket技术不可用的话,就会选择另外的通信方式。

为什么使用Stomp?

STOMP即Simple (or Streaming) Text Orientated Messaging Protocol,简单(流)文本定向消息协议,它提供了一个可互操作的连接格式,允许STOMP客户端与任意STOMP消息代理(Broker)进行交互。如果只使用WebSocket的API,发送的内容可以是文本或者是二进制,但没有规范的定义,也不可以添加请求头,做连接认证。所以使用Stomp。

STOMP在WebSocket之上提供了一个基于帧的线路格式层,用来定义消息的语义。STOMP帧由命令、一个或多个头信息以及负载所组成。例如如下就是发送数据的一个STOMP帧:

>>> SEND
destination:/app/marco
content-length:20

{"message":"Maeco!"}

在这个简单的样例中,STOMP命令是SEND,表明会发送一些内容。紧接着是两个头信息:一个用来表示消息要发送到哪里的目的地,另外一个则包含了负载的大小。然后,紧接着是一个空行,STOMP帧的最后是负载内容。
STOMP帧中最有意思的是destination头信息了。它表明STOMP是一个消息协议。消息会发布到某个目的地,这个目的地实际上可能真的有消息代理作为支撑。另一方面,消息处理器也可以监听这些目的地,接收所发送过来的消息。

Vue中类库的引入:

import SockJS from 'sockjs-client';

import WebStomp from 'webstomp-client';

客户端Js代码:

// 页面加载完成,初始化WebSocket
mounted(){
    this.initWebSocket();
  },
methods: {
    // 初始化WebSocket, 定时器检查是否需要重连
    initWebSocket() {
      const vm = this;
      vm.connectWebSocket();
      // websocket断开重连, 每5s检查一次
      vm.webSocketTimer = setInterval(() => {
        if (!this.stompClient.connected) {
          console.log("websocket重连中 ...");
          vm.connectWebSocket();
        }
      }, 5000);
    },
    // 建立WebSocket连接
    connectWebSocket() {
      const vm = this;
      const socket = new SockJS("http://127.0.0.1:9000/ws");
      const webStompOptions = {
        debug: false
      }
      vm.stompClient = WebStomp.over(socket, webStompOptions);
      let headers = { Authorization: this.form.token }
      vm.stompClient.connect(headers, () => {
          console.log('stompClient 已连接');
          // 第一个订阅,广播级别
          vm.stompClient.subscribe(
          "/topic/flow-view/" + vm.jobStreamInPlanId,
          //收到消息的回调
          function (msg) {
            console.log('/topic/flow-view/' + vm.jobStreamInPlanId + '发出通知:');
            const body = JSON.parse(msg.body);
            console.log('body', body.event);
          }, headers);

          // 第二个订阅,用户级别
          vm.stompClient.subscribe(
          "/user/queue/flow-view/" + vm.jobStreamInPlanId,
          function (msg) {
            console.log('/user/flow-view' + '发出通知:');
            const body = JSON.parse(msg.body);
            console.log('body', body.event);
          }, headers);

          // 发送send指令给服务端
          vm.stompClient.send("/app/flow-view/init-data/" + vm.jobStreamInPlanId, {}, {});
      }, err => {
          console.log('stompClient 连接失败: ', err);
          socket.close();
      });
    },
    disconnectWebSocket() {
      console.log('disconnect...');
      this.stompClient.disconnect();
    },
},
// 页面销毁时候,关闭连接,并取消重连
destroyed: function () {
    clearInterval(this.webSocketTimer);
    this.disconnectWebSocket();
  },

上述代码中,两个Subscribe,一个Send。

第一个Subscribe:"/topic/flow-view/" + vm.jobStreamInPlanId 订阅一个topic,接受广播消息

第二个Subscribe:"/user/queue/flow-view/" + vm.jobStreamInPlanId 订阅一个用户频道,接收个人消息

Send:/app/flow-view/init-data/" + vm.jobStreamInPlanId 这个指令映射到后端的Controller里,后端处理请求,发送消息到某频道

注意:Send指令可以跟后端交互,但是destination必须附加前缀app,这个要在服务端代码配置。

WebSocket服务端实现(WebStomp)

首先需要配置WebStompConfig,实现WebSocketMessageBrokerConfigurer接口,重写方法如下:


/**
 * WebStompConfig
 *
 * @Author fengjiale
 * @Date 2020-01-10 13:21
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebStompConfig implements WebSocketMessageBrokerConfigurer {
    @Autowired
    @Qualifier("userSecurityUtil")
    UserSecurityUtil userSecurityUtil;

    // 设置WebSocket请求的路径后缀,withSockJS(),代表支持sockjs
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        //客户端与服务端交互的前缀
        registry.setApplicationDestinationPrefixes("/app");
        //客户端订阅消息的前缀
        registry.enableSimpleBroker("/topic", "/queue");
        //用户级别的消息订阅的前缀
        registry.setUserDestinationPrefix("/user");
    }

    // 定义拦截器校验请求Header,此处使用JWT的令牌校验
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(new ChannelInterceptor() {
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
                if (StompCommand.CONNECT.equals(accessor.getCommand())) {
                    String token = accessor.getFirstNativeHeader("Authorization");
                    // 验证token是否有效
                    if (!userSecurityUtil.verifyWebToken(token)) {
                        throw new MessageDeliveryException("token验证失败");
                    }
                    DecodedJWT jwtToken = WebTokenUtil.decode(token);
                    Principal principal = new Principal() {
                        @Override
                        public String getName() {
                            return jwtToken.getSubject();
                        }
                    };
                    System.err.println("Stomp Client Inbound = " + message.getHeaders());
                    accessor.setUser(principal);
                }
                return message;
            }
        });
    }

}

服务端发送广播消息给客户端:

/**
 * WebsocketUtil
 *
 * @Author fengjiale
 * @Date 2020-01-16 15:47
 */
public class WebsocketUtil {
    private static SimpMessagingTemplate messagingTemplate;
    public static SimpMessagingTemplate getMessagingTemplate() {
        if (messagingTemplate == null) {
            messagingTemplate = SpringContextUtil.getBean(SimpMessagingTemplate.class);
        }
        return messagingTemplate;
    }


    /**
     * 广播发送消息
     * @param jobstreamInPlanId 指定发给哪个作业流的订阅者们
     * @param nodeList 要发送的对象
     */
    public static void flowViewUpdateNotify(String jobstreamInPlanId, List<Graph.Node> nodeList){
        getMessagingTemplate().convertAndSend("/topic/flow-view/"+jobstreamInPlanId, nodeList);
    }


}

服务端接受Send指令并发送用户消息

/**
 * WebSocketController
 * @Author fengjiale
 * @Date 2019-01-18 15:47
 */
@Controller
public class WebSocketController {
    private static SimpMessagingTemplate messagingTemplate;
    @Autowired
    JobInPlanRelationService jobInPlanRelationService;

    private static SimpMessagingTemplate getMessagingTemplate() {
        if (messagingTemplate == null) {
            messagingTemplate = SpringContextUtil.getBean(SimpMessagingTemplate.class);
        }
        return messagingTemplate;
    }

    /**
     * 用户级别的订阅,初始化全量数据
     *
     * @param principal
     * @param jobStreamInPlanId
     */
    @MessageMapping("/flow-view/init-data/{jobStreamInPlanId}")
    public void flowViewInitData(Principal principal, @DestinationVariable("jobStreamInPlanId") String jobStreamInPlanId) {
        Graph graph = jobInPlanRelationService.getGraphByJobstreamInPlanId(jobStreamInPlanId);
        getMessagingTemplate().convertAndSendToUser(principal.getName(), "/queue/flow-view/" + jobStreamInPlanId, graph);
    }

    //用户级别的订阅的注解写法
//    @MessageMapping("/flow-view/init-data/{jobStreamInPlanId}")
//    @SendToUser("/queue/flow-view/20200118173610149")
//    public Graph test6(Principal principal, @DestinationVariable("jobStreamInPlanId") String jobStreamInPlanId) throws Exception {
//        Graph graph = jobInPlanRelationService.getGraphByJobstreamInPlanId(jobStreamInPlanId);
//        return graph;
//    }

    //订阅flow-view这个频道时候,直接返回。(客户端订阅的路径带了/app所以弃用)
//    @SubscribeMapping("/topic/flow-view/{jobStreamInPlanId}")
//    public NotifyMessage test51(@DestinationVariable("jobStreamInPlanId") String jobStreamInPlanId) throws Exception {
//        Graph graph = jobInPlanRelationService.getGraphByJobstreamInPlanId(jobStreamInPlanId);
//        return WebsocketUtil.newNotifyMessage("INIT_DATA",graph);
//    }


}

@MessageMapping:接收Send指令"/app/flow-view/sync-data/" + vm.jobStreamInPlanId 的消息,并做处理。

最后调用SimpMessagingTemplate的convertAndSendToUser发送给个人频道的订阅。

第一个参数principal.getName()就是拦截器里,下面那几行代码放进去的用户身份,Stomp将其与连接的session绑定到了一起。

DecodedJWT jwtToken = WebTokenUtil.decode(token);
Principal principal = new Principal() {
    @Override
    public String getName() {
        return jwtToken.getSubject();
    }
};
accessor.setUser(principal);

注意个人频道路由的关系:

前端订阅的个人频道为:"/user/queue/flow-view/" + vm.jobStreamInPlanId

对应后端发送个人频道:无需加/user, sendToUser会自动添加

注解@SendToUser(""/queue/flow-view/..."") 由于路径有变量,所以使用代码方式如下:

convertAndSendToUser(principal.getName(),"/queue/flow-view"+ jobStreamInPlanId, data);

以上为项目中使用学习的情况,做一个学习以及笔记。若有错误或疑惑,还请提问或指正。

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:7942463
帖子:1588486
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP