修改:1.定时调度调整 2.zhub 服务 producer、consumer 合并为 ZHubClient

This commit is contained in:
lxy
2021-01-12 19:00:00 +08:00
parent d49bc65538
commit b458186ed8
7 changed files with 30 additions and 111 deletions

View File

@@ -5,7 +5,6 @@ import org.redkale.convert.json.JsonConvert;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;
/**
* @author Liang
@@ -13,8 +12,6 @@ import java.util.logging.Logger;
*/
public abstract class AbstractConsumer implements IConsumer {
public Logger logger = Logger.getLogger(this.getClass().getSimpleName());
public final Map<String, EventType> eventMap = new HashMap<>();
public abstract String getGroupid();

View File

@@ -18,6 +18,7 @@ import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* 消费
@@ -25,6 +26,8 @@ import java.util.logging.Level;
@RestService
public abstract class KafakConsumer extends AbstractConsumer implements IConsumer, Service {
public Logger logger = Logger.getLogger(this.getClass().getSimpleName());
@Resource(name = "APP_HOME")
protected File APP_HOME;

View File

@@ -14,9 +14,12 @@ import java.io.OutputStreamWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.logging.Level;
import java.util.logging.Logger;
public class RedisConsumer extends AbstractConsumer implements IConsumer, Service {
public Logger logger = Logger.getLogger(this.getClass().getSimpleName());
@Resource(name = "property.redis.host")
private String host = "127.0.0.1";
@Resource(name = "property.redis.password")

View File

@@ -1,8 +1,7 @@
package com.zdemo.zdb;
import com.zdemo.AbstractConsumer;
import com.zdemo.EventType;
import com.zdemo.IConsumer;
import com.zdemo.*;
import org.redkale.convert.json.JsonConvert;
import org.redkale.service.Service;
import org.redkale.util.AnyValue;
@@ -17,8 +16,11 @@ import java.net.SocketException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer, Service {
public abstract class ZHubClient extends AbstractConsumer implements IConsumer, IProducer, Service {
Logger logger = Logger.getLogger(IProducer.class.getSimpleName());
@Resource(name = "property.zhub.host")
private String host = "127.0.0.1";
@@ -106,7 +108,7 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer
}
// ---------------------
// 消息发送
// 消息发送
private void send(String... data) {
try {
lock.lock();
@@ -127,6 +129,14 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer
}
}
public void send(Event t) {
String v = JsonConvert.root().convertTo(t.value);
if (v.startsWith("\"") && v.endsWith("\"")) {
v = v.substring(1, v.length() - 1);
}
send("publish", t.topic, v);
}
public boolean initSocket() {
try {
client = new Socket();

View File

@@ -1,79 +0,0 @@
package com.zdemo.zdb;
import com.zdemo.Event;
import com.zdemo.IProducer;
import org.redkale.convert.json.JsonConvert;
import org.redkale.service.Service;
import org.redkale.util.AnyValue;
import javax.annotation.Resource;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
public class ZHubProducer<T extends Event> implements IProducer<T>, Service {
@Resource(name = "property.zdb.host")
private String host = "39.108.56.246";
@Resource(name = "property.zdb.password")
private String password = "";
@Resource(name = "property.zdb.port")
private int port = 1216;
private ReentrantLock lock = new ReentrantLock();
private OutputStream os;
@Override
public void init(AnyValue config) {
Socket client = new Socket();
try {
client.connect(new InetSocketAddress(host, port));
client.setKeepAlive(true);
os = client.getOutputStream();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void send(T t) {
String v = JsonConvert.root().convertTo(t.value);
if (v.startsWith("\"") && v.endsWith("\"")) {
v = v.substring(1, v.length() - 1);
}
send("publish", t.topic, v);
}
private void send(String... data) {
try {
lock.lock();
if (data.length == 1) {
os.write((data[0] + "\r\n").getBytes());
} else if (data.length > 1) {
os.write(("*" + data.length + "\r\n").getBytes());
for (String d : data) {
os.write(("$" + d.length() + "\r\n").getBytes());
os.write((d + "\r\n").getBytes());
}
}
} catch (IOException e) {
logger.log(Level.WARNING, "", e);
} finally {
lock.unlock();
}
}
private byte[] toBytes(int v) {
byte[] result = new byte[4];
result[0] = (byte) ((v >> 24) & 0xFF);
result[1] = (byte) ((v >> 16) & 0xFF);
result[2] = (byte) ((v >> 8) & 0xFF);
result[3] = (byte) (v & 0xFF);
return result;
}
}