300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > 解决分布式下Websocket共享问题

解决分布式下Websocket共享问题

时间:2023-07-06 09:04:31

相关推荐

解决分布式下Websocket共享问题

解决分布式下Websocket共享问题

解决方案有2种,一个是redis,一个是mq。其中redis没仔细研究,就直接用了mq。项目中用F5代理了2台应用服务器,如果发生方和接受方不在同一个服务器,就会出现有问题。

下面就直接上代码

bo类

@Data@AllArgsConstructor@NoArgsConstructor@Builder@ToStringpublic class Message {private Integer id;private String msg;/*** 消息状态,1-未读,2-已读*/private Integer status;private Date sendDate;private Date readDate;private String from;private String to;}

配置类

@Configurationpublic class WebSocketConfig implements WebSocketConfigurer {@Autowiredprivate MyHandler myHandler;@Autowiredprivate MessageHandshakeInterceptor myHandshakeInterceptor;@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry.addHandler(this.myHandler, "/ws/{uid}").setAllowedOrigins("*").addInterceptors(this.myHandshakeInterceptor);}}

核心类

/*** @author WGR* @create /1/20 -- 21:55*/@Componentpublic class MessageHandshakeInterceptor implements HandshakeInterceptor {@Overridepublic boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {String path = serverHttpRequest.getURI().getPath();String[] ss = path.split("/");map.put("uid",ss[2]);return true;}@Overridepublic void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {}}@Componentpublic class MyHandler extends TextWebSocketHandler {private static final Map<Integer,WebSocketSession> SESSIONS = new ConcurrentHashMap<>();@Autowiredprivate JmsMessagingTemplate jmsMessagingTemplate;/*** 用于接收消息的方法* destination: 队列的名称或主题的名称*/@JmsListener(destination = "topic01")public void receiveMessage(javax.jms.Message message){if(message instanceof javax.jms.TextMessage){javax.jms.TextMessage textMessage = (javax.jms.TextMessage)message;try {System.out.println("接收消息:"+textMessage.getText());com.dalianpai.websocket.bo.Message msg = JSONObject.parseObject(textMessage.getText(), com.dalianpai.websocket.bo.Message.class);WebSocketSession session = SESSIONS.get(Integer.valueOf(msg.getTo()));if ((session != null && session.isOpen())) {try {session.sendMessage(new TextMessage(msg.getMsg()));} catch (IOException e) {e.printStackTrace();}}} catch (JMSException e) {e.printStackTrace();}}}@Overridepublic void handleTextMessage(WebSocketSession session, TextMessage message)throws IOException {System.out.println("获取到消息 >> " + message.getPayload());Message msg = JSONObject.parseObject(message.getPayload(),Message.class);System.out.println(msg);Object uid = session.getAttributes().get("uid");System.out.println(uid);//说明在这台服务器上String toId = msg.getTo();WebSocketSession toSession = SESSIONS.get(Integer.valueOf(toId));if ((toSession != null && toSession.isOpen())) {//TODO 具体格式需要和前端对接toSession.sendMessage(newTextMessage(message.getPayload()));}else{jmsMessagingTemplate.convertAndSend("topic01",message.getPayload());}}@Overridepublic void afterConnectionEstablished(WebSocketSession session) throwsException {Integer uid = Integer.valueOf((String)session.getAttributes().get("uid"));session.sendMessage(new TextMessage(uid+", 你好!欢迎连接到ws服务"));SESSIONS.put(uid, session);}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status)throws Exception {Integer uid = Integer.valueOf((String)session.getAttributes().get("uid"));SESSIONS.remove(uid);System.out.println("断开连接!");}}

测试

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。