WebScoket是什么?
WebSocket是一种在单个TCP连接上进行全双工通信的协议。(来自百度百科) 也就是说,WebSocket使得浏览器和服务器可以主动发送给对方。
同样是基于TCP连接,与以往的HTTP的协议不同,HTTP采用请求/响应模式,三次握手建立一次连接,只能由客户端发给服务端。而WebSocket只通过一次握手,建立一个持久的连接,客户端和服务端都可以主动发消息,更实时的通讯。
一次握手建立WebSocket连接
浏览器先向服务器发送个url以ws://开头的http的GET请求,服务器根据请求头中Upgrade:websocket把客户端的请求切换到对应的协议,即websocket协议。

同时,响应头中也包含了内容Upgrade:websocket,表示升级成WebSocket协议。

响应101,握手成功,http协议切换成websocket协议了,连接建立成功,浏览器和服务器可以随时主动发送消息给对方了,并且这个连接一直持续到客户端或服务器一方主动关闭连接。
具体的WebSocket到底跟Socket、HTTP的关系与区别,请移步我的另一篇博客《常见网络协议知识集》:
https://blog.csdn.net/qq_38345296/article/details/104043710
WebSocket客户端实现 (SockJS+Stomp)
由于WebSocket是一个相对比较新的规范,在Web浏览器和应用服务器上没有得到一致的支持。所以我们需要一种WebSocket的备选方案。而这恰恰是SockJS所擅长的。
SockJS是WebSocket技术的一种模拟,在表面上,它尽可能对应WebSocket API,但是在底层非常智能。如果WebSocket技术不可用的话,就会选择另外的通信方式。
为什么使用Stomp?
STOMP即Simple (or Streaming) Text Orientated Messaging Protocol,简单(流)文本定向消息协议,它提供了一个可互操作的连接格式,允许STOMP客户端与任意STOMP消息代理(Broker)进行交互。如果只使用WebSocket的API,发送的内容可以是文本或者是二进制,但没有规范的定义,也不可以添加请求头,做连接认证。所以使用Stomp。
STOMP在WebSocket之上提供了一个基于帧的线路格式层,用来定义消息的语义。STOMP帧由命令、一个或多个头信息以及负载所组成。例如如下就是发送数据的一个STOMP帧:
>>> SEND destination:/app/marco content-length:20
{"message":"Maeco!"}
在这个简单的样例中,STOMP命令是SEND,表明会发送一些内容。紧接着是两个头信息:一个用来表示消息要发送到哪里的目的地,另外一个则包含了负载的大小。然后,紧接着是一个空行,STOMP帧的最后是负载内容。 STOMP帧中最有意思的是destination头信息了。它表明STOMP是一个消息协议。消息会发布到某个目的地,这个目的地实际上可能真的有消息代理作为支撑。另一方面,消息处理器也可以监听这些目的地,接收所发送过来的消息。
Vue中类库的引入:
import SockJS from 'sockjs-client';
import WebStomp from 'webstomp-client';
客户端Js代码:
// 页面加载完成,初始化WebSocket
mounted(){
this.initWebSocket();
},
methods: {
// 初始化WebSocket, 定时器检查是否需要重连
initWebSocket() {
const vm = this;
vm.connectWebSocket();
// websocket断开重连, 每5s检查一次
vm.webSocketTimer = setInterval(() => {
if (!this.stompClient.connected) {
console.log("websocket重连中 ...");
vm.connectWebSocket();
}
}, 5000);
},
// 建立WebSocket连接
connectWebSocket() {
const vm = this;
const socket = new SockJS("http://127.0.0.1:9000/ws");
const webStompOptions = {
debug: false
}
vm.stompClient = WebStomp.over(socket, webStompOptions);
let headers = { Authorization: this.form.token }
vm.stompClient.connect(headers, () => {
console.log('stompClient 已连接');
// 第一个订阅,广播级别
vm.stompClient.subscribe(
"/topic/flow-view/" + vm.jobStreamInPlanId,
//收到消息的回调
function (msg) {
console.log('/topic/flow-view/' + vm.jobStreamInPlanId + '发出通知:');
const body = JSON.parse(msg.body);
console.log('body', body.event);
}, headers);
// 第二个订阅,用户级别
vm.stompClient.subscribe(
"/user/queue/flow-view/" + vm.jobStreamInPlanId,
function (msg) {
console.log('/user/flow-view' + '发出通知:');
const body = JSON.parse(msg.body);
console.log('body', body.event);
}, headers);
// 发送send指令给服务端
vm.stompClient.send("/app/flow-view/init-data/" + vm.jobStreamInPlanId, {}, {});
}, err => {
console.log('stompClient 连接失败: ', err);
socket.close();
});
},
disconnectWebSocket() {
console.log('disconnect...');
this.stompClient.disconnect();
},
},
// 页面销毁时候,关闭连接,并取消重连
destroyed: function () {
clearInterval(this.webSocketTimer);
this.disconnectWebSocket();
},
上述代码中,两个Subscribe,一个Send。
第一个Subscribe:"/topic/flow-view/" + vm.jobStreamInPlanId 订阅一个topic,接受广播消息
第二个Subscribe:"/user/queue/flow-view/" + vm.jobStreamInPlanId 订阅一个用户频道,接收个人消息
Send:/app/flow-view/init-data/" + vm.jobStreamInPlanId 这个指令映射到后端的Controller里,后端处理请求,发送消息到某频道
注意:Send指令可以跟后端交互,但是destination必须附加前缀app,这个要在服务端代码配置。
WebSocket服务端实现(WebStomp)
首先需要配置WebStompConfig,实现WebSocketMessageBrokerConfigurer接口,重写方法如下:
/**
* WebStompConfig
*
* @Author fengjiale
* @Date 2020-01-10 13:21
*/
@Configuration
@EnableWebSocketMessageBroker
public class WebStompConfig implements WebSocketMessageBrokerConfigurer {
@Autowired
@Qualifier("userSecurityUtil")
UserSecurityUtil userSecurityUtil;
// 设置WebSocket请求的路径后缀,withSockJS(),代表支持sockjs
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
//客户端与服务端交互的前缀
registry.setApplicationDestinationPrefixes("/app");
//客户端订阅消息的前缀
registry.enableSimpleBroker("/topic", "/queue");
//用户级别的消息订阅的前缀
registry.setUserDestinationPrefix("/user");
}
// 定义拦截器校验请求Header,此处使用JWT的令牌校验
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
String token = accessor.getFirstNativeHeader("Authorization");
// 验证token是否有效
if (!userSecurityUtil.verifyWebToken(token)) {
throw new MessageDeliveryException("token验证失败");
}
DecodedJWT jwtToken = WebTokenUtil.decode(token);
Principal principal = new Principal() {
@Override
public String getName() {
return jwtToken.getSubject();
}
};
System.err.println("Stomp Client Inbound = " + message.getHeaders());
accessor.setUser(principal);
}
return message;
}
});
}
}
服务端发送广播消息给客户端:
/**
* WebsocketUtil
*
* @Author fengjiale
* @Date 2020-01-16 15:47
*/
public class WebsocketUtil {
private static SimpMessagingTemplate messagingTemplate;
public static SimpMessagingTemplate getMessagingTemplate() {
if (messagingTemplate == null) {
messagingTemplate = SpringContextUtil.getBean(SimpMessagingTemplate.class);
}
return messagingTemplate;
}
/**
* 广播发送消息
* @param jobstreamInPlanId 指定发给哪个作业流的订阅者们
* @param nodeList 要发送的对象
*/
public static void flowViewUpdateNotify(String jobstreamInPlanId, List<Graph.Node> nodeList){
getMessagingTemplate().convertAndSend("/topic/flow-view/"+jobstreamInPlanId, nodeList);
}
}
服务端接受Send指令并发送用户消息:
/**
* WebSocketController
* @Author fengjiale
* @Date 2019-01-18 15:47
*/
@Controller
public class WebSocketController {
private static SimpMessagingTemplate messagingTemplate;
@Autowired
JobInPlanRelationService jobInPlanRelationService;
private static SimpMessagingTemplate getMessagingTemplate() {
if (messagingTemplate == null) {
messagingTemplate = SpringContextUtil.getBean(SimpMessagingTemplate.class);
}
return messagingTemplate;
}
/**
* 用户级别的订阅,初始化全量数据
*
* @param principal
* @param jobStreamInPlanId
*/
@MessageMapping("/flow-view/init-data/{jobStreamInPlanId}")
public void flowViewInitData(Principal principal, @DestinationVariable("jobStreamInPlanId") String jobStreamInPlanId) {
Graph graph = jobInPlanRelationService.getGraphByJobstreamInPlanId(jobStreamInPlanId);
getMessagingTemplate().convertAndSendToUser(principal.getName(), "/queue/flow-view/" + jobStreamInPlanId, graph);
}
//用户级别的订阅的注解写法
// @MessageMapping("/flow-view/init-data/{jobStreamInPlanId}")
// @SendToUser("/queue/flow-view/20200118173610149")
// public Graph test6(Principal principal, @DestinationVariable("jobStreamInPlanId") String jobStreamInPlanId) throws Exception {
// Graph graph = jobInPlanRelationService.getGraphByJobstreamInPlanId(jobStreamInPlanId);
// return graph;
// }
//订阅flow-view这个频道时候,直接返回。(客户端订阅的路径带了/app所以弃用)
// @SubscribeMapping("/topic/flow-view/{jobStreamInPlanId}")
// public NotifyMessage test51(@DestinationVariable("jobStreamInPlanId") String jobStreamInPlanId) throws Exception {
// Graph graph = jobInPlanRelationService.getGraphByJobstreamInPlanId(jobStreamInPlanId);
// return WebsocketUtil.newNotifyMessage("INIT_DATA",graph);
// }
}
@MessageMapping:接收Send指令"/app/flow-view/sync-data/" + vm.jobStreamInPlanId 的消息,并做处理。
最后调用SimpMessagingTemplate的convertAndSendToUser发送给个人频道的订阅。
第一个参数principal.getName()就是拦截器里,下面那几行代码放进去的用户身份,Stomp将其与连接的session绑定到了一起。
DecodedJWT jwtToken = WebTokenUtil.decode(token);
Principal principal = new Principal() {
@Override
public String getName() {
return jwtToken.getSubject();
}
};
accessor.setUser(principal);
注意个人频道路由的关系:
前端订阅的个人频道为:"/user/queue/flow-view/" + vm.jobStreamInPlanId
对应后端发送个人频道:无需加/user, sendToUser会自动添加
注解@SendToUser(""/queue/flow-view/..."") 由于路径有变量,所以使用代码方式如下:
convertAndSendToUser(principal.getName(),"/queue/flow-view"+ jobStreamInPlanId, data);
以上为项目中使用学习的情况,做一个学习以及笔记。若有错误或疑惑,还请提问或指正。
|