spring boot下WebSocket消息推送
WebSocket 协议
WebSocket是一种在单个 TCP 连接上进行全双工通讯的协议。WebSocket 通信协议于 2011 年被 IETF 定为标准 RFC 6455,并由 RFC7936 补充规范。WebSocket API 也被 W3C 定为标准。
WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输
STOMP 协议
STOMP 是面向文本的消息传送协议。STOMP 客户端与支持 STOMP 协议的消息代理进行通信。STOMP 使用不同的命令,如连接,发送,订阅,断开等进行通信。
具体参考:官方介绍
SockJS
SockJS 是一个 JavaScript 库,提供跨浏览器 JavaScript 的 API,创建了一个低延迟、全双工的浏览器和 web 服务器之间通信通道
以上内容出自维基百科和百度百科
使用 websocket 有两种方式:1 是使用 sockjs,2 是使用 h5 的标准。使用 Html5 标准自然更方便简单,所以记录的是配合 h5 的使用方法。
1、pom 引入
核心是 @ServerEndpoint 这个注解。这个注解是 Javaee 标准里的注解,tomcat7 以上已经对其进行了实现,如果是用传统方法使用 tomcat 发布项目,只要在 pom 文件中引入 javaee 标准即可使用。
1 <dependency> 2 <groupId>javax</groupId> 3 <artifactId>javaee-api</artifactId> 4 <version>7.0</version> 5 <scope>provided</scope> 6 </dependency>
但使用 springboot 的内置 tomcat 时,就不需要引入 javaee-api 了,spring-boot 已经包含了。使用 springboot 的 websocket 功能首先引入 springboot 组件。
1 <dependency> 2 <groupId>org.springframework.boot</groupId> 3 <artifactId>spring-boot-starter-websocket</artifactId> 4 </dependency>
2、使用 @ServerEndpoint 创立 websocket endpoint
首先要注入 ServerEndpointExporter,这个 bean 会自动注册使用了 @ServerEndpoint 注解声明的 Websocket endpoint。要注意,如果使用独立的 servlet 容器,而不是直接使用 springboot 的内置容器,就不要注入 ServerEndpointExporter,因为它将由容器自己提供和管理。
1 @Configuration 2 public class WebSocketConfig { 3 @Bean 4 public ServerEndpointExporter serverEndpointExporter() { 5 return new ServerEndpointExporter(); 6 } 7 8 }
下面事 websocket 的具体实现方法,代码如下:
1 import org.springframework.stereotype.Component; 2 3 import javax.websocket.*; 4 import javax.websocket.server.ServerEndpoint; 5 import java.io.IOException; 6 import java.util.concurrent.CopyOnWriteArraySet; 7 8 @ServerEndpoint(value = "/websocket") 9 @Component //此注解千万千万不要忘记,它的主要作用就是将这个监听器纳入到 Spring 容器中进行管理 10 public class WebSocket { 11 //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。 12 private static int onlineCount = 0; 13 14 //concurrent 包的线程安全 Set,用来存放每个客户端对应的 MyWebSocket 对象。 15 private static CopyOnWriteArraySet<WebSocket> webSocketSet = new CopyOnWriteArraySet<WebSocket>(); 16 17 //与某个客户端的连接会话,需要通过它来给客户端发送数据 18 private Session session; 19 20 /** 21 * 连接建立成功调用的方法 22 */ 23 @OnOpen 24 public void onOpen(Session session) { 25 this.session = session; 26 webSocketSet.add(this); //加入 set 中 27 addOnlineCount(); //在线数加 1 28 System.out.println("有新连接加入!当前在线人数为" + getOnlineCount()); 29 try { 30 sendMessage("Hello world"); 31 } catch (IOException e) { 32 System.out.println("IO 异常"); 33 } 34 } 35 36 /** 37 * 连接关闭调用的方法 38 */ 39 @OnClose 40 public void onClose() { 41 webSocketSet.remove(this); //从 set 中删除 42 subOnlineCount(); //在线数减 1 43 System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount()); 44 } 45 46 /** 47 * 收到客户端消息后调用的方法 48 * 49 * @param message 客户端发送过来的消息 50 */ 51 @OnMessage 52 public void onMessage(String message, Session session) { 53 System.out.println("来自客户端的消息:" + message); 54 55 //群发消息 56 for (WebSocket item : webSocketSet) { 57 try { 58 item.sendMessage(message); 59 } catch (IOException e) { 60 e.printStackTrace(); 61 } 62 } 63 } 64 65 /** 66 * 发生错误时调用 67 */ 68 @OnError 69 public void onError(Session session, Throwable error) { 70 System.out.println("发生错误"); 71 error.printStackTrace(); 72 } 73 74 75 public void sendMessage(String message) throws IOException { 76 this.session.getBasicRemote().sendText(message); 77 //this.session.getAsyncRemote().sendText(message); 78 } 79 80 81 /** 82 * 群发自定义消息 83 */ 84 public static void sendInfo(String message) throws IOException { 85 for (WebSocket item : webSocketSet) { 86 try { 87 item.sendMessage(message); 88 } catch (IOException e) { 89 continue; 90 } 91 } 92 } 93 94 public static synchronized int getOnlineCount() { 95 return onlineCount; 96 } 97 98 public static synchronized void addOnlineCount() { 99 WebSocket.onlineCount++; 100 } 101 102 public static synchronized void subOnlineCount() { 103 WebSocket.onlineCount--; 104 } 105 }
使用 springboot 的唯一区别是要 @Component 声明下,而使用独立容器是由容器自己管理 websocket 的,但在 springboot 中连容器都是 spring 管理的。
虽然 @Component 默认是单例模式的,但 springboot 还是会为每个 websocket 连接初始化一个 bean,所以可以用一个静态 set 保存起来。
3、前端代码
1 <!DOCTYPE HTML> 2 <html> 3 <head> 4 <title>My WebSocket</title> 5 </head> 6 7 <body> 8 Welcome<br/> 9 <input id="text" type="text" /><button onclick="send()">Send</button> <button onclick="closeWebSocket()">Close</button> 10 <div id="message"> 11 </div> 12 </body> 13 14 <script type="text/javascript"> 15 var websocket = null; 16 17 //判断当前浏览器是否支持 WebSocket 18 if('WebSocket' in window){ 19 websocket = new WebSocket("ws://localhost:8084/websocket"); 20 } 21 else{ 22 alert('Not support websocket') 23 } 24 25 //连接发生错误的回调方法 26 websocket.onerror = function(){ 27 setMessageInnerHTML("error"); 28 }; 29 30 //连接成功建立的回调方法 31 websocket.onopen = function(event){ 32 setMessageInnerHTML("open"); 33 } 34 35 //接收到消息的回调方法 36 websocket.onmessage = function(event){ 37 setMessageInnerHTML(event.data); 38 } 39 40 //连接关闭的回调方法 41 websocket.onclose = function(){ 42 setMessageInnerHTML("close"); 43 } 44 45 //监听窗口关闭事件,当窗口关闭时,主动去关闭 websocket 连接,防止连接还没断开就关闭窗口,server 端会抛异常。 46 window.onbeforeunload = function(){ 47 websocket.close(); 48 } 49 50 //将消息显示在网页上 51 function setMessageInnerHTML(innerHTML){ 52 document.getElementById('message').innerHTML += innerHTML + '<br/>'; 53 } 54 55 //关闭连接 56 function closeWebSocket(){ 57 websocket.close(); 58 } 59 60 //发送消息 61 function send(){ 62 var message = document.getElementById('text').value; 63 websocket.send(message); 64 } 65 </script> 66 </html>
以上代码,实现了 websocket 简单消息推送,可以实现两个页面间的消息显示,但是 Java 后台主动推送消息时,无法获取消息推送的 websocket 下的 session,即无法实现 websocket 下 session 的共享。
为解决主动推送的难题,需要在建立连接时,将 websocket 下的 session 与 servlet 下的 HttpSession(或者其他 session,我们这用到了 shiro 下的 session)建立关联关系。
webSocket 配置 Java 类:
1 import com.bootdo.common.utils.ShiroUtils; 2 import org.apache.catalina.session.StandardSessionFacade; 3 import org.apache.shiro.session.Session; 4 import org.springframework.context.annotation.Bean; 5 import org.springframework.context.annotation.Configuration; 6 import org.springframework.web.socket.server.standard.ServerEndpointExporter; 7 8 import javax.servlet.http.HttpSession; 9 import javax.websocket.HandshakeResponse; 10 import javax.websocket.server.HandshakeRequest; 11 import javax.websocket.server.ServerEndpointConfig; 12 import javax.websocket.server.ServerEndpointConfig.Configurator; 13 14 @Configuration 15 public class WebSocketConfig extends Configurator { 16 17 @Override 18 public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) { 19 /*如果没有监听器, 那么这里获取到的 HttpSession 是 null*/ 20 StandardSessionFacade ssf = (StandardSessionFacade) request.getHttpSession(); 21 if (ssf != null) { 22 HttpSession httpSession = (HttpSession) request.getHttpSession(); 23 //关键操作 24 sec.getUserProperties().put("sessionId", httpSession.getId()); 25 System.out.println("获取到的 SessionID:" + httpSession.getId()); 26 } 27 } 28 29 /** 30 * 引入 shiro 框架下的 session,获取 session 信息 31 */ 32 /* 33 @Override 34 public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) { 35 Session shiroSession = ShiroUtils.getSubjct().getSession(); 36 sec.getUserProperties().put("sessionId", shiroSession.getId()); 37 } 38 */ 39 40 @Bean 41 public ServerEndpointExporter serverEndpointExporter() { 42 //这个对象说一下,貌似只有服务器是 tomcat 的时候才需要配置, 具体我没有研究 43 return new ServerEndpointExporter(); 44 } 45 }
webSocket 消息实现类方法:
1 import org.springframework.stereotype.Component; 2 3 import javax.websocket.*; 4 import javax.websocket.server.ServerEndpoint; 5 import java.io.IOException; 6 import java.util.concurrent.CopyOnWriteArraySet; 7 8 //configurator = WebsocketConfig.class 该属性就是我上面配置的信息 9 @ServerEndpoint(value = "/websocket", configurator = WebSocketConfig.class) 10 @Component //此注解千万千万不要忘记,它的主要作用就是将这个监听器纳入到 Spring 容器中进行管理 11 public class WebSocket { 12 //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。 13 private static int onlineCount = 0; 14 15 //concurrent 包的线程安全 Set,用来存放每个客户端对应的 MyWebSocket 对象。 16 private static CopyOnWriteArraySet<WebSocket> webSocketSet = new CopyOnWriteArraySet<WebSocket>(); 17 18 //与某个客户端的连接会话,需要通过它来给客户端发送数据 19 private Session session; 20 21 /** 22 * 连接建立成功调用的方法 23 * <p> 24 * config 用来获取 WebsocketConfig 中的配置信息 25 */ 26 @OnOpen 27 public void onOpen(Session session, EndpointConfig config) { 28 29 //获取 WebsocketConfig.java 中配置的“sessionId”信息值 30 String httpSessionId = (String) config.getUserProperties().get("sessionId"); 31 32 this.session = session; 33 webSocketSet.add(this); //加入 set 中 34 addOnlineCount(); //在线数加 1 35 System.out.println("有新连接加入!当前在线人数为" + getOnlineCount()); 36 try { 37 sendMessage("Hello world"); 38 } catch (IOException e) { 39 System.out.println("IO 异常"); 40 } 41 } 42 43 /** 44 * 连接关闭调用的方法 45 */ 46 @OnClose 47 public void onClose() { 48 webSocketSet.remove(this); //从 set 中删除 49 subOnlineCount(); //在线数减 1 50 System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount()); 51 } 52 53 /** 54 * 收到客户端消息后调用的方法 55 * 56 * @param message 客户端发送过来的消息 57 */ 58 @OnMessage 59 public void onMessage(String message, Session session) { 60 System.out.println("来自客户端的消息:" + message); 61 62 //群发消息 63 for (WebSocket item : webSocketSet) { 64 try { 65 item.sendMessage(message); 66 } catch (IOException e) { 67 e.printStackTrace(); 68 } 69 } 70 } 71 72 /** 73 * 发生错误时调用 74 */ 75 @OnError 76 public void onError(Session session, Throwable error) { 77 System.out.println("发生错误"); 78 error.printStackTrace(); 79 } 80 81 82 public void sendMessage(String message) throws IOException { 83 this.session.getBasicRemote().sendText(message); 84 //this.session.getAsyncRemote().sendText(message); 85 } 86 87 88 /** 89 * 群发自定义消息 90 */ 91 public static void sendInfo(String message) throws IOException { 92 for (WebSocket item : webSocketSet) { 93 try { 94 item.sendMessage(message); 95 } catch (IOException e) { 96 continue; 97 } 98 } 99 } 100 101 public static synchronized int getOnlineCount() { 102 return onlineCount; 103 } 104 105 public static synchronized void addOnlineCount() { 106 WebSocket.onlineCount++; 107 } 108 109 public static synchronized void subOnlineCount() { 110 WebSocket.onlineCount--; 111 } 112 }
注意,有上面配置后,如果配置获取的信息为 null,需加入监听实现类:
1 import org.springframework.stereotype.Component; 2 3 import javax.servlet.ServletRequestEvent; 4 import javax.servlet.ServletRequestListener; 5 import javax.servlet.http.HttpServletRequest; 6 import javax.servlet.http.HttpSession; 7 8 /** 9 * 监听器类: 主要任务是用 ServletRequest 将我们的 HttpSession 携带过去 10 */ 11 @Component //此注解千万千万不要忘记,它的主要作用就是将这个监听器纳入到 Spring 容器中进行管理, 相当于注册监听吧 12 public class RequestListener implements ServletRequestListener { 13 @Override 14 public void requestInitialized(ServletRequestEvent sre) { 15 //将所有 request 请求都携带上 httpSession 16 HttpSession httpSession= ((HttpServletRequest) sre.getServletRequest()).getSession(); 17 System.out.println("将所有 request 请求都携带上 httpSession" + httpSession.getId()); 18 } 19 20 public RequestListener() { 21 } 22 23 @Override 24 public void requestDestroyed(ServletRequestEvent arg0) { 25 } 26 }
对应的前端页面无需改变。
以上信息类之于网络上多篇文章整理的信息,参考文章:spring boot Websocket;使用 spring boot +WebSocket 实现(后台主动)消息推送 ; Springboot-WebSocket 初探 - 获取 HttpSession 问题