java websocket总结--spring websocket

时间:2025-08-27 15:00:02来源:互联网

下面小编就为大家分享一篇java websocket总结--spring websocket,具有很好的参考价值,希望对大家有所帮助。

spring的websocket使用

示例

添加依赖

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-websocket</artifactId>
    <version>5.0.7.RELEASE</version>
</dependency>

编写一个WebSocket的配置类

@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

编写服务端消息处理类

package com.xgss.adapter.service.websocket;

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.IdUtil;
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.cloud.nacos.registry.NacosRegistration;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.spring.util.NacosBeanUtils;
import com.xgss.adapter.common.properties.MyServerProperties;
import com.xgss.adapter.common.util.MyAESUtil;
import com.xgss.adapter.model.bo.MessageBO;
import com.xgss.adapter.service.ResultMessageService;
import com.xgss.adapter.service.impl.SpringContextBeanService;
import com.xgss.platform.id.generate.IDUtils;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.commons.util.IdUtils;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
@ServerEndpoint(value = "/websocket/server/{clientCode}/{businessCode}")
public class WebSocketServer {
    @Autowired
    private MyServerProperties myServerProperties;
    @Autowired
    private ResultMessageService resultMessageService;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Autowired
    private RedissonClient redissonClient;

    /**
     * key:客户端名称
     * value:客户端session
     */
    private static ConcurrentHashMap<String, Set<Session>> serverSessionMap = new ConcurrentHashMap<>();
//key:serverSessionId value:clientSessionId
    /**
     * key:服务端sessionId
     * value:客户端sessionId
     */
    private static ConcurrentHashMap<String, String> sessionInfoMap = new ConcurrentHashMap<>();

    public ConcurrentHashMap<String, Set<Session>> getServerSessionMap(){
        return serverSessionMap;
    }

    public ConcurrentHashMap<String, String> getSessionInfoMap(){
        return sessionInfoMap;
    }

    private static Long serverCode= IDUtils.getId();

    public static Long getServerCode() {
        return serverCode;
    }

    /**
     * 连接建立成功调用
     * 对于clientCode 和businessId做验证,不符合我们系统内部定义的,关闭连接
     * 一个服务端可以接受多个同业务类型的客户端,放在一个Set里面;
     * 如果同一业务类型的客户端已经跟服务端建立了链接,新过来的连接保存之前会检查之前连接的客户端是否都还在(看是否open,分别给它们发送消息),并把新连接保存到list
     * 测试联通性消息:  connectionTest:客户端在服务端的sessionId
     */

