升级:替换 Redis 缓存源支持集群和哨兵模式及相关修改

This commit is contained in:
绝尘 2025-05-28 03:20:30 +08:00
parent 7f962319f5
commit c47051476b
9 changed files with 148 additions and 41 deletions

View File

@ -10,7 +10,7 @@
<listener value="com.zchd.base.AppListener"/>
<listener value="net.tccn.ZhubListener"/>
<source name="int_cache" value="org.redkalex.cache.redis.MyRedisCacheSource">
<!--<source name="int_cache" value="org.redkalex.cache.redis.MyRedisCacheSource">
<node addr="47.106.237.198" password="hello123!" port="6064" db="0"/>
</source>
<source name="str_cache" value="org.redkalex.cache.redis.MyRedisCacheSource">
@ -18,7 +18,18 @@
</source>
<source name="long_cache" value="org.redkalex.cache.redis.MyRedisCacheSource">
<node addr="47.106.237.198" password="hello123!" port="6064" db="0"/>
</source>-->
<source name="redis" value="com.zchd.base.RedissionCacheSourcex" type="cluster">
<node addr="redis://127.0.0.1:7000"/>
<node addr="redis://127.0.0.1:7001"/>
<node addr="redis://127.0.0.1:7002"/>
</source>
<!--<source name="redis" value="com.zchd.base.RedissionCacheSourcex" type="sentinel" master="mymaster">
<node addr="redis://127.0.0.1:26379"/>
<node addr="redis://127.0.0.1:26380"/>
<node addr="redis://127.0.0.1:26381"/>
</source>-->
</resources>
<server protocol="HTTP" port="8091" maxbody="2m">

View File

@ -6,6 +6,7 @@ javax.level=INFO
com.sun.level=INFO
sun.level=INFO
jdk.level=INFO
io.netty.level = INFO
java.util.logging.FileHandler.level=FINE
#10M
java.util.logging.FileHandler.limit=10485760

View File

@ -34,10 +34,16 @@
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.47.0</version>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.2.0</version>
<version>9.3.0</version>
</dependency>
<dependency>

View File

