This commit is contained in:
redkale
2023-05-04 21:35:59 +08:00
parent 26d0cce404
commit 8dbf474662
12 changed files with 189 additions and 144 deletions

View File

@@ -640,8 +640,8 @@ public abstract class NodeServer {
}
protected boolean acceptsComponent(Class<? extends Service> serviceImplClass) {
if (MessageConsumerListener.class.isAssignableFrom(serviceImplClass)) {
MessageConsumer mqConsumer = serviceImplClass.getAnnotation(MessageConsumer.class);
if (MessageConsumer.class.isAssignableFrom(serviceImplClass)) {
ResourceConsumer mqConsumer = serviceImplClass.getAnnotation(ResourceConsumer.class);
if (mqConsumer == null) {
return false;
}
@@ -654,10 +654,10 @@ public abstract class NodeServer {
}
protected boolean interceptComponent(Service service) {
if (service instanceof MessageConsumerListener) {
MessageConsumer mqConsumer = service.getClass().getAnnotation(MessageConsumer.class);
if (service instanceof MessageConsumer) {
ResourceConsumer mqConsumer = service.getClass().getAnnotation(ResourceConsumer.class);
MessageAgent mqAgent = application.getMessageAgent(mqConsumer.mq());
mqAgent.addConsumerListener((MessageConsumerListener) service);
mqAgent.addMessageConsumer(mqConsumer, (MessageConsumer) service);
return true;
}
return false;

View File

@@ -32,7 +32,7 @@ public class HttpMessageClient extends MessageClient {
protected HttpMessageClient(MessageAgent messageAgent) {
super(messageAgent);
if (messageAgent != null) { // //RPC方式下无messageAgent
this.respTopic = messageAgent.generateHttpRespTopic();
this.respTopic = messageAgent.generateApplicationHttpRespTopic();
}
}

View File

@@ -14,6 +14,7 @@ import java.util.stream.Collectors;
import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.*;
import org.redkale.annotation.ResourceListener;
import static org.redkale.boot.Application.RESNAME_APP_NAME;
import static org.redkale.boot.Application.RESNAME_APP_NODEID;
import org.redkale.boot.*;
import org.redkale.net.Servlet;
@@ -38,34 +39,42 @@ public abstract class MessageAgent implements Resourcable {
@Resource(required = false)
protected Application application;
@Resource(name = RESNAME_APP_NODEID)
protected int nodeid;
@Resource(name = RESNAME_APP_NAME)
protected String nodeName;
protected String name;
protected AnyValue config;
protected MessageProducer messageProducer;
//key: group, sub-key: topic
protected final ConcurrentHashMap<String, ConcurrentHashMap<String, MessageConsumer>> consumerMap = new ConcurrentHashMap<>();
protected final CopyOnWriteArrayList<MessageConsumer> consumerList = new CopyOnWriteArrayList<>();
protected MessageClientProducer httpProducer;
protected MessageClientProducer sncpProducer;
protected final ReentrantLock httpProducerLock = new ReentrantLock();
protected final ReentrantLock sncpProducerLock = new ReentrantLock();
protected final ReentrantLock httpNodesLock = new ReentrantLock();
protected final ReentrantLock sncpNodesLock = new ReentrantLock();
protected final List<MessageConsumerListener> consumerListeners = new CopyOnWriteArrayList<>();
protected final AtomicLong msgSeqno = new AtomicLong(System.nanoTime());
protected HttpMessageClient httpMessageClient;
protected SncpMessageClient sncpMessageClient;
protected final ReentrantLock consumerLock = new ReentrantLock();
protected final ReentrantLock producerLock = new ReentrantLock();
protected final ReentrantLock nodesLock = new ReentrantLock();
protected final List<MessageConsumer> consumerListeners = new CopyOnWriteArrayList<>();
protected final AtomicLong msgSeqno = new AtomicLong(System.nanoTime());
protected ScheduledThreadPoolExecutor timeoutExecutor;
protected MessageCoder<MessageRecord> messageCoder = MessageRecordCoder.getInstance();
@@ -226,10 +235,10 @@ public abstract class MessageAgent implements Resourcable {
//获取指定topic的生产处理器
public MessageClientProducer getSncpMessageClientProducer() {
if (this.sncpProducer == null) {
sncpProducerLock.lock();
producerLock.lock();
try {
if (this.sncpProducer == null) {
long s = System.currentTimeMillis();
long s = System.currentTimeMillis();
this.sncpProducer = createMessageClientProducer("SncpProducer");
long e = System.currentTimeMillis() - s;
if (logger.isLoggable(Level.FINEST)) {
@@ -237,7 +246,7 @@ public abstract class MessageAgent implements Resourcable {
}
}
} finally {
sncpProducerLock.unlock();
producerLock.unlock();
}
}
return this.sncpProducer;
@@ -245,7 +254,7 @@ public abstract class MessageAgent implements Resourcable {
public MessageClientProducer getHttpMessageClientProducer() {
if (this.httpProducer == null) {
httpProducerLock.lock();
producerLock.lock();
try {
if (this.httpProducer == null) {
long s = System.currentTimeMillis();
@@ -253,10 +262,10 @@ public abstract class MessageAgent implements Resourcable {
long e = System.currentTimeMillis() - s;
if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "MessageAgent.HttpProducer startup all in " + e + "ms");
}
}
}
} finally {
httpProducerLock.unlock();
producerLock.unlock();
}
}
return this.httpProducer;
@@ -283,8 +292,24 @@ public abstract class MessageAgent implements Resourcable {
@ResourceListener
public abstract void onResourceChange(ResourceEvent[] events);
public void addConsumerListener(MessageConsumerListener listener) {
public void addMessageConsumer(ResourceConsumer res, MessageConsumer consumer) {
consumerLock.lock();
try {
ConcurrentHashMap<String, MessageConsumer> map = consumerMap.computeIfAbsent(res.group(), g -> new ConcurrentHashMap<>());
for (String topic : res.topics()) {
if (!topic.trim().isEmpty()) {
if (map.containsKey(topic.trim())) {
throw new RedkaleException(MessageConsumer.class.getSimpleName()
+ " consume topic (" + topic + ") repeat with "
+ map.get(topic).getClass().getName() + " and " + consumer.getClass().getName());
}
map.put(topic.trim(), consumer);
}
}
consumerList.add(consumer);
} finally {
consumerLock.unlock();
}
}
public final void putService(NodeHttpServer ns, Service service, HttpServlet servlet) {
@@ -304,7 +329,7 @@ public abstract class MessageAgent implements Resourcable {
}
String[] topics = generateHttpReqTopics(service);
String consumerid = generateHttpConsumerid(topics, service);
httpNodesLock.lock();
nodesLock.lock();
try {
if (clientConsumerNodes.containsKey(consumerid)) {
throw new RedkaleException("consumerid(" + consumerid + ") is repeat");
@@ -312,7 +337,7 @@ public abstract class MessageAgent implements Resourcable {
HttpMessageClientProcessor processor = new HttpMessageClientProcessor(this.logger, httpMessageClient, getHttpMessageClientProducer(), ns, service, servlet);
this.clientConsumerNodes.put(consumerid, new MessageClientConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(topics, consumerid, processor)));
} finally {
httpNodesLock.unlock();
nodesLock.unlock();
}
}
@@ -327,7 +352,7 @@ public abstract class MessageAgent implements Resourcable {
}
String topic = generateSncpReqTopic(service);
String consumerid = generateSncpConsumerid(topic, service);
sncpNodesLock.lock();
nodesLock.lock();
try {
if (clientConsumerNodes.containsKey(consumerid)) {
throw new RedkaleException("consumerid(" + consumerid + ") is repeat");
@@ -335,49 +360,49 @@ public abstract class MessageAgent implements Resourcable {
SncpMessageClientProcessor processor = new SncpMessageClientProcessor(this.logger, sncpMessageClient, getSncpMessageClientProducer(), ns, service, servlet);
this.clientConsumerNodes.put(consumerid, new MessageClientConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(new String[]{topic}, consumerid, processor)));
} finally {
sncpNodesLock.unlock();
nodesLock.unlock();
}
}
//格式: sncp.req.user
//格式: sncp.req.module.user
public final String generateSncpReqTopic(Service service) {
return generateSncpReqTopic(Sncp.getResourceName(service), Sncp.getResourceType(service));
}
//格式: sncp.req.user
//格式: sncp.req.module.user
public final String generateSncpReqTopic(String resourceName, Class resourceType) {
if (WebSocketNode.class.isAssignableFrom(resourceType)) {
return "sncp.req.ws" + (resourceName.isEmpty() ? "" : ("-" + resourceName)) + ".node" + nodeid;
return "sncp.req.module.ws" + (resourceName.isEmpty() ? "" : ("-" + resourceName)) + ".node" + nodeid;
}
return "sncp.req." + resourceType.getSimpleName().replaceAll("Service.*$", "").toLowerCase() + (resourceName.isEmpty() ? "" : ("-" + resourceName));
return "sncp.req.module." + resourceType.getSimpleName().replaceAll("Service.*$", "").toLowerCase() + (resourceName.isEmpty() ? "" : ("-" + resourceName));
}
//格式: consumer-sncp.req.user 不提供外部使用
//格式: consumer-sncp.req.module.user 不提供外部使用
protected final String generateSncpConsumerid(String topic, Service service) {
return "consumer-" + topic;
}
//格式: http.req.user
//格式: http.req.module.user
public static String generateHttpReqTopic(String module) {
return "http.req." + module.toLowerCase();
return "http.req.module." + module.toLowerCase();
}
//格式: http.req.user
//格式: http.req.module.user
public static String generateHttpReqTopic(String module, String resname) {
return "http.req." + module.toLowerCase() + (resname == null || resname.isEmpty() ? "" : ("-" + resname));
return "http.req.module." + module.toLowerCase() + (resname == null || resname.isEmpty() ? "" : ("-" + resname));
}
//格式: sncp.resp.node10
protected String generateSncpRespTopic() {
return "sncp.resp.node" + nodeid;
//格式: sncp.resp.app.node10
protected String generateApplicationSncpRespTopic() {
return "sncp.resp.app." + (Utility.isEmpty(nodeName) ? "node" : nodeName) + "-" + nodeid;
}
//格式: http.resp.node10
protected String generateHttpRespTopic() {
return "http.resp.node" + nodeid;
//格式: http.resp.app.node10
protected String generateApplicationHttpRespTopic() {
return "http.resp.app." + (Utility.isEmpty(nodeName) ? "node" : nodeName) + "-" + nodeid;
}
//格式: http.req.user
//格式: http.req.module.user
protected String[] generateHttpReqTopics(Service service) {
String resname = Sncp.getResourceName(service);
String module = Rest.getRestModule(service).toLowerCase();
@@ -385,22 +410,17 @@ public abstract class MessageAgent implements Resourcable {
if (mmc != null) {
return new String[]{generateHttpReqTopic(mmc.module()) + (resname.isEmpty() ? "" : ("-" + resname))};
}
return new String[]{"http.req." + module + (resname.isEmpty() ? "" : ("-" + resname))};
return new String[]{"http.req.module." + module + (resname.isEmpty() ? "" : ("-" + resname))};
}
//格式: consumer-http.req.user
//格式: consumer-http.req.module.user
protected String generateHttpConsumerid(String[] topics, Service service) {
String resname = Sncp.getResourceName(service);
String key = Rest.getRestModule(service).toLowerCase();
return "consumer-http.req." + key + (resname.isEmpty() ? "" : ("-" + resname));
return "consumer-http.req.module." + key + (resname.isEmpty() ? "" : ("-" + resname));
}
//格式: xxxx.resp.node10
protected String generateRespTopic(String protocol) {
return protocol + ".resp.node" + nodeid;
}
protected static class MessageClientConsumerNode {
public final NodeServer server;

View File

@@ -3,10 +3,9 @@
*/
package org.redkale.mq;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.*;
import org.redkale.convert.ConvertType;
import org.redkale.annotation.Component;
import org.redkale.service.Local;
import org.redkale.util.AnyValue;
/**
* MQ资源注解
@@ -15,19 +14,19 @@ import org.redkale.convert.ConvertType;
* 详情见: https://redkale.org
*
* @author zhangjx
* @param <T> 泛型
*
* @since 2.8.0
*/
@Documented
@Target({TYPE})
@Retention(RUNTIME)
public @interface MessageConsumer {
@Local
@Component
public interface MessageConsumer<T> {
String mq();
default void init(AnyValue config) {
}
String group() default "";
public void onMessage(String topic, T message);
String[] topics();
ConvertType convertType() default ConvertType.JSON;
default void destroy(AnyValue config) {
}
}

View File

@@ -1,31 +0,0 @@
/*
*
*/
package org.redkale.mq;
import org.redkale.annotation.Component;
import org.redkale.service.Local;
import org.redkale.util.AnyValue;
/**
* MQ资源注解
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.8.0
*/
@Local
@Component
public interface MessageConsumerListener<T> {
default void init(AnyValue config) {
}
public void onMessage(String topic, T message);
default void destroy(AnyValue config) {
}
}

View File

@@ -42,7 +42,7 @@ import static java.lang.annotation.ElementType.*;
*
* <p>
* 注: 标记 &#64;MessageMultiConsumer 的Service的&#64;RestMapping方法都只能是void返回类型 <br>
* 由 MessageConsumerListener 代替
* 由 MessageConsumer 代替
* <p>
* 详情见: https://redkale.org
*

View File

@@ -3,13 +3,12 @@
*/
package org.redkale.mq;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.*;
import org.redkale.convert.ConvertType;
import java.lang.reflect.Type;
import java.util.concurrent.CompletableFuture;
import org.redkale.convert.Convert;
/**
* MQ资源注解, 只能标记在MessageProducerSender类型字段上
* MQ消息发送器
*
* <p>
* 详情见: https://redkale.org
@@ -18,13 +17,15 @@ import org.redkale.convert.ConvertType;
*
* @since 2.8.0
*/
@Documented
@Target({FIELD})
@Retention(RUNTIME)
public @interface MessageProducer {
public interface MessageProducer {
String mq();
public CompletableFuture<Void> sendMessage(String topic, Object value);
ConvertType convertType() default ConvertType.JSON;
default CompletableFuture<Void> sendMessage(String topic, Convert convert, Object value) {
return sendMessage(topic, convert.convertToBytes(value));
}
default CompletableFuture<Void> sendMessage(String topic, Convert convert, Type type, Object value) {
return sendMessage(topic, convert.convertToBytes(type, value));
}
}

View File

@@ -1,31 +0,0 @@
/*
*
*/
package org.redkale.mq;
import java.lang.reflect.Type;
import java.util.concurrent.CompletableFuture;
import org.redkale.convert.Convert;
/**
* MQ消息发送器
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.8.0
*/
public interface MessageProducerSender {
public CompletableFuture<Void> sendMessage(String topic, Object value);
default CompletableFuture<Void> sendMessage(String topic, Convert convert, Object value) {
return sendMessage(topic, convert.convertToBytes(value));
}
default CompletableFuture<Void> sendMessage(String topic, Convert convert, Type type, Object value) {
return sendMessage(topic, convert.convertToBytes(type, value));
}
}

View File

@@ -0,0 +1,33 @@
/*
*
*/
package org.redkale.mq;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.*;
import org.redkale.convert.ConvertType;
/**
* MQ资源注解, 只能标记在MessageConsumer子类上
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.8.0
*/
@Documented
@Target({TYPE})
@Retention(RUNTIME)
public @interface ResourceConsumer {
String mq();
String group() default "";
String[] topics();
ConvertType convertType() default ConvertType.JSON;
}

View File

@@ -0,0 +1,30 @@
/*
*
*/
package org.redkale.mq;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.*;
import org.redkale.convert.ConvertType;
/**
* MQ资源注解, 只能标记在MessageProducer类型字段上
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.8.0
*/
@Documented
@Target({FIELD})
@Retention(RUNTIME)
public @interface ResourceProducer {
String mq();
ConvertType convertType() default ConvertType.JSON;
}

View File

@@ -20,7 +20,7 @@ public class SncpMessageClient extends MessageClient {
protected SncpMessageClient(MessageAgent messageAgent) {
super(messageAgent);
this.respTopic = messageAgent.generateSncpRespTopic();
this.respTopic = messageAgent.generateApplicationSncpRespTopic();
}
@Override

View File

@@ -473,6 +473,30 @@ public final class Utility {
});
}
/**
* 是否为空
*
* @param str 字符串
*
* @return 是否为空
*
*/
public static boolean isEmpty(CharSequence str) {
return str == null || str.length() == 0;
}
/**
* 是否不为空
*
* @param str 字符串
*
* @return 是否不为空
*
*/
public static boolean isNotEmpty(CharSequence str) {
return str != null && str.length() > 0;
}
/**
* 将字符串首字母大写
*