spring boot下WebSocket消息推送

WebSocket 协议

WebSocket是一种在单个 TCP 连接上进行全双工通讯的协议。WebSocket 通信协议于 2011 年被 IETF 定为标准 RFC 6455,并由 RFC7936 补充规范。WebSocket API 也被 W3C 定为标准。

WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输

STOMP 协议

STOMP 是面向文本的消息传送协议。STOMP 客户端与支持 STOMP 协议的消息代理进行通信。STOMP 使用不同的命令,如连接,发送,订阅,断开等进行通信。

具体参考:官方介绍

SockJS

SockJS 是一个 JavaScript 库,提供跨浏览器 JavaScript 的 API,创建了一个低延迟、全双工的浏览器和 web 服务器之间通信通道


以上内容出自维基百科和百度百科

 

使用 websocket 有两种方式:1 是使用 sockjs,2 是使用 h5 的标准。使用 Html5 标准自然更方便简单,所以记录的是配合 h5 的使用方法。

1、pom 引入

 核心是 @ServerEndpoint 这个注解。这个注解是 Javaee 标准里的注解,tomcat7 以上已经对其进行了实现,如果是用传统方法使用 tomcat 发布项目,只要在 pom 文件中引入 javaee 标准即可使用。

 

1     <dependency>
2       <groupId>javax</groupId>
3       <artifactId>javaee-api</artifactId>
4       <version>7.0</version>
5       <scope>provided</scope>
6     </dependency>

但使用 springboot 的内置 tomcat 时,就不需要引入 javaee-api 了,spring-boot 已经包含了。使用 springboot 的 websocket 功能首先引入 springboot 组件。

1         <dependency>
2             <groupId>org.springframework.boot</groupId>
3             <artifactId>spring-boot-starter-websocket</artifactId>
4         </dependency>

2、使用 @ServerEndpoint 创立 websocket endpoint

  首先要注入 ServerEndpointExporter,这个 bean 会自动注册使用了 @ServerEndpoint 注解声明的 Websocket endpoint。要注意,如果使用独立的 servlet 容器,而不是直接使用 springboot 的内置容器,就不要注入 ServerEndpointExporter,因为它将由容器自己提供和管理。

1     @Configuration  
2     public class WebSocketConfig {  
3         @Bean  
4         public ServerEndpointExporter serverEndpointExporter() {  
5             return new ServerEndpointExporter();  
6         }  
7       
8     }  

下面事 websocket 的具体实现方法,代码如下:

  1 import org.springframework.stereotype.Component;
  2 
  3 import javax.websocket.*;
  4 import javax.websocket.server.ServerEndpoint;
  5 import java.io.IOException;
  6 import java.util.concurrent.CopyOnWriteArraySet;
  7 
  8 @ServerEndpoint(value = "/websocket")
  9 @Component    //此注解千万千万不要忘记,它的主要作用就是将这个监听器纳入到 Spring 容器中进行管理
 10 public class WebSocket {
 11   //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
 12   private static int onlineCount = 0;
 13 
 14   //concurrent 包的线程安全 Set,用来存放每个客户端对应的 MyWebSocket 对象。
 15   private static CopyOnWriteArraySet<WebSocket> webSocketSet = new CopyOnWriteArraySet<WebSocket>();
 16 
 17   //与某个客户端的连接会话,需要通过它来给客户端发送数据
 18   private Session session;
 19 
 20   /**
 21    * 连接建立成功调用的方法
 22    */
 23   @OnOpen
 24   public void onOpen(Session session) {
 25     this.session = session;
 26     webSocketSet.add(this);     //加入 set 中
 27     addOnlineCount();           //在线数加 1
 28     System.out.println("有新连接加入!当前在线人数为" + getOnlineCount());
 29     try {
 30       sendMessage("Hello world");
 31     } catch (IOException e) {
 32       System.out.println("IO 异常");
 33     }
 34   }
 35 
 36   /**
 37    * 连接关闭调用的方法
 38    */
 39   @OnClose
 40   public void onClose() {
 41     webSocketSet.remove(this);  //从 set 中删除
 42     subOnlineCount();           //在线数减 1
 43     System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount());
 44   }
 45 
 46   /**
 47    * 收到客户端消息后调用的方法
 48    *
 49    * @param message 客户端发送过来的消息
 50    */
 51   @OnMessage
 52   public void onMessage(String message, Session session) {
 53     System.out.println("来自客户端的消息:" + message);
 54 
 55     //群发消息
 56     for (WebSocket item : webSocketSet) {
 57       try {
 58         item.sendMessage(message);
 59       } catch (IOException e) {
 60         e.printStackTrace();
 61       }
 62     }
 63   }
 64 
 65   /**
 66    * 发生错误时调用
 67    */
 68   @OnError
 69   public void onError(Session session, Throwable error) {
 70     System.out.println("发生错误");
 71     error.printStackTrace();
 72   }
 73 
 74 
 75   public void sendMessage(String message) throws IOException {
 76     this.session.getBasicRemote().sendText(message);
 77     //this.session.getAsyncRemote().sendText(message);
 78   }
 79 
 80 
 81   /**
 82    * 群发自定义消息
 83    */
 84   public static void sendInfo(String message) throws IOException {
 85     for (WebSocket item : webSocketSet) {
 86       try {
 87         item.sendMessage(message);
 88       } catch (IOException e) {
 89         continue;
 90       }
 91     }
 92   }
 93 
 94   public static synchronized int getOnlineCount() {
 95     return onlineCount;
 96   }
 97 
 98   public static synchronized void addOnlineCount() {
 99     WebSocket.onlineCount++;
100   }
101 
102   public static synchronized void subOnlineCount() {
103     WebSocket.onlineCount--;
104   }
105 }

 

