diff --git a/src/net/tccn/AbstractConsumer.java b/src/net/tccn/AbstractConsumer.java index a93de9f..f126411 100644 --- a/src/net/tccn/AbstractConsumer.java +++ b/src/net/tccn/AbstractConsumer.java @@ -4,7 +4,6 @@ import org.redkale.convert.json.JsonConvert; import org.redkale.util.Resourcable; import org.redkale.util.TypeToken; -import javax.annotation.Resource; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -18,8 +17,7 @@ public abstract class AbstractConsumer extends ZhubAgentProvider implements ICon protected JsonConvert convert = JsonConvert.root(); - @Resource(name = "APP_NAME") - protected String APP_NAME = ""; + protected static String APP_NAME = ""; private Map eventMap = new ConcurrentHashMap<>(); diff --git a/src/net/tccn/zhub/Rpc.java b/src/net/tccn/zhub/Rpc.java index 90c1e6a..5d1aaeb 100644 --- a/src/net/tccn/zhub/Rpc.java +++ b/src/net/tccn/zhub/Rpc.java @@ -58,15 +58,7 @@ public class Rpc { return ruk.split("::")[0]; } - @Deprecated(since = "2023.04.15") - public RpcResult buildResp() { - RpcResult response = new RpcResult<>(); - response.setRuk(ruk); - return response; - } - - @Deprecated(since = "2023.04.15") - public RpcResult buildResp(int retcode, String retinfo) { + public RpcResult render(int retcode, String retinfo) { RpcResult response = new RpcResult<>(); response.setRuk(ruk); response.setRetcode(retcode); @@ -74,23 +66,6 @@ public class Rpc { return response; } - @Deprecated(since = "2023.04.15") - public RpcResult buildError(String retinfo) { - RpcResult response = new RpcResult<>(); - response.setRuk(ruk); - response.setRetcode(100); - response.setRetinfo(retinfo); - return response; - } - - @Deprecated(since = "2023.04.15") - public RpcResult buildResp(R result) { - RpcResult response = new RpcResult<>(); - response.setRuk(ruk); - response.setResult(result); - return response; - } - public RpcResult render() { RpcResult response = new RpcResult<>(); response.setRuk(ruk); diff --git a/src/net/tccn/zhub/ZHubClient.java b/src/net/tccn/zhub/ZHubClient.java index 57f7b3f..de3150f 100644 --- a/src/net/tccn/zhub/ZHubClient.java +++ b/src/net/tccn/zhub/ZHubClient.java @@ -56,9 +56,14 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer };*/ private static Map mainHub = new HashMap<>(); // 127.0.0.1:1216 - ZHubClient +/* + public ZHubClient() { + logger.info("ZHubClient:" + (application != null ? application.getName() : "NULL")); + }*/ @Override public void init(AnyValue config) { + APP_NAME = application.getName(); /*if (!preInit()) { return; }*/ @@ -617,7 +622,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer return; } - RpcResult rpcResult = rpc.buildResp(505, "请求超时"); + RpcResult rpcResult = rpc.render(505, "请求超时"); rpc.setRpcResult(rpcResult); logger.warning("rpc timeout: " + convert.convertTo(rpc)); rpc.notify(); @@ -630,7 +635,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer } catch (InterruptedException e) { e.printStackTrace(); // call error - RpcResult rpcResult = rpc.buildResp(501, "请求失败"); + RpcResult rpcResult = rpc.render(501, "请求失败"); rpc.setRpcResult(rpcResult); } return rpc.getRpcResult(); @@ -701,7 +706,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer publish(rpc.getBackTopic(), result); } catch (Exception e) { logger.log(Level.WARNING, "rpc call consumer error: " + v, e); - publish(rpc.getBackTopic(), rpc.buildError("服务调用失败!")); + publish(rpc.getBackTopic(), rpc.retError("服务调用失败!")); } // back }; diff --git a/src/org/redkalex/cache/redis/MyRedisCacheSource.java b/src/org/redkalex/cache/redis/MyRedisCacheSource.java index 3855770..493c350 100644 --- a/src/org/redkalex/cache/redis/MyRedisCacheSource.java +++ b/src/org/redkalex/cache/redis/MyRedisCacheSource.java @@ -20,6 +20,179 @@ public class MyRedisCacheSource extends RedisCacheSource { public void init(AnyValue conf) { super.init(conf); } +/* +//--------------------- zset ------------------------------ + + public int getZrank(String key, V v) { + byte[][] bytes = Stream.of(key, v).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new); + Long t = (Long) send("ZRANK", CacheEntryType.OBJECT, (Type) null, key, bytes).join(); + + return t == null ? -1 : (int) (long) t; + } + + public int getZrevrank(String key, V v) { + byte[][] bytes = Stream.of(key, v).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new); + Long t = (Long) send("ZREVRANK", CacheEntryType.OBJECT, (Type) null, key, bytes).join(); + + return t == null ? -1 : (int) (long) t; + } + + //ZRANGE/ZREVRANGE key start stop + public List getZset(String key) { + byte[][] bytes = Stream.of(key, 0, -1).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new); + List vs = (List) send("ZREVRANGE", CacheEntryType.OBJECT, (Type) null, key, bytes).join(); + List vs2 = new ArrayList(vs.size()); + + for (int i = 0; i < vs.size(); ++i) { + if (i % 2 == 1) { + vs2.add(this.convert.convertFrom(this.objValueType, String.valueOf(vs.get(i)))); + } else { + vs2.add(vs.get(i)); + } + } + + return vs2; + } + + public List getZset(String key, int offset, int limit) { + byte[][] bytes = Stream.of(key, offset, offset + limit - 1).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new); + List vs = (List) send("ZREVRANGE", CacheEntryType.OBJECT, (Type) null, key, bytes).join(); + List vs2 = new ArrayList(vs.size()); + + for (int i = 0; i < vs.size(); ++i) { + if (i % 2 == 1) { + vs2.add(this.convert.convertFrom(this.objValueType, String.valueOf(vs.get(i)))); + } else { + vs2.add(vs.get(i)); + } + } + + return vs2; + } + + public LinkedHashMap getZsetLongScore(String key) { + LinkedHashMap map = getZsetDoubleScore(key); + if (map.isEmpty()) { + return new LinkedHashMap<>(); + } + + LinkedHashMap map2 = new LinkedHashMap<>(map.size()); + map.forEach((k, v) -> map2.put(k, (long) (double) v)); + return map2; + } + + public LinkedHashMap getZsetItemsLongScore(String key) { + byte[][] bytes = Stream.of(key, 0, -1, "WITHSCORES").map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new); + List vs = (List) send("ZRANGE", CacheEntryType.OBJECT, (Type) null, key, bytes).join(); + + LinkedHashMap map = new LinkedHashMap<>(); + for (int i = 0; i < vs.size(); i += 2) { + map.put((V) vs.get(i), (long) Double.parseDouble((String) vs.get(i + 1))); + } + return map; + } + + public Long getZsetLongScore(String key, V v) { + Double score = getZsetDoubleScore(key, v); + if (score == null) { + return null; + } + return (long) (double) score; + } + + public LinkedHashMap getZsetDoubleScore(String key) { + byte[][] bytes = Stream.of(key, 0, -1, "WITHSCORES").map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new); + List vs = (List) send("ZREVRANGE", CacheEntryType.OBJECT, (Type) null, key, bytes).join(); + + LinkedHashMap map = new LinkedHashMap<>(); + for (int i = 0; i < vs.size(); i += 2) { + map.put((V) vs.get(i), Double.parseDouble((String) vs.get(i + 1))); + } + return map; + } + + public Double getZsetDoubleScore(String key, V v) { + byte[][] bytes = Stream.of(key, v).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new); + Serializable zscore = send("ZSCORE", CacheEntryType.OBJECT, (Type) null, key, bytes).join(); + if (zscore == null) { + return null; + } + + return Double.parseDouble(String.valueOf(zscore)); + } + + public LinkedHashMap getZsetLongScore(String key, int offset, int limit) { + byte[][] bytes = Stream.of(key, offset, offset + limit - 1, "WITHSCORES").map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new); + List vs = (List) send("ZREVRANGE", CacheEntryType.OBJECT, (Type) null, key, bytes).join(); + + LinkedHashMap map = new LinkedHashMap<>(); + for (int i = 0; i < vs.size(); i += 2) { + map.put((V) vs.get(i), (long) Double.parseDouble((String) vs.get(i + 1))); + } + return map; + } + + public LinkedHashMap getZsetDoubleScore(String key, int offset, int limit) { + byte[][] bytes = Stream.of(key, offset, offset + limit - 1, "WITHSCORES").map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new); + List vs = (List) send("ZREVRANGE", CacheEntryType.OBJECT, (Type) null, key, bytes).join(); + + LinkedHashMap map = new LinkedHashMap<>(); + for (int i = 0; i < vs.size(); i += 2) { + map.put((V) vs.get(i), Double.parseDouble(vs.get(i + 1) + "")); + } + return map; + } +* */ + + // -------------------- + public void zadd(String key, Map kv) { + if (kv == null || kv.isEmpty()) { + return; + } + List args = new ArrayList(); + kv.forEach((k, v) -> { + args.add(k); + args.add(v); + }); + + sendAsync("ZADD", key, args.toArray(Serializable[]::new)).join(); + } + + public double zincr(String key, Serializable number, N n) { + return sendAsync("ZINCRBY", key, number, n).thenApply(x -> x.getDoubleValue(0d)).join(); + } + + public void zrem(String key, Serializable... vs) { + sendAsync("ZREM", key, vs).join(); + } + + /*public List zexists(String key, T... fields) { + if (fields == null || fields.length == 0) { + return new ArrayList<>(); + } + List para = new ArrayList<>(); + para.add("" + + " local key = KEYS[1];" + + " local args = ARGV;" + + " local result = {};" + + " for i,v in ipairs(args) do" + + " local inx = redis.call('ZREVRANK', key, v);" + + " if(inx) then" + + " table.insert(result,1,v);" + + " end" + + " end" + + " return result;"); + para.add("1"); + para.add(key); + for (Object field : fields) { + para.add(String.valueOf(field)); + } + + // todo: + //sendAsync("EVAL", null, para.toArray(Serializable[]::new)).thenApply(x -> x.).join(); + + return null; + }*/ //--------------------- bit ------------------------------ public boolean getBit(String key, int offset) { @@ -70,6 +243,10 @@ public class MyRedisCacheSource extends RedisCacheSource { return get(key, String.class); } + public void set(String key, Serializable value) { + sendAsync("SET", key, value).join(); + } + //--------------------- set ------------------------------ public void sadd(String key, Collection args) { saddAsync(key, args.toArray(Serializable[]::new)).join(); @@ -92,6 +269,19 @@ public class MyRedisCacheSource extends RedisCacheSource { } //--------------------- hm ------------------------------ + + /*public Long incrHm(String key, String field, int value) { + return sendAsync("HINCRBY", key, field, value).thenApply(x -> x.getLongValue(0l)).join(); + } + + public Double incrHm(String key, String field, double value) { + return sendAsync("HINCRBYFLOAT", key, field, value).thenApply(x -> x.getDoubleValue(0d)).join(); + }*/ + + public void setHm(String key, String field, Serializable value) { + setHmsAsync(key, Map.of(field, value)).join(); + } + public void setHms(String key, Map kv) { setHmsAsync(key, kv).join(); } @@ -130,8 +320,24 @@ public class MyRedisCacheSource extends RedisCacheSource { Map map = new HashMap<>(field.length); for (int i = 0; i < field.length; i++) { + if (list.get(i) == null) { + continue; + } map.put(field[i], (T) list.get(i)); } return map; } + + /*public Map getHmall(String key) { + List list = null; // TODO: + Map map = new HashMap<>(); + if (list.isEmpty()) { + return map; + } + + for (int i = 0; i + 1 < list.size(); i += 2) { + map.put((String) list.get(i), list.get(i + 1)); + } + return map; + }*/ } diff --git a/test/net.tccn/cache/RedisTest.java b/test/net.tccn/cache/RedisTest.java index a28bae5..1547d99 100644 --- a/test/net.tccn/cache/RedisTest.java +++ b/test/net.tccn/cache/RedisTest.java @@ -1,4 +1,4 @@ -package org.redkalex.cache.redis.test; +package net.tccn.cache; import org.redkale.net.AsyncIOGroup; import org.redkale.util.AnyValue; @@ -75,7 +75,7 @@ public class RedisTest { //source.getexLong() - source.setHms("hmx", Map.of("a", "5","b", "51", "c", "ads")); + source.setHms("hmx", Map.of("a", "5", "b", "51", "c", "ads")); List hmget = source.hmget("hmx", int.class, "a"); @@ -85,7 +85,23 @@ public class RedisTest { System.out.println(hm); Map hms = source.getHms("hmx", "a", "b"); - System.out.println(hms); + System.out.println("hmx:" + hms); + + /*System.out.println("======================================================"); + System.out.println(source.incrHm("hmx", "a", -6.0)); + hms = source.getHms("hmx", "a", "b"); + System.out.println("hmx:a+1后的结果 " + hms);*/ + System.out.println("======================================================"); + source.setHm("hmx", "c", 12); + hms = source.getHms("hmx", "a", "b", "c", "d", "a"); + System.out.println("hmx:设置 c=12 后的结果 " + hms); + System.out.println("======================================================"); + Double c = source.getHm("hmx", double.class, "c"); + System.out.println("hmx 中 c 值:" + c); + + /*Map hmx = source.getHmall("hmx"); + System.out.println("Hmall:" + hmx);*/ + diff --git a/test/net.tccn/mq/AppTest.java b/test/net.tccn/mq/AppTest.java index 428bb92..640f3d3 100644 --- a/test/net.tccn/mq/AppTest.java +++ b/test/net.tccn/mq/AppTest.java @@ -1,11 +1,8 @@ -package com.zdemo.test; +package net.tccn.mq; -import com.zdemo.Event; -import com.zdemo.IProducer; -import com.zdemo.zhub.Delays; +import net.tccn.Event; import net.tccn.timer.Timers; import org.junit.Test; -import org.redkale.boot.Application; import org.redkale.convert.json.JsonConvert; import java.util.ArrayList; @@ -33,13 +30,6 @@ public class AppTest { System.out.println(str.length()); - //启动并开启消费监听 - MyConsumer consumer = Application.singleton(MyConsumer.class); - - consumer.subscribe("a", strx -> { - logger.info("我收到了消息 a 事件:" + str); - }); - /*consumer.timer("a", () -> { System.out.println(Utility.now() + " timer a 执行了"); try { @@ -83,20 +73,7 @@ public class AppTest { @Test public void runProducer() { - try { - MyConsumer producer = Application.singleton(MyConsumer.class); - for (int i = 0; i < 10_0000; i++) { - producer.publish("a-1", i); - } - try { - Thread.sleep(1_000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } catch (Exception e) { - e.printStackTrace(); - } } private static LinkedBlockingQueue queue = new LinkedBlockingQueue(); @@ -148,21 +125,6 @@ public class AppTest { System.out.println(fun.toString()); } - @Test - public void yy() { - IProducer producer = null; - try { - producer = Application.singleton(MyConsumer.class); - - for (int i = 0; i < 100; i++) { - - producer.publish("x", "x"); - Thread.sleep(1000); - } - } catch (Exception e) { - e.printStackTrace(); - } - } // (27+5*23)/(63-59) // [27+5*23] [/] [63-59] @@ -380,10 +342,10 @@ public class AppTest { @Test public void delay() { - DelayQueue delayQueue = new DelayQueue<>(); + DelayQueue delayQueue = new DelayQueue<>(); logger.info("加入延时任务1"); - delayQueue.add(new Delays(5000, () -> { + delayQueue.add(new com.zdemo.zhub.Delays(5000, () -> { logger.info("任务1 延时任务执行了!"); })); @@ -394,13 +356,13 @@ public class AppTest { } logger.info("加入延时任务2"); - delayQueue.add(new Delays(5000, () -> { + delayQueue.add(new com.zdemo.zhub.Delays(5000, () -> { logger.info("任务2 延时任务执行了!"); })); try { while (true) { - Delays delay = delayQueue.take(); + com.zdemo.zhub.Delays delay = delayQueue.take(); delay.run(); } diff --git a/test/net.tccn/mq/HelloService.java b/test/net.tccn/mq/HelloService.java index 35c12d3..c5d0171 100644 --- a/test/net.tccn/mq/HelloService.java +++ b/test/net.tccn/mq/HelloService.java @@ -1,9 +1,8 @@ -package com.zdemo.test; +package net.tccn.mq; -import com.zdemo.IType; -import com.zdemo.ZhubProvider; -import com.zdemo.zhub.RpcResult; -import com.zdemo.zhub.ZHubClient; +import net.tccn.IType; +import net.tccn.zhub.RpcResult; +import net.tccn.zhub.ZHubClient; import org.redkale.net.http.RestMapping; import org.redkale.net.http.RestService; import org.redkale.service.Service; @@ -48,7 +47,7 @@ public class HelloService implements Service { zhub.rpcSubscribe("y", new TypeToken() { }, r -> { - return r.buildResp(Map.of("v", r.getValue().toUpperCase() + ": Ok")); + return r.render(Map.of("v", r.getValue().toUpperCase() + ": Ok")); }); zhub.subscribe("sport:reqtime", x -> { diff --git a/test/net.tccn/mq/MyConsumer.java b/test/net.tccn/mq/MyConsumer.java deleted file mode 100644 index 0bfef54..0000000 --- a/test/net.tccn/mq/MyConsumer.java +++ /dev/null @@ -1,15 +0,0 @@ -package com.zdemo.test; - -import com.zdemo.zhub.ZHubClient; - -public class MyConsumer extends ZHubClient { - - public String getGroupid() { - return "group-test"; //消费组名称 - } - - @Override - protected boolean preInit() { - return true; - } -}