SpringBoot+Vue+WebSocket实现消息推送

论坛 期权论坛 脚本     
匿名技术用户   2021-1-7 21:07   967   0

在给导师做项目的时候,APP端需要通过后端服务推送消息,该消息与userId绑定。

(1)服务端实现

实现的话首先说下服务端,基于微服务,将该模块设计成单独模块,部署端口8100。

首先引入maven依赖(根据自己的需求,只要包含websocket即可)

<project xmlns="http://maven.apache.org/POM/4.0.0"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <parent>
  <groupId>cn.edu.bjfu</groupId>
  <artifactId>fdcp</artifactId>
  <version>0.0.1-SNAPSHOT</version>
 </parent>
 <artifactId>fdcp-provider-websocket</artifactId>
 <dependencies>
  <dependency><!-- 引入自己定义的api通用包 -->
   <groupId>cn.edu.bjfu</groupId>
   <artifactId>fdcp-api</artifactId>
   <version>${project.version}</version>
  </dependency>
  <dependency>
   <groupId>junit</groupId>
   <artifactId>junit</artifactId>
  </dependency>
  <dependency>
   <groupId>ch.qos.logback</groupId>
   <artifactId>logback-core</artifactId>
  </dependency>
  <dependency>
   <groupId>org.mybatis.spring.boot</groupId>
   <artifactId>mybatis-spring-boot-starter</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-jetty</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-test</artifactId>
  </dependency>
  <dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-connector-java</artifactId>
  </dependency>
  <dependency>
   <groupId>com.alibaba</groupId>
   <artifactId>druid</artifactId>
  </dependency>
  <!--整合redis -->
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-redis</artifactId>
  </dependency>
  <!-- 修改后立即生效,热部署 -->
  <dependency>
   <groupId>org.springframework</groupId>
   <artifactId>springloaded</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-devtools</artifactId>
  </dependency>
  <!-- 将微服务provider侧注册进eureka -->
  <dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-eureka</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-config</artifactId>
  </dependency>
  <dependency>
   <!-- SpringBoot健康监控 -->
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-actuator</artifactId>
  </dependency>

  <!-- websocket -->
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-websocket</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-configuration-processor</artifactId>
   <optional>true</optional>
  </dependency>
 </dependencies>
 <build>
  <plugins>
   <plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    <executions>
     <execution>
      <goals>
       <goal>repackage</goal>
      </goals>
     </execution>
    </executions>
   </plugin>
  </plugins>
 </build>
</project>

实现原理如下:

(1)先配置一个ws的配置类WebSocketConfig

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

 @Override
 public void configureMessageBroker(MessageBrokerRegistry config) {
  config.enableSimpleBroker("/topic");
  config.setApplicationDestinationPrefixes("/app");
 }

 @Override
 public void registerStompEndpoints(StompEndpointRegistry registry) {
  registry.addEndpoint("/fdcp").setAllowedOrigins("*").withSockJS();;
 }

 @Override
 public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
  // TODO Auto-generated method stub
  
 }

 @Override
 public void configureClientInboundChannel(ChannelRegistration registration) {
  // TODO Auto-generated method stub
  
 }

 @Override
 public void configureClientOutboundChannel(ChannelRegistration registration) {
  // TODO Auto-generated method stub
  
 }

 @Override
 public void addArgumentResolvers(List<HandlerMethodArgumentResolver> argumentResolvers) {
  // TODO Auto-generated method stub
  
 }

 @Override
 public void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> returnValueHandlers) {
  // TODO Auto-generated method stub
  
 }

 @Override
 public boolean configureMessageConverters(List<MessageConverter> messageConverters) {
  // TODO Auto-generated method stub
  return false;
 }
}

(2)连接ws监听类STOMPConnectEventListener,需实现监听接口:ApplicationListener,开启ws连接。客户端将userId传入进来,服务端将该userId保存到map中,key为userId,value为sessionId。该map是一个全局变量,保存在springboot启动类WebSocketApp中。

