修复:appname 名称丢失,rpc调用失败bug

This commit is contained in:
梁显优 2023-05-23 13:24:18 +08:00
parent d40d4e6e5d
commit 4986fafdd2
8 changed files with 246 additions and 100 deletions

View File

@ -4,7 +4,6 @@ import org.redkale.convert.json.JsonConvert;
import org.redkale.util.Resourcable;
import org.redkale.util.TypeToken;
import javax.annotation.Resource;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ -18,8 +17,7 @@ public abstract class AbstractConsumer extends ZhubAgentProvider implements ICon
protected JsonConvert convert = JsonConvert.root();
@Resource(name = "APP_NAME")
protected String APP_NAME = "";
protected static String APP_NAME = "";
private Map<String, EventType> eventMap = new ConcurrentHashMap<>();

View File

@ -58,15 +58,7 @@ public class Rpc<T> {
return ruk.split("::")[0];
}
@Deprecated(since = "2023.04.15")
public <R> RpcResult<R> buildResp() {
RpcResult<R> response = new RpcResult<>();
response.setRuk(ruk);
return response;
}
@Deprecated(since = "2023.04.15")
public <R> RpcResult<R> buildResp(int retcode, String retinfo) {
public <R> RpcResult<R> render(int retcode, String retinfo) {
RpcResult<R> response = new RpcResult<>();
response.setRuk(ruk);
response.setRetcode(retcode);
@ -74,23 +66,6 @@ public class Rpc<T> {
return response;
}
@Deprecated(since = "2023.04.15")
public <R> RpcResult<R> buildError(String retinfo) {
RpcResult<R> response = new RpcResult<>();
response.setRuk(ruk);
response.setRetcode(100);
response.setRetinfo(retinfo);
return response;
}
@Deprecated(since = "2023.04.15")
public <R> RpcResult<R> buildResp(R result) {
RpcResult<R> response = new RpcResult<>();
response.setRuk(ruk);
response.setResult(result);
return response;
}
public <R> RpcResult<R> render() {
RpcResult<R> response = new RpcResult<>();
response.setRuk(ruk);

View File

@ -56,9 +56,14 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
};*/
private static Map<String, ZHubClient> mainHub = new HashMap<>(); // 127.0.0.1:1216 - ZHubClient
/*
public ZHubClient() {
logger.info("ZHubClient:" + (application != null ? application.getName() : "NULL"));
}*/
@Override
public void init(AnyValue config) {
APP_NAME = application.getName();
/*if (!preInit()) {
return;
}*/
@ -617,7 +622,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
return;
}
RpcResult rpcResult = rpc.buildResp(505, "请求超时");
RpcResult rpcResult = rpc.render(505, "请求超时");
rpc.setRpcResult(rpcResult);
logger.warning("rpc timeout: " + convert.convertTo(rpc));
rpc.notify();
@ -630,7 +635,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
} catch (InterruptedException e) {
e.printStackTrace();
// call error
RpcResult rpcResult = rpc.buildResp(501, "请求失败");
RpcResult rpcResult = rpc.render(501, "请求失败");
rpc.setRpcResult(rpcResult);
}
return rpc.getRpcResult();
@ -701,7 +706,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
publish(rpc.getBackTopic(), result);
} catch (Exception e) {
logger.log(Level.WARNING, "rpc call consumer error: " + v, e);
publish(rpc.getBackTopic(), rpc.buildError("服务调用失败!"));
publish(rpc.getBackTopic(), rpc.retError("服务调用失败!"));
}
// back
};

View File