使用 springboot 的唯一区别是要 @Component 声明下,而使用独立容器是由容器自己管理 websocket 的,但在 springboot 中连容器都是 spring 管理的。

虽然 @Component 默认是单例模式的,但 springboot 还是会为每个 websocket 连接初始化一个 bean,所以可以用一个静态 set 保存起来。

3、前端代码

 

 1 <!DOCTYPE HTML>
 2 <html>
 3 <head>
 4     <title>My WebSocket</title>
 5 </head>
 6 
 7 <body>
 8 Welcome<br/>
 9 <input id="text" type="text" /><button onclick="send()">Send</button>    <button onclick="closeWebSocket()">Close</button>
10 <div id="message">
11 </div>
12 </body>
13 
14 <script type="text/javascript">
15     var websocket = null;
16 
17     //判断当前浏览器是否支持 WebSocket
18     if('WebSocket' in window){
19         websocket = new WebSocket("ws://localhost:8084/websocket");
20     }
21     else{
22         alert('Not support websocket')
23     }
24 
25     //连接发生错误的回调方法
26     websocket.onerror = function(){
27         setMessageInnerHTML("error");
28     };
29 
30     //连接成功建立的回调方法
31     websocket.onopen = function(event){
32         setMessageInnerHTML("open");
33     }
34 
35     //接收到消息的回调方法
36     websocket.onmessage = function(event){
37         setMessageInnerHTML(event.data);
38     }
39 
40     //连接关闭的回调方法
41     websocket.onclose = function(){
42         setMessageInnerHTML("close");
43     }
44 
45     //监听窗口关闭事件,当窗口关闭时,主动去关闭 websocket 连接,防止连接还没断开就关闭窗口,server 端会抛异常。
46     window.onbeforeunload = function(){
47         websocket.close();
48     }
49 
50     //将消息显示在网页上
51     function setMessageInnerHTML(innerHTML){
52         document.getElementById('message').innerHTML += innerHTML + '<br/>';
53     }
54 
55     //关闭连接
56     function closeWebSocket(){
57         websocket.close();
58     }
59 
60     //发送消息
61     function send(){
62         var message = document.getElementById('text').value;
63         websocket.send(message);
64     }
65 </script>
66 </html>

以上代码,实现了 websocket 简单消息推送,可以实现两个页面间的消息显示,但是 Java 后台主动推送消息时,无法获取消息推送的 websocket 下的 session,即无法实现 websocket 下 session 的共享。

为解决主动推送的难题,需要在建立连接时,将 websocket 下的 session 与 servlet 下的 HttpSession(或者其他 session,我们这用到了 shiro 下的 session)建立关联关系。

 webSocket 配置 Java 类:

 1 import com.bootdo.common.utils.ShiroUtils;
 2 import org.apache.catalina.session.StandardSessionFacade;
 3 import org.apache.shiro.session.Session;
 4 import org.springframework.context.annotation.Bean;
 5 import org.springframework.context.annotation.Configuration;
 6 import org.springframework.web.socket.server.standard.ServerEndpointExporter;
 7 
 8 import javax.servlet.http.HttpSession;
 9 import javax.websocket.HandshakeResponse;
10 import javax.websocket.server.HandshakeRequest;
11 import javax.websocket.server.ServerEndpointConfig;
12 import javax.websocket.server.ServerEndpointConfig.Configurator;
13 
14 @Configuration
15 public class WebSocketConfig extends Configurator {
16 
17   @Override
18   public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
19     /*如果没有监听器, 那么这里获取到的 HttpSession 是 null*/
20     StandardSessionFacade ssf = (StandardSessionFacade) request.getHttpSession();
21     if (ssf != null) {
22       HttpSession httpSession = (HttpSession) request.getHttpSession();
23       //关键操作
24       sec.getUserProperties().put("sessionId", httpSession.getId());
25       System.out.println("获取到的 SessionID:" + httpSession.getId());
26     }
27   }
28 
29   /**
30    * 引入 shiro 框架下的 session,获取 session 信息
31    */
32   /*
33   @Override
34   public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
35     Session shiroSession = ShiroUtils.getSubjct().getSession();
36     sec.getUserProperties().put("sessionId", shiroSession.getId());
37   }
38   */
39 
40   @Bean
41   public ServerEndpointExporter serverEndpointExporter() {
42     //这个对象说一下,貌似只有服务器是 tomcat 的时候才需要配置, 具体我没有研究
43     return new ServerEndpointExporter();
44   }
45 }