@ -10,7 +10,6 @@ import org.redkale.source.DataSource;
import org.redkale.source.PoolSource;
import org.redkale.util.AnyValue;
import org.redkale.util.Sheet;
import org.redkalex.cache.redis.MyRedisCacheSource;
import javax.annotation.Resource;
import java.io.File;
@ -29,14 +28,13 @@ public class BaseService extends AbstractService {
@Resource(name = "zhub")
protected ZHubClient zhub;
@Resource(name = "int_cache")
protected MyRedisCacheSource<Integer> intCache;
/*@Resource(name = "redis")
protected RedissionCacheSourcex<Integer> intCache;
@Resource(name = "long_cache")
protected MyRedisCacheSource<Long> longCache;
@Resource(name = "str_cache")
protected MyRedisCacheSource<String> strCache;
@Resource(name = "redis")
protected RedissionCacheSourcex<Long> longCache;*/
@Resource(name = "redis")
protected RedissionCacheSourcex redisCache;
@Resource(name = "z_im")
protected DataSource zimSource;

View File

@ -0,0 +1,99 @@
package com.zchd.base;
import org.redisson.Redisson;
import org.redisson.config.BaseConfig;
import org.redisson.config.Config;
import org.redkale.service.Local;
import org.redkale.source.CacheSource;
import org.redkale.util.AnyValue;
import org.redkale.util.AutoLoad;
import org.redkale.util.ResourceType;
import org.redkalex.cache.RedisCacheSource;
import org.redkalex.cache.RedissionCacheSource;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
@Local
@AutoLoad(false)
@ResourceType(CacheSource.class)
public class RedissionCacheSourcex<V> extends RedissionCacheSource<V> {
private final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
@Override
public void init(AnyValue conf) {
if (this.convert == null) this.convert = this.defaultConvert;
if (conf == null) conf = new AnyValue.DefaultAnyValue();
final List<String> addresses = new ArrayList<>();
Config config = new Config();
AnyValue[] nodes = conf.getAnyValues("node");
String type = conf.getOrDefault("type", "");
String masterName = conf.getOrDefault("master", "");
BaseConfig baseConfig = null;
for (AnyValue node : nodes) {
String addr = node.getValue("addr");
addresses.add(addr);
String db0 = node.getValue("db", "").trim();
if (!db0.isEmpty()) this.db = Integer.valueOf(db0);
if (nodes.length == 1) {
baseConfig = config.useSingleServer();
config.useSingleServer().setAddress(addr);
config.useSingleServer().setDatabase(this.db);
} else if ("masterslave".equalsIgnoreCase(type)) { //主从
baseConfig = config.useMasterSlaveServers();
if (node.get("master") != null) {
config.useMasterSlaveServers().setMasterAddress(addr);
} else {
config.useMasterSlaveServers().addSlaveAddress(addr);
}
config.useMasterSlaveServers().setDatabase(this.db);
} else if ("cluster".equalsIgnoreCase(type)) { //集群
baseConfig = config.useClusterServers();
config.useClusterServers().addNodeAddress(addr);
} else if ("replicated".equalsIgnoreCase(type)) { //
baseConfig = config.useReplicatedServers();
config.useReplicatedServers().addNodeAddress(addr);
config.useReplicatedServers().setDatabase(this.db);
} else if ("sentinel".equalsIgnoreCase(type)) { //
baseConfig = config.useSentinelServers();
config.useSentinelServers().addSentinelAddress(addr);
config.useSentinelServers().setDatabase(this.db);
config.useSentinelServers().setMasterName(masterName);
}
}
if (baseConfig != null) {
String username = conf.getValue("username", "").trim();
String password = conf.getValue("password", "").trim();
String retryAttempts = conf.getValue("retryAttempts", "").trim();
String retryInterval = conf.getValue("retryInterval", "").trim();
if (!username.isEmpty()) baseConfig.setUsername(username);
if (!password.isEmpty()) baseConfig.setPassword(password);
if (!retryAttempts.isEmpty()) baseConfig.setRetryAttempts(Integer.parseInt(retryAttempts));
if (!retryInterval.isEmpty()) baseConfig.setRetryInterval(Integer.parseInt(retryInterval));
}
this.redisson = Redisson.create(config);
this.nodeAddrs = addresses;
if (logger.isLoggable(Level.FINE)) logger.log(Level.FINE, RedisCacheSource.class.getSimpleName() + ": addrs=" + addresses + ", db=" + db);
}
public boolean getBit(String key, int offset) {
return redisson.getBitSet(key).get(offset);
}
public void setBit(String key, int offset, boolean bool) {
redisson.getBitSet(key).set(offset, bool);
}
public <T extends Object> V getHm(String key, T field) {
return (V) redisson.getMap(key).get(field);
}
public void hset(String key, String field, Object value) {
hset(key, field, value.getClass(), value);
}
}

View File

@ -9,7 +9,6 @@ import com.zchd.zim.entity.ChannelUser;
import com.zchd.zim.entity.ImUser;
import net.tccn.zhub.Lock;
import net.tccn.zhub.RpcResult;
import org.redkale.convert.json.JsonConvert;
import org.redkale.net.http.RestMapping;
import org.redkale.net.http.RestService;
import org.redkale.service.RetResult;
@ -21,7 +20,6 @@ import org.redkale.util.AnyValue;
import org.redkale.util.Comment;
import org.redkale.util.TypeToken;
import org.redkale.util.Utility;
import org.redkalex.cache.redis.MyRedisCacheSource;
import javax.annotation.Resource;
import java.io.Serializable;
@ -33,8 +31,8 @@ import java.util.concurrent.CompletableFuture;
public class ImAccountService extends BaseService {
@Resource(name = "str_cache")
protected MyRedisCacheSource<String> strCache;
/*@Resource(name = "redis")
protected RedissionCacheSourcex<String> strCache;*/
@Resource
protected ImMessageMonitor messageMonitor;
/*@Resource
@ -50,7 +48,8 @@ public class ImAccountService extends BaseService {
if (!list.isEmpty()) {
userid = Kv.toAs(list.get(0), int.class);
}
intCache.set("im:max-userid", userid);
// intCache.set("im:max-userid", userid);
redisCache.setLongAsync("im:max-userid", userid);
});
// 游戏用户注册
@ -100,7 +99,7 @@ public class ImAccountService extends BaseService {
ImUser user = currentImUser(imtoken);
int userid = user.getUserid();
zimSource.updateColumn(ImUser.class, userid, ColumnValue.create("status", ImUser.STATUS_FREEZE_ACTIVE));
intCache.setBit("im:banned-talk", userid, true);
redisCache.setBit("im:banned-talk", userid, true);
return r.render();
});
@ -113,7 +112,7 @@ public class ImAccountService extends BaseService {
ImUser user = currentImUser(imtoken);
int userid = user.getUserid();
zimSource.updateColumn(ImUser.class, userid, ColumnValue.create("status", 10));
intCache.setBit("im:banned-talk", userid, false);
redisCache.setBit("im:banned-talk", userid, false);
return r.render();
});
}
@ -121,12 +120,11 @@ public class ImAccountService extends BaseService {
@Comment("获取当前用户ID")
public ImUser currentImUser(String token) {
ImUser user = strCache.hget("im:user-token", token, new TypeToken<ImUser>() {
}.getType());
ImUser user = (ImUser) redisCache.hget("im:user-token", token, ImUser.class);
if (user == null) {
user = zimSource.find(ImUser.class, FilterNode.create("imtoken", token));
strCache.hset("im:user-token", token, JsonConvert.root(), user);
// strCache.hset("im:user-token", token, JsonConvert.root(), user);
redisCache.hset("im:user-token", token, user);
}
return user;
}
@ -159,7 +157,7 @@ public class ImAccountService extends BaseService {
if (user == null) {
user = ImUser.buildImUser(bean.getGuserid(), bean.getAppid());
int userid = (int) intCache.incr("im:max-userid");
int userid = (int) redisCache.incr("im:max-userid");
user.setUserid(userid);
user.setImtoken(Utility.uuid()); // 生成IM-TOKEN
@ -171,7 +169,8 @@ public class ImAccountService extends BaseService {
zimSource.updateColumn(ImUser.class, user.getUserid(), ColumnValue.create("imtoken", user.getImtoken()));
}
strCache.hset("im:user-token", user.getImtoken(), JsonConvert.root(), user);
//strCache.hset("im:user-token", user.getImtoken(), JsonConvert.root(), user);
redisCache.hset("im:user-token", user.getImtoken(), user);
return RetResult.success(Kv.of("token", user.getImtoken()));
}

View File

@ -128,7 +128,7 @@ public class ImChatService extends BaseService {
// 拉取频道离线消息
List<ChannelUser> userGroups = userChannels(userid);
for (ChannelUser userGroup : userGroups) {
Long lastAcceptTime = longCache.getHm("im:channel:" + userGroup.buildChannelid(), userGroup.getUserid());
Long lastAcceptTime = redisCache.hgetLong("im:channel:" + userGroup.buildChannelid(), userGroup.getUserid() + "", 0);
if (lastAcceptTime == null || lastAcceptTime == 0) {
lastAcceptTime = userGroup.getCreatetime();
}
@ -158,7 +158,7 @@ public class ImChatService extends BaseService {
}
} finally {
longCache.setHm("im:channel:" + userGroup.buildChannelid(), userGroup.getUserid(), lastAcceptTime);
redisCache.hsetLong("im:channel:" + userGroup.buildChannelid(), userGroup.getUserid() + "", lastAcceptTime);
// 变更最后拉取时间
/*FilterNode node = FilterNode.create("appid", userGroup.getAppid())
.and("channeltype", userGroup.getChanneltype())
@ -177,7 +177,7 @@ public class ImChatService extends BaseService {
@Comment("用户IM是否在线")
public boolean userOnline(int userid) {
return intCache.getBit("im:user", userid);
return redisCache.getBit("im:user", userid);
}
@Comment("用户在线直接发送, 如果不在线先记录到数据库")

View File

@ -1,5 +1,6 @@
package com.zchd.zim;
import com.zchd.base.RedissionCacheSourcex;
import com.zchd.base.info.SwearWordService;
import com.zchd.base.util.Kv;
import com.zchd.base.util.Utils;
@ -14,7 +15,6 @@ import org.redkale.net.http.WebSocket;
import org.redkale.service.RetResult;
import org.redkale.source.DataSource;
import org.redkale.util.Comment;
import org.redkalex.cache.redis.MyRedisCacheSource;
import javax.annotation.Resource;
import java.util.Map;
@ -28,9 +28,8 @@ public class ImChatWebSocket extends WebSocket {
@Resource(name = "z_im")
protected DataSource zimSource;
@Resource(name = "int_cache")
protected MyRedisCacheSource<Integer> intCache;
@Resource(name = "redis")
protected RedissionCacheSourcex redisCache;
@Resource(name = "zhub")
protected ZHubClient zhub;
@ -78,7 +77,7 @@ public class ImChatWebSocket extends WebSocket {
return CompletableFuture.runAsync(() -> {
final int userid = (int) getUserid();
getLogger().info("im:state-change:" + userid + "---ws connected---");
intCache.setBit("im:user", userid, true);
redisCache.setBit("im:user", userid, true);
// 推送上线状态
zhub.publish("im:online:" + getAttribute("appid"), getAttribute("guserid"));
// 上线开启订阅
@ -104,7 +103,7 @@ public class ImChatWebSocket extends WebSocket {
FriendMessage message = FriendMessage.buildFriendMessage(bean.get("content"), userid, targetguserid);
// 禁言检查
if (intCache.getBit("im:banned-talk", userid)) {
if (redisCache.getBit("im:banned-talk", userid)) {
sendTip("发送失败,你已被禁言", mck, 3002);
return;
}
@ -139,7 +138,7 @@ public class ImChatWebSocket extends WebSocket {
String mck = extmap.get("mck");
// 禁言检查
if (intCache.getBit("im:banned-talk", userid)) {
if (redisCache.getBit("im:banned-talk", userid)) {
sendTip("发送失败,你已被禁言", mck, 3002);
return;
}
@ -172,7 +171,7 @@ public class ImChatWebSocket extends WebSocket {
public CompletableFuture onClose(int code, String reason) {
final int userid = (int) getUserid();
getLogger().info("im:state-change:" + userid + "---close---" + code + "---" + reason);
intCache.setBit("im:user", userid, false);
redisCache.setBit("im:user", userid, false);
// 推送离线状态
zhub.publish("im:offline:" + getAttribute("appid"), getAttribute("guserid"));
// 取消用户订阅

View File

@ -13,7 +13,6 @@ import org.redkale.util.AnyValue;
import org.redkale.util.Comment;
import org.redkale.util.TypeToken;
import org.redkale.util.Utility;
import org.redkalex.cache.redis.MyRedisCacheSource;
import javax.annotation.Resource;
import java.util.BitSet;
@ -32,11 +31,6 @@ public class ImMessageMonitor extends BaseService {
@Resource(name = "im_chat")
protected WebSocketNode wsnode;
@Resource(name = "int_cache")
protected MyRedisCacheSource<Integer> intCache;
@Resource(name = "long_cache")
protected MyRedisCacheSource<Long> longCache;
protected BitSet bitSet = new BitSet(); // IM用户连接当前实例记录
protected ConcurrentHashMap<String, Set<Integer>> channelSubscribe = new ConcurrentHashMap<>();
@ -122,7 +116,7 @@ public class ImMessageMonitor extends BaseService {
}
// todo并发场景下会影响拉取离线消息
longCache.setHm("im:channel:" + x.buildChannelid(), uid, System.currentTimeMillis());
redisCache.hset("im:channel:" + x.buildChannelid(), uid + "", System.currentTimeMillis());
});
});
});