This commit is contained in:
redkale
2023-05-05 08:12:03 +08:00
parent 8dbf474662
commit 20a98e2a95
10 changed files with 35 additions and 56 deletions

View File

@@ -19,6 +19,7 @@ public enum ConvertType {
BSON(2),
PROTOBUF(64),
PROTOBUF_JSON(64 + 1),
PROTOBUF_BSON(64 + 2),
DIY(256),
ALL(1023);

View File

@@ -32,7 +32,7 @@ public class HttpMessageClient extends MessageClient {
protected HttpMessageClient(MessageAgent messageAgent) {
super(messageAgent);
if (messageAgent != null) { // //RPC方式下无messageAgent
this.respTopic = messageAgent.generateApplicationHttpRespTopic();
this.appRespTopic = messageAgent.generateAppHttpRespTopic();
}
}

View File

@@ -377,11 +377,6 @@ public abstract class MessageAgent implements Resourcable {
return "sncp.req.module." + resourceType.getSimpleName().replaceAll("Service.*$", "").toLowerCase() + (resourceName.isEmpty() ? "" : ("-" + resourceName));
}
//格式: consumer-sncp.req.module.user 不提供外部使用
protected final String generateSncpConsumerid(String topic, Service service) {
return "consumer-" + topic;
}
//格式: http.req.module.user
public static String generateHttpReqTopic(String module) {
return "http.req.module." + module.toLowerCase();
@@ -393,12 +388,12 @@ public abstract class MessageAgent implements Resourcable {
}
//格式: sncp.resp.app.node10
protected String generateApplicationSncpRespTopic() {
protected String generateAppSncpRespTopic() {
return "sncp.resp.app." + (Utility.isEmpty(nodeName) ? "node" : nodeName) + "-" + nodeid;
}
//格式: http.resp.app.node10
protected String generateApplicationHttpRespTopic() {
protected String generateAppHttpRespTopic() {
return "http.resp.app." + (Utility.isEmpty(nodeName) ? "node" : nodeName) + "-" + nodeid;
}
@@ -413,6 +408,11 @@ public abstract class MessageAgent implements Resourcable {
return new String[]{"http.req.module." + module + (resname.isEmpty() ? "" : ("-" + resname))};
}
//格式: consumer-sncp.req.module.user 不提供外部使用
protected final String generateSncpConsumerid(String topic, Service service) {
return "consumer-" + topic;
}
//格式: consumer-http.req.module.user
protected String generateHttpConsumerid(String[] topics, Service service) {
String resname = Sncp.getResourceName(service);

View File

@@ -37,9 +37,9 @@ public abstract class MessageClient {
protected MessageClientConsumer respConsumer;
protected String respTopic;
protected String appRespTopic;
protected String respConsumerid;
protected String appRespConsumerid;
private final String clazzName;
@@ -67,8 +67,8 @@ public abstract class MessageClient {
if (this.respConsumer == null) {
lock.lock();
try {
if (this.respConsumerid == null) {
this.respConsumerid = "consumer-" + this.respTopic;
if (this.appRespConsumerid == null) {
this.appRespConsumerid = "consumer-" + this.appRespTopic;
}
if (this.respConsumer == null) {
MessageClientProcessor processor = (msg, callback) -> {
@@ -100,7 +100,7 @@ public abstract class MessageClient {
}
};
long ones = System.currentTimeMillis();
MessageClientConsumer one = messageAgent.createMessageClientConsumer(new String[]{respTopic}, respConsumerid, processor);
MessageClientConsumer one = messageAgent.createMessageClientConsumer(new String[]{appRespTopic}, appRespConsumerid, processor);
one.startup().join();
long onee = System.currentTimeMillis() - ones;
if (finest) {
@@ -113,7 +113,7 @@ public abstract class MessageClient {
}
}
if (needresp && (message.getRespTopic() == null || message.getRespTopic().isEmpty())) {
message.setRespTopic(respTopic);
message.setRespTopic(appRespTopic);
}
if (counter != null) {
counter.increment();
@@ -129,11 +129,10 @@ public abstract class MessageClient {
} else {
future.complete(null);
}
} catch (Exception ex) {
} catch (Throwable ex) {
future.completeExceptionally(ex);
} finally {
return future;
}
return future;
}
protected MessageRecord formatRespMessage(MessageRecord message) {

View File

@@ -55,9 +55,10 @@ public abstract class MessageClientConsumer {
public abstract CompletableFuture<Void> startup();
public abstract CompletableFuture<Void> shutdown();
public boolean isClosed() {
return closed;
}
public abstract CompletableFuture<Void> shutdown();
}

View File

@@ -35,9 +35,10 @@ public abstract class MessageClientProducer {
public abstract CompletableFuture<Void> startup();
public abstract CompletableFuture<Void> shutdown();
public boolean isClosed() {
return closed.get();
}
public abstract CompletableFuture<Void> shutdown();
}

View File

@@ -1,20 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
/**
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.1.0
*/
public interface MessageResponse {
public void finish(MessageRecord message);
}

View File

@@ -20,7 +20,7 @@ public class SncpMessageClient extends MessageClient {
protected SncpMessageClient(MessageAgent messageAgent) {
super(messageAgent);
this.respTopic = messageAgent.generateApplicationSncpRespTopic();
this.appRespTopic = messageAgent.generateAppSncpRespTopic();
}
@Override
@@ -28,8 +28,8 @@ public class SncpMessageClient extends MessageClient {
return messageAgent.getSncpMessageClientProducer();
}
public String getRespTopic() {
return this.respTopic;
public String getAppRespTopic() {
return this.appRespTopic;
}
//只发送消息,不需要响应

View File

@@ -29,5 +29,5 @@ public class SncpMessageRequest extends SncpRequest {
this.createTime = System.currentTimeMillis();
readHeader(ByteBuffer.wrap(message.getContent()), null);
}
}

View File

@@ -5,14 +5,14 @@
*/
package org.redkale.util;
import java.lang.annotation.*;
import java.lang.ref.*;
import java.lang.annotation.Annotation;
import java.lang.ref.WeakReference;
import java.lang.reflect.*;
import java.math.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.*;
import java.util.logging.*;
import org.redkale.annotation.*;
@@ -46,8 +46,6 @@ public final class ResourceFactory {
public static final String RESOURCE_SELF_TYPE = "@type";
private static final boolean skipCheckRequired = Boolean.getBoolean("redkale.resource.skip.check");
private static final Logger logger = Logger.getLogger(ResourceFactory.class.getSimpleName());
private final ReentrantLock lock = new ReentrantLock();
@@ -56,7 +54,7 @@ public final class ResourceFactory {
private final List<WeakReference<ResourceFactory>> chidren = new CopyOnWriteArrayList<>();
private final ConcurrentHashMap<Type, ResourceAnnotationProvider> resAnnotationProviderMap = new ConcurrentHashMap();
private final ConcurrentHashMap<Class<? extends Annotation>, ResourceAnnotationProvider> resAnnotationProviderMap = new ConcurrentHashMap();
private final ConcurrentHashMap<Type, ResourceTypeLoader> resTypeLoaderMap = new ConcurrentHashMap();
@@ -814,7 +812,7 @@ public final class ResourceFactory {
try {
list.add(srcObj);
Class clazz = srcObj.getClass();
final boolean diyloaderflag = !parentRoot().resAnnotationProviderMap.isEmpty();
final boolean diyLoaderFlag = !parentRoot().resAnnotationProviderMap.isEmpty();
do {
if (java.lang.Enum.class.isAssignableFrom(clazz)) {
break;
@@ -853,13 +851,12 @@ public final class ResourceFactory {
break;
}
}
if (flag && diyloaderflag) {
if (flag && diyLoaderFlag) {
parentRoot().resAnnotationProviderMap.values().stream().forEach(iloader -> {
Annotation ann = field.getAnnotation(iloader.annotationType());
if (ann == null) {
return;
if (ann != null) {
iloader.load(this, srcResourceName, srcObj, ann, field, attachment);
}
iloader.load(this, srcResourceName, srcObj, ann, field, attachment);
});
}
if (ns == null) {
@@ -1002,7 +999,7 @@ public final class ResourceFactory {
if (rs != null) {
field.set(srcObj, rs);
}
if (rs == null && !skipCheckRequired && rc1 != null && rc1.required()) {
if (rs == null && rc1 != null && rc1.required()) {
String t = srcObj.getClass().getName();
if (srcObj.getClass().getSimpleName().startsWith("_Dyn")) {
t = srcObj.getClass().getSuperclass().getName();