diff --git a/src/main/java/org/redkale/boot/Application.java b/src/main/java/org/redkale/boot/Application.java index 31ccc12eb..db65db9ab 100644 --- a/src/main/java/org/redkale/boot/Application.java +++ b/src/main/java/org/redkale/boot/Application.java @@ -107,7 +107,7 @@ public final class Application { /** * 当前Service所属的SNCP Server的地址 类型: SocketAddress、InetSocketAddress、String
*/ - public static final String RESNAME_SNCP_ADDR = "SNCP_ADDR"; + public static final String RESNAME_SNCP_ADDRESS = "SNCP_ADDRESS"; /** * 当前Service所属的SNCP Server所属的组 类型: String
@@ -942,7 +942,7 @@ public final class Application { }); } if (!compileMode) { - properties.put(SncpClient.class.getSimpleName() + ".handlers", LoggingFileHandler.LoggingSncpFileHandler.class.getName()); + properties.put(SncpOldClient.class.getSimpleName() + ".handlers", LoggingFileHandler.LoggingSncpFileHandler.class.getName()); } } if (compileMode) { diff --git a/src/main/java/org/redkale/boot/NodeHttpServer.java b/src/main/java/org/redkale/boot/NodeHttpServer.java index f3523bf50..1af72964d 100644 --- a/src/main/java/org/redkale/boot/NodeHttpServer.java +++ b/src/main/java/org/redkale/boot/NodeHttpServer.java @@ -14,7 +14,6 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.stream.Stream; import org.redkale.annotation.*; -import static org.redkale.boot.Application.RESNAME_SNCP_ADDR; import org.redkale.boot.ClassFilter.FilterEntry; import org.redkale.cluster.ClusterAgent; import org.redkale.mq.MessageAgent; @@ -25,6 +24,7 @@ import org.redkale.service.Service; import org.redkale.util.AnyValue.DefaultAnyValue; import org.redkale.util.*; import org.redkale.watch.*; +import static org.redkale.boot.Application.RESNAME_SNCP_ADDRESS; /** * HTTP Server节点的配置Server @@ -136,10 +136,10 @@ public class NodeHttpServer extends NodeServer { if (nodeService == null) { nodeService = (Service) rf.find(resourceName, WebSocketNode.class); } - if (sncpResFactory != null && resourceFactory.find(RESNAME_SNCP_ADDR, String.class) == null) { - resourceFactory.register(RESNAME_SNCP_ADDR, InetSocketAddress.class, sncpResFactory.find(RESNAME_SNCP_ADDR, InetSocketAddress.class)); - resourceFactory.register(RESNAME_SNCP_ADDR, SocketAddress.class, sncpResFactory.find(RESNAME_SNCP_ADDR, SocketAddress.class)); - resourceFactory.register(RESNAME_SNCP_ADDR, String.class, sncpResFactory.find(RESNAME_SNCP_ADDR, String.class)); + if (sncpResFactory != null && resourceFactory.find(RESNAME_SNCP_ADDRESS, String.class) == null) { + resourceFactory.register(RESNAME_SNCP_ADDRESS, InetSocketAddress.class, sncpResFactory.find(RESNAME_SNCP_ADDRESS, InetSocketAddress.class)); + resourceFactory.register(RESNAME_SNCP_ADDRESS, SocketAddress.class, sncpResFactory.find(RESNAME_SNCP_ADDRESS, SocketAddress.class)); + resourceFactory.register(RESNAME_SNCP_ADDRESS, String.class, sncpResFactory.find(RESNAME_SNCP_ADDRESS, String.class)); } if (nodeService == null) { MessageAgent messageAgent = null; diff --git a/src/main/java/org/redkale/boot/NodeServer.java b/src/main/java/org/redkale/boot/NodeServer.java index c075e4278..b88b09051 100644 --- a/src/main/java/org/redkale/boot/NodeServer.java +++ b/src/main/java/org/redkale/boot/NodeServer.java @@ -135,9 +135,10 @@ public abstract class NodeServer { } //单点服务不会有 sncpAddress、sncpGroup if (this.sncpAddress != null) { - this.resourceFactory.register(RESNAME_SNCP_ADDR, this.sncpAddress); - this.resourceFactory.register(RESNAME_SNCP_ADDR, SocketAddress.class, this.sncpAddress); - this.resourceFactory.register(RESNAME_SNCP_ADDR, String.class, this.sncpAddress.getHostString() + ":" + this.sncpAddress.getPort()); + this.resourceFactory.register(RESNAME_SNCP_ADDRESS, this.sncpAddress); + this.resourceFactory.register(RESNAME_SNCP_ADDRESS, SocketAddress.class, this.sncpAddress); + this.resourceFactory.register(RESNAME_SNCP_ADDRESS, InetSocketAddress.class, this.sncpAddress); + this.resourceFactory.register(RESNAME_SNCP_ADDRESS, String.class, this.sncpAddress.getHostString() + ":" + this.sncpAddress.getPort()); } if (this.sncpGroup != null) { this.resourceFactory.register(RESNAME_SNCP_GROUP, this.sncpGroup); @@ -290,7 +291,7 @@ public abstract class NodeServer { } //ResourceFactory resfactory = (isSNCP() ? appResFactory : resourceFactory); - SncpClient client = srcObj instanceof Service ? Sncp.getSncpClient((Service) srcObj) : null; + SncpOldClient client = srcObj instanceof Service ? Sncp.getSncpOldClient((Service) srcObj) : null; final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress(); final Set groups = new HashSet<>(); Service service = Modifier.isFinal(resServiceType.getModifiers()) ? (Service) resServiceType.getConstructor().newInstance() : Sncp.createLocalService(serverClassLoader, resourceName, resServiceType, null, appResFactory, appSncpTranFactory, sncpAddr, groups, null); @@ -342,7 +343,7 @@ public abstract class NodeServer { throw new RuntimeException("CacheSource must be inject in Service, cannot in " + srcObj); } final Service srcService = (Service) srcObj; - SncpClient client = Sncp.getSncpClient(srcService); + SncpOldClient client = Sncp.getSncpOldClient(srcService); final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress(); //final boolean ws = (srcObj instanceof org.redkale.net.http.WebSocketNodeService) && sncpAddr != null; //不配置SNCP服务会导致ws=false时没有注入CacheMemorySource final boolean ws = (srcObj instanceof org.redkale.net.http.WebSocketNodeService); @@ -489,7 +490,7 @@ public abstract class NodeServer { } final ResourceTypeLoader resourceLoader = (ResourceFactory rf, String srcResourceName, final Object srcObj, final String resourceName, Field field, final Object attachment) -> { try { - if (SncpClient.parseMethodActions(serviceImplClass).isEmpty() + if (SncpOldClient.parseMethodActions(serviceImplClass).isEmpty() && (serviceImplClass.getAnnotation(Priority.class) == null && serviceImplClass.getAnnotation(javax.annotation.Priority.class) == null)) { //class没有可用的方法且没有标记启动优先级的, 通常为BaseService if (!serviceImplClass.getName().startsWith("org.redkale.") && !serviceImplClass.getSimpleName().contains("Base")) { logger.log(Level.FINE, serviceImplClass + " cannot load because not found less one public non-final method"); diff --git a/src/main/java/org/redkale/boot/watch/TransportWatchService.java b/src/main/java/org/redkale/boot/watch/TransportWatchService.java index 705daad7d..dea20c12c 100644 --- a/src/main/java/org/redkale/boot/watch/TransportWatchService.java +++ b/src/main/java/org/redkale/boot/watch/TransportWatchService.java @@ -75,7 +75,7 @@ public class TransportWatchService extends AbstractWatchService { if (!Sncp.isSncpDyn(service)) { continue; } - SncpClient client = Sncp.getSncpClient(service); + SncpOldClient client = Sncp.getSncpOldClient(service); if (Sncp.isRemote(service)) { if (client.getRemoteGroups() != null && client.getRemoteGroups().contains(group)) { client.getRemoteGroupTransport().addRemoteAddresses(address); @@ -117,7 +117,7 @@ public class TransportWatchService extends AbstractWatchService { if (!Sncp.isSncpDyn(service)) { continue; } - SncpClient client = Sncp.getSncpClient(service); + SncpOldClient client = Sncp.getSncpOldClient(service); if (Sncp.isRemote(service)) { if (client.getRemoteGroups() != null && client.getRemoteGroups().contains(group)) { client.getRemoteGroupTransport().removeRemoteAddresses(address); diff --git a/src/main/java/org/redkale/mq/MessageRecord.java b/src/main/java/org/redkale/mq/MessageRecord.java index 850f0dd35..d05f58a8d 100644 --- a/src/main/java/org/redkale/mq/MessageRecord.java +++ b/src/main/java/org/redkale/mq/MessageRecord.java @@ -12,7 +12,7 @@ import org.redkale.convert.*; import org.redkale.convert.bson.BsonConvert; import org.redkale.convert.json.JsonConvert; import org.redkale.net.http.HttpSimpleRequest; -import org.redkale.net.sncp.Sncp; +import org.redkale.net.sncp.SncpHeader; /** * 存在MQ里面的数据结构

@@ -318,8 +318,8 @@ public class MessageRecord implements Serializable { sb.append(",\"respTopic\":\"").append(this.respTopic).append("\""); } if (this.content != null) { - if (this.ctype == CTYPE_BSON_RESULT && this.content.length > Sncp.HEADER_SIZE) { - int offset = Sncp.HEADER_SIZE + 1; //循环占位符 + if (this.ctype == CTYPE_BSON_RESULT && this.content.length > SncpHeader.HEADER_SIZE) { + int offset = SncpHeader.HEADER_SIZE + 1; //循环占位符 Object rs = BsonConvert.root().convertFrom(Object.class, this.content, offset, this.content.length - offset); sb.append(",\"content\":").append(rs); } else if (this.ctype == CTYPE_HTTP_REQUEST) { diff --git a/src/main/java/org/redkale/mq/SncpMessageResponse.java b/src/main/java/org/redkale/mq/SncpMessageResponse.java index ae18e41bd..3be4ddf61 100644 --- a/src/main/java/org/redkale/mq/SncpMessageResponse.java +++ b/src/main/java/org/redkale/mq/SncpMessageResponse.java @@ -50,14 +50,14 @@ public class SncpMessageResponse extends SncpResponse { callback.run(); } if (out == null) { - final ByteArray result = new ByteArray(Sncp.HEADER_SIZE); + final ByteArray result = new ByteArray(SncpHeader.HEADER_SIZE); fillHeader(result, 0, retcode); producer.apply(messageClient.createMessageRecord(message.getSeqid(), message.getRespTopic(), null, (byte[]) null)); return; } final int respBodyLength = out.count(); //body总长度 final ByteArray result = out.toByteArray(); - fillHeader(result, respBodyLength - Sncp.HEADER_SIZE, retcode); + fillHeader(result, respBodyLength - SncpHeader.HEADER_SIZE, retcode); producer.apply(messageClient.createMessageRecord(message.getSeqid(), message.getRespTopic(), null, result.getBytes())); } } diff --git a/src/main/java/org/redkale/net/http/WebSocketNode.java b/src/main/java/org/redkale/net/http/WebSocketNode.java index f98d27b79..27a63c42c 100644 --- a/src/main/java/org/redkale/net/http/WebSocketNode.java +++ b/src/main/java/org/redkale/net/http/WebSocketNode.java @@ -41,7 +41,7 @@ public abstract class WebSocketNode { protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); //"SNCP_ADDR" 如果不是分布式(没有SNCP) 值为null - @Resource(name = Application.RESNAME_SNCP_ADDR, required = false) + @Resource(name = Application.RESNAME_SNCP_ADDRESS, required = false) protected InetSocketAddress localSncpAddress; //为SncpServer的服务address protected WebSocketAddress wsNodeAddress; diff --git a/src/main/java/org/redkale/net/sncp/Sncp.java b/src/main/java/org/redkale/net/sncp/Sncp.java index f99b41148..b6acd79bd 100644 --- a/src/main/java/org/redkale/net/sncp/Sncp.java +++ b/src/main/java/org/redkale/net/sncp/Sncp.java @@ -22,7 +22,7 @@ import org.redkale.asm.Type; import org.redkale.mq.MessageAgent; import org.redkale.net.TransportFactory; import org.redkale.net.http.WebSocketNode; -import org.redkale.net.sncp.SncpClient.SncpAction; +import org.redkale.net.sncp.SncpOldClient.SncpAction; import org.redkale.service.*; import org.redkale.util.*; @@ -37,18 +37,7 @@ import org.redkale.util.*; */ public abstract class Sncp { - public static final int HEADER_SIZE = 60; - - private static final byte[] PING_BYTES = new ByteArray(HEADER_SIZE) - .putLong(0L) //8 seqid - .putChar((char) HEADER_SIZE) //2 headerSize - .putUint128(Uint128.ZERO) //16 serviceid - .putInt(0) //4 serviceVersion - .putUint128(Uint128.ZERO) //16 actionid - .put(new byte[6]) //6 addr - .putInt(0) //4 bodyLength - .putInt(0) //4 retcode - .getBytes(); + private static final byte[] PING_BYTES = new SncpHeader(null, Uint128.ZERO, Uint128.ZERO).write(new ByteArray(SncpHeader.HEADER_SIZE), null, 0, 0, 0).getBytes(); private static final byte[] PONG_BYTES = Arrays.copyOf(PING_BYTES, PING_BYTES.length); @@ -178,14 +167,14 @@ public abstract class Sncp { } } - public static SncpClient getSncpClient(Service service) { + public static SncpOldClient getSncpOldClient(Service service) { if (service == null || !isSncpDyn(service)) { return null; } try { Field ts = service.getClass().getDeclaredField(FIELDPREFIX + "_client"); ts.setAccessible(true); - return (SncpClient) ts.get(service); + return (SncpOldClient) ts.get(service); } catch (Exception e) { throw new SncpException(service + " not found " + FIELDPREFIX + "_client"); } @@ -228,7 +217,7 @@ public abstract class Sncp { if (!isSncpDyn(service)) { return false; } - SncpClient client = getSncpClient(service); + SncpOldClient client = getSncpOldClient(service); client.setRemoteGroups(groups); if (client.getRemoteGroupTransport() != null) { client.getRemoteGroupTransport().updateRemoteAddresses(addresses); @@ -330,11 +319,11 @@ public abstract class Sncp { * @ResourceType(TestService.class) * public final class _DynLocalTestService extends TestService{ * - * private AnyValue _redkale_conf; + * private AnyValue _redkale_conf; * - * private SncpClient _redkale_client; + * private SncpOldClient _redkale_client; * - * @Override + * @Override * public String toString() { * return _redkale_selfstring == null ? super.toString() : _redkale_selfstring; * } @@ -365,9 +354,9 @@ public abstract class Sncp { throw new SncpException(serviceImplClass + " is abstract"); } final String supDynName = serviceImplClass.getName().replace('.', '/'); - final String clientName = SncpClient.class.getName().replace('.', '/'); + final String clientName = SncpOldClient.class.getName().replace('.', '/'); final String resDesc = Type.getDescriptor(Resource.class); - final String clientDesc = Type.getDescriptor(SncpClient.class); + final String clientDesc = Type.getDescriptor(SncpOldClient.class); final String anyValueDesc = Type.getDescriptor(AnyValue.class); final String sncpDynDesc = Type.getDescriptor(SncpDyn.class); ClassLoader loader = classLoader == null ? Thread.currentThread().getContextClassLoader() : classLoader; @@ -552,12 +541,12 @@ public abstract class Sncp { } } while ((loop = loop.getSuperclass()) != Object.class); } - SncpClient client = null; + SncpOldClient client = null; { try { Field c = newClazz.getDeclaredField(FIELDPREFIX + "_client"); c.setAccessible(true); - client = new SncpClient(name, serviceImplClass, service, messageAgent, transportFactory, false, newClazz, clientSncpAddress); + client = new SncpOldClient(name, serviceImplClass, service, messageAgent, transportFactory, false, newClazz, clientSncpAddress); c.set(service, client); if (transportFactory != null) { transportFactory.addSncpService(service); @@ -604,11 +593,11 @@ public abstract class Sncp { * @ResourceType(TestService.class) * public final class _DynRemoteTestService extends TestService{ * - * private AnyValue _redkale_conf; + * private AnyValue _redkale_conf; * - * private SncpClient _redkale_client; + * private SncpOldClient _redkale_client; * - * @Override + * @Override * public void createSomeThing(TestBean bean){ * _redkale_client.remote(0, bean); * } @@ -664,9 +653,9 @@ public abstract class Sncp { return null; } final String supDynName = serviceTypeOrImplClass.getName().replace('.', '/'); - final String clientName = SncpClient.class.getName().replace('.', '/'); + final String clientName = SncpOldClient.class.getName().replace('.', '/'); final String resDesc = Type.getDescriptor(Resource.class); - final String clientDesc = Type.getDescriptor(SncpClient.class); + final String clientDesc = Type.getDescriptor(SncpOldClient.class); final String sncpDynDesc = Type.getDescriptor(SncpDyn.class); final String anyValueDesc = Type.getDescriptor(AnyValue.class); final ClassLoader loader = classLoader == null ? Thread.currentThread().getContextClassLoader() : classLoader; @@ -676,7 +665,7 @@ public abstract class Sncp { Class clz = RedkaleClassLoader.findDynClass(newDynName.replace('/', '.')); Class newClazz = clz == null ? loader.loadClass(newDynName.replace('/', '.')) : clz; T service = (T) newClazz.getDeclaredConstructor().newInstance(); - SncpClient client = new SncpClient(name, serviceTypeOrImplClass, service, messageAgent, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress); + SncpOldClient client = new SncpOldClient(name, serviceTypeOrImplClass, service, messageAgent, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress); client.setRemoteGroups(groups); if (transportFactory != null) { client.setRemoteGroupTransport(transportFactory.loadTransport(clientAddress, groups)); @@ -800,7 +789,8 @@ public abstract class Sncp { mv.visitEnd(); } int i = -1; - for (final SncpAction entry : SncpClient.getSncpActions(realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass)) { + Uint128 serviceid = serviceid(name, serviceTypeOrImplClass); + for (final SncpAction entry : SncpOldClient.getSncpActions(realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, serviceid)) { final int index = ++i; final java.lang.reflect.Method method = entry.method; { @@ -895,7 +885,7 @@ public abstract class Sncp { RedkaleClassLoader.putReflectionDeclaredConstructors(newClazz, newDynName.replace('/', '.')); try { T service = (T) newClazz.getDeclaredConstructor().newInstance(); - SncpClient client = new SncpClient(name, serviceTypeOrImplClass, service, messageAgent, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress); + SncpOldClient client = new SncpOldClient(name, serviceTypeOrImplClass, service, messageAgent, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress); client.setRemoteGroups(groups); if (transportFactory != null) { client.setRemoteGroupTransport(transportFactory.loadTransport(clientAddress, groups)); diff --git a/src/main/java/org/redkale/net/sncp/SncpClient.java b/src/main/java/org/redkale/net/sncp/SncpClient.java index bc4535b26..0d7e86e0f 100644 --- a/src/main/java/org/redkale/net/sncp/SncpClient.java +++ b/src/main/java/org/redkale/net/sncp/SncpClient.java @@ -1,630 +1,25 @@ /* - * To change this license header, choose License Headers reader Project Properties. - * To change this template file, choose Tools | Templates - * and open the template reader the editor. + * */ package org.redkale.net.sncp; -import java.lang.annotation.Annotation; -import java.lang.reflect.*; -import java.net.*; -import java.nio.ByteBuffer; -import java.nio.channels.CompletionHandler; -import java.util.*; -import java.util.concurrent.*; -import java.util.logging.*; -import org.redkale.annotation.Resource; -import org.redkale.convert.bson.*; -import org.redkale.convert.json.*; -import org.redkale.mq.*; import org.redkale.net.*; -import static org.redkale.net.sncp.Sncp.HEADER_SIZE; -import org.redkale.net.sncp.Sncp.SncpDyn; -import static org.redkale.net.sncp.SncpRequest.*; -import static org.redkale.net.sncp.SncpResponse.fillRespHeader; -import org.redkale.service.*; -import org.redkale.source.*; -import org.redkale.util.*; +import org.redkale.net.client.*; /** - * - *

- * 详情见: https://redkale.org * * @author zhangjx */ -public final class SncpClient { +public class SncpClient extends Client { - protected static final Logger logger = Logger.getLogger(SncpClient.class.getSimpleName()); - - protected final JsonConvert convert = JsonFactory.root().getConvert(); - - protected final String name; - - protected final boolean remote; - - private final Class serviceClass; - - protected final InetSocketAddress clientSncpAddress; - - private final byte[] addrBytes; - - private final int addrPort; - - protected final Uint128 serviceid; - - protected final int serviceVersion; - - protected final SncpAction[] actions; - - protected final MessageAgent messageAgent; - - protected final SncpMessageClient messageClient; - - protected final String topic; - - @Resource - protected BsonConvert bsonConvert; - - //远程模式, 可能为null - protected Set remoteGroups; - - //远程模式, 可能为null - protected Transport remoteGroupTransport; - - public SncpClient(final String serviceResourceName, final Class serviceTypeOrImplClass, final T service, MessageAgent messageAgent, final TransportFactory factory, - final boolean remote, final Class serviceClass, final InetSocketAddress clientSncpAddress) { - this.remote = remote; - this.messageAgent = messageAgent; - this.messageClient = messageAgent == null ? null : messageAgent.getSncpMessageClient(); - this.topic = messageAgent == null ? null : messageAgent.generateSncpReqTopic(service); - this.serviceClass = serviceClass; - this.serviceVersion = 0; //暂不实现Version - this.clientSncpAddress = clientSncpAddress; - this.name = serviceResourceName; - Class serviceResourceType = ResourceFactory.getResourceType(serviceTypeOrImplClass); //serviceResourceType - this.serviceid = Sncp.serviceid(serviceResourceName, serviceResourceType); - final List methodens = new ArrayList<>(); - //------------------------------------------------------------------------------ - for (Map.Entry en : parseMethodActions(serviceClass).entrySet()) { - methodens.add(new SncpAction(serviceClass, en.getValue(), en.getKey())); - } - this.actions = methodens.toArray(new SncpAction[methodens.size()]); - this.addrBytes = clientSncpAddress == null ? new byte[4] : clientSncpAddress.getAddress().getAddress(); - this.addrPort = clientSncpAddress == null ? 0 : clientSncpAddress.getPort(); - if (this.addrBytes.length != 4) { - throw new SncpException("SNCP clientAddress only support IPv4"); - } - } - - static List getSncpActions(final Class serviceClass) { - final List actions = new ArrayList<>(); - //------------------------------------------------------------------------------ - for (Map.Entry en : parseMethodActions(serviceClass).entrySet()) { - actions.add(new SncpAction(serviceClass, en.getValue(), en.getKey())); - } - return actions; - } - - public MessageAgent getMessageAgent() { - return messageAgent; - } - - public InetSocketAddress getClientAddress() { - return clientSncpAddress; - } - - public Uint128 getServiceid() { - return serviceid; - } - - public int getServiceVersion() { - return serviceVersion; - } - - public int getActionCount() { - return actions.length; - } - - public Set getRemoteGroups() { - return remoteGroups; - } - - public void setRemoteGroups(Set remoteGroups) { - this.remoteGroups = remoteGroups; - } - - public Transport getRemoteGroupTransport() { - return remoteGroupTransport; - } - - public void setRemoteGroupTransport(Transport remoteGroupTransport) { - this.remoteGroupTransport = remoteGroupTransport; + @SuppressWarnings("OverridableMethodCallInConstructor") + public SncpClient(String name, AsyncGroup group, String key, ClientAddress address, int maxConns, int maxPipelines) { + super(name, group, true, address, maxConns, maxPipelines, null, null, null); //maxConns } @Override - public String toString() { - String service = serviceClass.getName(); - if (remote) { - service = service.replace("DynLocalService", "DynRemoteService"); - } - return this.getClass().getSimpleName() + "(service = " + service + ", serviceid = " + serviceid + ", serviceVersion = " + serviceVersion + ", name = '" + name - + "', address = " + (clientSncpAddress == null ? "" : (clientSncpAddress.getHostString() + ":" + clientSncpAddress.getPort())) - + ", actions.size = " + actions.length + ")"; + protected SncpClientConnection createClientConnection(int index, AsyncConnection channel) { + throw new UnsupportedOperationException("Not supported yet."); } - public String toSimpleString() { //给Sncp产生的Service用 - if (DataSource.class.isAssignableFrom(serviceClass) || CacheSource.class.isAssignableFrom(serviceClass)) { - String service = serviceClass.getAnnotation(SncpDyn.class) == null ? serviceClass.getName() : serviceClass.getSuperclass().getSimpleName(); - return service + "(serviceid=" + serviceid + ", name='" + name + "', actions.size=" + actions.length + ")"; - } - String service = serviceClass.getAnnotation(SncpDyn.class) == null ? serviceClass.getName() : serviceClass.getSuperclass().getSimpleName(); - if (remote) { - service = service.replace("DynLocalService", "DynRemoteService"); - } - return service + "(name = '" + name + "', serviceid = " + serviceid + ", serviceVersion = " + serviceVersion - + ", clientaddr = " + (clientSncpAddress == null ? "" : (clientSncpAddress.getHostString() + ":" + clientSncpAddress.getPort())) - + ((remoteGroups == null || remoteGroups.isEmpty()) ? "" : ", remoteGroups = " + remoteGroups) - + (remoteGroupTransport == null ? "" : ", remoteGroupTransport = " + Arrays.toString(remoteGroupTransport.getRemoteAddresses())) - + ", actions.size = " + actions.length + ")"; - } - - public static LinkedHashMap parseMethodActions(final Class serviceClass) { - final List list = new ArrayList<>(); - final List multis = new ArrayList<>(); - final Map actionids = new LinkedHashMap<>(); - for (final java.lang.reflect.Method method : serviceClass.getMethods()) { - if (method.isSynthetic()) { - continue; - } - if (Modifier.isStatic(method.getModifiers())) { - continue; - } - if (Modifier.isFinal(method.getModifiers())) { - continue; - } - if (method.getAnnotation(Local.class) != null) { - continue; - } - if (method.getName().equals("getClass") || method.getName().equals("toString")) { - continue; - } - if (method.getName().equals("equals") || method.getName().equals("hashCode")) { - continue; - } - if (method.getName().equals("notify") || method.getName().equals("notifyAll") || method.getName().equals("wait")) { - continue; - } - if (method.getParameterCount() == 1 && method.getParameterTypes()[0] == AnyValue.class) { - if (method.getName().equals("init") || method.getName().equals("stop") || method.getName().equals("destroy")) { - continue; - } - } - - Uint128 actionid = Sncp.actionid(method); - Method old = actionids.get(actionid); - if (old != null) { - if (old.getDeclaringClass().equals(method.getDeclaringClass())) { - throw new SncpException(serviceClass.getName() + " have one more same action(Method=" + method + ", " + old + ", actionid=" + actionid + ")"); - } - continue; - } - actionids.put(actionid, method); - if (method.getAnnotation(SncpDyn.class) != null) { - multis.add(method); - } else { - list.add(method); - } - } - multis.sort((m1, m2) -> m1.getAnnotation(SncpDyn.class).index() - m2.getAnnotation(SncpDyn.class).index()); - list.sort((Method o1, Method o2) -> { - if (!o1.getName().equals(o2.getName())) { - return o1.getName().compareTo(o2.getName()); - } - if (o1.getParameterCount() != o2.getParameterCount()) { - return o1.getParameterCount() - o2.getParameterCount(); - } - return 0; - }); - //带SncpDyn必须排在前面 - multis.addAll(list); - final LinkedHashMap rs = new LinkedHashMap<>(); - for (Method method : multis) { - for (Map.Entry en : actionids.entrySet()) { - if (en.getValue() == method) { - rs.put(en.getKey(), en.getValue()); - break; - } - } - } - return rs; - } - - //只给远程模式调用的 - public T remote(final int index, final Object... params) { - final SncpAction action = actions[index]; - final CompletionHandler handlerFunc = action.handlerFuncParamIndex >= 0 ? (CompletionHandler) params[action.handlerFuncParamIndex] : null; - if (action.handlerFuncParamIndex >= 0) { - params[action.handlerFuncParamIndex] = null; - } - final BsonReader reader = bsonConvert.pollBsonReader(); - CompletableFuture future = remote0(handlerFunc, remoteGroupTransport, null, action, params); - if (action.boolReturnTypeFuture) { //与handlerFuncIndex互斥 - CompletableFuture result = action.futureCreator.create(); - future.whenComplete((v, e) -> { - try { - if (e != null) { - result.completeExceptionally(e); - } else { - reader.setBytes(v); - byte i; - while ((i = reader.readByte()) != 0) { - final Attribute attr = action.paramAttrs[i]; - attr.set(params[i - 1], bsonConvert.convertFrom(attr.genericType(), reader)); - } - Object rs = bsonConvert.convertFrom(Object.class, reader); - - result.complete(rs); - } - } catch (Exception exp) { - result.completeExceptionally(exp); - } finally { - bsonConvert.offerBsonReader(reader); - } - }); //需要获取 Executor - return (T) result; - } - if (handlerFunc != null) { - return null; - } - try { - reader.setBytes(future.get(5, TimeUnit.SECONDS)); - byte i; - while ((i = reader.readByte()) != 0) { - final Attribute attr = action.paramAttrs[i]; - attr.set(params[i - 1], bsonConvert.convertFrom(attr.genericType(), reader)); - } - return bsonConvert.convertFrom(action.handlerFuncParamIndex >= 0 ? Object.class : action.resultTypes, reader); - } catch (RpcRemoteException re) { - throw re; - } catch (TimeoutException e) { - throw new RpcRemoteException(actions[index].method + " sncp remote timeout, params=" + JsonConvert.root().convertTo(params)); - } catch (InterruptedException | ExecutionException e) { - throw new RpcRemoteException(actions[index].method + " sncp remote error, params=" + JsonConvert.root().convertTo(params), e); - } finally { - bsonConvert.offerBsonReader(reader); - } - } - - private CompletableFuture remote0(final CompletionHandler handler, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) { - final String traceid = Traces.currTraceid(); - final Type[] myparamtypes = action.paramTypes; - final Class[] myparamclass = action.paramClass; - if (action.addressSourceParamIndex >= 0) { - params[action.addressSourceParamIndex] = this.clientSncpAddress; - } - if (bsonConvert == null) { - bsonConvert = BsonConvert.root(); - } - final BsonWriter writer = bsonConvert.pollBsonWriter(); // 将head写入 - writer.writeTo(DEFAULT_HEADER); - for (int i = 0; i < params.length; i++) { //params 可能包含: 3 个 boolean - BsonConvert bcc = bsonConvert; - if (params[i] instanceof org.redkale.service.RetResult) { - org.redkale.convert.Convert cc = ((org.redkale.service.RetResult) params[i]).convert(); - if (cc instanceof BsonConvert) { - bcc = (BsonConvert) cc; - } - } - bcc.convertTo(writer, CompletionHandler.class.isAssignableFrom(myparamclass[i]) ? CompletionHandler.class : myparamtypes[i], params[i]); - } - final int reqBodyLength = writer.count() - HEADER_SIZE; //body总长度 - final long seqid = System.nanoTime(); - final Uint128 actionid = action.actionid; - if (messageAgent != null) { //MQ模式 - final ByteArray reqbytes = writer.toByteArray(); - fillHeader(reqbytes, seqid, actionid, traceid, reqBodyLength); - String targetTopic = action.topicTargetParamIndex >= 0 ? (String) params[action.topicTargetParamIndex] : this.topic; - if (targetTopic == null) { - targetTopic = this.topic; - } - MessageRecord message = messageClient.createMessageRecord(targetTopic, null, reqbytes.getBytes()); - final String tt = targetTopic; - if (logger.isLoggable(Level.FINER)) { - message.attach(Utility.append(new Object[]{action.actionName()}, params)); - } else { - message.attach(params); - } - return messageClient.sendMessage(message).thenApply(msg -> { - if (msg == null || msg.getContent() == null) { - logger.log(Level.SEVERE, action.method + " sncp mq(params: " + convert.convertTo(params) + ", message: " + message + ") deal error, this.topic = " + this.topic + ", targetTopic = " + tt + ", result = " + msg); - return null; - } - ByteBuffer buffer = ByteBuffer.wrap(msg.getContent()); - checkResult(seqid, action, buffer); - - final int respBodyLength = buffer.getInt(); - final int retcode = buffer.getInt(); - if (retcode != 0) { - logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + "), params=" + JsonConvert.root().convertTo(params)); - throw new SncpException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")"); - } - byte[] body = new byte[respBodyLength]; - buffer.get(body, 0, respBodyLength); - return body; - }); - } - final SocketAddress addr = addr0 == null ? (action.addressTargetParamIndex >= 0 ? (SocketAddress) params[action.addressTargetParamIndex] : null) : addr0; - CompletableFuture connFuture = transport.pollConnection(addr); - return connFuture.thenCompose(conn0 -> { - final CompletableFuture future = new CompletableFuture(); - if (conn0 == null) { - future.completeExceptionally(new RpcRemoteException("sncp " + (conn0 == null ? addr : conn0.getRemoteAddress()) + " cannot connect, params=" + JsonConvert.root().convertTo(params))); - return future; - } - if (!conn0.isOpen()) { - conn0.dispose(); - future.completeExceptionally(new RpcRemoteException("sncp " + conn0.getRemoteAddress() + " cannot connect, params=" + JsonConvert.root().convertTo(params))); - return future; - } - final AsyncConnection conn = conn0; - final ByteArray array = writer.toByteArray(); - fillHeader(array, seqid, actionid, traceid, reqBodyLength); - - conn.write(array, new CompletionHandler() { - - @Override - public void completed(Integer result, Void attachments) { - //----------------------- 读取返回结果 ------------------------------------- - conn.read(new CompletionHandler() { - - private byte[] body; - - private int received; - - @Override - public void completed(Integer count, ByteBuffer buffer) { - try { - if (count < 1 && buffer.remaining() == buffer.limit()) { //没有数据可读 - future.completeExceptionally(new RpcRemoteException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote no response data, params=" + JsonConvert.root().convertTo(params))); - conn.offerReadBuffer(buffer); - transport.offerConnection(true, conn); - return; - } - if (received < 1 && buffer.limit() < buffer.remaining() + HEADER_SIZE) { //header都没读全 - conn.setReadBuffer(buffer); - conn.read(this); - return; - } - buffer.flip(); - if (received > 0) { - int offset = this.received; - this.received += buffer.remaining(); - buffer.get(body, offset, Math.min(buffer.remaining(), this.body.length - offset)); - if (this.received < this.body.length) {// 数据仍然不全,需要继续读取 - buffer.clear(); - conn.setReadBuffer(buffer); - conn.read(this); - } else { - conn.offerReadBuffer(buffer); - success(); - } - return; - } - checkResult(seqid, action, buffer); - - final int respBodyLength = buffer.getInt(); - final int retcode = buffer.getInt(); - if (retcode != 0) { - logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + "), params=" + JsonConvert.root().convertTo(params)); - throw new SncpException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")"); - } - - if (respBodyLength > buffer.remaining()) { // 数据不全,需要继续读取 - this.body = new byte[respBodyLength]; - this.received = buffer.remaining(); - buffer.get(body, 0, this.received); - buffer.clear(); - conn.setReadBuffer(buffer); - conn.read(this); - } else { - this.body = new byte[respBodyLength]; - buffer.get(body, 0, respBodyLength); - conn.offerReadBuffer(buffer); - success(); - } - } catch (Throwable e) { - future.completeExceptionally(new RpcRemoteException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote response error, params=" + JsonConvert.root().convertTo(params))); - transport.offerConnection(true, conn); - if (handler != null) { - final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null; - handler.failed(e, handlerAttach); - } - logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") deal error", e); - } - } - - @SuppressWarnings("unchecked") - public void success() { - future.complete(this.body); - transport.offerConnection(false, conn); - if (handler != null) { - final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null; - final BsonReader reader = bsonConvert.pollBsonReader(); - try { - reader.setBytes(this.body); - int i; - while ((i = (reader.readByte() & 0xff)) != 0) { - final Attribute attr = action.paramAttrs[i]; - attr.set(params[i - 1], bsonConvert.convertFrom(attr.genericType(), reader)); - } - Object rs = bsonConvert.convertFrom(action.handlerFuncParamIndex >= 0 ? Object.class : action.resultTypes, reader); - handler.completed(rs, handlerAttach); - } catch (Exception e) { - handler.failed(e, handlerAttach); - } finally { - bsonConvert.offerBsonReader(reader); - } - } - } - - @Override - public void failed(Throwable exc, ByteBuffer attachment2) { - future.completeExceptionally(new RpcRemoteException(action.method + " sncp remote exec failed, params=" + JsonConvert.root().convertTo(params))); - conn.offerReadBuffer(attachment2); - transport.offerConnection(true, conn); - if (handler != null) { - final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null; - handler.failed(exc, handlerAttach); - } - logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") remote read exec failed, params=" + JsonConvert.root().convertTo(params), exc); - } - }); - } - - @Override - public void failed(Throwable exc, Void attachment) { - future.completeExceptionally(new RpcRemoteException(action.method + " sncp remote exec failed, params=" + JsonConvert.root().convertTo(params))); - transport.offerConnection(true, conn); - if (handler != null) { - final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null; - handler.failed(exc, handlerAttach); - } - logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") remote write exec failed, params=" + JsonConvert.root().convertTo(params), exc); - } - }); - return future; - }); - } - - private void checkResult(long seqid, final SncpAction action, ByteBuffer buffer) { - long rseqid = buffer.getLong(); - if (rseqid != seqid) { - throw new SncpException("sncp(" + action.method + ") response.seqid = " + seqid + ", but request.seqid =" + rseqid); - } - if (buffer.getChar() != HEADER_SIZE) { - throw new SncpException("sncp(" + action.method + ") buffer receive header.length not " + HEADER_SIZE); - } - Uint128 rserviceid = Uint128.read(buffer); - if (!rserviceid.equals(this.serviceid)) { - throw new SncpException("sncp(" + action.method + ") response.serviceid = " + serviceid + ", but request.serviceid =" + rserviceid); - } - int version = buffer.getInt(); - if (version != this.serviceVersion) { - throw new SncpException("sncp(" + action.method + ") response.serviceVersion = " + serviceVersion + ", but request.serviceVersion =" + version); - } - Uint128 raction = Uint128.read(buffer); - Uint128 actid = action.actionid; - if (!actid.equals(raction)) { - throw new SncpException("sncp(" + action.method + ") response.actionid = " + action.actionid + ", but request.actionid =(" + raction + ")"); - } - buffer.getInt(); //地址 - buffer.getChar(); //端口 - } - - private void fillHeader(ByteArray buffer, long seqid, Uint128 actionid, String traceid, int bodyLength) { - fillRespHeader(buffer, seqid, this.serviceid, this.serviceVersion, - actionid, traceid, this.addrBytes, this.addrPort, bodyLength, 0); //结果码, 请求方固定传0 - } - - protected static final class SncpAction { - - protected final Uint128 actionid; - - protected final Method method; - - protected final Type resultTypes; //void 必须设为 null - - protected final Type[] paramTypes; - - protected final Class[] paramClass; - - protected final Attribute[] paramAttrs; // 为null表示无RpcCall处理,index=0固定为null, 其他为参数标记的RpcCall回调方法 - - protected final int handlerFuncParamIndex; - - protected final int handlerAttachParamIndex; - - protected final int addressTargetParamIndex; - - protected final int addressSourceParamIndex; - - protected final int topicTargetParamIndex; - - protected final boolean boolReturnTypeFuture; // 返回结果类型是否为 CompletableFuture - - protected final Creator futureCreator; - - @SuppressWarnings("unchecked") - public SncpAction(final Class clazz, Method method, Uint128 actionid) { - this.actionid = actionid == null ? Sncp.actionid(method) : actionid; - Type rt = TypeToken.getGenericType(method.getGenericReturnType(), clazz); - this.resultTypes = rt == void.class ? null : rt; - this.boolReturnTypeFuture = CompletableFuture.class.isAssignableFrom(method.getReturnType()); - this.futureCreator = boolReturnTypeFuture ? Creator.create((Class) method.getReturnType()) : null; - this.paramTypes = TypeToken.getGenericType(method.getGenericParameterTypes(), clazz); - this.paramClass = method.getParameterTypes(); - this.method = method; - Annotation[][] anns = method.getParameterAnnotations(); - int tpoicAddrIndex = -1; - int targetAddrIndex = -1; - int sourceAddrIndex = -1; - int handlerAttachIndex = -1; - int handlerFuncIndex = -1; - boolean hasattr = false; - Attribute[] atts = new Attribute[paramTypes.length + 1]; - if (anns.length > 0) { - Class[] params = method.getParameterTypes(); - for (int i = 0; i < params.length; i++) { - if (CompletionHandler.class.isAssignableFrom(params[i])) { - if (boolReturnTypeFuture) { - throw new SncpException(method + " have both CompletionHandler and CompletableFuture"); - } - if (handlerFuncIndex >= 0) { - throw new SncpException(method + " have more than one CompletionHandler type parameter"); - } - Sncp.checkAsyncModifier(params[i], method); - handlerFuncIndex = i; - break; - } - } - for (int i = 0; i < anns.length; i++) { - if (anns[i].length > 0) { - for (Annotation ann : anns[i]) { - if (ann.annotationType() == RpcAttachment.class) { - if (handlerAttachIndex >= 0) { - throw new SncpException(method + " have more than one @RpcAttachment parameter"); - } - handlerAttachIndex = i; - } else if (ann.annotationType() == RpcTargetAddress.class && SocketAddress.class.isAssignableFrom(params[i])) { - targetAddrIndex = i; - } else if (ann.annotationType() == RpcSourceAddress.class && SocketAddress.class.isAssignableFrom(params[i])) { - sourceAddrIndex = i; - } else if (ann.annotationType() == RpcTargetTopic.class && String.class.isAssignableFrom(params[i])) { - tpoicAddrIndex = i; - } - } - } - } - } - this.topicTargetParamIndex = tpoicAddrIndex; - this.addressTargetParamIndex = targetAddrIndex; - this.addressSourceParamIndex = sourceAddrIndex; - this.handlerFuncParamIndex = handlerFuncIndex; - this.handlerAttachParamIndex = handlerAttachIndex; - this.paramAttrs = hasattr ? atts : null; - if (this.handlerFuncParamIndex >= 0 && method.getReturnType() != void.class) { - throw new SncpException(method + " have CompletionHandler type parameter but return type is not void"); - } - } - - public String actionName() { - return method.getDeclaringClass().getSimpleName() + "." + method.getName(); - } - - @Override - public String toString() { - return "{" + actionid + "," + (method == null ? "null" : method.getName()) + "}"; - } - } } diff --git a/src/main/java/org/redkale/net/sncp/SncpClientCodec.java b/src/main/java/org/redkale/net/sncp/SncpClientCodec.java new file mode 100644 index 000000000..941e55fb4 --- /dev/null +++ b/src/main/java/org/redkale/net/sncp/SncpClientCodec.java @@ -0,0 +1,93 @@ +/* + * + */ +package org.redkale.net.sncp; + +import java.nio.ByteBuffer; +import java.util.logging.Logger; +import org.redkale.net.client.ClientCodec; +import org.redkale.util.ByteArray; + +/** + * + * @author zhangjx + */ +public class SncpClientCodec extends ClientCodec { + + protected static final Logger logger = Logger.getLogger(SncpClientCodec.class.getSimpleName()); + + private ByteArray recyclableArray; + + protected ByteArray halfBodyBytes; + + protected ByteArray halfHeaderBytes; + + SncpClientResult lastResult = null; + + public SncpClientCodec(SncpClientConnection connection) { + super(connection); + } + + protected ByteArray pollArray() { + if (recyclableArray == null) { + recyclableArray = new ByteArray(); + return recyclableArray; + } + recyclableArray.clear(); + return recyclableArray; + } + + @Override + public boolean decodeMessages(ByteBuffer realBuf, ByteArray array) { + SncpClientConnection conn = (SncpClientConnection) connection; + + ByteBuffer buffer = realBuf; + boolean hadResult = false; + while (buffer.hasRemaining()) { + if (halfHeaderBytes != null) { + if (buffer.remaining() + halfHeaderBytes.length() < SncpHeader.HEADER_SIZE) { //buffer不足以读取完整header + halfHeaderBytes.put(buffer); + return hadResult; + } + halfHeaderBytes.put(buffer, SncpHeader.HEADER_SIZE - halfHeaderBytes.length()); + //读取完整header + SncpClientResult result = new SncpClientResult(); + result.readHeader(halfHeaderBytes); + halfHeaderBytes = null; + if (result.getBodyLength() < 1) { + addMessage(findRequest(result.getRequestid()), result); + lastResult = null; + continue; + } + //还需要读body + lastResult = result; + } + if (lastResult != null) { //buffer不够 + if (halfBodyBytes != null) { + if (buffer.remaining() + halfBodyBytes.length() < lastResult.getBodyLength()) { //buffer不足以读取完整body + halfBodyBytes.put(buffer); + return hadResult; + } + halfBodyBytes.put(buffer, lastResult.getBodyLength() - halfHeaderBytes.length()); + //读取完整body + lastResult.setBodyContent(halfBodyBytes.getBytes()); + halfBodyBytes = null; + addMessage(findRequest(lastResult.getRequestid()), lastResult); + lastResult = null; + continue; + } + + } + if (buffer.remaining() < SncpHeader.HEADER_SIZE) { //内容不足以读取完整header + halfHeaderBytes = pollArray(); + halfHeaderBytes.put(buffer); + return hadResult; + } + + SncpClientRequest request = null; + buffer = realBuf; + } + return hadResult; + } + +} diff --git a/src/main/java/org/redkale/net/sncp/SncpClientConnection.java b/src/main/java/org/redkale/net/sncp/SncpClientConnection.java new file mode 100644 index 000000000..54aa7cd2d --- /dev/null +++ b/src/main/java/org/redkale/net/sncp/SncpClientConnection.java @@ -0,0 +1,24 @@ +/* + * + */ +package org.redkale.net.sncp; + +import org.redkale.net.AsyncConnection; +import org.redkale.net.client.*; + +/** + * + * @author zhangjx + */ +public class SncpClientConnection extends ClientConnection { + + public SncpClientConnection(SncpClient client, int index, AsyncConnection channel) { + super(client, index, channel); + } + + @Override + protected ClientCodec createCodec() { + return new SncpClientCodec(this); + } + +} diff --git a/src/main/java/org/redkale/net/sncp/SncpClientRequest.java b/src/main/java/org/redkale/net/sncp/SncpClientRequest.java new file mode 100644 index 000000000..84a527500 --- /dev/null +++ b/src/main/java/org/redkale/net/sncp/SncpClientRequest.java @@ -0,0 +1,101 @@ +/* + * + */ +package org.redkale.net.sncp; + +import java.net.InetSocketAddress; +import java.util.Objects; +import org.redkale.net.client.*; +import org.redkale.util.*; + +/** + * + * @author zhangjx + */ +public class SncpClientRequest extends ClientRequest { + + private final InetSocketAddress clientSncpAddress; + + private final byte[] addrBytes; + + private final int addrPort; + + private long seqid; + + private Uint128 serviceid; + + private int serviceVersion; + + private Uint128 actionid; + + private byte[] bodyContent; + + public SncpClientRequest(InetSocketAddress clientSncpAddress) { + this.clientSncpAddress = clientSncpAddress; + this.addrBytes = clientSncpAddress == null ? new byte[4] : clientSncpAddress.getAddress().getAddress(); + this.addrPort = clientSncpAddress == null ? 0 : clientSncpAddress.getPort(); + } + + public SncpClientRequest prepare(long seqid, Uint128 serviceid, int serviceVersion, Uint128 actionid, String traceid, byte[] bodyContent) { + super.prepare(); + this.seqid = seqid; + this.serviceid = serviceid; + this.serviceVersion = serviceVersion; + this.actionid = actionid; + this.traceid = traceid; + this.bodyContent = bodyContent; + return this; + } + + @Override + protected boolean recycle() { + boolean rs = super.recycle(); + this.seqid = 0; + this.serviceVersion = 0; + this.serviceid = null; + this.actionid = null; + this.bodyContent = null; + return rs; + } + + @Override + public void writeTo(ClientConnection conn, ByteArray array) { + + } + + @Override + public String toString() { + return getClass().getSimpleName() + "_" + Objects.hashCode(this) + "{" + + "seqid = " + seqid + + ", serviceVersion = " + serviceVersion + + ", serviceid = " + serviceid + + ", actionid = " + actionid + + ", bodyLength = " + (bodyContent == null ? -1 : bodyContent.length) + + "}"; + } + + public long getSeqid() { + return seqid; + } + + public Uint128 getServiceid() { + return serviceid; + } + + public int getServiceVersion() { + return serviceVersion; + } + + public Uint128 getActionid() { + return actionid; + } + + public InetSocketAddress getClientSncpAddress() { + return clientSncpAddress; + } + + public byte[] getBodyContent() { + return bodyContent; + } + +} diff --git a/src/main/java/org/redkale/net/sncp/SncpClientResult.java b/src/main/java/org/redkale/net/sncp/SncpClientResult.java new file mode 100644 index 000000000..2d7cb8221 --- /dev/null +++ b/src/main/java/org/redkale/net/sncp/SncpClientResult.java @@ -0,0 +1,144 @@ +/* + * + */ +package org.redkale.net.sncp; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import org.redkale.util.*; + +/** + * + * @author zhangjx + */ +public class SncpClientResult { + + private long seqid; + + private Uint128 serviceid; + + private int serviceVersion; + + private Uint128 actionid; + + private byte[] addrBytes; + + private int addrPort; + + private int bodyLength; + + private byte[] bodyContent; + + private int retcode; + + protected void readHeader(ByteBuffer buffer) { + this.seqid = buffer.getLong(); //8 + buffer.getChar(); //HEADER_SIZE 2 + this.serviceid = Uint128.read(buffer); //16 + this.serviceVersion = buffer.getInt(); //4 + this.actionid = Uint128.read(buffer); //16 + this.addrBytes = new byte[4]; + buffer.get(this.addrBytes); //addr 4 + this.addrPort = buffer.getChar(); //port 2 + this.bodyLength = buffer.getInt(); //4 + this.retcode = buffer.getInt(); //4 + } + + protected void readHeader(ByteArray array) { + int offset = 0; + this.seqid = array.getLong(offset); //8 + offset += 8; + array.getChar(offset); //HEADER_SIZE 2 + offset += 2; + this.serviceid = array.getUint128(offset); //16 + offset += 16; + this.serviceVersion = array.getInt(offset); //4 + offset += 4; + this.actionid = array.getUint128(offset); //16 + offset += 16; + this.addrBytes = array.getBytes(offset, 4); //addr 4 + offset += 4; + this.addrPort = array.getChar(offset); //port 2 + offset += 2; + this.bodyLength = array.getInt(offset); //4 + offset += 4; + this.retcode = array.getInt(offset); //4 + } + + public Serializable getRequestid() { + return seqid; + } + + public long getSeqid() { + return seqid; + } + + public void setSeqid(long seqid) { + this.seqid = seqid; + } + + public Uint128 getServiceid() { + return serviceid; + } + + public void setServiceid(Uint128 serviceid) { + this.serviceid = serviceid; + } + + public int getServiceVersion() { + return serviceVersion; + } + + public void setServiceVersion(int serviceVersion) { + this.serviceVersion = serviceVersion; + } + + public Uint128 getActionid() { + return actionid; + } + + public void setActionid(Uint128 actionid) { + this.actionid = actionid; + } + + public byte[] getAddrBytes() { + return addrBytes; + } + + public void setAddrBytes(byte[] addrBytes) { + this.addrBytes = addrBytes; + } + + public int getAddrPort() { + return addrPort; + } + + public void setAddrPort(int addrPort) { + this.addrPort = addrPort; + } + + public int getBodyLength() { + return bodyLength; + } + + public void setBodyLength(int bodyLength) { + this.bodyLength = bodyLength; + } + + public byte[] getBodyContent() { + return bodyContent; + } + + public void setBodyContent(byte[] bodyContent) { + this.bodyContent = bodyContent; + } + + public int getRetcode() { + return retcode; + } + + public void setRetcode(int retcode) { + this.retcode = retcode; + } + +} diff --git a/src/main/java/org/redkale/net/sncp/SncpDispatcherServlet.java b/src/main/java/org/redkale/net/sncp/SncpDispatcherServlet.java index 8d2c2d8ac..590b4bb24 100644 --- a/src/main/java/org/redkale/net/sncp/SncpDispatcherServlet.java +++ b/src/main/java/org/redkale/net/sncp/SncpDispatcherServlet.java @@ -82,7 +82,7 @@ public class SncpDispatcherServlet extends DispatcherServlet en : SncpClient.parseMethodActions(service.getClass()).entrySet()) { + for (Map.Entry en : SncpOldClient.parseMethodActions(service.getClass()).entrySet()) { SncpServletAction action; try { action = SncpServletAction.create(service, en.getKey(), en.getValue()); @@ -100,7 +100,7 @@ public final class SncpDynServlet extends SncpServlet { @Override @SuppressWarnings("unchecked") public void execute(SncpRequest request, SncpResponse response) throws IOException { - final SncpServletAction action = actions.get(request.getActionid()); + final SncpServletAction action = actions.get(request.getHeader().getActionid()); //logger.log(Level.FINEST, "sncpdyn.execute: " + request + ", " + (action == null ? "null" : action.method)); if (action == null) { response.finish(SncpResponse.RETCODE_ILLACTIONID, null); //无效actionid diff --git a/src/main/java/org/redkale/net/sncp/SncpHeader.java b/src/main/java/org/redkale/net/sncp/SncpHeader.java new file mode 100644 index 000000000..94856f8fb --- /dev/null +++ b/src/main/java/org/redkale/net/sncp/SncpHeader.java @@ -0,0 +1,204 @@ +/* + * + */ +package org.redkale.net.sncp; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import org.redkale.util.*; + +/** + * + * @author zhangjx + */ +public class SncpHeader { + + public static final int HEADER_SIZE = 60; + + private static final byte[] EMPTY_ADDR = new byte[4]; + + private long seqid; + + private Uint128 serviceid; + + private int serviceVersion; + + private Uint128 actionid; + + //SncpRequest的值是clientSncpAddress,SncpResponse的值是serverSncpAddress + private byte[] addrBytes; + + private int addrPort; + + private int bodyLength; + + private int retcode; + + public SncpHeader() { + } + + public SncpHeader(InetSocketAddress clientSncpAddress) { + this.addrBytes = clientSncpAddress == null ? new byte[4] : clientSncpAddress.getAddress().getAddress(); + this.addrPort = clientSncpAddress == null ? 0 : clientSncpAddress.getPort(); + } + + public SncpHeader(InetSocketAddress clientSncpAddress, Uint128 serviceid, Uint128 actionid) { + this.addrBytes = clientSncpAddress == null ? new byte[4] : clientSncpAddress.getAddress().getAddress(); + this.addrPort = clientSncpAddress == null ? 0 : clientSncpAddress.getPort(); + this.serviceid = serviceid; + this.actionid = actionid; + } + + public boolean read(ByteBuffer buffer) { + this.seqid = buffer.getLong(); //8 + if (buffer.getChar() != HEADER_SIZE) { //HEADER_SIZE 2 + return false; + } + this.serviceid = Uint128.read(buffer); //16 + this.serviceVersion = buffer.getInt(); //4 + this.actionid = Uint128.read(buffer); //16 + this.addrBytes = new byte[4]; + buffer.get(this.addrBytes); //addr 4 + this.addrPort = buffer.getChar(); //port 2 + this.bodyLength = buffer.getInt(); //4 + this.retcode = buffer.getInt(); //4 + return true; + } + + public boolean readHeader(ByteArray array) { + int offset = 0; + this.seqid = array.getLong(offset); //8 + offset += 8; + if (array.getChar(offset) != HEADER_SIZE) { //HEADER_SIZE 2 + return false; + } + offset += 2; + this.serviceid = array.getUint128(offset); //16 + offset += 16; + this.serviceVersion = array.getInt(offset); //4 + offset += 4; + this.actionid = array.getUint128(offset); //16 + offset += 16; + this.addrBytes = array.getBytes(offset, 4); //addr 4 + offset += 4; + this.addrPort = array.getChar(offset); //port 2 + offset += 2; + this.bodyLength = array.getInt(offset); //4 + offset += 4; + this.retcode = array.getInt(offset); //4 + return true; + } + + public ByteArray write(ByteArray array, InetSocketAddress address, long newSeqid, int bodyLength, int retcode) { + byte[] newAddrBytes = address == null ? EMPTY_ADDR : address.getAddress().getAddress(); + int newAddrPort = address == null ? 0 : address.getPort(); + return write(array, newAddrBytes, newAddrPort, newSeqid, bodyLength, retcode); + } + + public ByteArray write(ByteArray array, byte[] newAddrBytes, int newAddrPort, long newSeqid, int bodyLength, int retcode) { + int offset = 0; + array.putLong(offset, newSeqid); + offset += 8; + array.putChar(offset, (char) HEADER_SIZE); + offset += 2; + array.putUint128(offset, serviceid); + offset += 16; + array.putInt(offset, serviceVersion); + offset += 4; + array.putUint128(offset, actionid); + offset += 16; + array.put(offset, newAddrBytes); + offset += newAddrBytes.length; //4 + array.putChar(offset, (char) newAddrPort); + offset += 2; + array.putInt(offset, bodyLength); + offset += 4; + array.putInt(offset, retcode); //4 + return array; + } + + @Override + public String toString() { + return getClass().getSimpleName() + + "{seqid=" + this.seqid + + ",serviceid=" + this.serviceid + + ",serviceVersion=" + this.serviceVersion + + ",actionid=" + this.actionid + + ",address=" + getAddress() + + ",bodyLength=" + this.bodyLength + + ",retcode=" + this.retcode + + "}"; + } + + public InetSocketAddress getAddress() { + if (addrBytes == null || addrBytes[0] == 0) { + return null; + } + return new InetSocketAddress((0xff & addrBytes[0]) + "." + (0xff & addrBytes[1]) + "." + (0xff & addrBytes[2]) + "." + (0xff & addrBytes[3]), addrPort); + } + + public long getSeqid() { + return seqid; + } + + public void setSeqid(long seqid) { + this.seqid = seqid; + } + + public Uint128 getServiceid() { + return serviceid; + } + + public void setServiceid(Uint128 serviceid) { + this.serviceid = serviceid; + } + + public int getServiceVersion() { + return serviceVersion; + } + + public void setServiceVersion(int serviceVersion) { + this.serviceVersion = serviceVersion; + } + + public Uint128 getActionid() { + return actionid; + } + + public void setActionid(Uint128 actionid) { + this.actionid = actionid; + } + + public byte[] getAddrBytes() { + return addrBytes; + } + + public void setAddrBytes(byte[] addrBytes) { + this.addrBytes = addrBytes; + } + + public int getAddrPort() { + return addrPort; + } + + public void setAddrPort(int addrPort) { + this.addrPort = addrPort; + } + + public int getBodyLength() { + return bodyLength; + } + + public void setBodyLength(int bodyLength) { + this.bodyLength = bodyLength; + } + + public int getRetcode() { + return retcode; + } + + public void setRetcode(int retcode) { + this.retcode = retcode; + } + +} diff --git a/src/main/java/org/redkale/net/sncp/SncpOldClient.java b/src/main/java/org/redkale/net/sncp/SncpOldClient.java new file mode 100644 index 000000000..c4fe60010 --- /dev/null +++ b/src/main/java/org/redkale/net/sncp/SncpOldClient.java @@ -0,0 +1,631 @@ +/* + * To change this license header, choose License Headers reader Project Properties. + * To change this template file, choose Tools | Templates + * and open the template reader the editor. + */ +package org.redkale.net.sncp; + +import java.lang.annotation.Annotation; +import java.lang.reflect.*; +import java.net.*; +import java.nio.ByteBuffer; +import java.nio.channels.CompletionHandler; +import java.util.*; +import java.util.concurrent.*; +import java.util.logging.*; +import org.redkale.annotation.Resource; +import org.redkale.convert.bson.*; +import org.redkale.convert.json.*; +import org.redkale.mq.*; +import org.redkale.net.*; +import org.redkale.net.sncp.Sncp.SncpDyn; +import static org.redkale.net.sncp.SncpHeader.HEADER_SIZE; +import static org.redkale.net.sncp.SncpRequest.*; +import org.redkale.service.*; +import org.redkale.source.*; +import org.redkale.util.*; + +/** + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + */ +public final class SncpOldClient { + + protected static final Logger logger = Logger.getLogger(SncpOldClient.class.getSimpleName()); + + protected final JsonConvert convert = JsonFactory.root().getConvert(); + + protected final String name; + + protected final boolean remote; + + private final Class serviceClass; + + protected final InetSocketAddress clientSncpAddress; + + private final byte[] addrBytes; + + private final int addrPort; + + protected final Uint128 serviceid; + + protected final int serviceVersion; + + protected final SncpAction[] actions; + + protected final MessageAgent messageAgent; + + protected final SncpMessageClient messageClient; + + protected final String topic; + + @Resource + protected BsonConvert bsonConvert; + + //远程模式, 可能为null + protected Set remoteGroups; + + //远程模式, 可能为null + protected Transport remoteGroupTransport; + + public SncpOldClient(final String serviceResourceName, final Class serviceTypeOrImplClass, final T service, MessageAgent messageAgent, final TransportFactory factory, + final boolean remote, final Class serviceClass, final InetSocketAddress clientSncpAddress) { + this.remote = remote; + this.messageAgent = messageAgent; + this.messageClient = messageAgent == null ? null : messageAgent.getSncpMessageClient(); + this.topic = messageAgent == null ? null : messageAgent.generateSncpReqTopic(service); + this.serviceClass = serviceClass; + this.serviceVersion = 0; //暂不实现Version + this.clientSncpAddress = clientSncpAddress; + this.name = serviceResourceName; + Class serviceResourceType = ResourceFactory.getResourceType(serviceTypeOrImplClass); //serviceResourceType + this.serviceid = Sncp.serviceid(serviceResourceName, serviceResourceType); + final List methodens = new ArrayList<>(); + //------------------------------------------------------------------------------ + for (Map.Entry en : parseMethodActions(serviceClass).entrySet()) { + methodens.add(new SncpAction(serviceClass, en.getValue(), serviceid, en.getKey())); + } + this.actions = methodens.toArray(new SncpAction[methodens.size()]); + this.addrBytes = clientSncpAddress == null ? new byte[4] : clientSncpAddress.getAddress().getAddress(); + this.addrPort = clientSncpAddress == null ? 0 : clientSncpAddress.getPort(); + if (this.addrBytes.length != 4) { + throw new SncpException("SNCP clientAddress only support IPv4"); + } + } + + static List getSncpActions(final Class serviceClass, Uint128 serviceid) { + final List actions = new ArrayList<>(); + //------------------------------------------------------------------------------ + for (Map.Entry en : parseMethodActions(serviceClass).entrySet()) { + actions.add(new SncpAction(serviceClass, en.getValue(), serviceid, en.getKey())); + } + return actions; + } + + public MessageAgent getMessageAgent() { + return messageAgent; + } + + public InetSocketAddress getClientAddress() { + return clientSncpAddress; + } + + public Uint128 getServiceid() { + return serviceid; + } + + public int getServiceVersion() { + return serviceVersion; + } + + public int getActionCount() { + return actions.length; + } + + public Set getRemoteGroups() { + return remoteGroups; + } + + public void setRemoteGroups(Set remoteGroups) { + this.remoteGroups = remoteGroups; + } + + public Transport getRemoteGroupTransport() { + return remoteGroupTransport; + } + + public void setRemoteGroupTransport(Transport remoteGroupTransport) { + this.remoteGroupTransport = remoteGroupTransport; + } + + @Override + public String toString() { + String service = serviceClass.getName(); + if (remote) { + service = service.replace("DynLocalService", "DynRemoteService"); + } + return this.getClass().getSimpleName() + "(service = " + service + ", serviceid = " + serviceid + ", serviceVersion = " + serviceVersion + ", name = '" + name + + "', address = " + (clientSncpAddress == null ? "" : (clientSncpAddress.getHostString() + ":" + clientSncpAddress.getPort())) + + ", actions.size = " + actions.length + ")"; + } + + public String toSimpleString() { //给Sncp产生的Service用 + if (DataSource.class.isAssignableFrom(serviceClass) || CacheSource.class.isAssignableFrom(serviceClass)) { + String service = serviceClass.getAnnotation(SncpDyn.class) == null ? serviceClass.getName() : serviceClass.getSuperclass().getSimpleName(); + return service + "(serviceid=" + serviceid + ", name='" + name + "', actions.size=" + actions.length + ")"; + } + String service = serviceClass.getAnnotation(SncpDyn.class) == null ? serviceClass.getName() : serviceClass.getSuperclass().getSimpleName(); + if (remote) { + service = service.replace("DynLocalService", "DynRemoteService"); + } + return service + "(name = '" + name + "', serviceid = " + serviceid + ", serviceVersion = " + serviceVersion + + ", clientaddr = " + (clientSncpAddress == null ? "" : (clientSncpAddress.getHostString() + ":" + clientSncpAddress.getPort())) + + ((remoteGroups == null || remoteGroups.isEmpty()) ? "" : ", remoteGroups = " + remoteGroups) + + (remoteGroupTransport == null ? "" : ", remoteGroupTransport = " + Arrays.toString(remoteGroupTransport.getRemoteAddresses())) + + ", actions.size = " + actions.length + ")"; + } + + public static LinkedHashMap parseMethodActions(final Class serviceClass) { + final List list = new ArrayList<>(); + final List multis = new ArrayList<>(); + final Map actionids = new LinkedHashMap<>(); + for (final java.lang.reflect.Method method : serviceClass.getMethods()) { + if (method.isSynthetic()) { + continue; + } + if (Modifier.isStatic(method.getModifiers())) { + continue; + } + if (Modifier.isFinal(method.getModifiers())) { + continue; + } + if (method.getAnnotation(Local.class) != null) { + continue; + } + if (method.getName().equals("getClass") || method.getName().equals("toString")) { + continue; + } + if (method.getName().equals("equals") || method.getName().equals("hashCode")) { + continue; + } + if (method.getName().equals("notify") || method.getName().equals("notifyAll") || method.getName().equals("wait")) { + continue; + } + if (method.getParameterCount() == 1 && method.getParameterTypes()[0] == AnyValue.class) { + if (method.getName().equals("init") || method.getName().equals("stop") || method.getName().equals("destroy")) { + continue; + } + } + + Uint128 actionid = Sncp.actionid(method); + Method old = actionids.get(actionid); + if (old != null) { + if (old.getDeclaringClass().equals(method.getDeclaringClass())) { + throw new SncpException(serviceClass.getName() + " have one more same action(Method=" + method + ", " + old + ", actionid=" + actionid + ")"); + } + continue; + } + actionids.put(actionid, method); + if (method.getAnnotation(SncpDyn.class) != null) { + multis.add(method); + } else { + list.add(method); + } + } + multis.sort((m1, m2) -> m1.getAnnotation(SncpDyn.class).index() - m2.getAnnotation(SncpDyn.class).index()); + list.sort((Method o1, Method o2) -> { + if (!o1.getName().equals(o2.getName())) { + return o1.getName().compareTo(o2.getName()); + } + if (o1.getParameterCount() != o2.getParameterCount()) { + return o1.getParameterCount() - o2.getParameterCount(); + } + return 0; + }); + //带SncpDyn必须排在前面 + multis.addAll(list); + final LinkedHashMap rs = new LinkedHashMap<>(); + for (Method method : multis) { + for (Map.Entry en : actionids.entrySet()) { + if (en.getValue() == method) { + rs.put(en.getKey(), en.getValue()); + break; + } + } + } + return rs; + } + + //只给远程模式调用的 + public T remote(final int index, final Object... params) { + final SncpAction action = actions[index]; + final CompletionHandler handlerFunc = action.handlerFuncParamIndex >= 0 ? (CompletionHandler) params[action.handlerFuncParamIndex] : null; + if (action.handlerFuncParamIndex >= 0) { + params[action.handlerFuncParamIndex] = null; + } + final BsonReader reader = bsonConvert.pollBsonReader(); + CompletableFuture future = remote0(handlerFunc, remoteGroupTransport, null, action, params); + if (action.boolReturnTypeFuture) { //与handlerFuncIndex互斥 + CompletableFuture result = action.futureCreator.create(); + future.whenComplete((v, e) -> { + try { + if (e != null) { + result.completeExceptionally(e); + } else { + reader.setBytes(v); + byte i; + while ((i = reader.readByte()) != 0) { + final Attribute attr = action.paramAttrs[i]; + attr.set(params[i - 1], bsonConvert.convertFrom(attr.genericType(), reader)); + } + Object rs = bsonConvert.convertFrom(Object.class, reader); + + result.complete(rs); + } + } catch (Exception exp) { + result.completeExceptionally(exp); + } finally { + bsonConvert.offerBsonReader(reader); + } + }); //需要获取 Executor + return (T) result; + } + if (handlerFunc != null) { + return null; + } + try { + reader.setBytes(future.get(5, TimeUnit.SECONDS)); + byte i; + while ((i = reader.readByte()) != 0) { + final Attribute attr = action.paramAttrs[i]; + attr.set(params[i - 1], bsonConvert.convertFrom(attr.genericType(), reader)); + } + return bsonConvert.convertFrom(action.handlerFuncParamIndex >= 0 ? Object.class : action.resultTypes, reader); + } catch (RpcRemoteException re) { + throw re; + } catch (TimeoutException e) { + throw new RpcRemoteException(actions[index].method + " sncp remote timeout, params=" + JsonConvert.root().convertTo(params)); + } catch (InterruptedException | ExecutionException e) { + throw new RpcRemoteException(actions[index].method + " sncp remote error, params=" + JsonConvert.root().convertTo(params), e); + } finally { + bsonConvert.offerBsonReader(reader); + } + } + + private CompletableFuture remote0(final CompletionHandler handler, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) { + final String traceid = Traces.currTraceid(); + final Type[] myparamtypes = action.paramTypes; + final Class[] myparamclass = action.paramClass; + if (action.addressSourceParamIndex >= 0) { + params[action.addressSourceParamIndex] = this.clientSncpAddress; + } + if (bsonConvert == null) { + bsonConvert = BsonConvert.root(); + } + final BsonWriter writer = bsonConvert.pollBsonWriter(); // 将head写入 + writer.writeTo(DEFAULT_HEADER); + for (int i = 0; i < params.length; i++) { //params 可能包含: 3 个 boolean + BsonConvert bcc = bsonConvert; + if (params[i] instanceof org.redkale.service.RetResult) { + org.redkale.convert.Convert cc = ((org.redkale.service.RetResult) params[i]).convert(); + if (cc instanceof BsonConvert) { + bcc = (BsonConvert) cc; + } + } + bcc.convertTo(writer, CompletionHandler.class.isAssignableFrom(myparamclass[i]) ? CompletionHandler.class : myparamtypes[i], params[i]); + } + final int reqBodyLength = writer.count() - HEADER_SIZE; //body总长度 + final long seqid = System.nanoTime(); + final Uint128 actionid = action.actionid; + if (messageAgent != null) { //MQ模式 + final ByteArray reqbytes = writer.toByteArray(); + fillHeader(reqbytes, action, seqid, traceid, reqBodyLength); + String targetTopic = action.topicTargetParamIndex >= 0 ? (String) params[action.topicTargetParamIndex] : this.topic; + if (targetTopic == null) { + targetTopic = this.topic; + } + MessageRecord message = messageClient.createMessageRecord(targetTopic, null, reqbytes.getBytes()); + final String tt = targetTopic; + if (logger.isLoggable(Level.FINER)) { + message.attach(Utility.append(new Object[]{action.actionName()}, params)); + } else { + message.attach(params); + } + return messageClient.sendMessage(message).thenApply(msg -> { + if (msg == null || msg.getContent() == null) { + logger.log(Level.SEVERE, action.method + " sncp mq(params: " + convert.convertTo(params) + ", message: " + message + ") deal error, this.topic = " + this.topic + ", targetTopic = " + tt + ", result = " + msg); + return null; + } + ByteBuffer buffer = ByteBuffer.wrap(msg.getContent()); + checkResult(seqid, action, buffer); + + final int respBodyLength = buffer.getInt(); + final int retcode = buffer.getInt(); + if (retcode != 0) { + logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + "), params=" + JsonConvert.root().convertTo(params)); + throw new SncpException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")"); + } + byte[] body = new byte[respBodyLength]; + buffer.get(body, 0, respBodyLength); + return body; + }); + } + final SocketAddress addr = addr0 == null ? (action.addressTargetParamIndex >= 0 ? (SocketAddress) params[action.addressTargetParamIndex] : null) : addr0; + CompletableFuture connFuture = transport.pollConnection(addr); + return connFuture.thenCompose(conn0 -> { + final CompletableFuture future = new CompletableFuture(); + if (conn0 == null) { + future.completeExceptionally(new RpcRemoteException("sncp " + (conn0 == null ? addr : conn0.getRemoteAddress()) + " cannot connect, params=" + JsonConvert.root().convertTo(params))); + return future; + } + if (!conn0.isOpen()) { + conn0.dispose(); + future.completeExceptionally(new RpcRemoteException("sncp " + conn0.getRemoteAddress() + " cannot connect, params=" + JsonConvert.root().convertTo(params))); + return future; + } + final AsyncConnection conn = conn0; + final ByteArray array = writer.toByteArray(); + fillHeader(array, action, seqid, traceid, reqBodyLength); + + conn.write(array, new CompletionHandler() { + + @Override + public void completed(Integer result, Void attachments) { + //----------------------- 读取返回结果 ------------------------------------- + conn.read(new CompletionHandler() { + + private byte[] body; + + private int received; + + @Override + public void completed(Integer count, ByteBuffer buffer) { + try { + if (count < 1 && buffer.remaining() == buffer.limit()) { //没有数据可读 + future.completeExceptionally(new RpcRemoteException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote no response data, params=" + JsonConvert.root().convertTo(params))); + conn.offerReadBuffer(buffer); + transport.offerConnection(true, conn); + return; + } + if (received < 1 && buffer.limit() < buffer.remaining() + HEADER_SIZE) { //header都没读全 + conn.setReadBuffer(buffer); + conn.read(this); + return; + } + buffer.flip(); + if (received > 0) { + int offset = this.received; + this.received += buffer.remaining(); + buffer.get(body, offset, Math.min(buffer.remaining(), this.body.length - offset)); + if (this.received < this.body.length) {// 数据仍然不全,需要继续读取 + buffer.clear(); + conn.setReadBuffer(buffer); + conn.read(this); + } else { + conn.offerReadBuffer(buffer); + success(); + } + return; + } + checkResult(seqid, action, buffer); + + final int respBodyLength = buffer.getInt(); + final int retcode = buffer.getInt(); + if (retcode != 0) { + logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + "), params=" + JsonConvert.root().convertTo(params)); + throw new SncpException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")"); + } + + if (respBodyLength > buffer.remaining()) { // 数据不全,需要继续读取 + this.body = new byte[respBodyLength]; + this.received = buffer.remaining(); + buffer.get(body, 0, this.received); + buffer.clear(); + conn.setReadBuffer(buffer); + conn.read(this); + } else { + this.body = new byte[respBodyLength]; + buffer.get(body, 0, respBodyLength); + conn.offerReadBuffer(buffer); + success(); + } + } catch (Throwable e) { + future.completeExceptionally(new RpcRemoteException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote response error, params=" + JsonConvert.root().convertTo(params))); + transport.offerConnection(true, conn); + if (handler != null) { + final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null; + handler.failed(e, handlerAttach); + } + logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") deal error", e); + } + } + + @SuppressWarnings("unchecked") + public void success() { + future.complete(this.body); + transport.offerConnection(false, conn); + if (handler != null) { + final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null; + final BsonReader reader = bsonConvert.pollBsonReader(); + try { + reader.setBytes(this.body); + int i; + while ((i = (reader.readByte() & 0xff)) != 0) { + final Attribute attr = action.paramAttrs[i]; + attr.set(params[i - 1], bsonConvert.convertFrom(attr.genericType(), reader)); + } + Object rs = bsonConvert.convertFrom(action.handlerFuncParamIndex >= 0 ? Object.class : action.resultTypes, reader); + handler.completed(rs, handlerAttach); + } catch (Exception e) { + handler.failed(e, handlerAttach); + } finally { + bsonConvert.offerBsonReader(reader); + } + } + } + + @Override + public void failed(Throwable exc, ByteBuffer attachment2) { + future.completeExceptionally(new RpcRemoteException(action.method + " sncp remote exec failed, params=" + JsonConvert.root().convertTo(params))); + conn.offerReadBuffer(attachment2); + transport.offerConnection(true, conn); + if (handler != null) { + final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null; + handler.failed(exc, handlerAttach); + } + logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") remote read exec failed, params=" + JsonConvert.root().convertTo(params), exc); + } + }); + } + + @Override + public void failed(Throwable exc, Void attachment) { + future.completeExceptionally(new RpcRemoteException(action.method + " sncp remote exec failed, params=" + JsonConvert.root().convertTo(params))); + transport.offerConnection(true, conn); + if (handler != null) { + final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null; + handler.failed(exc, handlerAttach); + } + logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") remote write exec failed, params=" + JsonConvert.root().convertTo(params), exc); + } + }); + return future; + }); + } + + private void checkResult(long seqid, final SncpAction action, ByteBuffer buffer) { + long rseqid = buffer.getLong(); + if (rseqid != seqid) { + throw new SncpException("sncp(" + action.method + ") response.seqid = " + seqid + ", but request.seqid =" + rseqid); + } + if (buffer.getChar() != HEADER_SIZE) { + throw new SncpException("sncp(" + action.method + ") buffer receive header.length not " + HEADER_SIZE); + } + Uint128 rserviceid = Uint128.read(buffer); + if (!rserviceid.equals(this.serviceid)) { + throw new SncpException("sncp(" + action.method + ") response.serviceid = " + serviceid + ", but request.serviceid =" + rserviceid); + } + int version = buffer.getInt(); + if (version != this.serviceVersion) { + throw new SncpException("sncp(" + action.method + ") response.serviceVersion = " + serviceVersion + ", but request.serviceVersion =" + version); + } + Uint128 raction = Uint128.read(buffer); + Uint128 actid = action.actionid; + if (!actid.equals(raction)) { + throw new SncpException("sncp(" + action.method + ") response.actionid = " + action.actionid + ", but request.actionid =(" + raction + ")"); + } + buffer.getInt(); //地址 + buffer.getChar(); //端口 + } + + private void fillHeader(ByteArray buffer, SncpAction action, long seqid, String traceid, int bodyLength) { + action.header.write(buffer, addrBytes, addrPort, seqid, bodyLength, 0); //结果码, 请求方固定传0 + } + + protected static final class SncpAction { + + protected final Uint128 actionid; + + protected final Method method; + + protected final Type resultTypes; //void 必须设为 null + + protected final Type[] paramTypes; + + protected final Class[] paramClass; + + protected final Attribute[] paramAttrs; // 为null表示无RpcCall处理,index=0固定为null, 其他为参数标记的RpcCall回调方法 + + protected final int handlerFuncParamIndex; + + protected final int handlerAttachParamIndex; + + protected final int addressTargetParamIndex; + + protected final int addressSourceParamIndex; + + protected final int topicTargetParamIndex; + + protected final boolean boolReturnTypeFuture; // 返回结果类型是否为 CompletableFuture + + protected final Creator futureCreator; + + protected final SncpHeader header; + + @SuppressWarnings("unchecked") + public SncpAction(final Class clazz, Method method, Uint128 serviceid, Uint128 actionid) { + this.actionid = actionid == null ? Sncp.actionid(method) : actionid; + Type rt = TypeToken.getGenericType(method.getGenericReturnType(), clazz); + this.resultTypes = rt == void.class ? null : rt; + this.boolReturnTypeFuture = CompletableFuture.class.isAssignableFrom(method.getReturnType()); + this.futureCreator = boolReturnTypeFuture ? Creator.create((Class) method.getReturnType()) : null; + this.paramTypes = TypeToken.getGenericType(method.getGenericParameterTypes(), clazz); + this.paramClass = method.getParameterTypes(); + this.method = method; + Annotation[][] anns = method.getParameterAnnotations(); + int tpoicAddrIndex = -1; + int targetAddrIndex = -1; + int sourceAddrIndex = -1; + int handlerAttachIndex = -1; + int handlerFuncIndex = -1; + boolean hasattr = false; + Attribute[] atts = new Attribute[paramTypes.length + 1]; + if (anns.length > 0) { + Class[] params = method.getParameterTypes(); + for (int i = 0; i < params.length; i++) { + if (CompletionHandler.class.isAssignableFrom(params[i])) { + if (boolReturnTypeFuture) { + throw new SncpException(method + " have both CompletionHandler and CompletableFuture"); + } + if (handlerFuncIndex >= 0) { + throw new SncpException(method + " have more than one CompletionHandler type parameter"); + } + Sncp.checkAsyncModifier(params[i], method); + handlerFuncIndex = i; + break; + } + } + for (int i = 0; i < anns.length; i++) { + if (anns[i].length > 0) { + for (Annotation ann : anns[i]) { + if (ann.annotationType() == RpcAttachment.class) { + if (handlerAttachIndex >= 0) { + throw new SncpException(method + " have more than one @RpcAttachment parameter"); + } + handlerAttachIndex = i; + } else if (ann.annotationType() == RpcTargetAddress.class && SocketAddress.class.isAssignableFrom(params[i])) { + targetAddrIndex = i; + } else if (ann.annotationType() == RpcSourceAddress.class && SocketAddress.class.isAssignableFrom(params[i])) { + sourceAddrIndex = i; + } else if (ann.annotationType() == RpcTargetTopic.class && String.class.isAssignableFrom(params[i])) { + tpoicAddrIndex = i; + } + } + } + } + } + this.topicTargetParamIndex = tpoicAddrIndex; + this.addressTargetParamIndex = targetAddrIndex; + this.addressSourceParamIndex = sourceAddrIndex; + this.handlerFuncParamIndex = handlerFuncIndex; + this.handlerAttachParamIndex = handlerAttachIndex; + this.paramAttrs = hasattr ? atts : null; + this.header = new SncpHeader(null, serviceid, actionid); + if (this.handlerFuncParamIndex >= 0 && method.getReturnType() != void.class) { + throw new SncpException(method + " have CompletionHandler type parameter but return type is not void"); + } + } + + public String actionName() { + return method.getDeclaringClass().getSimpleName() + "." + method.getName(); + } + + @Override + public String toString() { + return "{" + actionid + "," + (method == null ? "null" : method.getName()) + "}"; + } + } +} diff --git a/src/main/java/org/redkale/net/sncp/SncpRequest.java b/src/main/java/org/redkale/net/sncp/SncpRequest.java index 210a7308e..e0c6c5a01 100644 --- a/src/main/java/org/redkale/net/sncp/SncpRequest.java +++ b/src/main/java/org/redkale/net/sncp/SncpRequest.java @@ -6,12 +6,11 @@ package org.redkale.net.sncp; import java.io.Serializable; -import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.logging.*; import org.redkale.convert.bson.BsonConvert; import org.redkale.net.Request; -import static org.redkale.net.sncp.Sncp.HEADER_SIZE; +import static org.redkale.net.sncp.SncpHeader.HEADER_SIZE; import org.redkale.util.Uint128; /** @@ -35,17 +34,9 @@ public class SncpRequest extends Request { protected final BsonConvert convert; - private long seqid; - protected int readState = READ_STATE_ROUTE; - private int serviceVersion; - - private Uint128 serviceid; - - private Uint128 actionid; - - private int bodyLength; + private SncpHeader header; private int bodyOffset; @@ -53,8 +44,6 @@ public class SncpRequest extends Request { private byte[] body; - private final byte[] addrBytes = new byte[6]; - protected SncpRequest(SncpContext context) { super(context); this.convert = context.getBsonConvert(); @@ -67,38 +56,33 @@ public class SncpRequest extends Request { if (buffer.remaining() < HEADER_SIZE) { return HEADER_SIZE - buffer.remaining(); //小于60 } - this.seqid = buffer.getLong(); //8 - if (buffer.getChar() != HEADER_SIZE) { //2 + this.header = new SncpHeader(); + if (!this.header.read(buffer)) { if (context.getLogger().isLoggable(Level.FINEST)) { context.getLogger().finest("sncp buffer header.length not " + HEADER_SIZE); } return -1; } - this.serviceid = Uint128.read(buffer); //16 - this.serviceVersion = buffer.getInt(); //4 - this.actionid = Uint128.read(buffer); //16 - buffer.get(addrBytes); //ipaddr //6 - this.bodyLength = buffer.getInt(); //4 - - if (buffer.getInt() != 0) { //4 retcode + if (this.header.getRetcode() != 0) { // retcode if (context.getLogger().isLoggable(Level.FINEST)) { context.getLogger().finest("sncp buffer header.retcode not 0"); } return -1; } - this.body = new byte[this.bodyLength]; + this.body = new byte[this.header.getBodyLength()]; this.readState = READ_STATE_BODY; } //---------------------body---------------------------------- if (this.readState == READ_STATE_BODY) { - if (this.bodyLength == 0) { + int bodyLength = this.header.getBodyLength(); + if (bodyLength == 0) { this.readState = READ_STATE_END; - if (this.seqid == 0 && this.serviceid == Uint128.ZERO && this.actionid == Uint128.ZERO) { + if (this.header.getSeqid() == 0 && this.header.getServiceid() == Uint128.ZERO && this.header.getActionid() == Uint128.ZERO) { this.ping = true; } return 0; } - int len = Math.min(this.bodyLength, buffer.remaining()); + int len = Math.min(bodyLength, buffer.remaining()); buffer.get(body, 0, len); this.bodyOffset = len; int rs = bodyLength - len; @@ -112,7 +96,7 @@ public class SncpRequest extends Request { @Override protected Serializable getRequestid() { - return seqid; + return header.getSeqid(); } @Override @@ -126,24 +110,16 @@ public class SncpRequest extends Request { @Override public String toString() { - return SncpRequest.class.getSimpleName() + "{seqid=" + this.seqid - + ",serviceVersion=" + this.serviceVersion + ",serviceid=" + this.serviceid - + ",actionid=" + this.actionid + ",bodyLength=" + this.bodyLength - + ",bodyOffset=" + this.bodyOffset + ",remoteAddress=" + getRemoteAddress() + "}"; + return SncpRequest.class.getSimpleName() + "{header=" + this.header + ",bodyOffset=" + this.bodyOffset + "}"; } @Override protected void recycle() { - this.seqid = 0; this.readState = READ_STATE_ROUTE; - this.serviceid = null; - this.serviceVersion = 0; - this.actionid = null; - this.bodyLength = 0; + this.header = null; this.bodyOffset = 0; this.body = null; this.ping = false; - this.addrBytes[0] = 0; super.recycle(); } @@ -155,28 +131,8 @@ public class SncpRequest extends Request { return body; } - public long getSeqid() { - return seqid; - } - - public int getServiceVersion() { - return serviceVersion; - } - - public Uint128 getServiceid() { - return serviceid; - } - - public Uint128 getActionid() { - return actionid; - } - - public InetSocketAddress getRemoteAddress() { - if (addrBytes[0] == 0) { - return null; - } - return new InetSocketAddress((0xff & addrBytes[0]) + "." + (0xff & addrBytes[1]) + "." + (0xff & addrBytes[2]) + "." + (0xff & addrBytes[3]), - ((0xff00 & (addrBytes[4] << 8)) | (0xff & addrBytes[5]))); + public SncpHeader getHeader() { + return header; } } diff --git a/src/main/java/org/redkale/net/sncp/SncpResponse.java b/src/main/java/org/redkale/net/sncp/SncpResponse.java index 88d9f997c..fa66c0fa0 100644 --- a/src/main/java/org/redkale/net/sncp/SncpResponse.java +++ b/src/main/java/org/redkale/net/sncp/SncpResponse.java @@ -7,8 +7,8 @@ package org.redkale.net.sncp; import org.redkale.convert.bson.BsonWriter; import org.redkale.net.Response; -import static org.redkale.net.sncp.Sncp.HEADER_SIZE; -import org.redkale.util.*; +import static org.redkale.net.sncp.SncpHeader.HEADER_SIZE; +import org.redkale.util.ByteArray; /** * @@ -80,32 +80,8 @@ public class SncpResponse extends Response { } protected void fillHeader(ByteArray buffer, int bodyLength, int retcode) { - fillRespHeader(buffer, request.getSeqid(), request.getServiceid(), request.getServiceVersion(), - request.getActionid(), request.getTraceid(), this.addrBytes, this.addrPort, bodyLength, retcode); - } - - protected static void fillRespHeader(ByteArray buffer, long seqid, Uint128 serviceid, int serviceVersion, - Uint128 actionid, String traceid, byte[] addrBytes, int addrPort, int bodyLength, int retcode) { - //---------------------head---------------------------------- - int offset = 0; - buffer.putLong(offset, seqid); - offset += 8; - buffer.putChar(offset, (char) HEADER_SIZE); - offset += 2; - buffer.putUint128(offset, serviceid); - offset += 16; - buffer.putInt(offset, serviceVersion); - offset += 4; - buffer.putUint128(offset, actionid); - offset += 16; - buffer.put(offset, addrBytes); - offset += addrBytes.length; //4 - buffer.putChar(offset, (char) addrPort); - offset += 2; - buffer.putInt(offset, bodyLength); - offset += 4; - buffer.putInt(offset, retcode); - offset += 4; + SncpHeader header = request.getHeader(); + header.write(buffer, this.addrBytes, this.addrPort, header.getSeqid(), bodyLength, retcode); } } diff --git a/src/main/java/org/redkale/util/ByteArray.java b/src/main/java/org/redkale/util/ByteArray.java index 414710608..5bb57c44c 100644 --- a/src/main/java/org/redkale/util/ByteArray.java +++ b/src/main/java/org/redkale/util/ByteArray.java @@ -941,11 +941,12 @@ public final class ByteArray implements ByteTuple { * 写入ByteBuffer指定长度的数据 * * @param buffer 数据 - * @param len 指定长度 + * @param length 指定长度 * * @return ByteArray */ - public ByteArray put(ByteBuffer buffer, int len) { + public ByteArray put(ByteBuffer buffer, int length) { + int len = Math.min(buffer.remaining(), length); if (len < 1) { return this; } diff --git a/src/main/java/org/redkale/util/Uint128.java b/src/main/java/org/redkale/util/Uint128.java index a4be32448..372aa29fb 100644 --- a/src/main/java/org/redkale/util/Uint128.java +++ b/src/main/java/org/redkale/util/Uint128.java @@ -23,11 +23,11 @@ public final class Uint128 extends Number implements Comparable { protected final byte[] value; - protected Uint128(long v1, long v2) { //暂时不用 - this.value = new byte[]{(byte) (v1 >> 56), (byte) (v1 >> 48), (byte) (v1 >> 40), (byte) (v1 >> 32), - (byte) (v1 >> 24), (byte) (v1 >> 16), (byte) (v1 >> 8), (byte) v1, (byte) (v2 >> 56), (byte) (v2 >> 48), (byte) (v2 >> 40), (byte) (v2 >> 32), - (byte) (v2 >> 24), (byte) (v2 >> 16), (byte) (v2 >> 8), (byte) v2}; - } +// private Uint128(long v1, long v2) { //暂时不用 +// this.value = new byte[]{(byte) (v1 >> 56), (byte) (v1 >> 48), (byte) (v1 >> 40), (byte) (v1 >> 32), +// (byte) (v1 >> 24), (byte) (v1 >> 16), (byte) (v1 >> 8), (byte) v1, (byte) (v2 >> 56), (byte) (v2 >> 48), (byte) (v2 >> 40), (byte) (v2 >> 32), +// (byte) (v2 >> 24), (byte) (v2 >> 16), (byte) (v2 >> 8), (byte) v2}; +// } private Uint128(byte[] bytes) { if (bytes == null || bytes.length != 16) { diff --git a/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java b/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java index 17de8e72c..2415a772b 100644 --- a/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java +++ b/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java @@ -120,7 +120,7 @@ public class SncpTestServiceImpl implements SncpTestIService { System.out.println(method); } System.out.println("-----------------------------------"); - for (Method method : SncpClient.parseMethodActions(service.getClass()).values()) { + for (Method method : SncpOldClient.parseMethodActions(service.getClass()).values()) { System.out.println(method); } System.out.println("-----------------------------------"); @@ -129,7 +129,7 @@ public class SncpTestServiceImpl implements SncpTestIService { System.out.println(method); } System.out.println("-----------------------------------"); - for (Method method : SncpClient.parseMethodActions(service.getClass()).values()) { + for (Method method : SncpOldClient.parseMethodActions(service.getClass()).values()) { System.out.println(method); } System.out.println("-----------------------------------"); @@ -138,7 +138,7 @@ public class SncpTestServiceImpl implements SncpTestIService { System.out.println(method); } System.out.println("-----------------------------------"); - for (Method method : SncpClient.parseMethodActions(service.getClass()).values()) { + for (Method method : SncpOldClient.parseMethodActions(service.getClass()).values()) { System.out.println(method); } System.out.println("-----------------------------------"); diff --git a/src/test/java/org/redkale/test/sncp/_DynLocalSncpTestService.java b/src/test/java/org/redkale/test/sncp/_DynLocalSncpTestService.java index d44b5112b..388ebea7c 100644 --- a/src/test/java/org/redkale/test/sncp/_DynLocalSncpTestService.java +++ b/src/test/java/org/redkale/test/sncp/_DynLocalSncpTestService.java @@ -15,6 +15,6 @@ import org.redkale.annotation.ResourceType; @ResourceType(SncpTestIService.class) public class _DynLocalSncpTestService extends SncpTestServiceImpl { - private SncpClient _redkale_client; + private SncpOldClient _redkale_client; }