diff --git a/src/com/zdemo/zdb/ZHubConsumer.java b/src/com/zdemo/zdb/ZHubConsumer.java index 0fa8e1f..d72486e 100644 --- a/src/com/zdemo/zdb/ZHubConsumer.java +++ b/src/com/zdemo/zdb/ZHubConsumer.java @@ -36,15 +36,25 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer @Override public void init(AnyValue config) { + if (!preInit()) { + return; + } boolean flag = initSocket(); new Thread(() -> { while (flag) { - String topic = ""; - String value = ""; try { String readLine = reader.readLine(); - String type = ""; + if (readLine == null) { // 连接中断 处理 + while (!initSocket()) { + try { + Thread.sleep(1000 * 5); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + String type = ""; // 主题订阅消息 if ("*3".equals(readLine)) { readLine = reader.readLine(); // $7 len() @@ -53,11 +63,15 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer continue; } reader.readLine(); //$n len(key) - topic = reader.readLine(); // topic + String topic = reader.readLine(); // topic reader.readLine(); //$n len(value) - value = reader.readLine(); // value - accept(topic, value); + String value = reader.readLine(); // value + try { + accept(topic, value); + } catch (Exception e) { + logger.log(Level.WARNING, "topic[" + topic + "] event accept error :" + value, e); + } } // timer 消息 @@ -68,7 +82,7 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer continue; } reader.readLine(); //$n len(key) - topic = reader.readLine(); // name + String topic = reader.readLine(); // name Timer timer = timerMap.get(topic); if (timer != null) { @@ -80,14 +94,12 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer if (e instanceof SocketException) { while (!initSocket()) { try { - Thread.sleep(1000 * 3); + Thread.sleep(1000 * 5); } catch (InterruptedException interruptedException) { interruptedException.printStackTrace(); } } } - } catch (Exception e) { - logger.log(Level.WARNING, "topic[" + topic + "] event accept error :" + value, e); } } }).start(); @@ -130,13 +142,12 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer for (String topic : getTopics()) { buf.append(" ").append(topic); } - buf.append("\r\n"); + send(buf.toString()); // 重连 timer 订阅 timerMap.forEach((name, timer) -> { send("timer", name, timer.expr, timer.single ? "a" : "x"); }); - send(buf.toString()); } catch (IOException e) { logger.log(Level.WARNING, "Zdb Consumer 初始化失败!", e); return false; @@ -203,6 +214,7 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer public Timer(String name, String expr, Runnable runnable, boolean single) { this.name = name; + this.expr = expr; this.runnable = runnable; this.single = single; }