新增:1、lock 锁客户端API 2、rpc调用请求/接收端;

修改:消息发送优化
This commit is contained in:
lxy 2021-04-04 23:40:28 +08:00
parent 6a4a6bbf7e
commit 3760f01b51
7 changed files with 405 additions and 24 deletions

View File

@ -3,6 +3,7 @@ package com.zdemo;
import org.redkale.convert.json.JsonConvert;
import org.redkale.util.TypeToken;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@ -14,6 +15,11 @@ import java.util.function.Consumer;
*/
public abstract class AbstractConsumer implements IConsumer {
protected JsonConvert convert = JsonConvert.root();
@Resource(name = "APP_NAME")
protected String APP_NAME = "";
private Map<String, EventType> eventMap = new HashMap<>();
protected abstract String getGroupid();
@ -37,7 +43,7 @@ public abstract class AbstractConsumer implements IConsumer {
if ("java.lang.String".equals(eventType.typeToken.getType().getTypeName())) {
data = value;
} else {
data = JsonConvert.root().convertFrom(eventType.typeToken.getType(), value);
data = convert.convertFrom(eventType.typeToken.getType(), value);
}
eventType.accept(data);

View File

@ -7,6 +7,7 @@ import org.redkale.util.AnyValue;
import org.redkale.util.RedkaleClassLoader;
import org.redkale.util.ResourceFactory;
import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.CompletableFuture;
/**
@ -30,14 +31,11 @@ public class ZhubListener implements ApplicationListener {
String clazz = zhub.getValue("value", "com.zdemo.zhub.ZHubClient");
try {
Class<?> aClass = classLoader.loadClass(clazz);
Service obj = (Service) aClass.newInstance();
Service obj = (Service) aClass.getDeclaredConstructor().newInstance();
application.getResourceFactory().inject(obj);
obj.init(zhub);
resourceFactory.register(zhub.get("name"), aClass, obj);
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InstantiationException e) {
} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException | ClassNotFoundException e) {
e.printStackTrace();
}
}

View File

@ -0,0 +1,20 @@
package com.zdemo.zhub;
// ================================================== lock ==================================================
public class Lock {
private String name;
private String uuid;
private int duration;
private ZHubClient hubClient;
protected Lock(String name, String uuid, int duration, ZHubClient hubClient) {
this.name = name;
this.uuid = uuid;
this.duration = duration;
this.hubClient = hubClient;
}
public void unLock() {
hubClient.send("unlock", name, uuid);
}
}

105
src/com/zdemo/zhub/Rpc.java Normal file
View File

@ -0,0 +1,105 @@
package com.zdemo.zhub;
import org.redkale.convert.ConvertColumn;
import org.redkale.convert.json.JsonConvert;
public class Rpc<T> {
private String ruk; // request unique key:
private String topic; // call topic
private T value; // call paras
private RpcResponse response;
public Rpc() {
}
public Rpc(String appname, String ruk, String topic, Object value) {
this.ruk = appname + "::" + ruk;
this.topic = topic;
this.value = (T) JsonConvert.root().convertTo(value);
}
public String getRuk() {
return ruk;
}
public void setRuk(String ruk) {
this.ruk = ruk;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public T getValue() {
return value;
}
public void setValue(T value) {
this.value = value;
}
public <R> RpcResponse<R> getResponse() {
return response;
}
public void setResponse(RpcResponse response) {
this.response = response;
}
@ConvertColumn(ignore = true)
public String getBackTopic() {
return ruk.split("::")[0];
}
public <R> RpcResponse<R> buildResp() {
RpcResponse<R> response = new RpcResponse<>();
response.setRuk(ruk);
return response;
}
public <R> RpcResponse<R> buildResp(int retcode, String retinfo) {
RpcResponse<R> response = new RpcResponse<>();
response.setRuk(ruk);
response.setRetcode(retcode);
response.setRetinfo(retinfo);
return response;
}
public <R> RpcResponse<R> buildError(String retinfo) {
RpcResponse<R> response = new RpcResponse<>();
response.setRuk(ruk);
response.setRetcode(100);
response.setRetinfo(retinfo);
return response;
}
public <R> RpcResponse<R> buildResp(R result) {
RpcResponse<R> response = new RpcResponse<>();
response.setRuk(ruk);
response.setResult(result);
return response;
}
@ConvertColumn(ignore = true)
public int getRetcode() {
if (this.response == null) {
return -1;
}
return response.getRetcode();
}
@ConvertColumn(ignore = true)
public String getRetinfo() {
if (this.response == null) {
return "";
}
return response.getRetinfo();
}
}

View File

@ -0,0 +1,40 @@
package com.zdemo.zhub;
public class RpcResponse<R> {
private String ruk;
private int retcode;
private String retinfo;
private R result;
public String getRuk() {
return ruk;
}
public void setRuk(String ruk) {
this.ruk = ruk;
}
public int getRetcode() {
return retcode;
}
public void setRetcode(int retcode) {
this.retcode = retcode;
}
public String getRetinfo() {
return retinfo;
}
public void setRetinfo(String retinfo) {
this.retinfo = retinfo;
}
public R getResult() {
return result;
}
public void setResult(R result) {
this.result = result;
}
}

View File

@ -4,10 +4,8 @@ import com.zdemo.AbstractConsumer;
import com.zdemo.Event;
import com.zdemo.IConsumer;
import com.zdemo.IProducer;
import org.redkale.convert.json.JsonConvert;
import org.redkale.service.Service;
import org.redkale.util.AnyValue;
import org.redkale.util.AutoLoad;
import org.redkale.util.*;
import javax.annotation.Resource;
import java.io.BufferedReader;
@ -17,10 +15,15 @@ import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -38,13 +41,16 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
@Resource(name = "property.zhub.groupid")
private String groupid = "";
private ReentrantLock lock = new ReentrantLock();
//private ReentrantLock lock = new ReentrantLock();
private Socket client;
private OutputStream writer;
private BufferedReader reader;
private final LinkedBlockingQueue<Timer> timerQueue = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<Event<String>> topicQueue = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<Event<String>> rpcBackQueue = new LinkedBlockingQueue<>(); // RPC BACK MSG
private final LinkedBlockingQueue<Event<String>> rpcCallQueue = new LinkedBlockingQueue<>(); // RPC CALL MSG
private final LinkedBlockingQueue<String> sendMsgQueue = new LinkedBlockingQueue<>(); // SEND MSG
private BiConsumer<Runnable, Integer> threadBuilder = (r, n) -> {
for (int i = 0; i < n; i++) {
@ -91,7 +97,30 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
reader.readLine(); //$n len(value)
String value = reader.readLine(); // value
topicQueue.put(Event.of(topic, value));
// lock msg
if ("lock".equals(topic)) {
Lock lock = lockTag.get(value);
if (lock != null) {
synchronized (lock) {
lock.notifyAll();
}
}
continue;
}
// rpc back msg
if (APP_NAME.equals(topic)) {
rpcBackQueue.add(Event.of(topic, value));
continue;
}
// rpc call msg
if (rpcTopics.contains(topic)) {
rpcCallQueue.add(Event.of(topic, value));
continue;
}
// oth msg
topicQueue.add(Event.of(topic, value));
}
// timer 消息
@ -104,14 +133,12 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
reader.readLine(); //$n len(key)
String topic = reader.readLine(); // name
timerQueue.put(timerMap.get(topic));
timerQueue.add(timerMap.get(topic));
}
} catch (IOException e) {
if (e instanceof SocketException) {
initSocket(Integer.MAX_VALUE);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
@ -153,11 +180,81 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
}
}, 1);
// rpc back
threadBuilder.accept(() -> {
while (true) {
Event<String> event = null;
try {
if ((event = rpcBackQueue.take()) == null) {
continue;
}
//if (event)
rpcAccept(event.value);
logger.info(String.format("rpc-back:[%s] => %s", event.topic, event.value));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
logger.log(Level.WARNING, "topic[" + event.topic + "] event accept error :" + event.value, e);
}
}
}, 1);
// rpc call
threadBuilder.accept(() -> {
while (true) {
Event<String> event = null;
try {
if ((event = rpcCallQueue.take()) == null) {
continue;
}
logger.info(String.format("rpc-call:[%s] => %s", event.topic, event.value));
accept(event.topic, event.value);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
logger.log(Level.WARNING, "rpc-call[" + event.topic + "] event accept error :" + event.value, e);
}
}
}, 1);
// send msg
threadBuilder.accept(() -> {
while (true) {
String msg = null;
try {
if ((msg = sendMsgQueue.take()) == null) {
continue;
}
writer.write(msg.getBytes());
writer.flush();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
logger.log(Level.WARNING, "send-msg[" + msg + "] event accept error :", e);
}
}
}, 1);
}
// ---------------------
// 消息发送
private boolean send(String... data) {
// -- 消息发送 --
protected boolean send(String... data) {
if (data.length == 1) {
sendMsgQueue.add(data[0] + "\r\n");
} else if (data.length > 1) {
StringBuffer buf = new StringBuffer();
buf.append("*" + data.length + "\r\n");
for (String d : data) {
buf.append("$" + d.length() + "\r\n");
buf.append(d + "\r\n");
}
sendMsgQueue.add(buf.toString());
}
return true;
}
/*protected boolean send(String... data) {
try {
lock.lock();
if (data.length == 1) {
@ -177,13 +274,13 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
lock.unlock();
}
return false;
}
}*/
private String toStr(Object v) {
if (v instanceof String) {
return (String) v;
}
return JsonConvert.root().convertTo(v);
return convert.convertTo(v);
}
protected boolean initSocket(int retry) {
@ -202,7 +299,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
}
send("groupid " + groupid);
StringBuffer buf = new StringBuffer("subscribe");
StringBuffer buf = new StringBuffer("subscribe lock " + APP_NAME);
for (String topic : getTopics()) {
buf.append(" ").append(topic);
}
@ -296,6 +393,25 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
send("subscribe " + topic); //新增订阅
}
// ================================================== lock ==================================================
private Map<String, Lock> lockTag = new ConcurrentHashMap<>();
public Lock tryLock(String key, int duration) {
String uuid = Utility.uuid();
Lock lock = new Lock(key, uuid, duration, this);
lockTag.put(uuid, lock);
try {
// c.send("lock", key, uuid, strconv.Itoa(duration))
send("lock", key, uuid, String.valueOf(duration));
synchronized (lock) {
lock.wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return lock;
}
// ================================================== timer ==================================================
private ConcurrentHashMap<String, Timer> timerMap = new ConcurrentHashMap();
@ -329,4 +445,79 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
public void reloadTimer() {
send("cmd", "reload-timer");
}
// ================================================== rpc ==================================================
// -- 调用端 --
private Map<String, Rpc> rpcMap = new ConcurrentHashMap<>();
private Map<String, TypeToken> rpcRetType = new ConcurrentHashMap<>();
@Comment("rpc call")
public CompletableFuture<Rpc> rpc(String topic, Object v) {
return (CompletableFuture) rpc(topic, v, null);
}
@Comment("rpc call")
public <T, R> CompletableFuture<Rpc<T>> rpc(String topic, T v, TypeToken<R> typeToken) {
return CompletableFuture.supplyAsync(() -> {
Rpc rpc = new Rpc<>(APP_NAME, Utility.uuid(), topic, v);
rpcMap.put(rpc.getRuk(), rpc);
if (typeToken != null) {
rpcRetType.put(rpc.getRuk(), typeToken);
}
try {
publish(topic, rpc);
synchronized (rpc) {
rpc.wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
// todo: 设置请求失败
}
return rpc;
});
}
// RpcResponse: {ruk:xxx-xxxx, retcode:0}
@Comment("rpc call back consumer")
private void rpcAccept(String value) {
RpcResponse resp = convert.convertFrom(new TypeToken<RpcResponse<String>>() {
}.getType(), value);
String ruk = resp.getRuk();
Rpc rpc = rpcMap.get(ruk);
TypeToken typeToken = rpcRetType.get(ruk);
Object result = resp.getResult();
if (result != null && typeToken != null && !"java.lang.String".equals(typeToken.getType().getTypeName())) {
result = convert.convertFrom(typeToken.getType(), (String) resp.getResult());
}
resp.setResult(result);
rpc.setResponse(resp);
synchronized (rpc) {
rpc.notify();
}
}
// -- 订阅端 --
private Set<String> rpcTopics = new HashSet();
@Comment("rpc call consumer")
public <T, R> void rpcSubscribe(String topic, TypeToken<T> typeToken, Function<Rpc<T>, RpcResponse<R>> fun) {
Consumer<String> consumer = v -> {
Rpc<T> rpc = convert.convertFrom(new TypeToken<Rpc<String>>() {
}.getType(), v);
// 参数转换
T paras = convert.convertFrom(typeToken.getType(), (String) rpc.getValue());
rpc.setValue(paras);
RpcResponse<R> response = fun.apply(rpc);
// back
publish(rpc.getBackTopic(), response);
};
rpcTopics.add(topic);
subscribe(topic, consumer);
}
}

View File

@ -2,16 +2,17 @@ package com.zdemo.test;
import com.zdemo.Event;
import com.zdemo.IProducer;
import com.zdemo.zhub.Lock;
import org.junit.Test;
import org.redkale.boot.Application;
import org.redkale.convert.json.JsonConvert;
import org.redkale.util.Utility;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Logger;
@ -27,7 +28,7 @@ public class AppTest {
//启动并开启消费监听
MyConsumer consumer = Application.singleton(MyConsumer.class);
consumer.subscribe("a", str -> {
/*consumer.subscribe("a", str -> {
logger.info("我收到了消息 a 事件:" + str);
});
@ -44,7 +45,27 @@ public class AppTest {
System.out.println(Utility.now() + " ----------------- timer b 执行了");
});
//consumer.delay("a", "1", 200);
consumer.delay("a", "1", "2000");
consumer.delay("a", "1", "2000");*/
Consumer<String> con = x -> {
logger.info("--->开始申请锁:" + System.currentTimeMillis());
Lock lock = consumer.tryLock("a", 20);
logger.info("===>成功申请锁:" + System.currentTimeMillis());
for (int i = 0; i < 20; i++) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(x + ":" + i);
}
lock.unLock();
};
new Thread(() -> con.accept("x")).start();
new Thread(() -> con.accept("y")).start();
new Thread(() -> con.accept("z")).start();
Thread.sleep(60_000 * 60);
} catch (Exception e) {