修改:消费监听使用专门线程

This commit is contained in:
lxy 2020-08-22 19:17:06 +08:00
parent 23365ae237
commit d45b39a309
5 changed files with 7 additions and 12 deletions

View File

@ -16,7 +16,6 @@ import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.time.Duration; import java.time.Duration;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.CompletableFuture;
/** /**
* 消费 * 消费
@ -33,7 +32,7 @@ public abstract class KafakConsumer<T extends Event> implements IConsumer<T>, Se
@Override @Override
public void init(AnyValue config) { public void init(AnyValue config) {
CompletableFuture.runAsync(() -> { new Thread(() -> {
try (FileInputStream fis = new FileInputStream(new File(APP_HOME, "conf/kafak.properties"));) { try (FileInputStream fis = new FileInputStream(new File(APP_HOME, "conf/kafak.properties"));) {
Properties props = new Properties(); Properties props = new Properties();
props.load(fis); props.load(fis);
@ -59,6 +58,6 @@ public abstract class KafakConsumer<T extends Event> implements IConsumer<T>, Se
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
}); }).start();
} }
} }

View File

@ -50,7 +50,6 @@ public class KafakProducer<T extends Event> implements IProducer<T>, Service {
@Override @Override
public void send(T... t) { public void send(T... t) {
for (T x : t) { for (T x : t) {
logger.finest("send message: " + JsonConvert.root().convertTo(x));
producer.send(new ProducerRecord(x.getTopic(), JsonConvert.root().convertTo(x))); producer.send(new ProducerRecord(x.getTopic(), JsonConvert.root().convertTo(x)));
} }
} }

View File

@ -11,7 +11,6 @@ import java.io.InputStreamReader;
import java.io.OutputStreamWriter; import java.io.OutputStreamWriter;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.util.concurrent.CompletableFuture;
public abstract class RedisConsumer<T extends Event> implements IConsumer<T>, Service { public abstract class RedisConsumer<T extends Event> implements IConsumer<T>, Service {
@ -28,7 +27,7 @@ public abstract class RedisConsumer<T extends Event> implements IConsumer<T>, Se
@Override @Override
public void init(AnyValue config) { public void init(AnyValue config) {
CompletableFuture.runAsync(() -> { new Thread(() -> {
try { try {
Socket client = new Socket(); Socket client = new Socket();
client.connect(new InetSocketAddress(host, port)); client.connect(new InetSocketAddress(host, port));
@ -72,6 +71,6 @@ public abstract class RedisConsumer<T extends Event> implements IConsumer<T>, Se
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
}); }).start();
} }
} }

View File

@ -5,8 +5,6 @@ import com.zdemo.redis.RedisProducer;
import org.junit.Test; import org.junit.Test;
import org.redkale.boot.Application; import org.redkale.boot.Application;
import java.util.Map;
/** /**
* 消息发布订阅测试 * 消息发布订阅测试
*/ */

View File

@ -1,14 +1,14 @@
package com.zdemo.test; package com.zdemo.test;
import com.zdemo.Event; import com.zdemo.Event;
import com.zdemo.redis.RedisConsumer; import com.zdemo.kafak.KafakConsumer;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
import org.redkale.util.TypeToken; import org.redkale.util.TypeToken;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
public class MyConsumer extends RedisConsumer<Event<String>> { public class MyConsumer extends KafakConsumer<Event<String>> {
public String getGroupid() { public String getGroupid() {
return "group-test"; //questuserimlive return "group-test"; //questuserimlive
@ -16,7 +16,7 @@ public class MyConsumer extends RedisConsumer<Event<String>> {
@Override @Override
public Collection<String> getSubscribes() { public Collection<String> getSubscribes() {
return List.of("a", "b", "c"); return List.of("a", "b", "c", "vis-log");
} }
@Override @Override