From 98ff3cdfcfa5dce4394367b38fe0b0961539e443 Mon Sep 17 00:00:00 2001 From: redkale Date: Fri, 13 Oct 2023 21:37:41 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84mq?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/boot/Application.java | 42 +- .../java/org/redkale/boot/NodeServer.java | 3 +- .../org/redkale/cluster/ClusterRpcClient.java | 36 ++ .../HttpClusterRpcClient.java} | 492 +++++++++--------- .../HttpLocalRpcClient.java} | 33 +- .../HttpRpcClient.java} | 161 +++--- .../mq/HttpMessageClientProcessor.java | 140 ----- .../org/redkale/mq/HttpMessageRequest.java | 7 + .../org/redkale/mq/HttpMessageResponse.java | 74 +-- .../org/redkale/mq/HttpMessageServlet.java | 52 ++ .../org/redkale/mq/HttpRpcMessageClient.java | 49 ++ .../java/org/redkale/mq/MessageAgent.java | 273 ++++------ .../java/org/redkale/mq/MessageClient.java | 254 ++++----- .../org/redkale/mq/MessageClientConsumer.java | 41 +- .../redkale/mq/MessageClientProcessor.java | 27 - .../java/org/redkale/mq/MessageConsumer.java | 2 +- .../java/org/redkale/mq/MessageProcessor.java | 13 + .../java/org/redkale/mq/MessageRecord.java | 14 +- .../redkale/mq/MessageRecordSerializer.java | 8 +- ...FutureNode.java => MessageRespFuture.java} | 25 +- .../org/redkale/mq/MessageRespProcessor.java | 54 ++ .../java/org/redkale/mq/MessageServlet.java | 102 ++++ .../org/redkale/mq/SncpMessageClient.java | 52 -- .../mq/SncpMessageClientProcessor.java | 124 ----- .../org/redkale/mq/SncpMessageResponse.java | 17 +- .../org/redkale/mq/SncpMessageServlet.java | 49 ++ src/main/java/org/redkale/net/Context.java | 15 +- src/main/java/org/redkale/net/Server.java | 1 + .../java/org/redkale/net/http/HttpServer.java | 2 +- src/main/java/org/redkale/net/http/Rest.java | 24 + .../org/redkale/net/http/WebSocketNode.java | 9 +- src/main/java/org/redkale/net/sncp/Sncp.java | 22 + .../java/org/redkale/net/sncp/SncpClient.java | 9 +- .../org/redkale/net/sncp/SncpRemoteInfo.java | 6 +- .../redkale/test/service/ABMainService.java | 4 +- .../test/sncp/SncpClientCodecTest.java | 2 +- .../test/sncp/SncpRequestParseTest.java | 2 +- .../org/redkale/test/sncp/SncpSleepTest.java | 2 +- .../java/org/redkale/test/sncp/SncpTest.java | 2 +- .../test/sncp/SncpTestServiceImpl.java | 2 +- 40 files changed, 1092 insertions(+), 1154 deletions(-) create mode 100644 src/main/java/org/redkale/cluster/ClusterRpcClient.java rename src/main/java/org/redkale/{mq/HttpMessageClusterClient.java => cluster/HttpClusterRpcClient.java} (90%) rename src/main/java/org/redkale/{mq/HttpMessageLocalClient.java => cluster/HttpLocalRpcClient.java} (88%) rename src/main/java/org/redkale/{mq/HttpMessageClient.java => cluster/HttpRpcClient.java} (55%) delete mode 100644 src/main/java/org/redkale/mq/HttpMessageClientProcessor.java create mode 100644 src/main/java/org/redkale/mq/HttpMessageServlet.java create mode 100644 src/main/java/org/redkale/mq/HttpRpcMessageClient.java delete mode 100644 src/main/java/org/redkale/mq/MessageClientProcessor.java create mode 100644 src/main/java/org/redkale/mq/MessageProcessor.java rename src/main/java/org/redkale/mq/{MessageRespFutureNode.java => MessageRespFuture.java} (59%) create mode 100644 src/main/java/org/redkale/mq/MessageRespProcessor.java create mode 100644 src/main/java/org/redkale/mq/MessageServlet.java delete mode 100644 src/main/java/org/redkale/mq/SncpMessageClient.java delete mode 100644 src/main/java/org/redkale/mq/SncpMessageClientProcessor.java create mode 100644 src/main/java/org/redkale/mq/SncpMessageServlet.java diff --git a/src/main/java/org/redkale/boot/Application.java b/src/main/java/org/redkale/boot/Application.java index 06d81bd56..2176b4042 100644 --- a/src/main/java/org/redkale/boot/Application.java +++ b/src/main/java/org/redkale/boot/Application.java @@ -1171,10 +1171,6 @@ public final class Application { this.resourceFactory.inject(agent); agent.init(agent.getConfig()); this.resourceFactory.register(agent.getName(), MessageAgent.class, agent); - if (this.clusterAgent == null) { - this.resourceFactory.register(agent.getName(), HttpMessageClient.class, agent.getHttpMessageClient()); - //this.resourceFactory.register(agent.getName(), SncpMessageClient.class, agent.getSncpMessageClient()); //不需要给开发者使用 - } } logger.info("MessageAgent init in " + (System.currentTimeMillis() - s) + " ms"); } @@ -1207,29 +1203,39 @@ public final class Application { return ResourceProducer.class; } }); - //------------------------------------ 注册 HttpMessageClient ------------------------------------ + //------------------------------------ 注册 HttpRpcClient ------------------------------------ resourceFactory.register((ResourceFactory rf, String srcResourceName, final Object srcObj, String resourceName, Field field, final Object attachment) -> { try { if (field.getAnnotation(Resource.class) == null && field.getAnnotation(javax.annotation.Resource.class) == null) { return null; } - if (clusterAgent == null) { - HttpMessageClient messageClient = new HttpMessageLocalClient(application, resourceName); - field.set(srcObj, messageClient); - rf.inject(resourceName, messageClient, null); // 给其可能包含@Resource的字段赋值; - rf.register(resourceName, HttpMessageClient.class, messageClient); - return messageClient; + if (this.messageAgents != null) { + MessageAgent messageAgent = this.resourceFactory.find(resourceName, MessageAgent.class); + if (messageAgent != null) { + HttpRpcClient rpcClient = messageAgent.getHttpRpcClient(); + field.set(srcObj, rpcClient); + rf.inject(resourceName, rpcClient, null); // 给其可能包含@Resource的字段赋值; + rf.register(resourceName, HttpRpcClient.class, rpcClient); + return rpcClient; + } } - HttpMessageClient messageClient = new HttpMessageClusterClient(application, resourceName, clusterAgent); - field.set(srcObj, messageClient); - rf.inject(resourceName, messageClient, null); // 给其可能包含@Resource的字段赋值; - rf.register(resourceName, HttpMessageClient.class, messageClient); - return messageClient; + if (clusterAgent == null) { + HttpRpcClient rpcClient = new HttpLocalRpcClient(application, resourceName); + field.set(srcObj, rpcClient); + rf.inject(resourceName, rpcClient, null); // 给其可能包含@Resource的字段赋值; + rf.register(resourceName, HttpRpcClient.class, rpcClient); + return rpcClient; + } + HttpRpcClient rpcClient = new HttpClusterRpcClient(application, resourceName, clusterAgent); + field.set(srcObj, rpcClient); + rf.inject(resourceName, rpcClient, null); // 给其可能包含@Resource的字段赋值; + rf.register(resourceName, HttpRpcClient.class, rpcClient); + return rpcClient; } catch (Exception e) { - logger.log(Level.SEVERE, "HttpMessageClient inject error", e); + logger.log(Level.SEVERE, HttpRpcClient.class.getSimpleName() + " inject error", e); return null; } - }, HttpMessageClient.class); + }, HttpRpcClient.class); initResources(); } diff --git a/src/main/java/org/redkale/boot/NodeServer.java b/src/main/java/org/redkale/boot/NodeServer.java index 3293b7dd5..828e8c6cf 100644 --- a/src/main/java/org/redkale/boot/NodeServer.java +++ b/src/main/java/org/redkale/boot/NodeServer.java @@ -167,7 +167,8 @@ public abstract class NodeServer { server.init(this.serverConf); if (this.sncpAddress != null) { //初始化SncpClient this.sncpAsyncGroup = new AsyncIOGroup("Redkale-SncpClient-IOThread-%s", application.getWorkExecutor(), server.getBufferCapacity(), server.getBufferPoolSize()).skipClose(true); - this.sncpClient = new SncpClient(server.getName(), this.sncpAsyncGroup, this.sncpAddress, new ClientAddress(this.sncpAddress), server.getNetprotocol(), Utility.cpus(), 1000); + this.sncpClient = new SncpClient(server.getName(), this.sncpAsyncGroup, application.getNodeid(), + this.sncpAddress, new ClientAddress(this.sncpAddress), server.getNetprotocol(), Utility.cpus(), 1000); } initResource(); //给DataSource、CacheSource注册依赖注入时的监听回调事件。 diff --git a/src/main/java/org/redkale/cluster/ClusterRpcClient.java b/src/main/java/org/redkale/cluster/ClusterRpcClient.java new file mode 100644 index 000000000..f12413d09 --- /dev/null +++ b/src/main/java/org/redkale/cluster/ClusterRpcClient.java @@ -0,0 +1,36 @@ +/* + * + */ +package org.redkale.cluster; + +import java.util.concurrent.CompletableFuture; + +/** + * cluster模式下的rpc client + * + * + * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.8.0 + */ +public interface ClusterRpcClient { + + /** + * 发送消息,需要响应 + * + * @param message 消息体 + * + * @return 应答消息 + */ + public CompletableFuture

sendMessage(final R message); + + /** + * 发送消息,无需响应 + * + * @param message 消息体 + */ + public void produceMessage(R message); + +} diff --git a/src/main/java/org/redkale/mq/HttpMessageClusterClient.java b/src/main/java/org/redkale/cluster/HttpClusterRpcClient.java similarity index 90% rename from src/main/java/org/redkale/mq/HttpMessageClusterClient.java rename to src/main/java/org/redkale/cluster/HttpClusterRpcClient.java index 83bb7a117..8e8d8ac4f 100644 --- a/src/main/java/org/redkale/mq/HttpMessageClusterClient.java +++ b/src/main/java/org/redkale/cluster/HttpClusterRpcClient.java @@ -1,246 +1,246 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.mq; - -import java.io.Serializable; -import java.net.*; -import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.LongAdder; -import java.util.logging.Level; -import org.redkale.annotation.Resource; -import org.redkale.boot.Application; -import org.redkale.cluster.ClusterAgent; -import org.redkale.net.http.*; -import org.redkale.util.Utility; - -/** - * 没有配置MQ的情况下依赖ClusterAgent实现的默认HttpMessageClient实例 - * - *

- * 详情见: https://redkale.org - * - * @author zhangjx - * - * @since 2.1.0 - */ -public class HttpMessageClusterClient extends HttpMessageClient { - - //jdk.internal.net.http.common.Utils.DISALLOWED_HEADERS_SET - private static final Set DISALLOWED_HEADERS_SET = Utility.ofSet("connection", "content-length", - "date", "expect", "from", "host", "origin", "referer", "upgrade", "via", "warning"); - - protected final HttpMessageLocalClient localClient; - - protected final ConcurrentHashMap topicServletMap = new ConcurrentHashMap<>(); - - protected ClusterAgent clusterAgent; - - @Resource(name = "cluster.httpClient", required = false) - protected java.net.http.HttpClient httpClient; - - @Resource(name = "cluster.httpClient", required = false) - protected HttpSimpleClient httpSimpleClient; - - public HttpMessageClusterClient(Application application, String resourceName, ClusterAgent clusterAgent) { - super(null); - Objects.requireNonNull(clusterAgent); - this.localClient = new HttpMessageLocalClient(application, resourceName); - this.clusterAgent = clusterAgent; - } - - @Override - protected CompletableFuture> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { - if (topicServletMap.computeIfAbsent(topic, t -> localClient.findHttpServlet(t) != null)) { - return localClient.sendMessage(topic, userid, groupid, request, counter); - } else { - return httpAsync(false, userid, request); - } - } - - @Override - protected void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { - if (topicServletMap.computeIfAbsent(topic, t -> localClient.findHttpServlet(t) != null)) { - localClient.produceMessage(topic, userid, groupid, request, counter); - } else { - httpAsync(true, userid, request); - } - } - - private CompletableFuture> httpAsync(boolean produce, Serializable userid, HttpSimpleRequest req) { - String module = req.getRequestURI(); - module = module.substring(1); //去掉/ - module = module.substring(0, module.indexOf('/')); - Map headers = req.getHeaders(); - String resname = headers == null ? "" : headers.getOrDefault(Rest.REST_HEADER_RESOURCE_NAME, ""); - final String localModule = module; - if (logger.isLoggable(Level.FINEST)) { - logger.log(Level.FINEST, "httpAsync.queryHttpAddress: module=" + localModule + ", resname=" + resname); - } - return clusterAgent.queryHttpAddress("http", module, resname).thenCompose(addrs -> { - if (addrs == null || addrs.isEmpty()) { - if (logger.isLoggable(Level.FINE)) { - logger.log(Level.FINE, "httpAsync." + (produce ? "produceMessage" : "sendMessage") + ": module=" + localModule + ", resname=" + resname + ", addrmap is empty"); - } - return new HttpResult().status(404).toFuture(); - } - final Map clientHeaders = new LinkedHashMap<>(); - byte[] clientBody = null; - if (req.isRpc()) { - clientHeaders.put(Rest.REST_HEADER_RPC, "true"); - } - if (req.isFrombody()) { - clientHeaders.put(Rest.REST_HEADER_PARAM_FROM_BODY, "true"); - } - if (req.getReqConvertType() != null) { - clientHeaders.put(Rest.REST_HEADER_REQ_CONVERT_TYPE, req.getReqConvertType().toString()); - } - if (req.getRespConvertType() != null) { - clientHeaders.put(Rest.REST_HEADER_RESP_CONVERT_TYPE, req.getRespConvertType().toString()); - } - if (userid != null) { - clientHeaders.put(Rest.REST_HEADER_CURRUSERID, "" + userid); - } - if (headers != null) { - boolean ws = headers.containsKey("Sec-WebSocket-Key"); - headers.forEach((n, v) -> { - if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase()) - && (!ws || (!"Connection".equals(n) && !"Sec-WebSocket-Key".equals(n) - && !"Sec-WebSocket-Version".equals(n)))) { - clientHeaders.put(n, v); - } - }); - } - clientHeaders.put("Content-Type", "x-www-form-urlencoded"); - if (req.getBody() != null && req.getBody().length > 0) { - String paramstr = req.getParametersToString(); - if (paramstr != null) { - if (req.getRequestURI().indexOf('?') > 0) { - req.setRequestURI(req.getRequestURI() + "&" + paramstr); - } else { - req.setRequestURI(req.getRequestURI() + "?" + paramstr); - } - } - clientBody = req.getBody(); - } else { - String paramstr = req.getParametersToString(); - if (paramstr != null) { - clientBody = paramstr.getBytes(StandardCharsets.UTF_8); - } - } - if (logger.isLoggable(Level.FINEST)) { - logger.log(Level.FINEST, "httpAsync: module=" + localModule + ", resname=" + resname + ", enter forEachCollectionFuture"); - } - return forEachCollectionFuture(logger.isLoggable(Level.FINEST), userid, req, - (req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + req.getRequestURI(), - clientHeaders, clientBody, addrs.iterator()); - }); - } - - private CompletableFuture> forEachCollectionFuture(boolean finest, Serializable userid, - HttpSimpleRequest req, String requesturi, final Map clientHeaders, byte[] clientBody, Iterator it) { - if (!it.hasNext()) { - return CompletableFuture.completedFuture(null); - } - InetSocketAddress addr = it.next(); - String url = "http://" + addr.getHostString() + ":" + addr.getPort() + requesturi; - if (finest) { - logger.log(Level.FINEST, "forEachCollectionFuture: url=" + url + ", headers=" + clientHeaders); - } - if (httpSimpleClient != null) { - return httpSimpleClient.postAsync(url, clientHeaders, clientBody); - } - java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder().uri(URI.create(url)) - .timeout(Duration.ofMillis(10_000)) - //存在sendHeader后不发送body数据的问题, java.net.http.HttpRequest的bug? - .method("POST", clientBody == null ? java.net.http.HttpRequest.BodyPublishers.noBody() : java.net.http.HttpRequest.BodyPublishers.ofByteArray(clientBody)); - if (clientHeaders != null) { - clientHeaders.forEach((n, v) -> builder.header(n, v)); - } - return httpClient.sendAsync(builder.build(), java.net.http.HttpResponse.BodyHandlers.ofByteArray()) - .thenApply((java.net.http.HttpResponse resp) -> { - final int rs = resp.statusCode(); - if (rs != 200) { - return new HttpResult().status(rs); - } - return new HttpResult(resp.body()); - }); - } - -// -// private CompletableFuture> httpAsync(Serializable userid, HttpSimpleRequest req) { -// final boolean finest = logger.isLoggable(Level.FINEST); -// String module = req.getRequestURI(); -// module = module.substring(1); //去掉/ -// module = module.substring(0, module.indexOf('/')); -// Map headers = req.getHeaders(); -// String resname = headers == null ? "" : headers.getOrDefault(Rest.REST_HEADER_RESOURCE_NAME, ""); -// return clusterAgent.queryHttpAddress("http", module, resname).thenCompose(addrs -> { -// if (addrs == null || addrs.isEmpty()) return new HttpResult().status(404).toAnyFuture(); -// java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder().timeout(Duration.ofMillis(30000)); -// if (req.isRpc()) builder.header(Rest.REST_HEADER_RPC_NAME, "true"); -// if (req.isFrombody()) builder.header(Rest.REST_HEADER_PARAM_FROM_BODY, "true"); -// if (req.getReqConvertType() != null) builder.header(Rest.REST_HEADER_REQ_CONVERT_TYPE, req.getReqConvertType().toString()); -// if (req.getRespConvertType() != null) builder.header(Rest.REST_HEADER_RESP_CONVERT_TYPE, req.getRespConvertType().toString()); -// if (userid != 0) builder.header(Rest.REST_HEADER_CURRUSERID, "" + userid); -// if (headers != null) headers.forEach((n, v) -> { -// if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase())) builder.header(n, v); -// }); -// builder.header("Content-Type", "x-www-form-urlencoded"); -// if (req.getBody() != null && req.getBody().length > 0) { -// String paramstr = req.getParametersToString(); -// if (paramstr != null) { -// if (req.getRequestURI().indexOf('?') > 0) { -// req.setRequestURI(req.getRequestURI() + "&" + paramstr); -// } else { -// req.setRequestURI(req.getRequestURI() + "?" + paramstr); -// } -// } -// builder.POST(java.net.http.HttpRequest.BodyPublishers.ofByteArray(req.getBody())); -// } else { -// String paramstr = req.getParametersToString(); -// if (paramstr != null) builder.POST(java.net.http.HttpRequest.BodyPublishers.ofString(paramstr)); -// } -// return forEachCollectionFuture(finest, userid, req, (req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + req.getRequestURI(), builder, addrs.iterator()); -// }); -// } -// -// private CompletableFuture> forEachCollectionFuture(boolean finest, Serializable userid, HttpSimpleRequest req, String requesturi, java.net.http.HttpRequest.Builder builder, Iterator it) { -// if (!it.hasNext()) return CompletableFuture.completedFuture(null); -// InetSocketAddress addr = it.next(); -// String url = "http://" + addr.getHostString() + ":" + addr.getPort() + requesturi; -// return httpClient.sendAsync(builder.copy().uri(URI.create(url)).build(), java.net.http.HttpResponse.BodyHandlers.ofByteArray()).thenCompose(resp -> { -// if (resp.statusCode() != 200) return forEachCollectionFuture(finest, userid, req, requesturi, builder, it); -// HttpResult rs = new HttpResult(); -// java.net.http.HttpHeaders hs = resp.headers(); -// if (hs != null) { -// Map> hm = hs.map(); -// if (hm != null) { -// for (Map.Entry> en : hm.entrySet()) { -// if ("date".equals(en.getKey()) || "content-type".equals(en.getKey()) -// || "server".equals(en.getKey()) || "connection".equals(en.getKey())) continue; -// List val = en.getValue(); -// if (val != null && val.size() == 1) { -// rs.header(en.getKey(), val.get(0)); -// } -// } -// } -// } -// rs.setResult(resp.body()); -// if (finest) { -// StringBuilder sb = new StringBuilder(); -// Map params = req.getParams(); -// if (params != null && !params.isEmpty()) { -// params.forEach((n, v) -> sb.append('&').append(n).append('=').append(v)); -// } -// logger.log(Level.FINEST, url + "?userid=" + userid + sb + ", result = " + new String(resp.body(), StandardCharsets.UTF_8)); -// } -// return CompletableFuture.completedFuture(rs); -// }); -// } -} +package org.redkale.cluster; + +import java.io.Serializable; +import java.net.*; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.*; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.redkale.annotation.Resource; +import org.redkale.boot.Application; +import org.redkale.net.http.*; +import org.redkale.util.Utility; + +/** + * 没有配置MQ的情况下依赖ClusterAgent实现的默认HttpMessageClient实例 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.1.0 + */ +public class HttpClusterRpcClient extends HttpRpcClient { + + //jdk.internal.net.http.common.Utils.DISALLOWED_HEADERS_SET + private static final Set DISALLOWED_HEADERS_SET = Utility.ofSet("connection", "content-length", + "date", "expect", "from", "host", "origin", "referer", "upgrade", "via", "warning"); + + protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); + + protected final HttpLocalRpcClient localClient; + + protected final ConcurrentHashMap topicServletMap = new ConcurrentHashMap<>(); + + protected ClusterAgent clusterAgent; + + @Resource(name = "cluster.httpClient", required = false) + protected java.net.http.HttpClient httpClient; + + @Resource(name = "cluster.httpClient", required = false) + protected HttpSimpleClient httpSimpleClient; + + public HttpClusterRpcClient(Application application, String resourceName, ClusterAgent clusterAgent) { + Objects.requireNonNull(clusterAgent); + this.localClient = new HttpLocalRpcClient(application, resourceName); + this.clusterAgent = clusterAgent; + } + + @Override + protected int getNodeid() { + return localClient.getNodeid(); + } + + @Override + public CompletableFuture> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) { + if (topicServletMap.computeIfAbsent(topic, t -> localClient.findHttpServlet(t) != null)) { + return localClient.sendMessage(topic, userid, groupid, request); + } else { + return httpAsync(false, userid, request); + } + } + + @Override + public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) { + if (topicServletMap.computeIfAbsent(topic, t -> localClient.findHttpServlet(t) != null)) { + localClient.produceMessage(topic, userid, groupid, request); + } else { + httpAsync(true, userid, request); + } + } + + private CompletableFuture> httpAsync(boolean produce, Serializable userid, HttpSimpleRequest req) { + String module = req.getRequestURI(); + module = module.substring(1); //去掉/ + module = module.substring(0, module.indexOf('/')); + Map headers = req.getHeaders(); + String resname = headers == null ? "" : headers.getOrDefault(Rest.REST_HEADER_RESOURCE_NAME, ""); + final String localModule = module; + if (logger.isLoggable(Level.FINEST)) { + logger.log(Level.FINEST, "httpAsync.queryHttpAddress: module=" + localModule + ", resname=" + resname); + } + return clusterAgent.queryHttpAddress("http", module, resname).thenCompose(addrs -> { + if (addrs == null || addrs.isEmpty()) { + if (logger.isLoggable(Level.FINE)) { + logger.log(Level.FINE, "httpAsync." + (produce ? "produceMessage" : "sendMessage") + ": module=" + localModule + ", resname=" + resname + ", addrmap is empty"); + } + return new HttpResult().status(404).toFuture(); + } + final Map clientHeaders = new LinkedHashMap<>(); + byte[] clientBody = null; + if (req.isRpc()) { + clientHeaders.put(Rest.REST_HEADER_RPC, "true"); + } + if (req.isFrombody()) { + clientHeaders.put(Rest.REST_HEADER_PARAM_FROM_BODY, "true"); + } + if (req.getReqConvertType() != null) { + clientHeaders.put(Rest.REST_HEADER_REQ_CONVERT_TYPE, req.getReqConvertType().toString()); + } + if (req.getRespConvertType() != null) { + clientHeaders.put(Rest.REST_HEADER_RESP_CONVERT_TYPE, req.getRespConvertType().toString()); + } + if (userid != null) { + clientHeaders.put(Rest.REST_HEADER_CURRUSERID, "" + userid); + } + if (headers != null) { + boolean ws = headers.containsKey("Sec-WebSocket-Key"); + headers.forEach((n, v) -> { + if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase()) + && (!ws || (!"Connection".equals(n) && !"Sec-WebSocket-Key".equals(n) + && !"Sec-WebSocket-Version".equals(n)))) { + clientHeaders.put(n, v); + } + }); + } + clientHeaders.put("Content-Type", "x-www-form-urlencoded"); + if (req.getBody() != null && req.getBody().length > 0) { + String paramstr = req.getParametersToString(); + if (paramstr != null) { + if (req.getRequestURI().indexOf('?') > 0) { + req.setRequestURI(req.getRequestURI() + "&" + paramstr); + } else { + req.setRequestURI(req.getRequestURI() + "?" + paramstr); + } + } + clientBody = req.getBody(); + } else { + String paramstr = req.getParametersToString(); + if (paramstr != null) { + clientBody = paramstr.getBytes(StandardCharsets.UTF_8); + } + } + if (logger.isLoggable(Level.FINEST)) { + logger.log(Level.FINEST, "httpAsync: module=" + localModule + ", resname=" + resname + ", enter forEachCollectionFuture"); + } + return forEachCollectionFuture(logger.isLoggable(Level.FINEST), userid, req, + (req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + req.getRequestURI(), + clientHeaders, clientBody, addrs.iterator()); + }); + } + + private CompletableFuture> forEachCollectionFuture(boolean finest, Serializable userid, + HttpSimpleRequest req, String requesturi, final Map clientHeaders, byte[] clientBody, Iterator it) { + if (!it.hasNext()) { + return CompletableFuture.completedFuture(null); + } + InetSocketAddress addr = it.next(); + String url = "http://" + addr.getHostString() + ":" + addr.getPort() + requesturi; + if (finest) { + logger.log(Level.FINEST, "forEachCollectionFuture: url=" + url + ", headers=" + clientHeaders); + } + if (httpSimpleClient != null) { + return httpSimpleClient.postAsync(url, clientHeaders, clientBody); + } + java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder().uri(URI.create(url)) + .timeout(Duration.ofMillis(10_000)) + //存在sendHeader后不发送body数据的问题, java.net.http.HttpRequest的bug? + .method("POST", clientBody == null ? java.net.http.HttpRequest.BodyPublishers.noBody() : java.net.http.HttpRequest.BodyPublishers.ofByteArray(clientBody)); + if (clientHeaders != null) { + clientHeaders.forEach((n, v) -> builder.header(n, v)); + } + return httpClient.sendAsync(builder.build(), java.net.http.HttpResponse.BodyHandlers.ofByteArray()) + .thenApply((java.net.http.HttpResponse resp) -> { + final int rs = resp.statusCode(); + if (rs != 200) { + return new HttpResult().status(rs); + } + return new HttpResult(resp.body()); + }); + } + +// +// private CompletableFuture> httpAsync(Serializable userid, HttpSimpleRequest req) { +// final boolean finest = logger.isLoggable(Level.FINEST); +// String module = req.getRequestURI(); +// module = module.substring(1); //去掉/ +// module = module.substring(0, module.indexOf('/')); +// Map headers = req.getHeaders(); +// String resname = headers == null ? "" : headers.getOrDefault(Rest.REST_HEADER_RESOURCE_NAME, ""); +// return clusterAgent.queryHttpAddress("http", module, resname).thenCompose(addrs -> { +// if (addrs == null || addrs.isEmpty()) return new HttpResult().status(404).toAnyFuture(); +// java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder().timeout(Duration.ofMillis(30000)); +// if (req.isRpc()) builder.header(Rest.REST_HEADER_RPC_NAME, "true"); +// if (req.isFrombody()) builder.header(Rest.REST_HEADER_PARAM_FROM_BODY, "true"); +// if (req.getReqConvertType() != null) builder.header(Rest.REST_HEADER_REQ_CONVERT_TYPE, req.getReqConvertType().toString()); +// if (req.getRespConvertType() != null) builder.header(Rest.REST_HEADER_RESP_CONVERT_TYPE, req.getRespConvertType().toString()); +// if (userid != 0) builder.header(Rest.REST_HEADER_CURRUSERID, "" + userid); +// if (headers != null) headers.forEach((n, v) -> { +// if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase())) builder.header(n, v); +// }); +// builder.header("Content-Type", "x-www-form-urlencoded"); +// if (req.getBody() != null && req.getBody().length > 0) { +// String paramstr = req.getParametersToString(); +// if (paramstr != null) { +// if (req.getRequestURI().indexOf('?') > 0) { +// req.setRequestURI(req.getRequestURI() + "&" + paramstr); +// } else { +// req.setRequestURI(req.getRequestURI() + "?" + paramstr); +// } +// } +// builder.POST(java.net.http.HttpRequest.BodyPublishers.ofByteArray(req.getBody())); +// } else { +// String paramstr = req.getParametersToString(); +// if (paramstr != null) builder.POST(java.net.http.HttpRequest.BodyPublishers.ofString(paramstr)); +// } +// return forEachCollectionFuture(finest, userid, req, (req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + req.getRequestURI(), builder, addrs.iterator()); +// }); +// } +// +// private CompletableFuture> forEachCollectionFuture(boolean finest, Serializable userid, HttpSimpleRequest req, String requesturi, java.net.http.HttpRequest.Builder builder, Iterator it) { +// if (!it.hasNext()) return CompletableFuture.completedFuture(null); +// InetSocketAddress addr = it.next(); +// String url = "http://" + addr.getHostString() + ":" + addr.getPort() + requesturi; +// return httpClient.sendAsync(builder.copy().uri(URI.create(url)).build(), java.net.http.HttpResponse.BodyHandlers.ofByteArray()).thenCompose(resp -> { +// if (resp.statusCode() != 200) return forEachCollectionFuture(finest, userid, req, requesturi, builder, it); +// HttpResult rs = new HttpResult(); +// java.net.http.HttpHeaders hs = resp.headers(); +// if (hs != null) { +// Map> hm = hs.map(); +// if (hm != null) { +// for (Map.Entry> en : hm.entrySet()) { +// if ("date".equals(en.getKey()) || "content-type".equals(en.getKey()) +// || "server".equals(en.getKey()) || "connection".equals(en.getKey())) continue; +// List val = en.getValue(); +// if (val != null && val.size() == 1) { +// rs.header(en.getKey(), val.get(0)); +// } +// } +// } +// } +// rs.setResult(resp.body()); +// if (finest) { +// StringBuilder sb = new StringBuilder(); +// Map params = req.getParams(); +// if (params != null && !params.isEmpty()) { +// params.forEach((n, v) -> sb.append('&').append(n).append('=').append(v)); +// } +// logger.log(Level.FINEST, url + "?userid=" + userid + sb + ", result = " + new String(resp.body(), StandardCharsets.UTF_8)); +// } +// return CompletableFuture.completedFuture(rs); +// }); +// } +} diff --git a/src/main/java/org/redkale/mq/HttpMessageLocalClient.java b/src/main/java/org/redkale/cluster/HttpLocalRpcClient.java similarity index 88% rename from src/main/java/org/redkale/mq/HttpMessageLocalClient.java rename to src/main/java/org/redkale/cluster/HttpLocalRpcClient.java index 1875b6afc..3db102b71 100644 --- a/src/main/java/org/redkale/mq/HttpMessageLocalClient.java +++ b/src/main/java/org/redkale/cluster/HttpLocalRpcClient.java @@ -3,16 +3,17 @@ * To change this template file, choose Tools | Templates * and open the template in the editor. */ -package org.redkale.mq; +package org.redkale.cluster; import java.io.Serializable; import java.lang.reflect.Type; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.LongAdder; import java.util.logging.Level; +import java.util.logging.Logger; import org.redkale.boot.*; +import org.redkale.cluster.HttpRpcClient; import org.redkale.convert.Convert; import org.redkale.convert.json.JsonConvert; import org.redkale.net.http.*; @@ -28,22 +29,23 @@ import org.redkale.util.RedkaleException; * * @since 2.4.0 */ -public class HttpMessageLocalClient extends HttpMessageClient { +public class HttpLocalRpcClient extends HttpRpcClient { + + protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); protected final Application application; protected final String resourceName; - protected HttpServer server; + protected HttpServer currServer; - public HttpMessageLocalClient(Application application, String resourceName) { - super(null); + public HttpLocalRpcClient(Application application, String resourceName) { this.application = application; this.resourceName = resourceName; } private HttpServer httpServer() { - if (this.server == null) { + if (this.currServer == null) { NodeHttpServer nodeHttpServer = null; List nodeServers = application.getNodeServers(); for (NodeServer n : nodeServers) { @@ -60,9 +62,14 @@ public class HttpMessageLocalClient extends HttpMessageClient { } } } - this.server = nodeHttpServer.getServer(); + this.currServer = nodeHttpServer.getServer(); } - return this.server; + return this.currServer; + } + + @Override + protected int getNodeid() { + return application.getNodeid(); } protected HttpContext context() { @@ -73,11 +80,11 @@ public class HttpMessageLocalClient extends HttpMessageClient { return (HttpDispatcherServlet) httpServer().getDispatcherServlet(); } - protected HttpServlet findHttpServlet(String topic) { + public HttpServlet findHttpServlet(String topic) { return dispatcherServlet().findServletByTopic(topic); } - protected HttpServlet findHttpServlet(HttpSimpleRequest request) { + public HttpServlet findHttpServlet(HttpSimpleRequest request) { return dispatcherServlet().findServletByTopic(generateHttpReqTopic(request, request.getPath())); } @@ -114,7 +121,7 @@ public class HttpMessageLocalClient extends HttpMessageClient { } @Override - protected CompletableFuture> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { + public CompletableFuture> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) { HttpServlet servlet = findHttpServlet(topic); if (servlet == null) { if (logger.isLoggable(Level.FINE)) { @@ -146,7 +153,7 @@ public class HttpMessageLocalClient extends HttpMessageClient { } @Override - protected void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { + public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) { HttpDispatcherServlet ps = dispatcherServlet(); HttpServlet servlet = ps.findServletByTopic(topic); if (servlet == null) { diff --git a/src/main/java/org/redkale/mq/HttpMessageClient.java b/src/main/java/org/redkale/cluster/HttpRpcClient.java similarity index 55% rename from src/main/java/org/redkale/mq/HttpMessageClient.java rename to src/main/java/org/redkale/cluster/HttpRpcClient.java index fd3b81e5d..9c4914037 100644 --- a/src/main/java/org/redkale/mq/HttpMessageClient.java +++ b/src/main/java/org/redkale/cluster/HttpRpcClient.java @@ -3,16 +3,13 @@ * To change this template file, choose Tools | Templates * and open the template in the editor. */ -package org.redkale.mq; +package org.redkale.cluster; import java.io.Serializable; import java.lang.reflect.Type; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.LongAdder; -import java.util.logging.Logger; import org.redkale.convert.json.JsonConvert; -import static org.redkale.mq.MessageRecord.CTYPE_HTTP_REQUEST; import org.redkale.net.http.*; /** @@ -25,25 +22,77 @@ import org.redkale.net.http.*; * * @since 2.1.0 */ -public class HttpMessageClient extends MessageClient { +public abstract class HttpRpcClient implements ClusterRpcClient> { - protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); + @Override + public final void produceMessage(HttpSimpleRequest request) { + produceMessage(generateHttpReqTopic(request, null), 0, null, request); + } - protected HttpMessageClient(MessageAgent messageAgent) { - super(messageAgent); - if (messageAgent != null) { // //RPC方式下无messageAgent - this.appRespTopic = messageAgent.generateAppHttpRespTopic(); - } + public final void produceMessage(Serializable userid, HttpSimpleRequest request) { + produceMessage(generateHttpReqTopic(request, null), userid, null, request); + } + + public final void produceMessage(Serializable userid, String groupid, HttpSimpleRequest request) { + produceMessage(generateHttpReqTopic(request, null), userid, groupid, request); + } + + public final void produceMessage(String topic, HttpSimpleRequest request) { + produceMessage(topic, 0, null, request); + } + + @Override + public final CompletableFuture> sendMessage(HttpSimpleRequest request) { + return sendMessage(generateHttpReqTopic(request, null), 0, null, request); + } + + public final CompletableFuture> sendMessage(Serializable userid, HttpSimpleRequest request) { + return sendMessage(generateHttpReqTopic(request, null), userid, null, request); + } + + public final CompletableFuture> sendMessage(Serializable userid, String groupid, HttpSimpleRequest request) { + return sendMessage(generateHttpReqTopic(request, null), userid, groupid, request); + } + + public final CompletableFuture> sendMessage(String topic, HttpSimpleRequest request) { + return sendMessage(topic, 0, null, request); + } + + public CompletableFuture sendMessage(HttpSimpleRequest request, Type type) { + return sendMessage(generateHttpReqTopic(request, null), 0, null, request).thenApply((HttpResult httbs) -> { + if (httbs == null || httbs.getResult() == null) { + return null; + } + return JsonConvert.root().convertFrom(type, httbs.getResult()); + }); + } + + public CompletableFuture sendMessage(Serializable userid, HttpSimpleRequest request, Type type) { + return sendMessage(generateHttpReqTopic(request, null), userid, null, request).thenApply((HttpResult httbs) -> { + if (httbs == null || httbs.getResult() == null) { + return null; + } + return JsonConvert.root().convertFrom(type, httbs.getResult()); + }); + } + + public CompletableFuture sendMessage(Serializable userid, String groupid, HttpSimpleRequest request, Type type) { + return sendMessage(generateHttpReqTopic(request, null), userid, groupid, request).thenApply((HttpResult httbs) -> { + if (httbs == null || httbs.getResult() == null) { + return null; + } + return JsonConvert.root().convertFrom(type, httbs.getResult()); + }); } //格式: http.req.user public String generateHttpReqTopic(String module) { - return MessageAgent.generateHttpReqTopic(module); + return Rest.generateHttpReqTopic(module, getNodeid()); } //格式: http.req.user-n10 public String generateHttpReqTopic(String module, String resname) { - return MessageAgent.generateHttpReqTopic(module, resname); + return Rest.generateHttpReqTopic(module, resname, getNodeid()); } public String generateHttpReqTopic(HttpSimpleRequest request, String path) { @@ -55,91 +104,13 @@ public class HttpMessageClient extends MessageClient { module = module.substring(0, module.indexOf('/')); Map headers = request.getHeaders(); String resname = headers == null ? "" : headers.getOrDefault(Rest.REST_HEADER_RESOURCE_NAME, ""); - return MessageAgent.generateHttpReqTopic(module, resname); + return Rest.generateHttpReqTopic(module, resname, getNodeid()); } - public final void produceMessage(HttpSimpleRequest request) { - produceMessage(generateHttpReqTopic(request, null), 0, null, request, null); - } + public abstract CompletableFuture> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request); - public final void produceMessage(Serializable userid, HttpSimpleRequest request) { - produceMessage(generateHttpReqTopic(request, null), userid, null, request, null); - } + public abstract void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request); - public final void produceMessage(Serializable userid, String groupid, HttpSimpleRequest request) { - produceMessage(generateHttpReqTopic(request, null), userid, groupid, request, null); - } + protected abstract int getNodeid(); - public final void produceMessage(String topic, HttpSimpleRequest request) { - produceMessage(topic, 0, null, request, null); - } - - public final void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) { - produceMessage(topic, userid, groupid, request, null); - } - - public CompletableFuture sendMessage(HttpSimpleRequest request, Type type) { - return sendMessage(generateHttpReqTopic(request, null), 0, null, request, null).thenApply((HttpResult httbs) -> { - if (httbs == null || httbs.getResult() == null) { - return null; - } - return JsonConvert.root().convertFrom(type, httbs.getResult()); - }); - } - - public CompletableFuture sendMessage(Serializable userid, HttpSimpleRequest request, Type type) { - return sendMessage(generateHttpReqTopic(request, null), userid, null, request, null).thenApply((HttpResult httbs) -> { - if (httbs == null || httbs.getResult() == null) { - return null; - } - return JsonConvert.root().convertFrom(type, httbs.getResult()); - }); - } - - public CompletableFuture sendMessage(Serializable userid, String groupid, HttpSimpleRequest request, Type type) { - return sendMessage(generateHttpReqTopic(request, null), userid, groupid, request, null).thenApply((HttpResult httbs) -> { - if (httbs == null || httbs.getResult() == null) { - return null; - } - return JsonConvert.root().convertFrom(type, httbs.getResult()); - }); - } - - public final CompletableFuture> sendMessage(HttpSimpleRequest request) { - return sendMessage(generateHttpReqTopic(request, null), 0, null, request, null); - } - - public final CompletableFuture> sendMessage(Serializable userid, HttpSimpleRequest request) { - return sendMessage(generateHttpReqTopic(request, null), userid, null, request, null); - } - - public final CompletableFuture> sendMessage(Serializable userid, String groupid, HttpSimpleRequest request) { - return sendMessage(generateHttpReqTopic(request, null), userid, groupid, request, null); - } - - public final CompletableFuture> sendMessage(String topic, HttpSimpleRequest request) { - return sendMessage(topic, 0, null, request, null); - } - - public final CompletableFuture> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) { - return sendMessage(topic, userid, null, request, (LongAdder) null); - } - - protected CompletableFuture> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { - MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request)); - message.userid(userid).groupid(groupid); - //if (finest) logger.log(Level.FINEST, "HttpMessageClient.sendMessage: " + message); - return sendMessage(message, true, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance())); - } - - protected void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { - MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request)); - message.userid(userid).groupid(groupid); - sendMessage(message, false, counter); - } - - @Override - protected MessageClientProducer getProducer() { - return messageAgent.getHttpMessageClientProducer(); - } } diff --git a/src/main/java/org/redkale/mq/HttpMessageClientProcessor.java b/src/main/java/org/redkale/mq/HttpMessageClientProcessor.java deleted file mode 100644 index 764c12cf4..000000000 --- a/src/main/java/org/redkale/mq/HttpMessageClientProcessor.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.mq; - -import java.util.concurrent.*; -import java.util.function.*; -import java.util.logging.*; -import org.redkale.boot.NodeHttpServer; -import org.redkale.net.http.*; -import org.redkale.service.Service; -import org.redkale.util.*; - -/** - * 一个Service对应一个MessageProcessor - * - *

- * 详情见: https://redkale.org - * - * @author zhangjx - * - * @since 2.1.0 - */ -public class HttpMessageClientProcessor implements MessageClientProcessor { - - protected final Logger logger; - - protected HttpMessageClient messageClient; - - protected final MessageClientProducer producer; - - protected final NodeHttpServer server; - - protected final Service service; - - protected final HttpServlet servlet; - - protected final String restModule; // 前后有/, 例如: /user/ - - protected ThreadLocal> respPoolThreadLocal; - - protected final Supplier respSupplier; - - protected final Consumer respConsumer; - - protected CountDownLatch cdl; - - protected long startTime; - - protected final Runnable innerCallback = () -> { - if (cdl != null) { - cdl.countDown(); - } - }; - - public HttpMessageClientProcessor(Logger logger, HttpMessageClient messageClient, MessageClientProducer producer, NodeHttpServer server, Service service, HttpServlet servlet) { - this.logger = logger; - this.messageClient = messageClient; - this.producer = producer; - this.server = server; - this.service = service; - this.servlet = servlet; - this.restModule = "/" + Rest.getRestModule(service) + "/"; - this.respSupplier = () -> respPoolThreadLocal.get().get(); - this.respConsumer = resp -> respPoolThreadLocal.get().accept(resp); - this.respPoolThreadLocal = Utility.withInitialThreadLocal(() -> ObjectPool.createUnsafePool(Utility.cpus(), - ps -> new HttpMessageResponse(server.getHttpServer().getContext(), messageClient, respSupplier, respConsumer), HttpMessageResponse::prepare, HttpMessageResponse::recycle)); - } - - @Override - public void begin(final int size, long starttime) { - this.startTime = starttime; - this.cdl = size > 1 ? new CountDownLatch(size) : null; - } - - @Override - public void process(final MessageRecord message, final Runnable callback) { - execute(message, innerCallback); - } - - private void execute(final MessageRecord message, final Runnable callback) { - HttpMessageRequest request = null; - try { - Traces.computeIfAbsent(message.getTraceid()); - long now = System.currentTimeMillis(); - long cha = now - message.createTime; - long e = now - startTime; - HttpMessageResponse response = respSupplier.get(); - request = response.request(); - response.prepare(message, callback, producer); - - server.getHttpServer().getContext().execute(servlet, request, response); - long o = System.currentTimeMillis() - now; - if ((cha > 1000 || e > 100 || o > 1000) && logger.isLoggable(Level.FINE)) { - logger.log(Level.FINE, "HttpMessageProcessor.process (mqs.delays = " + cha + " ms, mqs.blocks = " + e + " ms, mqs.executes = " + o + " ms) message: " + message); - } else if ((cha > 50 || e > 10 || o > 50) && logger.isLoggable(Level.FINER)) { - logger.log(Level.FINER, "HttpMessageProcessor.process (mq.delays = " + cha + " ms, mq.blocks = " + e + " ms, mq.executes = " + o + " ms) message: " + message); - } else if (logger.isLoggable(Level.FINEST)) { - logger.log(Level.FINEST, "HttpMessageProcessor.process (mq.delay = " + cha + " ms, mq.block = " + e + " ms, mq.execute = " + o + " ms) message: " + message); - } - } catch (Throwable ex) { - if (message.getRespTopic() != null && !message.getRespTopic().isEmpty()) { - HttpMessageResponse.finishHttpResult(logger.isLoggable(Level.FINEST), request == null ? null : request.getRespConvert(), - null, message, callback, messageClient, producer, message.getRespTopic(), new HttpResult().status(500)); - } - logger.log(Level.SEVERE, HttpMessageClientProcessor.class.getSimpleName() + " process error, message=" + message, ex instanceof CompletionException ? ((CompletionException) ex).getCause() : ex); - } - } - - @Override - public void commit() { - if (this.cdl != null) { - try { - this.cdl.await(30, TimeUnit.SECONDS); - } catch (Exception ex) { - logger.log(Level.SEVERE, HttpMessageClientProcessor.class.getSimpleName() + " commit error, restmodule=" + this.restModule, ex); - } - this.cdl = null; - } - } - - public MessageClientProducer getProducer() { - return producer; - } - - public NodeHttpServer getServer() { - return server; - } - - public Service getService() { - return service; - } - - public HttpServlet getServlet() { - return servlet; - } - -} diff --git a/src/main/java/org/redkale/mq/HttpMessageRequest.java b/src/main/java/org/redkale/mq/HttpMessageRequest.java index 04644f5d3..f1a982022 100644 --- a/src/main/java/org/redkale/mq/HttpMessageRequest.java +++ b/src/main/java/org/redkale/mq/HttpMessageRequest.java @@ -22,7 +22,14 @@ public class HttpMessageRequest extends HttpRequest { protected MessageRecord message; public HttpMessageRequest(HttpContext context) { + this(context, (MessageRecord) null); + } + + public HttpMessageRequest(HttpContext context, MessageRecord message) { super(context, (HttpSimpleRequest) null); + if (message != null) { + prepare(message); + } } protected HttpMessageRequest prepare(MessageRecord message) { diff --git a/src/main/java/org/redkale/mq/HttpMessageResponse.java b/src/main/java/org/redkale/mq/HttpMessageResponse.java index b7a1bf639..a728940bc 100644 --- a/src/main/java/org/redkale/mq/HttpMessageResponse.java +++ b/src/main/java/org/redkale/mq/HttpMessageResponse.java @@ -29,26 +29,19 @@ import org.redkale.service.RetResult; */ public class HttpMessageResponse extends HttpResponse { - protected final HttpMessageClient messageClient; + protected MessageClient messageClient; protected MessageRecord message; - protected MessageClientProducer producer; - - protected Runnable callback; - - public HttpMessageResponse(HttpContext context, HttpMessageClient messageClient, Supplier respSupplier, Consumer respConsumer) { - super(context, new HttpMessageRequest(context), null); - this.responseSupplier = (Supplier) respSupplier; - this.responseConsumer = (Consumer) respConsumer; + public HttpMessageResponse(MessageClient messageClient, HttpContext context, HttpMessageRequest request) { + super(context, request, null); this.messageClient = messageClient; + this.message = request.message; } - public void prepare(MessageRecord message, Runnable callback, MessageClientProducer producer) { + public void prepare(MessageRecord message) { ((HttpMessageRequest) request).prepare(message); this.message = message; - this.callback = callback; - this.producer = producer; } public HttpMessageRequest request() { @@ -56,22 +49,19 @@ public class HttpMessageResponse extends HttpResponse { } public void finishHttpResult(Type type, HttpResult result) { - finishHttpResult(producer.logger.isLoggable(Level.FINEST), ((HttpMessageRequest) this.request).getRespConvert(), - type, this.message, this.callback, this.messageClient, this.producer, message.getRespTopic(), result); + finishHttpResult(messageClient.logger.isLoggable(Level.FINEST), ((HttpMessageRequest) this.request).getRespConvert(), + type, this.message, this.messageClient, message.getRespTopic(), result); } public void finishHttpResult(Convert respConvert, Type type, HttpResult result) { - finishHttpResult(producer.logger.isLoggable(Level.FINEST), + finishHttpResult(messageClient.logger.isLoggable(Level.FINEST), respConvert == null ? ((HttpMessageRequest) this.request).getRespConvert() : respConvert, - type, this.message, this.callback, this.messageClient, this.producer, message.getRespTopic(), result); + type, this.message, this.messageClient, message.getRespTopic(), result); } public static void finishHttpResult(boolean finest, Convert respConvert, Type type, MessageRecord msg, - Runnable callback, MessageClient messageClient, MessageClientProducer producer, String resptopic, HttpResult result) { - if (callback != null) { - callback.run(); - } - if (resptopic == null || resptopic.isEmpty()) { + MessageClient messageClient, String respTopic, HttpResult result) { + if (respTopic == null || respTopic.isEmpty()) { return; } if (result.getResult() instanceof RetResult) { @@ -89,11 +79,11 @@ public class HttpMessageResponse extends HttpResponse { if (innerrs instanceof byte[]) { innerrs = new String((byte[]) innerrs, StandardCharsets.UTF_8); } - producer.logger.log(Level.FINEST, "HttpMessageResponse.finishHttpResult seqid=" + msg.getSeqid() + messageClient.logger.log(Level.FINEST, "HttpMessageResponse.finishHttpResult seqid=" + msg.getSeqid() + ", content: " + innerrs + ", status: " + result.getStatus() + ", headers: " + result.getHeaders()); } byte[] content = HttpResultCoder.getInstance().encode(result); - producer.apply(messageClient.createMessageRecord(msg.getSeqid(), CTYPE_HTTP_RESULT, resptopic, null, content)); + messageClient.getProducer().apply(messageClient.createMessageRecord(msg.getSeqid(), CTYPE_HTTP_RESULT, respTopic, null, content)); } @Override @@ -109,17 +99,12 @@ public class HttpMessageResponse extends HttpResponse { this.responseSupplier = respSupplier; this.responseConsumer = respConsumer; this.message = null; - this.producer = null; - this.callback = null; return rs; } @Override public void finish(final Convert convert, Type type, RetResult ret) { if (message.isEmptyRespTopic()) { - if (callback != null) { - callback.run(); - } return; } finishHttpResult(convert, type, new HttpResult(ret).convert(ret == null ? null : ret.convert())); @@ -128,9 +113,6 @@ public class HttpMessageResponse extends HttpResponse { @Override public void finish(final Convert convert, Type type, HttpResult result) { if (message.isEmptyRespTopic()) { - if (callback != null) { - callback.run(); - } return; } if (convert != null) { @@ -152,9 +134,6 @@ public class HttpMessageResponse extends HttpResponse { finish(convert, type, (RetResult) obj); } else { if (message.isEmptyRespTopic()) { - if (callback != null) { - callback.run(); - } return; } finishHttpResult(convert, type, new HttpResult(obj)); @@ -164,9 +143,6 @@ public class HttpMessageResponse extends HttpResponse { @Override public void finish(String obj) { if (message.isEmptyRespTopic()) { - if (callback != null) { - callback.run(); - } return; } finishHttpResult(String.class, new HttpResult(obj == null ? "" : obj)); @@ -195,14 +171,11 @@ public class HttpMessageResponse extends HttpResponse { @Override public void finish(int status, String msg) { if (status > 400) { - producer.logger.log(Level.WARNING, "HttpMessageResponse.finish status: " + status + ", uri: " + this.request.getRequestURI() + ", message: " + this.message); - } else if (producer.logger.isLoggable(Level.FINEST)) { - producer.logger.log(Level.FINEST, "HttpMessageResponse.finish status: " + status); + messageClient.logger.log(Level.WARNING, "HttpMessageResponse.finish status: " + status + ", uri: " + this.request.getRequestURI() + ", message: " + this.message); + } else if (messageClient.logger.isLoggable(Level.FINEST)) { + messageClient.logger.log(Level.FINEST, "HttpMessageResponse.finish status: " + status); } if (this.message.isEmptyRespTopic()) { - if (callback != null) { - callback.run(); - } return; } finishHttpResult(String.class, new HttpResult(msg == null ? "" : msg).status(status)); @@ -211,9 +184,6 @@ public class HttpMessageResponse extends HttpResponse { @Override public void finish(boolean kill, final byte[] bs, int offset, int length) { if (message.isEmptyRespTopic()) { - if (callback != null) { - callback.run(); - } return; } if (offset == 0 && bs.length == length) { @@ -226,9 +196,6 @@ public class HttpMessageResponse extends HttpResponse { @Override public void finish(boolean kill, final String contentType, final byte[] bs, int offset, int length) { if (message.isEmptyRespTopic()) { - if (callback != null) { - callback.run(); - } return; } byte[] rs = (offset == 0 && bs.length == length) ? bs : Arrays.copyOfRange(bs, offset, offset + length); @@ -238,9 +205,6 @@ public class HttpMessageResponse extends HttpResponse { @Override protected void finish(boolean kill, final String contentType, final byte[] bs, int offset, int length, Consumer consumer, A attachment) { if (message.isEmptyRespTopic()) { - if (callback != null) { - callback.run(); - } return; } byte[] rs = (offset == 0 && bs.length == length) ? bs : Arrays.copyOfRange(bs, offset, offset + length); @@ -250,9 +214,6 @@ public class HttpMessageResponse extends HttpResponse { @Override public void finishBuffer(boolean kill, ByteBuffer buffer) { if (message.isEmptyRespTopic()) { - if (callback != null) { - callback.run(); - } return; } byte[] bs = new byte[buffer.remaining()]; @@ -263,9 +224,6 @@ public class HttpMessageResponse extends HttpResponse { @Override public void finishBuffers(boolean kill, ByteBuffer... buffers) { if (message.isEmptyRespTopic()) { - if (callback != null) { - callback.run(); - } return; } int size = 0; diff --git a/src/main/java/org/redkale/mq/HttpMessageServlet.java b/src/main/java/org/redkale/mq/HttpMessageServlet.java new file mode 100644 index 000000000..2a10bf9d0 --- /dev/null +++ b/src/main/java/org/redkale/mq/HttpMessageServlet.java @@ -0,0 +1,52 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.mq; + +import java.util.logging.*; +import org.redkale.boot.NodeHttpServer; +import org.redkale.net.Context; +import org.redkale.net.Request; +import org.redkale.net.Response; +import org.redkale.net.http.*; +import org.redkale.service.Service; + +/** + * 一个Service对应一个MessageProcessor + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.1.0 + */ +public class HttpMessageServlet extends MessageServlet { + + public HttpMessageServlet(MessageClient messageClient, NodeHttpServer server, + Service service, HttpServlet servlet, String topic) { + super(messageClient, server, service, servlet, topic); + } + + @Override + protected Request createRequest(Context context, MessageRecord message) { + return new HttpMessageRequest((HttpContext) context, message); + } + + @Override + protected Response createResponse(Context context, Request request) { + return new HttpMessageResponse(messageClient, (HttpContext) context, (HttpMessageRequest) request); + } + + @Override + protected void onError(Response response, MessageRecord message, Throwable t) { + if (message.getRespTopic() != null && !message.getRespTopic().isEmpty()) { + HttpMessageRequest request = ((HttpMessageResponse) response).request(); + HttpMessageResponse.finishHttpResult(logger.isLoggable(Level.FINEST), response == null ? null : request.getRespConvert(), + null, message, messageClient, message.getRespTopic(), new HttpResult().status(500)); + } + } + +} diff --git a/src/main/java/org/redkale/mq/HttpRpcMessageClient.java b/src/main/java/org/redkale/mq/HttpRpcMessageClient.java new file mode 100644 index 000000000..483969132 --- /dev/null +++ b/src/main/java/org/redkale/mq/HttpRpcMessageClient.java @@ -0,0 +1,49 @@ +/* + * + */ +package org.redkale.mq; + +import java.io.Serializable; +import java.util.concurrent.CompletableFuture; +import org.redkale.cluster.HttpRpcClient; +import static org.redkale.mq.MessageRecord.CTYPE_HTTP_REQUEST; +import org.redkale.net.http.HttpResult; +import org.redkale.net.http.HttpSimpleRequest; + +/** + * + * @author zhangjx + */ +final class HttpRpcMessageClient extends HttpRpcClient { + + private final MessageCoder requestCoder = HttpSimpleRequestCoder.getInstance(); + + private final int nodeid; + + private final MessageClient messageClient; + + public HttpRpcMessageClient(MessageClient messageClient, final int nodeid) { + this.messageClient = messageClient; + this.nodeid = nodeid; + } + + @Override + public CompletableFuture> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) { + MessageRecord message = messageClient.createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), requestCoder.encode(request)); + message.userid(userid).groupid(groupid); + return messageClient.sendMessage(message).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance())); + } + + @Override + public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) { + MessageRecord message = messageClient.createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), requestCoder.encode(request)); + message.userid(userid).groupid(groupid); + messageClient.produceMessage(message); + } + + @Override + protected int getNodeid() { + return nodeid; + } + +} diff --git a/src/main/java/org/redkale/mq/MessageAgent.java b/src/main/java/org/redkale/mq/MessageAgent.java index be3ecb337..b8a13f1a5 100644 --- a/src/main/java/org/redkale/mq/MessageAgent.java +++ b/src/main/java/org/redkale/mq/MessageAgent.java @@ -14,13 +14,13 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.*; -import java.util.stream.Collectors; import org.redkale.annotation.*; import org.redkale.annotation.AutoLoad; import org.redkale.annotation.ResourceListener; import org.redkale.boot.*; import static org.redkale.boot.Application.RESNAME_APP_NAME; import static org.redkale.boot.Application.RESNAME_APP_NODEID; +import org.redkale.cluster.HttpRpcClient; import org.redkale.convert.Convert; import org.redkale.convert.ConvertFactory; import org.redkale.convert.ConvertType; @@ -61,6 +61,11 @@ public abstract class MessageAgent implements Resourcable { private ExecutorService workExecutor; + private int timeoutSeconds; + + final AtomicLong msgSeqno = new AtomicLong(Math.abs(System.nanoTime())); + + //-------------------------- MessageConsumer、MessageProducer -------------------------- protected final ReentrantLock messageProducerLock = new ReentrantLock(); protected MessageProducer messageBaseProducer; @@ -72,31 +77,31 @@ public abstract class MessageAgent implements Resourcable { //key: group, sub-key: topic protected final Map> messageConsumerMap = new HashMap<>(); - protected MessageClientProducer httpClientProducer; + //-------------------------- HttpRpcClient、SncpMessageClient -------------------------- + private HttpRpcMessageClient httpRpcClient; - protected MessageClientProducer sncpClientProducer; + private String httpAppRespTopic; - protected HttpMessageClient httpMessageClient; + private String sncpAppRespTopic; - protected SncpMessageClient sncpMessageClient; + protected MessageClient httpMessageClient; + + protected MessageClient sncpMessageClient; + + protected MessageClientProducer clientMessageProducer; protected final ReentrantLock clientConsumerLock = new ReentrantLock(); protected final ReentrantLock clientProducerLock = new ReentrantLock(); - protected final ReentrantLock serviceLock = new ReentrantLock(); - protected MessageCoder clientMessageCoder = MessageRecordSerializer.getInstance(); - //本地Service消息接收处理器, key:consumerid - protected HashMap clientConsumerNodes = new LinkedHashMap<>(); - - protected final AtomicLong msgSeqno = new AtomicLong(System.nanoTime()); - protected ScheduledThreadPoolExecutor timeoutExecutor; public void init(AnyValue config) { this.name = checkName(config.getValue("name", "")); + this.httpAppRespTopic = generateHttpAppRespTopic(); + this.sncpAppRespTopic = generateSncpAppRespTopic(); int threads = config.getIntValue("threads", -1); if (threads == 0) { this.workExecutor = application.getWorkExecutor(); @@ -105,8 +110,9 @@ public abstract class MessageAgent implements Resourcable { this.workExecutor = threads > 0 ? WorkThread.createExecutor(threads, "Redkale-MessageConsumerThread-[" + name + "]-%s") : WorkThread.createWorkExecutor(Utility.cpus(), "Redkale-MessageConsumerThread-[" + name + "]-%s"); } - this.httpMessageClient = new HttpMessageClient(this); - this.sncpMessageClient = new SncpMessageClient(this); + this.httpMessageClient = new MessageClient(this, this.httpAppRespTopic, Rest.getHttpReqTopicPrefix()); + this.sncpMessageClient = new MessageClient(this, this.sncpAppRespTopic, Sncp.getSncpReqTopicPrefix()); + String coderType = config.getValue("coder", ""); if (!coderType.trim().isEmpty()) { try { @@ -132,6 +138,7 @@ public abstract class MessageAgent implements Resourcable { t.setDaemon(true); return t; }); + this.timeoutSeconds = config.getIntValue("timeoutSeconds", 30); this.timeoutExecutor.setRemoveOnCancelPolicy(true); } @@ -145,20 +152,38 @@ public abstract class MessageAgent implements Resourcable { if (loginfo.length() > 0) { logger.log(Level.INFO, loginfo.toString()); } - this.clientConsumerNodes.values().forEach(node -> { - long s = System.currentTimeMillis(); - node.consumer.start(); - long e = System.currentTimeMillis() - s; - }); + + this.clientMessageProducer = createMessageClientProducer("redkale-message-producer"); + if (this.httpRpcClient != null || !this.httpMessageClient.isEmpty()) { + this.httpMessageClient.putMessageRespProcessor(); + } + if (!this.sncpMessageClient.isEmpty()) { + this.sncpMessageClient.putMessageRespProcessor(); + } + this.startMessageClientConsumers(); + List topics = new ArrayList<>(); + if (this.httpMessageClient.isEmpty()) { + topics.addAll(this.httpMessageClient.getTopics()); + } + if (!this.sncpMessageClient.isEmpty()) { + topics.addAll(this.sncpMessageClient.getTopics()); + } + if (!topics.isEmpty()) { + Collections.sort(topics); + loginfo = new StringBuilder(); + loginfo.append(MessageClientConsumer.class.getSimpleName() + " subscribe topics:\r\n"); + for (String topic : topics) { + loginfo.append(" ").append(topic).append("\r\n"); + } + logger.log(Level.INFO, loginfo.toString()); + } } //Application.stop 在执行server.shutdown之前执行 public void stop() { this.stopMessageConsumer(); this.stopMessageProducer(); - this.clientConsumerNodes.values().forEach(node -> { - node.consumer.stop(); - }); + this.stopMessageClientConsumers(); } //Application.stop 在所有server.shutdown执行后执行 @@ -168,15 +193,11 @@ public abstract class MessageAgent implements Resourcable { } this.messageConsumerList.clear(); this.messageConsumerMap.clear(); - - this.httpMessageClient.close(); - this.sncpMessageClient.close(); - - if (this.httpClientProducer != null) { - this.httpClientProducer.stop(); - } - if (this.sncpClientProducer != null) { - this.sncpClientProducer.stop(); + //-------------- MessageClient -------------- + this.httpMessageClient.stop(); + this.sncpMessageClient.stop(); + if (this.clientMessageProducer != null) { + this.clientMessageProducer.stop(); } if (this.clientMessageCoder instanceof Service) { ((Service) this.clientMessageCoder).destroy(config); @@ -287,6 +308,10 @@ public abstract class MessageAgent implements Resourcable { return logger; } + public int getTimeoutSeconds() { + return timeoutSeconds; + } + public String getName() { return name; } @@ -303,11 +328,25 @@ public abstract class MessageAgent implements Resourcable { return workExecutor; } - public HttpMessageClient getHttpMessageClient() { + public HttpRpcClient getHttpRpcClient() { + if (this.httpRpcClient == null) { + messageProducerLock.lock(); + try { + if (this.httpRpcClient == null) { + this.httpRpcClient = new HttpRpcMessageClient(this.httpMessageClient, this.nodeid); + } + } finally { + messageProducerLock.unlock(); + } + } + return httpRpcClient; + } + + public MessageClient getHttpMessageClient() { return httpMessageClient; } - public SncpMessageClient getSncpMessageClient() { + public MessageClient getSncpMessageClient() { return sncpMessageClient; } @@ -326,83 +365,19 @@ public abstract class MessageAgent implements Resourcable { return name; } - protected List getMessageClientConsumers() { - List consumers = new ArrayList<>(); - MessageClientConsumer one = this.httpMessageClient == null ? null : this.httpMessageClient.respConsumer; - if (one != null) { - consumers.add(one); - } - one = this.sncpMessageClient == null ? null : this.sncpMessageClient.respConsumer; - if (one != null) { - consumers.add(one); - } - consumers.addAll(clientConsumerNodes.values().stream().map(mcn -> mcn.consumer).collect(Collectors.toList())); - return consumers; - } - - protected List getMessageClientProducers() { - List producers = new ArrayList<>(); - if (this.httpClientProducer != null) { - producers.add(this.httpClientProducer); - } - if (this.sncpClientProducer != null) { - producers.add(this.sncpClientProducer); - } - MessageClientProducer one = this.httpMessageClient == null ? null : this.httpMessageClient.getProducer(); - if (one != null) { - producers.add(one); - } - one = this.sncpMessageClient == null ? null : this.sncpMessageClient.getProducer(); - if (one != null) { - producers.add(one); - } - return producers; - } - public MessageCoder getClientMessageCoder() { return this.clientMessageCoder; } - //获取指定topic的生产处理器 - public MessageClientProducer getSncpMessageClientProducer() { - if (this.sncpClientProducer == null) { - clientProducerLock.lock(); - try { - if (this.sncpClientProducer == null) { - long s = System.currentTimeMillis(); - this.sncpClientProducer = createMessageClientProducer("SncpProducer"); - long e = System.currentTimeMillis() - s; - if (logger.isLoggable(Level.FINEST)) { - logger.log(Level.FINEST, "MessageAgent.SncpProducer startup all in " + e + "ms"); - } - } - } finally { - clientProducerLock.unlock(); - } - } - return this.sncpClientProducer; + public MessageClientProducer getMessageClientProducer() { + return this.clientMessageProducer; } - public MessageClientProducer getHttpMessageClientProducer() { - if (this.httpClientProducer == null) { - clientProducerLock.lock(); - try { - if (this.httpClientProducer == null) { - long s = System.currentTimeMillis(); - this.httpClientProducer = createMessageClientProducer("HttpProducer"); - long e = System.currentTimeMillis() - s; - if (logger.isLoggable(Level.FINEST)) { - logger.log(Level.FINEST, "MessageAgent.HttpProducer startup all in " + e + "ms"); - } - } - } finally { - clientProducerLock.unlock(); - } - } - return this.httpClientProducer; - } + // + protected abstract void startMessageClientConsumers(); + + protected abstract void stopMessageClientConsumers(); - // protected abstract void startMessageConsumer(); protected abstract void stopMessageConsumer(); @@ -429,9 +404,6 @@ public abstract class MessageAgent implements Resourcable { //创建指定topic的生产处理器 protected abstract MessageClientProducer createMessageClientProducer(String producerName); - //创建指定topic的消费处理器 - public abstract MessageClientConsumer createMessageClientConsumer(String topic, String group, MessageClientProcessor processor); - public final void putService(NodeHttpServer ns, Service service, HttpServlet servlet) { AutoLoad al = service.getClass().getAnnotation(AutoLoad.class); if (al != null && !al.value() && service.getClass().getAnnotation(Local.class) != null) { @@ -447,18 +419,12 @@ public abstract class MessageAgent implements Resourcable { return; } } - String topic = generateHttpReqTopic(service); - String consumerid = generateHttpConsumerid(topic, service); - serviceLock.lock(); - try { - if (clientConsumerNodes.containsKey(consumerid)) { - throw new RedkaleException("consumerid(" + consumerid + ") is repeat"); - } - HttpMessageClientProcessor processor = new HttpMessageClientProcessor(this.logger, httpMessageClient, getHttpMessageClientProducer(), ns, service, servlet); - this.clientConsumerNodes.put(consumerid, new MessageClientConsumerWrapper(ns, service, servlet, processor, createMessageClientConsumer(topic, consumerid, processor))); - } finally { - serviceLock.unlock(); + if (WebSocketNode.class.isAssignableFrom(Sncp.getResourceType(service)) && nodeid == 0) { + throw new RedkaleException("Application.node not config in WebSocket Cluster"); } + String topic = Rest.generateHttpReqTopic(service, this.nodeid); + MessageServlet processor = new HttpMessageServlet(this.httpMessageClient, ns, service, servlet, topic); + this.httpMessageClient.putMessageServlet(processor); } public final void putService(NodeSncpServer ns, Service service, SncpServlet servlet) { @@ -470,71 +436,36 @@ public abstract class MessageAgent implements Resourcable { if (al2 != null && !al2.value() && service.getClass().getAnnotation(Local.class) != null) { return; } - String topic = generateSncpReqTopic(service); - String consumerid = generateSncpConsumerid(topic, service); - serviceLock.lock(); - try { - if (clientConsumerNodes.containsKey(consumerid)) { - throw new RedkaleException("consumerid(" + consumerid + ") is repeat"); - } - SncpMessageClientProcessor processor = new SncpMessageClientProcessor(this.logger, sncpMessageClient, getSncpMessageClientProducer(), ns, service, servlet); - this.clientConsumerNodes.put(consumerid, new MessageClientConsumerWrapper(ns, service, servlet, processor, createMessageClientConsumer(topic, consumerid, processor))); - } finally { - serviceLock.unlock(); + if (WebSocketNode.class.isAssignableFrom(Sncp.getResourceType(service)) && nodeid == 0) { + throw new RedkaleException("Application.node not config in WebSocket Cluster"); } - } - - //格式: sncp.req.module.user - public final String generateSncpReqTopic(Service service) { - return generateSncpReqTopic(Sncp.getResourceName(service), Sncp.getResourceType(service)); - } - - //格式: sncp.req.module.user - public final String generateSncpReqTopic(String resourceName, Class resourceType) { - if (WebSocketNode.class.isAssignableFrom(resourceType)) { - return "sncp.req.module.ws" + (resourceName.isEmpty() ? "" : ("-" + resourceName)) + ".node" + nodeid; - } - return "sncp.req.module." + resourceType.getSimpleName().replaceAll("Service.*$", "").toLowerCase() + (resourceName.isEmpty() ? "" : ("-" + resourceName)); - } - - //格式: http.req.module.user - public static String generateHttpReqTopic(String module) { - return "http.req.module." + module.toLowerCase(); - } - - //格式: http.req.module.user - public static String generateHttpReqTopic(String module, String resname) { - return "http.req.module." + module.toLowerCase() + (resname == null || resname.isEmpty() ? "" : ("-" + resname)); + String topic = Sncp.generateSncpReqTopic(service, this.nodeid); + MessageServlet processor = new SncpMessageServlet(this.sncpMessageClient, ns, service, servlet, topic); + this.sncpMessageClient.putMessageServlet(processor); } //格式: sncp.resp.app.node10 - protected String generateAppSncpRespTopic() { - return "sncp.resp.app." + (Utility.isEmpty(nodeName) ? "node" : nodeName) + "-" + nodeid; + //格式参考Rest.generateHttpReqTopic + private String generateSncpAppRespTopic() { + return Sncp.getSncpRespTopicPrefix() + "app." + (Utility.isEmpty(nodeName) ? "node" : nodeName) + "-" + nodeid; } //格式: http.resp.app.node10 - protected String generateAppHttpRespTopic() { - return "http.resp.app." + (Utility.isEmpty(nodeName) ? "node" : nodeName) + "-" + nodeid; + //格式参考Rest.generateHttpReqTopic + private String generateHttpAppRespTopic() { + return Rest.getHttpRespTopicPrefix() + "app." + (Utility.isEmpty(nodeName) ? "node" : nodeName) + "-" + nodeid; } - //格式: http.req.module.user - protected String generateHttpReqTopic(Service service) { - String resname = Sncp.getResourceName(service); - String module = Rest.getRestModule(service).toLowerCase(); - return "http.req.module." + module + (resname.isEmpty() ? "" : ("-" + resname)); + public final String getHttpAppRespTopic() { + return this.httpAppRespTopic; } - //格式: consumer-sncp.req.module.user 不提供外部使用 - protected final String generateSncpConsumerid(String topic, Service service) { - return "consumer-" + topic; + public final String getSncpAppRespTopic() { + return this.sncpAppRespTopic; } - //格式: consumer-http.req.module.user - protected String generateHttpConsumerid(String topic, Service service) { - String resname = Sncp.getResourceName(service); - String key = Rest.getRestModule(service).toLowerCase(); - return "consumer-http.req.module." + key + (resname.isEmpty() ? "" : ("-" + resname)); - + public final int getNodeid() { + return this.nodeid; } public static class MessageConsumerWrapper { @@ -647,11 +578,11 @@ public abstract class MessageAgent implements Resourcable { public final Servlet servlet; - public final MessageClientProcessor processor; + public final MessageServlet processor; public final MessageClientConsumer consumer; - public MessageClientConsumerWrapper(NodeServer server, Service service, Servlet servlet, MessageClientProcessor processor, MessageClientConsumer consumer) { + public MessageClientConsumerWrapper(NodeServer server, Service service, Servlet servlet, MessageServlet processor, MessageClientConsumer consumer) { this.server = server; this.service = service; this.servlet = servlet; diff --git a/src/main/java/org/redkale/mq/MessageClient.java b/src/main/java/org/redkale/mq/MessageClient.java index cd1ba2711..9227e7424 100644 --- a/src/main/java/org/redkale/mq/MessageClient.java +++ b/src/main/java/org/redkale/mq/MessageClient.java @@ -6,15 +6,26 @@ package org.redkale.mq; import java.nio.charset.StandardCharsets; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; +import java.util.Collection; +import java.util.HashMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; -import java.util.logging.Level; +import java.util.logging.Logger; +import org.redkale.cluster.ClusterRpcClient; import org.redkale.convert.Convert; import org.redkale.convert.json.JsonConvert; -import static org.redkale.mq.MessageRecord.*; -import org.redkale.net.http.*; +import static org.redkale.mq.MessageRecord.CTYPE_HTTP_REQUEST; +import static org.redkale.mq.MessageRecord.CTYPE_HTTP_RESULT; +import static org.redkale.mq.MessageRecord.CTYPE_STRING; +import org.redkale.net.http.HttpResult; +import org.redkale.net.http.HttpSimpleRequest; +import org.redkale.util.RedkaleException; import org.redkale.util.Traces; +import org.redkale.util.Utility; /** * @@ -25,108 +36,68 @@ import org.redkale.util.Traces; * * @since 2.1.0 */ -public abstract class MessageClient { +public class MessageClient implements ClusterRpcClient, MessageProcessor { - protected final ConcurrentHashMap respNodes = new ConcurrentHashMap<>(); + protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); - private final ReentrantLock lock = new ReentrantLock(); + private final MessageAgent messageAgent; - protected final MessageAgent messageAgent; + private final String appRespTopic; + + private final String reqTopicPrefix; + + protected final ReentrantLock processorLock = new ReentrantLock(); protected final AtomicLong msgSeqno; - protected MessageClientConsumer respConsumer; + //key: reqTopic + private final HashMap messageProcessors = new HashMap<>(); - protected String appRespTopic; + final ConcurrentHashMap respQueue = new ConcurrentHashMap<>(); - protected String appRespConsumerid; - - private final String clazzName; - - protected MessageClient(MessageAgent messageAgent) { + protected MessageClient(MessageAgent messageAgent, String appRespTopic, String reqTopicPrefix) { this.messageAgent = messageAgent; - this.msgSeqno = messageAgent == null ? new AtomicLong() : messageAgent.msgSeqno; - this.clazzName = getClass().getSimpleName(); + this.appRespTopic = appRespTopic; + this.reqTopicPrefix = reqTopicPrefix; + this.msgSeqno = messageAgent.msgSeqno; } - protected void close() { - if (this.respConsumer != null) { - this.respConsumer.stop(); + @Override + public void process(final MessageRecord msg, long time) { + MessageProcessor processor = messageProcessors.get(msg.getTopic()); + if (processor == null) { + throw new RedkaleException(msg.getTopic() + " not found MessageProcessor, record=" + msg); + } else { + processor.process(msg, time); } } - protected CompletableFuture sendMessage(final MessageRecord message, boolean needresp) { - return sendMessage(message, needresp, null); + void putMessageRespProcessor() { + this.messageProcessors.put(appRespTopic, new MessageRespProcessor(this)); } - protected CompletableFuture sendMessage(final MessageRecord message, boolean needresp, LongAdder counter) { + public Collection getTopics() { + return this.messageProcessors.keySet(); + } + + @Override + public void produceMessage(MessageRecord message) { + messageAgent.getMessageClientProducer().apply(message); + } + + @Override + public CompletableFuture sendMessage(final MessageRecord message) { CompletableFuture future = new CompletableFuture<>(); - boolean finest = messageAgent != null && messageAgent.logger.isLoggable(Level.FINEST); try { - if (this.respConsumer == null) { - lock.lock(); - try { - if (this.appRespConsumerid == null) { - this.appRespConsumerid = "consumer-" + this.appRespTopic; - } - if (this.respConsumer == null) { - MessageClientProcessor processor = (msg, callback) -> { - long now = System.currentTimeMillis(); - MessageRespFutureNode node = respNodes.remove(msg.getSeqid()); - if (node == null) { - messageAgent.logger.log(Level.WARNING, MessageClient.this.getClass().getSimpleName() + " process " + msg + " error, not found mqresp.futurenode"); - return; - } - if (node.scheduledFuture != null) { - node.scheduledFuture.cancel(true); - } - LongAdder ncer = node.getCounter(); - if (ncer != null) { - ncer.decrement(); - } - final long cha = now - msg.createTime; - if (finest) { - messageAgent.logger.log(Level.FINEST, clazzName + ".MessageRespFutureNode.receive (mq.delay = " + cha + "ms, mq.seqid = " + msg.getSeqid() + ")"); - } - node.future.complete(msg); - long cha2 = System.currentTimeMillis() - now; - if ((cha > 1000 || cha2 > 1000) && messageAgent != null && messageAgent.logger.isLoggable(Level.FINE)) { - messageAgent.logger.log(Level.FINE, clazzName + ".MessageRespFutureNode.complete (mqs.delays = " + cha + "ms, mqs.completes = " + cha2 + "ms, mqs.counters = " + ncer + ") mqresp.msg: " + formatRespMessage(msg)); - } else if ((cha > 50 || cha2 > 50) && messageAgent != null && messageAgent.logger.isLoggable(Level.FINER)) { - messageAgent.logger.log(Level.FINER, clazzName + ".MessageRespFutureNode.complete (mq.delays = " + cha + "ms, mq.completes = " + cha2 + "ms, mq.counters = " + ncer + ") mqresp.msg: " + formatRespMessage(msg)); - } else if (finest) { - messageAgent.logger.log(Level.FINEST, clazzName + ".MessageRespFutureNode.complete (mq.delay = " + cha + "ms, mq.complete = " + cha2 + "ms, mq.counter = " + ncer + ") mqresp.msg: " + formatRespMessage(msg)); - } - }; - long ones = System.currentTimeMillis(); - MessageClientConsumer one = messageAgent.createMessageClientConsumer(appRespTopic, appRespConsumerid, processor); - one.start(); - long onee = System.currentTimeMillis() - ones; - if (finest) { - messageAgent.logger.log(Level.FINEST, clazzName + ".MessageRespFutureNode.startup " + onee + "ms "); - } - this.respConsumer = one; - } - } finally { - lock.unlock(); - } - } - if (needresp && (message.getRespTopic() == null || message.getRespTopic().isEmpty())) { + if (Utility.isEmpty(message.getRespTopic())) { message.setRespTopic(appRespTopic); } - if (counter != null) { - counter.increment(); - } - getProducer().apply(message); - if (needresp) { - MessageRespFutureNode node = new MessageRespFutureNode(messageAgent.logger, message, respNodes, counter, future); - respNodes.put(message.getSeqid(), node); - ScheduledThreadPoolExecutor executor = messageAgent.timeoutExecutor; - if (executor != null) { - node.scheduledFuture = executor.schedule(node, 30, TimeUnit.SECONDS); - } - } else { - future.complete(null); + messageAgent.getMessageClientProducer().apply(message); + MessageRespFuture respNode = new MessageRespFuture(this, future, message); + respQueue.put(message.getSeqid(), respNode); + ScheduledThreadPoolExecutor executor = messageAgent.timeoutExecutor; + if (executor != null && messageAgent.getTimeoutSeconds() > 0) { + respNode.scheduledFuture = executor.schedule(respNode, messageAgent.getTimeoutSeconds(), TimeUnit.SECONDS); } } catch (Throwable ex) { future.completeExceptionally(ex); @@ -134,84 +105,96 @@ public abstract class MessageClient { return future; } - protected MessageRecord formatRespMessage(MessageRecord message) { - return message; + //非线程安全 + public void putMessageServlet(MessageServlet servlet) { + String topic = servlet.getTopic(); + processorLock.lock(); + try { + if (messageProcessors.containsKey(topic)) { + throw new RedkaleException("req-topic(" + topic + ") is repeat"); + } + messageProcessors.put(topic, servlet); + } finally { + processorLock.unlock(); + } } - protected abstract MessageClientProducer getProducer(); + public boolean isEmpty() { + return messageProcessors.size() < 1; + } - public MessageRecord createMessageRecord(String resptopic, String content) { + public MessageRecord createMessageRecord(String respTopic, String content) { return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, - null, null, resptopic, Traces.currentTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + null, null, respTopic, Traces.currentTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); } - public MessageRecord createMessageRecord(String topic, String resptopic, String content) { + public MessageRecord createMessageRecord(String topic, String respTopic, String content) { return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, - null, topic, resptopic, Traces.currentTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + null, topic, respTopic, Traces.currentTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); } - public MessageRecord createMessageRecord(String topic, String resptopic, String traceid, String content) { + public MessageRecord createMessageRecord(String topic, String respTopic, String traceid, String content) { return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, - null, topic, resptopic, traceid, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + null, topic, respTopic, traceid, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); } - public MessageRecord createMessageRecord(int userid, String topic, String resptopic, String content) { + public MessageRecord createMessageRecord(int userid, String topic, String respTopic, String content) { return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid, - null, topic, resptopic, Traces.currentTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + null, topic, respTopic, Traces.currentTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); } - public MessageRecord createMessageRecord(int userid, String topic, String resptopic, String traceid, String content) { + public MessageRecord createMessageRecord(int userid, String topic, String respTopic, String traceid, String content) { return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid, - null, topic, resptopic, traceid, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + null, topic, respTopic, traceid, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); } - public MessageRecord createMessageRecord(String topic, String resptopic, Convert convert, Object bean) { + public MessageRecord createMessageRecord(String topic, String respTopic, Convert convert, Object bean) { return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0, - null, topic, resptopic, Traces.currentTraceid(), convert.convertToBytes(bean)); + null, topic, respTopic, Traces.currentTraceid(), convert.convertToBytes(bean)); } - public MessageRecord createMessageRecord(String topic, String resptopic, String traceid, Convert convert, Object bean) { + public MessageRecord createMessageRecord(String topic, String respTopic, String traceid, Convert convert, Object bean) { return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0, - null, topic, resptopic, traceid, convert.convertToBytes(bean)); + null, topic, respTopic, traceid, convert.convertToBytes(bean)); } - public MessageRecord createMessageRecord(int userid, String topic, String resptopic, Convert convert, Object bean) { + public MessageRecord createMessageRecord(int userid, String topic, String respTopic, Convert convert, Object bean) { return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, - null, topic, resptopic, Traces.currentTraceid(), convert.convertToBytes(bean)); + null, topic, respTopic, Traces.currentTraceid(), convert.convertToBytes(bean)); } - public MessageRecord createMessageRecord(int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) { + public MessageRecord createMessageRecord(int userid, String groupid, String topic, String respTopic, Convert convert, Object bean) { return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, - groupid, topic, resptopic, Traces.currentTraceid(), convert.convertToBytes(bean)); + groupid, topic, respTopic, Traces.currentTraceid(), convert.convertToBytes(bean)); } - public MessageRecord createMessageRecord(int flag, int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) { + public MessageRecord createMessageRecord(int flag, int userid, String groupid, String topic, String respTopic, Convert convert, Object bean) { return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, flag, System.currentTimeMillis(), userid, - groupid, topic, resptopic, Traces.currentTraceid(), convert.convertToBytes(bean)); + groupid, topic, respTopic, Traces.currentTraceid(), convert.convertToBytes(bean)); } - public MessageRecord createMessageRecord(String topic, String resptopic, byte[] content) { - return new MessageRecord(msgSeqno.incrementAndGet(), (byte) 0, topic, resptopic, Traces.currentTraceid(), content); + public MessageRecord createMessageRecord(String topic, String respTopic, byte[] content) { + return new MessageRecord(msgSeqno.incrementAndGet(), (byte) 0, topic, respTopic, Traces.currentTraceid(), content); } - public MessageRecord createMessageRecord(long seqid, String topic, String resptopic, byte[] content) { - return new MessageRecord(seqid, (byte) 0, topic, resptopic, Traces.currentTraceid(), content); + public MessageRecord createMessageRecord(long seqid, String topic, String respTopic, byte[] content) { + return new MessageRecord(seqid, (byte) 0, topic, respTopic, Traces.currentTraceid(), content); } - protected MessageRecord createMessageRecord(byte ctype, String topic, String resptopic, byte[] content) { - return new MessageRecord(msgSeqno.incrementAndGet(), ctype, topic, resptopic, Traces.currentTraceid(), content); + protected MessageRecord createMessageRecord(byte ctype, String topic, String respTopic, byte[] content) { + return new MessageRecord(msgSeqno.incrementAndGet(), ctype, topic, respTopic, Traces.currentTraceid(), content); } - protected MessageRecord createMessageRecord(byte ctype, String topic, String resptopic, String traceid, byte[] content) { - return new MessageRecord(msgSeqno.incrementAndGet(), ctype, topic, resptopic, traceid, content); + protected MessageRecord createMessageRecord(byte ctype, String topic, String respTopic, String traceid, byte[] content) { + return new MessageRecord(msgSeqno.incrementAndGet(), ctype, topic, respTopic, traceid, content); } - protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String resptopic, byte[] content) { - return new MessageRecord(seqid, ctype, topic, resptopic, Traces.currentTraceid(), content); + protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String respTopic, byte[] content) { + return new MessageRecord(seqid, ctype, topic, respTopic, Traces.currentTraceid(), content); } - protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String resptopic, String traceid, byte[] content) { - return new MessageRecord(seqid, ctype, topic, resptopic, traceid, content); + protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String respTopic, String traceid, byte[] content) { + return new MessageRecord(seqid, ctype, topic, respTopic, traceid, content); } private byte ctype(Convert convert, Object bean) { @@ -225,4 +208,31 @@ public abstract class MessageClient { } return ctype; } + + public void start() { + } + + public void stop() { + } + + public MessageAgent getMessageAgent() { + return messageAgent; + } + + public MessageCoder getClientMessageCoder() { + return this.messageAgent.getClientMessageCoder(); + } + + public MessageClientProducer getProducer() { + return messageAgent.getMessageClientProducer(); + } + + public String getAppRespTopic() { + return appRespTopic; + } + + public String getReqTopicPrefix() { + return reqTopicPrefix; + } + } diff --git a/src/main/java/org/redkale/mq/MessageClientConsumer.java b/src/main/java/org/redkale/mq/MessageClientConsumer.java index 93240efbe..a51be4afc 100644 --- a/src/main/java/org/redkale/mq/MessageClientConsumer.java +++ b/src/main/java/org/redkale/mq/MessageClientConsumer.java @@ -5,9 +5,7 @@ */ package org.redkale.mq; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; +import java.util.Collection; import java.util.Objects; import java.util.logging.Logger; @@ -21,45 +19,28 @@ import java.util.logging.Logger; * * @since 2.1.0 */ -public abstract class MessageClientConsumer { - - protected final List topics; - - protected final String consumerid; - - protected MessageAgent messageAgent; - - protected final MessageClientProcessor processor; +public abstract class MessageClientConsumer implements MessageProcessor { protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); - protected volatile boolean closed; + protected MessageClient messageClient; - protected MessageClientConsumer(MessageAgent messageAgent, String topic, final String consumerid, MessageClientProcessor processor) { - Objects.requireNonNull(messageAgent); - Objects.requireNonNull(topic); - Objects.requireNonNull(consumerid); - Objects.requireNonNull(processor); - this.messageAgent = messageAgent; - this.topics = Collections.unmodifiableList(Arrays.asList(topic)); - this.consumerid = consumerid; - this.processor = processor; + protected MessageClientConsumer(MessageClient messageClient) { + Objects.requireNonNull(messageClient); + this.messageClient = messageClient; } - public MessageClientProcessor getProcessor() { - return processor; + public Collection getTopics() { + return messageClient.getTopics(); } - public List getTopics() { - return topics; + @Override + public void process(MessageRecord message, long time) { + messageClient.process(message, time); } public abstract void start(); public abstract void stop(); - public boolean isClosed() { - return closed; - } - } diff --git a/src/main/java/org/redkale/mq/MessageClientProcessor.java b/src/main/java/org/redkale/mq/MessageClientProcessor.java deleted file mode 100644 index 444733689..000000000 --- a/src/main/java/org/redkale/mq/MessageClientProcessor.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.mq; - -/** - * 一个Service对应一个MessageProcessor - * - *

- * 详情见: https://redkale.org - * - * @author zhangjx - * - * @since 2.1.0 - */ -public interface MessageClientProcessor { - - default void begin(int size, long starttime) { - } - - public void process(MessageRecord message, Runnable callback); - - default void commit() { - } -} diff --git a/src/main/java/org/redkale/mq/MessageConsumer.java b/src/main/java/org/redkale/mq/MessageConsumer.java index 35b8a48c5..8e298d52d 100644 --- a/src/main/java/org/redkale/mq/MessageConsumer.java +++ b/src/main/java/org/redkale/mq/MessageConsumer.java @@ -25,7 +25,7 @@ public interface MessageConsumer { default void init(AnyValue config) { } - public void onMessage(MessageConext context, T messages); + public void onMessage(MessageConext context, T message); default void destroy(AnyValue config) { } diff --git a/src/main/java/org/redkale/mq/MessageProcessor.java b/src/main/java/org/redkale/mq/MessageProcessor.java new file mode 100644 index 000000000..bdfc8d180 --- /dev/null +++ b/src/main/java/org/redkale/mq/MessageProcessor.java @@ -0,0 +1,13 @@ +/* + * + */ +package org.redkale.mq; + +/** + * + * @author zhangjx + */ +public interface MessageProcessor { + + public void process(final MessageRecord msg, long time); +} diff --git a/src/main/java/org/redkale/mq/MessageRecord.java b/src/main/java/org/redkale/mq/MessageRecord.java index 725fd3cdc..b41cdce09 100644 --- a/src/main/java/org/redkale/mq/MessageRecord.java +++ b/src/main/java/org/redkale/mq/MessageRecord.java @@ -32,11 +32,14 @@ public class MessageRecord implements Serializable { protected static final byte CTYPE_STRING = 1; - protected static final byte CTYPE_HTTP_REQUEST = 2; + //Bson bytes + protected static final byte CTYPE_BSON = 2; - protected static final byte CTYPE_HTTP_RESULT = 3; + //HttpSimpleRequest + protected static final byte CTYPE_HTTP_REQUEST = 3; - protected static final byte CTYPE_BSON_RESULT = 4; + //HttpResult + protected static final byte CTYPE_HTTP_RESULT = 4; @ConvertColumn(index = 1) @Comment("消息序列号") @@ -54,8 +57,9 @@ public class MessageRecord implements Serializable { @Comment("创建时间") protected long createTime; + //@since 2.5.0 由int改成Serializable @ConvertColumn(index = 5) - @Comment("用户ID,无用户信息视为null或0, 具体数据类型只能是int、long、String") //@since 2.5.0 由int改成Serializable + @Comment("用户ID,无用户信息视为null或0, 具体数据类型只能是int、long、String") protected Serializable userid; @ConvertColumn(index = 6) @@ -330,7 +334,7 @@ public class MessageRecord implements Serializable { sb.append(",\"respTopic\":\"").append(this.respTopic).append("\""); } if (this.content != null) { - if (this.ctype == CTYPE_BSON_RESULT && this.content.length > SncpHeader.HEADER_SUBSIZE) { + if (this.ctype == CTYPE_BSON && this.content.length > SncpHeader.HEADER_SUBSIZE) { //int offset = new ByteArray(this.content).getChar(0) + 1; //循环占位符 //Object rs = BsonConvert.root().convertFrom(Object.class, this.content, offset, this.content.length - offset); //sb.append(",\"content\":").append(rs); diff --git a/src/main/java/org/redkale/mq/MessageRecordSerializer.java b/src/main/java/org/redkale/mq/MessageRecordSerializer.java index 5871f6107..c218174b9 100644 --- a/src/main/java/org/redkale/mq/MessageRecordSerializer.java +++ b/src/main/java/org/redkale/mq/MessageRecordSerializer.java @@ -46,7 +46,7 @@ public class MessageRecordSerializer implements MessageCoder { + 1 //ctype + 4 //version + 4 //flag - + 8 //createtime + + 8 //createTime + 2 + userid.length + 2 + groupid.length + 2 + topic.length @@ -105,12 +105,12 @@ public class MessageRecordSerializer implements MessageCoder { byte ctype = buffer.get(); int version = buffer.getInt(); int flag = buffer.getInt(); - long createtime = buffer.getLong(); + long createTime = buffer.getLong(); Serializable userid = MessageCoder.decodeUserid(buffer); String groupid = MessageCoder.getShortString(buffer); String topic = MessageCoder.getShortString(buffer); - String resptopic = MessageCoder.getShortString(buffer); + String respTopic = MessageCoder.getShortString(buffer); String traceid = MessageCoder.getShortString(buffer); byte[] content = null; @@ -119,7 +119,7 @@ public class MessageRecordSerializer implements MessageCoder { content = new byte[contentlen]; buffer.get(content); } - return new MessageRecord(seqid, ctype, version, flag, createtime, userid, groupid, topic, resptopic, traceid, content); + return new MessageRecord(seqid, ctype, version, flag, createTime, userid, groupid, topic, respTopic, traceid, content); } } diff --git a/src/main/java/org/redkale/mq/MessageRespFutureNode.java b/src/main/java/org/redkale/mq/MessageRespFuture.java similarity index 59% rename from src/main/java/org/redkale/mq/MessageRespFutureNode.java rename to src/main/java/org/redkale/mq/MessageRespFuture.java index 965ea321c..0c3a6714a 100644 --- a/src/main/java/org/redkale/mq/MessageRespFutureNode.java +++ b/src/main/java/org/redkale/mq/MessageRespFuture.java @@ -6,7 +6,6 @@ package org.redkale.mq; import java.util.concurrent.*; -import java.util.concurrent.atomic.LongAdder; import java.util.logging.*; /** @@ -19,39 +18,33 @@ import java.util.logging.*; * * @since 2.1.0 */ -public class MessageRespFutureNode implements Runnable { +public class MessageRespFuture implements Runnable { protected final long seqid; protected final long createTime; - protected final LongAdder counter; - protected final CompletableFuture future; - protected final Logger logger; - protected final MessageRecord message; - protected final ConcurrentHashMap respNodes; + protected final MessageClient messageClient; protected ScheduledFuture scheduledFuture; - public MessageRespFutureNode(Logger logger, MessageRecord message, ConcurrentHashMap respNodes, LongAdder counter, CompletableFuture future) { - this.logger = logger; + public MessageRespFuture(MessageClient messageClient, CompletableFuture future, MessageRecord message) { + this.messageClient = messageClient; this.message = message; this.seqid = message.getSeqid(); - this.respNodes = respNodes; - this.counter = counter; this.future = future; this.createTime = System.currentTimeMillis(); } @Override //超时后被timeoutExecutor调用 public void run() { //timeout - respNodes.remove(this.seqid); - future.completeExceptionally(new TimeoutException("message-record: "+message)); - logger.log(Level.WARNING, getClass().getSimpleName() + " wait msg: " + message + " timeout " + (System.currentTimeMillis() - createTime) + "ms" + messageClient.respQueue.remove(this.seqid); + future.completeExceptionally(new TimeoutException("message-record: " + message)); + messageClient.logger.log(Level.WARNING, getClass().getSimpleName() + " wait msg: " + message + " timeout " + (System.currentTimeMillis() - createTime) + "ms" + (message.userid != null || (message.groupid != null && !message.groupid.isEmpty()) ? (message.userid != null ? (", userid:" + message.userid) : (", groupid:" + message.groupid)) : "")); } @@ -63,10 +56,6 @@ public class MessageRespFutureNode implements Runnable { return createTime; } - public LongAdder getCounter() { - return counter; - } - public CompletableFuture getFuture() { return future; } diff --git a/src/main/java/org/redkale/mq/MessageRespProcessor.java b/src/main/java/org/redkale/mq/MessageRespProcessor.java new file mode 100644 index 000000000..3a658476a --- /dev/null +++ b/src/main/java/org/redkale/mq/MessageRespProcessor.java @@ -0,0 +1,54 @@ +/* + * + */ +package org.redkale.mq; + +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * 响应结果 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.8.0 + */ +public class MessageRespProcessor implements MessageProcessor { + + private final MessageClient messageClient; + + public MessageRespProcessor(MessageClient messageClient) { + this.messageClient = messageClient; + } + + @Override + public void process(final MessageRecord msg, long time) { + long now = System.currentTimeMillis(); + Logger logger = messageClient.logger; + final boolean finest = logger.isLoggable(Level.FINEST); + MessageRespFuture resp = messageClient.respQueue.remove(msg.getSeqid()); + if (resp == null) { + logger.log(Level.WARNING, getClass().getSimpleName() + " process " + msg + " error, not found MessageRespFuture"); + return; + } + if (resp.scheduledFuture != null) { + resp.scheduledFuture.cancel(true); + } + final long cha = now - msg.createTime; + if (finest) { + logger.log(Level.FINEST, getClass().getSimpleName() + ".MessageRespFuture.receive (mq.delay = " + cha + "ms, mq.seqid = " + msg.getSeqid() + ")"); + } + resp.future.complete(msg); + long cha2 = System.currentTimeMillis() - now; + if ((cha > 1000 || cha2 > 1000) && logger.isLoggable(Level.FINE)) { + logger.log(Level.FINE, getClass().getSimpleName() + ".MessageRespFuture.complete (mqs.delays = " + cha + "ms, mqs.completes = " + cha2 + "ms) mqresp.msg: " + msg); + } else if ((cha > 50 || cha2 > 50) && logger.isLoggable(Level.FINER)) { + logger.log(Level.FINER, getClass().getSimpleName() + ".MessageRespFuture.complete (mq.delays = " + cha + "ms, mq.completes = " + cha2 + "ms) mqresp.msg: " + msg); + } else if (finest) { + logger.log(Level.FINEST, getClass().getSimpleName() + ".MessageRespFuture.complete (mq.delay = " + cha + "ms, mq.complete = " + cha2 + "ms) mqresp.msg: " + msg); + } + } +} diff --git a/src/main/java/org/redkale/mq/MessageServlet.java b/src/main/java/org/redkale/mq/MessageServlet.java new file mode 100644 index 000000000..3a1993d14 --- /dev/null +++ b/src/main/java/org/redkale/mq/MessageServlet.java @@ -0,0 +1,102 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.mq; + +import java.util.concurrent.CompletionException; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.redkale.boot.NodeServer; +import org.redkale.net.Context; +import org.redkale.net.Request; +import org.redkale.net.Response; +import org.redkale.net.Servlet; +import org.redkale.service.Service; +import org.redkale.util.Traces; + +/** + * 一个Service对应一个MessageProcessor + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.1.0 + */ +public abstract class MessageServlet implements MessageProcessor { + + protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); + + protected final MessageClient messageClient; + + protected final NodeServer server; + + protected final Service service; + + protected final Servlet servlet; + + protected final String topic; + + public MessageServlet(MessageClient messageClient, NodeServer server, Service service, Servlet servlet, String topic) { + this.messageClient = messageClient; + this.server = server; + this.service = service; + this.servlet = servlet; + this.topic = topic; + } + + @Override + public void process(final MessageRecord message, long time) { + Response response = null; + try { + Traces.computeIfAbsent(message.getTraceid()); + long now = System.currentTimeMillis(); + long cha = now - message.createTime; + long e = now - time; + Context context = server.getServer().getContext(); + Request request = createRequest(context, message); + response = createResponse(context, request); + //执行逻辑 + context.execute(servlet, request, response); + long o = System.currentTimeMillis() - now; + if ((cha > 1000 || e > 100 || o > 1000) && logger.isLoggable(Level.FINE)) { + logger.log(Level.FINE, getClass().getSimpleName() + ".process (mqs.delays = " + cha + " ms, mqs.blocks = " + e + " ms, mqs.executes = " + o + " ms) message: " + message); + } else if ((cha > 50 || e > 10 || o > 50) && logger.isLoggable(Level.FINER)) { + logger.log(Level.FINER, getClass().getSimpleName() + ".process (mq.delays = " + cha + " ms, mq.blocks = " + e + " ms, mq.executes = " + o + " ms) message: " + message); + } else if (logger.isLoggable(Level.FINEST)) { + logger.log(Level.FINEST, getClass().getSimpleName() + ".process (mq.delay = " + cha + " ms, mq.block = " + e + " ms, mq.execute = " + o + " ms) message: " + message); + } + } catch (Throwable ex) { + if (response != null) { + onError(response, message, ex); + } + logger.log(Level.SEVERE, getClass().getSimpleName() + " process error, message=" + message, ex instanceof CompletionException ? ((CompletionException) ex).getCause() : ex); + } + } + + protected abstract Request createRequest(Context context, MessageRecord message); + + protected abstract Response createResponse(Context context, Request request); + + protected abstract void onError(Response response, MessageRecord message, Throwable t); + + public NodeServer getServer() { + return server; + } + + public Service getService() { + return service; + } + + public Servlet getServlet() { + return servlet; + } + + public String getTopic() { + return topic; + } + +} diff --git a/src/main/java/org/redkale/mq/SncpMessageClient.java b/src/main/java/org/redkale/mq/SncpMessageClient.java deleted file mode 100644 index 0e16667f3..000000000 --- a/src/main/java/org/redkale/mq/SncpMessageClient.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.mq; - -import java.util.concurrent.CompletableFuture; - -/** - * - *

- * 详情见: https://redkale.org - * - * @author zhangjx - * - * @since 2.1.0 - */ -public class SncpMessageClient extends MessageClient { - - protected SncpMessageClient(MessageAgent messageAgent) { - super(messageAgent); - this.appRespTopic = messageAgent.generateAppSncpRespTopic(); - } - - @Override - protected MessageClientProducer getProducer() { - return messageAgent.getSncpMessageClientProducer(); - } - - public String getAppRespTopic() { - return this.appRespTopic; - } - - //只发送消息,不需要响应 - public final void produceMessage(MessageRecord message) { - sendMessage(message, false); - } - - //发送消息,需要响应 - public final CompletableFuture sendMessage(MessageRecord message) { - return sendMessage(message, true); - } - - @Override - protected MessageRecord formatRespMessage(MessageRecord message) { - if (message != null) { - message.ctype = MessageRecord.CTYPE_BSON_RESULT; - } - return message; - } -} diff --git a/src/main/java/org/redkale/mq/SncpMessageClientProcessor.java b/src/main/java/org/redkale/mq/SncpMessageClientProcessor.java deleted file mode 100644 index a97185af0..000000000 --- a/src/main/java/org/redkale/mq/SncpMessageClientProcessor.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.mq; - -import java.util.concurrent.*; -import java.util.logging.*; -import org.redkale.boot.NodeSncpServer; -import org.redkale.net.sncp.*; -import org.redkale.service.Service; -import org.redkale.util.Traces; - -/** - * 一个Service对应一个MessageProcessor - * - *

- * 详情见: https://redkale.org - * - * @author zhangjx - * - * @since 2.1.0 - */ -public class SncpMessageClientProcessor implements MessageClientProcessor { - - protected final Logger logger; - - protected MessageClient messageClient; - - protected final MessageClientProducer producer; - - protected final NodeSncpServer server; - - protected final Service service; - - protected final SncpServlet servlet; - - protected CountDownLatch cdl; - - protected long starttime; - - protected final Runnable innerCallback = () -> { - if (cdl != null) { - cdl.countDown(); - } - }; - - public SncpMessageClientProcessor(Logger logger, SncpMessageClient messageClient, MessageClientProducer producer, NodeSncpServer server, Service service, SncpServlet servlet) { - this.logger = logger; - this.messageClient = messageClient; - this.producer = producer; - this.server = server; - this.service = service; - this.servlet = servlet; - } - - @Override - public void begin(final int size, long starttime) { - this.starttime = starttime; - this.cdl = size > 1 ? new CountDownLatch(size) : null; - } - - @Override - public void process(final MessageRecord message, final Runnable callback) { - execute(message, innerCallback); - } - - private void execute(final MessageRecord message, final Runnable callback) { - SncpMessageResponse response = null; - try { - Traces.computeIfAbsent(message.getTraceid()); - long now = System.currentTimeMillis(); - long cha = now - message.createTime; - long e = now - starttime; - SncpContext context = server.getSncpServer().getContext(); - SncpMessageRequest request = new SncpMessageRequest(context, message); - response = new SncpMessageResponse(context, request, callback, messageClient, producer); - - context.execute(servlet, request, response); - long o = System.currentTimeMillis() - now; - if ((cha > 1000 || e > 100 || o > 1000) && logger.isLoggable(Level.FINE)) { - logger.log(Level.FINE, "SncpMessageProcessor.process (mqs.delays = " + cha + " ms, mqs.blocks = " + e + " ms, mqs.executes = " + o + " ms) message: " + message); - } else if ((cha > 50 || e > 10 || o > 50) && logger.isLoggable(Level.FINER)) { - logger.log(Level.FINER, "SncpMessageProcessor.process (mq.delays = " + cha + " ms, mq.blocks = " + e + " ms, mq.executes = " + o + " ms) message: " + message); - } else if (logger.isLoggable(Level.FINEST)) { - logger.log(Level.FINEST, "SncpMessageProcessor.process (mq.delay = " + cha + " ms, mq.block = " + e + " ms, mq.execute = " + o + " ms) message: " + message); - } - } catch (Throwable ex) { - if (response != null) { - response.finish(SncpResponse.RETCODE_ILLSERVICEID, null); - } - logger.log(Level.SEVERE, SncpMessageClientProcessor.class.getSimpleName() + " process error, message=" + message, ex instanceof CompletionException ? ((CompletionException) ex).getCause() : ex); - } - } - - @Override - public void commit() { - if (this.cdl != null) { - try { - this.cdl.await(30, TimeUnit.SECONDS); - } catch (Exception ex) { - } - this.cdl = null; - } - } - - public MessageClientProducer getProducer() { - return producer; - } - - public NodeSncpServer getServer() { - return server; - } - - public Service getService() { - return service; - } - - public SncpServlet getServlet() { - return servlet; - } - -} diff --git a/src/main/java/org/redkale/mq/SncpMessageResponse.java b/src/main/java/org/redkale/mq/SncpMessageResponse.java index 86305262e..6f774204a 100644 --- a/src/main/java/org/redkale/mq/SncpMessageResponse.java +++ b/src/main/java/org/redkale/mq/SncpMessageResponse.java @@ -24,32 +24,23 @@ public class SncpMessageResponse extends SncpResponse { protected MessageRecord message; - protected MessageClientProducer producer; - - protected Runnable callback; - - public SncpMessageResponse(SncpContext context, SncpMessageRequest request, Runnable callback, MessageClient messageClient, MessageClientProducer producer) { + public SncpMessageResponse(MessageClient messageClient, SncpContext context, SncpMessageRequest request) { super(context, request); - this.message = request.message; - this.callback = callback; this.messageClient = messageClient; - this.producer = producer; + this.message = request.message; } @Override public void finish(final int retcode, final BsonWriter out) { - if (callback != null) { - callback.run(); - } int headerSize = SncpHeader.calcHeaderSize(request); if (out == null) { final ByteArray result = new ByteArray(headerSize).putPlaceholder(headerSize); writeHeader(result, 0, retcode); - producer.apply(messageClient.createMessageRecord(message.getSeqid(), message.getRespTopic(), null, (byte[]) null)); + messageClient.getProducer().apply(messageClient.createMessageRecord(message.getSeqid(), MessageRecord.CTYPE_BSON, message.getRespTopic(), null, (byte[]) null)); return; } final ByteArray result = out.toByteArray(); writeHeader(result, result.length() - headerSize, retcode); - producer.apply(messageClient.createMessageRecord(message.getSeqid(), message.getRespTopic(), null, result.getBytes())); + messageClient.getProducer().apply(messageClient.createMessageRecord(message.getSeqid(), MessageRecord.CTYPE_BSON, message.getRespTopic(), null, result.getBytes())); } } diff --git a/src/main/java/org/redkale/mq/SncpMessageServlet.java b/src/main/java/org/redkale/mq/SncpMessageServlet.java new file mode 100644 index 000000000..1671bceef --- /dev/null +++ b/src/main/java/org/redkale/mq/SncpMessageServlet.java @@ -0,0 +1,49 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.mq; + +import org.redkale.boot.NodeSncpServer; +import org.redkale.net.Context; +import org.redkale.net.Request; +import org.redkale.net.Response; +import org.redkale.net.sncp.*; +import org.redkale.service.Service; + +/** + * 一个Service对应一个MessageProcessor + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.1.0 + */ +public class SncpMessageServlet extends MessageServlet { + + public SncpMessageServlet(MessageClient messageClient, NodeSncpServer server, + Service service, SncpServlet servlet, String topic) { + super(messageClient, server, service, servlet, topic); + } + + @Override + protected Request createRequest(Context context, MessageRecord message) { + return new SncpMessageRequest((SncpContext) context, message); + } + + @Override + protected Response createResponse(Context context, Request request) { + return new SncpMessageResponse(messageClient, (SncpContext) context, (SncpMessageRequest) request); + } + + @Override + protected void onError(Response response, MessageRecord message, Throwable t) { + if (response != null) { + ((SncpMessageResponse) response).finish(SncpResponse.RETCODE_ILLSERVICEID, null); + } + } + +} diff --git a/src/main/java/org/redkale/net/Context.java b/src/main/java/org/redkale/net/Context.java index a406449a5..379a13361 100644 --- a/src/main/java/org/redkale/net/Context.java +++ b/src/main/java/org/redkale/net/Context.java @@ -27,6 +27,9 @@ public class Context { //服务启动时间 protected final long serverStartTime; + //Application节点id + protected final int nodeid; + //Server的线程池 protected final ExecutorService workExecutor; @@ -76,15 +79,16 @@ public class Context { protected Charset charset; public Context(ContextConfig config) { - this(config.serverStartTime, config.logger, config.workExecutor, config.sslBuilder, config.sslContext, + this(config.serverStartTime, config.nodeid, config.logger, config.workExecutor, config.sslBuilder, config.sslContext, config.bufferCapacity, config.maxConns, config.maxBody, config.charset, config.serverAddress, config.resourceFactory, config.dispatcher, config.aliveTimeoutSeconds, config.readTimeoutSeconds, config.writeTimeoutSeconds); } - public Context(long serverStartTime, Logger logger, ExecutorService workExecutor, SSLBuilder sslBuilder, SSLContext sslContext, + public Context(long serverStartTime, int nodeid, Logger logger, ExecutorService workExecutor, SSLBuilder sslBuilder, SSLContext sslContext, int bufferCapacity, final int maxConns, final int maxBody, Charset charset, InetSocketAddress address, ResourceFactory resourceFactory, DispatcherServlet dispatcher, int aliveTimeoutSeconds, int readTimeoutSeconds, int writeTimeoutSeconds) { this.serverStartTime = serverStartTime; + this.nodeid = nodeid; this.logger = logger; this.workExecutor = workExecutor; this.sslBuilder = sslBuilder; @@ -172,6 +176,10 @@ public class Context { return serverStartTime; } + public int getNodeid() { + return nodeid; + } + public Charset getCharset() { return charset; } @@ -209,6 +217,9 @@ public class Context { //服务启动时间 public long serverStartTime; + //Application节点id + public int nodeid; + //Server的线程池 public ExecutorService workExecutor; diff --git a/src/main/java/org/redkale/net/Server.java b/src/main/java/org/redkale/net/Server.java index 09e146607..49e1e0910 100644 --- a/src/main/java/org/redkale/net/Server.java +++ b/src/main/java/org/redkale/net/Server.java @@ -427,6 +427,7 @@ public abstract class Server T orElse(T t, T defValue) { diff --git a/src/main/java/org/redkale/net/http/WebSocketNode.java b/src/main/java/org/redkale/net/http/WebSocketNode.java index d0a5615a8..d760ccc04 100644 --- a/src/main/java/org/redkale/net/http/WebSocketNode.java +++ b/src/main/java/org/redkale/net/http/WebSocketNode.java @@ -14,10 +14,12 @@ import java.util.stream.*; import org.redkale.annotation.*; import org.redkale.annotation.Comment; import org.redkale.boot.Application; +import static org.redkale.boot.Application.RESNAME_APP_NODEID; import org.redkale.convert.*; import org.redkale.convert.json.JsonConvert; import org.redkale.mq.MessageAgent; import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY; +import org.redkale.net.sncp.Sncp; import org.redkale.service.*; import org.redkale.source.CacheSource; import org.redkale.util.*; @@ -40,6 +42,9 @@ public abstract class WebSocketNode implements Service { protected final Logger logger = Logger.getLogger(WebSocketNode.class.getSimpleName()); + @Resource(name = RESNAME_APP_NODEID) + protected int nodeid; + //"SNCP_ADDR" 如果不是分布式(没有SNCP) 值为null @Resource(name = Application.RESNAME_SNCP_ADDRESS, required = false) protected InetSocketAddress localSncpAddress; //为SncpServer的服务address @@ -83,7 +88,7 @@ public abstract class WebSocketNode implements Service { this.semaphore = new Semaphore(wsthreads); } } - String mqtopic = this.messageAgent == null ? null : this.messageAgent.generateSncpReqTopic((Service) this); + String mqtopic = this.messageAgent == null ? null : Sncp.generateSncpReqTopic((Service) this, nodeid); if (mqtopic != null || this.localSncpAddress != null) { this.wsNodeAddress = new WebSocketAddress(mqtopic, localSncpAddress); } @@ -717,7 +722,7 @@ public abstract class WebSocketNode implements Service { return ((CompletableFuture) message).thenApply(msg -> sendOneUserMessage(msg, last, userid)); } if (logger.isLoggable(Level.FINEST)) { - logger.finest("websocket want send message {userid:" + userid + ", content:" + (message instanceof WebSocketPacket ? ((WebSocketPacket) message).toSimpleString(): (message instanceof CharSequence ? message : JsonConvert.root().convertTo(message))) + "} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine"); + logger.finest("websocket want send message {userid:" + userid + ", content:" + (message instanceof WebSocketPacket ? ((WebSocketPacket) message).toSimpleString() : (message instanceof CharSequence ? message : JsonConvert.root().convertTo(message))) + "} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine"); } CompletableFuture localFuture = null; if (this.localEngine != null) { diff --git a/src/main/java/org/redkale/net/sncp/Sncp.java b/src/main/java/org/redkale/net/sncp/Sncp.java index 39307e725..c083f9fa2 100644 --- a/src/main/java/org/redkale/net/sncp/Sncp.java +++ b/src/main/java/org/redkale/net/sncp/Sncp.java @@ -20,6 +20,7 @@ import org.redkale.asm.Type; import org.redkale.convert.Convert; import org.redkale.convert.bson.BsonConvert; import org.redkale.mq.MessageAgent; +import org.redkale.net.http.WebSocketNode; import org.redkale.net.sncp.SncpRemoteInfo.SncpRemoteAction; import org.redkale.service.*; import org.redkale.util.AnyValue; @@ -254,6 +255,27 @@ public abstract class Sncp { return dyn != null ? dyn.type() : serviceImplClass; } + //格式: sncp.req.module.user + public static String generateSncpReqTopic(Service service, int nodeid) { + return generateSncpReqTopic(Sncp.getResourceName(service), Sncp.getResourceType(service), nodeid); + } + + //格式: sncp.req.module.user + public static String generateSncpReqTopic(String resourceName, Class resourceType, int nodeid) { + if (WebSocketNode.class.isAssignableFrom(resourceType)) { + return getSncpReqTopicPrefix() + "module.wsnode" + nodeid + (resourceName.isEmpty() ? "" : ("-" + resourceName)); + } + return getSncpReqTopicPrefix() + "module." + resourceType.getSimpleName().replaceAll("Service.*$", "").toLowerCase() + (resourceName.isEmpty() ? "" : ("-" + resourceName)); + } + + public static String getSncpReqTopicPrefix() { + return "sncp.req."; + } + + public static String getSncpRespTopicPrefix() { + return "sncp.resp."; + } + public static AnyValue getResourceConf(Service service) { if (service == null || !isSncpDyn(service)) { return null; diff --git a/src/main/java/org/redkale/net/sncp/SncpClient.java b/src/main/java/org/redkale/net/sncp/SncpClient.java index 0e98341f1..02acf6a35 100644 --- a/src/main/java/org/redkale/net/sncp/SncpClient.java +++ b/src/main/java/org/redkale/net/sncp/SncpClient.java @@ -23,11 +23,14 @@ public class SncpClient extends Client { protected final MessageAgent messageAgent; //MQ模式下此字段才有值 - protected final SncpMessageClient messageClient; + protected final MessageClient messageClient; SncpRemoteInfo(String resourceName, Class resourceType, Class serviceImplClass, Convert convert, SncpRpcGroups sncpRpcGroups, SncpClient sncpClient, MessageAgent messageAgent, String remoteGroup) { @@ -85,7 +85,7 @@ public class SncpRemoteInfo { this.messageAgent = messageAgent; this.remoteGroup = remoteGroup; this.messageClient = messageAgent == null ? null : messageAgent.getSncpMessageClient(); - this.topic = messageAgent == null ? null : messageAgent.generateSncpReqTopic(resourceName, resourceType); + this.topic = messageAgent == null ? null : Sncp.generateSncpReqTopic(resourceName, resourceType, messageAgent.getNodeid()); for (Map.Entry en : loadMethodActions(Sncp.getServiceType(serviceImplClass)).entrySet()) { this.actions.put(en.getKey().toString(), new SncpRemoteAction(serviceImplClass, resourceType, en.getValue(), serviceid, en.getKey(), sncpClient)); @@ -163,7 +163,7 @@ public class SncpRemoteInfo { } ByteArray array = new ByteArray(); request.writeTo(null, array); - MessageRecord message = messageClient.createMessageRecord(targetTopic, null, array.getBytes()); + MessageRecord message = messageAgent.getSncpMessageClient().createMessageRecord(targetTopic, null, array.getBytes()); final String tt = targetTopic; message.localActionName(action.actionName()); message.localParams(params); diff --git a/src/test/java/org/redkale/test/service/ABMainService.java b/src/test/java/org/redkale/test/service/ABMainService.java index 3c9501dc5..36ee06392 100644 --- a/src/test/java/org/redkale/test/service/ABMainService.java +++ b/src/test/java/org/redkale/test/service/ABMainService.java @@ -22,8 +22,8 @@ import org.redkale.net.client.ClientAddress; import org.redkale.net.http.*; import org.redkale.net.sncp.*; import org.redkale.service.Service; -import org.redkale.util.AnyValue.DefaultAnyValue; import org.redkale.util.*; +import org.redkale.util.AnyValue.DefaultAnyValue; /** * @@ -43,7 +43,7 @@ public class ABMainService implements Service { final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16); asyncGroup.start(); InetSocketAddress sncpAddress = new InetSocketAddress("127.0.0.1", abport); - final SncpClient client = new SncpClient("", asyncGroup, sncpAddress, new ClientAddress(sncpAddress), "TCP", 16, 100); + final SncpClient client = new SncpClient("", asyncGroup, 0, sncpAddress, new ClientAddress(sncpAddress), "TCP", 16, 100); final ResourceFactory resFactory = ResourceFactory.create(); resFactory.register(JsonConvert.root()); resFactory.register(BsonConvert.root()); diff --git a/src/test/java/org/redkale/test/sncp/SncpClientCodecTest.java b/src/test/java/org/redkale/test/sncp/SncpClientCodecTest.java index 979920a1f..4ef29138c 100644 --- a/src/test/java/org/redkale/test/sncp/SncpClientCodecTest.java +++ b/src/test/java/org/redkale/test/sncp/SncpClientCodecTest.java @@ -32,7 +32,7 @@ public class SncpClientCodecTest { InetSocketAddress sncpAddress = new InetSocketAddress("127.0.0.1", 3389); InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 3344); final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16); - SncpClient client = new SncpClient("test", asyncGroup, sncpAddress, new ClientAddress(remoteAddress), "TCP", Utility.cpus(), 16); + SncpClient client = new SncpClient("test", asyncGroup, 0, sncpAddress, new ClientAddress(remoteAddress), "TCP", Utility.cpus(), 16); SncpClientConnection conn = client.createClientConnection(1, asyncGroup.newTCPClientConnection()); SncpClientCodec codec = new SncpClientCodec(conn); List respResults = new ArrayList(); diff --git a/src/test/java/org/redkale/test/sncp/SncpRequestParseTest.java b/src/test/java/org/redkale/test/sncp/SncpRequestParseTest.java index 14be31ebe..3fb97eb14 100644 --- a/src/test/java/org/redkale/test/sncp/SncpRequestParseTest.java +++ b/src/test/java/org/redkale/test/sncp/SncpRequestParseTest.java @@ -32,7 +32,7 @@ public class SncpRequestParseTest { InetSocketAddress sncpAddress = new InetSocketAddress("127.0.0.1", 3389); InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 3344); final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16); - SncpClient client = new SncpClient("test", asyncGroup, sncpAddress, new ClientAddress(remoteAddress), "TCP", Utility.cpus(), 16); + SncpClient client = new SncpClient("test", asyncGroup, 0, sncpAddress, new ClientAddress(remoteAddress), "TCP", Utility.cpus(), 16); SncpClientConnection conn = client.createClientConnection(1, asyncGroup.newTCPClientConnection()); SncpContext.SncpContextConfig config = new SncpContext.SncpContextConfig(); diff --git a/src/test/java/org/redkale/test/sncp/SncpSleepTest.java b/src/test/java/org/redkale/test/sncp/SncpSleepTest.java index c340a49dc..5cf2ee655 100644 --- a/src/test/java/org/redkale/test/sncp/SncpSleepTest.java +++ b/src/test/java/org/redkale/test/sncp/SncpSleepTest.java @@ -46,7 +46,7 @@ public class SncpSleepTest { int port = server.getSocketAddress().getPort(); System.out.println("SNCP服务器启动端口: " + port); InetSocketAddress sncpAddress = new InetSocketAddress("127.0.0.1", port); - final SncpClient client = new SncpClient("", asyncGroup, sncpAddress, new ClientAddress(sncpAddress), "TCP", 16, 100); + final SncpClient client = new SncpClient("", asyncGroup, 0, sncpAddress, new ClientAddress(sncpAddress), "TCP", 16, 100); final SncpRpcGroups rpcGroups = application.getSncpRpcGroups(); rpcGroups.computeIfAbsent("cs", "TCP").putAddress(sncpAddress); SncpSleepService remoteCService = Sncp.createSimpleRemoteService(SncpSleepService.class, resFactory, rpcGroups, client, "cs"); diff --git a/src/test/java/org/redkale/test/sncp/SncpTest.java b/src/test/java/org/redkale/test/sncp/SncpTest.java index f6e443efd..69f727df8 100644 --- a/src/test/java/org/redkale/test/sncp/SncpTest.java +++ b/src/test/java/org/redkale/test/sncp/SncpTest.java @@ -80,7 +80,7 @@ public class SncpTest { asyncGroup.start(); InetSocketAddress sncpAddress = addr; - final SncpClient client = new SncpClient("", asyncGroup, sncpAddress, new ClientAddress(sncpAddress), protocol.endsWith(".UDP") ? "UDP" : "TCP", 16, 100); + final SncpClient client = new SncpClient("", asyncGroup, 0, sncpAddress, new ClientAddress(sncpAddress), protocol.endsWith(".UDP") ? "UDP" : "TCP", 16, 100); final SncpTestIService service = Sncp.createSimpleRemoteService(SncpTestIService.class, factory, rpcGroups, client, "client");//Sncp.createSimpleRemoteService(SncpTestIService.class, null, transFactory, addr, "client"); factory.inject(service); diff --git a/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java b/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java index c907f749f..2119d4604 100644 --- a/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java +++ b/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java @@ -93,7 +93,7 @@ public class SncpTestServiceImpl implements SncpTestIService { final SncpRpcGroups rpcGroups = application.getSncpRpcGroups(); InetSocketAddress sncpAddress = new InetSocketAddress("127.0.0.1", 7070); rpcGroups.computeIfAbsent("g70", "TCP").putAddress(sncpAddress); - final SncpClient client = new SncpClient("", asyncGroup, sncpAddress, new ClientAddress(sncpAddress), "TCP", 16, 100); + final SncpClient client = new SncpClient("", asyncGroup, 0, sncpAddress, new ClientAddress(sncpAddress), "TCP", 16, 100); Service service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, factory); for (Method method : service.getClass().getDeclaredMethods()) {