    @OnOpen
    public void onOpen(Session session,@PathParam(value = "clientCode") String clientCode,@PathParam(value = "businessCode") String businessCode) throws Exception{
        init();
        String key=getSessionKey(clientCode,businessCode);
        String sessionId = session.getId();
        if(!verifyParam(clientCode, businessCode)){//clientCode 和businessId验证
            log.warn("接收到客户端非法连接请求,客户端key:{},sessionId:{},忽略",key, sessionId);
            session.close();
            return;
        }

        //这里同样的key会存在并发
        RLock lock = redissonClient.getLock(key);
        boolean locked = lock.tryLock(2, 2, TimeUnit.SECONDS);
        if(!locked){
            log.warn("接收到客户端连接获取锁失败,客户端key:{},sessionId:{}",key, sessionId);
        }
        try{
            Set<Session> sessions = serverSessionMap.get(key);
            if(sessions==null){
                sessions= new CopyOnWriteArraySet<>();
                serverSessionMap.put(key,sessions);
            }
            try{
                if(session.isOpen()){
                    session.getAsyncRemote().sendText("connectionTest:"+ sessionId);
                }else{
                    log.info("接收到客户端连接,session为关闭状态,关闭,客户端:{},sessionId:{}",key, sessionId);
                    session.close();
                    return;
                }
            }catch (Exception e){
                log.warn("发送测试消息报错:客户端key:{},sessionId:{}",key, sessionId, e);
                return;
            }


            //向所有该客户端连接发送测试消息避免有问题连接在里面
            List<Session> removeSessions=new ArrayList<>();
            for(Session son: sessions){
                try{
                    if(son.isOpen()){
                        son.getAsyncRemote().sendText("connectionTest:"+son.getId());
                    }else{
                        son.close();
                        removeSessions.add(son);
                    }
                }catch (Exception e){
                    removeSessions.add(son);
                    log.warn("发送测试消息报错:客户端key:{},sessionId:{}",key,son.getId(), e);
                }
            }
            for(Session removeSession: removeSessions){
                sessions.remove(removeSession);
                sessionInfoMap.remove(removeSession.getId());
            }
            sessions.add(session);
            sessionInfoMap.put(sessionId,"无");
            //redis设置
            stringRedisTemplate.opsForValue().set("network-adapter-server:"+serverCode+":"+ sessionId, sessionId,8,TimeUnit.SECONDS);
            log.info("接收到客户端连接,连接成功,客户端:{},sessionId:{},客户端数量{}",key, sessionId,sessions.size());

        }catch (Exception e){
            log.error("客户端key:{}保存session出错",key, e);
        }finally {
            if (lock.isLocked() && lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }

    }
    //
    private String getSessionKey(String clientCode,String businessCode){
        return clientCode+"_"+businessCode;
    }

    /**
     * 验证clientCode和businessCode 是否合法
     * true 合法;false 非法
     */
    private boolean verifyParam(String clientCode,String businessCode){
	return  true;
    }

    /**
     * 初始化加载
     */
    private void init() {
        myServerProperties = SpringContextBeanService.getBean(MyServerProperties.class);
        resultMessageService =SpringContextBeanService.getBean(ResultMessageService.class);
        stringRedisTemplate =SpringContextBeanService.getBean(StringRedisTemplate.class);
        redissonClient =SpringContextBeanService.getBean(RedissonClient.class);
    }

    /**
     * 关闭连接时调用
     */
    @OnClose
    public void onClose(Session session,@PathParam(value = "clientCode") String clientCode,@PathParam(value = "businessCode") String businessCode) {
        //此时session.isOpen() 为false
        String key=getSessionKey(clientCode,businessCode);
        log.warn("onClose被触发,客户端key:{},sessionId:{}",key, session.getId());
        Set<Session> sessions = serverSessionMap.get(key);
        sessions.remove(session);
        sessionInfoMap.remove(session.getId());
    }

    /**
     * 异常
     * 客户端服务关闭会调用到这里
     */
    @OnError
    public void onError(Session session, Throwable error,
                        @PathParam(value = "clientCode") String clientCode,@PathParam(value = "businessCode") String businessCode) throws IOException {
        //此时session.isOpen() 为true,触发onError后onClose也被触发,在onClose中去掉session即可
        String key = getSessionKey(clientCode, businessCode);
        log.error("onError被触发:客户端key:{},错误信息:{}", key,error.getMessage());
        //如果客户端端口也会调用到这里
//        error.printStackTrace();
    }

    /**
     * 接收到客户端字符串消息
     * maxMessageSize 单位byte 10000000 约10M
     *
     */
    @OnMessage(maxMessageSize = 5000000)
    public void onMessage(String message, Session session,@PathParam(value = "clientCode") String clientCode,@PathParam(value = "businessCode") String businessCode) throws Exception {

        if( message.startsWith("connectionTest:")){
            log.info("接收到测试消息,{}:{}:{}",getSessionKey(clientCode,businessCode),session.getId(),message);
            String[] backs = message.split("back:");
            String clientSessionId = backs[backs.length - 1];
            sessionInfoMap.put(session.getId(), clientSessionId);
            //redis设置
            stringRedisTemplate.opsForValue().set("network-adapter-server:"+serverCode+":"+ session.getId(), clientSessionId,8,TimeUnit.SECONDS);
            return;
        }else{
            log.info("接收到消息,客户端key:{},sessionId:{}",getSessionKey(clientCode,businessCode),session.getId());
        }
        resultMessageService.messageSave(message,session,clientCode,businessCode);

    }

    // 定时任务去定时发送ping,收到pong后更新失效时间,如果服务端的sessionId已经失效就移除该session
    @OnMessage
    public void onPong(PongMessage message, Session session,@PathParam(value = "clientCode") String clientCode,@PathParam(value = "businessCode") String businessCode) throws IOException {
        log.debug("{}:{}:接收到pong消息",getSessionKey(clientCode,businessCode),session.getId());
        stringRedisTemplate.expire("network-adapter-server:"+serverCode+":"+session.getId(),8, TimeUnit.SECONDS);
    }

    /**
     * 接收到客户端字节数组消息
     * maxMessageSize 单位byte 10000000 约10M
     *
     */
//    @OnMessage(maxMessageSize = 5000000)
//    public void onMessage( byte[] message, Session session,@PathParam(value = "clientCode") String clientCode,@PathParam(value = "businessCode") String businessCode) throws Exception {
//        log.info("消息内容:{}",message);
//    }


    /**
     * 遍历群发消息
     *
     * @param text
     */
//    public void send(String text) {
//            for (ConcurrentHashMap.Entry<String, Session> entry : serverSessionMap.entrySet()) {
//                send(entry.getKey(), text);
//            }
//    }

    /**
     * 根据clientId发送消息
     *
     * @param key
     * @param text
     */
    public void send(String key,String text) {
        try {
            log.info("发送消息,客户端key:{}",key);
            Set<Session> sessions = serverSessionMap.get(key);
            if(CollectionUtil.isNotEmpty(sessions)){
                log.info("{}连接数量:{}",key,sessions.size());
                for(Session session: sessions){
                    if(session!=null && session.isOpen()){
                        //对text进行加密
                        String encryptMsg = MyAESUtil.encrypt(text, myServerProperties.getAesKey());
                        session.getAsyncRemote().sendText(encryptMsg);
                        log.info("发送消息成功key:{},sessionId:{}",key,session.getId());
                        return;
                    }
                }
            }

            //没有发送出去放到redis做广播
            Map<String,String> map = new HashMap<String, String>();
            map.put("msgKey", key);
            map.put("msgValue", text);
            log.info("发布广播消息key:{}",key);
            stringRedisTemplate.convertAndSend("networkAdapter", JSON.toJSONString(map));

        } catch (Exception e) {
           log.error("系统异常",e);
        }
    }

    public void sendMessageFromRedis(String key,String text){
        try {
            Set<Session> sessions = serverSessionMap.get(key);
            if(CollectionUtil.isEmpty(sessions)){
                log.warn("接收到广播消息,session不存在:客户端key:{},忽略消息",key);
                return;
            }
            log.info("接收到广播消息,准备发送消息到客户端,客户端key:{}连接数量:{}",key,sessions.size());
            for(Session session: sessions){
                if(session!=null && session.isOpen()){
                    MessageBO messageBO = JSONObject.parseObject(text, MessageBO.class);
                    String messageNo = messageBO.getMessageNo();
                    Long increment = stringRedisTemplate.opsForValue().increment(messageNo);
                    stringRedisTemplate.expire(messageNo,1, TimeUnit.MINUTES);
                    if(increment!=null && increment>1L){//已经发送过了
                        log.info("消息已发送过,这里不再重复发送,客户端key:{},sessionId:{}",key,session.getId());
                        return;
                    }
                    //对text进行加密
                    String encryptMsg = MyAESUtil.encrypt(text, myServerProperties.getAesKey());
                    session.getAsyncRemote().sendText(encryptMsg);
                    log.info("发送消息成功,客户端key:{},sessionId:{}",key,session.getId());
                    return;
                }
                log.error("没有可用的连接,客户端key:{}",key);
            }

        } catch (Exception e) {
            log.error("系统异常",e);
        }
    }
}

服务端心跳

避免客户端断网,服务端没有被触发事件,可以用心跳来保障服务端维护的session都是可用的,服务端定时去给客户端发送ping消息,客户端收到后会发送pong消息,服务端如果在指定的时间内没有收到pong消息就认为与客户端的这个session不可用。

package com.ctfo.adapter.service.task;

import cn.hutool.core.collection.CollectionUtil;
import com.ctfo.adapter.service.websocket.WebSocketServer;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import lombok.extern.log4j.Log4j2;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.websocket.Session;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 定时任务
 */
@Log4j2
@Component
public class ServerTask {

