diff --git a/src/com/zdemo/AbstractConsumer.java b/src/com/zdemo/AbstractConsumer.java index 9a3577a..a15e4b7 100644 --- a/src/com/zdemo/AbstractConsumer.java +++ b/src/com/zdemo/AbstractConsumer.java @@ -3,6 +3,7 @@ package com.zdemo; import org.redkale.convert.json.JsonConvert; import org.redkale.util.TypeToken; +import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -14,6 +15,11 @@ import java.util.function.Consumer; */ public abstract class AbstractConsumer implements IConsumer { + protected JsonConvert convert = JsonConvert.root(); + + @Resource(name = "APP_NAME") + protected String APP_NAME = ""; + private Map eventMap = new HashMap<>(); protected abstract String getGroupid(); @@ -37,7 +43,7 @@ public abstract class AbstractConsumer implements IConsumer { if ("java.lang.String".equals(eventType.typeToken.getType().getTypeName())) { data = value; } else { - data = JsonConvert.root().convertFrom(eventType.typeToken.getType(), value); + data = convert.convertFrom(eventType.typeToken.getType(), value); } eventType.accept(data); diff --git a/src/com/zdemo/ZhubListener.java b/src/com/zdemo/ZhubListener.java index f46ae68..8fc79bf 100644 --- a/src/com/zdemo/ZhubListener.java +++ b/src/com/zdemo/ZhubListener.java @@ -7,6 +7,7 @@ import org.redkale.util.AnyValue; import org.redkale.util.RedkaleClassLoader; import org.redkale.util.ResourceFactory; +import java.lang.reflect.InvocationTargetException; import java.util.concurrent.CompletableFuture; /** @@ -30,14 +31,11 @@ public class ZhubListener implements ApplicationListener { String clazz = zhub.getValue("value", "com.zdemo.zhub.ZHubClient"); try { Class aClass = classLoader.loadClass(clazz); - Service obj = (Service) aClass.newInstance(); + Service obj = (Service) aClass.getDeclaredConstructor().newInstance(); + application.getResourceFactory().inject(obj); obj.init(zhub); resourceFactory.register(zhub.get("name"), aClass, obj); - } catch (ClassNotFoundException e) { - e.printStackTrace(); - } catch (IllegalAccessException e) { - e.printStackTrace(); - } catch (InstantiationException e) { + } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException | ClassNotFoundException e) { e.printStackTrace(); } } diff --git a/src/com/zdemo/zhub/Lock.java b/src/com/zdemo/zhub/Lock.java new file mode 100644 index 0000000..141cc3c --- /dev/null +++ b/src/com/zdemo/zhub/Lock.java @@ -0,0 +1,20 @@ +package com.zdemo.zhub; + +// ================================================== lock ================================================== +public class Lock { + private String name; + private String uuid; + private int duration; + private ZHubClient hubClient; + + protected Lock(String name, String uuid, int duration, ZHubClient hubClient) { + this.name = name; + this.uuid = uuid; + this.duration = duration; + this.hubClient = hubClient; + } + + public void unLock() { + hubClient.send("unlock", name, uuid); + } +} diff --git a/src/com/zdemo/zhub/Rpc.java b/src/com/zdemo/zhub/Rpc.java new file mode 100644 index 0000000..f17f4cf --- /dev/null +++ b/src/com/zdemo/zhub/Rpc.java @@ -0,0 +1,105 @@ +package com.zdemo.zhub; + +import org.redkale.convert.ConvertColumn; +import org.redkale.convert.json.JsonConvert; + +public class Rpc { + private String ruk; // request unique key: + private String topic; // call topic + private T value; // call paras + + private RpcResponse response; + + public Rpc() { + } + + public Rpc(String appname, String ruk, String topic, Object value) { + this.ruk = appname + "::" + ruk; + this.topic = topic; + this.value = (T) JsonConvert.root().convertTo(value); + } + + public String getRuk() { + return ruk; + } + + public void setRuk(String ruk) { + this.ruk = ruk; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public T getValue() { + return value; + } + + public void setValue(T value) { + this.value = value; + } + + public RpcResponse getResponse() { + return response; + } + + public void setResponse(RpcResponse response) { + this.response = response; + } + + @ConvertColumn(ignore = true) + public String getBackTopic() { + return ruk.split("::")[0]; + } + + public RpcResponse buildResp() { + RpcResponse response = new RpcResponse<>(); + response.setRuk(ruk); + return response; + } + + public RpcResponse buildResp(int retcode, String retinfo) { + RpcResponse response = new RpcResponse<>(); + response.setRuk(ruk); + response.setRetcode(retcode); + response.setRetinfo(retinfo); + return response; + } + + public RpcResponse buildError(String retinfo) { + RpcResponse response = new RpcResponse<>(); + response.setRuk(ruk); + response.setRetcode(100); + response.setRetinfo(retinfo); + return response; + } + + public RpcResponse buildResp(R result) { + RpcResponse response = new RpcResponse<>(); + response.setRuk(ruk); + response.setResult(result); + return response; + } + + @ConvertColumn(ignore = true) + public int getRetcode() { + if (this.response == null) { + return -1; + } + + return response.getRetcode(); + } + + @ConvertColumn(ignore = true) + public String getRetinfo() { + if (this.response == null) { + return ""; + } + + return response.getRetinfo(); + } +} diff --git a/src/com/zdemo/zhub/RpcResponse.java b/src/com/zdemo/zhub/RpcResponse.java new file mode 100644 index 0000000..6cc5bb7 --- /dev/null +++ b/src/com/zdemo/zhub/RpcResponse.java @@ -0,0 +1,40 @@ +package com.zdemo.zhub; + +public class RpcResponse { + private String ruk; + private int retcode; + private String retinfo; + private R result; + + public String getRuk() { + return ruk; + } + + public void setRuk(String ruk) { + this.ruk = ruk; + } + + public int getRetcode() { + return retcode; + } + + public void setRetcode(int retcode) { + this.retcode = retcode; + } + + public String getRetinfo() { + return retinfo; + } + + public void setRetinfo(String retinfo) { + this.retinfo = retinfo; + } + + public R getResult() { + return result; + } + + public void setResult(R result) { + this.result = result; + } +} diff --git a/src/com/zdemo/zhub/ZHubClient.java b/src/com/zdemo/zhub/ZHubClient.java index 203235a..fda403a 100644 --- a/src/com/zdemo/zhub/ZHubClient.java +++ b/src/com/zdemo/zhub/ZHubClient.java @@ -4,10 +4,8 @@ import com.zdemo.AbstractConsumer; import com.zdemo.Event; import com.zdemo.IConsumer; import com.zdemo.IProducer; -import org.redkale.convert.json.JsonConvert; import org.redkale.service.Service; -import org.redkale.util.AnyValue; -import org.redkale.util.AutoLoad; +import org.redkale.util.*; import javax.annotation.Resource; import java.io.BufferedReader; @@ -17,10 +15,15 @@ import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; import java.util.logging.Level; import java.util.logging.Logger; @@ -38,13 +41,16 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer @Resource(name = "property.zhub.groupid") private String groupid = ""; - private ReentrantLock lock = new ReentrantLock(); + //private ReentrantLock lock = new ReentrantLock(); private Socket client; private OutputStream writer; private BufferedReader reader; private final LinkedBlockingQueue timerQueue = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue> topicQueue = new LinkedBlockingQueue<>(); + private final LinkedBlockingQueue> rpcBackQueue = new LinkedBlockingQueue<>(); // RPC BACK MSG + private final LinkedBlockingQueue> rpcCallQueue = new LinkedBlockingQueue<>(); // RPC CALL MSG + private final LinkedBlockingQueue sendMsgQueue = new LinkedBlockingQueue<>(); // SEND MSG private BiConsumer threadBuilder = (r, n) -> { for (int i = 0; i < n; i++) { @@ -91,7 +97,30 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer reader.readLine(); //$n len(value) String value = reader.readLine(); // value - topicQueue.put(Event.of(topic, value)); + // lock msg + if ("lock".equals(topic)) { + Lock lock = lockTag.get(value); + if (lock != null) { + synchronized (lock) { + lock.notifyAll(); + } + } + continue; + } + // rpc back msg + if (APP_NAME.equals(topic)) { + rpcBackQueue.add(Event.of(topic, value)); + continue; + } + + // rpc call msg + if (rpcTopics.contains(topic)) { + rpcCallQueue.add(Event.of(topic, value)); + continue; + } + + // oth msg + topicQueue.add(Event.of(topic, value)); } // timer 消息 @@ -104,14 +133,12 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer reader.readLine(); //$n len(key) String topic = reader.readLine(); // name - timerQueue.put(timerMap.get(topic)); + timerQueue.add(timerMap.get(topic)); } } catch (IOException e) { if (e instanceof SocketException) { initSocket(Integer.MAX_VALUE); } - } catch (InterruptedException e) { - e.printStackTrace(); } } }).start(); @@ -153,11 +180,81 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer } }, 1); + // rpc back + threadBuilder.accept(() -> { + while (true) { + Event event = null; + try { + if ((event = rpcBackQueue.take()) == null) { + continue; + } + //if (event) + rpcAccept(event.value); + logger.info(String.format("rpc-back:[%s] => %s", event.topic, event.value)); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (Exception e) { + logger.log(Level.WARNING, "topic[" + event.topic + "] event accept error :" + event.value, e); + } + } + }, 1); + + // rpc call + threadBuilder.accept(() -> { + while (true) { + Event event = null; + try { + if ((event = rpcCallQueue.take()) == null) { + continue; + } + logger.info(String.format("rpc-call:[%s] => %s", event.topic, event.value)); + accept(event.topic, event.value); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (Exception e) { + logger.log(Level.WARNING, "rpc-call[" + event.topic + "] event accept error :" + event.value, e); + } + } + }, 1); + + // send msg + threadBuilder.accept(() -> { + while (true) { + String msg = null; + try { + if ((msg = sendMsgQueue.take()) == null) { + continue; + } + writer.write(msg.getBytes()); + writer.flush(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (Exception e) { + logger.log(Level.WARNING, "send-msg[" + msg + "] event accept error :", e); + } + } + }, 1); + } // --------------------- - // 消息发送 - private boolean send(String... data) { + // -- 消息发送 -- + protected boolean send(String... data) { + if (data.length == 1) { + sendMsgQueue.add(data[0] + "\r\n"); + } else if (data.length > 1) { + StringBuffer buf = new StringBuffer(); + buf.append("*" + data.length + "\r\n"); + for (String d : data) { + buf.append("$" + d.length() + "\r\n"); + buf.append(d + "\r\n"); + } + sendMsgQueue.add(buf.toString()); + } + return true; + } + + /*protected boolean send(String... data) { try { lock.lock(); if (data.length == 1) { @@ -177,13 +274,13 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer lock.unlock(); } return false; - } + }*/ private String toStr(Object v) { if (v instanceof String) { return (String) v; } - return JsonConvert.root().convertTo(v); + return convert.convertTo(v); } protected boolean initSocket(int retry) { @@ -202,7 +299,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer } send("groupid " + groupid); - StringBuffer buf = new StringBuffer("subscribe"); + StringBuffer buf = new StringBuffer("subscribe lock " + APP_NAME); for (String topic : getTopics()) { buf.append(" ").append(topic); } @@ -296,6 +393,25 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer send("subscribe " + topic); //新增订阅 } + // ================================================== lock ================================================== + private Map lockTag = new ConcurrentHashMap<>(); + + public Lock tryLock(String key, int duration) { + String uuid = Utility.uuid(); + Lock lock = new Lock(key, uuid, duration, this); + lockTag.put(uuid, lock); + + try { + // c.send("lock", key, uuid, strconv.Itoa(duration)) + send("lock", key, uuid, String.valueOf(duration)); + synchronized (lock) { + lock.wait(); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + return lock; + } // ================================================== timer ================================================== private ConcurrentHashMap timerMap = new ConcurrentHashMap(); @@ -329,4 +445,79 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer public void reloadTimer() { send("cmd", "reload-timer"); } + + // ================================================== rpc ================================================== + // -- 调用端 -- + private Map rpcMap = new ConcurrentHashMap<>(); + private Map rpcRetType = new ConcurrentHashMap<>(); + + @Comment("rpc call") + public CompletableFuture rpc(String topic, Object v) { + return (CompletableFuture) rpc(topic, v, null); + } + + @Comment("rpc call") + public CompletableFuture> rpc(String topic, T v, TypeToken typeToken) { + return CompletableFuture.supplyAsync(() -> { + Rpc rpc = new Rpc<>(APP_NAME, Utility.uuid(), topic, v); + rpcMap.put(rpc.getRuk(), rpc); + if (typeToken != null) { + rpcRetType.put(rpc.getRuk(), typeToken); + } + try { + publish(topic, rpc); + synchronized (rpc) { + rpc.wait(); + } + } catch (InterruptedException e) { + e.printStackTrace(); + // todo: 设置请求失败 + } + return rpc; + }); + } + + // RpcResponse: {ruk:xxx-xxxx, retcode:0} + @Comment("rpc call back consumer") + private void rpcAccept(String value) { + RpcResponse resp = convert.convertFrom(new TypeToken>() { + }.getType(), value); + + String ruk = resp.getRuk(); + Rpc rpc = rpcMap.get(ruk); + TypeToken typeToken = rpcRetType.get(ruk); + + Object result = resp.getResult(); + if (result != null && typeToken != null && !"java.lang.String".equals(typeToken.getType().getTypeName())) { + result = convert.convertFrom(typeToken.getType(), (String) resp.getResult()); + } + + resp.setResult(result); + rpc.setResponse(resp); + synchronized (rpc) { + rpc.notify(); + } + } + + // -- 订阅端 -- + private Set rpcTopics = new HashSet(); + + @Comment("rpc call consumer") + public void rpcSubscribe(String topic, TypeToken typeToken, Function, RpcResponse> fun) { + Consumer consumer = v -> { + Rpc rpc = convert.convertFrom(new TypeToken>() { + }.getType(), v); + + // 参数转换 + T paras = convert.convertFrom(typeToken.getType(), (String) rpc.getValue()); + rpc.setValue(paras); + RpcResponse response = fun.apply(rpc); + + // back + publish(rpc.getBackTopic(), response); + }; + + rpcTopics.add(topic); + subscribe(topic, consumer); + } } diff --git a/test/com/zdemo/test/AppTest.java b/test/com/zdemo/test/AppTest.java index 0d0ad40..e141d1c 100644 --- a/test/com/zdemo/test/AppTest.java +++ b/test/com/zdemo/test/AppTest.java @@ -2,16 +2,17 @@ package com.zdemo.test; import com.zdemo.Event; import com.zdemo.IProducer; +import com.zdemo.zhub.Lock; import org.junit.Test; import org.redkale.boot.Application; import org.redkale.convert.json.JsonConvert; -import org.redkale.util.Utility; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.function.Function; import java.util.logging.Logger; @@ -27,7 +28,7 @@ public class AppTest { //启动并开启消费监听 MyConsumer consumer = Application.singleton(MyConsumer.class); - consumer.subscribe("a", str -> { + /*consumer.subscribe("a", str -> { logger.info("我收到了消息 a 事件:" + str); }); @@ -44,7 +45,27 @@ public class AppTest { System.out.println(Utility.now() + " ----------------- timer b 执行了"); }); //consumer.delay("a", "1", 200); - consumer.delay("a", "1", "2000"); + consumer.delay("a", "1", "2000");*/ + + Consumer con = x -> { + logger.info("--->开始申请锁:" + System.currentTimeMillis()); + Lock lock = consumer.tryLock("a", 20); + logger.info("===>成功申请锁:" + System.currentTimeMillis()); + for (int i = 0; i < 20; i++) { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); + } + System.out.println(x + ":" + i); + } + lock.unLock(); + }; + + new Thread(() -> con.accept("x")).start(); + new Thread(() -> con.accept("y")).start(); + new Thread(() -> con.accept("z")).start(); + Thread.sleep(60_000 * 60); } catch (Exception e) {