spring的websocket使用
示例
添加依赖
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-websocket</artifactId>
<version>5.0.7.RELEASE</version>
</dependency>
编写一个WebSocket的配置类
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
编写服务端消息处理类
package com.xgss.adapter.service.websocket;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.IdUtil;
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.cloud.nacos.registry.NacosRegistration;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.spring.util.NacosBeanUtils;
import com.xgss.adapter.common.properties.MyServerProperties;
import com.xgss.adapter.common.util.MyAESUtil;
import com.xgss.adapter.model.bo.MessageBO;
import com.xgss.adapter.service.ResultMessageService;
import com.xgss.adapter.service.impl.SpringContextBeanService;
import com.xgss.platform.id.generate.IDUtils;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.commons.util.IdUtils;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
@ServerEndpoint(value = "/websocket/server/{clientCode}/{businessCode}")
public class WebSocketServer {
@Autowired
private MyServerProperties myServerProperties;
@Autowired
private ResultMessageService resultMessageService;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private RedissonClient redissonClient;
/**
* key:客户端名称
* value:客户端session
*/
private static ConcurrentHashMap<String, Set<Session>> serverSessionMap = new ConcurrentHashMap<>();
//key:serverSessionId value:clientSessionId
/**
* key:服务端sessionId
* value:客户端sessionId
*/
private static ConcurrentHashMap<String, String> sessionInfoMap = new ConcurrentHashMap<>();
public ConcurrentHashMap<String, Set<Session>> getServerSessionMap(){
return serverSessionMap;
}
public ConcurrentHashMap<String, String> getSessionInfoMap(){
return sessionInfoMap;
}
private static Long serverCode= IDUtils.getId();
public static Long getServerCode() {
return serverCode;
}
/**
* 连接建立成功调用
* 对于clientCode 和businessId做验证,不符合我们系统内部定义的,关闭连接
* 一个服务端可以接受多个同业务类型的客户端,放在一个Set里面;
* 如果同一业务类型的客户端已经跟服务端建立了链接,新过来的连接保存之前会检查之前连接的客户端是否都还在(看是否open,分别给它们发送消息),并把新连接保存到list
* 测试联通性消息: connectionTest:客户端在服务端的sessionId
*/
@OnOpen
public void onOpen(Session session,@PathParam(value = "clientCode") String clientCode,@PathParam(value = "businessCode") String businessCode) throws Exception{
init();
String key=getSessionKey(clientCode,businessCode);
String sessionId = session.getId();
if(!verifyParam(clientCode, businessCode)){//clientCode 和businessId验证
log.warn("接收到客户端非法连接请求,客户端key:{},sessionId:{},忽略",key, sessionId);
session.close();
return;
}
//这里同样的key会存在并发
RLock lock = redissonClient.getLock(key);
boolean locked = lock.tryLock(2, 2, TimeUnit.SECONDS);
if(!locked){
log.warn("接收到客户端连接获取锁失败,客户端key:{},sessionId:{}",key, sessionId);
}
try{
Set<Session> sessions = serverSessionMap.get(key);
if(sessions==null){
sessions= new CopyOnWriteArraySet<>();
serverSessionMap.put(key,sessions);
}
try{
if(session.isOpen()){
session.getAsyncRemote().sendText("connectionTest:"+ sessionId);
}else{
log.info("接收到客户端连接,session为关闭状态,关闭,客户端:{},sessionId:{}",key, sessionId);
session.close();
return;
}
}catch (Exception e){
log.warn("发送测试消息报错:客户端key:{},sessionId:{}",key, sessionId, e);
return;
}
//向所有该客户端连接发送测试消息避免有问题连接在里面
List<Session> removeSessions=new ArrayList<>();
for(Session son: sessions){
try{
if(son.isOpen()){
son.getAsyncRemote().sendText("connectionTest:"+son.getId());
}else{
son.close();
removeSessions.add(son);
}
}catch (Exception e){
removeSessions.add(son);
log.warn("发送测试消息报错:客户端key:{},sessionId:{}",key,son.getId(), e);
}
}
for(Session removeSession: removeSessions){
sessions.remove(removeSession);
sessionInfoMap.remove(removeSession.getId());
}
sessions.add(session);
sessionInfoMap.put(sessionId,"无");
//redis设置
stringRedisTemplate.opsForValue().set("network-adapter-server:"+serverCode+":"+ sessionId, sessionId,8,TimeUnit.SECONDS);
log.info("接收到客户端连接,连接成功,客户端:{},sessionId:{},客户端数量{}",key, sessionId,sessions.size());
}catch (Exception e){
log.error("客户端key:{}保存session出错",key, e);
}finally {
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
//
private String getSessionKey(String clientCode,String businessCode){
return clientCode+"_"+businessCode;
}
/**
* 验证clientCode和businessCode 是否合法
* true 合法;false 非法
*/
private boolean verifyParam(String clientCode,String businessCode){
return true;
}
/**
* 初始化加载
*/
private void init() {
myServerProperties = SpringContextBeanService.getBean(MyServerProperties.class);
resultMessageService =SpringContextBeanService.getBean(ResultMessageService.class);
stringRedisTemplate =SpringContextBeanService.getBean(StringRedisTemplate.class);
redissonClient =SpringContextBeanService.getBean(RedissonClient.class);
}
/**
* 关闭连接时调用
*/
@OnClose
public void onClose(Session session,@PathParam(value = "clientCode") String clientCode,@PathParam(value = "businessCode") String businessCode) {
//此时session.isOpen() 为false
String key=getSessionKey(clientCode,businessCode);
log.warn("onClose被触发,客户端key:{},sessionId:{}",key, session.getId());
Set<Session> sessions = serverSessionMap.get(key);
sessions.remove(session);
sessionInfoMap.remove(session.getId());
}
/**
* 异常
* 客户端服务关闭会调用到这里
*/
@OnError
public void onError(Session session, Throwable error,
@PathParam(value = "clientCode") String clientCode,@PathParam(value = "businessCode") String businessCode) throws IOException {
//此时session.isOpen() 为true,触发onError后onClose也被触发,在onClose中去掉session即可
String key = getSessionKey(clientCode, businessCode);
log.error("onError被触发:客户端key:{},错误信息:{}", key,error.getMessage());
//如果客户端端口也会调用到这里
// error.printStackTrace();
}
/**
* 接收到客户端字符串消息
* maxMessageSize 单位byte 10000000 约10M
*
*/
@OnMessage(maxMessageSize = 5000000)
public void onMessage(String message, Session session,@PathParam(value = "clientCode") String clientCode,@PathParam(value = "businessCode") String businessCode) throws Exception {
if( message.startsWith("connectionTest:")){
log.info("接收到测试消息,{}:{}:{}",getSessionKey(clientCode,businessCode),session.getId(),message);
String[] backs = message.split("back:");
String clientSessionId = backs[backs.length - 1];
sessionInfoMap.put(session.getId(), clientSessionId);
//redis设置
stringRedisTemplate.opsForValue().set("network-adapter-server:"+serverCode+":"+ session.getId(), clientSessionId,8,TimeUnit.SECONDS);
return;
}else{
log.info("接收到消息,客户端key:{},sessionId:{}",getSessionKey(clientCode,businessCode),session.getId());
}
resultMessageService.messageSave(message,session,clientCode,businessCode);
}
// 定时任务去定时发送ping,收到pong后更新失效时间,如果服务端的sessionId已经失效就移除该session
@OnMessage
public void onPong(PongMessage message, Session session,@PathParam(value = "clientCode") String clientCode,@PathParam(value = "businessCode") String businessCode) throws IOException {
log.debug("{}:{}:接收到pong消息",getSessionKey(clientCode,businessCode),session.getId());
stringRedisTemplate.expire("network-adapter-server:"+serverCode+":"+session.getId(),8, TimeUnit.SECONDS);
}
/**
* 接收到客户端字节数组消息
* maxMessageSize 单位byte 10000000 约10M
*
*/
// @OnMessage(maxMessageSize = 5000000)
// public void onMessage( byte[] message, Session session,@PathParam(value = "clientCode") String clientCode,@PathParam(value = "businessCode") String businessCode) throws Exception {
// log.info("消息内容:{}",message);
// }
/**
* 遍历群发消息
*
* @param text
*/
// public void send(String text) {
// for (ConcurrentHashMap.Entry<String, Session> entry : serverSessionMap.entrySet()) {
// send(entry.getKey(), text);
// }
// }
/**
* 根据clientId发送消息
*
* @param key
* @param text
*/
public void send(String key,String text) {
try {
log.info("发送消息,客户端key:{}",key);
Set<Session> sessions = serverSessionMap.get(key);
if(CollectionUtil.isNotEmpty(sessions)){
log.info("{}连接数量:{}",key,sessions.size());
for(Session session: sessions){
if(session!=null && session.isOpen()){
//对text进行加密
String encryptMsg = MyAESUtil.encrypt(text, myServerProperties.getAesKey());
session.getAsyncRemote().sendText(encryptMsg);
log.info("发送消息成功key:{},sessionId:{}",key,session.getId());
return;
}
}
}
//没有发送出去放到redis做广播
Map<String,String> map = new HashMap<String, String>();
map.put("msgKey", key);
map.put("msgValue", text);
log.info("发布广播消息key:{}",key);
stringRedisTemplate.convertAndSend("networkAdapter", JSON.toJSONString(map));
} catch (Exception e) {
log.error("系统异常",e);
}
}
public void sendMessageFromRedis(String key,String text){
try {
Set<Session> sessions = serverSessionMap.get(key);
if(CollectionUtil.isEmpty(sessions)){
log.warn("接收到广播消息,session不存在:客户端key:{},忽略消息",key);
return;
}
log.info("接收到广播消息,准备发送消息到客户端,客户端key:{}连接数量:{}",key,sessions.size());
for(Session session: sessions){
if(session!=null && session.isOpen()){
MessageBO messageBO = JSONObject.parseObject(text, MessageBO.class);
String messageNo = messageBO.getMessageNo();
Long increment = stringRedisTemplate.opsForValue().increment(messageNo);
stringRedisTemplate.expire(messageNo,1, TimeUnit.MINUTES);
if(increment!=null && increment>1L){//已经发送过了
log.info("消息已发送过,这里不再重复发送,客户端key:{},sessionId:{}",key,session.getId());
return;
}
//对text进行加密
String encryptMsg = MyAESUtil.encrypt(text, myServerProperties.getAesKey());
session.getAsyncRemote().sendText(encryptMsg);
log.info("发送消息成功,客户端key:{},sessionId:{}",key,session.getId());
return;
}
log.error("没有可用的连接,客户端key:{}",key);
}
} catch (Exception e) {
log.error("系统异常",e);
}
}
}
服务端心跳
避免客户端断网,服务端没有被触发事件,可以用心跳来保障服务端维护的session都是可用的,服务端定时去给客户端发送ping消息,客户端收到后会发送pong消息,服务端如果在指定的时间内没有收到pong消息就认为与客户端的这个session不可用。
package com.ctfo.adapter.service.task;
import cn.hutool.core.collection.CollectionUtil;
import com.ctfo.adapter.service.websocket.WebSocketServer;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import lombok.extern.log4j.Log4j2;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.websocket.Session;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* 定时任务
*/
@Log4j2
@Component
public class ServerTask {
@Resource
private WebSocketServer webSocketServer;
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
*
*
*/
@XxlJob(value = "pingTask")
public ReturnT<String> pingTask(String param) {
XxlJobLogger.log("pingTask begin,param={}", param == null ? "null" : param);
ConcurrentHashMap<String, Set<Session>> serverSessionMap = webSocketServer.getServerSessionMap();
ConcurrentHashMap<String, String> sessionInfoMap = webSocketServer.getSessionInfoMap();
//遍历执行ping命令
Collection<Set<Session>> values = serverSessionMap.values();
ByteBuffer payload = ByteBuffer.wrap("Ping".getBytes());
//key:客户端名称,value:会话session列表
for(Map.Entry<String, Set<Session>> entry: serverSessionMap.entrySet()){
List<Session> removeSessions=new ArrayList<>();
Set<Session> sessions = entry.getValue();
String clientName = entry.getKey();
for(Session session:sessions){
String sessionId = session.getId();
try{
//redis获取看是否还在,不在的话从里面剔除掉
if(stringRedisTemplate.opsForValue().get("network-adapter-server:"+WebSocketServer.getServerCode()+":"+sessionId)==null){
removeSessions.add(session);
continue;
}
session.getAsyncRemote().sendPing(payload);
}catch (Exception e){
log.error("执行ping命令报错,{}:{}",clientName,sessionId);
removeSessions.add(session);
}
}
if(CollectionUtil.isNotEmpty(removeSessions)){
removeSessions.forEach(session -> {
String sessionId = session.getId();
try{
session.close();
}catch (Exception ex){
log.error("执行ping命令后close失败,{}:{}:{}",clientName,sessionId,ex.getMessage());
}
sessions.remove(session);
sessionInfoMap.remove(sessionId);
log.info("session长时间未ping通,移除客户端:{}:{}",clientName, sessionId);
});
}
}
return ReturnT.SUCCESS;
}
}
服务端测试类
对应的controller和service接口这里忽略展示,作用主要是:可以调用接口了解服务端有多少个客户端进行连接,客户端session的对应关系
public List<SessionInfoVO> getSessionInfo() {
ConcurrentHashMap<String, Set<Session>> serverSessionMap = webSocketServer.getServerSessionMap();
ConcurrentHashMap<String, String> sessionInfoMap = webSocketServer.getSessionInfoMap();
List<SessionInfoVO> resultList=new ArrayList<>();
for(Map.Entry<String, Set<Session>> entry: serverSessionMap.entrySet()){
SessionInfoVO sessionInfoVO=new SessionInfoVO();
sessionInfoVO.setClientName(entry.getKey());
List<SessionStatusVO> statusVOS=new ArrayList<>();
for(Session session: entry.getValue()){
SessionStatusVO statusVO=new SessionStatusVO();
statusVO.setClientSessionId(session.getId());
statusVO.setOpenStatus(session.isOpen());
statusVO.setServerSessionId(sessionInfoMap.get(session.getId()));
statusVOS.add(statusVO);
}
sessionInfoVO.setSessionStatusVOList(statusVOS);
resultList.add(sessionInfoVO);
}
return resultList;
}
@Data
public class SessionInfoVO {
/**
* 客户端名称
*/
private String clientName;
private List<SessionStatusVO> sessionStatusVOList;
}
@Data
public class SessionStatusVO {
/**
* 客户端sessionId
*/
private String clientSessionId;
/**
* 服务端sessionId
*/
private String serverSessionId;
/**
* 会话是否打开
*/
private Boolean openStatus;
}
客户端编写
package com.xgss.adapter.service.websocket;
import com.xgss.adapter.common.constants.ClientConstants;
import com.xgss.adapter.common.properties.MyServerProperties;
import com.xgss.adapter.service.MessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.websocket.*;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
@Component
@ClientEndpoint
public class PayOpenidClient {
private static PayOpenidClient client;
@Autowired
private MyServerProperties myServerProperties;
@Autowired
private MessageService messageService;
private Session session;
/** 尝试重连标识 */
private AtomicBoolean tryReconnect;//是否正在重连
private final String code= ClientConstants.PAY_CODE;
private final String business= ClientConstants.PAY_BUSINESS_OPENID;
private final String clientName=code+":"+business;
private String serverSessionId;
public Session getSession(){
return this.session;
}
public String getClientName(){
return this.clientName;
}
public String getServerSessionId(){
return this.serverSessionId;
}
@PostConstruct
void init() {
// log.info("{},{}初始化:{}",code,business,this.toString());
client = this;
this.tryReconnect = new AtomicBoolean(false);
hearBeat();
}
@PreDestroy
public void destroy(){
try{
if(client.session !=null){
log.info("销毁session:{},sessionId:{}",clientName,client.session.getId());
client.session.close();
}
}catch (Exception e){
log.error("session关闭失败");
}
}
private void hearBeat(){
Timer timer=new Timer(true);
log.info(clientName+":心跳定时器启动");
timer.scheduleAtFixedRate(new TimerTask(){
public void run() {
log.debug(clientName+":心跳触发");
ping();
}
},5000,2000);//5秒后执行,每2秒钟执行一次
}
public void ping() {
try {
if(session == null){
log.warn(clientName+":session不存在,触发重新连接");
if(!tryReconnect.get()){
toConnect();
}
}else if(!session.isOpen()){
log.warn(clientName+":{}"+ ":session关闭了,触发重新连接",session.getId());
session = null;
if(!tryReconnect.get()){
toConnect();
}
}
if(session != null && session.isOpen()){
String data = "Ping";
ByteBuffer payload = ByteBuffer.wrap(data.getBytes());
log.debug(clientName+":{}:发送Ping",client.session.getId());
session.getAsyncRemote().sendPing(payload);
}
// Exception in thread "Timer-0" java.lang.IllegalStateException: The WebSocket session [0] has been closed and no method (apart from close()) may be called on a closed session
} catch (Exception ex) {
log.error(clientName+":Ping报错了",ex);
ex.printStackTrace();
}
}
private String getServerUrl(){
return client.myServerProperties.getWebSocketServerUrl()+"/"+ code+"/"+business;
}
private void getSession(URI uri) throws IOException, DeploymentException {
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
session = container.connectToServer(client, uri);
}
@OnOpen
public void onOpen(Session session) {
this.session = session;
log.info(clientName+":"+session.getId()+":已连接");
}
/**
* 异常处理
*/
@OnError
public void onError(Throwable throwable) {
log.error(clientName+":发生异常了");
throwable.printStackTrace();
}
@OnClose
public void onClose(CloseReason reason) throws IOException{
if(session !=null){
client.session.close();
log.info(clientName+":"+client.session.getId()+":关闭,原因:"+reason.toString());
}else{
log.info(clientName+":关闭,原因:"+reason.toString());
}
session = null;
if(!tryReconnect.get()){
toConnect();
}
}
/**
* 连接服务端
*/
private void toConnect(){
tryReconnect.set(true);
//这里进行循环连接服务端
String webSocketServerUrl = getServerUrl();
synchronized (PayOpenidClient.class) {
if (session ==null ){
log.debug(clientName+":尝试连接webSocket服务端");
try {
// 本机地址
// String hostAddress = InetAddress.getLocalHost().getHostAddress();
URI uri = URI.create(webSocketServerUrl);
getSession(uri);
} catch (Exception e) {
//如果服务端没有启动,这里会报错
log.warn(clientName+":连接webSocket服务端失败");
}
// if(session==null){
// log.warn(clientName+":连接webSocket服务端失败");
// }else{
// log.info(clientName+session.getId()+"已连接");
// }
}}
tryReconnect.set(false);
}
@OnMessage
public void onMessage(String msg, Session session) throws Exception {
if( msg.startsWith("connectionTest:")){
log.info( clientName+":"+session.getId() + ":接收到测试消息:" + msg);
String[] split = msg.split("connectionTest:");
this.serverSessionId=split[1];
session.getAsyncRemote().sendText(msg+":back:"+session.getId());
return;
}else{
log.info( clientName+":"+session.getId() + ":接收到的消息");
}
client.messageService.pay_openid(msg,session,clientName);
}
@OnMessage
public void onMessage(PongMessage message, Session session) throws IOException {
log.debug(clientName+":{}:接收到Pong消息",session.getId());
}
}
@OnMessage
属性maxMessageSize 单位byte
@OnMessage
public void handleTextMessage(String textMessage) {
...
}
//接收二进制消息
@OnMessage
public void handleBinaryMessage(byte[] messageData) {
...
}
//接收Pong消息,需要声明一个类型为 javax.websocket.PongMessage 的方法参数
@OnMessage
public String handlePongMessage(PongMessage pongMessage) {
return "I got " + pongMessage.getApplicationData().length + " bytes of data.";
}
发送ping消息
ByteBuffer payload = ByteBuffer.wrap("Ping".getBytes());
session.getAsyncRemote().sendPing(payload);
发送ping命令后对方不需要做处理,如果对方可以会自动收到pong消息,如下事件会被触发
@OnMessage
public String handlePongMessage(PongMessage pongMessage) {
注入实例为null
参考链接:https://www.csdn.net/tags/OtTaQg3sODMwNzQtYmxvZwO0O0OO0O0O.html
问题原因:
Spring默认对bean的管理都是单例(singleton),和 websocket (多对象)相冲突。
项目启动时初始化,会初始化 websocket (非用户连接的),spring 同时会为其注入 service,该对象的 service 不是 null,被成功注入。但是,由于 spring 默认管理的是单例,所以只会注入一次 service。当用户建立新的连接时,系统又会创建一个新的 websocket 对象,这时不会再次注入了,所以导致只要是用户连接创建的 websocket 对象,都不能再注入了,后面的注入均为null。
方法1:使用静态变量 加 @PostConstruct 解决
@ServerEndpoint("/websocket/{userId}")
@Component //关键点1
public class WebSocket {
@Autowired
private UserService userService;
private static WebSocket webSocket; //关键点2
@PostConstruct //关键点3
public void init(){
webSocket = this;
}
//略去其他代码...
@OnMessage
public void onMessage(String message, Session session) {
if(!StringUtils.isEmpty(message)){
try {
log.info("收到用户【{}】的消息: {}",this.userId,message);
webSocket.userService.helloService(); //关键点4
session.getBasicRemote().sendText("收到 "+this.userId+" 的消息 ");
}catch (Exception e){
e.printStackTrace();
}
}
}
}
方法2:使用静态变量,加set注入
@ServerEndpoint("/websocket/{userId}")
@Component //关键点1
public class WebSocket {
private static UserService userService; //关键点2
@Autowired //关键点3
public void setUserService (UserService userService){
WebSocket.userService = userService;
}
//略去其他代码...
@OnMessage
public void onMessage(String message, Session session) {
if(!StringUtils.isEmpty(message)){
try {
log.info("收到用户【{}】的消息: {}",this.userId,message);
WebSocket.userService.helloService(); //关键点4
session.getBasicRemote().sendText("收到 "+this.userId+" 的消息 ");
}catch (Exception e){
e.printStackTrace();
}
}
}
}
方法3:使用工具类用于从Spring的上下文中去获取到类的实例
@ServerEndpoint("/websocket/{userId}")
@Component //关键点1
public class WebSocket {
private static UserService userService = SpringContextHolder.getBean(UserService.class); //关键点2
//略去其他代码...
@OnMessage
public void onMessage(String message, Session session) {
if(!StringUtils.isEmpty(message)){
try {
log.info("收到用户【{}】的消息: {}",this.userId,message);
userService.helloService(); //关键点3
session.getBasicRemote().sendText("收到 "+this.userId+" 的消息 ");
}catch (Exception e){
e.printStackTrace();
}
}
}
}
工具类SpringContextHolder
此工具类用于从Spring的上下文中去获取到类的实例
@Component
public class SpringContextHolder implements ApplicationContextAware, DisposableBean {
private static ApplicationContext applicationContext = null;
/**
* 取得存储在静态变量中的ApplicationContext.
*/
public static ApplicationContext getApplicationContext() {
assertContextInjected();
return applicationContext;
}
/**
* 从静态变量applicationContext中取得Bean, 自动转型为所赋值对象的类型.
*/
@SuppressWarnings("unchecked")
public static <T> T getBean(String name) {
assertContextInjected();
return (T) applicationContext.getBean(name);
}
/**
* 从静态变量applicationContext中取得Bean, 自动转型为所赋值对象的类型.
*/
public static <T> T getBean(Class<T> requiredType) {
assertContextInjected();
return applicationContext.getBean(requiredType);
}
/**
* 清除SpringContextHolder中的ApplicationContext为Null.
*/
public static void clearHolder() {
applicationContext = null;
}
/**
* 实现ApplicationContextAware接口, 注入Context到静态变量中.
*/
@Override
public void setApplicationContext(ApplicationContext appContext) {
applicationContext = appContext;
}
/**
* 实现DisposableBean接口, 在Context关闭时清理静态变量.
*/
@Override
public void destroy() throws Exception {
SpringContextHolder.clearHolder();
}
/**
* 检查ApplicationContext不为空.
*/
private static void assertContextInjected() {
Validate.validState(applicationContext != null, "applicaitonContext属性未注入, 请在applicationContext.xml中定义SpringContextHolder.");
}
}
一个连接多条消息
上一条消息在onMessage中还没有处理完,是接收不到后面这个客户端发的消息的
同一个连接的消息是串行处理的
多个连接的消息
多个连接可以同时收到消息,并行处理
Session的方法
javax.websocket.Session的方法
getBasicRemote和getAsyncRemote
getAsyncRemote是异步发送消息,getBasicRemote是同步发送消息。也就是说getBasicRemote()要等上一条消息发送完才能发送下一条消息
session.setMaxTextMessageBufferSize(10*1024*1024);//设置最大字符串消息大小,超过将断开连接,单位为byte,默认是8192约8kb
session.setMaxBinaryMessageBufferSize(10*1024*1024);//设置最大二进制消息大小,超过将断开连接,8192约8kb
session.setMaxIdleTimeout(-1);最大空闲时间,超过服务端会断开连接,设置0或负数代表无限,默认是0
消息大小限制设置:
对于websoket服务:
@OnMessage的属性maxMessageSize(单位byte) 可以设置接收的消息的最大大小
或者在@OnOpen标注的方法内对Session进行设置,(单位为byte)
session.setMaxTextMessageBufferSize(10*1024*1024);
session.setMaxBinaryMessageBufferSize(10*1024*1024);
一个是设置字符串消息的最大byte,等同于session.setMaxTextMessageBufferSize
一个是设置二进制消息最大byte,等同于session.setMaxBinaryMessageBufferSize
SpringCloudGateway中默认定义的消息大小为65536 : 单位是Byte
NettyWebSocketSessionSupport.DEFAULT_FRAME_MAX_SIZE = 64 * 1024;
所以如果发送的消息超过了64KB ,就会导致消息过大,然后WebSocket就会断开连接。
解决方法:
在 gateway 项目的配置文件中加入下面这句话并设置大小。
官网地址
https://docs.spring.io/spring-cloud-gateway/docs/2.2.7.RELEASE/reference/html/appendix.html
连接的数量
各个浏览器支持的最大websocket连接数不同
不同的服务器支持的websocket连接数也不同
springboot项目默认的启动容器是tomcat,而tomcat默认支持1W的连接数量。
linux也会有限制,Linux操作系统中,一切都是文件。所以每个TCP连接,都会打开一个文件。为此Linux操作系统限制了每个用户能打开的文件数量,通过ulimit -n 查看。如:
TCP连接数量还受到端口数量限制,由于端口号只有1-65535,所以最大TCP连接数也只有65535个(包括系统端口1-1024)
Linux操作系统对所有用户最大能打开文件的限制:cat /proc/sys/fs/file-max。如:
网络核心模块对tcp连接的限制(最大不能超过65535):cat /etc/sysctl.conf
防火墙对tcp连接的限制
综上,在Linux操作系统中,首先对TCP连接数量的限制依次有:端口数量限制,网络核心限制,最大文件数量限制(因为每建立一个连接就要打开一个文件),防火墙限制,用户打开文件限制。
长连接最大空闲时间设置
如果长连接超过超时时间没有进行交互,服务端就会关闭session;
session.setMaxIdleTimeout 可以设置最大空闲时间,单位为毫秒,0或负数代表无限,关闭session后服务端和客户端都能触发@OnClose
websocket的长连接会因为各种中间环节断开,而且很多时候Onclose没有被触发的(比如断网),所以要想一直保持连接,可以让客户端定期发送一条心跳包,用于维持Session
心跳机制
websocket规范定义了心跳机制,一方可以通过发送ping(协议头opcode 0x9)消息给另一方,另一方收到ping后应该按照协议要求,尽可能快的返回pong(协议头opcode 0xA);
浏览器:
各个浏览器并没有为websocket的发送/接收ping/pong消息提供JavaScript API,但各个浏览器都有各自的缺省检测方案和处理方法,如chrome一旦收到ping包,会马上自动答复一个相同的包作为pong,再如某些浏览器在一段时间没有发送数据传输时,会自动发送一个ping包到服务器检测是否还在线/保持连接。
如在客户端添加心跳:
package com.ctfo.adapter.service.websocket;
import com.ctfo.adapter.common.properties.MyServerProperties;
import com.ctfo.adapter.service.MessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.websocket.*;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Timer;
import java.util.TimerTask;
@Slf4j
@Component
@ClientEndpoint
public class InvoiceRedClient {
private static InvoiceRedClient invoiceRedClient;
@Autowired
private MyServerProperties myServerProperties;
@Autowired
private MessageService messageService;
private Session session;
private final String clientName="客户端";
@PostConstruct
void init() {
invoiceRedClient = this;
hearBeat();
}
private void hearBeat(){
Timer timer=new Timer(true);
log.info(clientName+"心跳定时器启动");
timer.schedule(new TimerTask(){
public void run() {
log.debug(clientName+"心跳触发");
ping();
}
},5000,5000);//5秒后执行,每5秒钟执行一次
}
public void ping() {
try {
if(session == null){
log.warn(clientName+"session不存在,触发重新连接");
toConnect();
}else if(!session.isOpen()){
log.warn(clientName+"session关闭了,触发重新连接");
session = null;
toConnect();
}
if(session != null && session.isOpen()){
String data = "Ping";
ByteBuffer payload = ByteBuffer.wrap(data.getBytes());
log.debug(clientName+"发送Ping");
session.getBasicRemote().sendPing(payload);
}
// Exception in thread "Timer-0" java.lang.IllegalStateException: The WebSocket session [0] has been closed and no method (apart from close()) may be called on a closed session
} catch (Exception ex) {
log.error(clientName+"Ping报错了",ex);
ex.printStackTrace();
}
}
private String getServerUrl(){
String clientCode = invoiceRedClient.myServerProperties.getInvoiceClient().getClientCode();
String businessCode = invoiceRedClient.myServerProperties.getInvoiceClient().getRedInvoice();
return invoiceRedClient.myServerProperties.getWebSocketServerUrl()+"/"+clientCode+"/"+businessCode;
}
private void getSession(URI uri) throws IOException, DeploymentException {
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
session = container.connectToServer(InvoiceRedClient.class, uri);
}
@OnOpen
public void onOpen(Session session) {
this.session = session;
log.info(clientName+session.getId()+"连接");
}
/**
* 异常处理
* @param throwable
*/
@OnError
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@OnClose
public void onClose() throws IOException{
//当服务端关闭了连接,或者服务停掉,这里会被调用
session.close();
log.info(clientName+session.getId()+"关闭");
session = null;
toConnect();
}
/**
* 连接服务端
*/
private void toConnect(){
//这里进行循环连接服务端
String webSocketServerUrl = getServerUrl();
if (session ==null ){
log.info(clientName+"尝试连接webSocket服务端:{}",webSocketServerUrl);
try {
URI uri = URI.create(webSocketServerUrl);
getSession(uri);
} catch (Exception e) {
log.warn(clientName+"连接webSocket服务端{}失败",webSocketServerUrl);
}
if(session ==null){
log.warn(clientName+"连接webSocket服务端{}失败",webSocketServerUrl);
}else{
log.info(clientName+session.getId()+"已连接");
}
}
}
@OnMessage
public void onMessage(String msg, Session session) throws Exception {
log.info( clientName+session.getId() + "接收到的消息:" + msg);
invoiceRedClient.messageService.invoice_red(msg,session,clientName);
}
@OnMessage
public void onMessage(PongMessage message, Session session) throws IOException {
log.debug(clientName+"接收到Pong消息");
}
}
服务端触发OnClose,客户端没有触发
服务端已经触发OnClose了, 客户端的session发送心跳还能够收到Pong消息
IOException:Broken pipe
客户端读取超时关闭了连接,这时服务器往客户端再写数据就发生了broken pipe异常
引起这个的两方面
1对客户端来说,检查客户端连接超时时间;
2对服务端 nginx配置的几个超时时间
Connect reset by peer
如果一端的Socket被关闭(或主动关闭,或因为异常退出而引起的关闭),另一端仍发送数据,发送的第一个数据包引发该异常
一端退出,但退出时并未关闭该连接,另一端如果在从连接中读数据则抛出该异常(Connection reset)
错误码总结
1000 正常关闭 当你的会话成功完成时发送这个代码
1001 离开 因应用程序离开且不期望后续的连接尝试而关闭连接时,发送这一代码。服务器可能关闭,或者客户端应用程序可能关闭
1002 协议错误 当因协议错误而关闭连接时发送这一代码
1003 不可接受的数据类型 当应用程序接收到一条无法处理的意外类型消息时发送这一代码
1004 保留 不要发送这一代码。根据 RFC 6455,这个状态码保留,可能在未来定义
1005 保留 不要发送这一代码。WebSocket API 用这个代码表示没有接收到任何代码
1006 保留 不要发送这一代码。WebSocket API 用这个代码表示连接异常关闭
1007 无效数据 在接收一个格式与消息类型不匹配的消息之后发送这一代码。如果文本消息包含错误格式的 UTF-8 数据,连接应该用这个代码关闭
1008 消息违反政策 当应用程序由于其他代码所不包含的原因终止连接,或者不希望泄露消息无法处理的原因时,发送这一代码
1009 消息过大 当接收的消息太大,应用程序无法处理时发送这一代码(记住,帧的载荷长度最多为64 字节。即使你有一个大服务器,有些消息也仍然太大。)
1010 需要扩展 当应用程序需要一个或者多个服务器无法协商的特殊扩展时,从客户端(浏览器)发送这一代码
1011 意外情况 当应用程序由于不可预见的原因,无法继续处理连接时,发送这一代码
1015 TLS失败(保留) 不要发送这个代码。WebSocket API 用这个代码表示 TLS 在 WebSocket 握手之前失败。
0 ~ 999 禁止 1000 以下的代码是无效的,不能用于任何目的
1000 ~ 2999 保留 这些代码保留以用于扩展和 WebSocket 协议的修订版本。按照标准规定使用这些代码,参见表 3-4
3000 ~ 3999 需要注册 这些代码用于“程序库、框架和应用程序”。这些代码应该在 IANA(互联网编号分配机构)公开注册
4000 ~ 4999 私有 在应用程序中将这些代码用于自定义用途。因为它们没有注册,所以不要期望它们能被其他 WebSocket广泛理解
session的共享
比如两个服务端做负载均衡,一个客户端建立websocket连接后,两个服务端都可以与它进行通信
websocket的Session没有实现序列化接口,不能进行序列化存储,无法通过redis来进行实现session共享
可以使用redis的发布订阅(也可以使用其他mq的方式来实现)来实现负载均衡,当请求达到服务器1上是,如果它没有与客户端建立连接,就进行发布消息,其他服务端介绍到消息看是否与客户端建立了连接,如果有建立连接就发送给客户端
@Autowired
private StringRedisTemplate stringRedisTemplate;
stringRedisTemplate.convertAndSend("myChannel", "myMsg");
public void onMessage(Message message, byte[] pattern){
String channel = new String(message.getChannel());// 订阅的频道名称
String msg = "";
String key="myClient";
try {
msg = new String(message.getBody(), Charsets.UTF_8);//注意与发布消息编码一致,否则会乱码
if (!StringUtils.isEmpty(msg)) {
if ("myChannel".equals(channel))// 最新消息
{
Set<Session> sessions = serverSessionMap.get(key);
if (CollectionUtil.isEmpty(sessions)) {
log.warn("接收到广播消息,session不存在:客户端key:{},忽略消息", key);
return;
}
log.info("接收到广播消息,准备发送消息到客户端,客户端key:{}连接数量:{}", key, sessions.size());
for (Session session : sessions) {
if (session != null && session.isOpen()) {
Long increment = stringRedisTemplate.opsForValue().increment("messageNo");
stringRedisTemplate.expire("messageNo", 1, TimeUnit.MINUTES);
if (increment != null && increment > 1L) {//已经发送过了
log.info("消息已发送过,这里不再重复发送,客户端key:{},sessionId:{}", key, session.getId());
return;
}
session.getAsyncRemote().sendText(msg, new RetrySendHandler(msg, session));
log.info("发送消息成功,客户端key:{},sessionId:{}", key, session.getId());
return;
}
}
} else {
log.info("消息内容为空,不处理。");
}
}
}catch (Exception e)
{
log.error("处理消息异常",e);
}
}