package com.zchd.zim; import com.zchd.base.BaseService; import com.zchd.base.util.Kv; import com.zchd.base.util.QueueTask; import com.zchd.zim.entity.ChannelMessage; import com.zchd.zim.entity.ChannelUser; import com.zchd.zim.entity.FriendMessage; import com.zchd.zim.entity.ImUser; import net.tccn.timer.Timers; import net.tccn.zhub.ZHubClient; import org.redkale.net.http.RestService; import org.redkale.net.http.WebSocketNode; import org.redkale.source.ColumnValue; import org.redkale.source.DataSource; import org.redkale.source.FilterNode; import org.redkale.source.Flipper; import org.redkale.util.AnyValue; import org.redkale.util.Comment; import javax.annotation.Resource; import javax.persistence.Transient; import java.util.List; import static org.redkale.source.FilterExpress.*; @RestService(name = "im_chat_x", comment = "IM消息总线") public class ImChatService extends BaseService { @Resource(name = "im_chat") protected WebSocketNode wsnode; @Resource(name = "z_im") protected DataSource zimSource; @Resource(name = "zhub") protected ZHubClient zhub; @Transient //消息发送 protected QueueTask msgQueue = new QueueTask<>(20); @Transient //消息存储 protected QueueTask dbQueue = new QueueTask<>(1); @Override public void init(AnyValue config) { msgQueue.init(logger, Runnable::run); dbQueue.init(logger, Runnable::run); } @Comment("发送私聊消息") public void sendMsg(FriendMessage bean, int uid) { wsnode.sendMessage(buildFriendMessageDeail(bean), uid).thenAccept(x -> { if (x != 0) { return; } // intCache.incrHm("im:heartbeat:" + uid, "messagecount", -1); // 更新状态失败, 200ms 后重试(当数据未写入的时候,更新失败) Timers.tryDelay(() -> { FilterNode node = FilterNode.create("senduserid", bean.getSenduserid()).and("receiveuserid", uid); int updateColumn = zimSource.updateColumn(FriendMessage.class, node, ColumnValue.create("status", 10)); return updateColumn != 0; }, 200, 3); }); } @Comment("拉取离线消息") public void pullOfflineMsg(int userid) { msgQueue.add(() -> { // 拉取私聊离线消息 List list = zimSource.queryList(FriendMessage.class, new Flipper(0, "createtime"), FilterNode.create("receiveuserid", userid).and("status", 20)); list.forEach(x -> { Integer join = wsnode.sendMessage(buildFriendMessageDeail(x), userid).join(); if (join == 0) { zimSource.updateColumn(FriendMessage.class, x.getMessageid(), ColumnValue.create("status", 10)); } }); // 拉取频道离线消息 List userGroups = userChannels(userid); for (ChannelUser userGroup : userGroups) { Long lastAcceptTime = longCache.getHm("im:channel:" + userGroup.buildChannelid(), userGroup.getUserid()); if (lastAcceptTime == null || lastAcceptTime == 0) { lastAcceptTime = userGroup.getCreatetime(); } // 获取当前频道的数据列表 FilterNode node1 = FilterNode.create("appid", userGroup.getAppid()) .and("channeltype", userGroup.getChanneltype()) .and("channelvalue", userGroup.getChannelvalue()) .and("senduserid", NOTEQUAL, userid) .and("createtime", GREATERTHAN, lastAcceptTime) .and("createtime", LESSTHAN, System.currentTimeMillis()) .and("status", NOTEQUAL, 80); try { List messages = zimSource.queryList(ChannelMessage.class, new Flipper(50, "createtime desc"), node1); // 获取最新的消息集合后,从时间最早的开始推送 for (int i = messages.size() - 1; i > -1; i--) { ChannelMessage message = messages.get(i); int status = wsnode.sendMessage(buildMessageDetail(message), userid).join(); if (status != 0) { return; } // 每次推完一个消息后将拉取时间修改为当前消息的发送时间 lastAcceptTime = message.getCreatetime(); } } finally { longCache.setHm("im:channel:" + userGroup.buildChannelid(), userGroup.getUserid(), lastAcceptTime); // 变更最后拉取时间 /*FilterNode node = FilterNode.create("appid", userGroup.getAppid()) .and("channeltype", userGroup.getChanneltype()) .and("channelvalue", userGroup.getChannelvalue()).and("userid", userid); zimSource.updateColumn(ChannelUser.class, node, ColumnValue.create("lastaccepttime", lastAcceptTime)); // todo 中途失败时间有问题*/ } } }); } public List userChannels(int userid) { FilterNode node = FilterNode.create("userid", userid) .and(FilterNode.create("status", 10).or("status", 40)); return zimSource.queryList(ChannelUser.class, node); } @Comment("用户IM是否在线") public boolean userOnline(int userid) { return intCache.getBit("im:user", userid); } @Comment("用户在线直接发送, 如果不在线先记录到数据库") public void sendFriendMessage(FriendMessage message) { dbQueue.add(() -> { zimSource.insert(message); }); int receiveuserid = message.getReceiveuserid(); if (userOnline(receiveuserid)) { zhub.publish("im:friend:" + receiveuserid, message); } } public void sendChannelMessage(ChannelMessage message) { // 记录消息,发送群消息 dbQueue.add(() -> { zimSource.insert(message); }); zhub.broadcast("im:channel:" + message.buildChannelid(), message); } private String getGuserid(int userid) { ImUser user = zimSource.find(ImUser.class, userid); return user.getGuserid(); } public int getUserid(String appid, String guserid) { ImUser user = zimSource.find(ImUser.class, FilterNode.create("appid", appid).and("guserid", guserid)); return user.getUserid(); } public Kv buildFriendMessageDeail(FriendMessage message) { Kv detail = Kv.toKv(message, "content", "messageid", "sendtime"); detail.set("sendguserid", getGuserid(message.getSenduserid())); Kv data = Kv.of("detail", detail).set("type", "friend-text"); return data; } public Kv buildBackMessage(FriendMessage message, String mck) { Kv detail = Kv.toKv(message, "content", "messageid", "sendtime"); detail.set("sendguserid", getGuserid(message.getSenduserid())); Kv data = Kv.of("detail", detail).set("type", "friend-text").set("mck", mck); return data; } public Kv buildMessageDetail(ChannelMessage message) { Kv detail = Kv.toKv(message, "content", "messageid", "sendtime", "channeltype", "channelvalue"); detail.set("sendguserid", getGuserid(message.getSenduserid())); Kv data = Kv.of("detail", detail).set("type", "channel-text"); return data; } public Kv buildBackMessage(ChannelMessage message, String mck) { Kv detail = Kv.toKv(message, "content", "messageid", "sendtime", "channeltype", "channelvalue"); detail.set("sendguserid", getGuserid(message.getSenduserid())); Kv data = Kv.of("detail", detail).set("type", "channel-text").set("mck", mck); return data; } }