新增:pulsar 消费订阅实现
This commit is contained in:
@@ -2,9 +2,9 @@ package com.zdemo;
|
||||
|
||||
import org.redkale.convert.json.JsonConvert;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* @author Liang
|
||||
@@ -33,7 +33,7 @@ public abstract class AbstractConsumer implements IConsumer {
|
||||
}
|
||||
|
||||
@Override
|
||||
public final Collection<String> getSubscribes() {
|
||||
public final Set<String> getSubscribes() {
|
||||
return eventMap.keySet();
|
||||
}
|
||||
|
||||
|
93
src/com/zdemo/pulsar/PulsarConsumer.java
Normal file
93
src/com/zdemo/pulsar/PulsarConsumer.java
Normal file
@@ -0,0 +1,93 @@
|
||||
package com.zdemo.pulsar;
|
||||
|
||||
import com.zdemo.AbstractConsumer;
|
||||
import com.zdemo.EventType;
|
||||
import com.zdemo.IConsumer;
|
||||
import org.apache.pulsar.client.api.*;
|
||||
import org.redkale.service.Service;
|
||||
import org.redkale.util.AnyValue;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.logging.Level;
|
||||
|
||||
public abstract class PulsarConsumer extends AbstractConsumer implements IConsumer, Service {
|
||||
|
||||
@Resource(name = "property.pulsar.serviceurl")
|
||||
private String serviceurl = "pulsar://127.0.0.1:6650";
|
||||
private PulsarClient client;
|
||||
private Consumer consumer;
|
||||
|
||||
public abstract String getGroupid();
|
||||
|
||||
private final LinkedBlockingQueue<EventType> queue = new LinkedBlockingQueue<>();
|
||||
|
||||
@Override
|
||||
public void addEventType(EventType... eventTypes) {
|
||||
super.addEventType(eventTypes);
|
||||
try {
|
||||
for (EventType eventType : eventTypes) {
|
||||
queue.put(eventType);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
logger.log(Level.WARNING, "", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(AnyValue config) {
|
||||
new Thread(() -> {
|
||||
try {
|
||||
client = PulsarClient.builder()
|
||||
.serviceUrl(serviceurl)
|
||||
.build();
|
||||
|
||||
consumer = client.newConsumer()
|
||||
.topics(new ArrayList<>(getSubscribes()))
|
||||
.subscriptionName(getGroupid())
|
||||
.subscriptionType(SubscriptionType.Shared)
|
||||
.subscribe();
|
||||
|
||||
while (true) {
|
||||
// 动态新增订阅
|
||||
while (!queue.isEmpty()) {
|
||||
queue.clear();
|
||||
consumer.unsubscribe();
|
||||
consumer = client.newConsumer()
|
||||
.topics(new ArrayList<>(getSubscribes()))
|
||||
.subscriptionName(getGroupid())
|
||||
.subscriptionType(SubscriptionType.Shared)
|
||||
.subscribe();
|
||||
}
|
||||
|
||||
// Wait for a message
|
||||
Message msg = consumer.receive(10, TimeUnit.SECONDS);
|
||||
if (msg == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
String topic = msg.getTopicName().replace("persistent://public/default/", "");
|
||||
long offset = 0;
|
||||
String value = new String(msg.getData());
|
||||
try {
|
||||
accept(topic, value);
|
||||
|
||||
consumer.acknowledge(msg); // Acknowledge the message so that it can be deleted by the message broker
|
||||
} catch (Exception e) {
|
||||
logger.log(Level.WARNING, String.format("topic[%s] event accept error, offset=%s,value:%s", topic, offset, value), e);
|
||||
consumer.negativeAcknowledge(msg); // Message failed to process, redeliver later
|
||||
}
|
||||
}
|
||||
} catch (PulsarClientException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}).start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy(AnyValue config) {
|
||||
|
||||
}
|
||||
}
|
82
src/com/zdemo/pulsar/PulsarProducer.java
Normal file
82
src/com/zdemo/pulsar/PulsarProducer.java
Normal file
@@ -0,0 +1,82 @@
|
||||
package com.zdemo.pulsar;
|
||||
|
||||
import com.zdemo.Event;
|
||||
import com.zdemo.IProducer;
|
||||
import org.apache.pulsar.client.api.Producer;
|
||||
import org.apache.pulsar.client.api.PulsarClient;
|
||||
import org.apache.pulsar.client.api.PulsarClientException;
|
||||
import org.redkale.convert.json.JsonConvert;
|
||||
import org.redkale.service.Service;
|
||||
import org.redkale.util.AnyValue;
|
||||
import org.redkale.util.Comment;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.logging.Level;
|
||||
|
||||
public class PulsarProducer<T extends Event> implements IProducer<T>, Service {
|
||||
|
||||
@Resource(name = "property.pulsar.serviceurl")
|
||||
private String serviceurl = "pulsar://127.0.0.1:6650";
|
||||
|
||||
@Comment("消息生产者")
|
||||
private Map<String, Producer<byte[]>> producerMap = new HashMap();
|
||||
private PulsarClient client;
|
||||
|
||||
@Override
|
||||
public void init(AnyValue config) {
|
||||
try {
|
||||
client = PulsarClient.builder()
|
||||
.serviceUrl(serviceurl)
|
||||
.build();
|
||||
} catch (PulsarClientException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
public Producer<byte[]> getProducer(String topic) {
|
||||
Producer<byte[]> producer = producerMap.get(topic);
|
||||
if (producer != null) {
|
||||
return producer;
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
if ((producer = producerMap.get(topic)) == null) {
|
||||
try {
|
||||
producer = client.newProducer()
|
||||
.topic(topic)
|
||||
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
|
||||
.sendTimeout(10, TimeUnit.SECONDS)
|
||||
.blockIfQueueFull(true)
|
||||
.create();
|
||||
producerMap.put(topic, producer);
|
||||
|
||||
return producer;
|
||||
} catch (PulsarClientException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
return producer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(T t) {
|
||||
try {
|
||||
Producer<byte[]> producer = getProducer(t.topic);
|
||||
|
||||
String v = JsonConvert.root().convertTo(t.value);
|
||||
if (v.startsWith("\"") && v.endsWith("\"")) {
|
||||
v = v.substring(1, v.length() - 1);
|
||||
}
|
||||
producer.newMessage()
|
||||
.key("")
|
||||
.value(v.getBytes())
|
||||
.send();
|
||||
} catch (Exception e) {
|
||||
logger.log(Level.WARNING, "", e);
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user