新增:ZHub consumer 网络连接中断处理

This commit is contained in:
lxy 2021-01-12 11:50:21 +08:00
parent 1d6ae86cbd
commit 687dd924c2

View File

@ -36,15 +36,25 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer
@Override
public void init(AnyValue config) {
if (!preInit()) {
return;
}
boolean flag = initSocket();
new Thread(() -> {
while (flag) {
String topic = "";
String value = "";
try {
String readLine = reader.readLine();
String type = "";
if (readLine == null) { // 连接中断 处理
while (!initSocket()) {
try {
Thread.sleep(1000 * 5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
String type = "";
// 主题订阅消息
if ("*3".equals(readLine)) {
readLine = reader.readLine(); // $7 len()
@ -53,11 +63,15 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer
continue;
}
reader.readLine(); //$n len(key)
topic = reader.readLine(); // topic
String topic = reader.readLine(); // topic
reader.readLine(); //$n len(value)
value = reader.readLine(); // value
String value = reader.readLine(); // value
try {
accept(topic, value);
} catch (Exception e) {
logger.log(Level.WARNING, "topic[" + topic + "] event accept error :" + value, e);
}
}
// timer 消息
@ -68,7 +82,7 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer
continue;
}
reader.readLine(); //$n len(key)
topic = reader.readLine(); // name
String topic = reader.readLine(); // name
Timer timer = timerMap.get(topic);
if (timer != null) {
@ -80,14 +94,12 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer
if (e instanceof SocketException) {
while (!initSocket()) {
try {
Thread.sleep(1000 * 3);
Thread.sleep(1000 * 5);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
}
} catch (Exception e) {
logger.log(Level.WARNING, "topic[" + topic + "] event accept error :" + value, e);
}
}
}).start();
@ -130,13 +142,12 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer
for (String topic : getTopics()) {
buf.append(" ").append(topic);
}
buf.append("\r\n");
send(buf.toString());
// 重连 timer 订阅
timerMap.forEach((name, timer) -> {
send("timer", name, timer.expr, timer.single ? "a" : "x");
});
send(buf.toString());
} catch (IOException e) {
logger.log(Level.WARNING, "Zdb Consumer 初始化失败!", e);
return false;
@ -203,6 +214,7 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer
public Timer(String name, String expr, Runnable runnable, boolean single) {
this.name = name;
this.expr = expr;
this.runnable = runnable;
this.single = single;
}