新增:pulsar 消费订阅实现

This commit is contained in:
lxy 2020-10-29 12:29:08 +08:00
parent 5ddf349c4f
commit 3c084723ed
8 changed files with 212 additions and 18 deletions

View File

@ -3,7 +3,7 @@
<application port="2001"> <application port="2001">
<resources> <resources>
<properties load="redis.properties"></properties> <properties load="config.properties"></properties>
</resources> </resources>
<server protocol="HTTP" port="80"> <server protocol="HTTP" port="80">

7
conf/config.properties Normal file
View File

@ -0,0 +1,7 @@
# redis
redis.host=47.111.150.118
redis.password=*Zhong9307!
redis.port=6064
# pulsar
pulsar.serviceurl=pulsar://47.113.228.247:6650

View File

@ -1,4 +0,0 @@
# redis
redis.host=47.111.150.118
redis.password=*Zhong9307!
redis.port=6064

View File

@ -2,9 +2,9 @@ package com.zdemo;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set;
/** /**
* @author Liang * @author Liang
@ -33,7 +33,7 @@ public abstract class AbstractConsumer implements IConsumer {
} }
@Override @Override
public final Collection<String> getSubscribes() { public final Set<String> getSubscribes() {
return eventMap.keySet(); return eventMap.keySet();
} }

View File

@ -0,0 +1,93 @@
package com.zdemo.pulsar;
import com.zdemo.AbstractConsumer;
import com.zdemo.EventType;
import com.zdemo.IConsumer;
import org.apache.pulsar.client.api.*;
import org.redkale.service.Service;
import org.redkale.util.AnyValue;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
public abstract class PulsarConsumer extends AbstractConsumer implements IConsumer, Service {
@Resource(name = "property.pulsar.serviceurl")
private String serviceurl = "pulsar://127.0.0.1:6650";
private PulsarClient client;
private Consumer consumer;
public abstract String getGroupid();
private final LinkedBlockingQueue<EventType> queue = new LinkedBlockingQueue<>();
@Override
public void addEventType(EventType... eventTypes) {
super.addEventType(eventTypes);
try {
for (EventType eventType : eventTypes) {
queue.put(eventType);
}
} catch (InterruptedException e) {
logger.log(Level.WARNING, "", e);
}
}
@Override
public void init(AnyValue config) {
new Thread(() -> {
try {
client = PulsarClient.builder()
.serviceUrl(serviceurl)
.build();
consumer = client.newConsumer()
.topics(new ArrayList<>(getSubscribes()))
.subscriptionName(getGroupid())
.subscriptionType(SubscriptionType.Shared)
.subscribe();
while (true) {
// 动态新增订阅
while (!queue.isEmpty()) {
queue.clear();
consumer.unsubscribe();
consumer = client.newConsumer()
.topics(new ArrayList<>(getSubscribes()))
.subscriptionName(getGroupid())
.subscriptionType(SubscriptionType.Shared)
.subscribe();
}
// Wait for a message
Message msg = consumer.receive(10, TimeUnit.SECONDS);
if (msg == null) {
continue;
}
String topic = msg.getTopicName().replace("persistent://public/default/", "");
long offset = 0;
String value = new String(msg.getData());
try {
accept(topic, value);
consumer.acknowledge(msg); // Acknowledge the message so that it can be deleted by the message broker
} catch (Exception e) {
logger.log(Level.WARNING, String.format("topic[%s] event accept error, offset=%s,value:%s", topic, offset, value), e);
consumer.negativeAcknowledge(msg); // Message failed to process, redeliver later
}
}
} catch (PulsarClientException e) {
e.printStackTrace();
}
}).start();
}
@Override
public void destroy(AnyValue config) {
}
}

View File

@ -0,0 +1,82 @@
package com.zdemo.pulsar;
import com.zdemo.Event;
import com.zdemo.IProducer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.redkale.convert.json.JsonConvert;
import org.redkale.service.Service;
import org.redkale.util.AnyValue;
import org.redkale.util.Comment;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
public class PulsarProducer<T extends Event> implements IProducer<T>, Service {
@Resource(name = "property.pulsar.serviceurl")
private String serviceurl = "pulsar://127.0.0.1:6650";
@Comment("消息生产者")
private Map<String, Producer<byte[]>> producerMap = new HashMap();
private PulsarClient client;
@Override
public void init(AnyValue config) {
try {
client = PulsarClient.builder()
.serviceUrl(serviceurl)
.build();
} catch (PulsarClientException e) {
e.printStackTrace();
}
}
public Producer<byte[]> getProducer(String topic) {
Producer<byte[]> producer = producerMap.get(topic);
if (producer != null) {
return producer;
}
synchronized (this) {
if ((producer = producerMap.get(topic)) == null) {
try {
producer = client.newProducer()
.topic(topic)
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
.sendTimeout(10, TimeUnit.SECONDS)
.blockIfQueueFull(true)
.create();
producerMap.put(topic, producer);
return producer;
} catch (PulsarClientException e) {
e.printStackTrace();
}
}
}
return producer;
}
@Override
public void send(T t) {
try {
Producer<byte[]> producer = getProducer(t.topic);
String v = JsonConvert.root().convertTo(t.value);
if (v.startsWith("\"") && v.endsWith("\"")) {
v = v.substring(1, v.length() - 1);
}
producer.newMessage()
.key("")
.value(v.getBytes())
.send();
} catch (Exception e) {
logger.log(Level.WARNING, "", e);
}
}
}

View File

@ -2,7 +2,8 @@ package com.zdemo.test;
import com.zdemo.Event; import com.zdemo.Event;
import com.zdemo.EventType; import com.zdemo.EventType;
import com.zdemo.kafak.KafakProducer; import com.zdemo.IProducer;
import com.zdemo.pulsar.PulsarProducer;
import org.junit.Test; import org.junit.Test;
import org.redkale.boot.Application; import org.redkale.boot.Application;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
@ -28,17 +29,31 @@ public class AppTest {
EventType.of("a1", new TypeToken<Float>() { EventType.of("a1", new TypeToken<Float>() {
}, r -> { }, r -> {
System.out.println("我收到了消息 主题a1 事件:" + JsonConvert.root().convertTo(r)); System.out.println("我收到了消息 主题a1 事件:" + JsonConvert.root().convertTo(r));
}),
EventType.of("bx", str -> {
System.out.println("我收到了消息 主题bx 事件:" + str);
}) })
, EventType.of("game-update", str -> { , EventType.of("game-update", str -> {
System.out.println("我收到了消息 主题game-update 事件:" + str); System.out.println("我收到了消息 主题game-update 事件:" + str);
}) })
, EventType.of("http.req.hello", str -> {
System.out.println("我收到了消息 主题http.req.hello 事件:" + str);
})
, EventType.of("http.resp.node2004", str -> {
System.out.println("我收到了消息 主题http.resp.node2004 事件:" + str);
})
); );
// 10s 后加入 bx主题
Thread.sleep(1_000 * 10);
System.out.println("加入新的主题订阅");
consumer.addEventType(
EventType.of("bx", str -> {
System.out.println("我收到了消息 主题bx 事件:" + str);
})
);
Thread.sleep(60_000 * 60); Thread.sleep(60_000 * 60);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
@ -48,7 +63,7 @@ public class AppTest {
@Test @Test
public void runProducer() { public void runProducer() {
try { try {
KafakProducer producer = Application.singleton(KafakProducer.class); IProducer producer = Application.singleton(PulsarProducer.class);
// 发送不同的 事件 // 发送不同的 事件
float v0 = 1f; float v0 = 1f;
@ -60,12 +75,13 @@ public class AppTest {
producer.send(Event.of("c1", v2));*/ producer.send(Event.of("c1", v2));*/
producer.send(Event.of("game-update", 23256)); producer.send(Event.of("game-update", 23256));
producer.send(Event.of("bx", 23256));
try { /*try {
Thread.sleep(10_000); Thread.sleep(1_000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }*/
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }

View File

@ -1,8 +1,8 @@
package com.zdemo.test; package com.zdemo.test;
import com.zdemo.kafak.KafakConsumer; import com.zdemo.pulsar.PulsarConsumer;
public class MyConsumer extends KafakConsumer { public class MyConsumer extends PulsarConsumer {
public String getGroupid() { public String getGroupid() {
return "group-test"; //消费组名称 return "group-test"; //消费组名称