From 8954c63285d35714b76af94a2ddce8c08dd1e40f Mon Sep 17 00:00:00 2001 From: redkale Date: Fri, 6 Oct 2023 05:36:03 +0800 Subject: [PATCH] =?UTF-8?q?=E7=A7=BB=E9=99=A4MessageMultiConsumer=E5=8A=9F?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/boot/ApiDocCommand.java | 5 - .../redkale/cluster/CacheClusterAgent.java | 27 ---- .../org/redkale/cluster/ClusterAgent.java | 23 +--- .../org/redkale/mq/HttpMessageClient.java | 26 ---- .../mq/HttpMessageClientProcessor.java | 13 -- .../redkale/mq/HttpMessageClusterClient.java | 129 ------------------ .../redkale/mq/HttpMessageLocalClient.java | 14 -- .../java/org/redkale/mq/MessageAgent.java | 4 - .../org/redkale/mq/MessageMultiConsumer.java | 126 ++++++++--------- .../net/http/HttpDispatcherServlet.java | 6 +- .../java/org/redkale/net/http/HttpServer.java | 4 - .../org/redkale/net/http/HttpServlet.java | 4 +- src/main/java/org/redkale/net/http/Rest.java | 19 +-- 13 files changed, 70 insertions(+), 330 deletions(-) diff --git a/src/main/java/org/redkale/boot/ApiDocCommand.java b/src/main/java/org/redkale/boot/ApiDocCommand.java index 549f0ba41..915a37463 100644 --- a/src/main/java/org/redkale/boot/ApiDocCommand.java +++ b/src/main/java/org/redkale/boot/ApiDocCommand.java @@ -16,7 +16,6 @@ import java.util.logging.*; import org.redkale.annotation.Comment; import org.redkale.convert.*; import org.redkale.convert.json.*; -import org.redkale.mq.MessageMultiConsumer; import org.redkale.net.http.*; import org.redkale.persistence.*; import org.redkale.service.RetResult; @@ -107,10 +106,6 @@ public final class ApiDocCommand { if (servlet instanceof WebSocketServlet) { continue; } - if (servlet.getClass().getAnnotation(MessageMultiConsumer.class) != null) { - node.logger.log(Level.INFO, servlet + " be skipped because has @MessageMultiConsumer"); - continue; - } WebServlet ws = servlet.getClass().getAnnotation(WebServlet.class); if (ws == null) { node.logger.log(Level.WARNING, servlet + " not found @WebServlet"); diff --git a/src/main/java/org/redkale/cluster/CacheClusterAgent.java b/src/main/java/org/redkale/cluster/CacheClusterAgent.java index a047358d6..f2126592f 100644 --- a/src/main/java/org/redkale/cluster/CacheClusterAgent.java +++ b/src/main/java/org/redkale/cluster/CacheClusterAgent.java @@ -47,9 +47,6 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable { //可能被sncp用到的服务 key: serviceName protected final ConcurrentHashMap> sncpAddressMap = new ConcurrentHashMap<>(); - //可能被mqtp用到的服务 key: serviceName - protected final ConcurrentHashMap> mqtpAddressMap = new ConcurrentHashMap<>(); - @Override public void init(AnyValue config) { super.init(config); @@ -138,7 +135,6 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable { checkApplicationHealth(); checkHttpAddressHealth(); loadSncpAddressHealth(); - loadMqtpAddressHealth(); localEntrys.values().stream().filter(e -> !e.canceled).forEach(entry -> { checkLocalHealth(entry); }); @@ -162,17 +158,6 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable { }); } - protected void loadMqtpAddressHealth() { - List keys = source.keysStartsWith("cluster.mqtp:"); - keys.forEach(serviceName -> { - try { - this.mqtpAddressMap.put(serviceName, queryAddress(serviceName).get(3, TimeUnit.SECONDS)); - } catch (Exception e) { - logger.log(Level.SEVERE, "loadMqtpAddressHealth check " + serviceName + " error", e); - } - }); - } - protected void checkHttpAddressHealth() { try { this.httpAddressMap.keySet().stream().forEach(serviceName -> { @@ -209,15 +194,6 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable { }); } - @Override //获取MQTP的HTTP远程服务的可用ip列表, key = serviceName的后半段 - public CompletableFuture>> queryMqtpAddress(String protocol, String module, String resname) { - final Map> rsmap = new ConcurrentHashMap<>(); - final String servicenamprefix = generateHttpServiceName(protocol, module, null) + ":"; - mqtpAddressMap.keySet().stream().filter(k -> k.startsWith(servicenamprefix)) - .forEach(sn -> rsmap.put(sn.substring(servicenamprefix.length()), mqtpAddressMap.get(sn))); - return CompletableFuture.completedFuture(rsmap); - } - @Override //获取HTTP远程服务的可用ip列表 public CompletableFuture> queryHttpAddress(String protocol, String module, String resname) { final String serviceName = generateHttpServiceName(protocol, module, resname); @@ -329,9 +305,6 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable { if (realcanceled && currEntry != null) { currEntry.canceled = true; } - if (!"mqtp".equals(protocol) && currEntry != null && currEntry.submqtp) { - deregister(ns, "mqtp", service, realcanceled); - } } @Override diff --git a/src/main/java/org/redkale/cluster/ClusterAgent.java b/src/main/java/org/redkale/cluster/ClusterAgent.java index 9dd12424f..645741e1d 100644 --- a/src/main/java/org/redkale/cluster/ClusterAgent.java +++ b/src/main/java/org/redkale/cluster/ClusterAgent.java @@ -11,14 +11,13 @@ import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.*; import java.util.logging.*; -import org.redkale.annotation.AutoLoad; import org.redkale.annotation.*; +import org.redkale.annotation.AutoLoad; import org.redkale.annotation.ResourceListener; import org.redkale.boot.*; import static org.redkale.boot.Application.*; import org.redkale.convert.ConvertDisabled; import org.redkale.convert.json.JsonConvert; -import org.redkale.mq.MessageMultiConsumer; import org.redkale.net.Server; import org.redkale.net.http.*; import org.redkale.net.sncp.*; @@ -148,14 +147,6 @@ public abstract class ClusterAgent { } ClusterEntry htentry = register(ns, protocol, service); localEntrys.put(htentry.serviceid, htentry); - if (protocol.toLowerCase().startsWith("http")) { - MessageMultiConsumer mmc = service.getClass().getAnnotation(MessageMultiConsumer.class); - if (mmc != null) { - ClusterEntry mqentry = register(ns, "mqtp", service); - localEntrys.put(mqentry.serviceid, mqentry); - htentry.submqtp = true; - } - } } //远程模式加载IP列表, 只支持SNCP协议 if (ns.isSNCP()) { @@ -224,9 +215,6 @@ public abstract class ClusterAgent { return 10; } - //获取MQTP的HTTP远程服务的可用ip列表, key = serviceName的后半段 - public abstract CompletableFuture>> queryMqtpAddress(String protocol, String module, String resname); - //获取HTTP远程服务的可用ip列表 public abstract CompletableFuture> queryHttpAddress(String protocol, String module, String resname); @@ -312,11 +300,6 @@ public abstract class ClusterAgent { String module = Rest.getRestModule(service).toLowerCase(); return protocol.toLowerCase() + serviceSeparator() + module + (resname.isEmpty() ? "" : ("-" + resname)); } - if ("mqtp".equalsIgnoreCase(protocol)) { - MessageMultiConsumer mmc = service.getClass().getAnnotation(MessageMultiConsumer.class); - String selfmodule = Rest.getRestModule(service).toLowerCase(); - return protocol.toLowerCase() + serviceSeparator() + mmc.module() + serviceSeparator() + selfmodule; - } if (!Sncp.isSncpDyn(service)) { return protocol.toLowerCase() + serviceSeparator() + service.getClass().getName(); } @@ -395,7 +378,7 @@ public abstract class ClusterAgent { public String checkName; - //http or sncp or mqtp + //http or sncp public String protocol; //TCP or UDP @@ -408,8 +391,6 @@ public abstract class ClusterAgent { public boolean canceled; - public boolean submqtp; - public ClusterEntry(NodeServer ns, String protocol, Service service) { this.serviceid = generateServiceId(ns, protocol, service); this.serviceName = generateServiceName(ns, protocol, service); diff --git a/src/main/java/org/redkale/mq/HttpMessageClient.java b/src/main/java/org/redkale/mq/HttpMessageClient.java index 9f30344d5..fd3b81e5d 100644 --- a/src/main/java/org/redkale/mq/HttpMessageClient.java +++ b/src/main/java/org/redkale/mq/HttpMessageClient.java @@ -78,26 +78,6 @@ public class HttpMessageClient extends MessageClient { produceMessage(topic, userid, groupid, request, null); } - public final void broadcastMessage(HttpSimpleRequest request) { - broadcastMessage(generateHttpReqTopic(request, null), 0, null, request, null); - } - - public final void broadcastMessage(Serializable userid, HttpSimpleRequest request) { - broadcastMessage(generateHttpReqTopic(request, null), userid, null, request, null); - } - - public final void broadcastMessage(Serializable userid, String groupid, HttpSimpleRequest request) { - broadcastMessage(generateHttpReqTopic(request, null), userid, groupid, request, null); - } - - public final void broadcastMessage(String topic, HttpSimpleRequest request) { - broadcastMessage(topic, 0, null, request, null); - } - - public final void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) { - broadcastMessage(topic, userid, groupid, request, null); - } - public CompletableFuture sendMessage(HttpSimpleRequest request, Type type) { return sendMessage(generateHttpReqTopic(request, null), 0, null, request, null).thenApply((HttpResult httbs) -> { if (httbs == null || httbs.getResult() == null) { @@ -152,12 +132,6 @@ public class HttpMessageClient extends MessageClient { return sendMessage(message, true, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance())); } - protected void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { - MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request)); - message.userid(userid).groupid(groupid); - sendMessage(message, false, counter); - } - protected void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request)); message.userid(userid).groupid(groupid); diff --git a/src/main/java/org/redkale/mq/HttpMessageClientProcessor.java b/src/main/java/org/redkale/mq/HttpMessageClientProcessor.java index de242227d..764c12cf4 100644 --- a/src/main/java/org/redkale/mq/HttpMessageClientProcessor.java +++ b/src/main/java/org/redkale/mq/HttpMessageClientProcessor.java @@ -37,12 +37,8 @@ public class HttpMessageClientProcessor implements MessageClientProcessor { protected final HttpServlet servlet; - protected final boolean multiConsumer; - protected final String restModule; // 前后有/, 例如: /user/ - protected final String multiModule; // 前后有/, 例如: /userstat/ - protected ThreadLocal> respPoolThreadLocal; protected final Supplier respSupplier; @@ -66,10 +62,7 @@ public class HttpMessageClientProcessor implements MessageClientProcessor { this.server = server; this.service = service; this.servlet = servlet; - MessageMultiConsumer mmc = service.getClass().getAnnotation(MessageMultiConsumer.class); - this.multiConsumer = mmc != null; this.restModule = "/" + Rest.getRestModule(service) + "/"; - this.multiModule = mmc != null ? ("/" + mmc.module() + "/") : null; this.respSupplier = () -> respPoolThreadLocal.get().get(); this.respConsumer = resp -> respPoolThreadLocal.get().accept(resp); this.respPoolThreadLocal = Utility.withInitialThreadLocal(() -> ObjectPool.createUnsafePool(Utility.cpus(), @@ -94,15 +87,9 @@ public class HttpMessageClientProcessor implements MessageClientProcessor { long now = System.currentTimeMillis(); long cha = now - message.createTime; long e = now - startTime; - if (multiConsumer) { - message.setRespTopic(null); //不容许有响应 - } HttpMessageResponse response = respSupplier.get(); request = response.request(); response.prepare(message, callback, producer); - if (multiConsumer) { - request.setRequestURI(request.getRequestURI().replaceFirst(this.multiModule, this.restModule)); - } server.getHttpServer().getContext().execute(servlet, request, response); long o = System.currentTimeMillis() - now; diff --git a/src/main/java/org/redkale/mq/HttpMessageClusterClient.java b/src/main/java/org/redkale/mq/HttpMessageClusterClient.java index 22f710976..83bb7a117 100644 --- a/src/main/java/org/redkale/mq/HttpMessageClusterClient.java +++ b/src/main/java/org/redkale/mq/HttpMessageClusterClient.java @@ -72,88 +72,6 @@ public class HttpMessageClusterClient extends HttpMessageClient { } } - @Override - protected void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { - mqtpAsync(userid, request); - } - - private CompletableFuture> mqtpAsync(Serializable userid, HttpSimpleRequest req) { - String module = req.getRequestURI(); - module = module.substring(1); //去掉/ - module = module.substring(0, module.indexOf('/')); - Map headers = req.getHeaders(); - String resname = headers == null ? "" : headers.getOrDefault(Rest.REST_HEADER_RESOURCE_NAME, ""); - final String localModule = module; - return clusterAgent.queryMqtpAddress("mqtp", module, resname).thenCompose(addrmap -> { - if (addrmap == null || addrmap.isEmpty()) { - if (logger.isLoggable(Level.FINE)) { - logger.log(Level.FINE, "mqtpAsync.broadcastMessage: module=" + localModule + ", resname=" + resname + ", addrmap is empty"); - } - return new HttpResult().status(404).toFuture(); - } - final Map clientHeaders = new LinkedHashMap<>(); - byte[] clientBody = null; - if (req.isRpc()) { - clientHeaders.put(Rest.REST_HEADER_RPC, "true"); - } - if (req.isFrombody()) { - clientHeaders.put(Rest.REST_HEADER_PARAM_FROM_BODY, "true"); - } - if (req.getReqConvertType() != null) { - clientHeaders.put(Rest.REST_HEADER_REQ_CONVERT_TYPE, req.getReqConvertType().toString()); - } - if (req.getRespConvertType() != null) { - clientHeaders.put(Rest.REST_HEADER_RESP_CONVERT_TYPE, req.getRespConvertType().toString()); - } - if (userid != null) { - clientHeaders.put(Rest.REST_HEADER_CURRUSERID, "" + userid); - } - if (headers != null) { - headers.forEach((n, v) -> { - if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase())) { - clientHeaders.put(n, v); - } - }); - } - clientHeaders.put("Content-Type", "x-www-form-urlencoded"); - if (req.getBody() != null && req.getBody().length > 0) { - String paramstr = req.getParametersToString(); - if (paramstr != null) { - if (req.getRequestURI().indexOf('?') > 0) { - req.setRequestURI(req.getRequestURI() + "&" + paramstr); - } else { - req.setRequestURI(req.getRequestURI() + "?" + paramstr); - } - } - clientBody = req.getBody(); - } else { - String paramstr = req.getParametersToString(); - if (paramstr != null) { - clientBody = paramstr.getBytes(StandardCharsets.UTF_8); - } - } - List futures = new ArrayList<>(); - if (logger.isLoggable(Level.FINEST)) { - logger.log(Level.FINEST, "mqtpAsync: module=" + localModule + ", resname=" + resname + ", addrmap=" + addrmap); - } - for (Map.Entry> en : addrmap.entrySet()) { - String realmodule = en.getKey(); - Collection addrs = en.getValue(); - if (addrs == null || addrs.isEmpty()) { - continue; - } - String suburi = req.getRequestURI(); - suburi = suburi.substring(1); //跳过 / - suburi = "/" + realmodule + suburi.substring(suburi.indexOf('/')); - futures.add(forEachCollectionFuture(logger.isLoggable(Level.FINEST), userid, req, (req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + suburi, clientHeaders, clientBody, addrs.iterator())); - } - if (futures.isEmpty()) { - return CompletableFuture.completedFuture(null); - } - return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).thenApply(v -> null); - }); - } - private CompletableFuture> httpAsync(boolean produce, Serializable userid, HttpSimpleRequest req) { String module = req.getRequestURI(); module = module.substring(1); //去掉/ @@ -254,53 +172,6 @@ public class HttpMessageClusterClient extends HttpMessageClient { }); } -// private CompletableFuture> mqtpAsync(Serializable userid, HttpSimpleRequest req) { -// final boolean finest = logger.isLoggable(Level.FINEST); -// String module = req.getRequestURI(); -// module = module.substring(1); //去掉/ -// module = module.substring(0, module.indexOf('/')); -// Map headers = req.getHeaders(); -// String resname = headers == null ? "" : headers.getOrDefault(Rest.REST_HEADER_RESOURCE_NAME, ""); -// return clusterAgent.queryMqtpAddress("mqtp", module, resname).thenCompose(addrmap -> { -// if (addrmap == null || addrmap.isEmpty()) return new HttpResult().status(404).toAnyFuture(); -// java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder().timeout(Duration.ofMillis(30000)); -// if (req.isRpc()) builder.header(Rest.REST_HEADER_RPC_NAME, "true"); -// if (req.isFrombody()) builder.header(Rest.REST_HEADER_PARAM_FROM_BODY, "true"); -// if (req.getReqConvertType() != null) builder.header(Rest.REST_HEADER_REQ_CONVERT_TYPE, req.getReqConvertType().toString()); -// if (req.getRespConvertType() != null) builder.header(Rest.REST_HEADER_RESP_CONVERT_TYPE, req.getRespConvertType().toString()); -// if (userid != 0) builder.header(Rest.REST_HEADER_CURRUSERID, "" + userid); -// if (headers != null) headers.forEach((n, v) -> { -// if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase())) builder.header(n, v); -// }); -// builder.header("Content-Type", "x-www-form-urlencoded"); -// if (req.getBody() != null && req.getBody().length > 0) { -// String paramstr = req.getParametersToString(); -// if (paramstr != null) { -// if (req.getRequestURI().indexOf('?') > 0) { -// req.setRequestURI(req.getRequestURI() + "&" + paramstr); -// } else { -// req.setRequestURI(req.getRequestURI() + "?" + paramstr); -// } -// } -// builder.POST(java.net.http.HttpRequest.BodyPublishers.ofByteArray(req.getBody())); -// } else { -// String paramstr = req.getParametersToString(); -// if (paramstr != null) builder.POST(java.net.http.HttpRequest.BodyPublishers.ofString(paramstr)); -// } -// List futures = new ArrayList<>(); -// for (Map.Entry> en : addrmap.entrySet()) { -// String realmodule = en.getKey(); -// Collection addrs = en.getValue(); -// if (addrs == null || addrs.isEmpty()) continue; -// String suburi = req.getRequestURI(); -// suburi = suburi.substring(1); //跳过 / -// suburi = "/" + realmodule + suburi.substring(suburi.indexOf('/')); -// futures.add(forEachCollectionFuture(finest, userid, req, (req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + suburi, builder, addrs.iterator())); -// } -// if (futures.isEmpty()) return CompletableFuture.completedFuture(null); -// return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).thenApply(v -> null); -// }); -// } // // private CompletableFuture> httpAsync(Serializable userid, HttpSimpleRequest req) { // final boolean finest = logger.isLoggable(Level.FINEST); diff --git a/src/main/java/org/redkale/mq/HttpMessageLocalClient.java b/src/main/java/org/redkale/mq/HttpMessageLocalClient.java index b742108b5..1875b6afc 100644 --- a/src/main/java/org/redkale/mq/HttpMessageLocalClient.java +++ b/src/main/java/org/redkale/mq/HttpMessageLocalClient.java @@ -164,20 +164,6 @@ public class HttpMessageLocalClient extends HttpMessageClient { } } - @Override - protected void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { - HttpDispatcherServlet ps = dispatcherServlet(); - HttpRequest req = new HttpMessageLocalRequest(context(), request, userid); - HttpResponse resp = new HttpMessageLocalResponse(req, null); - ps.filterServletsByMmcTopic(topic).forEach(s -> { - try { - s.execute(req, resp); - } catch (Exception e) { - logger.log(Level.SEVERE, request + " execute " + s + " error", e); - } - }); - } - public static class HttpMessageLocalRequest extends HttpRequest { public HttpMessageLocalRequest(HttpContext context, HttpSimpleRequest req, Serializable userid) { diff --git a/src/main/java/org/redkale/mq/MessageAgent.java b/src/main/java/org/redkale/mq/MessageAgent.java index 6e5da475c..0c4857e1b 100644 --- a/src/main/java/org/redkale/mq/MessageAgent.java +++ b/src/main/java/org/redkale/mq/MessageAgent.java @@ -488,10 +488,6 @@ public abstract class MessageAgent implements Resourcable { protected String[] generateHttpReqTopics(Service service) { String resname = Sncp.getResourceName(service); String module = Rest.getRestModule(service).toLowerCase(); - MessageMultiConsumer mmc = service.getClass().getAnnotation(MessageMultiConsumer.class); - if (mmc != null) { - return new String[]{generateHttpReqTopic(mmc.module()) + (resname.isEmpty() ? "" : ("-" + resname))}; - } return new String[]{"http.req.module." + module + (resname.isEmpty() ? "" : ("-" + resname))}; } diff --git a/src/main/java/org/redkale/mq/MessageMultiConsumer.java b/src/main/java/org/redkale/mq/MessageMultiConsumer.java index d292c4a9e..33679b26c 100644 --- a/src/main/java/org/redkale/mq/MessageMultiConsumer.java +++ b/src/main/java/org/redkale/mq/MessageMultiConsumer.java @@ -1,63 +1,63 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.mq; - -import static java.lang.annotation.RetentionPolicy.RUNTIME; -import java.lang.annotation.*; -import static java.lang.annotation.ElementType.*; - -/** - * 多消费组,需要同 @RestService 一起使用 - *

- * 通常一个topic只会被一个RestService消费, 当一个topic需要被其他RestService消费时,就需要使用@MessageMultiConsumer - * - *

- * @RestService(name = "user", comment = "用户服务")
- * public class UserService implements Service{
- *
- *      @RestMapping(comment = "用户登录")
- *      public RetResult login(LoginBean bean){
- *          //do something
- *      }
- * }
- * 
- * - * 需求:统计用户登录次数, 可以创建一个MessageMultiConsumer 的 RestService: - *
- * @MessageMultiConsumer(module = "user") 
- * @RestService(name = "loginstat", comment = "用户统计服务")
- * public class LoginStatService implements Service{
- *
- *      private LongAdder counter = new LongAdder();
- *
- *      @RestMapping(name = "login", comment = "用户登录统计")
- *      public void stat(LoginBean bean){     //参数必须和UserService.login方法一致
- *          counter.increment();
- *      }
- * }
- * 
- * - *

- * 注: 标记 @MessageMultiConsumer 的Service的@RestMapping方法都只能是void返回类型
- * 由 MessageConsumer 代替 - *

- * 详情见: https://redkale.org - * - * - * @author zhangjx - * @deprecated - * - * @since 2.1.0 - */ -@Inherited -@Documented -@Target({TYPE}) -@Retention(RUNTIME) -@Deprecated(since = "2.8.0") -public @interface MessageMultiConsumer { - - String module(); -} +///* +// * To change this license header, choose License Headers in Project Properties. +// * To change this template file, choose Tools | Templates +// * and open the template in the editor. +// */ +//package org.redkale.mq; +// +//import static java.lang.annotation.RetentionPolicy.RUNTIME; +//import java.lang.annotation.*; +//import static java.lang.annotation.ElementType.*; +// +///** +// * 多消费组,需要同 @RestService 一起使用 +// *

+// * 通常一个topic只会被一个RestService消费, 当一个topic需要被其他RestService消费时,就需要使用@MessageMultiConsumer +// * +// *

+// * @RestService(name = "user", comment = "用户服务")
+// * public class UserService implements Service{
+// *
+// *      @RestMapping(comment = "用户登录")
+// *      public RetResult login(LoginBean bean){
+// *          //do something
+// *      }
+// * }
+// * 
+// * +// * 需求:统计用户登录次数, 可以创建一个MessageMultiConsumer 的 RestService: +// *
+// * @MessageMultiConsumer(module = "user") 
+// * @RestService(name = "loginstat", comment = "用户统计服务")
+// * public class LoginStatService implements Service{
+// *
+// *      private LongAdder counter = new LongAdder();
+// *
+// *      @RestMapping(name = "login", comment = "用户登录统计")
+// *      public void stat(LoginBean bean){     //参数必须和UserService.login方法一致
+// *          counter.increment();
+// *      }
+// * }
+// * 
+// * +// *

+// * 注: 标记 @MessageMultiConsumer 的Service的@RestMapping方法都只能是void返回类型
+// * 由 MessageConsumer 代替 +// *

+// * 详情见: https://redkale.org +// * +// * +// * @author zhangjx +// * @deprecated +// * +// * @since 2.1.0 +// */ +//@Inherited +//@Documented +//@Target({TYPE}) +//@Retention(RUNTIME) +//@Deprecated(since = "2.8.0") +//public @interface MessageMultiConsumer { +// +// String module(); +//} diff --git a/src/main/java/org/redkale/net/http/HttpDispatcherServlet.java b/src/main/java/org/redkale/net/http/HttpDispatcherServlet.java index 57365734b..ac3542c0e 100644 --- a/src/main/java/org/redkale/net/http/HttpDispatcherServlet.java +++ b/src/main/java/org/redkale/net/http/HttpDispatcherServlet.java @@ -17,8 +17,8 @@ import org.redkale.net.*; import org.redkale.net.Filter; import org.redkale.net.http.Rest.RestDynSourceType; import org.redkale.service.Service; -import org.redkale.util.AnyValue.DefaultAnyValue; import org.redkale.util.*; +import org.redkale.util.AnyValue.DefaultAnyValue; /** * HTTP Servlet的总入口,请求在HttpDispatcherServlet中进行分流。
@@ -523,10 +523,6 @@ public class HttpDispatcherServlet extends DispatcherServlet x._reqtopic != null && x._reqtopic.equals(topic)).findFirst().orElse(null); } - public Stream filterServletsByMmcTopic(String mmctopic) { - return filterServlets(x -> x._mmctopic != null && x._mmctopic.equals(mmctopic)); - } - public Stream filterServlets(Predicate predicate) { return predicate == null ? servletStream() : servletStream().filter(predicate); } diff --git a/src/main/java/org/redkale/net/http/HttpServer.java b/src/main/java/org/redkale/net/http/HttpServer.java index 16e1473d9..3f0beba12 100644 --- a/src/main/java/org/redkale/net/http/HttpServer.java +++ b/src/main/java/org/redkale/net/http/HttpServer.java @@ -322,10 +322,6 @@ public class HttpServer extends Server String _reqtopic; //根据RestService+MQ生成的值 @since 2.5.0 - String _mmctopic; //根据RestService+@MessageMultiConsumer生成的值 @since 2.5.0 - HashMap _actionmap; //Rest生成时赋值, 字段名Rest有用到 private Map.Entry[] mappings; //字段名Rest有用到 diff --git a/src/main/java/org/redkale/net/http/Rest.java b/src/main/java/org/redkale/net/http/Rest.java index 7d932ce7a..19941f9e5 100644 --- a/src/main/java/org/redkale/net/http/Rest.java +++ b/src/main/java/org/redkale/net/http/Rest.java @@ -6,18 +6,18 @@ package org.redkale.net.http; import java.io.*; +import java.lang.annotation.*; import static java.lang.annotation.ElementType.TYPE; import static java.lang.annotation.RetentionPolicy.RUNTIME; -import java.lang.annotation.*; import java.lang.reflect.*; import java.net.InetSocketAddress; import java.nio.channels.CompletionHandler; import java.util.*; import java.util.concurrent.CompletionStage; -import org.redkale.annotation.Comment; import org.redkale.annotation.*; -import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES; +import org.redkale.annotation.Comment; import org.redkale.asm.*; +import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES; import static org.redkale.asm.Opcodes.*; import org.redkale.asm.Type; import org.redkale.convert.*; @@ -1627,16 +1627,6 @@ public final class Rest { boolean dynsimple = baseServletType == HttpServlet.class; //有自定义的BaseServlet会存在读取header的操作 //获取所有可以转换成HttpMapping的方法 int methodidex = 0; - final MessageMultiConsumer mmc = serviceType.getAnnotation(MessageMultiConsumer.class); - if (mmc != null && (mmc.module() == null || mmc.module().isEmpty())) { - throw new RestException("@" + MessageMultiConsumer.class.getSimpleName() + ".module can not empty in " + serviceType.getName()); - } - if (mmc != null && !checkName2(mmc.module())) { - throw new RestException(serviceType.getName() + " have illegal " + MessageMultiConsumer.class.getSimpleName() + ".module, only 0-9 a-z A-Z _ - . cannot begin 0-9"); - } - if (mmc != null) { - Asms.visitAnnotation(cw.visitAnnotation(Type.getDescriptor(mmc.annotationType()), true), mmc); - } final Method[] allMethods = serviceType.getMethods(); Arrays.sort(allMethods, (m1, m2) -> { //必须排序,否则paramTypes顺序容易乱 int s = m1.getName().compareTo(m2.getName()); @@ -1691,9 +1681,6 @@ public final class Rest { } } } - if (mmc != null && method.getReturnType() != void.class) { - throw new RestException("@" + RestMapping.class.getSimpleName() + " only for method(" + method + ") with return void by @" + MessageMultiConsumer.class.getSimpleName() + " Service"); - } paramTypes.add(TypeToken.getGenericType(method.getGenericParameterTypes(), serviceType)); retvalTypes.add(formatRestReturnType(method, serviceType)); if (mappings.length == 0) { //没有Mapping,设置一个默认值