    @Resource
    private WebSocketServer webSocketServer;
    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     *
     *
     */
    @XxlJob(value = "pingTask")
    public ReturnT<String> pingTask(String param) {
        XxlJobLogger.log("pingTask begin,param={}", param == null ? "null" : param);
        ConcurrentHashMap<String, Set<Session>> serverSessionMap = webSocketServer.getServerSessionMap();
        ConcurrentHashMap<String, String> sessionInfoMap = webSocketServer.getSessionInfoMap();
        //遍历执行ping命令
        Collection<Set<Session>> values = serverSessionMap.values();
        ByteBuffer payload = ByteBuffer.wrap("Ping".getBytes());
        //key:客户端名称,value:会话session列表
        for(Map.Entry<String, Set<Session>> entry:  serverSessionMap.entrySet()){
            List<Session> removeSessions=new ArrayList<>();
            Set<Session> sessions = entry.getValue();
            String clientName = entry.getKey();
            for(Session session:sessions){
                String sessionId = session.getId();
                try{
                    //redis获取看是否还在,不在的话从里面剔除掉
                    if(stringRedisTemplate.opsForValue().get("network-adapter-server:"+WebSocketServer.getServerCode()+":"+sessionId)==null){
                        removeSessions.add(session);
                        continue;
                    }
                    session.getAsyncRemote().sendPing(payload);
                }catch (Exception e){
                    log.error("执行ping命令报错,{}:{}",clientName,sessionId);
                    removeSessions.add(session);
                }
            }
            if(CollectionUtil.isNotEmpty(removeSessions)){
                removeSessions.forEach(session -> {
                    String sessionId = session.getId();
                    try{
                        session.close();
                    }catch (Exception ex){
                        log.error("执行ping命令后close失败,{}:{}:{}",clientName,sessionId,ex.getMessage());
                    }
                    sessions.remove(session);
                    sessionInfoMap.remove(sessionId);
                    log.info("session长时间未ping通,移除客户端:{}:{}",clientName, sessionId);
                });

            }
        }
        return ReturnT.SUCCESS;
    }
}

服务端测试类

对应的controller和service接口这里忽略展示,作用主要是:可以调用接口了解服务端有多少个客户端进行连接,客户端session的对应关系

public List<SessionInfoVO> getSessionInfo() {
        ConcurrentHashMap<String, Set<Session>> serverSessionMap = webSocketServer.getServerSessionMap();
        ConcurrentHashMap<String, String> sessionInfoMap = webSocketServer.getSessionInfoMap();
        List<SessionInfoVO> resultList=new ArrayList<>();
        for(Map.Entry<String, Set<Session>> entry: serverSessionMap.entrySet()){
            SessionInfoVO sessionInfoVO=new SessionInfoVO();
            sessionInfoVO.setClientName(entry.getKey());
            List<SessionStatusVO> statusVOS=new ArrayList<>();
            for(Session session: entry.getValue()){
                SessionStatusVO statusVO=new SessionStatusVO();
                statusVO.setClientSessionId(session.getId());
                statusVO.setOpenStatus(session.isOpen());
                statusVO.setServerSessionId(sessionInfoMap.get(session.getId()));
                statusVOS.add(statusVO);
            }
            sessionInfoVO.setSessionStatusVOList(statusVOS);
            resultList.add(sessionInfoVO);
        }
        return resultList;
    }
@Data
public class SessionInfoVO {
    /**
     * 客户端名称
     */
    private String clientName;
    private List<SessionStatusVO> sessionStatusVOList;
}
@Data
public class SessionStatusVO {
    /**
     * 客户端sessionId
     */
    private String clientSessionId;
    /**
     * 服务端sessionId
     */
    private String serverSessionId;
    /**
     * 会话是否打开
     */
    private Boolean openStatus;
}

客户端编写

package com.xgss.adapter.service.websocket;

import com.xgss.adapter.common.constants.ClientConstants;
import com.xgss.adapter.common.properties.MyServerProperties;
import com.xgss.adapter.service.MessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.websocket.*;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;

@Slf4j
@Component
@ClientEndpoint
public class PayOpenidClient {
    private static PayOpenidClient client;

