This commit is contained in:
lxy 2021-01-08 18:17:14 +08:00
parent d201cd3917
commit f12679c5a9
2 changed files with 202 additions and 0 deletions

View File

@ -0,0 +1,133 @@
package com.zdemo.zdb;
import com.zdemo.AbstractConsumer;
import com.zdemo.EventType;
import com.zdemo.IConsumer;
import org.redkale.service.Service;
import org.redkale.util.AnyValue;
import javax.annotation.Resource;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.util.logging.Level;
public class ZdbConsumer extends AbstractConsumer implements IConsumer, Service {
@Resource(name = "property.zdb.host")
private String host = "39.108.56.246";
@Resource(name = "property.zdb.password")
private String password = "";
@Resource(name = "property.zdb.port")
private int port = 1216;
private Socket client;
private OutputStream os;
private BufferedReader reader;
@Override
public void init(AnyValue config) {
boolean flag = initSocket();
new Thread(() -> {
while (flag) {
String topic = "";
String value = "";
try {
String readLine = reader.readLine();
String type = "";
if ("*3".equals(readLine)) {
readLine = reader.readLine(); // $7 len()
type = reader.readLine(); // message
if (!"message".equals(type)) {
continue;
}
reader.readLine(); //$n len(key)
topic = reader.readLine(); // topic
reader.readLine(); //$n len(value)
value = reader.readLine(); // value
accept(topic, value);
}
} catch (IOException e) {
logger.log(Level.WARNING, "reconnection ", e.getMessage());
if (e instanceof SocketException) {
while (!initSocket()) {
try {
Thread.sleep(1000 * 3);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
}
} catch (Exception e) {
logger.log(Level.WARNING, "topic[" + topic + "] event accept error :" + value, e);
}
}
}).start();
}
public boolean initSocket() {
try {
client = new Socket();
client.connect(new InetSocketAddress(host, port));
client.setKeepAlive(true);
os = client.getOutputStream();
StringBuffer buf = new StringBuffer("subscribe");
for (String topic : getTopics()) {
buf.append(" ").append(topic);
}
buf.append("\r\n");
os.write(buf.toString().getBytes());
os.flush();
reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
} catch (IOException e) {
logger.log(Level.WARNING, "Zdb Consumer 初始化失败!", e);
return false;
}
return true;
}
@Override
public String getGroupid() {
return null;
}
@Override
public void addEventType(EventType... eventType) {
for (EventType type : eventType) {
String[] topics = type.topic.split(",");
for (String topic : topics) {
if (topic.isEmpty()) {
continue;
}
eventMap.put(topic, type);
//新增订阅
try {
os.write(("subscribe " + topic + "\r\n").getBytes());
os.flush();
} catch (IOException e) {
logger.log(Level.WARNING, "", e);
}
}
}
}
@Override
public void unsubscribe(String topic) {
try {
os.write(("unsubscribe " + topic + "\r\n").getBytes());
os.flush();
} catch (IOException e) {
logger.log(Level.WARNING, "", e);
}
}
}

View File

@ -0,0 +1,69 @@
package com.zdemo.zdb;
import com.zdemo.Event;
import com.zdemo.IProducer;
import org.redkale.convert.json.JsonConvert;
import org.redkale.service.Service;
import org.redkale.util.AnyValue;
import javax.annotation.Resource;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.logging.Level;
public class ZdbProducer<T extends Event> implements IProducer<T>, Service {
@Resource(name = "property.zdb.host")
private String host = "39.108.56.246";
@Resource(name = "property.zdb.password")
private String password = "";
@Resource(name = "property.zdb.port")
private int port = 1216;
private OutputStream os;
@Override
public void init(AnyValue config) {
Socket client = new Socket();
try {
client.connect(new InetSocketAddress(host, port));
client.setKeepAlive(true);
os = client.getOutputStream();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void send(T t) {
try {
String v = JsonConvert.root().convertTo(t.value);
if (v.startsWith("\"") && v.endsWith("\"")) {
v = v.substring(1, v.length() - 1);
}
os.write("*3\r\n".getBytes());
os.write("$7\r\n".getBytes());
os.write("publish\r\n".getBytes());
os.write(("$" + t.topic.length() + "\r\n").getBytes());
os.write((t.topic + "\r\n").getBytes());
os.write(("$" + v.length() + "\r\n").getBytes());
os.write((v + "\r\n").getBytes());
os.flush();
} catch (IOException e) {
logger.log(Level.WARNING, "", e);
}
}
private byte[] toBytes(int v) {
byte[] result = new byte[4];
result[0] = (byte) ((v >> 24) & 0xFF);
result[1] = (byte) ((v >> 16) & 0xFF);
result[2] = (byte) ((v >> 8) & 0xFF);
result[3] = (byte) (v & 0xFF);
return result;
}
}