diff --git a/src/main/java/org/redkale/convert/ConvertType.java b/src/main/java/org/redkale/convert/ConvertType.java index 19f816e3e..cd2300cb8 100644 --- a/src/main/java/org/redkale/convert/ConvertType.java +++ b/src/main/java/org/redkale/convert/ConvertType.java @@ -19,6 +19,7 @@ public enum ConvertType { BSON(2), PROTOBUF(64), PROTOBUF_JSON(64 + 1), + PROTOBUF_BSON(64 + 2), DIY(256), ALL(1023); diff --git a/src/main/java/org/redkale/mq/HttpMessageClient.java b/src/main/java/org/redkale/mq/HttpMessageClient.java index f04a0c6dd..9f30344d5 100644 --- a/src/main/java/org/redkale/mq/HttpMessageClient.java +++ b/src/main/java/org/redkale/mq/HttpMessageClient.java @@ -32,7 +32,7 @@ public class HttpMessageClient extends MessageClient { protected HttpMessageClient(MessageAgent messageAgent) { super(messageAgent); if (messageAgent != null) { // //RPC方式下无messageAgent - this.respTopic = messageAgent.generateApplicationHttpRespTopic(); + this.appRespTopic = messageAgent.generateAppHttpRespTopic(); } } diff --git a/src/main/java/org/redkale/mq/MessageAgent.java b/src/main/java/org/redkale/mq/MessageAgent.java index 7b5d54ecc..fb04fb38c 100644 --- a/src/main/java/org/redkale/mq/MessageAgent.java +++ b/src/main/java/org/redkale/mq/MessageAgent.java @@ -377,11 +377,6 @@ public abstract class MessageAgent implements Resourcable { return "sncp.req.module." + resourceType.getSimpleName().replaceAll("Service.*$", "").toLowerCase() + (resourceName.isEmpty() ? "" : ("-" + resourceName)); } - //格式: consumer-sncp.req.module.user 不提供外部使用 - protected final String generateSncpConsumerid(String topic, Service service) { - return "consumer-" + topic; - } - //格式: http.req.module.user public static String generateHttpReqTopic(String module) { return "http.req.module." + module.toLowerCase(); @@ -393,12 +388,12 @@ public abstract class MessageAgent implements Resourcable { } //格式: sncp.resp.app.node10 - protected String generateApplicationSncpRespTopic() { + protected String generateAppSncpRespTopic() { return "sncp.resp.app." + (Utility.isEmpty(nodeName) ? "node" : nodeName) + "-" + nodeid; } //格式: http.resp.app.node10 - protected String generateApplicationHttpRespTopic() { + protected String generateAppHttpRespTopic() { return "http.resp.app." + (Utility.isEmpty(nodeName) ? "node" : nodeName) + "-" + nodeid; } @@ -413,6 +408,11 @@ public abstract class MessageAgent implements Resourcable { return new String[]{"http.req.module." + module + (resname.isEmpty() ? "" : ("-" + resname))}; } + //格式: consumer-sncp.req.module.user 不提供外部使用 + protected final String generateSncpConsumerid(String topic, Service service) { + return "consumer-" + topic; + } + //格式: consumer-http.req.module.user protected String generateHttpConsumerid(String[] topics, Service service) { String resname = Sncp.getResourceName(service); diff --git a/src/main/java/org/redkale/mq/MessageClient.java b/src/main/java/org/redkale/mq/MessageClient.java index 47a0dd82a..75af8de7a 100644 --- a/src/main/java/org/redkale/mq/MessageClient.java +++ b/src/main/java/org/redkale/mq/MessageClient.java @@ -37,9 +37,9 @@ public abstract class MessageClient { protected MessageClientConsumer respConsumer; - protected String respTopic; + protected String appRespTopic; - protected String respConsumerid; + protected String appRespConsumerid; private final String clazzName; @@ -67,8 +67,8 @@ public abstract class MessageClient { if (this.respConsumer == null) { lock.lock(); try { - if (this.respConsumerid == null) { - this.respConsumerid = "consumer-" + this.respTopic; + if (this.appRespConsumerid == null) { + this.appRespConsumerid = "consumer-" + this.appRespTopic; } if (this.respConsumer == null) { MessageClientProcessor processor = (msg, callback) -> { @@ -100,7 +100,7 @@ public abstract class MessageClient { } }; long ones = System.currentTimeMillis(); - MessageClientConsumer one = messageAgent.createMessageClientConsumer(new String[]{respTopic}, respConsumerid, processor); + MessageClientConsumer one = messageAgent.createMessageClientConsumer(new String[]{appRespTopic}, appRespConsumerid, processor); one.startup().join(); long onee = System.currentTimeMillis() - ones; if (finest) { @@ -113,7 +113,7 @@ public abstract class MessageClient { } } if (needresp && (message.getRespTopic() == null || message.getRespTopic().isEmpty())) { - message.setRespTopic(respTopic); + message.setRespTopic(appRespTopic); } if (counter != null) { counter.increment(); @@ -129,11 +129,10 @@ public abstract class MessageClient { } else { future.complete(null); } - } catch (Exception ex) { + } catch (Throwable ex) { future.completeExceptionally(ex); - } finally { - return future; } + return future; } protected MessageRecord formatRespMessage(MessageRecord message) { diff --git a/src/main/java/org/redkale/mq/MessageClientConsumer.java b/src/main/java/org/redkale/mq/MessageClientConsumer.java index 6f6e0934a..3da495e7e 100644 --- a/src/main/java/org/redkale/mq/MessageClientConsumer.java +++ b/src/main/java/org/redkale/mq/MessageClientConsumer.java @@ -55,9 +55,10 @@ public abstract class MessageClientConsumer { public abstract CompletableFuture startup(); + public abstract CompletableFuture shutdown(); + public boolean isClosed() { return closed; } - public abstract CompletableFuture shutdown(); } diff --git a/src/main/java/org/redkale/mq/MessageClientProducer.java b/src/main/java/org/redkale/mq/MessageClientProducer.java index 8d6093846..8430a68a8 100644 --- a/src/main/java/org/redkale/mq/MessageClientProducer.java +++ b/src/main/java/org/redkale/mq/MessageClientProducer.java @@ -35,9 +35,10 @@ public abstract class MessageClientProducer { public abstract CompletableFuture startup(); + public abstract CompletableFuture shutdown(); + public boolean isClosed() { return closed.get(); } - public abstract CompletableFuture shutdown(); } diff --git a/src/main/java/org/redkale/mq/MessageResponse.java b/src/main/java/org/redkale/mq/MessageResponse.java deleted file mode 100644 index 0dab904b3..000000000 --- a/src/main/java/org/redkale/mq/MessageResponse.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.mq; - -/** - * - *

- * 详情见: https://redkale.org - * - * @author zhangjx - * - * @since 2.1.0 - */ -public interface MessageResponse { - - public void finish(MessageRecord message); -} diff --git a/src/main/java/org/redkale/mq/SncpMessageClient.java b/src/main/java/org/redkale/mq/SncpMessageClient.java index 25f11b044..0e16667f3 100644 --- a/src/main/java/org/redkale/mq/SncpMessageClient.java +++ b/src/main/java/org/redkale/mq/SncpMessageClient.java @@ -20,7 +20,7 @@ public class SncpMessageClient extends MessageClient { protected SncpMessageClient(MessageAgent messageAgent) { super(messageAgent); - this.respTopic = messageAgent.generateApplicationSncpRespTopic(); + this.appRespTopic = messageAgent.generateAppSncpRespTopic(); } @Override @@ -28,8 +28,8 @@ public class SncpMessageClient extends MessageClient { return messageAgent.getSncpMessageClientProducer(); } - public String getRespTopic() { - return this.respTopic; + public String getAppRespTopic() { + return this.appRespTopic; } //只发送消息,不需要响应 diff --git a/src/main/java/org/redkale/mq/SncpMessageRequest.java b/src/main/java/org/redkale/mq/SncpMessageRequest.java index 5ae09cb8d..374ce42ec 100644 --- a/src/main/java/org/redkale/mq/SncpMessageRequest.java +++ b/src/main/java/org/redkale/mq/SncpMessageRequest.java @@ -29,5 +29,5 @@ public class SncpMessageRequest extends SncpRequest { this.createTime = System.currentTimeMillis(); readHeader(ByteBuffer.wrap(message.getContent()), null); } - + } diff --git a/src/main/java/org/redkale/util/ResourceFactory.java b/src/main/java/org/redkale/util/ResourceFactory.java index 51ee45e81..f3983fbe8 100644 --- a/src/main/java/org/redkale/util/ResourceFactory.java +++ b/src/main/java/org/redkale/util/ResourceFactory.java @@ -5,14 +5,14 @@ */ package org.redkale.util; -import java.lang.annotation.*; -import java.lang.ref.*; +import java.lang.annotation.Annotation; +import java.lang.ref.WeakReference; import java.lang.reflect.*; import java.math.*; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.*; import java.util.logging.*; import org.redkale.annotation.*; @@ -46,8 +46,6 @@ public final class ResourceFactory { public static final String RESOURCE_SELF_TYPE = "@type"; - private static final boolean skipCheckRequired = Boolean.getBoolean("redkale.resource.skip.check"); - private static final Logger logger = Logger.getLogger(ResourceFactory.class.getSimpleName()); private final ReentrantLock lock = new ReentrantLock(); @@ -56,7 +54,7 @@ public final class ResourceFactory { private final List> chidren = new CopyOnWriteArrayList<>(); - private final ConcurrentHashMap resAnnotationProviderMap = new ConcurrentHashMap(); + private final ConcurrentHashMap, ResourceAnnotationProvider> resAnnotationProviderMap = new ConcurrentHashMap(); private final ConcurrentHashMap resTypeLoaderMap = new ConcurrentHashMap(); @@ -814,7 +812,7 @@ public final class ResourceFactory { try { list.add(srcObj); Class clazz = srcObj.getClass(); - final boolean diyloaderflag = !parentRoot().resAnnotationProviderMap.isEmpty(); + final boolean diyLoaderFlag = !parentRoot().resAnnotationProviderMap.isEmpty(); do { if (java.lang.Enum.class.isAssignableFrom(clazz)) { break; @@ -853,13 +851,12 @@ public final class ResourceFactory { break; } } - if (flag && diyloaderflag) { + if (flag && diyLoaderFlag) { parentRoot().resAnnotationProviderMap.values().stream().forEach(iloader -> { Annotation ann = field.getAnnotation(iloader.annotationType()); - if (ann == null) { - return; + if (ann != null) { + iloader.load(this, srcResourceName, srcObj, ann, field, attachment); } - iloader.load(this, srcResourceName, srcObj, ann, field, attachment); }); } if (ns == null) { @@ -1002,7 +999,7 @@ public final class ResourceFactory { if (rs != null) { field.set(srcObj, rs); } - if (rs == null && !skipCheckRequired && rc1 != null && rc1.required()) { + if (rs == null && rc1 != null && rc1.required()) { String t = srcObj.getClass().getName(); if (srcObj.getClass().getSimpleName().startsWith("_Dyn")) { t = srcObj.getClass().getSuperclass().getName();