新增:ZHub consumer 网络连接中断处理
This commit is contained in:
parent
c457e8094d
commit
d70d9ad244
@ -36,15 +36,25 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(AnyValue config) {
|
public void init(AnyValue config) {
|
||||||
|
if (!preInit()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
boolean flag = initSocket();
|
boolean flag = initSocket();
|
||||||
new Thread(() -> {
|
new Thread(() -> {
|
||||||
while (flag) {
|
while (flag) {
|
||||||
String topic = "";
|
|
||||||
String value = "";
|
|
||||||
try {
|
try {
|
||||||
String readLine = reader.readLine();
|
String readLine = reader.readLine();
|
||||||
String type = "";
|
if (readLine == null) { // 连接中断 处理
|
||||||
|
while (!initSocket()) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(1000 * 5);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
String type = "";
|
||||||
// 主题订阅消息
|
// 主题订阅消息
|
||||||
if ("*3".equals(readLine)) {
|
if ("*3".equals(readLine)) {
|
||||||
readLine = reader.readLine(); // $7 len()
|
readLine = reader.readLine(); // $7 len()
|
||||||
@ -53,11 +63,15 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
reader.readLine(); //$n len(key)
|
reader.readLine(); //$n len(key)
|
||||||
topic = reader.readLine(); // topic
|
String topic = reader.readLine(); // topic
|
||||||
|
|
||||||
reader.readLine(); //$n len(value)
|
reader.readLine(); //$n len(value)
|
||||||
value = reader.readLine(); // value
|
String value = reader.readLine(); // value
|
||||||
|
try {
|
||||||
accept(topic, value);
|
accept(topic, value);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.log(Level.WARNING, "topic[" + topic + "] event accept error :" + value, e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// timer 消息
|
// timer 消息
|
||||||
@ -68,7 +82,7 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
reader.readLine(); //$n len(key)
|
reader.readLine(); //$n len(key)
|
||||||
topic = reader.readLine(); // name
|
String topic = reader.readLine(); // name
|
||||||
|
|
||||||
Timer timer = timerMap.get(topic);
|
Timer timer = timerMap.get(topic);
|
||||||
if (timer != null) {
|
if (timer != null) {
|
||||||
@ -80,14 +94,12 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer
|
|||||||
if (e instanceof SocketException) {
|
if (e instanceof SocketException) {
|
||||||
while (!initSocket()) {
|
while (!initSocket()) {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(1000 * 3);
|
Thread.sleep(1000 * 5);
|
||||||
} catch (InterruptedException interruptedException) {
|
} catch (InterruptedException interruptedException) {
|
||||||
interruptedException.printStackTrace();
|
interruptedException.printStackTrace();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
|
||||||
logger.log(Level.WARNING, "topic[" + topic + "] event accept error :" + value, e);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}).start();
|
}).start();
|
||||||
@ -130,13 +142,12 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer
|
|||||||
for (String topic : getTopics()) {
|
for (String topic : getTopics()) {
|
||||||
buf.append(" ").append(topic);
|
buf.append(" ").append(topic);
|
||||||
}
|
}
|
||||||
buf.append("\r\n");
|
send(buf.toString());
|
||||||
|
|
||||||
// 重连 timer 订阅
|
// 重连 timer 订阅
|
||||||
timerMap.forEach((name, timer) -> {
|
timerMap.forEach((name, timer) -> {
|
||||||
send("timer", name, timer.expr, timer.single ? "a" : "x");
|
send("timer", name, timer.expr, timer.single ? "a" : "x");
|
||||||
});
|
});
|
||||||
send(buf.toString());
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logger.log(Level.WARNING, "Zdb Consumer 初始化失败!", e);
|
logger.log(Level.WARNING, "Zdb Consumer 初始化失败!", e);
|
||||||
return false;
|
return false;
|
||||||
@ -203,6 +214,7 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer
|
|||||||
|
|
||||||
public Timer(String name, String expr, Runnable runnable, boolean single) {
|
public Timer(String name, String expr, Runnable runnable, boolean single) {
|
||||||
this.name = name;
|
this.name = name;
|
||||||
|
this.expr = expr;
|
||||||
this.runnable = runnable;
|
this.runnable = runnable;
|
||||||
this.single = single;
|
this.single = single;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user