From 797d04fea7629152863c43e5962c028c22bdb53f Mon Sep 17 00:00:00 2001 From: redkale Date: Tue, 7 Feb 2023 00:50:53 +0800 Subject: [PATCH] =?UTF-8?q?sncp=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/boot/NodeServer.java | 2 +- src/main/java/org/redkale/net/Server.java | 2 +- src/main/java/org/redkale/net/sncp/Sncp.java | 88 ++++++- .../java/org/redkale/net/sncp/SncpClient.java | 32 ++- .../org/redkale/net/sncp/SncpClientCodec.java | 6 + .../net/sncp/SncpClientConnection.java | 6 + .../redkale/net/sncp/SncpClientRequest.java | 6 + .../redkale/net/sncp/SncpClientResult.java | 6 + .../org/redkale/net/sncp/SncpDynServlet.java | 2 +- .../java/org/redkale/net/sncp/SncpHeader.java | 28 ++- .../org/redkale/net/sncp/SncpOldClient.java | 205 +--------------- .../org/redkale/net/sncp/SncpRequest.java | 2 +- .../org/redkale/net/sncp/SncpServiceInfo.java | 218 ++++++++++++++++++ .../test/sncp/SncpClientCodecTest.java | 2 +- .../java/org/redkale/test/sncp/SncpTest.java | 4 +- .../test/sncp/SncpTestServiceImpl.java | 8 +- 16 files changed, 393 insertions(+), 224 deletions(-) create mode 100644 src/main/java/org/redkale/net/sncp/SncpServiceInfo.java diff --git a/src/main/java/org/redkale/boot/NodeServer.java b/src/main/java/org/redkale/boot/NodeServer.java index 7cbdb5c85..829919b41 100644 --- a/src/main/java/org/redkale/boot/NodeServer.java +++ b/src/main/java/org/redkale/boot/NodeServer.java @@ -490,7 +490,7 @@ public abstract class NodeServer { } final ResourceTypeLoader resourceLoader = (ResourceFactory rf, String srcResourceName, final Object srcObj, final String resourceName, Field field, final Object attachment) -> { try { - if (SncpOldClient.parseMethodActions(serviceImplClass).isEmpty() + if (Sncp.loadMethodActions(serviceImplClass).isEmpty() && (serviceImplClass.getAnnotation(Priority.class) == null && serviceImplClass.getAnnotation(javax.annotation.Priority.class) == null)) { //class没有可用的方法且没有标记启动优先级的, 通常为BaseService if (!serviceImplClass.getName().startsWith("org.redkale.") && !serviceImplClass.getSimpleName().contains("Base")) { logger.log(Level.FINE, serviceImplClass + " cannot load because not found less one public non-final method"); diff --git a/src/main/java/org/redkale/net/Server.java b/src/main/java/org/redkale/net/Server.java index 511377ae9..b03f3bf55 100644 --- a/src/main/java/org/redkale/net/Server.java +++ b/src/main/java/org/redkale/net/Server.java @@ -309,7 +309,7 @@ public abstract class Server loadMethodActions(final Class resourceServiceType) { + final List list = new ArrayList<>(); + final List multis = new ArrayList<>(); + final Map actionids = new LinkedHashMap<>(); + for (final java.lang.reflect.Method method : resourceServiceType.getMethods()) { + if (method.isSynthetic()) { + continue; + } + if (Modifier.isStatic(method.getModifiers())) { + continue; + } + if (Modifier.isFinal(method.getModifiers())) { + continue; + } + if (method.getAnnotation(Local.class) != null) { + continue; + } + if (method.getName().equals("getClass") || method.getName().equals("toString")) { + continue; + } + if (method.getName().equals("equals") || method.getName().equals("hashCode")) { + continue; + } + if (method.getName().equals("notify") || method.getName().equals("notifyAll") || method.getName().equals("wait")) { + continue; + } + if (method.getParameterCount() == 1 && method.getParameterTypes()[0] == AnyValue.class) { + if (method.getName().equals("init") || method.getName().equals("stop") || method.getName().equals("destroy")) { + continue; + } + } + + Uint128 actionid = Sncp.actionid(method); + Method old = actionids.get(actionid); + if (old != null) { + if (old.getDeclaringClass().equals(method.getDeclaringClass())) { + throw new SncpException(resourceServiceType.getName() + " have one more same action(Method=" + method + ", " + old + ", actionid=" + actionid + ")"); + } + continue; + } + actionids.put(actionid, method); + if (method.getAnnotation(Sncp.SncpDyn.class) != null) { + multis.add(method); + } else { + list.add(method); + } + } + multis.sort((m1, m2) -> m1.getAnnotation(Sncp.SncpDyn.class).index() - m2.getAnnotation(Sncp.SncpDyn.class).index()); + list.sort((Method o1, Method o2) -> { + if (!o1.getName().equals(o2.getName())) { + return o1.getName().compareTo(o2.getName()); + } + if (o1.getParameterCount() != o2.getParameterCount()) { + return o1.getParameterCount() - o2.getParameterCount(); + } + return 0; + }); + //带SncpDyn必须排在前面 + multis.addAll(list); + final LinkedHashMap rs = new LinkedHashMap<>(); + for (Method method : multis) { + for (Map.Entry en : actionids.entrySet()) { + if (en.getValue() == method) { + rs.put(en.getKey(), en.getValue()); + break; + } + } + } + return rs; + } + + public static SncpServiceInfo createSncpServiceInfo(String resourceName, + Class resourceServiceType, T service, MessageAgent messageAgent, SncpMessageClient messageClient) { + return new SncpServiceInfo(resourceName, resourceServiceType, service, messageAgent, messageClient); + } + public static Uint128 actionid(final RpcAction action) { return hash(action.name()); } @@ -790,7 +867,12 @@ public abstract class Sncp { } int i = -1; Uint128 serviceid = serviceid(name, serviceTypeOrImplClass); - for (final SncpAction entry : SncpOldClient.getSncpActions(realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, serviceid)) { + final List serviceActions = new ArrayList<>(); + Class serviceImpClass = realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass; + for (Map.Entry en : loadMethodActions(serviceImpClass).entrySet()) { + serviceActions.add(new SncpServiceAction(serviceImpClass, en.getValue(), serviceid, en.getKey())); + } + for (final SncpServiceAction entry : serviceActions) { final int index = ++i; final java.lang.reflect.Method method = entry.method; { diff --git a/src/main/java/org/redkale/net/sncp/SncpClient.java b/src/main/java/org/redkale/net/sncp/SncpClient.java index c59ebf61d..367a98090 100644 --- a/src/main/java/org/redkale/net/sncp/SncpClient.java +++ b/src/main/java/org/redkale/net/sncp/SncpClient.java @@ -4,21 +4,31 @@ package org.redkale.net.sncp; import java.net.InetSocketAddress; +import org.redkale.annotation.Resource; +import org.redkale.convert.bson.BsonConvert; import org.redkale.net.*; import org.redkale.net.client.*; /** + * SNCP版Client + * + *

+ * 详情见: https://redkale.org * * @author zhangjx + * + * @since 2.8.0 */ public class SncpClient extends Client { - private InetSocketAddress sncpAddress; + private final InetSocketAddress clientSncpAddress; - @SuppressWarnings("OverridableMethodCallInConstructor") - public SncpClient(String name, AsyncGroup group, InetSocketAddress sncpAddress, ClientAddress address, int maxConns, int maxPipelines) { - super(name, group, true, address, maxConns, maxPipelines, null, null, null); //maxConns - this.sncpAddress = sncpAddress; + @Resource + protected BsonConvert bsonConvert; + + public SncpClient(String name, AsyncGroup group, InetSocketAddress clientSncpAddress, ClientAddress address, String netprotocol, int maxConns, int maxPipelines) { + super(name, group, "TCP".equalsIgnoreCase(netprotocol), address, maxConns, maxPipelines, null, null, null); //maxConns + this.clientSncpAddress = clientSncpAddress; } @Override @@ -26,8 +36,16 @@ public class SncpClient extends Client T remote(final SncpServiceInfo info, final int index, final Object... params) { + return null; + } } diff --git a/src/main/java/org/redkale/net/sncp/SncpClientCodec.java b/src/main/java/org/redkale/net/sncp/SncpClientCodec.java index 9f7f967a3..713716259 100644 --- a/src/main/java/org/redkale/net/sncp/SncpClientCodec.java +++ b/src/main/java/org/redkale/net/sncp/SncpClientCodec.java @@ -10,8 +10,14 @@ import static org.redkale.net.sncp.SncpHeader.HEADER_SIZE; import org.redkale.util.*; /** + * SncpClient编解码器 + * + *

+ * 详情见: https://redkale.org * * @author zhangjx + * + * @since 2.8.0 */ public class SncpClientCodec extends ClientCodec { diff --git a/src/main/java/org/redkale/net/sncp/SncpClientConnection.java b/src/main/java/org/redkale/net/sncp/SncpClientConnection.java index 18497159b..a1103ec52 100644 --- a/src/main/java/org/redkale/net/sncp/SncpClientConnection.java +++ b/src/main/java/org/redkale/net/sncp/SncpClientConnection.java @@ -8,8 +8,14 @@ import org.redkale.net.client.*; import org.redkale.util.ObjectPool; /** + * client版连接 + * + *

+ * 详情见: https://redkale.org * * @author zhangjx + * + * @since 2.8.0 */ public class SncpClientConnection extends ClientConnection { diff --git a/src/main/java/org/redkale/net/sncp/SncpClientRequest.java b/src/main/java/org/redkale/net/sncp/SncpClientRequest.java index 82aded86b..bb21b328e 100644 --- a/src/main/java/org/redkale/net/sncp/SncpClientRequest.java +++ b/src/main/java/org/redkale/net/sncp/SncpClientRequest.java @@ -8,8 +8,14 @@ import org.redkale.net.client.*; import org.redkale.util.ByteArray; /** + * client版请求 + * + *

+ * 详情见: https://redkale.org * * @author zhangjx + * + * @since 2.8.0 */ public class SncpClientRequest extends ClientRequest { diff --git a/src/main/java/org/redkale/net/sncp/SncpClientResult.java b/src/main/java/org/redkale/net/sncp/SncpClientResult.java index 58e6a3201..edd6403ca 100644 --- a/src/main/java/org/redkale/net/sncp/SncpClientResult.java +++ b/src/main/java/org/redkale/net/sncp/SncpClientResult.java @@ -9,8 +9,14 @@ import java.util.Objects; import org.redkale.util.ByteArray; /** + * client版响应结果 + * + *

+ * 详情见: https://redkale.org * * @author zhangjx + * + * @since 2.8.0 */ public class SncpClientResult { diff --git a/src/main/java/org/redkale/net/sncp/SncpDynServlet.java b/src/main/java/org/redkale/net/sncp/SncpDynServlet.java index 7516fdf6c..6de06565c 100644 --- a/src/main/java/org/redkale/net/sncp/SncpDynServlet.java +++ b/src/main/java/org/redkale/net/sncp/SncpDynServlet.java @@ -49,7 +49,7 @@ public final class SncpDynServlet extends SncpServlet { this.maxNameLength = maxNameLength; this.serviceid = Sncp.serviceid(serviceResourceName, serviceResourceType); RedkaleClassLoader.putReflectionPublicMethods(service.getClass().getName()); - for (Map.Entry en : SncpOldClient.parseMethodActions(service.getClass()).entrySet()) { + for (Map.Entry en : Sncp.loadMethodActions(service.getClass()).entrySet()) { SncpServletAction action; try { action = SncpServletAction.create(service, serviceid, en.getKey(), en.getValue()); diff --git a/src/main/java/org/redkale/net/sncp/SncpHeader.java b/src/main/java/org/redkale/net/sncp/SncpHeader.java index 17e498d9a..5e589ebdc 100644 --- a/src/main/java/org/redkale/net/sncp/SncpHeader.java +++ b/src/main/java/org/redkale/net/sncp/SncpHeader.java @@ -22,6 +22,7 @@ public class SncpHeader { private Uint128 serviceid; + //【预留字段】service接口版本 private int serviceVersion; private Uint128 actionid; @@ -94,15 +95,24 @@ public class SncpHeader { } public ByteArray writeTo(ByteArray array, byte[] newAddrBytes, int newAddrPort, long newSeqid, int bodyLength, int retcode) { - array.putLong(newSeqid); //8 - array.putChar((char) HEADER_SIZE); //2 - array.putUint128(serviceid); //16 - array.putInt(serviceVersion); //4 - array.putUint128(actionid); //16 - array.put(newAddrBytes); //4 - array.putChar((char) newAddrPort); //2 - array.putInt(bodyLength); //4 - array.putInt(retcode); //4 + int offset = 0; + array.putLong(offset, newSeqid); //8 + offset += 8; + array.putChar(offset, (char) HEADER_SIZE); //2 + offset += 2; + array.putUint128(offset, serviceid); //16 + offset += 16; + array.putInt(offset, serviceVersion); //4 + offset += 4; + array.putUint128(offset, actionid); //16 + offset += 16; + array.put(offset, newAddrBytes); //4 + offset += 4; + array.putChar(offset, (char) newAddrPort); //2 + offset += 2; + array.putInt(offset, bodyLength); //4 + offset += 4; + array.putInt(offset, retcode); //4 return array; } diff --git a/src/main/java/org/redkale/net/sncp/SncpOldClient.java b/src/main/java/org/redkale/net/sncp/SncpOldClient.java index 6bde336a6..0ce942fcf 100644 --- a/src/main/java/org/redkale/net/sncp/SncpOldClient.java +++ b/src/main/java/org/redkale/net/sncp/SncpOldClient.java @@ -5,7 +5,6 @@ */ package org.redkale.net.sncp; -import java.lang.annotation.Annotation; import java.lang.reflect.*; import java.net.*; import java.nio.ByteBuffer; @@ -21,6 +20,7 @@ import org.redkale.net.*; import org.redkale.net.sncp.Sncp.SncpDyn; import static org.redkale.net.sncp.SncpHeader.HEADER_SIZE; import static org.redkale.net.sncp.SncpRequest.*; +import org.redkale.net.sncp.SncpServiceInfo.SncpServiceAction; import org.redkale.service.*; import org.redkale.source.*; import org.redkale.util.*; @@ -54,7 +54,7 @@ public final class SncpOldClient { protected final int serviceVersion; - protected final SncpAction[] actions; + protected final SncpServiceAction[] actions; protected final MessageAgent messageAgent; @@ -83,12 +83,12 @@ public final class SncpOldClient { this.name = serviceResourceName; Class serviceResourceType = ResourceFactory.getResourceType(serviceTypeOrImplClass); //serviceResourceType this.serviceid = Sncp.serviceid(serviceResourceName, serviceResourceType); - final List methodens = new ArrayList<>(); + final List methodens = new ArrayList<>(); //------------------------------------------------------------------------------ - for (Map.Entry en : parseMethodActions(serviceClass).entrySet()) { - methodens.add(new SncpAction(serviceClass, en.getValue(), serviceid, en.getKey())); + for (Map.Entry en : Sncp.loadMethodActions(serviceClass).entrySet()) { + methodens.add(new SncpServiceAction(serviceClass, en.getValue(), serviceid, en.getKey())); } - this.actions = methodens.toArray(new SncpAction[methodens.size()]); + this.actions = methodens.toArray(new SncpServiceAction[methodens.size()]); this.addrBytes = clientSncpAddress == null ? new byte[4] : clientSncpAddress.getAddress().getAddress(); this.addrPort = clientSncpAddress == null ? 0 : clientSncpAddress.getPort(); if (this.addrBytes.length != 4) { @@ -96,15 +96,6 @@ public final class SncpOldClient { } } - static List getSncpActions(final Class serviceClass, Uint128 serviceid) { - final List actions = new ArrayList<>(); - //------------------------------------------------------------------------------ - for (Map.Entry en : parseMethodActions(serviceClass).entrySet()) { - actions.add(new SncpAction(serviceClass, en.getValue(), serviceid, en.getKey())); - } - return actions; - } - public MessageAgent getMessageAgent() { return messageAgent; } @@ -168,80 +159,9 @@ public final class SncpOldClient { + ", actions.size = " + actions.length + ")"; } - public static LinkedHashMap parseMethodActions(final Class serviceClass) { - final List list = new ArrayList<>(); - final List multis = new ArrayList<>(); - final Map actionids = new LinkedHashMap<>(); - for (final java.lang.reflect.Method method : serviceClass.getMethods()) { - if (method.isSynthetic()) { - continue; - } - if (Modifier.isStatic(method.getModifiers())) { - continue; - } - if (Modifier.isFinal(method.getModifiers())) { - continue; - } - if (method.getAnnotation(Local.class) != null) { - continue; - } - if (method.getName().equals("getClass") || method.getName().equals("toString")) { - continue; - } - if (method.getName().equals("equals") || method.getName().equals("hashCode")) { - continue; - } - if (method.getName().equals("notify") || method.getName().equals("notifyAll") || method.getName().equals("wait")) { - continue; - } - if (method.getParameterCount() == 1 && method.getParameterTypes()[0] == AnyValue.class) { - if (method.getName().equals("init") || method.getName().equals("stop") || method.getName().equals("destroy")) { - continue; - } - } - - Uint128 actionid = Sncp.actionid(method); - Method old = actionids.get(actionid); - if (old != null) { - if (old.getDeclaringClass().equals(method.getDeclaringClass())) { - throw new SncpException(serviceClass.getName() + " have one more same action(Method=" + method + ", " + old + ", actionid=" + actionid + ")"); - } - continue; - } - actionids.put(actionid, method); - if (method.getAnnotation(SncpDyn.class) != null) { - multis.add(method); - } else { - list.add(method); - } - } - multis.sort((m1, m2) -> m1.getAnnotation(SncpDyn.class).index() - m2.getAnnotation(SncpDyn.class).index()); - list.sort((Method o1, Method o2) -> { - if (!o1.getName().equals(o2.getName())) { - return o1.getName().compareTo(o2.getName()); - } - if (o1.getParameterCount() != o2.getParameterCount()) { - return o1.getParameterCount() - o2.getParameterCount(); - } - return 0; - }); - //带SncpDyn必须排在前面 - multis.addAll(list); - final LinkedHashMap rs = new LinkedHashMap<>(); - for (Method method : multis) { - for (Map.Entry en : actionids.entrySet()) { - if (en.getValue() == method) { - rs.put(en.getKey(), en.getValue()); - break; - } - } - } - return rs; - } - //只给远程模式调用的 public T remote(final int index, final Object... params) { - final SncpAction action = actions[index]; + final SncpServiceAction action = actions[index]; final CompletionHandler handlerFunc = action.handlerFuncParamIndex >= 0 ? (CompletionHandler) params[action.handlerFuncParamIndex] : null; if (action.handlerFuncParamIndex >= 0) { params[action.handlerFuncParamIndex] = null; @@ -295,7 +215,7 @@ public final class SncpOldClient { } } - private CompletableFuture remote0(final CompletionHandler handler, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) { + private CompletableFuture remote0(final CompletionHandler handler, final Transport transport, final SocketAddress addr0, final SncpServiceAction action, final Object... params) { final String traceid = Traces.currTraceid(); final Type[] myparamtypes = action.paramTypes; final Class[] myparamclass = action.paramClass; @@ -369,7 +289,6 @@ public final class SncpOldClient { final AsyncConnection conn = conn0; final ByteArray array = writer.toByteArray(); fillHeader(array, action, seqid, traceid, reqBodyLength); - conn.write(array, new CompletionHandler() { @Override @@ -433,6 +352,7 @@ public final class SncpOldClient { success(); } } catch (Throwable e) { + e.printStackTrace(); future.completeExceptionally(new RpcRemoteException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote response error, params=" + JsonConvert.root().convertTo(params))); transport.offerConnection(true, conn); if (handler != null) { @@ -496,7 +416,7 @@ public final class SncpOldClient { }); } - private void checkResult(long seqid, final SncpAction action, ByteBuffer buffer) { + private void checkResult(long seqid, final SncpServiceAction action, ByteBuffer buffer) { long rseqid = buffer.getLong(); if (rseqid != seqid) { throw new SncpException("sncp(" + action.method + ") response.seqid = " + seqid + ", but request.seqid =" + rseqid); @@ -521,111 +441,8 @@ public final class SncpOldClient { buffer.getChar(); //端口 } - private void fillHeader(ByteArray buffer, SncpAction action, long seqid, String traceid, int bodyLength) { + private void fillHeader(ByteArray buffer, SncpServiceAction action, long seqid, String traceid, int bodyLength) { action.header.writeTo(buffer, addrBytes, addrPort, seqid, bodyLength, 0); //结果码, 请求方固定传0 } - protected static final class SncpAction { - - protected final Uint128 actionid; - - protected final Method method; - - protected final Type resultTypes; //void 必须设为 null - - protected final Type[] paramTypes; - - protected final Class[] paramClass; - - protected final Attribute[] paramAttrs; // 为null表示无RpcCall处理,index=0固定为null, 其他为参数标记的RpcCall回调方法 - - protected final int handlerFuncParamIndex; - - protected final int handlerAttachParamIndex; - - protected final int addressTargetParamIndex; - - protected final int addressSourceParamIndex; - - protected final int topicTargetParamIndex; - - protected final boolean boolReturnTypeFuture; // 返回结果类型是否为 CompletableFuture - - protected final Creator futureCreator; - - protected final SncpHeader header; - - @SuppressWarnings("unchecked") - public SncpAction(final Class clazz, Method method, Uint128 serviceid, Uint128 actionid) { - this.actionid = actionid == null ? Sncp.actionid(method) : actionid; - Type rt = TypeToken.getGenericType(method.getGenericReturnType(), clazz); - this.resultTypes = rt == void.class ? null : rt; - this.boolReturnTypeFuture = CompletableFuture.class.isAssignableFrom(method.getReturnType()); - this.futureCreator = boolReturnTypeFuture ? Creator.create((Class) method.getReturnType()) : null; - this.paramTypes = TypeToken.getGenericType(method.getGenericParameterTypes(), clazz); - this.paramClass = method.getParameterTypes(); - this.method = method; - Annotation[][] anns = method.getParameterAnnotations(); - int tpoicAddrIndex = -1; - int targetAddrIndex = -1; - int sourceAddrIndex = -1; - int handlerAttachIndex = -1; - int handlerFuncIndex = -1; - boolean hasattr = false; - Attribute[] atts = new Attribute[paramTypes.length + 1]; - if (anns.length > 0) { - Class[] params = method.getParameterTypes(); - for (int i = 0; i < params.length; i++) { - if (CompletionHandler.class.isAssignableFrom(params[i])) { - if (boolReturnTypeFuture) { - throw new SncpException(method + " have both CompletionHandler and CompletableFuture"); - } - if (handlerFuncIndex >= 0) { - throw new SncpException(method + " have more than one CompletionHandler type parameter"); - } - Sncp.checkAsyncModifier(params[i], method); - handlerFuncIndex = i; - break; - } - } - for (int i = 0; i < anns.length; i++) { - if (anns[i].length > 0) { - for (Annotation ann : anns[i]) { - if (ann.annotationType() == RpcAttachment.class) { - if (handlerAttachIndex >= 0) { - throw new SncpException(method + " have more than one @RpcAttachment parameter"); - } - handlerAttachIndex = i; - } else if (ann.annotationType() == RpcTargetAddress.class && SocketAddress.class.isAssignableFrom(params[i])) { - targetAddrIndex = i; - } else if (ann.annotationType() == RpcSourceAddress.class && SocketAddress.class.isAssignableFrom(params[i])) { - sourceAddrIndex = i; - } else if (ann.annotationType() == RpcTargetTopic.class && String.class.isAssignableFrom(params[i])) { - tpoicAddrIndex = i; - } - } - } - } - } - this.topicTargetParamIndex = tpoicAddrIndex; - this.addressTargetParamIndex = targetAddrIndex; - this.addressSourceParamIndex = sourceAddrIndex; - this.handlerFuncParamIndex = handlerFuncIndex; - this.handlerAttachParamIndex = handlerAttachIndex; - this.paramAttrs = hasattr ? atts : null; - this.header = new SncpHeader(null, serviceid, actionid); - if (this.handlerFuncParamIndex >= 0 && method.getReturnType() != void.class) { - throw new SncpException(method + " have CompletionHandler type parameter but return type is not void"); - } - } - - public String actionName() { - return method.getDeclaringClass().getSimpleName() + "." + method.getName(); - } - - @Override - public String toString() { - return "{" + actionid + "," + (method == null ? "null" : method.getName()) + "}"; - } - } } diff --git a/src/main/java/org/redkale/net/sncp/SncpRequest.java b/src/main/java/org/redkale/net/sncp/SncpRequest.java index c530679f7..afc204233 100644 --- a/src/main/java/org/redkale/net/sncp/SncpRequest.java +++ b/src/main/java/org/redkale/net/sncp/SncpRequest.java @@ -59,7 +59,7 @@ public class SncpRequest extends Request { this.header = new SncpHeader(); int headerSize = this.header.read(buffer); if (headerSize != HEADER_SIZE) { - context.getLogger().log(Level.WARNING, "sncp buffer header.length not " + HEADER_SIZE); + context.getLogger().log(Level.WARNING, "sncp buffer header.length not " + HEADER_SIZE + ", but " + headerSize); return -1; } if (this.header.getRetcode() != 0) { // retcode diff --git a/src/main/java/org/redkale/net/sncp/SncpServiceInfo.java b/src/main/java/org/redkale/net/sncp/SncpServiceInfo.java new file mode 100644 index 000000000..1cad212cc --- /dev/null +++ b/src/main/java/org/redkale/net/sncp/SncpServiceInfo.java @@ -0,0 +1,218 @@ +/* + * + */ +package org.redkale.net.sncp; + +import java.lang.annotation.Annotation; +import java.lang.reflect.*; +import java.net.*; +import java.nio.channels.CompletionHandler; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import org.redkale.mq.*; +import static org.redkale.net.sncp.Sncp.loadMethodActions; +import org.redkale.service.*; +import org.redkale.util.*; + +/** + * 每个Service的client相关信息对象 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * @param Service泛型 + * + * @since 2.8.0 + */ +public final class SncpServiceInfo { + + protected final String name; + + protected final Class serviceType; + + protected final T service; + + protected final Uint128 serviceid; + + protected final int serviceVersion; + + protected final SncpServiceAction[] actions; + + protected final String topic; + + //MQ模式下此字段才有值 + protected final MessageAgent messageAgent; + + //MQ模式下此字段才有值 + protected final SncpMessageClient messageClient; + + //远程模式, 可能为null + protected Set remoteGroups; + + //远程模式, 可能为null + protected Set remoteAddresses; + + SncpServiceInfo(String resourceName, Class resourceServiceType, final T service, MessageAgent messageAgent, SncpMessageClient messageClient) { + this.name = resourceName; + this.serviceType = resourceServiceType; + this.serviceid = Sncp.serviceid(name, resourceServiceType); + this.service = service; + this.serviceVersion = 0; + this.messageAgent = messageAgent; + this.messageClient = messageAgent == null ? null : messageAgent.getSncpMessageClient(); + this.topic = messageAgent == null ? null : messageAgent.generateSncpReqTopic(service); + + final List serviceActions = new ArrayList<>(); + final Class serviceImplClass = service.getClass(); + for (Map.Entry en : loadMethodActions(resourceServiceType).entrySet()) { + serviceActions.add(new SncpServiceAction(serviceImplClass, en.getValue(), serviceid, en.getKey())); + } + this.actions = serviceActions.toArray(new SncpServiceAction[serviceActions.size()]); + } + + public void updateRemoteAddress(Set remoteGroups, Set remoteAddresses) { + this.remoteGroups = remoteGroups; + this.remoteAddresses = remoteAddresses; + } + + public String getName() { + return name; + } + + public Class getServiceClass() { + return serviceType; + } + + public T getService() { + return service; + } + + public Uint128 getServiceid() { + return serviceid; + } + + public int getServiceVersion() { + return serviceVersion; + } + + public SncpServiceAction[] getActions() { + return actions; + } + + public String getTopic() { + return topic; + } + + public Set getRemoteGroups() { + return remoteGroups; + } + + public Set getRemoteAddresses() { + return remoteAddresses; + } + + public static final class SncpServiceAction { + + protected final Uint128 actionid; + + protected final Method method; + + protected final Type resultTypes; //void 必须设为 null + + protected final Type[] paramTypes; + + protected final Class[] paramClass; + + protected final Attribute[] paramAttrs; // 为null表示无RpcCall处理,index=0固定为null, 其他为参数标记的RpcCall回调方法 + + protected final int handlerFuncParamIndex; + + protected final int handlerAttachParamIndex; + + protected final int addressTargetParamIndex; + + protected final int addressSourceParamIndex; + + protected final int topicTargetParamIndex; + + protected final boolean boolReturnTypeFuture; // 返回结果类型是否为 CompletableFuture + + protected final Creator futureCreator; + + protected final SncpHeader header; + + @SuppressWarnings("unchecked") + SncpServiceAction(final Class serviceImplClass, Method method, Uint128 serviceid, Uint128 actionid) { + this.actionid = actionid == null ? Sncp.actionid(method) : actionid; + Type rt = TypeToken.getGenericType(method.getGenericReturnType(), serviceImplClass); + this.resultTypes = rt == void.class ? null : rt; + this.boolReturnTypeFuture = CompletableFuture.class.isAssignableFrom(method.getReturnType()); + this.futureCreator = boolReturnTypeFuture ? Creator.create((Class) method.getReturnType()) : null; + this.paramTypes = TypeToken.getGenericType(method.getGenericParameterTypes(), serviceImplClass); + this.paramClass = method.getParameterTypes(); + this.method = method; + Annotation[][] anns = method.getParameterAnnotations(); + int tpoicAddrIndex = -1; + int targetAddrIndex = -1; + int sourceAddrIndex = -1; + int handlerAttachIndex = -1; + int handlerFuncIndex = -1; + boolean hasattr = false; + Attribute[] atts = new Attribute[paramTypes.length + 1]; + if (anns.length > 0) { + Class[] params = method.getParameterTypes(); + for (int i = 0; i < params.length; i++) { + if (CompletionHandler.class.isAssignableFrom(params[i])) { + if (boolReturnTypeFuture) { + throw new SncpException(method + " have both CompletionHandler and CompletableFuture"); + } + if (handlerFuncIndex >= 0) { + throw new SncpException(method + " have more than one CompletionHandler type parameter"); + } + Sncp.checkAsyncModifier(params[i], method); + handlerFuncIndex = i; + break; + } + } + for (int i = 0; i < anns.length; i++) { + if (anns[i].length > 0) { + for (Annotation ann : anns[i]) { + if (ann.annotationType() == RpcAttachment.class) { + if (handlerAttachIndex >= 0) { + throw new SncpException(method + " have more than one @RpcAttachment parameter"); + } + handlerAttachIndex = i; + } else if (ann.annotationType() == RpcTargetAddress.class && SocketAddress.class.isAssignableFrom(params[i])) { + targetAddrIndex = i; + } else if (ann.annotationType() == RpcSourceAddress.class && SocketAddress.class.isAssignableFrom(params[i])) { + sourceAddrIndex = i; + } else if (ann.annotationType() == RpcTargetTopic.class && String.class.isAssignableFrom(params[i])) { + tpoicAddrIndex = i; + } + } + } + } + } + this.topicTargetParamIndex = tpoicAddrIndex; + this.addressTargetParamIndex = targetAddrIndex; + this.addressSourceParamIndex = sourceAddrIndex; + this.handlerFuncParamIndex = handlerFuncIndex; + this.handlerAttachParamIndex = handlerAttachIndex; + this.paramAttrs = hasattr ? atts : null; + this.header = new SncpHeader(null, serviceid, actionid); + if (this.handlerFuncParamIndex >= 0 && method.getReturnType() != void.class) { + throw new SncpException(method + " have CompletionHandler type parameter but return type is not void"); + } + } + + public String actionName() { + return method.getDeclaringClass().getSimpleName() + "." + method.getName(); + } + + @Override + public String toString() { + return "{" + actionid + "," + (method == null ? "null" : method.getName()) + "}"; + } + } +} diff --git a/src/test/java/org/redkale/test/sncp/SncpClientCodecTest.java b/src/test/java/org/redkale/test/sncp/SncpClientCodecTest.java index a1ff07a35..f06420dec 100644 --- a/src/test/java/org/redkale/test/sncp/SncpClientCodecTest.java +++ b/src/test/java/org/redkale/test/sncp/SncpClientCodecTest.java @@ -32,7 +32,7 @@ public class SncpClientCodecTest { InetSocketAddress sncpAddress = new InetSocketAddress("127.0.0.1", 3389); InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 3344); final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16); - SncpClient client = new SncpClient("test", asyncGroup, sncpAddress, new ClientAddress(remoteAddress), Utility.cpus(), 16); + SncpClient client = new SncpClient("test", asyncGroup, sncpAddress, new ClientAddress(remoteAddress), "TCP", Utility.cpus(), 16); SncpClientConnection conn = client.createClientConnection(1, asyncGroup.newTCPClientConnection()); SncpClientCodec codec = new SncpClientCodec(conn); List respResults = new ArrayList(); diff --git a/src/test/java/org/redkale/test/sncp/SncpTest.java b/src/test/java/org/redkale/test/sncp/SncpTest.java index 8511bafaf..97ce7477e 100644 --- a/src/test/java/org/redkale/test/sncp/SncpTest.java +++ b/src/test/java/org/redkale/test/sncp/SncpTest.java @@ -25,9 +25,9 @@ import org.redkale.util.*; */ public class SncpTest { - private static final String myhost = Utility.localInetAddress().getHostAddress(); + private static final String myhost = "127.0.0.1"; - private static int port = 0; + private static int port = 63877; private static int port2 = 4240; diff --git a/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java b/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java index e6b4b90d6..d3b3f3163 100644 --- a/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java +++ b/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java @@ -11,7 +11,7 @@ import java.nio.channels.CompletionHandler; import java.util.concurrent.CompletableFuture; import org.redkale.annotation.ResourceType; import org.redkale.net.*; -import org.redkale.net.sncp.*; +import org.redkale.net.sncp.Sncp; import org.redkale.service.*; import org.redkale.util.ResourceFactory; @@ -89,7 +89,7 @@ public class SncpTestServiceImpl implements SncpTestIService { System.out.println(method); } System.out.println("-----------------------------------"); - for (Method method : SncpOldClient.parseMethodActions(service.getClass()).values()) { + for (Method method : Sncp.loadMethodActions(service.getClass()).values()) { System.out.println(method); } System.out.println("-----------------------------------"); @@ -98,7 +98,7 @@ public class SncpTestServiceImpl implements SncpTestIService { System.out.println(method); } System.out.println("-----------------------------------"); - for (Method method : SncpOldClient.parseMethodActions(service.getClass()).values()) { + for (Method method : Sncp.loadMethodActions(service.getClass()).values()) { System.out.println(method); } System.out.println("-----------------------------------"); @@ -107,7 +107,7 @@ public class SncpTestServiceImpl implements SncpTestIService { System.out.println(method); } System.out.println("-----------------------------------"); - for (Method method : SncpOldClient.parseMethodActions(service.getClass()).values()) { + for (Method method : Sncp.loadMethodActions(service.getClass()).values()) { System.out.println(method); } System.out.println("-----------------------------------");