diff --git a/src/com/zdemo/IConsumer.java b/src/com/zdemo/IConsumer.java index dfcf47c..33c05f5 100644 --- a/src/com/zdemo/IConsumer.java +++ b/src/com/zdemo/IConsumer.java @@ -21,6 +21,9 @@ public interface IConsumer { String _value = value.split("\"value\":")[1]; _value = _value.substring(0, _value.length() - 1); Event t = JsonConvert.root().convertFrom(getTypeToken().getType(), value.replace(_value, "’‘")); + if (_value.startsWith("\"") && _value.endsWith("\"")) { + _value = _value.substring(1, _value.length() -1); + } t.setValue(_value); accept((T) t); } else { diff --git a/src/com/zdemo/redis/RedisConsumer.java b/src/com/zdemo/redis/RedisConsumer.java index 0b1f592..6e54bf5 100644 --- a/src/com/zdemo/redis/RedisConsumer.java +++ b/src/com/zdemo/redis/RedisConsumer.java @@ -11,6 +11,7 @@ import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.net.InetSocketAddress; import java.net.Socket; +import java.util.concurrent.CompletableFuture; public abstract class RedisConsumer implements IConsumer, Service { @@ -27,48 +28,50 @@ public abstract class RedisConsumer implements IConsumer, Se @Override public void init(AnyValue config) { - try { - Socket client = new Socket(); - client.connect(new InetSocketAddress(host, port)); - client.setKeepAlive(true); + CompletableFuture.runAsync(() -> { + try { + Socket client = new Socket(); + client.connect(new InetSocketAddress(host, port)); + client.setKeepAlive(true); - OutputStreamWriter oswSub = new OutputStreamWriter(client.getOutputStream()); - oswSub.write("AUTH " + password + "\r\n"); - oswSub.flush(); + OutputStreamWriter oswSub = new OutputStreamWriter(client.getOutputStream()); + oswSub.write("AUTH " + password + "\r\n"); + oswSub.flush(); - StringBuffer buf = new StringBuffer("SUBSCRIBE"); - for (String topic : getSubscribes()) { - buf.append(" ").append(topic); - } - buf.append(" _ping\r\n"); - oswSub.write(buf.toString()); - oswSub.flush(); + StringBuffer buf = new StringBuffer("SUBSCRIBE"); + for (String topic : getSubscribes()) { + buf.append(" ").append(topic); + } + buf.append(" _ping\r\n"); + oswSub.write(buf.toString()); + oswSub.flush(); - BufferedReader br = new BufferedReader(new InputStreamReader(client.getInputStream())); - String type = ""; - String readLine; - while ((readLine = br.readLine()) != null) { - if ("*3".equals(readLine)) { - br.readLine(); // $7 len() - type = br.readLine(); // message - if (!"message".equals(type)) { - continue; - } - br.readLine(); //$n len(key) - String topic = br.readLine(); // topic + BufferedReader br = new BufferedReader(new InputStreamReader(client.getInputStream())); + String type = ""; + String readLine; + while ((readLine = br.readLine()) != null) { + if ("*3".equals(readLine)) { + br.readLine(); // $7 len() + type = br.readLine(); // message + if (!"message".equals(type)) { + continue; + } + br.readLine(); //$n len(key) + String topic = br.readLine(); // topic - br.readLine(); //$n len(value) - String value = br.readLine(); // value - try { - accept(value); - } catch (Exception e) { - logger.warning("event accept error :" + value); - e.printStackTrace(); + br.readLine(); //$n len(value) + String value = br.readLine(); // value + try { + accept(value); + } catch (Exception e) { + logger.warning("event accept error :" + value); + e.printStackTrace(); + } } } + } catch (Exception e) { + e.printStackTrace(); } - } catch (Exception e) { - e.printStackTrace(); - } + }); } } diff --git a/test/com/zdemo/test/AppTest.java b/test/com/zdemo/test/AppTest.java index e5e1bed..21bb8a2 100644 --- a/test/com/zdemo/test/AppTest.java +++ b/test/com/zdemo/test/AppTest.java @@ -33,10 +33,10 @@ public class AppTest { try { RedisProducer producer = Application.singleton(RedisProducer.class); - Event event = new Event<>(); + Event event = new Event<>(); event.setTopic("c"); event.setKey("abx"); - event.setValue(Map.of("A", "a")); + event.setValue(1f); producer.send(event);