diff --git a/src/com/zdemo/kafak/KafakConsumer.java b/src/com/zdemo/kafak/KafakConsumer.java index a2aec9b..e35a33b 100644 --- a/src/com/zdemo/kafak/KafakConsumer.java +++ b/src/com/zdemo/kafak/KafakConsumer.java @@ -16,7 +16,6 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.time.Duration; import java.util.Properties; -import java.util.concurrent.CompletableFuture; /** * 消费 @@ -33,7 +32,7 @@ public abstract class KafakConsumer implements IConsumer, Se @Override public void init(AnyValue config) { - CompletableFuture.runAsync(() -> { + new Thread(() -> { try (FileInputStream fis = new FileInputStream(new File(APP_HOME, "conf/kafak.properties"));) { Properties props = new Properties(); props.load(fis); @@ -59,6 +58,6 @@ public abstract class KafakConsumer implements IConsumer, Se } catch (IOException e) { e.printStackTrace(); } - }); + }).start(); } } diff --git a/src/com/zdemo/kafak/KafakProducer.java b/src/com/zdemo/kafak/KafakProducer.java index 43f45cc..580ca5c 100644 --- a/src/com/zdemo/kafak/KafakProducer.java +++ b/src/com/zdemo/kafak/KafakProducer.java @@ -50,7 +50,6 @@ public class KafakProducer implements IProducer, Service { @Override public void send(T... t) { for (T x : t) { - logger.finest("send message: " + JsonConvert.root().convertTo(x)); producer.send(new ProducerRecord(x.getTopic(), JsonConvert.root().convertTo(x))); } } diff --git a/src/com/zdemo/redis/RedisConsumer.java b/src/com/zdemo/redis/RedisConsumer.java index 6e54bf5..6379e31 100644 --- a/src/com/zdemo/redis/RedisConsumer.java +++ b/src/com/zdemo/redis/RedisConsumer.java @@ -11,7 +11,6 @@ import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.net.InetSocketAddress; import java.net.Socket; -import java.util.concurrent.CompletableFuture; public abstract class RedisConsumer implements IConsumer, Service { @@ -28,7 +27,7 @@ public abstract class RedisConsumer implements IConsumer, Se @Override public void init(AnyValue config) { - CompletableFuture.runAsync(() -> { + new Thread(() -> { try { Socket client = new Socket(); client.connect(new InetSocketAddress(host, port)); @@ -72,6 +71,6 @@ public abstract class RedisConsumer implements IConsumer, Se } catch (Exception e) { e.printStackTrace(); } - }); + }).start(); } } diff --git a/test/com/zdemo/test/AppTest.java b/test/com/zdemo/test/AppTest.java index 21bb8a2..641795c 100644 --- a/test/com/zdemo/test/AppTest.java +++ b/test/com/zdemo/test/AppTest.java @@ -5,8 +5,6 @@ import com.zdemo.redis.RedisProducer; import org.junit.Test; import org.redkale.boot.Application; -import java.util.Map; - /** * 消息发布订阅测试 */ diff --git a/test/com/zdemo/test/MyConsumer.java b/test/com/zdemo/test/MyConsumer.java index 18aef05..6e4727a 100644 --- a/test/com/zdemo/test/MyConsumer.java +++ b/test/com/zdemo/test/MyConsumer.java @@ -1,14 +1,14 @@ package com.zdemo.test; import com.zdemo.Event; -import com.zdemo.redis.RedisConsumer; +import com.zdemo.kafak.KafakConsumer; import org.redkale.convert.json.JsonConvert; import org.redkale.util.TypeToken; import java.util.Collection; import java.util.List; -public class MyConsumer extends RedisConsumer> { +public class MyConsumer extends KafakConsumer> { public String getGroupid() { return "group-test"; //quest、user、im、live @@ -16,7 +16,7 @@ public class MyConsumer extends RedisConsumer> { @Override public Collection getSubscribes() { - return List.of("a", "b", "c"); + return List.of("a", "b", "c", "vis-log"); } @Override