新增:ztimer 客户端

This commit is contained in:
lxy 2021-01-08 19:51:36 +08:00
parent 4d5fb83b3c
commit 26a1fc4971
4 changed files with 96 additions and 44 deletions

View File

@ -14,9 +14,11 @@ import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
public class ZdbConsumer extends AbstractConsumer implements IConsumer, Service {
public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer, Service {
@Resource(name = "property.zdb.host")
private String host = "39.108.56.246";
@ -25,8 +27,11 @@ public class ZdbConsumer extends AbstractConsumer implements IConsumer, Service
@Resource(name = "property.zdb.port")
private int port = 1216;
private ReentrantLock lock = new ReentrantLock();
private Socket client;
private OutputStream os;
private OutputStream writer;
private BufferedReader reader;
@Override
@ -39,6 +44,8 @@ public class ZdbConsumer extends AbstractConsumer implements IConsumer, Service
try {
String readLine = reader.readLine();
String type = "";
// 主题订阅消息
if ("*3".equals(readLine)) {
readLine = reader.readLine(); // $7 len()
type = reader.readLine(); // message
@ -52,6 +59,20 @@ public class ZdbConsumer extends AbstractConsumer implements IConsumer, Service
value = reader.readLine(); // value
accept(topic, value);
}
// timer 消息
if ("*2".equals(readLine)) {
readLine = reader.readLine(); // $7 len()
type = reader.readLine(); // message
if (!"timer".equals(type)) {
continue;
}
reader.readLine(); //$n len(key)
topic = reader.readLine(); // name
accept(topic, value);
}
} catch (IOException e) {
logger.log(Level.WARNING, "reconnection ", e.getMessage());
if (e instanceof SocketException) {
@ -70,23 +91,48 @@ public class ZdbConsumer extends AbstractConsumer implements IConsumer, Service
}).start();
}
// ---------------------
// 消息发送类
private void send(String... data) {
try {
lock.lock();
if (data.length == 1) {
writer.write((data[0] + "\r\n").getBytes());
} else if (data.length > 1) {
writer.write(("*" + data.length + "\r\n").getBytes());
for (String d : data) {
writer.write(("$" + d.length() + "\r\n").getBytes());
writer.write((d + "\r\n").getBytes());
}
}
writer.flush();
} catch (IOException e) {
logger.log(Level.WARNING, "", e);
} finally {
lock.unlock();
}
}
public boolean initSocket() {
try {
client = new Socket();
client.connect(new InetSocketAddress(host, port));
client.setKeepAlive(true);
os = client.getOutputStream();
writer = client.getOutputStream();
reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
send("groupid " + getGroupid());
StringBuffer buf = new StringBuffer("subscribe");
for (String topic : getTopics()) {
buf.append(" ").append(topic);
}
buf.append("\r\n");
os.write(buf.toString().getBytes());
os.flush();
reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
// todo: 重连 timer 订阅 需要
send(buf.toString());
} catch (IOException e) {
logger.log(Level.WARNING, "Zdb Consumer 初始化失败!", e);
return false;
@ -95,11 +141,6 @@ public class ZdbConsumer extends AbstractConsumer implements IConsumer, Service
return true;
}
@Override
public String getGroupid() {
return null;
}
@Override
public void addEventType(EventType... eventType) {
for (EventType type : eventType) {
@ -111,23 +152,21 @@ public class ZdbConsumer extends AbstractConsumer implements IConsumer, Service
eventMap.put(topic, type);
//新增订阅
try {
os.write(("subscribe " + topic + "\r\n").getBytes());
os.flush();
} catch (IOException e) {
logger.log(Level.WARNING, "", e);
}
send("subscribe " + topic);
}
}
}
@Override
public void unsubscribe(String topic) {
try {
os.write(("unsubscribe " + topic + "\r\n").getBytes());
os.flush();
} catch (IOException e) {
logger.log(Level.WARNING, "", e);
}
send("unsubscribe " + topic);
}
// timer
private ConcurrentHashMap<String, Runnable> timerMap = new ConcurrentHashMap();
public void timer(String name, String expr, Runnable run) {
timerMap.put(name, run);
send("timer", name, expr);
}
}

