修改:zhub-client 重连逻辑

This commit is contained in:
lxy 2021-03-01 18:50:15 +08:00
parent cb5d701e8c
commit b9c6360241
6 changed files with 77 additions and 65 deletions

View File

@ -2,16 +2,15 @@
<application port="2001">
<zhubs>
<zhub name="zhub" addr="127.0.0.1" port="1216" groupid="group-zhub"/>
</zhubs>
<resources>
<properties load="config.properties"></properties>
<listener value="com.zdemo.ZhubListener"/>
</resources>
<zhubs>
<zhub name="zhub" addr="47.111.150.118" port="6066" groupid="platf-zhub"/>
<zhub name="zhub2" addr="47.111.150.118" port="6066" groupid="platf-chat"/>
</zhubs>
<server protocol="HTTP" port="80">
<request>
<remoteaddr value="request.headers.X-Real-IP"/>

View File

@ -5,5 +5,5 @@ import java.util.logging.Logger;
public interface IProducer {
Logger logger = Logger.getLogger(IProducer.class.getSimpleName());
<V> void publish(String topic, V v);
boolean publish(String topic, Object v);
}

View File

@ -40,8 +40,9 @@ public class KafakProducer implements IProducer, Service {
}
@Override
public <V> void publish(String topic, V v) {
public boolean publish(String topic, Object v) {
producer.send(new ProducerRecord(topic, toStr(v)));
return true;
}
@Override

View File

@ -39,17 +39,18 @@ public class RedisProducer implements IProducer, Service {
}
@Override
public <V> void publish(String topic, V v) {
public boolean publish(String topic, Object v) {
try {
osw.write("PUBLISH " + topic + " '" + toStr(v) + "' \r\n");
osw.flush();
return true;
} catch (IOException e) {
logger.log(Level.WARNING, "", e);
}
return false;
}
private <V> String toStr(V v) {
private String toStr(Object v) {
if (v instanceof String) {
return (String) v;
}

View File

@ -65,7 +65,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
groupid = config.getValue("groupid", groupid);
}
if (!initSocket()) {
if (!initSocket(0)) {
return;
}
// 消息 事件接收
@ -73,14 +73,8 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
while (true) {
try {
String readLine = reader.readLine();
if (readLine == null) { // 连接中断 处理
while (!initSocket()) {
try {
Thread.sleep(1000 * 5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (readLine == null && initSocket(Integer.MAX_VALUE)) { // 连接中断 处理
continue;
}
String type = "";
@ -113,15 +107,8 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
timerQueue.put(timerMap.get(topic));
}
} catch (IOException e) {
logger.log(Level.WARNING, "reconnection ", e.getMessage());
if (e instanceof SocketException) {
while (!initSocket()) {
try {
Thread.sleep(1000 * 5);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
initSocket(Integer.MAX_VALUE);
}
} catch (InterruptedException e) {
e.printStackTrace();
@ -170,7 +157,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
// ---------------------
// 消息发送
private void send(String... data) {
private boolean send(String... data) {
try {
lock.lock();
if (data.length == 1) {
@ -183,54 +170,68 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
}
}
writer.flush();
return true;
} catch (IOException e) {
logger.log(Level.WARNING, "", e);
} finally {
lock.unlock();
}
return false;
}
private <V> String toStr(V v) {
private String toStr(Object v) {
if (v instanceof String) {
return (String) v;
}
return JsonConvert.root().convertTo(v);
}
protected boolean initSocket() {
try {
client = new Socket();
client.connect(new InetSocketAddress(host, port));
client.setKeepAlive(true);
protected boolean initSocket(int retry) {
for (int i = 0; i <= retry; i++) {
try {
client = new Socket();
client.connect(new InetSocketAddress(host, port));
client.setKeepAlive(true);
writer = client.getOutputStream();
reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
writer = client.getOutputStream();
reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
String groupid = getGroupid();
if (groupid == null || groupid.isEmpty()) {
throw new RuntimeException("ZHubClient groupid can not is empty");
String groupid = getGroupid();
if (groupid == null || groupid.isEmpty()) {
throw new RuntimeException("ZHubClient groupid can not is empty");
}
send("groupid " + groupid);
StringBuffer buf = new StringBuffer("subscribe");
for (String topic : getTopics()) {
buf.append(" ").append(topic);
}
send(buf.toString());
// 重连 timer 订阅
timerMap.forEach((name, timer) -> {
send("timer", name);
});
if (retry > 0) {
logger.log(Level.WARNING, String.format("ZHubClient[%s][%s] %s Succeed", getGroupid(), i + 1, retry > 0 ? "reconnection" : "init"));
} else {
logger.log(Level.FINE, String.format("ZHubClient[%s] %s Succeed", getGroupid(), retry > 0 ? "reconnection" : "init"));
}
return true;
} catch (Exception e) {
if (retry == 0 || i > 0) {
logger.log(Level.WARNING, String.format("ZHubClient[%s] %s Failed 初始化失败!", getGroupid(), retry == 0 ? "init" : "reconnection"), e);
} else {
logger.log(Level.WARNING, String.format("ZHubClient[%s][%s] reconnection Failed", getGroupid(), i + 1));
try {
Thread.sleep(1000 * 5);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
}
send("groupid " + groupid);
StringBuffer buf = new StringBuffer("subscribe");
for (String topic : getTopics()) {
buf.append(" ").append(topic);
}
send(buf.toString());
// 重连 timer 订阅
timerMap.forEach((name, timer) -> {
send("timer", name);
});
} catch (IOException e) {
logger.log(Level.WARNING, "Zdb Consumer 初始化失败!", e);
return false;
} catch (Exception e) {
logger.log(Level.WARNING, "Zdb Consumer 初始化失败!", e);
return false;
}
return true;
return false;
}
@Override
@ -239,21 +240,21 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
super.removeEventType(topic);
}
public <V> void publish(String topic, V v) {
send("publish", topic, toStr(v));
public boolean publish(String topic, Object v) {
return send("publish", topic, toStr(v));
}
public <V> void broadcast(String topic, V v) {
public void broadcast(String topic, Object v) {
send("broadcast", topic, toStr(v));
}
// 发送 publish 主题消息若多次发送的 topic + "-" + value 相同将会做延时重置
public <V> void delay(String topic, V v, int delay) {
public void delay(String topic, Object v, int delay) {
send("delay", topic, toStr(v), String.valueOf(delay));
}
// 表达式支持d+[d,H,m,s]
public <V> void delay(String topic, V v, String delayExpr) {
public void delay(String topic, Object v, String delayExpr) {
String endchar = "";
int delay;
if (delayExpr.matches("^\\d+[d,H,m,s]$")) {
@ -326,6 +327,6 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
}
public void reloadTimer() {
send("cmd", "reload-timer-config");
send("cmd", "reload-timer");
}
}

View File

@ -1,12 +1,15 @@
package com.zdemo.test;
import com.zdemo.Event;
import com.zdemo.IProducer;
import org.junit.Test;
import org.redkale.boot.Application;
import org.redkale.convert.json.JsonConvert;
import org.redkale.util.Utility;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@ -305,4 +308,11 @@ public class AppTest {
}
return 0;
}
@Test
public void testxx() {
Event of = Event.of("A", Map.of("b", 1));
System.out.println(JsonConvert.root().convertTo(of));
}
}