新增:1、lock 锁客户端API 2、rpc调用请求/接收端;
修改:消息发送优化
This commit is contained in:
parent
3d0d2f8f81
commit
ee7154990b
@ -3,6 +3,7 @@ package com.zdemo;
|
|||||||
import org.redkale.convert.json.JsonConvert;
|
import org.redkale.convert.json.JsonConvert;
|
||||||
import org.redkale.util.TypeToken;
|
import org.redkale.util.TypeToken;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
@ -14,6 +15,11 @@ import java.util.function.Consumer;
|
|||||||
*/
|
*/
|
||||||
public abstract class AbstractConsumer implements IConsumer {
|
public abstract class AbstractConsumer implements IConsumer {
|
||||||
|
|
||||||
|
protected JsonConvert convert = JsonConvert.root();
|
||||||
|
|
||||||
|
@Resource(name = "APP_NAME")
|
||||||
|
protected String APP_NAME = "";
|
||||||
|
|
||||||
private Map<String, EventType> eventMap = new HashMap<>();
|
private Map<String, EventType> eventMap = new HashMap<>();
|
||||||
|
|
||||||
protected abstract String getGroupid();
|
protected abstract String getGroupid();
|
||||||
@ -37,7 +43,7 @@ public abstract class AbstractConsumer implements IConsumer {
|
|||||||
if ("java.lang.String".equals(eventType.typeToken.getType().getTypeName())) {
|
if ("java.lang.String".equals(eventType.typeToken.getType().getTypeName())) {
|
||||||
data = value;
|
data = value;
|
||||||
} else {
|
} else {
|
||||||
data = JsonConvert.root().convertFrom(eventType.typeToken.getType(), value);
|
data = convert.convertFrom(eventType.typeToken.getType(), value);
|
||||||
}
|
}
|
||||||
|
|
||||||
eventType.accept(data);
|
eventType.accept(data);
|
||||||
|
@ -7,6 +7,7 @@ import org.redkale.util.AnyValue;
|
|||||||
import org.redkale.util.RedkaleClassLoader;
|
import org.redkale.util.RedkaleClassLoader;
|
||||||
import org.redkale.util.ResourceFactory;
|
import org.redkale.util.ResourceFactory;
|
||||||
|
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -30,14 +31,11 @@ public class ZhubListener implements ApplicationListener {
|
|||||||
String clazz = zhub.getValue("value", "com.zdemo.zhub.ZHubClient");
|
String clazz = zhub.getValue("value", "com.zdemo.zhub.ZHubClient");
|
||||||
try {
|
try {
|
||||||
Class<?> aClass = classLoader.loadClass(clazz);
|
Class<?> aClass = classLoader.loadClass(clazz);
|
||||||
Service obj = (Service) aClass.newInstance();
|
Service obj = (Service) aClass.getDeclaredConstructor().newInstance();
|
||||||
|
application.getResourceFactory().inject(obj);
|
||||||
obj.init(zhub);
|
obj.init(zhub);
|
||||||
resourceFactory.register(zhub.get("name"), aClass, obj);
|
resourceFactory.register(zhub.get("name"), aClass, obj);
|
||||||
} catch (ClassNotFoundException e) {
|
} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException | ClassNotFoundException e) {
|
||||||
e.printStackTrace();
|
|
||||||
} catch (IllegalAccessException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
} catch (InstantiationException e) {
|
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
20
src/com/zdemo/zhub/Lock.java
Normal file
20
src/com/zdemo/zhub/Lock.java
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
package com.zdemo.zhub;
|
||||||
|
|
||||||
|
// ================================================== lock ==================================================
|
||||||
|
public class Lock {
|
||||||
|
private String name;
|
||||||
|
private String uuid;
|
||||||
|
private int duration;
|
||||||
|
private ZHubClient hubClient;
|
||||||
|
|
||||||
|
protected Lock(String name, String uuid, int duration, ZHubClient hubClient) {
|
||||||
|
this.name = name;
|
||||||
|
this.uuid = uuid;
|
||||||
|
this.duration = duration;
|
||||||
|
this.hubClient = hubClient;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void unLock() {
|
||||||
|
hubClient.send("unlock", name, uuid);
|
||||||
|
}
|
||||||
|
}
|
105
src/com/zdemo/zhub/Rpc.java
Normal file
105
src/com/zdemo/zhub/Rpc.java
Normal file
@ -0,0 +1,105 @@
|
|||||||
|
package com.zdemo.zhub;
|
||||||
|
|
||||||
|
import org.redkale.convert.ConvertColumn;
|
||||||
|
import org.redkale.convert.json.JsonConvert;
|
||||||
|
|
||||||
|
public class Rpc<T> {
|
||||||
|
private String ruk; // request unique key:
|
||||||
|
private String topic; // call topic
|
||||||
|
private T value; // call paras
|
||||||
|
|
||||||
|
private RpcResponse response;
|
||||||
|
|
||||||
|
public Rpc() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public Rpc(String appname, String ruk, String topic, Object value) {
|
||||||
|
this.ruk = appname + "::" + ruk;
|
||||||
|
this.topic = topic;
|
||||||
|
this.value = (T) JsonConvert.root().convertTo(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getRuk() {
|
||||||
|
return ruk;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRuk(String ruk) {
|
||||||
|
this.ruk = ruk;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTopic() {
|
||||||
|
return topic;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTopic(String topic) {
|
||||||
|
this.topic = topic;
|
||||||
|
}
|
||||||
|
|
||||||
|
public T getValue() {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setValue(T value) {
|
||||||
|
this.value = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
public <R> RpcResponse<R> getResponse() {
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setResponse(RpcResponse response) {
|
||||||
|
this.response = response;
|
||||||
|
}
|
||||||
|
|
||||||
|
@ConvertColumn(ignore = true)
|
||||||
|
public String getBackTopic() {
|
||||||
|
return ruk.split("::")[0];
|
||||||
|
}
|
||||||
|
|
||||||
|
public <R> RpcResponse<R> buildResp() {
|
||||||
|
RpcResponse<R> response = new RpcResponse<>();
|
||||||
|
response.setRuk(ruk);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
public <R> RpcResponse<R> buildResp(int retcode, String retinfo) {
|
||||||
|
RpcResponse<R> response = new RpcResponse<>();
|
||||||
|
response.setRuk(ruk);
|
||||||
|
response.setRetcode(retcode);
|
||||||
|
response.setRetinfo(retinfo);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
public <R> RpcResponse<R> buildError(String retinfo) {
|
||||||
|
RpcResponse<R> response = new RpcResponse<>();
|
||||||
|
response.setRuk(ruk);
|
||||||
|
response.setRetcode(100);
|
||||||
|
response.setRetinfo(retinfo);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
public <R> RpcResponse<R> buildResp(R result) {
|
||||||
|
RpcResponse<R> response = new RpcResponse<>();
|
||||||
|
response.setRuk(ruk);
|
||||||
|
response.setResult(result);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
@ConvertColumn(ignore = true)
|
||||||
|
public int getRetcode() {
|
||||||
|
if (this.response == null) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return response.getRetcode();
|
||||||
|
}
|
||||||
|
|
||||||
|
@ConvertColumn(ignore = true)
|
||||||
|
public String getRetinfo() {
|
||||||
|
if (this.response == null) {
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
|
||||||
|
return response.getRetinfo();
|
||||||
|
}
|
||||||
|
}
|
40
src/com/zdemo/zhub/RpcResponse.java
Normal file
40
src/com/zdemo/zhub/RpcResponse.java
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
package com.zdemo.zhub;
|
||||||
|
|
||||||
|
public class RpcResponse<R> {
|
||||||
|
private String ruk;
|
||||||
|
private int retcode;
|
||||||
|
private String retinfo;
|
||||||
|
private R result;
|
||||||
|
|
||||||
|
public String getRuk() {
|
||||||
|
return ruk;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRuk(String ruk) {
|
||||||
|
this.ruk = ruk;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getRetcode() {
|
||||||
|
return retcode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRetcode(int retcode) {
|
||||||
|
this.retcode = retcode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getRetinfo() {
|
||||||
|
return retinfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRetinfo(String retinfo) {
|
||||||
|
this.retinfo = retinfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
public R getResult() {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setResult(R result) {
|
||||||
|
this.result = result;
|
||||||
|
}
|
||||||
|
}
|
@ -4,10 +4,8 @@ import com.zdemo.AbstractConsumer;
|
|||||||
import com.zdemo.Event;
|
import com.zdemo.Event;
|
||||||
import com.zdemo.IConsumer;
|
import com.zdemo.IConsumer;
|
||||||
import com.zdemo.IProducer;
|
import com.zdemo.IProducer;
|
||||||
import org.redkale.convert.json.JsonConvert;
|
|
||||||
import org.redkale.service.Service;
|
import org.redkale.service.Service;
|
||||||
import org.redkale.util.AnyValue;
|
import org.redkale.util.*;
|
||||||
import org.redkale.util.AutoLoad;
|
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
@ -17,10 +15,15 @@ import java.io.OutputStream;
|
|||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
import java.net.SocketException;
|
import java.net.SocketException;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
|
||||||
import java.util.function.BiConsumer;
|
import java.util.function.BiConsumer;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
import java.util.function.Function;
|
||||||
import java.util.logging.Level;
|
import java.util.logging.Level;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
@ -38,13 +41,16 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
|||||||
@Resource(name = "property.zhub.groupid")
|
@Resource(name = "property.zhub.groupid")
|
||||||
private String groupid = "";
|
private String groupid = "";
|
||||||
|
|
||||||
private ReentrantLock lock = new ReentrantLock();
|
//private ReentrantLock lock = new ReentrantLock();
|
||||||
private Socket client;
|
private Socket client;
|
||||||
private OutputStream writer;
|
private OutputStream writer;
|
||||||
private BufferedReader reader;
|
private BufferedReader reader;
|
||||||
|
|
||||||
private final LinkedBlockingQueue<Timer> timerQueue = new LinkedBlockingQueue<>();
|
private final LinkedBlockingQueue<Timer> timerQueue = new LinkedBlockingQueue<>();
|
||||||
private final LinkedBlockingQueue<Event<String>> topicQueue = new LinkedBlockingQueue<>();
|
private final LinkedBlockingQueue<Event<String>> topicQueue = new LinkedBlockingQueue<>();
|
||||||
|
private final LinkedBlockingQueue<Event<String>> rpcBackQueue = new LinkedBlockingQueue<>(); // RPC BACK MSG
|
||||||
|
private final LinkedBlockingQueue<Event<String>> rpcCallQueue = new LinkedBlockingQueue<>(); // RPC CALL MSG
|
||||||
|
private final LinkedBlockingQueue<String> sendMsgQueue = new LinkedBlockingQueue<>(); // SEND MSG
|
||||||
|
|
||||||
private BiConsumer<Runnable, Integer> threadBuilder = (r, n) -> {
|
private BiConsumer<Runnable, Integer> threadBuilder = (r, n) -> {
|
||||||
for (int i = 0; i < n; i++) {
|
for (int i = 0; i < n; i++) {
|
||||||
@ -91,7 +97,30 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
|||||||
reader.readLine(); //$n len(value)
|
reader.readLine(); //$n len(value)
|
||||||
String value = reader.readLine(); // value
|
String value = reader.readLine(); // value
|
||||||
|
|
||||||
topicQueue.put(Event.of(topic, value));
|
// lock msg
|
||||||
|
if ("lock".equals(topic)) {
|
||||||
|
Lock lock = lockTag.get(value);
|
||||||
|
if (lock != null) {
|
||||||
|
synchronized (lock) {
|
||||||
|
lock.notifyAll();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// rpc back msg
|
||||||
|
if (APP_NAME.equals(topic)) {
|
||||||
|
rpcBackQueue.add(Event.of(topic, value));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// rpc call msg
|
||||||
|
if (rpcTopics.contains(topic)) {
|
||||||
|
rpcCallQueue.add(Event.of(topic, value));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// oth msg
|
||||||
|
topicQueue.add(Event.of(topic, value));
|
||||||
}
|
}
|
||||||
|
|
||||||
// timer 消息
|
// timer 消息
|
||||||
@ -104,14 +133,12 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
|||||||
reader.readLine(); //$n len(key)
|
reader.readLine(); //$n len(key)
|
||||||
String topic = reader.readLine(); // name
|
String topic = reader.readLine(); // name
|
||||||
|
|
||||||
timerQueue.put(timerMap.get(topic));
|
timerQueue.add(timerMap.get(topic));
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
if (e instanceof SocketException) {
|
if (e instanceof SocketException) {
|
||||||
initSocket(Integer.MAX_VALUE);
|
initSocket(Integer.MAX_VALUE);
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}).start();
|
}).start();
|
||||||
@ -153,11 +180,81 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
|||||||
}
|
}
|
||||||
}, 1);
|
}, 1);
|
||||||
|
|
||||||
|
// rpc back
|
||||||
|
threadBuilder.accept(() -> {
|
||||||
|
while (true) {
|
||||||
|
Event<String> event = null;
|
||||||
|
try {
|
||||||
|
if ((event = rpcBackQueue.take()) == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
//if (event)
|
||||||
|
rpcAccept(event.value);
|
||||||
|
logger.info(String.format("rpc-back:[%s] => %s", event.topic, event.value));
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.log(Level.WARNING, "topic[" + event.topic + "] event accept error :" + event.value, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, 1);
|
||||||
|
|
||||||
|
// rpc call
|
||||||
|
threadBuilder.accept(() -> {
|
||||||
|
while (true) {
|
||||||
|
Event<String> event = null;
|
||||||
|
try {
|
||||||
|
if ((event = rpcCallQueue.take()) == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
logger.info(String.format("rpc-call:[%s] => %s", event.topic, event.value));
|
||||||
|
accept(event.topic, event.value);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.log(Level.WARNING, "rpc-call[" + event.topic + "] event accept error :" + event.value, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, 1);
|
||||||
|
|
||||||
|
// send msg
|
||||||
|
threadBuilder.accept(() -> {
|
||||||
|
while (true) {
|
||||||
|
String msg = null;
|
||||||
|
try {
|
||||||
|
if ((msg = sendMsgQueue.take()) == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
writer.write(msg.getBytes());
|
||||||
|
writer.flush();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.log(Level.WARNING, "send-msg[" + msg + "] event accept error :", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, 1);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------
|
// ---------------------
|
||||||
// 消息发送
|
// -- 消息发送 --
|
||||||
private boolean send(String... data) {
|
protected boolean send(String... data) {
|
||||||
|
if (data.length == 1) {
|
||||||
|
sendMsgQueue.add(data[0] + "\r\n");
|
||||||
|
} else if (data.length > 1) {
|
||||||
|
StringBuffer buf = new StringBuffer();
|
||||||
|
buf.append("*" + data.length + "\r\n");
|
||||||
|
for (String d : data) {
|
||||||
|
buf.append("$" + d.length() + "\r\n");
|
||||||
|
buf.append(d + "\r\n");
|
||||||
|
}
|
||||||
|
sendMsgQueue.add(buf.toString());
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*protected boolean send(String... data) {
|
||||||
try {
|
try {
|
||||||
lock.lock();
|
lock.lock();
|
||||||
if (data.length == 1) {
|
if (data.length == 1) {
|
||||||
@ -177,13 +274,13 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
|||||||
lock.unlock();
|
lock.unlock();
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}*/
|
||||||
|
|
||||||
private String toStr(Object v) {
|
private String toStr(Object v) {
|
||||||
if (v instanceof String) {
|
if (v instanceof String) {
|
||||||
return (String) v;
|
return (String) v;
|
||||||
}
|
}
|
||||||
return JsonConvert.root().convertTo(v);
|
return convert.convertTo(v);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected boolean initSocket(int retry) {
|
protected boolean initSocket(int retry) {
|
||||||
@ -202,7 +299,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
|||||||
}
|
}
|
||||||
send("groupid " + groupid);
|
send("groupid " + groupid);
|
||||||
|
|
||||||
StringBuffer buf = new StringBuffer("subscribe");
|
StringBuffer buf = new StringBuffer("subscribe lock " + APP_NAME);
|
||||||
for (String topic : getTopics()) {
|
for (String topic : getTopics()) {
|
||||||
buf.append(" ").append(topic);
|
buf.append(" ").append(topic);
|
||||||
}
|
}
|
||||||
@ -296,6 +393,25 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
|||||||
send("subscribe " + topic); //新增订阅
|
send("subscribe " + topic); //新增订阅
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ================================================== lock ==================================================
|
||||||
|
private Map<String, Lock> lockTag = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
public Lock tryLock(String key, int duration) {
|
||||||
|
String uuid = Utility.uuid();
|
||||||
|
Lock lock = new Lock(key, uuid, duration, this);
|
||||||
|
lockTag.put(uuid, lock);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// c.send("lock", key, uuid, strconv.Itoa(duration))
|
||||||
|
send("lock", key, uuid, String.valueOf(duration));
|
||||||
|
synchronized (lock) {
|
||||||
|
lock.wait();
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
return lock;
|
||||||
|
}
|
||||||
|
|
||||||
// ================================================== timer ==================================================
|
// ================================================== timer ==================================================
|
||||||
private ConcurrentHashMap<String, Timer> timerMap = new ConcurrentHashMap();
|
private ConcurrentHashMap<String, Timer> timerMap = new ConcurrentHashMap();
|
||||||
@ -329,4 +445,79 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
|||||||
public void reloadTimer() {
|
public void reloadTimer() {
|
||||||
send("cmd", "reload-timer");
|
send("cmd", "reload-timer");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ================================================== rpc ==================================================
|
||||||
|
// -- 调用端 --
|
||||||
|
private Map<String, Rpc> rpcMap = new ConcurrentHashMap<>();
|
||||||
|
private Map<String, TypeToken> rpcRetType = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
@Comment("rpc call")
|
||||||
|
public CompletableFuture<Rpc> rpc(String topic, Object v) {
|
||||||
|
return (CompletableFuture) rpc(topic, v, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Comment("rpc call")
|
||||||
|
public <T, R> CompletableFuture<Rpc<T>> rpc(String topic, T v, TypeToken<R> typeToken) {
|
||||||
|
return CompletableFuture.supplyAsync(() -> {
|
||||||
|
Rpc rpc = new Rpc<>(APP_NAME, Utility.uuid(), topic, v);
|
||||||
|
rpcMap.put(rpc.getRuk(), rpc);
|
||||||
|
if (typeToken != null) {
|
||||||
|
rpcRetType.put(rpc.getRuk(), typeToken);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
publish(topic, rpc);
|
||||||
|
synchronized (rpc) {
|
||||||
|
rpc.wait();
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
// todo: 设置请求失败
|
||||||
|
}
|
||||||
|
return rpc;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// RpcResponse: {ruk:xxx-xxxx, retcode:0}
|
||||||
|
@Comment("rpc call back consumer")
|
||||||
|
private void rpcAccept(String value) {
|
||||||
|
RpcResponse resp = convert.convertFrom(new TypeToken<RpcResponse<String>>() {
|
||||||
|
}.getType(), value);
|
||||||
|
|
||||||
|
String ruk = resp.getRuk();
|
||||||
|
Rpc rpc = rpcMap.get(ruk);
|
||||||
|
TypeToken typeToken = rpcRetType.get(ruk);
|
||||||
|
|
||||||
|
Object result = resp.getResult();
|
||||||
|
if (result != null && typeToken != null && !"java.lang.String".equals(typeToken.getType().getTypeName())) {
|
||||||
|
result = convert.convertFrom(typeToken.getType(), (String) resp.getResult());
|
||||||
|
}
|
||||||
|
|
||||||
|
resp.setResult(result);
|
||||||
|
rpc.setResponse(resp);
|
||||||
|
synchronized (rpc) {
|
||||||
|
rpc.notify();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// -- 订阅端 --
|
||||||
|
private Set<String> rpcTopics = new HashSet();
|
||||||
|
|
||||||
|
@Comment("rpc call consumer")
|
||||||
|
public <T, R> void rpcSubscribe(String topic, TypeToken<T> typeToken, Function<Rpc<T>, RpcResponse<R>> fun) {
|
||||||
|
Consumer<String> consumer = v -> {
|
||||||
|
Rpc<T> rpc = convert.convertFrom(new TypeToken<Rpc<String>>() {
|
||||||
|
}.getType(), v);
|
||||||
|
|
||||||
|
// 参数转换
|
||||||
|
T paras = convert.convertFrom(typeToken.getType(), (String) rpc.getValue());
|
||||||
|
rpc.setValue(paras);
|
||||||
|
RpcResponse<R> response = fun.apply(rpc);
|
||||||
|
|
||||||
|
// back
|
||||||
|
publish(rpc.getBackTopic(), response);
|
||||||
|
};
|
||||||
|
|
||||||
|
rpcTopics.add(topic);
|
||||||
|
subscribe(topic, consumer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -2,16 +2,17 @@ package com.zdemo.test;
|
|||||||
|
|
||||||
import com.zdemo.Event;
|
import com.zdemo.Event;
|
||||||
import com.zdemo.IProducer;
|
import com.zdemo.IProducer;
|
||||||
|
import com.zdemo.zhub.Lock;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.redkale.boot.Application;
|
import org.redkale.boot.Application;
|
||||||
import org.redkale.convert.json.JsonConvert;
|
import org.redkale.convert.json.JsonConvert;
|
||||||
import org.redkale.util.Utility;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
@ -27,7 +28,7 @@ public class AppTest {
|
|||||||
//启动并开启消费监听
|
//启动并开启消费监听
|
||||||
MyConsumer consumer = Application.singleton(MyConsumer.class);
|
MyConsumer consumer = Application.singleton(MyConsumer.class);
|
||||||
|
|
||||||
consumer.subscribe("a", str -> {
|
/*consumer.subscribe("a", str -> {
|
||||||
logger.info("我收到了消息 a 事件:" + str);
|
logger.info("我收到了消息 a 事件:" + str);
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -44,7 +45,27 @@ public class AppTest {
|
|||||||
System.out.println(Utility.now() + " ----------------- timer b 执行了");
|
System.out.println(Utility.now() + " ----------------- timer b 执行了");
|
||||||
});
|
});
|
||||||
//consumer.delay("a", "1", 200);
|
//consumer.delay("a", "1", 200);
|
||||||
consumer.delay("a", "1", "2000");
|
consumer.delay("a", "1", "2000");*/
|
||||||
|
|
||||||
|
Consumer<String> con = x -> {
|
||||||
|
logger.info("--->开始申请锁:" + System.currentTimeMillis());
|
||||||
|
Lock lock = consumer.tryLock("a", 20);
|
||||||
|
logger.info("===>成功申请锁:" + System.currentTimeMillis());
|
||||||
|
for (int i = 0; i < 20; i++) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(500);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
System.out.println(x + ":" + i);
|
||||||
|
}
|
||||||
|
lock.unLock();
|
||||||
|
};
|
||||||
|
|
||||||
|
new Thread(() -> con.accept("x")).start();
|
||||||
|
new Thread(() -> con.accept("y")).start();
|
||||||
|
new Thread(() -> con.accept("z")).start();
|
||||||
|
|
||||||
|
|
||||||
Thread.sleep(60_000 * 60);
|
Thread.sleep(60_000 * 60);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
Loading…
Reference in New Issue
Block a user