.
This commit is contained in:
parent
1ce4ccdbc7
commit
f06af4e31f
@ -2,7 +2,9 @@ package com.zdemo;
|
|||||||
|
|
||||||
import org.redkale.convert.json.JsonConvert;
|
import org.redkale.convert.json.JsonConvert;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Liang
|
* @author Liang
|
||||||
@ -32,11 +34,11 @@ public abstract class AbstractConsumer implements IConsumer {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final Set<String> getTopics() {
|
public final Set<String> getTopics() {
|
||||||
Set<String> keySet = eventMap.keySet();
|
if (!eventMap.isEmpty()) {
|
||||||
if (keySet.isEmpty()) {
|
return eventMap.keySet();
|
||||||
keySet.add("-");
|
|
||||||
}
|
}
|
||||||
return keySet;
|
|
||||||
|
return Set.of("-");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -42,7 +42,11 @@ public class RedisProducer<T extends Event> implements IProducer<T>, Service {
|
|||||||
@Override
|
@Override
|
||||||
public void send(T t) {
|
public void send(T t) {
|
||||||
try {
|
try {
|
||||||
osw.write("PUBLISH " + t.topic + " '" + JsonConvert.root().convertTo(t.value) + "' \r\n");
|
String v = JsonConvert.root().convertTo(t.value);
|
||||||
|
if (v.startsWith("\"") && v.endsWith("\"")) {
|
||||||
|
v = v.substring(1, v.length() - 1);
|
||||||
|
}
|
||||||
|
osw.write("PUBLISH " + t.topic + " '" + v + "' \r\n");
|
||||||
osw.flush();
|
osw.flush();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logger.log(Level.WARNING, "", e);
|
logger.log(Level.WARNING, "", e);
|
||||||
|
@ -4,11 +4,9 @@ import com.zdemo.Event;
|
|||||||
import com.zdemo.EventType;
|
import com.zdemo.EventType;
|
||||||
import com.zdemo.IConsumer;
|
import com.zdemo.IConsumer;
|
||||||
import com.zdemo.IProducer;
|
import com.zdemo.IProducer;
|
||||||
import com.zdemo.pulsar.PulsarProducer;
|
import com.zdemo.kafak.KafakProducer;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.redkale.boot.Application;
|
import org.redkale.boot.Application;
|
||||||
import org.redkale.convert.json.JsonConvert;
|
|
||||||
import org.redkale.util.TypeToken;
|
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@ -27,29 +25,11 @@ public class AppTest {
|
|||||||
IConsumer consumer = Application.singleton(MyConsumer.class);
|
IConsumer consumer = Application.singleton(MyConsumer.class);
|
||||||
|
|
||||||
consumer.addEventType(
|
consumer.addEventType(
|
||||||
EventType.of("a1", new TypeToken<Float>() {
|
EventType.of("a", str -> {
|
||||||
}, r -> {
|
System.out.println("我收到了消息 a 事件:" + str);
|
||||||
System.out.println("我收到了消息 主题a1 事件:" + JsonConvert.root().convertTo(r));
|
|
||||||
})
|
})
|
||||||
|
|
||||||
, EventType.of("game-update", str -> {
|
, EventType.of("bx", str -> {
|
||||||
System.out.println("我收到了消息 主题game-update 事件:" + str);
|
|
||||||
})
|
|
||||||
|
|
||||||
, EventType.of("http.req.hello", str -> {
|
|
||||||
System.out.println("我收到了消息 主题http.req.hello 事件:" + str);
|
|
||||||
})
|
|
||||||
|
|
||||||
, EventType.of("http.resp.node2004", str -> {
|
|
||||||
System.out.println("我收到了消息 主题http.resp.node2004 事件:" + str);
|
|
||||||
})
|
|
||||||
);
|
|
||||||
|
|
||||||
// 10s 后加入 bx主题
|
|
||||||
Thread.sleep(1_000 * 10);
|
|
||||||
System.out.println("加入新的主题订阅");
|
|
||||||
consumer.addEventType(
|
|
||||||
EventType.of("bx", str -> {
|
|
||||||
System.out.println("我收到了消息 主题bx 事件:" + str);
|
System.out.println("我收到了消息 主题bx 事件:" + str);
|
||||||
})
|
})
|
||||||
);
|
);
|
||||||
@ -64,7 +44,7 @@ public class AppTest {
|
|||||||
@Test
|
@Test
|
||||||
public void runProducer() {
|
public void runProducer() {
|
||||||
try {
|
try {
|
||||||
IProducer producer = Application.singleton(PulsarProducer.class);
|
IProducer producer = Application.singleton(KafakProducer.class);
|
||||||
|
|
||||||
// 发送不同的 事件
|
// 发送不同的 事件
|
||||||
float v0 = 1f;
|
float v0 = 1f;
|
||||||
@ -75,14 +55,17 @@ public class AppTest {
|
|||||||
/*producer.send(Event.of("b1", v1));
|
/*producer.send(Event.of("b1", v1));
|
||||||
producer.send(Event.of("c1", v2));*/
|
producer.send(Event.of("c1", v2));*/
|
||||||
|
|
||||||
producer.send(Event.of("game-update", 23256));
|
/*producer.send(Event.of("game-update", 23256));
|
||||||
producer.send(Event.of("bx", 23256));
|
producer.send(Event.of("bx", 23256));*/
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
producer.send(Event.of("a", i + ""));
|
||||||
|
}
|
||||||
|
|
||||||
/*try {
|
try {
|
||||||
Thread.sleep(1_000);
|
Thread.sleep(1_000);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}*/
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
|
@ -1,8 +1,8 @@
|
|||||||
package com.zdemo.test;
|
package com.zdemo.test;
|
||||||
|
|
||||||
import com.zdemo.pulsar.PulsarConsumer;
|
import com.zdemo.kafak.KafakConsumer;
|
||||||
|
|
||||||
public class MyConsumer extends PulsarConsumer {
|
public class MyConsumer extends KafakConsumer {
|
||||||
|
|
||||||
public String getGroupid() {
|
public String getGroupid() {
|
||||||
return "group-test"; //消费组名称
|
return "group-test"; //消费组名称
|
||||||
|
Loading…
Reference in New Issue
Block a user