diff --git a/src/org/redkale/mq/HttpMessageProcessor.java b/src/org/redkale/mq/HttpMessageProcessor.java index 3e8b1dfd3..bd5cac030 100644 --- a/src/org/redkale/mq/HttpMessageProcessor.java +++ b/src/org/redkale/mq/HttpMessageProcessor.java @@ -31,19 +31,33 @@ public class HttpMessageProcessor implements MessageProcessor { protected final HttpServlet servlet; + protected final boolean multiconsumer; + + protected final String restmodule; // 前后有/, 例如: /user/ + + protected final String multimodule; // 前后有/, 例如: /userstat/ + public HttpMessageProcessor(Logger logger, MessageProducer producer, NodeHttpServer server, Service service, HttpServlet servlet) { this.logger = logger; this.producer = producer; this.server = server; this.service = service; this.servlet = servlet; + MessageMultiConsumer mmc = service.getClass().getAnnotation(MessageMultiConsumer.class); + this.multiconsumer = mmc != null; + this.restmodule = "/" + Rest.getRestModule(service) + "/"; + this.multimodule = mmc != null ? ("/" + mmc.module() + "/") : null; } @Override public void process(MessageRecord message) { try { + if (multiconsumer) message.setResptopic(null); //不容许有响应 HttpContext context = server.getHttpServer().getContext(); HttpMessageRequest request = new HttpMessageRequest(context, message); + if (multiconsumer) { + request.setRequestURI(request.getRequestURI().replaceFirst(this.restmodule, this.multimodule)); + } HttpMessageResponse response = new HttpMessageResponse(context, request, null, null, producer); servlet.execute(request, response); } catch (Exception ex) { diff --git a/src/org/redkale/mq/HttpMessageRequest.java b/src/org/redkale/mq/HttpMessageRequest.java index b015fa981..8611b8921 100644 --- a/src/org/redkale/mq/HttpMessageRequest.java +++ b/src/org/redkale/mq/HttpMessageRequest.java @@ -26,4 +26,7 @@ public class HttpMessageRequest extends HttpRequest { this.currentUserid = message.getUserid(); } + public void setRequestURI(String uri) { + this.requestURI = uri; + } } diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index 7bad6f6b9..238b75ba9 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -190,13 +190,16 @@ public abstract class MessageAgent { //格式: http.req.user protected String generateHttpReqTopic(Service service) { String resname = Sncp.getResourceName(service); - return "http.req." + Rest.getRestName(service).toLowerCase() + (resname.isEmpty() ? "" : ("-" + resname)); + String module = Rest.getRestModule(service).toLowerCase(); + MessageMultiConsumer mmc = service.getClass().getAnnotation(MessageMultiConsumer.class); + if (mmc != null) return generateHttpReqTopic(mmc.module()) + (resname.isEmpty() ? "" : ("-" + resname)); + return "http.req." + module + (resname.isEmpty() ? "" : ("-" + resname)); } //格式: consumer-http.req.user protected String generateHttpConsumerid(String topic, Service service) { String resname = Sncp.getResourceName(service); - String key = Rest.getRestName(service).toLowerCase(); + String key = Rest.getRestModule(service).toLowerCase(); return "consumer-http.req." + key + (resname.isEmpty() ? "" : ("-" + resname)); } diff --git a/src/org/redkale/mq/MessageMultiConsumer.java b/src/org/redkale/mq/MessageMultiConsumer.java new file mode 100644 index 000000000..91c148128 --- /dev/null +++ b/src/org/redkale/mq/MessageMultiConsumer.java @@ -0,0 +1,32 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.mq; + +import java.lang.annotation.*; +import static java.lang.annotation.ElementType.*; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +/** + * 多消费组,需要同 @RestService 一起使用 + *

+ * 标记 @MessageMultiConsumer 的Service的@RestMapping方法都只能是void返回类型 + * + *

+ * 详情见: https://redkale.org + * + * + * @author zhangjx + * + * @since 2.1.0 + */ +@Inherited +@Documented +@Target({TYPE}) +@Retention(RUNTIME) +public @interface MessageMultiConsumer { + + String module(); +} diff --git a/src/org/redkale/net/http/Rest.java b/src/org/redkale/net/http/Rest.java index 5c7fa090f..a1785ae30 100644 --- a/src/org/redkale/net/http/Rest.java +++ b/src/org/redkale/net/http/Rest.java @@ -22,7 +22,7 @@ import static org.redkale.asm.Opcodes.*; import org.redkale.asm.Type; import org.redkale.convert.*; import org.redkale.convert.json.*; -import org.redkale.mq.MessageAgent; +import org.redkale.mq.*; import org.redkale.net.Cryptor; import org.redkale.net.sncp.Sncp; import org.redkale.service.*; @@ -210,7 +210,7 @@ public final class Rest { } } - public static String getRestName(Service service) { + public static String getRestModule(Service service) { final RestService controller = service.getClass().getAnnotation(RestService.class); if (controller != null && !controller.name().isEmpty()) return controller.name(); final Class serviceType = Sncp.getServiceType(service); @@ -819,6 +819,10 @@ public final class Rest { //获取所有可以转换成HttpMapping的方法 int methodidex = 0; final List paramtypes = new ArrayList<>(); + final MessageMultiConsumer mmc = serviceType.getAnnotation(MessageMultiConsumer.class); + if (mmc != null && (mmc.module() == null || mmc.module().isEmpty())) { + throw new RuntimeException("@" + MessageMultiConsumer.class.getSimpleName() + ".module can not empty in " + serviceType.getName()); + } for (final Method method : serviceType.getMethods()) { if (Modifier.isStatic(method.getModifiers())) continue; if (method.isSynthetic()) continue; @@ -847,6 +851,9 @@ public final class Rest { } } } + if (mmc != null && method.getReturnType() != void.class) { + throw new RuntimeException("@" + RestMapping.class.getSimpleName() + " only for method(" + method + ") with return void by @" + MessageMultiConsumer.class.getSimpleName() + " Service"); + } paramtypes.add(TypeToken.getGenericType(method.getGenericParameterTypes(), serviceType)); if (mappings.length == 0) { //没有Mapping,设置一个默认值 MappingEntry entry = new MappingEntry(methodidex, null, bigmodulename, method);