webSocket 消息实现类方法:

  1 import org.springframework.stereotype.Component;
  2 
  3 import javax.websocket.*;
  4 import javax.websocket.server.ServerEndpoint;
  5 import java.io.IOException;
  6 import java.util.concurrent.CopyOnWriteArraySet;
  7 
  8 //configurator = WebsocketConfig.class 该属性就是我上面配置的信息
  9 @ServerEndpoint(value = "/websocket", configurator = WebSocketConfig.class)
 10 @Component    //此注解千万千万不要忘记,它的主要作用就是将这个监听器纳入到 Spring 容器中进行管理
 11 public class WebSocket {
 12   //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
 13   private static int onlineCount = 0;
 14 
 15   //concurrent 包的线程安全 Set,用来存放每个客户端对应的 MyWebSocket 对象。
 16   private static CopyOnWriteArraySet<WebSocket> webSocketSet = new CopyOnWriteArraySet<WebSocket>();
 17 
 18   //与某个客户端的连接会话,需要通过它来给客户端发送数据
 19   private Session session;
 20 
 21   /**
 22    * 连接建立成功调用的方法
 23    * <p>
 24    * config 用来获取 WebsocketConfig 中的配置信息
 25    */
 26   @OnOpen
 27   public void onOpen(Session session, EndpointConfig config) {
 28 
 29     //获取 WebsocketConfig.java 中配置的“sessionId”信息值
 30     String httpSessionId = (String) config.getUserProperties().get("sessionId");
 31 
 32     this.session = session;
 33     webSocketSet.add(this);     //加入 set 中
 34     addOnlineCount();           //在线数加 1
 35     System.out.println("有新连接加入!当前在线人数为" + getOnlineCount());
 36     try {
 37       sendMessage("Hello world");
 38     } catch (IOException e) {
 39       System.out.println("IO 异常");
 40     }
 41   }
 42 
 43   /**
 44    * 连接关闭调用的方法
 45    */
 46   @OnClose
 47   public void onClose() {
 48     webSocketSet.remove(this);  //从 set 中删除
 49     subOnlineCount();           //在线数减 1
 50     System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount());
 51   }
 52 
 53   /**
 54    * 收到客户端消息后调用的方法
 55    *
 56    * @param message 客户端发送过来的消息
 57    */
 58   @OnMessage
 59   public void onMessage(String message, Session session) {
 60     System.out.println("来自客户端的消息:" + message);
 61 
 62     //群发消息
 63     for (WebSocket item : webSocketSet) {
 64       try {
 65         item.sendMessage(message);
 66       } catch (IOException e) {
 67         e.printStackTrace();
 68       }
 69     }
 70   }
 71 
 72   /**
 73    * 发生错误时调用
 74    */
 75   @OnError
 76   public void onError(Session session, Throwable error) {
 77     System.out.println("发生错误");
 78     error.printStackTrace();
 79   }
 80 
 81 
 82   public void sendMessage(String message) throws IOException {
 83     this.session.getBasicRemote().sendText(message);
 84     //this.session.getAsyncRemote().sendText(message);
 85   }
 86 
 87 
 88   /**
 89    * 群发自定义消息
 90    */
 91   public static void sendInfo(String message) throws IOException {
 92     for (WebSocket item : webSocketSet) {
 93       try {
 94         item.sendMessage(message);
 95       } catch (IOException e) {
 96         continue;
 97       }
 98     }
 99   }
100 
101   public static synchronized int getOnlineCount() {
102     return onlineCount;
103   }
104 
105   public static synchronized void addOnlineCount() {
106     WebSocket.onlineCount++;
107   }
108 
109   public static synchronized void subOnlineCount() {
110     WebSocket.onlineCount--;
111   }
112 }

注意,有上面配置后,如果配置获取的信息为 null,需加入监听实现类:

 1 import org.springframework.stereotype.Component;
 2 
 3 import javax.servlet.ServletRequestEvent;
 4 import javax.servlet.ServletRequestListener;
 5 import javax.servlet.http.HttpServletRequest;
 6 import javax.servlet.http.HttpSession;
 7 
 8 /**
 9  * 监听器类: 主要任务是用 ServletRequest 将我们的 HttpSession 携带过去
10  */
11 @Component //此注解千万千万不要忘记,它的主要作用就是将这个监听器纳入到 Spring 容器中进行管理, 相当于注册监听吧
12 public class RequestListener implements ServletRequestListener {
13   @Override
14   public void requestInitialized(ServletRequestEvent sre) {
15     //将所有 request 请求都携带上 httpSession
16     HttpSession httpSession= ((HttpServletRequest) sre.getServletRequest()).getSession();
17     System.out.println("将所有 request 请求都携带上 httpSession" + httpSession.getId());
18   }
19 
20   public RequestListener() {
21   }
22 
23   @Override
24   public void requestDestroyed(ServletRequestEvent arg0) {
25   }
26 }

对应的前端页面无需改变。

 

以上信息类之于网络上多篇文章整理的信息,参考文章:spring boot Websocket使用 spring boot +WebSocket 实现(后台主动)消息推送  ; Springboot-WebSocket 初探 - 获取 HttpSession 问题