From d45b39a30977f600a17854db161fcd733b01c060 Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Sat, 22 Aug 2020 19:17:06 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=EF=BC=9A=E6=B6=88=E8=B4=B9?= =?UTF-8?q?=E7=9B=91=E5=90=AC=E4=BD=BF=E7=94=A8=E4=B8=93=E9=97=A8=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/com/zdemo/kafak/KafakConsumer.java | 5 ++--- src/com/zdemo/kafak/KafakProducer.java | 1 - src/com/zdemo/redis/RedisConsumer.java | 5 ++--- test/com/zdemo/test/AppTest.java | 2 -- test/com/zdemo/test/MyConsumer.java | 6 +++--- 5 files changed, 7 insertions(+), 12 deletions(-) diff --git a/src/com/zdemo/kafak/KafakConsumer.java b/src/com/zdemo/kafak/KafakConsumer.java index a2aec9b..e35a33b 100644 --- a/src/com/zdemo/kafak/KafakConsumer.java +++ b/src/com/zdemo/kafak/KafakConsumer.java @@ -16,7 +16,6 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.time.Duration; import java.util.Properties; -import java.util.concurrent.CompletableFuture; /** * 消费 @@ -33,7 +32,7 @@ public abstract class KafakConsumer implements IConsumer, Se @Override public void init(AnyValue config) { - CompletableFuture.runAsync(() -> { + new Thread(() -> { try (FileInputStream fis = new FileInputStream(new File(APP_HOME, "conf/kafak.properties"));) { Properties props = new Properties(); props.load(fis); @@ -59,6 +58,6 @@ public abstract class KafakConsumer implements IConsumer, Se } catch (IOException e) { e.printStackTrace(); } - }); + }).start(); } } diff --git a/src/com/zdemo/kafak/KafakProducer.java b/src/com/zdemo/kafak/KafakProducer.java index 43f45cc..580ca5c 100644 --- a/src/com/zdemo/kafak/KafakProducer.java +++ b/src/com/zdemo/kafak/KafakProducer.java @@ -50,7 +50,6 @@ public class KafakProducer implements IProducer, Service { @Override public void send(T... t) { for (T x : t) { - logger.finest("send message: " + JsonConvert.root().convertTo(x)); producer.send(new ProducerRecord(x.getTopic(), JsonConvert.root().convertTo(x))); } } diff --git a/src/com/zdemo/redis/RedisConsumer.java b/src/com/zdemo/redis/RedisConsumer.java index 6e54bf5..6379e31 100644 --- a/src/com/zdemo/redis/RedisConsumer.java +++ b/src/com/zdemo/redis/RedisConsumer.java @@ -11,7 +11,6 @@ import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.net.InetSocketAddress; import java.net.Socket; -import java.util.concurrent.CompletableFuture; public abstract class RedisConsumer implements IConsumer, Service { @@ -28,7 +27,7 @@ public abstract class RedisConsumer implements IConsumer, Se @Override public void init(AnyValue config) { - CompletableFuture.runAsync(() -> { + new Thread(() -> { try { Socket client = new Socket(); client.connect(new InetSocketAddress(host, port)); @@ -72,6 +71,6 @@ public abstract class RedisConsumer implements IConsumer, Se } catch (Exception e) { e.printStackTrace(); } - }); + }).start(); } } diff --git a/test/com/zdemo/test/AppTest.java b/test/com/zdemo/test/AppTest.java index 21bb8a2..641795c 100644 --- a/test/com/zdemo/test/AppTest.java +++ b/test/com/zdemo/test/AppTest.java @@ -5,8 +5,6 @@ import com.zdemo.redis.RedisProducer; import org.junit.Test; import org.redkale.boot.Application; -import java.util.Map; - /** * 消息发布订阅测试 */ diff --git a/test/com/zdemo/test/MyConsumer.java b/test/com/zdemo/test/MyConsumer.java index 18aef05..6e4727a 100644 --- a/test/com/zdemo/test/MyConsumer.java +++ b/test/com/zdemo/test/MyConsumer.java @@ -1,14 +1,14 @@ package com.zdemo.test; import com.zdemo.Event; -import com.zdemo.redis.RedisConsumer; +import com.zdemo.kafak.KafakConsumer; import org.redkale.convert.json.JsonConvert; import org.redkale.util.TypeToken; import java.util.Collection; import java.util.List; -public class MyConsumer extends RedisConsumer> { +public class MyConsumer extends KafakConsumer> { public String getGroupid() { return "group-test"; //quest、user、im、live @@ -16,7 +16,7 @@ public class MyConsumer extends RedisConsumer> { @Override public Collection getSubscribes() { - return List.of("a", "b", "c"); + return List.of("a", "b", "c", "vis-log"); } @Override