增加 @MessageMultiConsumer 功能

This commit is contained in:
Redkale
2020-06-11 14:06:25 +08:00
parent bd1e326404
commit bdf2fd21c3
5 changed files with 63 additions and 4 deletions

View File

@@ -31,19 +31,33 @@ public class HttpMessageProcessor implements MessageProcessor {
protected final HttpServlet servlet;
protected final boolean multiconsumer;
protected final String restmodule; // 前后有/, 例如: /user/
protected final String multimodule; // 前后有/, 例如: /userstat/
public HttpMessageProcessor(Logger logger, MessageProducer producer, NodeHttpServer server, Service service, HttpServlet servlet) {
this.logger = logger;
this.producer = producer;
this.server = server;
this.service = service;
this.servlet = servlet;
MessageMultiConsumer mmc = service.getClass().getAnnotation(MessageMultiConsumer.class);
this.multiconsumer = mmc != null;
this.restmodule = "/" + Rest.getRestModule(service) + "/";
this.multimodule = mmc != null ? ("/" + mmc.module() + "/") : null;
}
@Override
public void process(MessageRecord message) {
try {
if (multiconsumer) message.setResptopic(null); //不容许有响应
HttpContext context = server.getHttpServer().getContext();
HttpMessageRequest request = new HttpMessageRequest(context, message);
if (multiconsumer) {
request.setRequestURI(request.getRequestURI().replaceFirst(this.restmodule, this.multimodule));
}
HttpMessageResponse response = new HttpMessageResponse(context, request, null, null, producer);
servlet.execute(request, response);
} catch (Exception ex) {

View File

@@ -26,4 +26,7 @@ public class HttpMessageRequest extends HttpRequest {
this.currentUserid = message.getUserid();
}
public void setRequestURI(String uri) {
this.requestURI = uri;
}
}

View File

@@ -190,13 +190,16 @@ public abstract class MessageAgent {
//格式: http.req.user
protected String generateHttpReqTopic(Service service) {
String resname = Sncp.getResourceName(service);
return "http.req." + Rest.getRestName(service).toLowerCase() + (resname.isEmpty() ? "" : ("-" + resname));
String module = Rest.getRestModule(service).toLowerCase();
MessageMultiConsumer mmc = service.getClass().getAnnotation(MessageMultiConsumer.class);
if (mmc != null) return generateHttpReqTopic(mmc.module()) + (resname.isEmpty() ? "" : ("-" + resname));
return "http.req." + module + (resname.isEmpty() ? "" : ("-" + resname));
}
//格式: consumer-http.req.user
protected String generateHttpConsumerid(String topic, Service service) {
String resname = Sncp.getResourceName(service);
String key = Rest.getRestName(service).toLowerCase();
String key = Rest.getRestModule(service).toLowerCase();
return "consumer-http.req." + key + (resname.isEmpty() ? "" : ("-" + resname));
}

View File

@@ -0,0 +1,32 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
import java.lang.annotation.*;
import static java.lang.annotation.ElementType.*;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
/**
* 多消费组,需要同 @RestService 一起使用
* <p>
* 标记 @MessageMultiConsumer 的Service的@RestMapping方法都只能是void返回类型
*
* <p>
* 详情见: https://redkale.org
*
*
* @author zhangjx
*
* @since 2.1.0
*/
@Inherited
@Documented
@Target({TYPE})
@Retention(RUNTIME)
public @interface MessageMultiConsumer {
String module();
}

View File

@@ -22,7 +22,7 @@ import static org.redkale.asm.Opcodes.*;
import org.redkale.asm.Type;
import org.redkale.convert.*;
import org.redkale.convert.json.*;
import org.redkale.mq.MessageAgent;
import org.redkale.mq.*;
import org.redkale.net.Cryptor;
import org.redkale.net.sncp.Sncp;
import org.redkale.service.*;
@@ -210,7 +210,7 @@ public final class Rest {
}
}
public static String getRestName(Service service) {
public static String getRestModule(Service service) {
final RestService controller = service.getClass().getAnnotation(RestService.class);
if (controller != null && !controller.name().isEmpty()) return controller.name();
final Class serviceType = Sncp.getServiceType(service);
@@ -819,6 +819,10 @@ public final class Rest {
//获取所有可以转换成HttpMapping的方法
int methodidex = 0;
final List<java.lang.reflect.Type[]> paramtypes = new ArrayList<>();
final MessageMultiConsumer mmc = serviceType.getAnnotation(MessageMultiConsumer.class);
if (mmc != null && (mmc.module() == null || mmc.module().isEmpty())) {
throw new RuntimeException("@" + MessageMultiConsumer.class.getSimpleName() + ".module can not empty in " + serviceType.getName());
}
for (final Method method : serviceType.getMethods()) {
if (Modifier.isStatic(method.getModifiers())) continue;
if (method.isSynthetic()) continue;
@@ -847,6 +851,9 @@ public final class Rest {
}
}
}
if (mmc != null && method.getReturnType() != void.class) {
throw new RuntimeException("@" + RestMapping.class.getSimpleName() + " only for method(" + method + ") with return void by @" + MessageMultiConsumer.class.getSimpleName() + " Service");
}
paramtypes.add(TypeToken.getGenericType(method.getGenericParameterTypes(), serviceType));
if (mappings.length == 0) { //没有Mapping设置一个默认值
MappingEntry entry = new MappingEntry(methodidex, null, bigmodulename, method);