From a58c13cd9f25ee35b085801ac864d9bf3d28061a Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Tue, 9 Jun 2020 21:22:17 +0800 Subject: [PATCH] --- src/org/redkale/boot/Application.java | 6 +- src/org/redkale/boot/NodeServer.java | 4 +- src/org/redkale/mq/HttpMessageClient.java | 107 +++++++++++++++++++ src/org/redkale/mq/MessageAgent.java | 123 ++++++++++++---------- src/org/redkale/mq/MessageClient.java | 93 ++++++++++++++++ src/org/redkale/mq/SncpMessageClient.java | 26 +++++ src/org/redkale/mq/SncpRespProcessor.java | 2 +- src/org/redkale/net/sncp/SncpClient.java | 5 +- 8 files changed, 303 insertions(+), 63 deletions(-) create mode 100644 src/org/redkale/mq/HttpMessageClient.java create mode 100644 src/org/redkale/mq/MessageClient.java create mode 100644 src/org/redkale/mq/SncpMessageClient.java diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index e385e8bb4..b72a7cf65 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -25,7 +25,7 @@ import org.redkale.boot.ClassFilter.FilterEntry; import org.redkale.convert.Convert; import org.redkale.convert.bson.BsonFactory; import org.redkale.convert.json.*; -import org.redkale.mq.MessageAgent; +import org.redkale.mq.*; import org.redkale.net.*; import org.redkale.net.http.MimeType; import org.redkale.net.sncp.*; @@ -441,6 +441,8 @@ public final class Application { this.resourceFactory.inject(agent); agent.init(agent.getConfig()); this.resourceFactory.register(agent.getName(), MessageAgent.class, agent); + this.resourceFactory.register(agent.getName(), HttpMessageClient.class, agent.getHttpMessageClient()); + this.resourceFactory.register(agent.getName(), SncpMessageClient.class, agent.getSncpMessageClient()); } } this.messageAgents = mqs; @@ -833,7 +835,7 @@ public final class Application { map.keySet().forEach(str -> { if (str.length() > maxlen.get()) maxlen.set(str.length()); }); - new TreeMap(map).forEach((topic, ms) -> sb.append("MessageConsumer(topic=").append(alignString(topic, maxlen.get())).append(") init and start in ").append(ms).append(" ms\r\n") + new TreeMap<>(map).forEach((topic, ms) -> sb.append("MessageConsumer(topic=").append(alignString(topic, maxlen.get())).append(") init and start in ").append(ms).append(" ms\r\n") ); } if (sb.length() > 0) logger.info(sb.toString().trim()); diff --git a/src/org/redkale/boot/NodeServer.java b/src/org/redkale/boot/NodeServer.java index cf6ba1067..207fdc2c2 100644 --- a/src/org/redkale/boot/NodeServer.java +++ b/src/org/redkale/boot/NodeServer.java @@ -533,8 +533,8 @@ public abstract class NodeServer { } if (isSNCP() && !sncpRemoteAgents.isEmpty()) { sncpRemoteAgents.values().forEach(agent -> { - agent.putSncpResp((NodeSncpServer) this); - agent.startSncpRespConsumer(); + // agent.putSncpResp((NodeSncpServer) this); + // agent.startSncpRespConsumer(); }); } //----------------- init ----------------- diff --git a/src/org/redkale/mq/HttpMessageClient.java b/src/org/redkale/mq/HttpMessageClient.java new file mode 100644 index 000000000..cd4067f2f --- /dev/null +++ b/src/org/redkale/mq/HttpMessageClient.java @@ -0,0 +1,107 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.mq; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; +import org.redkale.convert.ConvertType; +import org.redkale.net.http.*; + +/** + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.1.0 + */ +public class HttpMessageClient extends MessageClient { + + protected HttpMessageClient(MessageAgent messageAgent) { + super(messageAgent); + this.respTopic = messageAgent.generateHttpRespTopic(); + } + + //格式: http.req.user + public String generateHttpReqTopic(String module) { + return messageAgent.generateHttpReqTopic(module); + } + + public String generateHttpReqTopic(HttpSimpleRequest request, String path) { + String module = request.getRequestURI(); + if (path != null && !path.isEmpty() && module.startsWith(path)) module = module.substring(path.length()); + module = module.substring(1); //去掉/ + module = module.substring(0, module.indexOf('/')); + return messageAgent.generateHttpReqTopic(module); + } + + public final CompletableFuture> sendMessage(String topic, HttpSimpleRequest request) { + return sendMessage(topic, ConvertType.JSON, 0, null, request, true, null); + } + + public final CompletableFuture> sendMessage(String topic, HttpSimpleRequest request, AtomicLong counter) { + return sendMessage(topic, ConvertType.JSON, 0, null, request, true, counter); + } + + public final CompletableFuture> sendMessage(String topic, HttpSimpleRequest request, boolean needresp) { + return sendMessage(topic, ConvertType.JSON, 0, null, request, needresp, null); + } + + public final CompletableFuture> sendMessage(String topic, HttpSimpleRequest request, boolean needresp, AtomicLong counter) { + return sendMessage(topic, ConvertType.JSON, 0, null, request, needresp, counter); + } + + public final CompletableFuture> sendMessage(String topic, ConvertType convertType, HttpSimpleRequest request) { + return sendMessage(topic, convertType, 0, null, request, true, null); + } + + public final CompletableFuture> sendMessage(String topic, ConvertType convertType, HttpSimpleRequest request, AtomicLong counter) { + return sendMessage(topic, convertType, 0, null, request, true, counter); + } + + public final CompletableFuture> sendMessage(String topic, ConvertType convertType, HttpSimpleRequest request, boolean needresp) { + return sendMessage(topic, convertType, 0, null, request, needresp, null); + } + + public final CompletableFuture> sendMessage(String topic, ConvertType convertType, HttpSimpleRequest request, boolean needresp, AtomicLong counter) { + return sendMessage(topic, convertType, 0, null, request, needresp, counter); + } + + public final CompletableFuture> sendMessage(String topic, int userid, String groupid, HttpSimpleRequest request) { + return sendMessage(topic, ConvertType.JSON, userid, groupid, request, true, null); + } + + public final CompletableFuture> sendMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { + return sendMessage(topic, ConvertType.JSON, userid, groupid, request, true, counter); + } + + public final CompletableFuture> sendMessage(String topic, int userid, String groupid, HttpSimpleRequest request, boolean needresp) { + return sendMessage(topic, ConvertType.JSON, userid, groupid, request, needresp, null); + } + + public final CompletableFuture> sendMessage(String topic, int userid, String groupid, HttpSimpleRequest request, boolean needresp, AtomicLong counter) { + return sendMessage(topic, ConvertType.JSON, userid, groupid, request, needresp, counter); + } + + public final CompletableFuture> sendMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request) { + return sendMessage(topic, convertType, userid, groupid, request, true, null); + } + + public final CompletableFuture> sendMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { + return sendMessage(topic, convertType, userid, groupid, request, true, counter); + } + + public final CompletableFuture> sendMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, boolean needresp) { + return sendMessage(topic, convertType, userid, groupid, request, needresp, null); + } + + public final CompletableFuture> sendMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, boolean needresp, AtomicLong counter) { + MessageRecord message = new MessageRecord(convertType, topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); + message.userid(userid).groupid(groupid); + return sendMessage(message, needresp, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance())); + } +} diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index 494fe3435..c96103e0c 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -7,12 +7,10 @@ package org.redkale.mq; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.*; import java.util.logging.Logger; import javax.annotation.Resource; import org.redkale.boot.*; import static org.redkale.boot.Application.RESNAME_APP_NODEID; -import org.redkale.convert.ConvertType; import org.redkale.net.Servlet; import org.redkale.net.http.*; import org.redkale.net.sncp.*; @@ -42,41 +40,41 @@ public abstract class MessageAgent { protected MessageProducer producer; - protected String sncpRespTopic; + protected HttpMessageClient httpMessageClient; - protected MessageConsumer sncpRespConsumer; - - protected SncpRespProcessor sncpRespProcessor; + protected SncpMessageClient sncpMessageClient; + //protected MessageConsumer sncpRespConsumer; + //protected SncpRespProcessor sncpRespProcessor; //sncpRespConsumer启动耗时, 小于0表示未启动 - protected long sncpRespStartms = -1; - + //protected long sncpRespStartms = -1; //本地Service消息接收处理器, key:topic - protected HashMap messageNodes = new LinkedHashMap<>(); + protected HashMap messageNodes = new LinkedHashMap<>(); public void init(AnyValue config) { this.name = checkName(config.getValue("name", "")); + this.httpMessageClient = new HttpMessageClient(this); + this.sncpMessageClient = new SncpMessageClient(this); } //ServiceLoader时判断配置是否符合当前实现类 public abstract boolean match(AnyValue config); - public final CompletableFuture createSncpRespFuture(AtomicLong counter, MessageRecord message) { - return this.sncpRespProcessor.createFuture(message.getSeqid(), counter); - } - - public final synchronized void startSncpRespConsumer() { - if (this.sncpRespStartms >= 0) return; - long s = System.currentTimeMillis(); - if (this.sncpRespConsumer != null) { - this.sncpRespConsumer.startup().join(); - } - this.sncpRespStartms = System.currentTimeMillis() - s; - } - +// public final CompletableFuture createSncpRespFuture2(AtomicLong counter, MessageRecord message) { +// return this.sncpRespProcessor.createFuture2(message.getSeqid(), counter); +// } +// +// public final synchronized void startSncpRespConsumer() { +// if (this.sncpRespStartms >= 0) return; +// long s = System.currentTimeMillis(); +// if (this.sncpRespConsumer != null) { +// this.sncpRespConsumer.startup().join(); +// } +// this.sncpRespStartms = System.currentTimeMillis() - s; +// } public CompletableFuture> start() { final LinkedHashMap map = new LinkedHashMap<>(); - if (this.sncpRespStartms >= 0) map.put(this.sncpRespConsumer.topic, this.sncpRespStartms); + //if (this.sncpRespStartms >= 0) map.put(this.sncpRespConsumer.topic, this.sncpRespStartms); final List futures = new ArrayList<>(); this.messageNodes.values().forEach(node -> { long s = System.currentTimeMillis(); @@ -96,7 +94,9 @@ public abstract class MessageAgent { //Application.shutdown 在所有server.shutdown执行后执行 public void destroy(AnyValue config) { - if (this.sncpRespConsumer != null) this.sncpRespConsumer.shutdown().join(); + //if (this.sncpRespConsumer != null) this.sncpRespConsumer.shutdown().join(); + this.httpMessageClient.close().join(); + this.sncpMessageClient.close().join(); if (this.producer != null) this.producer.shutdown().join(); } @@ -112,6 +112,14 @@ public abstract class MessageAgent { this.config = config; } + public HttpMessageClient getHttpMessageClient() { + return httpMessageClient; + } + + public SncpMessageClient getSncpMessageClient() { + return sncpMessageClient; + } + protected String checkName(String name) { //不能含特殊字符 if (name.isEmpty()) return name; if (name.charAt(0) >= '0' && name.charAt(0) <= '9') throw new RuntimeException("name only 0-9 a-z A-Z _ cannot begin 0-9"); @@ -124,10 +132,14 @@ public abstract class MessageAgent { } //获取指定topic的生产处理器 - public synchronized MessageProducer getProducer() { + public MessageProducer getProducer() { if (this.producer == null) { - this.producer = createProducer(); - this.producer.startup().join(); + synchronized (this) { + if (this.producer == null) { + this.producer = createProducer(); + this.producer.startup().join(); + } + } } return this.producer; } @@ -147,36 +159,35 @@ public abstract class MessageAgent { //创建指定topic的消费处理器 public abstract MessageConsumer createConsumer(String topic, MessageProcessor processor); - public final synchronized void putSncpResp(NodeSncpServer ns) { - if (this.sncpRespConsumer != null) return; - this.sncpRespProcessor = new SncpRespProcessor(this.logger, this); - this.sncpRespConsumer = createConsumer(generateSncpRespTopic(), sncpRespProcessor); - } - - public CompletableFuture sendRemoteSncp(AtomicLong counter, MessageRecord message) { - if (this.sncpRespConsumer == null) { - CompletableFuture future = new CompletableFuture(); - future.completeExceptionally(new RuntimeException("Not open sncp consumer")); - return future; - } - message.setFormat(ConvertType.BSON); - message.setResptopic(generateSncpRespTopic()); - getProducer().apply(message); - return this.sncpRespProcessor.createFuture(message.getSeqid(), counter); - } - +// public final synchronized void putSncpResp(NodeSncpServer ns) { +// if (this.sncpRespConsumer != null) return; +// this.sncpRespProcessor = new SncpRespProcessor(this.logger, this); +// this.sncpRespConsumer = createConsumer(generateSncpRespTopic(), sncpRespProcessor); +// } +// +// public CompletableFuture sendRemoteSncp(AtomicLong counter, MessageRecord message) { +// if (this.sncpRespConsumer == null) { +// CompletableFuture future = new CompletableFuture(); +// future.completeExceptionally(new RuntimeException("Not open sncp consumer")); +// return future; +// } +// message.setFormat(ConvertType.BSON); +// message.setResptopic(generateSncpRespTopic()); +// getProducer().apply(message); +// return this.sncpRespProcessor.createFuture(message.getSeqid(), counter); +// } public final synchronized void putService(NodeHttpServer ns, Service service, HttpServlet servlet) { String topic = generateHttpReqTopic(service); if (messageNodes.containsKey(topic)) throw new RuntimeException("topic(" + topic + ") is repeat"); HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, getProducer(), ns, service, servlet); - this.messageNodes.put(topic, new MessageNode(ns, service, servlet, processor, createConsumer(topic, processor))); + this.messageNodes.put(topic, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topic, processor))); } public final synchronized void putService(NodeSncpServer ns, Service service, SncpServlet servlet) { String topic = generateSncpReqTopic(service); if (messageNodes.containsKey(topic)) throw new RuntimeException("topic(" + topic + ") is repeat"); SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, getProducer(), ns, service, servlet); - this.messageNodes.put(topic, new MessageNode(ns, service, servlet, processor, createConsumer(topic, processor))); + this.messageNodes.put(topic, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topic, processor))); } //格式: sncp.req.user @@ -186,20 +197,18 @@ public abstract class MessageAgent { return "sncp.req." + Sncp.getResourceType(service).getSimpleName().replaceAll("Service.*$", "").toLowerCase() + (resname.isEmpty() ? "" : ("-" + resname)); } - //格式: sncp.resp.node10 - private String generateSncpRespTopic() { - if (this.sncpRespTopic != null) return this.sncpRespTopic; - this.sncpRespTopic = "sncp.resp.node" + nodeid; - return this.sncpRespTopic; - } - //格式: http.req.user public String generateHttpReqTopic(String module) { return "http.req." + module.toLowerCase(); } + //格式: sncp.resp.node10 + protected String generateSncpRespTopic() { + return "sncp.resp.node" + nodeid; + } + //格式: http.resp.node10 - public String generateHttpRespTopic() { + protected String generateHttpRespTopic() { return "http.resp.node" + nodeid; } @@ -219,7 +228,7 @@ public abstract class MessageAgent { return protocol + ".resp.node" + nodeid; } - protected static class MessageNode { + protected static class MessageConsumerNode { public final NodeServer server; @@ -231,7 +240,7 @@ public abstract class MessageAgent { public final MessageConsumer consumer; - public MessageNode(NodeServer server, Service service, Servlet servlet, MessageProcessor processor, MessageConsumer consumer) { + public MessageConsumerNode(NodeServer server, Service service, Servlet servlet, MessageProcessor processor, MessageConsumer consumer) { this.server = server; this.service = service; this.servlet = servlet; diff --git a/src/org/redkale/mq/MessageClient.java b/src/org/redkale/mq/MessageClient.java new file mode 100644 index 000000000..38a4056de --- /dev/null +++ b/src/org/redkale/mq/MessageClient.java @@ -0,0 +1,93 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.mq; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import org.redkale.convert.ConvertType; + +/** + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.1.0 + */ +public class MessageClient { + + protected final ConcurrentHashMap respNodes = new ConcurrentHashMap<>(); + + protected final MessageAgent messageAgent; + + protected MessageConsumer consumer; + + protected String respTopic; + + protected ConvertType convertType; + + protected MessageClient(MessageAgent messageAgent) { + this.messageAgent = messageAgent; + } + + protected CompletableFuture close() { + if (this.consumer == null) return CompletableFuture.completedFuture(null); + return this.consumer.shutdown(); + } + + public String getRespTopic() { + return this.respTopic; + } + + public final CompletableFuture sendMessage(MessageRecord message) { + return sendMessage(message, true, null); + } + + public final CompletableFuture sendMessage(MessageRecord message, AtomicLong counter) { + return sendMessage(message, true, counter); + } + + public final CompletableFuture sendMessage(MessageRecord message, boolean needresp) { + return sendMessage(message, needresp, null); + } + + public final CompletableFuture sendMessage(MessageRecord message, boolean needresp, AtomicLong counter) { + CompletableFuture future = new CompletableFuture<>(); + try { + if (this.consumer == null) { + synchronized (this) { + if (this.consumer == null) { + MessageProcessor processor = msg -> { + MessageRespFutureNode node = respNodes.get(msg.getSeqid()); + if (node == null) { + messageAgent.logger.log(Level.WARNING, MessageClient.this.getClass().getSimpleName() + " process " + msg + " error"); + return; + } + if (node.getCounter() != null) node.getCounter().decrementAndGet(); + node.future.complete(msg); + }; + this.consumer = messageAgent.createConsumer(respTopic, processor); + this.consumer.startup().join(); + } + } + } + if (convertType != null) message.setFormat(convertType); + if (needresp && message.getResptopic() == null) message.setResptopic(respTopic); + messageAgent.getProducer().apply(message); + if (counter != null) counter.incrementAndGet(); + if (needresp) { + MessageRespFutureNode node = new MessageRespFutureNode(message.getSeqid(), counter, future); + respNodes.put(message.getSeqid(), node); + } + } catch (Exception ex) { + future.completeExceptionally(ex); + } finally { + return future; + } + } +} diff --git a/src/org/redkale/mq/SncpMessageClient.java b/src/org/redkale/mq/SncpMessageClient.java new file mode 100644 index 000000000..fe84cd09f --- /dev/null +++ b/src/org/redkale/mq/SncpMessageClient.java @@ -0,0 +1,26 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.mq; + +import org.redkale.convert.ConvertType; + +/** + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.1.0 + */ +public class SncpMessageClient extends MessageClient { + + protected SncpMessageClient(MessageAgent messageAgent) { + super(messageAgent); + this.respTopic = messageAgent.generateSncpRespTopic(); + this.convertType = ConvertType.BSON; + } +} diff --git a/src/org/redkale/mq/SncpRespProcessor.java b/src/org/redkale/mq/SncpRespProcessor.java index 271d563df..ddf51a59e 100644 --- a/src/org/redkale/mq/SncpRespProcessor.java +++ b/src/org/redkale/mq/SncpRespProcessor.java @@ -43,7 +43,7 @@ public class SncpRespProcessor implements MessageProcessor { node.future.complete(message); } - public CompletableFuture createFuture(long seqid, AtomicLong counter) { + public CompletableFuture createFuture2(long seqid, AtomicLong counter) { CompletableFuture future = new CompletableFuture<>(); MessageRespFutureNode node = new MessageRespFutureNode(seqid, counter, future); respNodes.put(seqid, node); diff --git a/src/org/redkale/net/sncp/SncpClient.java b/src/org/redkale/net/sncp/SncpClient.java index 44adbb41f..6e176eae8 100644 --- a/src/org/redkale/net/sncp/SncpClient.java +++ b/src/org/redkale/net/sncp/SncpClient.java @@ -60,6 +60,8 @@ public final class SncpClient { protected final MessageAgent messageAgent; + protected final SncpMessageClient messageClient; + protected final String topic; protected final Supplier bufferSupplier; @@ -82,6 +84,7 @@ public final class SncpClient { this.executor = factory.getExecutor(); this.bufferSupplier = factory.getBufferSupplier(); this.messageAgent = messageAgent; + this.messageClient = messageAgent == null ? null : messageAgent.getSncpMessageClient(); this.topic = messageAgent == null ? null : messageAgent.generateSncpReqTopic(service); Class tn = serviceTypeOrImplClass; Version ver = tn.getAnnotation(Version.class); @@ -284,7 +287,7 @@ public final class SncpClient { String targetTopic = action.topicTargetParamIndex >= 0 ? (String) params[action.topicTargetParamIndex] : this.topic; if (targetTopic == null) targetTopic = this.topic; MessageRecord message = new MessageRecord(ConvertType.BSON, targetTopic, null, reqbytes); - return messageAgent.sendRemoteSncp(null, message).thenApply(msg -> { + return messageClient.sendMessage(message).thenApply(msg -> { ByteBuffer buffer = ByteBuffer.wrap(msg.getContent()); checkResult(seqid, action, buffer);