新增:1、ZhubClient 构造方法,支持 new 的方式初始化实例(一般在测试时使用)2、initClient 方法逻辑微调,确保实例完成初始化在执行队列数据处理

This commit is contained in:
梁显优 2023-05-25 17:53:37 +08:00
parent 4986fafdd2
commit c0a1c60cb6

View File

@ -56,10 +56,19 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
};*/ };*/
private static Map<String, ZHubClient> mainHub = new HashMap<>(); // 127.0.0.1:1216 - ZHubClient private static Map<String, ZHubClient> mainHub = new HashMap<>(); // 127.0.0.1:1216 - ZHubClient
/*
public ZHubClient() { public ZHubClient() {
logger.info("ZHubClient:" + (application != null ? application.getName() : "NULL"));
}*/ }
public ZHubClient(String name, Map<String, String> attr) {
this.APP_NAME = name;
this.addr = attr.get("addr");
this.groupid = attr.get("groupid");
this.auth = attr.get("auth");
this.initClient(null);
}
@Override @Override
public void init(AnyValue config) { public void init(AnyValue config) {
@ -99,215 +108,217 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
mainHub.put(addr, this); mainHub.put(addr, this);
} }
// 消息 事件接收 CompletableFuture.runAsync(() -> {
new Thread(() -> { // 消息 事件接收
if (!initSocket(0)) { new Thread(() -> {
return; if (!initSocket(0)) {
} return;
}
while (true) { while (true) {
try { try {
String readLine = reader.readLine(); String readLine = reader.readLine();
if (readLine == null && initSocket(Integer.MAX_VALUE)) { // 连接中断 处理 if (readLine == null && initSocket(Integer.MAX_VALUE)) { // 连接中断 处理
continue;
}
String type = "";
// +ping
if ("+ping".equals(readLine)) {
send("+pong");
continue;
}
// 主题订阅消息
if ("*3".equals(readLine)) {
readLine = reader.readLine(); // $7 len()
type = reader.readLine(); // message
if (!"message".equals(type)) {
continue; continue;
} }
reader.readLine(); //$n len(key)
String topic = reader.readLine(); // topic
String lenStr = reader.readLine();//$n len(value) String type = "";
int clen = 0;
if (lenStr.startsWith("$")) { // +ping
clen = Integer.parseInt(lenStr.replace("$", "")); if ("+ping".equals(readLine)) {
send("+pong");
continue;
} }
String value = ""; // 主题订阅消息
do { if ("*3".equals(readLine)) {
if (value.length() > 0) { readLine = reader.readLine(); // $7 len()
value += "\r\n"; type = reader.readLine(); // message
if (!"message".equals(type)) {
continue;
} }
String s = reader.readLine(); reader.readLine(); //$n len(key)
value += s; // value String topic = reader.readLine(); // topic
} while (clen > 0 && clen > strLength(value));
logger.finest("topic[" + topic + "]: " + value); String lenStr = reader.readLine();//$n len(value)
int clen = 0;
if (lenStr.startsWith("$")) {
clen = Integer.parseInt(lenStr.replace("$", ""));
}
// lock msg String value = "";
if ("lock".equals(topic)) { do {
Lock lock = lockTag.get(value); if (value.length() > 0) {
if (lock != null) { value += "\r\n";
synchronized (lock) {
lock.notifyAll();
} }
String s = reader.readLine();
value += s; // value
} while (clen > 0 && clen > strLength(value));
logger.finest("topic[" + topic + "]: " + value);
// lock msg
if ("lock".equals(topic)) {
Lock lock = lockTag.get(value);
if (lock != null) {
synchronized (lock) {
lock.notifyAll();
}
}
continue;
} }
continue; // rpc back msg
} if (APP_NAME.equals(topic)) {
// rpc back msg rpcBackQueue.add(Event.of(topic, value));
if (APP_NAME.equals(topic)) { continue;
rpcBackQueue.add(Event.of(topic, value)); }
// rpc call msg
if (rpcTopics.contains(topic)) {
rpcCallQueue.add(Event.of(topic, value));
continue;
}
// oth msg
topicQueue.add(Event.of(topic, value));
continue; continue;
} }
// rpc call msg // timer 消息
if (rpcTopics.contains(topic)) { if ("*2".equals(readLine)) {
rpcCallQueue.add(Event.of(topic, value)); readLine = reader.readLine(); // $7 len()
type = reader.readLine(); // message
if (!"timer".equals(type)) {
continue;
}
reader.readLine(); //$n len(key)
String topic = reader.readLine(); // name
logger.finest("timer[" + topic + "]: ");
timerQueue.add(timerMap.get(topic));
continue; continue;
} }
// oth msg logger.finest(readLine);
topicQueue.add(Event.of(topic, value)); } catch (IOException e) {
continue; if (e instanceof SocketException) {
initSocket(Integer.MAX_VALUE);
}
e.printStackTrace();
} }
}
}).start();
}).thenAcceptAsync(x -> {
// 定时调度事件已加入耗时监控
new Thread(() -> {
ExecutorService pool = Executors.newFixedThreadPool(1);
while (true) {
Timer timer = null;
try {
if ((timer = timerQueue.take()) == null) {
return;
}
long start = System.currentTimeMillis();
pool.submit(timer.runnable).get(5, TimeUnit.SECONDS);
long end = System.currentTimeMillis();
logger.finest(String.format("timer [%s] : elapsed time %s ms", timer.name, end - start));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (TimeoutException e) {
logger.log(Level.SEVERE, "timer [" + timer.name + "] time out: " + 5 + " S", e);
pool = Executors.newFixedThreadPool(1);
} catch (Exception e) {
logger.log(Level.WARNING, "timer [" + timer.name + "]", e);
}
}
}).start();
// timer 消息 // topic msg已加入耗时监控
if ("*2".equals(readLine)) { new Thread(() -> {
readLine = reader.readLine(); // $7 len() ExecutorService pool = Executors.newFixedThreadPool(1);
type = reader.readLine(); // message while (true) {
if (!"timer".equals(type)) { Event<String> event = null;
try {
if ((event = topicQueue.take()) == null) {
continue; continue;
} }
reader.readLine(); //$n len(key)
String topic = reader.readLine(); // name
logger.finest("timer[" + topic + "]: "); String topic = event.topic;
timerQueue.add(timerMap.get(topic)); String value = event.value;
continue; pool.submit(() -> accept(topic, value)).get(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (TimeoutException e) {
logger.log(Level.SEVERE, "topic[" + event.topic + "] event deal time out: " + 5 + " S, value: " + event.value, e);
pool = Executors.newFixedThreadPool(1);
} catch (Exception e) {
logger.log(Level.WARNING, "topic[" + event.topic + "] event accept error :" + event.value, e);
} }
logger.finest(readLine);
} catch (IOException e) {
if (e instanceof SocketException) {
initSocket(Integer.MAX_VALUE);
}
e.printStackTrace();
} }
} }).start();
}).start();
// 定时调度事件已加入耗时监控 // rpc back ,仅做数据解析暂无耗时监控
new Thread(() -> { new Thread(() -> {
ExecutorService pool = Executors.newFixedThreadPool(1); while (true) {
while (true) { Event<String> event = null;
Timer timer = null; try {
try { if ((event = rpcBackQueue.take()) == null) {
if ((timer = timerQueue.take()) == null) { continue;
return; }
//if (event)
logger.finest(String.format("rpc-back:[%s]: %s", event.topic, event.value));
rpcAccept(event.value);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
logger.log(Level.WARNING, "rpc-back[" + event.topic + "] event accept error :" + event.value, e);
} }
long start = System.currentTimeMillis();
pool.submit(timer.runnable).get(5, TimeUnit.SECONDS);
long end = System.currentTimeMillis();
logger.finest(String.format("timer [%s] : elapsed time %s ms", timer.name, end - start));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (TimeoutException e) {
logger.log(Level.SEVERE, "timer [" + timer.name + "] time out: " + 5 + " S", e);
pool = Executors.newFixedThreadPool(1);
} catch (Exception e) {
logger.log(Level.WARNING, "timer [" + timer.name + "]", e);
} }
} }).start();
}).start();
// topic msg已加入耗时监控 // rpc call已加入耗时监控
new Thread(() -> { new Thread(() -> {
ExecutorService pool = Executors.newFixedThreadPool(1); ExecutorService pool = Executors.newFixedThreadPool(1);
while (true) { while (true) {
Event<String> event = null; Event<String> event = null;
try { try {
if ((event = topicQueue.take()) == null) { if ((event = rpcCallQueue.take()) == null) {
continue; continue;
}
logger.finest(String.format("rpc-call:[%s] %s", event.topic, event.value));
String topic = event.topic;
String value = event.value;
pool.submit(() -> accept(topic, value)).get(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (TimeoutException e) {
logger.log(Level.SEVERE, "topic[" + event.topic + "] event deal time out: " + 5 + " S, value: " + event.value, e);
pool = Executors.newFixedThreadPool(1);
} catch (Exception e) {
logger.log(Level.WARNING, "rpc-call[" + event.topic + "] event accept error :" + event.value, e);
} }
String topic = event.topic;
String value = event.value;
pool.submit(() -> accept(topic, value)).get(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (TimeoutException e) {
logger.log(Level.SEVERE, "topic[" + event.topic + "] event deal time out: " + 5 + " S, value: " + event.value, e);
pool = Executors.newFixedThreadPool(1);
} catch (Exception e) {
logger.log(Level.WARNING, "topic[" + event.topic + "] event accept error :" + event.value, e);
} }
} }).start();
}).start();
// rpc back ,仅做数据解析暂无耗时监控 // send msg
new Thread(() -> { new Thread(() -> {
while (true) { while (true) {
Event<String> event = null; String msg = null;
try { try {
if ((event = rpcBackQueue.take()) == null) { if ((msg = sendMsgQueue.take()) == null) {
continue; continue;
}
// logger.log(Level.FINEST, "send-msg: [" + msg + "]");
writer.write(msg.getBytes());
writer.flush();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
logger.log(Level.WARNING, "send-msg[" + msg + "] event accept error :", e);
} }
//if (event)
logger.finest(String.format("rpc-back:[%s]: %s", event.topic, event.value));
rpcAccept(event.value);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
logger.log(Level.WARNING, "rpc-back[" + event.topic + "] event accept error :" + event.value, e);
} }
} }).start();
}).start(); });
// rpc call已加入耗时监控
new Thread(() -> {
ExecutorService pool = Executors.newFixedThreadPool(1);
while (true) {
Event<String> event = null;
try {
if ((event = rpcCallQueue.take()) == null) {
continue;
}
logger.finest(String.format("rpc-call:[%s] %s", event.topic, event.value));
String topic = event.topic;
String value = event.value;
pool.submit(() -> accept(topic, value)).get(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (TimeoutException e) {
logger.log(Level.SEVERE, "topic[" + event.topic + "] event deal time out: " + 5 + " S, value: " + event.value, e);
pool = Executors.newFixedThreadPool(1);
} catch (Exception e) {
logger.log(Level.WARNING, "rpc-call[" + event.topic + "] event accept error :" + event.value, e);
}
}
}).start();
// send msg
new Thread(() -> {
while (true) {
String msg = null;
try {
if ((msg = sendMsgQueue.take()) == null) {
continue;
}
// logger.log(Level.FINEST, "send-msg: [" + msg + "]");
writer.write(msg.getBytes());
writer.flush();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
logger.log(Level.WARNING, "send-msg[" + msg + "] event accept error :", e);
}
}
}).start();
return this; return this;
} }