From bd4343c5d81e371310fe92dfadb78b248c6fbeab Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Tue, 9 Jun 2020 21:31:08 +0800 Subject: [PATCH] --- src/org/redkale/mq/MessageAgent.java | 41 ++-------------------------- 1 file changed, 3 insertions(+), 38 deletions(-) diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index c96103e0c..90473987b 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -44,10 +44,6 @@ public abstract class MessageAgent { protected SncpMessageClient sncpMessageClient; - //protected MessageConsumer sncpRespConsumer; - //protected SncpRespProcessor sncpRespProcessor; - //sncpRespConsumer启动耗时, 小于0表示未启动 - //protected long sncpRespStartms = -1; //本地Service消息接收处理器, key:topic protected HashMap messageNodes = new LinkedHashMap<>(); @@ -57,24 +53,8 @@ public abstract class MessageAgent { this.sncpMessageClient = new SncpMessageClient(this); } - //ServiceLoader时判断配置是否符合当前实现类 - public abstract boolean match(AnyValue config); - -// public final CompletableFuture createSncpRespFuture2(AtomicLong counter, MessageRecord message) { -// return this.sncpRespProcessor.createFuture2(message.getSeqid(), counter); -// } -// -// public final synchronized void startSncpRespConsumer() { -// if (this.sncpRespStartms >= 0) return; -// long s = System.currentTimeMillis(); -// if (this.sncpRespConsumer != null) { -// this.sncpRespConsumer.startup().join(); -// } -// this.sncpRespStartms = System.currentTimeMillis() - s; -// } public CompletableFuture> start() { final LinkedHashMap map = new LinkedHashMap<>(); - //if (this.sncpRespStartms >= 0) map.put(this.sncpRespConsumer.topic, this.sncpRespStartms); final List futures = new ArrayList<>(); this.messageNodes.values().forEach(node -> { long s = System.currentTimeMillis(); @@ -94,7 +74,6 @@ public abstract class MessageAgent { //Application.shutdown 在所有server.shutdown执行后执行 public void destroy(AnyValue config) { - //if (this.sncpRespConsumer != null) this.sncpRespConsumer.shutdown().join(); this.httpMessageClient.close().join(); this.sncpMessageClient.close().join(); if (this.producer != null) this.producer.shutdown().join(); @@ -156,26 +135,12 @@ public abstract class MessageAgent { //查询所有topic public abstract List queryTopic(); + //ServiceLoader时判断配置是否符合当前实现类 + public abstract boolean match(AnyValue config); + //创建指定topic的消费处理器 public abstract MessageConsumer createConsumer(String topic, MessageProcessor processor); -// public final synchronized void putSncpResp(NodeSncpServer ns) { -// if (this.sncpRespConsumer != null) return; -// this.sncpRespProcessor = new SncpRespProcessor(this.logger, this); -// this.sncpRespConsumer = createConsumer(generateSncpRespTopic(), sncpRespProcessor); -// } -// -// public CompletableFuture sendRemoteSncp(AtomicLong counter, MessageRecord message) { -// if (this.sncpRespConsumer == null) { -// CompletableFuture future = new CompletableFuture(); -// future.completeExceptionally(new RuntimeException("Not open sncp consumer")); -// return future; -// } -// message.setFormat(ConvertType.BSON); -// message.setResptopic(generateSncpRespTopic()); -// getProducer().apply(message); -// return this.sncpRespProcessor.createFuture(message.getSeqid(), counter); -// } public final synchronized void putService(NodeHttpServer ns, Service service, HttpServlet servlet) { String topic = generateHttpReqTopic(service); if (messageNodes.containsKey(topic)) throw new RuntimeException("topic(" + topic + ") is repeat");