This commit is contained in:
lxy 2020-11-30 09:25:23 +08:00
parent 9138790fd2
commit 10472078b0
4 changed files with 26 additions and 37 deletions

View File

@ -2,7 +2,9 @@ package com.zdemo;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
import java.util.*; import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/** /**
* @author Liang * @author Liang
@ -32,11 +34,11 @@ public abstract class AbstractConsumer implements IConsumer {
@Override @Override
public final Set<String> getTopics() { public final Set<String> getTopics() {
Set<String> keySet = eventMap.keySet(); if (!eventMap.isEmpty()) {
if (keySet.isEmpty()) { return eventMap.keySet();
keySet.add("-");
} }
return keySet;
return Set.of("-");
} }
@Override @Override

View File

@ -42,7 +42,11 @@ public class RedisProducer<T extends Event> implements IProducer<T>, Service {
@Override @Override
public void send(T t) { public void send(T t) {
try { try {
osw.write("PUBLISH " + t.topic + " '" + JsonConvert.root().convertTo(t.value) + "' \r\n"); String v = JsonConvert.root().convertTo(t.value);
if (v.startsWith("\"") && v.endsWith("\"")) {
v = v.substring(1, v.length() - 1);
}
osw.write("PUBLISH " + t.topic + " '" + v + "' \r\n");
osw.flush(); osw.flush();
} catch (IOException e) { } catch (IOException e) {
logger.log(Level.WARNING, "", e); logger.log(Level.WARNING, "", e);

View File

@ -4,11 +4,9 @@ import com.zdemo.Event;
import com.zdemo.EventType; import com.zdemo.EventType;
import com.zdemo.IConsumer; import com.zdemo.IConsumer;
import com.zdemo.IProducer; import com.zdemo.IProducer;
import com.zdemo.pulsar.PulsarProducer; import com.zdemo.kafak.KafakProducer;
import org.junit.Test; import org.junit.Test;
import org.redkale.boot.Application; import org.redkale.boot.Application;
import org.redkale.convert.json.JsonConvert;
import org.redkale.util.TypeToken;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -27,29 +25,11 @@ public class AppTest {
IConsumer consumer = Application.singleton(MyConsumer.class); IConsumer consumer = Application.singleton(MyConsumer.class);
consumer.addEventType( consumer.addEventType(
EventType.of("a1", new TypeToken<Float>() { EventType.of("a", str -> {
}, r -> { System.out.println("我收到了消息 a 事件:" + str);
System.out.println("我收到了消息 主题a1 事件:" + JsonConvert.root().convertTo(r));
}) })
, EventType.of("game-update", str -> { , EventType.of("bx", str -> {
System.out.println("我收到了消息 主题game-update 事件:" + str);
})
, EventType.of("http.req.hello", str -> {
System.out.println("我收到了消息 主题http.req.hello 事件:" + str);
})
, EventType.of("http.resp.node2004", str -> {
System.out.println("我收到了消息 主题http.resp.node2004 事件:" + str);
})
);
// 10s 后加入 bx主题
Thread.sleep(1_000 * 10);
System.out.println("加入新的主题订阅");
consumer.addEventType(
EventType.of("bx", str -> {
System.out.println("我收到了消息 主题bx 事件:" + str); System.out.println("我收到了消息 主题bx 事件:" + str);
}) })
); );
@ -64,7 +44,7 @@ public class AppTest {
@Test @Test
public void runProducer() { public void runProducer() {
try { try {
IProducer producer = Application.singleton(PulsarProducer.class); IProducer producer = Application.singleton(KafakProducer.class);
// 发送不同的 事件 // 发送不同的 事件
float v0 = 1f; float v0 = 1f;
@ -75,14 +55,17 @@ public class AppTest {
/*producer.send(Event.of("b1", v1)); /*producer.send(Event.of("b1", v1));
producer.send(Event.of("c1", v2));*/ producer.send(Event.of("c1", v2));*/
producer.send(Event.of("game-update", 23256)); /*producer.send(Event.of("game-update", 23256));
producer.send(Event.of("bx", 23256)); producer.send(Event.of("bx", 23256));*/
for (int i = 0; i < 5; i++) {
producer.send(Event.of("a", i + ""));
}
/*try { try {
Thread.sleep(1_000); Thread.sleep(1_000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
}*/ }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }

View File

@ -1,8 +1,8 @@
package com.zdemo.test; package com.zdemo.test;
import com.zdemo.pulsar.PulsarConsumer; import com.zdemo.kafak.KafakConsumer;
public class MyConsumer extends PulsarConsumer { public class MyConsumer extends KafakConsumer {
public String getGroupid() { public String getGroupid() {
return "group-test"; //消费组名称 return "group-test"; //消费组名称