本文介绍: 在websocket中,服务端主要使用的是session打交道,但是由于session无法实现序列化,不能存储redis这些中间存储里面,因此这里我们只能把session存储本地内存中,那么如果是集群的话,我们如何实现session准确的发送消息呢,其实就是session共享

websocket中,服务端主要使用的是session打交道,但是由于session无法实现序列化,不能存储redis这些中间存储里面,因此这里我们只能把session存储在本地内存中,那么如果是集群的话,我们如何实现session准确的发送消息呢,其实就是session共享。在websocket中,其实是无法做到session共享的,目前通用的解决方案都是通过消息中间件实现消息发布订阅,也就是一个服务端实例都订阅某个消息队列topic,根据对应sessionid判断是否在本地存储,如果在本地通过sessionid找到了session,则给客户端发送消息,如果在本地找不到对应的session,那么就直接把这条消息丢弃掉。具体的如下图所示

 这里的图来自于网上网上大多都是基于redis发布与订阅,在真实的环境中,我们一般用kafka或者rocketmq等。根据上面的图示,我们介绍下整个流程

1、我们同时有A,B,C,D四个websocket服务端,同时订阅消息队列的topic: test8
2、我们发送一条消息a1到消息队列的topictest8
3、此时A,B,C,D四个websocket服务端都会收到这条消息a1
4、A根据a1的消息体,获取对应sessionid然后在本地的map查找是否对应的session,如果没有直接放弃掉此条消息。
5、B根据a1的消息体,获取对应sessionid然后在本地的map查找是否对应的session,如果没有直接放弃掉此条消息。
6、C根据a1的消息体,获取到对应sessionid然后在本地的map查找是否有对应的session,如果没有直接放弃掉此条消息。
7、D根据a1的消息体,获取到对应的sessionid然后在本地的map查找是否有对应的session,结果找到有对应的session,此时我们就把这条消息发送给=这个session。
8、客户端收到了对应的消息。

一、创建一个公共map用来存放session

package com.websocket.utils;

import java.util.concurrent.ConcurrentHashMap;

import javax.websocket.Session;

import org.springframework.stereotype.Component;

@Component
public class OnlineSessionCache {

	private ConcurrentHashMap<Integer, Session> onlines = new ConcurrentHashMap<Integer, Session>();

	public void setUserSession(Integer userId, Session session) {
		onlines.put(userId, session);
	}

	public Session getUserSession(Integer userId) {
		return onlines.get(userId);
	}

	public void removeUserSession(Integer userId) {
		onlines.remove(userId);
	}
	
	public ConcurrentHashMap<Integer, Session> getAllSession() {
		return this.onlines;
	}

}

二、在websocket连接关闭时候,把session关闭

@OnOpen
	public void onOpen(Session session,EndpointConfig config) {

		this.session = session;
		log.info("当前session id : {}  登录进来了", session.getId());
		OnlineCalUtils.addOnlineCount();
		onlineSessionCache.setUserSession(Integer.valueOf(session.getId()), session);
		log.info("存储session了多少个session:{}", onlineSessionCache.getAllSession().size());
		log.info("有新连接加入当前在线人数为 :{} ", getOnlineCount());
	}
@OnClose
	public void onClose() {
		OnlineCalUtils.subOnlineCount();
		log.info("有一连接关闭当前在线人数为: {}", getOnlineCount());
		onlineSessionCache.removeUserSession(Integer.valueOf(this.session.getId()));
		log.info("当前session id : {}  退出去了");
	}

三、编写一个接口用来指定用户发送消息

package com.websocket.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.websocket.model.ChatModel;
import com.websocket.producer.RocketProducer;
import com.websocket.utils.ChatModelUtils;

import lombok.extern.slf4j.Slf4j;

@RestController
@Slf4j
public class ChatMsgController {

	@Autowired
	private RocketProducer rocketProducer;
	
	@RequestMapping("/sendToSimpleUser")
	public String sendToSimpleUser(Integer fromUserId,Integer toUserId) {
		
		ChatModel model = ChatModelUtils.createNewChatModel(fromUserId, toUserId, "手动发送消息");
		rocketProducer.sendDirectMessage(model);
		
		return "成功";
	}
	
	
}

这里我们是把消息直接发送给rocketmq里面发送者代码如下

package com.websocket.producer;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;
import com.websocket.model.ChatModel;

@Component
public class RocketProducer {

	@Autowired
	private RocketMQTemplate rocketMQTemplate;
	
	public void sendDirectMessage(ChatModel message) {
		String msg = JSON.toJSONString(message);
        rocketMQTemplate.syncSend("test8", msg);
	}
	
}

四、编写消费者,获取mq的消息,并且发送消息给对应的session

package com.websocket.producer;

import javax.websocket.Session;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;
import com.websocket.model.ChatModel;
import com.websocket.product.SocketServerProduct;
import com.websocket.utils.OnlineSessionCache;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
@RocketMQMessageListener(topic = "test8", consumerGroup = "${chat.group.groupname}")
public class RocketConsumer implements RocketMQListener<String>{

	@Autowired
	private OnlineSessionCache onlineSessionCache;

	@Autowired
	private SocketServerProduct socketServerProduct;
	
	@Value("${chat.group.groupname}")
	private String groupName;
	
	@Override
	public void onMessage(String message) {
		log.info("监听到的topic是:{}  groupname是:{}","test8",groupName);
		ChatModel model = JSON.parseObject(message, ChatModel.class);
		Integer userId = model.getToUserId();
		Session session = onlineSessionCache.getUserSession(userId);
		if (null != session) {
			log.info("找到了对应的session,准备回复消息");
			socketServerProduct.sendMessage(session, model.getMessage());
		}else {
			log.info("没有找到对应的session,准备丢弃");
		}
	}
}

以上就是一个完整关于websocket服务端集群关于session共享的解决方案

WebSocket服务端数据推送及心跳机制(Spring Boot + VUE)_websocket心跳机制-CSDN博客

原文地址:https://blog.csdn.net/askuld/article/details/134775586

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_42904.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注