View File

@ -11,9 +11,10 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
public class ZdbProducer<T extends Event> implements IProducer<T>, Service {
public class ZHubProducer<T extends Event> implements IProducer<T>, Service {
@Resource(name = "property.zdb.host")
private String host = "39.108.56.246";
@ -22,6 +23,8 @@ public class ZdbProducer<T extends Event> implements IProducer<T>, Service {
@Resource(name = "property.zdb.port")
private int port = 1216;
private ReentrantLock lock = new ReentrantLock();
private OutputStream os;
@Override
@ -38,22 +41,29 @@ public class ZdbProducer<T extends Event> implements IProducer<T>, Service {
@Override
public void send(T t) {
try {
String v = JsonConvert.root().convertTo(t.value);
if (v.startsWith("\"") && v.endsWith("\"")) {
v = v.substring(1, v.length() - 1);
}
send("publish", t.topic, v);
}
os.write("*3\r\n".getBytes());
os.write("$7\r\n".getBytes());
os.write("publish\r\n".getBytes());
os.write(("$" + t.topic.length() + "\r\n").getBytes());
os.write((t.topic + "\r\n").getBytes());
os.write(("$" + v.length() + "\r\n").getBytes());
os.write((v + "\r\n").getBytes());
os.flush();
private void send(String... data) {
try {
lock.lock();
if (data.length == 1) {
os.write((data[0] + "\r\n").getBytes());
} else if (data.length > 1) {
os.write(("*" + data.length + "\r\n").getBytes());
for (String d : data) {
os.write(("$" + d.length() + "\r\n").getBytes());
os.write((d + "\r\n").getBytes());
}
}
} catch (IOException e) {
logger.log(Level.WARNING, "", e);
} finally {
lock.unlock();
}
}

View File

@ -4,7 +4,7 @@ import com.zdemo.Event;
import com.zdemo.EventType;
import com.zdemo.IConsumer;
import com.zdemo.IProducer;
import com.zdemo.zdb.ZdbProducer;
import com.zdemo.zdb.ZHubProducer;
import org.junit.Test;
import org.redkale.boot.Application;
@ -26,7 +26,7 @@ public class AppTest {
public void runConsumer() {
try {
//启动并开启消费监听
IConsumer consumer = Application.singleton(MyConsumer.class);
MyConsumer consumer = Application.singleton(MyConsumer.class);
consumer.addEventType(
EventType.of("a", str -> {
@ -38,6 +38,10 @@ public class AppTest {
})
);
consumer.timer("a", "* * * * * *", () -> {
System.out.println("timer a 执行了");
});
Thread.sleep(60_000 * 60);
} catch (Exception e) {
@ -48,7 +52,7 @@ public class AppTest {
@Test
public void runProducer() {
try {
IProducer producer = Application.singleton(ZdbProducer.class);
IProducer producer = Application.singleton(ZHubProducer.class);
// 发送不同的 事件
float v0 = 1f;
@ -79,7 +83,6 @@ public class AppTest {
@Test
public void t() {
List<String> list = new ArrayList<>();
list.toArray(String[]::new);
@ -129,7 +132,7 @@ public class AppTest {
public void yy() {
IProducer producer = null;
try {
producer = Application.singleton(ZdbProducer.class);
producer = Application.singleton(ZHubProducer.class);
for (int i = 0; i < 100; i++) {

View File

@ -1,8 +1,8 @@
package com.zdemo.test;
import com.zdemo.zdb.ZdbConsumer;
import com.zdemo.zdb.ZHubConsumer;
public class MyConsumer extends ZdbConsumer {
public class MyConsumer extends ZHubConsumer {
public String getGroupid() {
return "group-test"; //消费组名称