From 4d5fb83b3c1252a0d1de1c436f4df16e2dba8ccf Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Fri, 8 Jan 2021 18:17:14 +0800 Subject: [PATCH] . --- src/com/zdemo/zdb/ZdbConsumer.java | 133 +++++++++++++++++++++++++++++ src/com/zdemo/zdb/ZdbProducer.java | 69 +++++++++++++++ 2 files changed, 202 insertions(+) create mode 100644 src/com/zdemo/zdb/ZdbConsumer.java create mode 100644 src/com/zdemo/zdb/ZdbProducer.java diff --git a/src/com/zdemo/zdb/ZdbConsumer.java b/src/com/zdemo/zdb/ZdbConsumer.java new file mode 100644 index 0000000..af34976 --- /dev/null +++ b/src/com/zdemo/zdb/ZdbConsumer.java @@ -0,0 +1,133 @@ +package com.zdemo.zdb; + +import com.zdemo.AbstractConsumer; +import com.zdemo.EventType; +import com.zdemo.IConsumer; +import org.redkale.service.Service; +import org.redkale.util.AnyValue; + +import javax.annotation.Resource; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketException; +import java.util.logging.Level; + +public class ZdbConsumer extends AbstractConsumer implements IConsumer, Service { + + @Resource(name = "property.zdb.host") + private String host = "39.108.56.246"; + @Resource(name = "property.zdb.password") + private String password = ""; + @Resource(name = "property.zdb.port") + private int port = 1216; + + private Socket client; + private OutputStream os; + private BufferedReader reader; + + @Override + public void init(AnyValue config) { + boolean flag = initSocket(); + new Thread(() -> { + while (flag) { + String topic = ""; + String value = ""; + try { + String readLine = reader.readLine(); + String type = ""; + if ("*3".equals(readLine)) { + readLine = reader.readLine(); // $7 len() + type = reader.readLine(); // message + if (!"message".equals(type)) { + continue; + } + reader.readLine(); //$n len(key) + topic = reader.readLine(); // topic + + reader.readLine(); //$n len(value) + value = reader.readLine(); // value + accept(topic, value); + } + } catch (IOException e) { + logger.log(Level.WARNING, "reconnection ", e.getMessage()); + if (e instanceof SocketException) { + while (!initSocket()) { + try { + Thread.sleep(1000 * 3); + } catch (InterruptedException interruptedException) { + interruptedException.printStackTrace(); + } + } + } + } catch (Exception e) { + logger.log(Level.WARNING, "topic[" + topic + "] event accept error :" + value, e); + } + } + }).start(); + } + + public boolean initSocket() { + try { + client = new Socket(); + client.connect(new InetSocketAddress(host, port)); + client.setKeepAlive(true); + + os = client.getOutputStream(); + + StringBuffer buf = new StringBuffer("subscribe"); + for (String topic : getTopics()) { + buf.append(" ").append(topic); + } + buf.append("\r\n"); + os.write(buf.toString().getBytes()); + os.flush(); + + reader = new BufferedReader(new InputStreamReader(client.getInputStream())); + } catch (IOException e) { + logger.log(Level.WARNING, "Zdb Consumer 初始化失败!", e); + return false; + } + + return true; + } + + @Override + public String getGroupid() { + return null; + } + + @Override + public void addEventType(EventType... eventType) { + for (EventType type : eventType) { + String[] topics = type.topic.split(","); + for (String topic : topics) { + if (topic.isEmpty()) { + continue; + } + eventMap.put(topic, type); + + //新增订阅 + try { + os.write(("subscribe " + topic + "\r\n").getBytes()); + os.flush(); + } catch (IOException e) { + logger.log(Level.WARNING, "", e); + } + } + } + } + + @Override + public void unsubscribe(String topic) { + try { + os.write(("unsubscribe " + topic + "\r\n").getBytes()); + os.flush(); + } catch (IOException e) { + logger.log(Level.WARNING, "", e); + } + } +} diff --git a/src/com/zdemo/zdb/ZdbProducer.java b/src/com/zdemo/zdb/ZdbProducer.java new file mode 100644 index 0000000..98bc2d3 --- /dev/null +++ b/src/com/zdemo/zdb/ZdbProducer.java @@ -0,0 +1,69 @@ +package com.zdemo.zdb; + +import com.zdemo.Event; +import com.zdemo.IProducer; +import org.redkale.convert.json.JsonConvert; +import org.redkale.service.Service; +import org.redkale.util.AnyValue; + +import javax.annotation.Resource; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.logging.Level; + +public class ZdbProducer implements IProducer, Service { + + @Resource(name = "property.zdb.host") + private String host = "39.108.56.246"; + @Resource(name = "property.zdb.password") + private String password = ""; + @Resource(name = "property.zdb.port") + private int port = 1216; + + private OutputStream os; + + @Override + public void init(AnyValue config) { + Socket client = new Socket(); + try { + client.connect(new InetSocketAddress(host, port)); + client.setKeepAlive(true); + os = client.getOutputStream(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void send(T t) { + try { + String v = JsonConvert.root().convertTo(t.value); + if (v.startsWith("\"") && v.endsWith("\"")) { + v = v.substring(1, v.length() - 1); + } + + os.write("*3\r\n".getBytes()); + os.write("$7\r\n".getBytes()); + os.write("publish\r\n".getBytes()); + os.write(("$" + t.topic.length() + "\r\n").getBytes()); + os.write((t.topic + "\r\n").getBytes()); + os.write(("$" + v.length() + "\r\n").getBytes()); + os.write((v + "\r\n").getBytes()); + os.flush(); + } catch (IOException e) { + logger.log(Level.WARNING, "", e); + } + } + + private byte[] toBytes(int v) { + byte[] result = new byte[4]; + result[0] = (byte) ((v >> 24) & 0xFF); + result[1] = (byte) ((v >> 16) & 0xFF); + result[2] = (byte) ((v >> 8) & 0xFF); + result[3] = (byte) (v & 0xFF); + return result; + } + +}