300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > CIM即时通讯源码初步解析(一款个人推荐的带集群的开源项目)

CIM即时通讯源码初步解析(一款个人推荐的带集群的开源项目)

时间:2019-08-04 15:03:32

相关推荐

CIM即时通讯源码初步解析(一款个人推荐的带集群的开源项目)

BindMessageListener

如下代码作为理解入口,安卓、IOS、web在连接成功后才会发起所谓登录,登录时会携带必要信息,例如个人账号唯一标识uid、设备类型deviceId等字段发起登录请求,目前重写的onMessage回调方法中只是对登录逻辑进行处理,若要实现单聊及群聊需要额外拓展参数msgType(信息参数类型),根据信息类型作出相应的逻辑处理,在登录逻辑中:

1.Session session = JSONUtils.fromJson(redisMessage.getBody(), Session.class); String uid = session.getUid();获取uid,后面根据uid及设备限定类型过滤筛选出此账号的所有频道(因为一个账号可以在不同终端登录,而同一终端只能一个账号登录)

2.channelList.removeIf(channel -> session.getNid().equals(channel.attr(ChannelAttr.ID).get()));意思是将要通知下线的频道集合中过滤掉本次连接的频道,不然也会通知刚连接上的频道

3.Collection<Channel> 相同账号不同终端的频道都放在此

4.@Resource private SessionGroup sessionGroup;sessionGroup是存放所有频道信息的容器,后续单聊、群聊拓展需要从此理解及开发逻辑

package com.ponent.message;import com.farsunset.cim.entity.Session;import com.farsunset.cim.sdk.server.constant.ChannelAttr;import com.farsunset.cim.sdk.server.group.SessionGroup;import com.farsunset.cim.sdk.server.model.Message;import com.farsunset.cim.util.JSONUtils;import ty.channel.Channel;import org.springframework.data.redis.connection.MessageListener;import org.ponent;import javax.annotation.Resource;import java.util.Collection;import java.util.HashMap;import java.util.Map;import java.util.Objects;/*** 集群环境下,监控多设备登录情况,控制是否其余终端下线的逻辑*/@Componentpublic class BindMessageListener implements MessageListener {private static final String FORCE_OFFLINE_ACTION = "999";private static final String SYSTEM_ID = "0";/*一个账号只能在同一个类型的终端登录如: 多个android或ios不能同时在线一个android或ios可以和web,桌面同时在线*/private final Map<String,String[]> conflictMap = new HashMap<>();@Resourceprivate SessionGroup sessionGroup;public BindMessageListener(){conflictMap.put(Session.CHANNEL_ANDROID,new String[]{Session.CHANNEL_ANDROID,Session.CHANNEL_IOS});conflictMap.put(Session.CHANNEL_IOS,new String[]{Session.CHANNEL_ANDROID,Session.CHANNEL_IOS});conflictMap.put(Session.CHANNEL_WINDOWS,new String[]{Session.CHANNEL_WINDOWS,Session.CHANNEL_WEB,Session.CHANNEL_MAC});conflictMap.put(Session.CHANNEL_WEB,new String[]{Session.CHANNEL_WINDOWS,Session.CHANNEL_WEB,Session.CHANNEL_MAC});conflictMap.put(Session.CHANNEL_MAC,new String[]{Session.CHANNEL_WINDOWS,Session.CHANNEL_WEB,Session.CHANNEL_MAC});}@Overridepublic void onMessage(org.springframework.data.redis.connection.Message redisMessage, byte[] bytes) {Session session = JSONUtils.fromJson(redisMessage.getBody(), Session.class);String uid = session.getUid();String[] conflictChannels = conflictMap.get(session.getChannel());Collection<Channel> channelList = sessionGroup.find(uid,conflictChannels);channelList.removeIf(channel -> session.getNid().equals(channel.attr(ChannelAttr.ID).get()));/** 获取到其他在线的终端连接,提示账号再其他终端登录*/channelList.forEach(channel -> {if (Objects.equals(session.getDeviceId(),channel.attr(ChannelAttr.DEVICE_ID).get())){channel.close();return;}Message message = new Message();message.setAction(FORCE_OFFLINE_ACTION);message.setReceiver(uid);message.setSender(SYSTEM_ID);message.setContent(session.getDeviceName());channel.writeAndFlush(message);channel.close();});}}

netty即时通讯SDK部分源码

package com.farsunset.cim.sdk.server.group;import com.farsunset.cim.sdk.server.constant.ChannelAttr;import com.farsunset.cim.sdk.server.model.Message;import ty.channel.Channel;import ty.channel.ChannelFuture;import ty.channel.ChannelFutureListener;import java.util.*;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.function.Predicate;import java.util.stream.Collectors;public class SessionGroup extends ConcurrentHashMap<String, Collection<Channel>> {private static final Collection<Channel> EMPTY_LIST = new LinkedList<>();private final transient ChannelFutureListener remover = new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future){future.removeListener(this);remove(future.channel());}};protected String getKey(Channel channel){return channel.attr(ChannelAttr.UID).get();}public void remove(Channel channel){String uid = getKey(channel);if(uid == null){return;}Collection<Channel> collections = getOrDefault(uid,EMPTY_LIST);collections.remove(channel);if (collections.isEmpty()){remove(uid);}}public void add(Channel channel){String uid = getKey(channel);if (uid == null || !channel.isActive()){return;}channel.closeFuture().addListener(remover);Collection<Channel> collections = this.putIfAbsent(uid,new ConcurrentLinkedQueue<>(Collections.singleton(channel)));if (collections != null){collections.add(channel);}if (!channel.isActive()){remove(channel);}}public void write(String key,Message message){find(key).forEach(channel -> channel.writeAndFlush(message));}public void write(String key, Message message, Predicate<Channel> matcher){find(key).stream().filter(matcher).forEach(channel -> channel.writeAndFlush(message));}public void write(String key, Message message, Collection<String> excludedSet){find(key).stream().filter(channel -> excludedSet == null || !excludedSet.contains(channel.attr(ChannelAttr.UID).get())).forEach(channel -> channel.writeAndFlush(message));}public void write(Message message){this.write(message.getReceiver(),message);}public Collection<Channel> find(String key){return this.getOrDefault(key,EMPTY_LIST);}public Collection<Channel> find(String key,String... channel){List<String> channels = Arrays.asList(channel);return find(key).stream().filter(item -> channels.contains(item.attr(ChannelAttr.CHANNEL).get())).collect(Collectors.toList());}}

安卓发送信息接口

package com.farsunset.cim.http;import retrofit2.Call;import retrofit2.http.Field;import retrofit2.http.FormUrlEncoded;import retrofit2.http.POST;public interface MessageService {@POST("/send")@FormUrlEncodedCall<Void> send(@Field("sender") String sender,@Field("receiver") String receiver,@Field("action") String action,@Field("content") String content);}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。