    @Autowired
    private MyServerProperties myServerProperties;
    @Autowired
    private MessageService messageService;
    private Session session;
    /** 尝试重连标识 */
    private AtomicBoolean tryReconnect;//是否正在重连

    private final String code= ClientConstants.PAY_CODE;
    private final String business= ClientConstants.PAY_BUSINESS_OPENID;
    private final String  clientName=code+":"+business;
    private String serverSessionId;

    public Session getSession(){
        return this.session;
    }
    public String getClientName(){
        return this.clientName;
    }
    public String getServerSessionId(){
        return this.serverSessionId;
    }

    @PostConstruct
    void init() {
//        log.info("{},{}初始化:{}",code,business,this.toString());
       client = this;
        this.tryReconnect = new AtomicBoolean(false);
        hearBeat();
    }
    @PreDestroy
    public void destroy(){
        try{
            if(client.session !=null){
                log.info("销毁session:{},sessionId:{}",clientName,client.session.getId());
                client.session.close();
            }
        }catch (Exception e){
            log.error("session关闭失败");
        }
    }
    private void hearBeat(){
        Timer timer=new Timer(true);
        log.info(clientName+":心跳定时器启动");
        timer.scheduleAtFixedRate(new TimerTask(){
            public void run() {
                log.debug(clientName+":心跳触发");
                ping();
            }
        },5000,2000);//5秒后执行,每2秒钟执行一次
    }

    public void ping() {
        try {
            if(session == null){
                log.warn(clientName+":session不存在,触发重新连接");
                if(!tryReconnect.get()){
            toConnect();
        }
            }else if(!session.isOpen()){
                log.warn(clientName+":{}"+ ":session关闭了,触发重新连接",session.getId());
                session = null;
                if(!tryReconnect.get()){
            toConnect();
        }
            }
            if(session != null && session.isOpen()){
                String data = "Ping";
                ByteBuffer payload = ByteBuffer.wrap(data.getBytes());
                log.debug(clientName+":{}:发送Ping",client.session.getId());
                session.getAsyncRemote().sendPing(payload);
            }
//            Exception in thread "Timer-0" java.lang.IllegalStateException: The WebSocket session [0] has been closed and no method (apart from close()) may be called on a closed session
        } catch (Exception ex) {
            log.error(clientName+":Ping报错了",ex);
            ex.printStackTrace();
        }
    }
    private String getServerUrl(){
        return client.myServerProperties.getWebSocketServerUrl()+"/"+ code+"/"+business;
    }

    private void getSession(URI uri) throws IOException, DeploymentException {
        WebSocketContainer container = ContainerProvider.getWebSocketContainer();
        session = container.connectToServer(client, uri);
    }


    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        log.info(clientName+":"+session.getId()+":已连接");
    }

    /**
     * 异常处理
     */
    @OnError
    public void onError(Throwable throwable) {
        log.error(clientName+":发生异常了");
        throwable.printStackTrace();
    }

    @OnClose
    public void onClose(CloseReason reason) throws IOException{
        if(session !=null){
            client.session.close();
            log.info(clientName+":"+client.session.getId()+":关闭,原因:"+reason.toString());
        }else{
            log.info(clientName+":关闭,原因:"+reason.toString());
        }
        session = null;
        if(!tryReconnect.get()){
            toConnect();
        }
    }

