This commit is contained in:
lxy 2020-08-22 11:52:21 +08:00
parent 17b28a9124
commit 23365ae237
3 changed files with 44 additions and 38 deletions

View File

@ -21,6 +21,9 @@ public interface IConsumer<T extends Event> {
String _value = value.split("\"value\":")[1]; String _value = value.split("\"value\":")[1];
_value = _value.substring(0, _value.length() - 1); _value = _value.substring(0, _value.length() - 1);
Event t = JsonConvert.root().convertFrom(getTypeToken().getType(), value.replace(_value, "")); Event t = JsonConvert.root().convertFrom(getTypeToken().getType(), value.replace(_value, ""));
if (_value.startsWith("\"") && _value.endsWith("\"")) {
_value = _value.substring(1, _value.length() -1);
}
t.setValue(_value); t.setValue(_value);
accept((T) t); accept((T) t);
} else { } else {

View File

@ -11,6 +11,7 @@ import java.io.InputStreamReader;
import java.io.OutputStreamWriter; import java.io.OutputStreamWriter;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.util.concurrent.CompletableFuture;
public abstract class RedisConsumer<T extends Event> implements IConsumer<T>, Service { public abstract class RedisConsumer<T extends Event> implements IConsumer<T>, Service {
@ -27,48 +28,50 @@ public abstract class RedisConsumer<T extends Event> implements IConsumer<T>, Se
@Override @Override
public void init(AnyValue config) { public void init(AnyValue config) {
try { CompletableFuture.runAsync(() -> {
Socket client = new Socket(); try {
client.connect(new InetSocketAddress(host, port)); Socket client = new Socket();
client.setKeepAlive(true); client.connect(new InetSocketAddress(host, port));
client.setKeepAlive(true);
OutputStreamWriter oswSub = new OutputStreamWriter(client.getOutputStream()); OutputStreamWriter oswSub = new OutputStreamWriter(client.getOutputStream());
oswSub.write("AUTH " + password + "\r\n"); oswSub.write("AUTH " + password + "\r\n");
oswSub.flush(); oswSub.flush();
StringBuffer buf = new StringBuffer("SUBSCRIBE"); StringBuffer buf = new StringBuffer("SUBSCRIBE");
for (String topic : getSubscribes()) { for (String topic : getSubscribes()) {
buf.append(" ").append(topic); buf.append(" ").append(topic);
} }
buf.append(" _ping\r\n"); buf.append(" _ping\r\n");
oswSub.write(buf.toString()); oswSub.write(buf.toString());
oswSub.flush(); oswSub.flush();
BufferedReader br = new BufferedReader(new InputStreamReader(client.getInputStream())); BufferedReader br = new BufferedReader(new InputStreamReader(client.getInputStream()));
String type = ""; String type = "";
String readLine; String readLine;
while ((readLine = br.readLine()) != null) { while ((readLine = br.readLine()) != null) {
if ("*3".equals(readLine)) { if ("*3".equals(readLine)) {
br.readLine(); // $7 len() br.readLine(); // $7 len()
type = br.readLine(); // message type = br.readLine(); // message
if (!"message".equals(type)) { if (!"message".equals(type)) {
continue; continue;
} }
br.readLine(); //$n len(key) br.readLine(); //$n len(key)
String topic = br.readLine(); // topic String topic = br.readLine(); // topic
br.readLine(); //$n len(value) br.readLine(); //$n len(value)
String value = br.readLine(); // value String value = br.readLine(); // value
try { try {
accept(value); accept(value);
} catch (Exception e) { } catch (Exception e) {
logger.warning("event accept error :" + value); logger.warning("event accept error :" + value);
e.printStackTrace(); e.printStackTrace();
}
} }
} }
} catch (Exception e) {
e.printStackTrace();
} }
} catch (Exception e) { });
e.printStackTrace();
}
} }
} }

View File

@ -33,10 +33,10 @@ public class AppTest {
try { try {
RedisProducer producer = Application.singleton(RedisProducer.class); RedisProducer producer = Application.singleton(RedisProducer.class);
Event<Map> event = new Event<>(); Event event = new Event<>();
event.setTopic("c"); event.setTopic("c");
event.setKey("abx"); event.setKey("abx");
event.setValue(Map.of("A", "a")); event.setValue(1f);
producer.send(event); producer.send(event);