@ -20,6 +20,179 @@ public class MyRedisCacheSource extends RedisCacheSource {
public void init(AnyValue conf) {
super.init(conf);
}
/*
//--------------------- zset ------------------------------
public int getZrank(String key, V v) {
byte[][] bytes = Stream.of(key, v).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
Long t = (Long) send("ZRANK", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
return t == null ? -1 : (int) (long) t;
}
public int getZrevrank(String key, V v) {
byte[][] bytes = Stream.of(key, v).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
Long t = (Long) send("ZREVRANK", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
return t == null ? -1 : (int) (long) t;
}
//ZRANGE/ZREVRANGE key start stop
public List<V> getZset(String key) {
byte[][] bytes = Stream.of(key, 0, -1).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
List<V> vs = (List<V>) send("ZREVRANGE", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
List<V> vs2 = new ArrayList(vs.size());
for (int i = 0; i < vs.size(); ++i) {
if (i % 2 == 1) {
vs2.add(this.convert.convertFrom(this.objValueType, String.valueOf(vs.get(i))));
} else {
vs2.add(vs.get(i));
}
}
return vs2;
}
public List<V> getZset(String key, int offset, int limit) {
byte[][] bytes = Stream.of(key, offset, offset + limit - 1).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
List<V> vs = (List<V>) send("ZREVRANGE", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
List<V> vs2 = new ArrayList(vs.size());
for (int i = 0; i < vs.size(); ++i) {
if (i % 2 == 1) {
vs2.add(this.convert.convertFrom(this.objValueType, String.valueOf(vs.get(i))));
} else {
vs2.add(vs.get(i));
}
}
return vs2;
}
public LinkedHashMap<V, Long> getZsetLongScore(String key) {
LinkedHashMap<V, Double> map = getZsetDoubleScore(key);
if (map.isEmpty()) {
return new LinkedHashMap<>();
}
LinkedHashMap<V, Long> map2 = new LinkedHashMap<>(map.size());
map.forEach((k, v) -> map2.put(k, (long) (double) v));
return map2;
}
public LinkedHashMap<V, Long> getZsetItemsLongScore(String key) {
byte[][] bytes = Stream.of(key, 0, -1, "WITHSCORES").map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
List vs = (List) send("ZRANGE", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
LinkedHashMap<V, Long> map = new LinkedHashMap<>();
for (int i = 0; i < vs.size(); i += 2) {
map.put((V) vs.get(i), (long) Double.parseDouble((String) vs.get(i + 1)));
}
return map;
}
public Long getZsetLongScore(String key, V v) {
Double score = getZsetDoubleScore(key, v);
if (score == null) {
return null;
}
return (long) (double) score;
}
public LinkedHashMap<V, Double> getZsetDoubleScore(String key) {
byte[][] bytes = Stream.of(key, 0, -1, "WITHSCORES").map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
List vs = (List) send("ZREVRANGE", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
LinkedHashMap<V, Double> map = new LinkedHashMap<>();
for (int i = 0; i < vs.size(); i += 2) {
map.put((V) vs.get(i), Double.parseDouble((String) vs.get(i + 1)));
}
return map;
}
public Double getZsetDoubleScore(String key, V v) {
byte[][] bytes = Stream.of(key, v).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
Serializable zscore = send("ZSCORE", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
if (zscore == null) {
return null;
}
return Double.parseDouble(String.valueOf(zscore));
}
public LinkedHashMap<V, Long> getZsetLongScore(String key, int offset, int limit) {
byte[][] bytes = Stream.of(key, offset, offset + limit - 1, "WITHSCORES").map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
List vs = (List) send("ZREVRANGE", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
LinkedHashMap<V, Long> map = new LinkedHashMap<>();
for (int i = 0; i < vs.size(); i += 2) {
map.put((V) vs.get(i), (long) Double.parseDouble((String) vs.get(i + 1)));
}
return map;
}
public LinkedHashMap<V, Double> getZsetDoubleScore(String key, int offset, int limit) {
byte[][] bytes = Stream.of(key, offset, offset + limit - 1, "WITHSCORES").map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
List vs = (List) send("ZREVRANGE", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
LinkedHashMap<V, Double> map = new LinkedHashMap<>();
for (int i = 0; i < vs.size(); i += 2) {
map.put((V) vs.get(i), Double.parseDouble(vs.get(i + 1) + ""));
}
return map;
}
* */
// --------------------
public <N extends Number> void zadd(String key, Map<Serializable, N> kv) {
if (kv == null || kv.isEmpty()) {
return;
}
List<Serializable> args = new ArrayList();
kv.forEach((k, v) -> {
args.add(k);
args.add(v);
});
sendAsync("ZADD", key, args.toArray(Serializable[]::new)).join();
}
public <N extends Number> double zincr(String key, Serializable number, N n) {
return sendAsync("ZINCRBY", key, number, n).thenApply(x -> x.getDoubleValue(0d)).join();
}
public void zrem(String key, Serializable... vs) {
sendAsync("ZREM", key, vs).join();
}
/*public <T> List<T> zexists(String key, T... fields) {
if (fields == null || fields.length == 0) {
return new ArrayList<>();
}
List<String> para = new ArrayList<>();
para.add("" +
" local key = KEYS[1];" +
" local args = ARGV;" +
" local result = {};" +
" for i,v in ipairs(args) do" +
" local inx = redis.call('ZREVRANK', key, v);" +
" if(inx) then" +
" table.insert(result,1,v);" +
" end" +
" end" +
" return result;");
para.add("1");
para.add(key);
for (Object field : fields) {
para.add(String.valueOf(field));
}
// todo:
//sendAsync("EVAL", null, para.toArray(Serializable[]::new)).thenApply(x -> x.).join();
return null;
}*/
//--------------------- bit ------------------------------
public boolean getBit(String key, int offset) {
@ -70,6 +243,10 @@ public class MyRedisCacheSource extends RedisCacheSource {
return get(key, String.class);
}
public void set(String key, Serializable value) {
sendAsync("SET", key, value).join();
}
//--------------------- set ------------------------------
public <T> void sadd(String key, Collection<T> args) {
saddAsync(key, args.toArray(Serializable[]::new)).join();
@ -92,6 +269,19 @@ public class MyRedisCacheSource extends RedisCacheSource {
}
//--------------------- hm ------------------------------
/*public Long incrHm(String key, String field, int value) {
return sendAsync("HINCRBY", key, field, value).thenApply(x -> x.getLongValue(0l)).join();
}
public Double incrHm(String key, String field, double value) {
return sendAsync("HINCRBYFLOAT", key, field, value).thenApply(x -> x.getDoubleValue(0d)).join();
}*/
public void setHm(String key, String field, Serializable value) {
setHmsAsync(key, Map.of(field, value)).join();
}
public void setHms(String key, Map kv) {
setHmsAsync(key, kv).join();
}
@ -130,8 +320,24 @@ public class MyRedisCacheSource extends RedisCacheSource {
Map<String, T> map = new HashMap<>(field.length);
for (int i = 0; i < field.length; i++) {
if (list.get(i) == null) {
continue;
}
map.put(field[i], (T) list.get(i));
}
return map;
}
/*public Map<String, Object> getHmall(String key) {
List<String> list = null; // TODO:
Map<String, Object> map = new HashMap<>();
if (list.isEmpty()) {
return map;
}
for (int i = 0; i + 1 < list.size(); i += 2) {
map.put((String) list.get(i), list.get(i + 1));
}
return map;
}*/
}

View File

@ -1,4 +1,4 @@
package org.redkalex.cache.redis.test;
package net.tccn.cache;
import org.redkale.net.AsyncIOGroup;
import org.redkale.util.AnyValue;
@ -75,7 +75,7 @@ public class RedisTest {
//source.getexLong()
source.setHms("hmx", Map.of("a", "5","b", "51", "c", "ads"));
source.setHms("hmx", Map.of("a", "5", "b", "51", "c", "ads"));
List<Serializable> hmget = source.hmget("hmx", int.class, "a");
@ -85,7 +85,23 @@ public class RedisTest {
System.out.println(hm);
Map<String, String> hms = source.getHms("hmx", "a", "b");
System.out.println(hms);
System.out.println("hmx:" + hms);
/*System.out.println("======================================================");
System.out.println(source.incrHm("hmx", "a", -6.0));
hms = source.getHms("hmx", "a", "b");
System.out.println("hmxa+1后的结果 " + hms);*/
System.out.println("======================================================");
source.setHm("hmx", "c", 12);
hms = source.getHms("hmx", "a", "b", "c", "d", "a");
System.out.println("hmx设置 c=12 后的结果 " + hms);
System.out.println("======================================================");
Double c = source.getHm("hmx", double.class, "c");
System.out.println("hmx 中 c 值:" + c);
/*Map<String, Object> hmx = source.getHmall("hmx");
System.out.println("Hmall" + hmx);*/

View File

@ -1,11 +1,8 @@
package com.zdemo.test;
package net.tccn.mq;
import com.zdemo.Event;
import com.zdemo.IProducer;
import com.zdemo.zhub.Delays;
import net.tccn.Event;
import net.tccn.timer.Timers;
import org.junit.Test;
import org.redkale.boot.Application;
import org.redkale.convert.json.JsonConvert;
import java.util.ArrayList;
@ -33,13 +30,6 @@ public class AppTest {
System.out.println(str.length());
//启动并开启消费监听
MyConsumer consumer = Application.singleton(MyConsumer.class);
consumer.subscribe("a", strx -> {
logger.info("我收到了消息 a 事件:" + str);
});
/*consumer.timer("a", () -> {
System.out.println(Utility.now() + " timer a 执行了");
try {
@ -83,20 +73,7 @@ public class AppTest {
@Test
public void runProducer() {
try {
MyConsumer producer = Application.singleton(MyConsumer.class);
for (int i = 0; i < 10_0000; i++) {
producer.publish("a-1", i);
}
try {
Thread.sleep(1_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
private static LinkedBlockingQueue<String> queue = new LinkedBlockingQueue();
@ -148,21 +125,6 @@ public class AppTest {
System.out.println(fun.toString());
}
@Test
public void yy() {
IProducer producer = null;
try {
producer = Application.singleton(MyConsumer.class);
for (int i = 0; i < 100; i++) {
producer.publish("x", "x");
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
// (27+5*23)/(63-59)
// [27+5*23] [/] [63-59]
@ -380,10 +342,10 @@ public class AppTest {
@Test
public void delay() {
DelayQueue<Delays> delayQueue = new DelayQueue<>();
DelayQueue<com.zdemo.zhub.Delays> delayQueue = new DelayQueue<>();
logger.info("加入延时任务1");
delayQueue.add(new Delays(5000, () -> {
delayQueue.add(new com.zdemo.zhub.Delays(5000, () -> {
logger.info("任务1 延时任务执行了!");
}));
@ -394,13 +356,13 @@ public class AppTest {
}
logger.info("加入延时任务2");
delayQueue.add(new Delays(5000, () -> {
delayQueue.add(new com.zdemo.zhub.Delays(5000, () -> {
logger.info("任务2 延时任务执行了!");
}));
try {
while (true) {
Delays delay = delayQueue.take();
com.zdemo.zhub.Delays delay = delayQueue.take();
delay.run();
}

View File

@ -1,9 +1,8 @@
package com.zdemo.test;
package net.tccn.mq;
import com.zdemo.IType;
import com.zdemo.ZhubProvider;
import com.zdemo.zhub.RpcResult;
import com.zdemo.zhub.ZHubClient;
import net.tccn.IType;
import net.tccn.zhub.RpcResult;
import net.tccn.zhub.ZHubClient;
import org.redkale.net.http.RestMapping;
import org.redkale.net.http.RestService;
import org.redkale.service.Service;
@ -48,7 +47,7 @@ public class HelloService implements Service {
zhub.rpcSubscribe("y", new TypeToken<String>() {
}, r -> {
return r.buildResp(Map.of("v", r.getValue().toUpperCase() + ": Ok"));
return r.render(Map.of("v", r.getValue().toUpperCase() + ": Ok"));
});
zhub.subscribe("sport:reqtime", x -> {

View File

@ -1,15 +0,0 @@
package com.zdemo.test;
import com.zdemo.zhub.ZHubClient;
public class MyConsumer extends ZHubClient {
public String getGroupid() {
return "group-test"; //消费组名称
}
@Override
protected boolean preInit() {
return true;
}
}