    /**
     * 连接服务端
     */
    private void toConnect(){
        tryReconnect.set(true);
        //这里进行循环连接服务端
        String webSocketServerUrl = getServerUrl();
        synchronized (PayOpenidClient.class) {
        if (session ==null ){
            log.debug(clientName+":尝试连接webSocket服务端");
            try {
                // 本机地址
//            String hostAddress = InetAddress.getLocalHost().getHostAddress();
                URI uri = URI.create(webSocketServerUrl);
                getSession(uri);
            } catch (Exception e) {
                //如果服务端没有启动,这里会报错
                log.warn(clientName+":连接webSocket服务端失败");
            }
//            if(session==null){
//                log.warn(clientName+":连接webSocket服务端失败");
//            }else{
//                log.info(clientName+session.getId()+"已连接");
//            }
        }}
        tryReconnect.set(false);
    }

    @OnMessage
    public void onMessage(String msg, Session session) throws Exception {
        if( msg.startsWith("connectionTest:")){
            log.info( clientName+":"+session.getId() + ":接收到测试消息:" + msg);
            String[] split = msg.split("connectionTest:");
            this.serverSessionId=split[1];
            session.getAsyncRemote().sendText(msg+":back:"+session.getId());
            return;
        }else{
            log.info( clientName+":"+session.getId() + ":接收到的消息");
        }
        client.messageService.pay_openid(msg,session,clientName);
    }
    @OnMessage
    public void onMessage(PongMessage message, Session session) throws IOException {
        log.debug(clientName+":{}:接收到Pong消息",session.getId());
    }
}

@OnMessage

属性maxMessageSize 单位byte

@OnMessage

public void handleTextMessage(String textMessage) {

...

}

 

//接收二进制消息

@OnMessage

public void handleBinaryMessage(byte[] messageData) {

...

}

//接收Pong消息,需要声明一个类型为 javax.websocket.PongMessage 的方法参数

@OnMessage

public String handlePongMessage(PongMessage pongMessage) {

return "I got " + pongMessage.getApplicationData().length + " bytes of data.";

}

发送ping消息

ByteBuffer payload = ByteBuffer.wrap("Ping".getBytes());
session.getAsyncRemote().sendPing(payload);

发送ping命令后对方不需要做处理,如果对方可以会自动收到pong消息,如下事件会被触发

@OnMessage

