This commit is contained in:
redkale
2023-10-11 13:52:32 +08:00
parent a5e3a277ba
commit 52d7a7a3f7
5 changed files with 61 additions and 31 deletions

View File

@@ -49,8 +49,7 @@ public class ObjectDecoder<R extends Reader, T> implements Decodeable<R, T> {
protected ObjectDecoder(Type type) {
this.type = ((type instanceof Class) && ((Class) type).isInterface()) ? Object.class : type;
if (type instanceof ParameterizedType) {
final ParameterizedType pt = (ParameterizedType) type;
this.typeClass = (Class) pt.getRawType();
this.typeClass = TypeToken.typeToClass(type);
} else if (type instanceof TypeVariable) {
TypeVariable tv = (TypeVariable) type;
Type[] ts = tv.getBounds();
@@ -72,11 +71,9 @@ public class ObjectDecoder<R extends Reader, T> implements Decodeable<R, T> {
this.creatorConstructorMembers = null;
return;
}
Class clazz = null;
if (type instanceof ParameterizedType) {
final ParameterizedType pts = (ParameterizedType) type;
clazz = (Class) (pts).getRawType();
clazz = TypeToken.typeToClass(type);
} else if (type instanceof TypeVariable) {
TypeVariable tv = (TypeVariable) type;
Type[] ts = tv.getBounds();

View File

@@ -9,11 +9,9 @@ import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import org.redkale.annotation.Comment;
import org.redkale.convert.*;
import org.redkale.convert.bson.BsonConvert;
import org.redkale.convert.json.JsonConvert;
import org.redkale.net.http.HttpSimpleRequest;
import org.redkale.net.sncp.SncpHeader;
import org.redkale.util.ByteArray;
/**
* 存在MQ里面的数据结构<p>
@@ -84,7 +82,10 @@ public class MessageRecord implements Serializable {
protected byte ctype;
@Comment("本地附加对象,不会被序列化")
protected Object localAttach;
protected String localActionName;
@Comment("本地附加对象,不会被序列化")
protected Object[] localParams;
public MessageRecord() {
}
@@ -115,8 +116,13 @@ public class MessageRecord implements Serializable {
return content == null ? null : new String(content, StandardCharsets.UTF_8);
}
public MessageRecord attach(Object attach) {
this.localAttach = attach;
public MessageRecord localActionName(String actionName) {
this.localActionName = actionName;
return this;
}
public MessageRecord localParams(Object[] params) {
this.localParams = params;
return this;
}
@@ -156,9 +162,9 @@ public class MessageRecord implements Serializable {
public int hash() {
if (groupid != null && !groupid.isEmpty()) {
return groupid.hashCode();
return Math.abs(groupid.hashCode());
} else if (userid != null) {
return userid.hashCode();
return Math.abs(userid.hashCode());
} else {
return 0;
}
@@ -300,6 +306,7 @@ public class MessageRecord implements Serializable {
StringBuilder sb = new StringBuilder(128);
sb.append("{\"seqid\":").append(this.seqid);
sb.append(",\"version\":").append(this.version);
sb.append(",\"ctype\":").append(this.ctype);
if (this.flag != 0) {
sb.append(",\"flag\":").append(this.flag);
}
@@ -320,9 +327,11 @@ public class MessageRecord implements Serializable {
}
if (this.content != null) {
if (this.ctype == CTYPE_BSON_RESULT && this.content.length > SncpHeader.HEADER_SUBSIZE) {
int offset = new ByteArray(this.content).getChar(0) + 1; //循环占位符
Object rs = BsonConvert.root().convertFrom(Object.class, this.content, offset, this.content.length - offset);
sb.append(",\"content\":").append(rs);
//int offset = new ByteArray(this.content).getChar(0) + 1; //循环占位符
//Object rs = BsonConvert.root().convertFrom(Object.class, this.content, offset, this.content.length - offset);
//sb.append(",\"content\":").append(rs);
//SncpHeader包含不确定长度的信息故不能再直接偏移读取
sb.append(",\"content\":").append("bytes[" + this.content.length + "]");
} else if (this.ctype == CTYPE_HTTP_REQUEST) {
HttpSimpleRequest req = HttpSimpleRequestCoder.getInstance().decode(this.content);
if (req != null) {
@@ -336,8 +345,13 @@ public class MessageRecord implements Serializable {
sb.append(",\"content\":").append(req);
} else if (this.ctype == CTYPE_HTTP_RESULT) {
sb.append(",\"content\":").append(HttpResultCoder.getInstance().decode(this.content));
} else if (localAttach != null) {
sb.append(",\"attach\":").append(JsonConvert.root().convertTo(localAttach));
} else if (localActionName != null || localParams != null) {
if (localActionName != null) {
sb.append(",\"actionName\":").append(localActionName);
}
if (localParams != null) {
sb.append(",\"params\":").append(JsonConvert.root().convertTo(localParams));
}
} else {
sb.append(",\"content\":\"").append(new String(this.content, StandardCharsets.UTF_8)).append("\"");
}

View File

@@ -326,6 +326,7 @@ public final class Rest {
}
//仅供Rest动态构建里 currentUserid() 使用
@AsmDepends
public static <T> T orElse(T t, T defValue) {
return t == null ? defValue : t;
}
@@ -1194,8 +1195,18 @@ public final class Rest {
if (ignore) {
continue;
}
paramTypes.add(TypeToken.getGenericType(method.getGenericParameterTypes(), serviceType));
retvalTypes.add(formatRestReturnType(method, serviceType));
java.lang.reflect.Type[] ptypes = TypeToken.getGenericType(method.getGenericParameterTypes(), serviceType);
for (java.lang.reflect.Type t : ptypes) {
if (!TypeToken.isClassType(t)) {
throw new RedkaleException("param type (" + t + ") is not a class in method " + method + ", serviceType is " + serviceType.getName());
}
}
paramTypes.add(ptypes);
java.lang.reflect.Type rtype = formatRestReturnType(method, serviceType);
if (!TypeToken.isClassType(rtype)) {
throw new RedkaleException("return type (" + rtype + ") is not a class in method " + method + ", serviceType is " + serviceType.getName());
}
retvalTypes.add(rtype);
if (mappings.length == 0) { //没有Mapping设置一个默认值
MappingEntry entry = new MappingEntry(serRpcOnly, methodIdex, parentNonBlocking, null, bigModuleName, method);
entrys.add(entry);
@@ -1692,8 +1703,18 @@ public final class Rest {
}
}
}
paramTypes.add(TypeToken.getGenericType(method.getGenericParameterTypes(), serviceType));
retvalTypes.add(formatRestReturnType(method, serviceType));
java.lang.reflect.Type[] ptypes = TypeToken.getGenericType(method.getGenericParameterTypes(), serviceType);
for (java.lang.reflect.Type t : ptypes) {
if (!TypeToken.isClassType(t)) {
throw new RedkaleException("param type (" + t + ") is not a class in method " + method + ", serviceType is " + serviceType.getName());
}
}
paramTypes.add(ptypes);
java.lang.reflect.Type rtype = formatRestReturnType(method, serviceType);
if (!TypeToken.isClassType(rtype)) {
throw new RedkaleException("return type (" + rtype + ") is not a class in method " + method + ", serviceType is " + serviceType.getName());
}
retvalTypes.add(rtype);
if (mappings.length == 0) { //没有Mapping设置一个默认值
MappingEntry entry = new MappingEntry(serRpcOnly, methodidex, parentNonBlocking, null, bigmodulename, method);
if (entrys.contains(entry)) {

View File

@@ -212,8 +212,7 @@ public class SncpHeader {
//供client端request和response的header判断
public boolean checkValid(SncpHeader other) {
return Objects.equals(this.seqid, other.seqid)
&& Objects.equals(this.serviceid, other.serviceid)
return Objects.equals(this.serviceid, other.serviceid)
&& Objects.equals(this.actionid, other.actionid);
}

View File

@@ -155,7 +155,7 @@ public class SncpRemoteInfo<T extends Service> {
}
//MQ模式RPC
protected CompletableFuture<byte[]> remoteMessage(final SncpRemoteAction action, final String traceid, final Object[] params) {
private CompletableFuture<byte[]> remoteMessage(final SncpRemoteAction action, final String traceid, final Object[] params) {
final SncpClientRequest request = createSncpClientRequest(action, this.sncpClient.clientSncpAddress, traceid, params);
String targetTopic = action.paramTopicTargetIndex >= 0 ? (String) params[action.paramTopicTargetIndex] : this.topic;
if (targetTopic == null) {
@@ -165,14 +165,12 @@ public class SncpRemoteInfo<T extends Service> {
request.writeTo(null, array);
MessageRecord message = messageClient.createMessageRecord(targetTopic, null, array.getBytes());
final String tt = targetTopic;
if (logger.isLoggable(Level.FINER)) {
message.attach(Utility.append(new Object[]{action.actionName()}, params));
} else {
message.attach(params);
}
message.localActionName(action.actionName());
message.localParams(params);
return messageClient.sendMessage(message).thenApply(msg -> {
if (msg == null || msg.getContent() == null) {
logger.log(Level.SEVERE, action.method + " sncp mq(params: " + JsonConvert.root().convertTo(params) + ", message: " + message + ") deal error, this.topic = " + this.topic + ", targetTopic = " + tt + ", result = " + msg);
logger.log(Level.SEVERE, action.method + " sncp mq(params: " + JsonConvert.root().convertTo(params)
+ ", message: " + message + ") deal error, this.topic = " + this.topic + ", targetTopic = " + tt + ", result = " + msg);
return null;
}
ByteBuffer buffer = ByteBuffer.wrap(msg.getContent());
@@ -186,7 +184,8 @@ public class SncpRemoteInfo<T extends Service> {
}
final int retcode = header.getRetcode();
if (retcode != 0) {
logger.log(Level.SEVERE, action.method + " sncp (params: " + JsonConvert.root().convertTo(params) + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + "), params=" + JsonConvert.root().convertTo(params));
logger.log(Level.SEVERE, action.method + " sncp (params: " + JsonConvert.root().convertTo(params)
+ ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + "), params=" + JsonConvert.root().convertTo(params));
throw new SncpException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")");
}
final int respBodyLength = header.getBodyLength();