新增:主题退订

This commit is contained in:
lxy 2020-12-08 19:31:15 +08:00
parent f06af4e31f
commit ddebb8c7d6
4 changed files with 42 additions and 31 deletions

View File

@ -18,4 +18,10 @@ public interface IConsumer<T extends Event> {
void addEventType(EventType... eventType);
void accept(String topic, String record);
/**
* 取消订阅
* @param topic
*/
void unsubscribe(String topic);
}

View File

@ -35,18 +35,14 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume
public abstract String getGroupid();
private final LinkedBlockingQueue<EventType> queue = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
@Override
public void addEventType(EventType... eventTypes) {
super.addEventType(eventTypes);
try {
for (EventType eventType : eventTypes) {
queue.put(eventType);
}
} catch (InterruptedException e) {
logger.log(Level.WARNING, "", e);
}
// 增加变更标记
queue.add(() -> logger.info("KafakConsumer starting..."));
}
@Override
@ -58,8 +54,6 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume
props = new Properties();
props.load(fis);
if (logger.isLoggable(Level.INFO)) logger.info(getGroupid() + " consumer started!");
new Thread(() -> {
try {
props.put("group.id", getGroupid());
@ -78,9 +72,12 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume
}
});
// 动态新增订阅
while (!queue.isEmpty()) {
queue.clear();
if (!queue.isEmpty()) {
Runnable runnable;
while ((runnable = queue.poll()) != null) {
runnable.run();
}
consumer.unsubscribe();
consumer.subscribe(getTopics());
}
@ -95,5 +92,8 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume
}
}
@Override
public void unsubscribe(String topic) {
queue.add(() -> eventMap.remove(topic)); // 加入延时执行队列下一次订阅变更检查周期执行
}
}

View File

@ -22,18 +22,14 @@ public abstract class PulsarConsumer extends AbstractConsumer implements IConsum
public abstract String getGroupid();
private final LinkedBlockingQueue<EventType> queue = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
@Override
public void addEventType(EventType... eventTypes) {
super.addEventType(eventTypes);
try {
for (EventType eventType : eventTypes) {
queue.put(eventType);
}
} catch (InterruptedException e) {
logger.log(Level.WARNING, "", e);
}
// 增加变更标记
queue.add(() -> logger.info("PulsarConsumer add new topic!"));
}
@Override
@ -41,22 +37,21 @@ public abstract class PulsarConsumer extends AbstractConsumer implements IConsum
if (!preInit()) {
return;
}
queue.add(() -> logger.info("PulsarConsumer starting ..."));
new Thread(() -> {
try {
client = PulsarClient.builder()
.serviceUrl(serviceurl)
.build();
consumer = client.newConsumer()
.topics(new ArrayList<>(getTopics()))
.subscriptionName(getGroupid())
.subscriptionType(SubscriptionType.Shared)
.subscribe();
while (true) {
// 动态新增订阅
while (!queue.isEmpty()) {
queue.clear();
if (!queue.isEmpty()) {
Runnable runnable;
while ((runnable = queue.poll()) != null) {
runnable.run();
}
consumer.unsubscribe();
consumer = client.newConsumer()
.topics(new ArrayList<>(getTopics()))
@ -90,7 +85,7 @@ public abstract class PulsarConsumer extends AbstractConsumer implements IConsum
}
@Override
public void destroy(AnyValue config) {
public void unsubscribe(String topic) {
}
}

View File

@ -101,4 +101,14 @@ public abstract class RedisConsumer extends AbstractConsumer implements IConsume
}
}
}
@Override
public void unsubscribe(String topic) {
try {
writer.write("UNSUBSCRIBE " + topic + "\r\n");
writer.flush();
} catch (IOException e) {
logger.log(Level.WARNING, "", e);
}
}
}