【注】此时可确定客户端连接的地址为:http://localhost:8100/fdcp

@Component
public class STOMPConnectEventListener implements ApplicationListener<SessionConnectEvent> {

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Override
    public void onApplicationEvent(SessionConnectEvent event) {
        StompHeaderAccessor sha = StompHeaderAccessor.wrap(event.getMessage());
        String userId = sha.getNativeHeader("userId").get(0);
        String sessionId = sha.getSessionId();
        //System.out.println("STOMPConnectEventListener........"+userId+"-"+sessionId);    
        WebSocketApp.userMap.put(userId, sessionId);
    }
}

启动类:

@EnableScheduling//开启定时任务
@EnableAsync//开启异步
@SpringBootApplication
@EnableEurekaClient
public class WebSocketApp {
 public static Map<String, String> userMap = new ConcurrentHashMap<>();//userId:sessionId
 public static void main(String[] args) {
  SpringApplication.run(WebSocketApp.class, args);
 }
}

(3)开启定时任务:每15min监听一次消息,该消息的是通过客户端传入的userId获取redis中的数据,当获取完之后清空该redis的key。(2)中提到,userId保存在一个全局map中,可以获取到。获取的value是一个存储的json字符串,格式大致是这样:

{

"userId": "a56sd1a6s51dxzcs5",

"islook": 0,

"message": "推送消息内容",

"createtime": "2020-05-15 19:57:29",

"messageId": "de91d3f80d0841219caeb9298f3fc09a",

"title": "消息标题"

}

定时任务类AppPushTask:

@Component
public class AppPushTask {
 @Autowired
 private SimpMessagingTemplate messagingTemplate;
 @Autowired
 private StringRedisTemplate stringRedisTemplate;
 private static final String appPushPrefix = "APP_PUSH_MESSAGE";//消息推送app中redis的前缀

 @Scheduled(fixedRate = 1000*60*15)//15分钟执行一次
 public void callback() throws Exception {
  
  Map<String, String> userMap = WebSocketApp.userMap;
  
  for(Entry<String, String> entry : userMap.entrySet()) {
   String userId = entry.getKey();
   String key = appPushPrefix+":"+userId;
   String message = stringRedisTemplate.opsForValue().get(key);
   if(message != null) {
    messagingTemplate.convertAndSend("/topic/callback", message);
    stringRedisTemplate.delete(key);
   }else {
    //不执行任何操作
   }
  }
  
  
 }

}

服务端的大致代码结构如下:

(2)客户端

首先需要安装sockJS和StompJS

cnpm install sockjs-client --save
cnpm install stompjs --save
cnpm install net --save

直接贴代码:

<template>
  <div>
    <button @click="connect">连接ws</button><br/>
    消息标题:<span v-text="title"></span><br/>
    消息内容:<span v-html="message"></span>
  </div>
</template>
<script>
import SockJS from "sockjs-client";
import Stomp from "stompjs";
export default {
  data() {
    return {
      messageId: "", //推送消息的id
      userId: "1", //当前该设备用户id(消息推送接收者)
      title: "", //消息标题
      message: "", //消息
      stompClient: null//stomp
    };
  },
   methods:{
      connect(){         
         let socket=new SockJS('http://localhost:8100/fdcp')
         this.stompClient = Stomp.over(socket)
         this.stompClient.connect({"userId": this.userId }, this.onConnected)   
      },
      onConnected(frame) {
           this.stompClient.subscribe('/topic/callback', this.callback)
      },
      callback(msg){
            let body= JSON.parse(msg.body) 
            //console.log(body)
            this.message = body.message;
            this.title = body.title;   
      },
  }
};
</script>

最终实现的效果如下,点击“连接ws”之后会与服务端建立连接,之后服务端向客户端推送数据。

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:7942463
帖子:1588486
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP