From c47051476b2eed0ee31487e13952a05770a15c62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BB=9D=E5=B0=98?= <237809796@qq.com> Date: Wed, 28 May 2025 03:20:30 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8D=87=E7=BA=A7=EF=BC=9A=E6=9B=BF=E6=8D=A2?= =?UTF-8?q?=20Redis=20=E7=BC=93=E5=AD=98=E6=BA=90=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E9=9B=86=E7=BE=A4=E5=92=8C=E5=93=A8=E5=85=B5=E6=A8=A1=E5=BC=8F?= =?UTF-8?q?=E5=8F=8A=E7=9B=B8=E5=85=B3=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- conf/application.xml | 13 ++- conf/logging.properties | 1 + pom.xml | 8 +- src/com/zchd/base/BaseService.java | 14 ++- src/com/zchd/base/RedissionCacheSourcex.java | 99 ++++++++++++++++++++ src/com/zchd/zim/ImAccountService.java | 25 +++-- src/com/zchd/zim/ImChatService.java | 6 +- src/com/zchd/zim/ImChatWebSocket.java | 15 ++- src/com/zchd/zim/ImMessageMonitor.java | 8 +- 9 files changed, 148 insertions(+), 41 deletions(-) create mode 100644 src/com/zchd/base/RedissionCacheSourcex.java diff --git a/conf/application.xml b/conf/application.xml index 12fd411..eebfe32 100644 --- a/conf/application.xml +++ b/conf/application.xml @@ -10,7 +10,7 @@ - + + + + + + + diff --git a/conf/logging.properties b/conf/logging.properties index a9d6087..9ab4a38 100644 --- a/conf/logging.properties +++ b/conf/logging.properties @@ -6,6 +6,7 @@ javax.level=INFO com.sun.level=INFO sun.level=INFO jdk.level=INFO +io.netty.level = INFO java.util.logging.FileHandler.level=FINE #10M java.util.logging.FileHandler.limit=10485760 diff --git a/pom.xml b/pom.xml index 2c51595..9a63b32 100644 --- a/pom.xml +++ b/pom.xml @@ -34,10 +34,16 @@ 2.2.0 + + org.redisson + redisson + 3.47.0 + + com.mysql mysql-connector-j - 8.2.0 + 9.3.0 diff --git a/src/com/zchd/base/BaseService.java b/src/com/zchd/base/BaseService.java index 17a97e0..1aa6248 100644 --- a/src/com/zchd/base/BaseService.java +++ b/src/com/zchd/base/BaseService.java @@ -10,7 +10,6 @@ import org.redkale.source.DataSource; import org.redkale.source.PoolSource; import org.redkale.util.AnyValue; import org.redkale.util.Sheet; -import org.redkalex.cache.redis.MyRedisCacheSource; import javax.annotation.Resource; import java.io.File; @@ -29,14 +28,13 @@ public class BaseService extends AbstractService { @Resource(name = "zhub") protected ZHubClient zhub; - @Resource(name = "int_cache") - protected MyRedisCacheSource intCache; + /*@Resource(name = "redis") + protected RedissionCacheSourcex intCache; - @Resource(name = "long_cache") - protected MyRedisCacheSource longCache; - - @Resource(name = "str_cache") - protected MyRedisCacheSource strCache; + @Resource(name = "redis") + protected RedissionCacheSourcex longCache;*/ + @Resource(name = "redis") + protected RedissionCacheSourcex redisCache; @Resource(name = "z_im") protected DataSource zimSource; diff --git a/src/com/zchd/base/RedissionCacheSourcex.java b/src/com/zchd/base/RedissionCacheSourcex.java new file mode 100644 index 0000000..617fc5d --- /dev/null +++ b/src/com/zchd/base/RedissionCacheSourcex.java @@ -0,0 +1,99 @@ +package com.zchd.base; + +import org.redisson.Redisson; +import org.redisson.config.BaseConfig; +import org.redisson.config.Config; +import org.redkale.service.Local; +import org.redkale.source.CacheSource; +import org.redkale.util.AnyValue; +import org.redkale.util.AutoLoad; +import org.redkale.util.ResourceType; +import org.redkalex.cache.RedisCacheSource; +import org.redkalex.cache.RedissionCacheSource; + +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +@Local +@AutoLoad(false) +@ResourceType(CacheSource.class) +public class RedissionCacheSourcex extends RedissionCacheSource { + + private final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); + + @Override + public void init(AnyValue conf) { + if (this.convert == null) this.convert = this.defaultConvert; + if (conf == null) conf = new AnyValue.DefaultAnyValue(); + + final List addresses = new ArrayList<>(); + Config config = new Config(); + AnyValue[] nodes = conf.getAnyValues("node"); + String type = conf.getOrDefault("type", ""); + String masterName = conf.getOrDefault("master", ""); + BaseConfig baseConfig = null; + for (AnyValue node : nodes) { + String addr = node.getValue("addr"); + addresses.add(addr); + String db0 = node.getValue("db", "").trim(); + if (!db0.isEmpty()) this.db = Integer.valueOf(db0); + if (nodes.length == 1) { + baseConfig = config.useSingleServer(); + config.useSingleServer().setAddress(addr); + config.useSingleServer().setDatabase(this.db); + } else if ("masterslave".equalsIgnoreCase(type)) { //主从 + baseConfig = config.useMasterSlaveServers(); + if (node.get("master") != null) { + config.useMasterSlaveServers().setMasterAddress(addr); + } else { + config.useMasterSlaveServers().addSlaveAddress(addr); + } + config.useMasterSlaveServers().setDatabase(this.db); + } else if ("cluster".equalsIgnoreCase(type)) { //集群 + baseConfig = config.useClusterServers(); + config.useClusterServers().addNodeAddress(addr); + } else if ("replicated".equalsIgnoreCase(type)) { // + baseConfig = config.useReplicatedServers(); + config.useReplicatedServers().addNodeAddress(addr); + config.useReplicatedServers().setDatabase(this.db); + } else if ("sentinel".equalsIgnoreCase(type)) { // + baseConfig = config.useSentinelServers(); + config.useSentinelServers().addSentinelAddress(addr); + config.useSentinelServers().setDatabase(this.db); + config.useSentinelServers().setMasterName(masterName); + } + } + if (baseConfig != null) { + String username = conf.getValue("username", "").trim(); + String password = conf.getValue("password", "").trim(); + String retryAttempts = conf.getValue("retryAttempts", "").trim(); + String retryInterval = conf.getValue("retryInterval", "").trim(); + if (!username.isEmpty()) baseConfig.setUsername(username); + if (!password.isEmpty()) baseConfig.setPassword(password); + if (!retryAttempts.isEmpty()) baseConfig.setRetryAttempts(Integer.parseInt(retryAttempts)); + if (!retryInterval.isEmpty()) baseConfig.setRetryInterval(Integer.parseInt(retryInterval)); + } + this.redisson = Redisson.create(config); + this.nodeAddrs = addresses; + if (logger.isLoggable(Level.FINE)) logger.log(Level.FINE, RedisCacheSource.class.getSimpleName() + ": addrs=" + addresses + ", db=" + db); + + } + + public boolean getBit(String key, int offset) { + return redisson.getBitSet(key).get(offset); + } + + public void setBit(String key, int offset, boolean bool) { + redisson.getBitSet(key).set(offset, bool); + } + + public V getHm(String key, T field) { + return (V) redisson.getMap(key).get(field); + } + + public void hset(String key, String field, Object value) { + hset(key, field, value.getClass(), value); + } +} diff --git a/src/com/zchd/zim/ImAccountService.java b/src/com/zchd/zim/ImAccountService.java index 7c349ae..2013ddc 100644 --- a/src/com/zchd/zim/ImAccountService.java +++ b/src/com/zchd/zim/ImAccountService.java @@ -9,7 +9,6 @@ import com.zchd.zim.entity.ChannelUser; import com.zchd.zim.entity.ImUser; import net.tccn.zhub.Lock; import net.tccn.zhub.RpcResult; -import org.redkale.convert.json.JsonConvert; import org.redkale.net.http.RestMapping; import org.redkale.net.http.RestService; import org.redkale.service.RetResult; @@ -21,7 +20,6 @@ import org.redkale.util.AnyValue; import org.redkale.util.Comment; import org.redkale.util.TypeToken; import org.redkale.util.Utility; -import org.redkalex.cache.redis.MyRedisCacheSource; import javax.annotation.Resource; import java.io.Serializable; @@ -33,8 +31,8 @@ import java.util.concurrent.CompletableFuture; public class ImAccountService extends BaseService { - @Resource(name = "str_cache") - protected MyRedisCacheSource strCache; + /*@Resource(name = "redis") + protected RedissionCacheSourcex strCache;*/ @Resource protected ImMessageMonitor messageMonitor; /*@Resource @@ -50,7 +48,8 @@ public class ImAccountService extends BaseService { if (!list.isEmpty()) { userid = Kv.toAs(list.get(0), int.class); } - intCache.set("im:max-userid", userid); + // intCache.set("im:max-userid", userid); + redisCache.setLongAsync("im:max-userid", userid); }); // 游戏用户注册 @@ -100,7 +99,7 @@ public class ImAccountService extends BaseService { ImUser user = currentImUser(imtoken); int userid = user.getUserid(); zimSource.updateColumn(ImUser.class, userid, ColumnValue.create("status", ImUser.STATUS_FREEZE_ACTIVE)); - intCache.setBit("im:banned-talk", userid, true); + redisCache.setBit("im:banned-talk", userid, true); return r.render(); }); @@ -113,7 +112,7 @@ public class ImAccountService extends BaseService { ImUser user = currentImUser(imtoken); int userid = user.getUserid(); zimSource.updateColumn(ImUser.class, userid, ColumnValue.create("status", 10)); - intCache.setBit("im:banned-talk", userid, false); + redisCache.setBit("im:banned-talk", userid, false); return r.render(); }); } @@ -121,12 +120,11 @@ public class ImAccountService extends BaseService { @Comment("获取当前用户ID") public ImUser currentImUser(String token) { - - ImUser user = strCache.hget("im:user-token", token, new TypeToken() { - }.getType()); + ImUser user = (ImUser) redisCache.hget("im:user-token", token, ImUser.class); if (user == null) { user = zimSource.find(ImUser.class, FilterNode.create("imtoken", token)); - strCache.hset("im:user-token", token, JsonConvert.root(), user); + // strCache.hset("im:user-token", token, JsonConvert.root(), user); + redisCache.hset("im:user-token", token, user); } return user; } @@ -159,7 +157,7 @@ public class ImAccountService extends BaseService { if (user == null) { user = ImUser.buildImUser(bean.getGuserid(), bean.getAppid()); - int userid = (int) intCache.incr("im:max-userid"); + int userid = (int) redisCache.incr("im:max-userid"); user.setUserid(userid); user.setImtoken(Utility.uuid()); // 生成IM-TOKEN @@ -171,7 +169,8 @@ public class ImAccountService extends BaseService { zimSource.updateColumn(ImUser.class, user.getUserid(), ColumnValue.create("imtoken", user.getImtoken())); } - strCache.hset("im:user-token", user.getImtoken(), JsonConvert.root(), user); + //strCache.hset("im:user-token", user.getImtoken(), JsonConvert.root(), user); + redisCache.hset("im:user-token", user.getImtoken(), user); return RetResult.success(Kv.of("token", user.getImtoken())); } diff --git a/src/com/zchd/zim/ImChatService.java b/src/com/zchd/zim/ImChatService.java index 71c7f06..a0a9006 100644 --- a/src/com/zchd/zim/ImChatService.java +++ b/src/com/zchd/zim/ImChatService.java @@ -128,7 +128,7 @@ public class ImChatService extends BaseService { // 拉取频道离线消息 List userGroups = userChannels(userid); for (ChannelUser userGroup : userGroups) { - Long lastAcceptTime = longCache.getHm("im:channel:" + userGroup.buildChannelid(), userGroup.getUserid()); + Long lastAcceptTime = redisCache.hgetLong("im:channel:" + userGroup.buildChannelid(), userGroup.getUserid() + "", 0); if (lastAcceptTime == null || lastAcceptTime == 0) { lastAcceptTime = userGroup.getCreatetime(); } @@ -158,7 +158,7 @@ public class ImChatService extends BaseService { } } finally { - longCache.setHm("im:channel:" + userGroup.buildChannelid(), userGroup.getUserid(), lastAcceptTime); + redisCache.hsetLong("im:channel:" + userGroup.buildChannelid(), userGroup.getUserid() + "", lastAcceptTime); // 变更最后拉取时间 /*FilterNode node = FilterNode.create("appid", userGroup.getAppid()) .and("channeltype", userGroup.getChanneltype()) @@ -177,7 +177,7 @@ public class ImChatService extends BaseService { @Comment("用户IM是否在线") public boolean userOnline(int userid) { - return intCache.getBit("im:user", userid); + return redisCache.getBit("im:user", userid); } @Comment("用户在线直接发送, 如果不在线先记录到数据库") diff --git a/src/com/zchd/zim/ImChatWebSocket.java b/src/com/zchd/zim/ImChatWebSocket.java index 551e7fb..22127a7 100644 --- a/src/com/zchd/zim/ImChatWebSocket.java +++ b/src/com/zchd/zim/ImChatWebSocket.java @@ -1,5 +1,6 @@ package com.zchd.zim; +import com.zchd.base.RedissionCacheSourcex; import com.zchd.base.info.SwearWordService; import com.zchd.base.util.Kv; import com.zchd.base.util.Utils; @@ -14,7 +15,6 @@ import org.redkale.net.http.WebSocket; import org.redkale.service.RetResult; import org.redkale.source.DataSource; import org.redkale.util.Comment; -import org.redkalex.cache.redis.MyRedisCacheSource; import javax.annotation.Resource; import java.util.Map; @@ -28,9 +28,8 @@ public class ImChatWebSocket extends WebSocket { @Resource(name = "z_im") protected DataSource zimSource; - - @Resource(name = "int_cache") - protected MyRedisCacheSource intCache; + @Resource(name = "redis") + protected RedissionCacheSourcex redisCache; @Resource(name = "zhub") protected ZHubClient zhub; @@ -78,7 +77,7 @@ public class ImChatWebSocket extends WebSocket { return CompletableFuture.runAsync(() -> { final int userid = (int) getUserid(); getLogger().info("im:state-change:" + userid + "---ws connected---"); - intCache.setBit("im:user", userid, true); + redisCache.setBit("im:user", userid, true); // 推送上线状态 zhub.publish("im:online:" + getAttribute("appid"), getAttribute("guserid")); // 上线开启订阅 @@ -104,7 +103,7 @@ public class ImChatWebSocket extends WebSocket { FriendMessage message = FriendMessage.buildFriendMessage(bean.get("content"), userid, targetguserid); // 禁言检查 - if (intCache.getBit("im:banned-talk", userid)) { + if (redisCache.getBit("im:banned-talk", userid)) { sendTip("发送失败,你已被禁言", mck, 3002); return; } @@ -139,7 +138,7 @@ public class ImChatWebSocket extends WebSocket { String mck = extmap.get("mck"); // 禁言检查 - if (intCache.getBit("im:banned-talk", userid)) { + if (redisCache.getBit("im:banned-talk", userid)) { sendTip("发送失败,你已被禁言", mck, 3002); return; } @@ -172,7 +171,7 @@ public class ImChatWebSocket extends WebSocket { public CompletableFuture onClose(int code, String reason) { final int userid = (int) getUserid(); getLogger().info("im:state-change:" + userid + "---close---" + code + "---" + reason); - intCache.setBit("im:user", userid, false); + redisCache.setBit("im:user", userid, false); // 推送离线状态 zhub.publish("im:offline:" + getAttribute("appid"), getAttribute("guserid")); // 取消用户订阅 diff --git a/src/com/zchd/zim/ImMessageMonitor.java b/src/com/zchd/zim/ImMessageMonitor.java index 73699a5..30c046d 100644 --- a/src/com/zchd/zim/ImMessageMonitor.java +++ b/src/com/zchd/zim/ImMessageMonitor.java @@ -13,7 +13,6 @@ import org.redkale.util.AnyValue; import org.redkale.util.Comment; import org.redkale.util.TypeToken; import org.redkale.util.Utility; -import org.redkalex.cache.redis.MyRedisCacheSource; import javax.annotation.Resource; import java.util.BitSet; @@ -32,11 +31,6 @@ public class ImMessageMonitor extends BaseService { @Resource(name = "im_chat") protected WebSocketNode wsnode; - @Resource(name = "int_cache") - protected MyRedisCacheSource intCache; - @Resource(name = "long_cache") - protected MyRedisCacheSource longCache; - protected BitSet bitSet = new BitSet(); // IM用户连接当前实例记录 protected ConcurrentHashMap> channelSubscribe = new ConcurrentHashMap<>(); @@ -122,7 +116,7 @@ public class ImMessageMonitor extends BaseService { } // todo:并发场景下,会影响拉取离线消息 - longCache.setHm("im:channel:" + x.buildChannelid(), uid, System.currentTimeMillis()); + redisCache.hset("im:channel:" + x.buildChannelid(), uid + "", System.currentTimeMillis()); }); }); });