package com.zchd.zim; import com.zchd.base.BaseService; import com.zchd.base.util.Kv; import com.zchd.base.util.QueueTask; import com.zchd.base.util.Utils; import com.zchd.zim.entity.ChannelMessage; import com.zchd.zim.entity.ChannelUser; import com.zchd.zim.entity.FriendMessage; import org.redkale.net.http.RestService; import org.redkale.net.http.WebSocketNode; import org.redkale.util.AnyValue; import org.redkale.util.Comment; import org.redkale.util.TypeToken; import org.redkale.util.Utility; import org.redkalex.cache.redis.MyRedisCacheSource; import javax.annotation.Resource; import java.util.BitSet; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Logger; @RestService(name = "im_message_monitor", comment = "总线消息订阅管理") public class ImMessageMonitor extends BaseService { @Resource protected ImChatService chatService; @Resource(name = "im_chat") protected WebSocketNode wsnode; @Resource(name = "int_cache") protected MyRedisCacheSource intCache; @Resource(name = "long_cache") protected MyRedisCacheSource longCache; protected BitSet bitSet = new BitSet(); // IM用户连接当前实例记录 protected ConcurrentHashMap> channelSubscribe = new ConcurrentHashMap<>(); protected final QueueTask messageQueue = new QueueTask<>(1); @Override public void init(AnyValue config) { messageQueue.init(Logger.getLogger(this.getClass().getSimpleName()), Runnable::run); // 订阅用户动态广播消息, 用户未连接当前IM 服务实例,不做通道订阅 zhub.subscribe("im:channel", new TypeToken>() { }, para -> { int userid = para.getInt("userid"); if (!bitSet.get(userid)) { return; } String channelid = para.get("channelid"); String type = para.get("type"); switch (type) { case "subscribe" -> subscribeChannel(userid, channelid); case "unsubscribe" -> unsubscribeChannel(userid, channelid); } }); } @Comment("用户上线") public void online(int userid) { // 设置用户在当前实例标记 bitSet.set(userid); // 开启频道订阅 List list = chatService.userChannels(userid); list.forEach(x -> { subscribeChannel(userid, x.buildChannelid()); }); // 开启对点订阅 zhub.subscribe("im:friend:" + userid, new TypeToken() { }, x -> { messageQueue.add(() -> { chatService.sendMsg(x, userid); }); }); } @Comment("用户下线") public void offline(int userid) { // 设置用户在当前实例标记 bitSet.clear(userid); // 取消点对点订阅 zhub.unsubscribe("im:friend:" + userid); // 取消频道订阅 List list = chatService.userChannels(userid); list.forEach(x -> { unsubscribeChannel(userid, x.buildChannelid()); }); } @Comment("订阅频道") private void subscribeChannel(int userid, String channelid) { Set uids = channelSubscribe.get(channelid); if (uids == null) { uids = new HashSet<>(); zhub.subscribe("im:channel:" + channelid, new TypeToken() { }, x -> { messageQueue.add(() -> { Set _uids = channelSubscribe.get(channelid); _uids.forEach(uid -> { // 自己发的频道消息 不发送给自己 if (x.getSenduserid() == uid) { return; } wsnode.sendMessage(chatService.buildMessageDetail(x), uid).thenAccept(status -> { // 完成群发消息同步 if (status != 0) { return; } // todo:并发场景下,会影响拉取离线消息 longCache.setHm("im:channel:" + x.buildChannelid(), uid, System.currentTimeMillis()); }); }); }); }); } uids.add(userid); channelSubscribe.put(channelid, uids); } @Comment("取消订阅频道") private void unsubscribeChannel(int userid, String channelid) { Set uids = channelSubscribe.get(channelid); if (uids == null) { return; } // 先移除用户 uids.remove(userid); // 当主题订阅没有人在线,取消总线订阅 if (uids.isEmpty()) { channelSubscribe.remove(channelid); zhub.unsubscribe("im:channel:" + channelid); return; } channelSubscribe.put(channelid, uids); } public void subscribeChannel(int userid, String channelid, boolean subscribe) { Kv kv = Kv.of("userid", userid) .set("channelid", channelid) .set("type", subscribe ? "subscribe" : "unsubscribe"); zhub.broadcast("im:channel", kv); } public static void main(String[] args) { long millis = System.currentTimeMillis(); System.out.println(Utils.fmt36(millis)); System.out.println(millis); System.out.println(Utility.uuid()); } }