public String handlePongMessage(PongMessage pongMessage) {

 

注入实例为null

参考链接:https://www.csdn.net/tags/OtTaQg3sODMwNzQtYmxvZwO0O0OO0O0O.html

 

问题原因:

Spring默认对bean的管理都是单例(singleton),和 websocket (多对象)相冲突。

项目启动时初始化,会初始化 websocket (非用户连接的),spring 同时会为其注入 service,该对象的 service 不是 null,被成功注入。但是,由于 spring 默认管理的是单例,所以只会注入一次 service。当用户建立新的连接时,系统又会创建一个新的 websocket 对象,这时不会再次注入了,所以导致只要是用户连接创建的 websocket 对象,都不能再注入了,后面的注入均为null。

 

方法1:使用静态变量 加 @PostConstruct 解决

@ServerEndpoint("/websocket/{userId}")
@Component  //关键点1
public class WebSocket {
    @Autowired
    private UserService userService;

    private static WebSocket webSocket; //关键点2

    @PostConstruct  //关键点3
    public void init(){
        webSocket = this;
    }
    //略去其他代码...
    @OnMessage
    public void onMessage(String message, Session session) {
        if(!StringUtils.isEmpty(message)){
            try {
                log.info("收到用户【{}】的消息: {}",this.userId,message);
                webSocket.userService.helloService();   //关键点4
                session.getBasicRemote().sendText("收到 "+this.userId+" 的消息 "); 
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
}

方法2:使用静态变量,加set注入

@ServerEndpoint("/websocket/{userId}")
@Component  //关键点1
public class WebSocket {

    private static UserService userService; //关键点2

    @Autowired  //关键点3
    public void setUserService (UserService userService){
        WebSocket.userService = userService;
    }
    //略去其他代码...
    @OnMessage
    public void onMessage(String message, Session session) {
        if(!StringUtils.isEmpty(message)){
            try {
                log.info("收到用户【{}】的消息: {}",this.userId,message);
                WebSocket.userService.helloService();   //关键点4
                session.getBasicRemote().sendText("收到 "+this.userId+" 的消息 "); 
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
}

方法3:使用工具类用于从Spring的上下文中去获取到类的实例

@ServerEndpoint("/websocket/{userId}")
@Component  //关键点1
public class WebSocket {

    private static UserService userService =  SpringContextHolder.getBean(UserService.class); //关键点2
    
    //略去其他代码...
    @OnMessage
    public void onMessage(String message, Session session) {
        if(!StringUtils.isEmpty(message)){
            try {
                log.info("收到用户【{}】的消息: {}",this.userId,message);
                userService.helloService();   //关键点3
                session.getBasicRemote().sendText("收到 "+this.userId+" 的消息 "); 
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
}

工具类SpringContextHolder

此工具类用于从Spring的上下文中去获取到类的实例

@Component
public class SpringContextHolder implements ApplicationContextAware, DisposableBean {

    private static ApplicationContext applicationContext = null;


    /**
     * 取得存储在静态变量中的ApplicationContext.
     */
    public static ApplicationContext getApplicationContext() {
        assertContextInjected();
        return applicationContext;
    }

    /**
     * 从静态变量applicationContext中取得Bean, 自动转型为所赋值对象的类型.
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) {
        assertContextInjected();
        return (T) applicationContext.getBean(name);
    }

    /**
     * 从静态变量applicationContext中取得Bean, 自动转型为所赋值对象的类型.
     */
    public static <T> T getBean(Class<T> requiredType) {
        assertContextInjected();
        return applicationContext.getBean(requiredType);
    }

    /**
     * 清除SpringContextHolder中的ApplicationContext为Null.
     */
    public static void clearHolder() {
        applicationContext = null;
    }

    /**
     * 实现ApplicationContextAware接口, 注入Context到静态变量中.
     */
    @Override
    public void setApplicationContext(ApplicationContext appContext) {
        applicationContext = appContext;
    }

    /**
     * 实现DisposableBean接口, 在Context关闭时清理静态变量.
     */
    @Override
    public void destroy() throws Exception {
        SpringContextHolder.clearHolder();
    }

    /**
     * 检查ApplicationContext不为空.
     */
    private static void assertContextInjected() {
        Validate.validState(applicationContext != null, "applicaitonContext属性未注入, 请在applicationContext.xml中定义SpringContextHolder.");
    }
}

一个连接多条消息

上一条消息在onMessage中还没有处理完,是接收不到后面这个客户端发的消息的

同一个连接的消息是串行处理的

 

多个连接的消息

多个连接可以同时收到消息,并行处理

Session的方法

javax.websocket.Session的方法

getBasicRemote和getAsyncRemote

getAsyncRemote是异步发送消息,getBasicRemote是同步发送消息。也就是说getBasicRemote()要等上一条消息发送完才能发送下一条消息

 

session.setMaxTextMessageBufferSize(10*1024*1024);//设置最大字符串消息大小,超过将断开连接,单位为byte,默认是8192约8kb

session.setMaxBinaryMessageBufferSize(10*1024*1024);//设置最大二进制消息大小,超过将断开连接,8192约8kb

 

session.setMaxIdleTimeout(-1);最大空闲时间,超过服务端会断开连接,设置0或负数代表无限,默认是0

 

 

消息大小限制设置:

对于websoket服务:

@OnMessage的属性maxMessageSize(单位byte) 可以设置接收的消息的最大大小

或者在@OnOpen标注的方法内对Session进行设置,(单位为byte)

session.setMaxTextMessageBufferSize(10*1024*1024);

session.setMaxBinaryMessageBufferSize(10*1024*1024);

一个是设置字符串消息的最大byte,等同于session.setMaxTextMessageBufferSize

一个是设置二进制消息最大byte,等同于session.setMaxBinaryMessageBufferSize

 

 

SpringCloudGateway中默认定义的消息大小为65536 : 单位是Byte

   NettyWebSocketSessionSupport.DEFAULT_FRAME_MAX_SIZE = 64 * 1024;

所以如果发送的消息超过了64KB ,就会导致消息过大,然后WebSocket就会断开连接。

 

解决方法:

 

在 gateway 项目的配置文件中加入下面这句话并设置大小。

官网地址

https://docs.spring.io/spring-cloud-gateway/docs/2.2.7.RELEASE/reference/html/appendix.html

连接的数量

各个浏览器支持的最大websocket连接数不同

不同的服务器支持的websocket连接数也不同

springboot项目默认的启动容器是tomcat,而tomcat默认支持1W的连接数量。

linux也会有限制,Linux操作系统中,一切都是文件。所以每个TCP连接,都会打开一个文件。为此Linux操作系统限制了每个用户能打开的文件数量,通过ulimit -n 查看。如:

TCP连接数量还受到端口数量限制,由于端口号只有1-65535,所以最大TCP连接数也只有65535个(包括系统端口1-1024)

Linux操作系统对所有用户最大能打开文件的限制:cat /proc/sys/fs/file-max。如:

网络核心模块对tcp连接的限制(最大不能超过65535):cat /etc/sysctl.conf

防火墙对tcp连接的限制

综上,在Linux操作系统中,首先对TCP连接数量的限制依次有:端口数量限制,网络核心限制,最大文件数量限制(因为每建立一个连接就要打开一个文件),防火墙限制,用户打开文件限制。

长连接最大空闲时间设置

如果长连接超过超时时间没有进行交互,服务端就会关闭session;

session.setMaxIdleTimeout 可以设置最大空闲时间,单位为毫秒,0或负数代表无限,关闭session后服务端和客户端都能触发@OnClose

websocket的长连接会因为各种中间环节断开,而且很多时候Onclose没有被触发的(比如断网),所以要想一直保持连接,可以让客户端定期发送一条心跳包,用于维持Session

心跳机制

websocket规范定义了心跳机制,一方可以通过发送ping(协议头opcode 0x9)消息给另一方,另一方收到ping后应该按照协议要求,尽可能快的返回pong(协议头opcode 0xA);

 

浏览器:

各个浏览器并没有为websocket的发送/接收ping/pong消息提供JavaScript API,但各个浏览器都有各自的缺省检测方案和处理方法,如chrome一旦收到ping包,会马上自动答复一个相同的包作为pong,再如某些浏览器在一段时间没有发送数据传输时,会自动发送一个ping包到服务器检测是否还在线/保持连接。

如在客户端添加心跳:

package com.ctfo.adapter.service.websocket;

import com.ctfo.adapter.common.properties.MyServerProperties;
import com.ctfo.adapter.service.MessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.websocket.*;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Timer;
import java.util.TimerTask;

@Slf4j
@Component
@ClientEndpoint
public class InvoiceRedClient {
    private static InvoiceRedClient invoiceRedClient;

    @Autowired
    private MyServerProperties myServerProperties;
    @Autowired
    private MessageService messageService;
    private Session session;

    private final String  clientName="客户端";

    @PostConstruct
    void init() {
        invoiceRedClient = this;
        hearBeat();
    }
    private void hearBeat(){
        Timer timer=new Timer(true);
        log.info(clientName+"心跳定时器启动");
        timer.schedule(new TimerTask(){
            public void run() {
                log.debug(clientName+"心跳触发");
                ping();
            }
        },5000,5000);//5秒后执行,每5秒钟执行一次
    }

    public void ping() {
        try {
            if(session == null){
                log.warn(clientName+"session不存在,触发重新连接");
                toConnect();
            }else if(!session.isOpen()){
                log.warn(clientName+"session关闭了,触发重新连接");
                session = null;
                toConnect();
            }
            if(session != null && session.isOpen()){
                String data = "Ping";
                ByteBuffer payload = ByteBuffer.wrap(data.getBytes());
                log.debug(clientName+"发送Ping");
                session.getBasicRemote().sendPing(payload);
            }
//            Exception in thread "Timer-0" java.lang.IllegalStateException: The WebSocket session [0] has been closed and no method (apart from close()) may be called on a closed session
        } catch (Exception ex) {
            log.error(clientName+"Ping报错了",ex);
            ex.printStackTrace();
        }
    }
    private String getServerUrl(){
        String clientCode = invoiceRedClient.myServerProperties.getInvoiceClient().getClientCode();
        String businessCode = invoiceRedClient.myServerProperties.getInvoiceClient().getRedInvoice();
        return invoiceRedClient.myServerProperties.getWebSocketServerUrl()+"/"+clientCode+"/"+businessCode;
    }

    private void getSession(URI uri) throws IOException, DeploymentException {
        WebSocketContainer container = ContainerProvider.getWebSocketContainer();
        session = container.connectToServer(InvoiceRedClient.class, uri);
    }


    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        log.info(clientName+session.getId()+"连接");
    }

    /**
     * 异常处理
     * @param throwable
     */
    @OnError
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
    }

    @OnClose
    public void onClose() throws IOException{
        //当服务端关闭了连接,或者服务停掉,这里会被调用
        session.close();
        log.info(clientName+session.getId()+"关闭");
        session = null;
        toConnect();
    }

    /**
     * 连接服务端
     */
    private void toConnect(){
        //这里进行循环连接服务端
        String webSocketServerUrl = getServerUrl();

        if (session ==null ){
            log.info(clientName+"尝试连接webSocket服务端:{}",webSocketServerUrl);
            try {
                URI uri = URI.create(webSocketServerUrl);
                getSession(uri);
            } catch (Exception e) {
                log.warn(clientName+"连接webSocket服务端{}失败",webSocketServerUrl);
            }
            if(session ==null){
                log.warn(clientName+"连接webSocket服务端{}失败",webSocketServerUrl);
            }else{
                log.info(clientName+session.getId()+"已连接");
            }
        }
    }
    @OnMessage
    public void onMessage(String msg, Session session) throws Exception {
        log.info( clientName+session.getId() + "接收到的消息:" + msg);
        invoiceRedClient.messageService.invoice_red(msg,session,clientName);
    }
    @OnMessage
    public void onMessage(PongMessage message, Session session) throws IOException {
        log.debug(clientName+"接收到Pong消息");
    }
}

服务端触发OnClose,客户端没有触发

服务端已经触发OnClose了, 客户端的session发送心跳还能够收到Pong消息

 

IOException:Broken pipe

客户端读取超时关闭了连接,这时服务器往客户端再写数据就发生了broken pipe异常

 

引起这个的两方面

1对客户端来说,检查客户端连接超时时间;

2对服务端 nginx配置的几个超时时间

Connect reset by peer

如果一端的Socket被关闭(或主动关闭,或因为异常退出而引起的关闭),另一端仍发送数据,发送的第一个数据包引发该异常

一端退出,但退出时并未关闭该连接,另一端如果在从连接中读数据则抛出该异常(Connection reset)

 

错误码总结

1000 正常关闭 当你的会话成功完成时发送这个代码

1001 离开 因应用程序离开且不期望后续的连接尝试而关闭连接时,发送这一代码。服务器可能关闭,或者客户端应用程序可能关闭

1002 协议错误 当因协议错误而关闭连接时发送这一代码

1003 不可接受的数据类型 当应用程序接收到一条无法处理的意外类型消息时发送这一代码

1004 保留 不要发送这一代码。根据 RFC 6455,这个状态码保留,可能在未来定义

1005 保留 不要发送这一代码。WebSocket API 用这个代码表示没有接收到任何代码

1006 保留 不要发送这一代码。WebSocket API 用这个代码表示连接异常关闭

1007 无效数据 在接收一个格式与消息类型不匹配的消息之后发送这一代码。如果文本消息包含错误格式的 UTF-8 数据,连接应该用这个代码关闭

1008 消息违反政策 当应用程序由于其他代码所不包含的原因终止连接,或者不希望泄露消息无法处理的原因时,发送这一代码

1009 消息过大 当接收的消息太大,应用程序无法处理时发送这一代码(记住,帧的载荷长度最多为64 字节。即使你有一个大服务器,有些消息也仍然太大。)

1010 需要扩展 当应用程序需要一个或者多个服务器无法协商的特殊扩展时,从客户端(浏览器)发送这一代码

1011 意外情况 当应用程序由于不可预见的原因,无法继续处理连接时,发送这一代码

1015 TLS失败(保留) 不要发送这个代码。WebSocket API 用这个代码表示 TLS 在 WebSocket 握手之前失败。

0 ~ 999 禁止 1000 以下的代码是无效的,不能用于任何目的

1000 ~ 2999 保留 这些代码保留以用于扩展和 WebSocket 协议的修订版本。按照标准规定使用这些代码,参见表 3-4

3000 ~ 3999 需要注册 这些代码用于“程序库、框架和应用程序”。这些代码应该在 IANA(互联网编号分配机构)公开注册

4000 ~ 4999 私有 在应用程序中将这些代码用于自定义用途。因为它们没有注册,所以不要期望它们能被其他 WebSocket广泛理解

session的共享

比如两个服务端做负载均衡,一个客户端建立websocket连接后,两个服务端都可以与它进行通信

websocket的Session没有实现序列化接口,不能进行序列化存储,无法通过redis来进行实现session共享

可以使用redis的发布订阅(也可以使用其他mq的方式来实现)来实现负载均衡,当请求达到服务器1上是,如果它没有与客户端建立连接,就进行发布消息,其他服务端介绍到消息看是否与客户端建立了连接,如果有建立连接就发送给客户端

@Autowired
private StringRedisTemplate stringRedisTemplate;
stringRedisTemplate.convertAndSend("myChannel", "myMsg");
public void onMessage(Message message, byte[] pattern){
        String channel = new String(message.getChannel());// 订阅的频道名称
        String msg = "";
        String key="myClient";
        try {
            msg = new String(message.getBody(), Charsets.UTF_8);//注意与发布消息编码一致,否则会乱码
            if (!StringUtils.isEmpty(msg)) {
                if ("myChannel".equals(channel))// 最新消息
                {
                    Set<Session> sessions = serverSessionMap.get(key);
                    if (CollectionUtil.isEmpty(sessions)) {
                        log.warn("接收到广播消息,session不存在:客户端key:{},忽略消息", key);
                        return;
                    }
                    log.info("接收到广播消息,准备发送消息到客户端,客户端key:{}连接数量:{}", key, sessions.size());
                    for (Session session : sessions) {
                        if (session != null && session.isOpen()) {
                            Long increment = stringRedisTemplate.opsForValue().increment("messageNo");
                            stringRedisTemplate.expire("messageNo", 1, TimeUnit.MINUTES);
                            if (increment != null && increment > 1L) {//已经发送过了
                                log.info("消息已发送过,这里不再重复发送,客户端key:{},sessionId:{}", key, session.getId());
                                return;
                            }
                            session.getAsyncRemote().sendText(msg, new RetrySendHandler(msg, session));
                            log.info("发送消息成功,客户端key:{},sessionId:{}", key, session.getId());
                            return;
                        }

                    }
                } else {
                    log.info("消息内容为空,不处理。");
                }
            }
        }catch (Exception e)
            {
                log.error("处理消息异常",e);
            }
        }

 

本站部分内容转载自互联网,如果有网站内容侵犯了您的权益,可直接联系我们删除,感谢支持!