This commit is contained in:
lxy
2020-10-10 15:34:16 +08:00
parent 459b31e750
commit 7124938714
12 changed files with 99 additions and 1708 deletions

View File

@@ -1,15 +1,10 @@
package com.zdemo;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Logger;
public interface IProducer<T extends Event> {
Logger logger = Logger.getLogger(IProducer.class.getSimpleName());
default CompletableFuture sendAsync(T... t) {
return CompletableFuture.runAsync(() -> send(t));
}
void send(T... t);
void send(T t);
}

View File

@@ -41,14 +41,12 @@ public class KafakProducer<T extends Event> implements IProducer<T>, Service {
}
@Override
public void send(T... t) {
for (T x : t) {
String v = JsonConvert.root().convertTo(x.value);
if (v.startsWith("\"") && v.endsWith("\"")) {
v = v.substring(1, v.length() - 1);
}
producer.send(new ProducerRecord(x.topic, v));
public void send(T t) {
String v = JsonConvert.root().convertTo(t.value);
if (v.startsWith("\"") && v.endsWith("\"")) {
v = v.substring(1, v.length() - 1);
}
producer.send(new ProducerRecord(t.topic, v));
}
@Override

View File

@@ -1,12 +1,14 @@
package com.zdemo.redis;
import com.zdemo.AbstractConsumer;
import com.zdemo.EventType;
import com.zdemo.IConsumer;
import org.redkale.service.Service;
import org.redkale.util.AnyValue;
import javax.annotation.Resource;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.InetSocketAddress;
@@ -22,41 +24,50 @@ public abstract class RedisConsumer extends AbstractConsumer implements IConsume
@Resource(name = "property.redis.port")
private int port = 6379;
private Socket client;
private OutputStreamWriter writer;
private BufferedReader reader;
@Override
public void init(AnyValue config) {
try {
client = new Socket();
client.connect(new InetSocketAddress(host, port));
client.setKeepAlive(true);
writer = new OutputStreamWriter(client.getOutputStream());
writer.write("AUTH " + password + "\r\n");
writer.flush();
StringBuffer buf = new StringBuffer("SUBSCRIBE");
for (String topic : getSubscribes()) {
buf.append(" ").append(topic);
}
buf.append(" _\r\n");
writer.write(buf.toString());
writer.flush();
reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
} catch (IOException e) {
logger.log(Level.WARNING, "Redis Consumer 初始化失败!", e);
}
new Thread(() -> {
try {
Socket client = new Socket();
client.connect(new InetSocketAddress(host, port));
client.setKeepAlive(true);
OutputStreamWriter oswSub = new OutputStreamWriter(client.getOutputStream());
oswSub.write("AUTH " + password + "\r\n");
oswSub.flush();
StringBuffer buf = new StringBuffer("SUBSCRIBE");
for (String topic : getSubscribes()) {
buf.append(" ").append(topic);
}
buf.append(" _ping\r\n");
oswSub.write(buf.toString());
oswSub.flush();
BufferedReader br = new BufferedReader(new InputStreamReader(client.getInputStream()));
String type = "";
String readLine;
while ((readLine = br.readLine()) != null) {
while (true) {
String readLine = reader.readLine();
String type = "";
if ("*3".equals(readLine)) {
br.readLine(); // $7 len()
type = br.readLine(); // message
readLine = reader.readLine(); // $7 len()
type = reader.readLine(); // message
if (!"message".equals(type)) {
continue;
}
br.readLine(); //$n len(key)
String topic = br.readLine(); // topic
reader.readLine(); //$n len(key)
String topic = reader.readLine(); // topic
br.readLine(); //$n len(value)
String value = br.readLine(); // value
reader.readLine(); //$n len(value)
String value = reader.readLine(); // value
try {
accept(topic, value);
} catch (Exception e) {
@@ -64,9 +75,30 @@ public abstract class RedisConsumer extends AbstractConsumer implements IConsume
}
}
}
} catch (Exception e) {
logger.log(Level.WARNING, "Redis Consumer 初始化失败!", e);
} catch (IOException e) {
logger.log(Level.WARNING, "", e);
}
}).start();
}
@Override
public void addEventType(EventType... eventType) {
for (EventType type : eventType) {
String[] topics = type.topic.split(",");
for (String topic : topics) {
if (topic.isEmpty()) {
continue;
}
eventMap.put(topic, type);
//新增订阅
try {
writer.write("SUBSCRIBE " + topic + "\r\n");
writer.flush();
} catch (IOException e) {
logger.log(Level.WARNING, "", e);
}
}
}
}
}

View File

@@ -22,7 +22,7 @@ public class RedisProducer<T extends Event> implements IProducer<T>, Service {
@Resource(name = "property.redis.port")
private int port = 6379;
private OutputStreamWriter oswPub;
private OutputStreamWriter osw;
@Override
public void init(AnyValue config) {
@@ -31,23 +31,22 @@ public class RedisProducer<T extends Event> implements IProducer<T>, Service {
client.connect(new InetSocketAddress(host, port));
client.setKeepAlive(true);
oswPub = new OutputStreamWriter(client.getOutputStream());
oswPub.write("AUTH " + password + "\r\n");
oswPub.flush();
osw = new OutputStreamWriter(client.getOutputStream());
osw.write("AUTH " + password + "\r\n");
osw.flush();
} catch (IOException e) {
logger.log(Level.WARNING, "", e);
}
}
@Override
public void send(T... t) {
for (T x : t) {
try {
oswPub.write("PUBLISH " + x.topic + " '" + JsonConvert.root().convertTo(x.value) + "' \r\n");
oswPub.flush();
} catch (IOException e) {
logger.log(Level.WARNING, "", e);
}
public void send(T t) {
try {
osw.write("PUBLISH " + t.topic + " '" + JsonConvert.root().convertTo(t.value) + "' \r\n");
osw.flush();
} catch (IOException e) {
logger.log(Level.WARNING, "", e);
}
}
}