From 06773ccdc03f06897db2d8c1e6003d589970ff07 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Mon, 29 May 2017 14:03:11 +0800 Subject: [PATCH] --- src/org/redkale/boot/Application.java | 63 +--- src/org/redkale/boot/GroupInfo.java | 80 ----- src/org/redkale/boot/NodeHttpServer.java | 2 +- src/org/redkale/boot/NodeServer.java | 145 ++++---- src/org/redkale/net/TransportFactory.java | 135 +++++++ src/org/redkale/net/TransportGroupInfo.java | 103 ++++++ src/org/redkale/net/http/WebSocket.java | 6 +- src/org/redkale/net/sncp/Sncp.java | 380 +++----------------- src/org/redkale/net/sncp/SncpClient.java | 367 +++++++++++-------- 9 files changed, 599 insertions(+), 682 deletions(-) delete mode 100644 src/org/redkale/boot/GroupInfo.java create mode 100644 src/org/redkale/net/TransportFactory.java create mode 100644 src/org/redkale/net/TransportGroupInfo.java diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index cf3a46259..a5071d659 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -5,6 +5,7 @@ */ package org.redkale.boot; +import org.redkale.net.TransportGroupInfo; import java.io.*; import java.lang.reflect.*; import java.net.*; @@ -89,12 +90,6 @@ public final class Application { */ public static final String RESNAME_SERVER_ROOT = Server.RESNAME_SERVER_ROOT; - //每个地址对应的Group名 - final Map globalNodes = new HashMap<>(); - - //协议地址的Group集合 - final Map globalGroups = new HashMap<>(); - //本地IP地址 final InetAddress localAddress; @@ -107,14 +102,8 @@ public final class Application { //NodeServer 资源 final List servers = new CopyOnWriteArrayList<>(); - //传输端的ByteBuffer对象池 - final ObjectPool transportBufferPool; - - //传输端的线程池 - final ExecutorService transportExecutor; - - //传输端的ChannelGroup - final AsynchronousChannelGroup transportChannelGroup; + //传输端的TransportFactory + final TransportFactory transportFactory; //全局根ResourceFactory final ResourceFactory resourceFactory = ResourceFactory.root(); @@ -271,18 +260,21 @@ public final class Application { logger.log(Level.INFO, Transport.class.getSimpleName() + " configure bufferCapacity = " + bufferCapacity + "; bufferPoolSize = " + bufferPoolSize + "; threads = " + threads + ";"); } } - this.transportBufferPool = transportPool; - this.transportExecutor = transportExec; - this.transportChannelGroup = transportGroup; + this.transportFactory = new TransportFactory(transportExec, transportPool, transportGroup); } public ResourceFactory getResourceFactory() { return resourceFactory; } + public TransportFactory getTransportFactory() { + return transportFactory; + } + // public WatchFactory getWatchFactory() { // return watchFactory; // } + public List getNodeServers() { return new ArrayList<>(servers); } @@ -421,25 +413,18 @@ public final class Application { final AnyValue resources = config.getAnyValue("resources"); if (resources != null) { //------------------------------------------------------------------------ - for (AnyValue conf : resources.getAnyValues("group")) { final String group = conf.getValue("name", ""); final String protocol = conf.getValue("protocol", Transport.DEFAULT_PROTOCOL).toUpperCase(); if (!"TCP".equalsIgnoreCase(protocol) && !"UDP".equalsIgnoreCase(protocol)) { throw new RuntimeException("Not supported Transport Protocol " + conf.getValue("protocol")); } - GroupInfo ginfo = globalGroups.get(group); - if (ginfo == null) { - ginfo = new GroupInfo(group, protocol, conf.getValue("subprotocol", ""), new LinkedHashSet<>()); - globalGroups.put(group, ginfo); - } + TransportGroupInfo ginfo = new TransportGroupInfo(group, protocol, conf.getValue("subprotocol", ""), new LinkedHashSet<>()); for (AnyValue node : conf.getAnyValues("node")) { final InetSocketAddress addr = new InetSocketAddress(node.getValue("addr"), node.getIntValue("port")); - ginfo.addrs.add(addr); - String oldgroup = globalNodes.get(addr); - if (oldgroup != null) throw new RuntimeException(addr + " had one more group " + (globalNodes.get(addr))); - globalNodes.put(addr, group); + ginfo.putAddress(addr); } + transportFactory.addGroupInfo(ginfo); } } //------------------------------------------------------------------------ @@ -696,17 +681,6 @@ public final class Application { System.exit(0); } - Set findSncpGroups(Transport sameGroupTransport, Collection diffGroupTransports) { - Set gs = new HashSet<>(); - if (sameGroupTransport != null) gs.add(sameGroupTransport.getName()); - if (diffGroupTransports != null) { - for (Transport t : diffGroupTransports) { - gs.add(t.getName()); - } - } - return gs; - } - NodeSncpServer findNodeSncpServer(final InetSocketAddress sncpAddr) { for (NodeServer node : servers) { if (node.isSNCP() && sncpAddr.equals(node.getSncpAddress())) { @@ -716,11 +690,6 @@ public final class Application { return null; } - GroupInfo findGroupInfo(String group) { - if (group == null) return null; - return globalGroups.get(group); - } - private void shutdown() throws Exception { servers.stream().forEach((server) -> { try { @@ -748,13 +717,7 @@ public final class Application { logger.log(Level.FINER, source.getClass() + " close CacheSource erroneous", e); } } - if (this.transportChannelGroup != null) { - try { - this.transportChannelGroup.shutdownNow(); - } catch (Exception e) { - logger.log(Level.FINER, "close transportChannelGroup erroneous", e); - } - } + this.transportFactory.shutdownNow(); } private static AnyValue load(final InputStream in0) { diff --git a/src/org/redkale/boot/GroupInfo.java b/src/org/redkale/boot/GroupInfo.java deleted file mode 100644 index af19499c4..000000000 --- a/src/org/redkale/boot/GroupInfo.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.boot; - -import java.net.InetSocketAddress; -import java.util.*; -import org.redkale.convert.json.JsonConvert; - -/** - * 协议地址组合对象, 对应application.xml 中 resources->group 节点信息 - * - *

- * 详情见: https://redkale.org - * - * @author zhangjx - */ -public class GroupInfo { - - protected String name; //地址 - - protected String protocol; //协议 取值范围: TCP、UDP - - protected String subprotocol; //子协议,预留使用 - - protected Set addrs; //地址列表, 对应 resources->group->node节点信息 - - public GroupInfo() { - } - - public GroupInfo(String name, String protocol, String subprotocol, Set addrs) { - this.name = name; - this.protocol = protocol; - this.subprotocol = subprotocol; - this.addrs = addrs; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public String getProtocol() { - return protocol; - } - - public void setProtocol(String protocol) { - this.protocol = protocol; - } - - public String getSubprotocol() { - return subprotocol; - } - - public void setSubprotocol(String subprotocol) { - this.subprotocol = subprotocol; - } - - public Set getAddrs() { - return addrs; - } - - public Set copyAddrs() { - return addrs == null ? null : new LinkedHashSet<>(addrs); - } - - public void setAddrs(Set addrs) { - this.addrs = addrs; - } - - @Override - public String toString() { - return JsonConvert.root().convertTo(this); - } -} diff --git a/src/org/redkale/boot/NodeHttpServer.java b/src/org/redkale/boot/NodeHttpServer.java index 29b41af8f..fb7ea0560 100644 --- a/src/org/redkale/boot/NodeHttpServer.java +++ b/src/org/redkale/boot/NodeHttpServer.java @@ -110,7 +110,7 @@ public class NodeHttpServer extends NodeServer { synchronized (regFactory) { Service nodeService = (Service) rf.find(resourceName, WebSocketNode.class); if (nodeService == null) { - nodeService = Sncp.createLocalService(resourceName, getExecutor(), application.getResourceFactory(), WebSocketNodeService.class, (InetSocketAddress) null, (String) null, (Set) null, (AnyValue) null, (Transport) null, (Collection) null); + nodeService = Sncp.createLocalService(resourceName, WebSocketNodeService.class, application.getResourceFactory(), application.getTransportFactory(), (InetSocketAddress) null, (Set) null, (AnyValue) null); regFactory.register(resourceName, WebSocketNode.class, nodeService); } resourceFactory.inject(nodeService, self); diff --git a/src/org/redkale/boot/NodeServer.java b/src/org/redkale/boot/NodeServer.java index 08c53d241..9c6dfa9e4 100644 --- a/src/org/redkale/boot/NodeServer.java +++ b/src/org/redkale/boot/NodeServer.java @@ -14,7 +14,6 @@ import java.util.*; import java.util.concurrent.*; import java.util.function.*; import java.util.logging.*; -import java.util.stream.Collectors; import javax.annotation.Resource; import javax.persistence.Transient; import static org.redkale.boot.Application.*; @@ -129,7 +128,7 @@ public abstract class NodeServer { if (isSNCP()) { // SNCP协议 String host = this.serverConf.getValue("host", "0.0.0.0").replace("0.0.0.0", ""); this.sncpAddress = new InetSocketAddress(host.isEmpty() ? application.localAddress.getHostAddress() : host, this.serverConf.getIntValue("port")); - this.sncpGroup = application.globalNodes.get(this.sncpAddress); + this.sncpGroup = application.transportFactory.findGroupName(this.sncpAddress); //单向SNCP服务不需要对等group //if (this.sncpGroup == null) throw new RuntimeException("Server (" + String.valueOf(config).replaceAll("\\s+", " ") + ") not found info"); } @@ -189,6 +188,7 @@ public abstract class NodeServer { final NodeServer self = this; //--------------------------------------------------------------------------------------------- final ResourceFactory appResFactory = application.getResourceFactory(); + final TransportFactory appTranFactory = application.getTransportFactory(); final AnyValue resources = application.config.getAnyValue("resources"); final Map cacheResource = new HashMap<>(); //final Map dataResources = new HashMap<>(); @@ -245,13 +245,13 @@ public abstract class NodeServer { appResFactory.register(resourceName, DataSource.class, source); SncpClient client = Sncp.getSncpClient((Service) src); - Transport sameGroupTransport = Sncp.getSameGroupTransport((Service) src); - List diffGroupTransports = Arrays.asList(Sncp.getDiffGroupTransports((Service) src)); final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress(); if ((src instanceof DataSource) && sncpAddr != null && resourceFactory.find(resourceName, DataCacheListener.class) == null) { //只有DataSourceService 才能赋值 DataCacheListener final NodeSncpServer sncpServer = application.findNodeSncpServer(sncpAddr); - Set gs = application.findSncpGroups(sameGroupTransport, diffGroupTransports); - Service cacheListenerService = Sncp.createLocalService(resourceName, getExecutor(), appResFactory, DataCacheListenerService.class, sncpAddr, sncpServer.getSncpGroup(), gs, Sncp.getConf((Service) src), sameGroupTransport, diffGroupTransports); + final Set groups = new HashSet<>(); + if (client != null && client.getSameGroup() != null) groups.add(client.getSameGroup()); + if (client != null && client.getDiffGroups() != null) groups.addAll(client.getDiffGroups()); + Service cacheListenerService = Sncp.createLocalService(resourceName, DataCacheListenerService.class, appResFactory, appTranFactory, sncpAddr, groups, Sncp.getConf((Service) src)); appResFactory.register(resourceName, DataCacheListener.class, cacheListenerService); localServices.add(cacheListenerService); sncpServer.consumerAccept(cacheListenerService); @@ -275,15 +275,13 @@ public abstract class NodeServer { if ((src instanceof Service) && Sncp.isRemote((Service) src)) return; //远程模式不需要注入 CacheSource final Service srcService = (Service) src; SncpClient client = Sncp.getSncpClient(srcService); - Transport sameGroupTransport = Sncp.getSameGroupTransport(srcService); - Transport[] dts = Sncp.getDiffGroupTransports((Service) src); - List diffGroupTransports = dts == null ? new ArrayList<>() : Arrays.asList(dts); final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress(); final AnyValue sourceConf = cacheResource.get(resourceName); final Class sourceType = sourceConf == null ? CacheMemorySource.class : Class.forName(sourceConf.getValue("type")); - @SuppressWarnings("unchecked") - final CacheSource source = (CacheSource) Sncp.createLocalService(resourceName, getExecutor(), appResFactory, (Class) sourceType, - sncpAddr, Sncp.getSncpGroup(srcService), Sncp.getGroups(srcService), Sncp.getConf(srcService), sameGroupTransport, diffGroupTransports); + final Set groups = new HashSet<>(); + if (client != null && client.getSameGroup() != null) groups.add(client.getSameGroup()); + if (client != null && client.getDiffGroups() != null) groups.addAll(client.getDiffGroups()); + final CacheSource source = (CacheSource) Sncp.createLocalService(resourceName, sourceType, appResFactory, appTranFactory, sncpAddr, groups, Sncp.getConf(srcService)); Type genericType = field.getGenericType(); ParameterizedType pt = (genericType instanceof ParameterizedType) ? (ParameterizedType) genericType : null; Type valType = pt == null ? null : pt.getActualTypeArguments()[1]; @@ -301,7 +299,6 @@ public abstract class NodeServer { if ((src instanceof WebSocketNodeService) && sncpAddr != null) { //只有WebSocketNodeService的服务才需要给SNCP服务注入CacheMemorySource NodeSncpServer sncpServer = application.findNodeSncpServer(sncpAddr); - Set gs = application.findSncpGroups(sameGroupTransport, diffGroupTransports); sncpServer.getSncpServer().addSncpServlet((Service) source); //logger.info("[" + Thread.currentThread().getName() + "] Load Service " + source); } @@ -323,7 +320,8 @@ public abstract class NodeServer { final String threadName = "[" + Thread.currentThread().getName() + "] "; final Set> entrys = (Set) serviceFilter.getAllFilterEntrys(); ResourceFactory regFactory = isSNCP() ? application.getResourceFactory() : resourceFactory; - + final ResourceFactory appResourceFactory = application.getResourceFactory(); + final TransportFactory appTransportFactory = application.getTransportFactory(); for (FilterEntry entry : entrys) { //service实现类 final Class serviceImplClass = entry.getType(); if (Modifier.isFinal(serviceImplClass.getModifiers())) continue; //修饰final的类跳过 @@ -354,13 +352,12 @@ public abstract class NodeServer { Service service; boolean ws = src instanceof WebSocketServlet; if (ws || localed) { //本地模式 - service = Sncp.createLocalService(resourceName, getExecutor(), application.getResourceFactory(), serviceImplClass, - NodeServer.this.sncpAddress, NodeServer.this.sncpGroup, groups, entry.getProperty(), loadTransport(NodeServer.this.sncpGroup), loadTransports(groups)); + service = Sncp.createLocalService(resourceName, serviceImplClass, appResourceFactory, appTransportFactory, NodeServer.this.sncpAddress, groups, entry.getProperty()); } else { - service = Sncp.createRemoteService(resourceName, getExecutor(), serviceImplClass, NodeServer.this.sncpAddress, null, groups, entry.getProperty(), loadTransport(groups)); + service = Sncp.createRemoteService(resourceName, serviceImplClass, appTransportFactory, NodeServer.this.sncpAddress, groups, entry.getProperty()); } if (SncpClient.parseMethod(serviceImplClass).isEmpty()) return; //class没有可用的方法, 通常为BaseService - //final ServiceWrapper wrapper = new ServiceWrapper(serviceImplClass, service, resourceName, localed ? NodeServer.this.sncpGroup : null, groups, entry.getProperty()); + final Class restype = Sncp.getResourceType(service); if (rf.find(resourceName, restype) == null) { regFactory.register(resourceName, restype, service); @@ -447,62 +444,62 @@ public abstract class NodeServer { maxClassNameLength = Math.max(maxClassNameLength, Sncp.getResourceType(y).getName().length() + 1); } - protected List loadTransports(final HashSet groups) { - if (groups == null) return null; - final List transports = new ArrayList<>(); - for (String group : groups) { - if (this.sncpGroup == null || !this.sncpGroup.equals(group)) { - transports.add(loadTransport(group)); - } - } - return transports; - } - - protected Transport loadTransport(final HashSet groups) { - if (groups == null || groups.isEmpty()) return null; - final String groupid = new ArrayList<>(groups).stream().sorted().collect(Collectors.joining(";")); //按字母排列顺序 - Transport transport = application.resourceFactory.find(groupid, Transport.class); - if (transport != null) return transport; - final List transports = new ArrayList<>(); - for (String group : groups) { - transports.add(loadTransport(group)); - } - Set addrs = new HashSet(); - transports.forEach(t -> addrs.addAll(Arrays.asList(t.getRemoteAddresses()))); - Transport first = transports.get(0); - GroupInfo ginfo = application.findGroupInfo(first.getName()); - Transport newTransport = new Transport(groupid, ginfo.getProtocol(), - ginfo.getSubprotocol(), application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs); - synchronized (application.resourceFactory) { - transport = application.resourceFactory.find(groupid, Transport.class); - if (transport == null) { - transport = newTransport; - application.resourceFactory.register(groupid, transport); - } - } - return transport; - } - - protected Transport loadTransport(final String group) { - if (group == null) return null; - Transport transport; - synchronized (application.resourceFactory) { - transport = application.resourceFactory.find(group, Transport.class); - if (transport != null) { - if (this.sncpAddress != null && !this.sncpAddress.equals(transport.getClientAddress())) { - throw new RuntimeException(transport + "repeat create on newClientAddress = " + this.sncpAddress + ", oldClientAddress = " + transport.getClientAddress()); - } - return transport; - } - GroupInfo ginfo = application.findGroupInfo(group); - Set addrs = ginfo.copyAddrs(); - if (addrs == null) throw new RuntimeException("Not found = " + group + " on "); - transport = new Transport(group, ginfo.getProtocol(), ginfo.getSubprotocol(), application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs); - application.resourceFactory.register(group, transport); - } - return transport; - } - + /* + * protected List loadTransports(final HashSet groups) { + * if (groups == null) return null; + * final List transports = new ArrayList<>(); + * for (String group : groups) { + * if (this.sncpGroup == null || !this.sncpGroup.equals(group)) { + * transports.add(loadTransport(group)); + * } + * } + * return transports; + * } + * protected Transport loadTransport(final HashSet groups) { + * if (groups == null || groups.isEmpty()) return null; + * final String groupid = new ArrayList<>(groups).stream().sorted().collect(Collectors.joining(";")); //按字母排列顺序 + * Transport transport = application.resourceFactory.find(groupid, Transport.class); + * if (transport != null) return transport; + * final List transports = new ArrayList<>(); + * for (String group : groups) { + * transports.add(loadTransport(group)); + * } + * Set addrs = new HashSet(); + * transports.forEach(t -> addrs.addAll(Arrays.asList(t.getRemoteAddresses()))); + * Transport first = transports.get(0); + * TransportGroupInfo ginfo = application.transportFactory.findGroupInfo(first.getName()); + * Transport newTransport = new Transport(groupid, ginfo.getProtocol(), + * ginfo.getSubprotocol(), application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs); + * synchronized (application.resourceFactory) { + * transport = application.resourceFactory.find(groupid, Transport.class); + * if (transport == null) { + * transport = newTransport; + * application.resourceFactory.register(groupid, transport); + * } + * } + * return transport; + * } + * + * protected Transport loadTransport(final String group) { + * if (group == null) return null; + * Transport transport; + * synchronized (application.resourceFactory) { + * transport = application.resourceFactory.find(group, Transport.class); + * if (transport != null) { + * if (this.sncpAddress != null && !this.sncpAddress.equals(transport.getClientAddress())) { + * throw new RuntimeException(transport + "repeat create on newClientAddress = " + this.sncpAddress + ", oldClientAddress = " + transport.getClientAddress()); + * } + * return transport; + * } + * TransportGroupInfo ginfo = application.findGroupInfo(group); + * Set addrs = ginfo.copyAddresses(); + * if (addrs == null) throw new RuntimeException("Not found = " + group + " on "); + * transport = new Transport(group, ginfo.getProtocol(), ginfo.getSubprotocol(), application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs); + * application.resourceFactory.register(group, transport); + * } + * return transport; + * } + */ protected abstract ClassFilter createFilterClassFilter(); protected abstract ClassFilter createServletClassFilter(); diff --git a/src/org/redkale/net/TransportFactory.java b/src/org/redkale/net/TransportFactory.java new file mode 100644 index 000000000..886d2242e --- /dev/null +++ b/src/org/redkale/net/TransportFactory.java @@ -0,0 +1,135 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.net; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousChannelGroup; +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.logging.*; +import org.redkale.util.ObjectPool; + +/** + * + * @author zhangjx + */ +public class TransportFactory { + + protected static final Logger logger = Logger.getLogger(TransportFactory.class.getSimpleName()); + + //传输端的线程池 + protected final ExecutorService executor; + + //传输端的ByteBuffer对象池 + protected final ObjectPool bufferPool; + + //传输端的ChannelGroup + protected final AsynchronousChannelGroup channelGroup; + + //每个地址对应的Group名 + protected final Map groupAddrs = new HashMap<>(); + + //协议地址的Group集合 + protected final Map groupInfos = new HashMap<>(); + + public TransportFactory(ExecutorService executor, ObjectPool bufferPool, AsynchronousChannelGroup channelGroup) { + this.executor = executor; + this.bufferPool = bufferPool; + this.channelGroup = channelGroup; + } + + public String findGroupName(InetSocketAddress addr) { + if (addr == null) return null; + return groupAddrs.get(addr); + } + + public TransportGroupInfo findGroupInfo2(String group) { + if (group == null) return null; + return groupInfos.get(group); + } + + public boolean addGroupInfo(TransportGroupInfo info) { + if (info == null) throw new RuntimeException("TransportGroupInfo can not null"); + if (info.addresses == null) throw new RuntimeException("TransportGroupInfo.addresses can not null"); + if (!checkName(info.name)) throw new RuntimeException("Transport.group.name only 0-9 a-z A-Z _ cannot begin 0-9"); + TransportGroupInfo old = groupInfos.get(info.name); + if (old != null && !old.protocol.equals(info.protocol)) throw new RuntimeException("Transport.group.name repeat but protocol is different"); + if (old != null && !old.subprotocol.equals(info.subprotocol)) throw new RuntimeException("Transport.group.name repeat but subprotocol is different"); + for (InetSocketAddress addr : info.addresses) { + if (!groupAddrs.getOrDefault(addr, info.name).equals(info.name)) throw new RuntimeException(addr + " repeat but different group.name"); + } + if (old == null) { + groupInfos.put(info.name, info); + } else { + old.putAddress(info.addresses); + } + for (InetSocketAddress addr : info.addresses) { + groupAddrs.put(addr, info.name); + } + return true; + } + + public Transport loadSameGroupTransport(InetSocketAddress sncpAddress) { + return loadTransport(groupAddrs.get(sncpAddress), sncpAddress); + } + + public Transport[] loadDiffGroupTransports(InetSocketAddress sncpAddress, final Set diffGroups) { + if (diffGroups == null) return null; + final String sncpGroup = groupAddrs.get(sncpAddress); + final List transports = new ArrayList<>(); + for (String group : diffGroups) { + if (sncpGroup == null || !sncpGroup.equals(group)) { + transports.add(loadTransport(group, sncpAddress)); + } + } + return transports.toArray(new Transport[transports.size()]); + } + + public Transport loadRemoteTransport(InetSocketAddress sncpAddress, final Set groups) { + if (groups == null) return null; + Set addresses = new HashSet<>(); + TransportGroupInfo info = null; + for (String group : groups) { + info = groupInfos.get(group); + if (info == null) continue; + addresses.addAll(info.addresses); + } + if (info == null) return null; + if (sncpAddress != null) addresses.remove(sncpAddress); + return new Transport("remotes", info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, addresses); + } + + private Transport loadTransport(final String groupName, InetSocketAddress sncpAddress) { + if (groupName == null) return null; + TransportGroupInfo info = groupInfos.get(groupName); + if (info == null) return null; + return new Transport(groupName, info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, info.addresses); + } + + public ExecutorService getExecutor() { + return executor; + } + + public void shutdownNow() { + try { + this.channelGroup.shutdownNow(); + } catch (Exception e) { + logger.log(Level.FINER, "close transportChannelGroup erroneous", e); + } + } + + private static boolean checkName(String name) { //不能含特殊字符 + if (name.isEmpty()) return false; + if (name.charAt(0) >= '0' && name.charAt(0) <= '9') return false; + for (char ch : name.toCharArray()) { + if (!((ch >= '0' && ch <= '9') || ch == '_' || (ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z'))) { //不能含特殊字符 + return false; + } + } + return true; + } +} diff --git a/src/org/redkale/net/TransportGroupInfo.java b/src/org/redkale/net/TransportGroupInfo.java new file mode 100644 index 000000000..ed0073f2d --- /dev/null +++ b/src/org/redkale/net/TransportGroupInfo.java @@ -0,0 +1,103 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.net; + +import java.net.InetSocketAddress; +import java.util.*; +import org.redkale.convert.json.JsonConvert; + +/** + * 协议地址组合对象, 对应application.xml 中 resources->group 节点信息 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + */ +public class TransportGroupInfo { + + protected String name; //地址 + + protected String protocol; //协议 取值范围: TCP、UDP + + protected String subprotocol; //子协议,预留使用 + + protected Set addresses; //地址列表, 对应 resources->group->node节点信息 + + public TransportGroupInfo() { + } + + public TransportGroupInfo(String name, String protocol, String subprotocol, Set addrs) { + Objects.requireNonNull(name, "Transport.group.name can not null"); + Objects.requireNonNull(protocol, "Transport.group.protocol can not null"); + Objects.requireNonNull(subprotocol, "Transport.group.subprotocol can not null"); + this.name = name; + this.protocol = protocol; + this.subprotocol = subprotocol; + this.addresses = addrs; + } + + public String getName() { + return name; + } + + public void setName(String name) { + Objects.requireNonNull(name, "Transport.group.name can not null"); + this.name = name; + } + + public String getProtocol() { + return protocol; + } + + public void setProtocol(String protocol) { + Objects.requireNonNull(protocol, "Transport.group.protocol can not null"); + this.protocol = protocol; + } + + public String getSubprotocol() { + return subprotocol; + } + + public void setSubprotocol(String subprotocol) { + Objects.requireNonNull(subprotocol, "Transport.group.subprotocol can not null"); + this.subprotocol = subprotocol; + } + + public Set getAddresses() { + return addresses; + } + + public Set copyAddresses() { + return addresses == null ? null : new LinkedHashSet<>(addresses); + } + + public void setAddresses(Set addresses) { + this.addresses = addresses; + } + + public boolean containsAddress(InetSocketAddress addr) { + if (this.addresses == null) return false; + return this.addresses.contains(addr); + } + + public void putAddress(InetSocketAddress addr) { + if (addr == null) return; + if (this.addresses == null) this.addresses = new HashSet<>(); + this.addresses.add(addr); + } + + public void putAddress(Set addrs) { + if (addrs == null) return; + if (this.addresses == null) this.addresses = new HashSet<>(); + this.addresses.addAll(addrs); + } + + @Override + public String toString() { + return JsonConvert.root().convertTo(this); + } +} diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index 5dc515b9b..78214c124 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -184,7 +184,7 @@ public abstract class WebSocket { */ CompletableFuture sendPacket(WebSocketPacket packet) { CompletableFuture rs = this._runner.sendMessage(packet); - if (_engine.finest) _engine.logger.finest("userid:" + userid() + " send websocket result is " + rs + " on " + this + " by message(" + packet + ")"); + if (_engine.finest) _engine.logger.finest("userid:" + userid() + " send websocket message(" + packet + ")" + " on " + this); return rs == null ? CompletableFuture.completedFuture(RETCODE_WSOCKET_CLOSED) : rs; } @@ -204,7 +204,7 @@ public abstract class WebSocket { return ((CompletableFuture) message).thenCompose((json) -> _engine.node.sendMessage(json, last, userids)); } CompletableFuture rs = _engine.node.sendMessage(message, last, userids); - if (_engine.finest) _engine.logger.finest("userids:" + Arrays.toString(userids) + " send websocket result is " + rs + " on " + this + " by message(" + _jsonConvert.convertTo(message) + ")"); + if (_engine.finest) _engine.logger.finest("userids:" + Arrays.toString(userids) + " send websocket message(" + _jsonConvert.convertTo(message) + ")"); return rs; } @@ -233,7 +233,7 @@ public abstract class WebSocket { return ((CompletableFuture) message).thenCompose((json) -> _engine.node.broadcastMessage(json, last)); } CompletableFuture rs = _engine.node.broadcastMessage(message, last); - if (_engine.finest) _engine.logger.finest("broadcast send websocket result is " + rs + " on " + this + " by message(" + _jsonConvert.convertTo(message) + ")"); + if (_engine.finest) _engine.logger.finest("broadcast send websocket message(" + _jsonConvert.convertTo(message) + ")"); return rs; } diff --git a/src/org/redkale/net/sncp/Sncp.java b/src/org/redkale/net/sncp/Sncp.java index 1729fb58e..785b130b2 100644 --- a/src/org/redkale/net/sncp/Sncp.java +++ b/src/org/redkale/net/sncp/Sncp.java @@ -10,15 +10,12 @@ import java.lang.reflect.*; import java.net.InetSocketAddress; import java.security.*; import java.util.*; -import java.util.function.Consumer; import javax.annotation.Resource; import static jdk.internal.org.objectweb.asm.ClassWriter.COMPUTE_FRAMES; import jdk.internal.org.objectweb.asm.*; import static jdk.internal.org.objectweb.asm.Opcodes.*; import jdk.internal.org.objectweb.asm.Type; -import org.redkale.convert.bson.BsonConvert; -import org.redkale.convert.json.JsonConvert; -import org.redkale.net.Transport; +import org.redkale.net.*; import org.redkale.net.sncp.SncpClient.SncpAction; import org.redkale.service.*; import org.redkale.util.*; @@ -99,14 +96,8 @@ public abstract class Sncp { } public static Class getServiceType(Service service) { - if (service == null) return null; - try { - Field ts = service.getClass().getDeclaredField(FIELDPREFIX + "_service_type"); - ts.setAccessible(true); - return (Class) ts.get(service); - } catch (Exception e) { - throw new RuntimeException(service + " not found " + FIELDPREFIX + "_service_type"); - } + ResourceType rt = service.getClass().getAnnotation(ResourceType.class); + return rt == null ? service.getClass() : rt.value(); } public static Class getResourceType(Service service) { @@ -115,28 +106,6 @@ public abstract class Sncp { return type == null ? getServiceType(service) : type.value(); } - public static String getSncpGroup(Service service) { - if (service == null) return null; - try { - Field ts = service.getClass().getDeclaredField(FIELDPREFIX + "_sncpGroup"); - ts.setAccessible(true); - return (String) ts.get(service); - } catch (Exception e) { - throw new RuntimeException(service + " not found " + FIELDPREFIX + "_sncpGroup"); - } - } - - public static Set getGroups(Service service) { - if (service == null) return null; - try { - Field ts = service.getClass().getDeclaredField(FIELDPREFIX + "_groups"); - ts.setAccessible(true); - return (Set) ts.get(service); - } catch (Exception e) { - throw new RuntimeException(service + " not found " + FIELDPREFIX + "_groups"); - } - } - public static AnyValue getConf(Service service) { if (service == null) return null; try { @@ -159,28 +128,6 @@ public abstract class Sncp { } } - public static Transport getSameGroupTransport(Service service) { - if (service == null) return null; - try { - Field ts = service.getClass().getDeclaredField(FIELDPREFIX + "_sameGroupTransport"); - ts.setAccessible(true); - return (Transport) ts.get(service); - } catch (Exception e) { - throw new RuntimeException(service + " not found " + FIELDPREFIX + "_sameGroupTransport"); - } - } - - public static Transport[] getDiffGroupTransports(Service service) { - if (service == null) return null; - try { - Field ts = service.getClass().getDeclaredField(FIELDPREFIX + "_diffGroupTransports"); - ts.setAccessible(true); - return (Transport[]) ts.get(service); - } catch (Exception e) { - throw new RuntimeException(service + " not found " + FIELDPREFIX + "_diffGroupTransports"); - } - } - static void checkAsyncModifier(Class param, Method method) { if (param == AsyncHandler.class) return; if (Modifier.isFinal(param.getModifiers())) { @@ -339,13 +286,8 @@ public abstract class Sncp { final String supDynName = serviceImplClass.getName().replace('.', '/'); final String clientName = SncpClient.class.getName().replace('.', '/'); final String clientDesc = Type.getDescriptor(SncpClient.class); - final String bsonConvertDesc = Type.getDescriptor(BsonConvert.class); - final String jsonConvertDesc = Type.getDescriptor(JsonConvert.class); - final String stringDesc = Type.getDescriptor(String.class); final String anyValueDesc = Type.getDescriptor(AnyValue.class); final String sncpDynDesc = Type.getDescriptor(SncpDyn.class); - final String transportDesc = Type.getDescriptor(Transport.class); - final String transportsDesc = Type.getDescriptor(Transport[].class); ClassLoader loader = Sncp.class.getClassLoader(); String newDynName = supDynName.substring(0, supDynName.lastIndexOf('/') + 1) + LOCALPREFIX + serviceImplClass.getSimpleName(); if (!name.isEmpty()) { @@ -389,59 +331,14 @@ public abstract class Sncp { av0.visit("value", Type.getType(Type.getDescriptor(rty == null ? serviceImplClass : rty.value()))); av0.visitEnd(); } - { - fv = cw.visitField(ACC_PRIVATE + ACC_FINAL + ACC_STATIC, FIELDPREFIX + "_service_type", "Ljava/lang/Class;", null, null); - fv.visitEnd(); - } - { - fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_bsonConvert", bsonConvertDesc, null, null); - av0 = fv.visitAnnotation("Ljavax/annotation/Resource;", true); - av0.visitEnd(); - fv.visitEnd(); - } - { - fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_jsonConvert", jsonConvertDesc, null, null); - av0 = fv.visitAnnotation("Ljavax/annotation/Resource;", true); - av0.visitEnd(); - fv.visitEnd(); - } { fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_conf", anyValueDesc, null, null); fv.visitEnd(); } - { - fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_sncpGroup", stringDesc, null, null); - fv.visitEnd(); - } - { - fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_groups", "Ljava/util/Set;", "Ljava/util/Set;", null); - fv.visitEnd(); - } - { - fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_sameGroupTransport", transportDesc, null, null); - fv.visitEnd(); - } - { - fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_diffGroupTransports", transportsDesc, null, null); - fv.visitEnd(); - } { fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_client", clientDesc, null, null); fv.visitEnd(); } - - { - fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_selfstring", "Ljava/lang/String;", null, null); - fv.visitEnd(); - } - {//静态构造函数 - mv = new AsmMethodVisitor(cw.visitMethod(ACC_STATIC, "", "()V", null, null)); - mv.visitLdcInsn(Type.getType(Type.getDescriptor(serviceImplClass))); - mv.visitFieldInsn(PUTSTATIC, newDynName, FIELDPREFIX + "_service_type", "Ljava/lang/Class;"); - mv.visitInsn(RETURN); - mv.visitMaxs(1, 0); - mv.visitEnd(); - } { //构造函数 mv = new AsmMethodVisitor(cw.visitMethod(ACC_PUBLIC, "", "()V", null, null)); //mv.setDebug(true); @@ -454,16 +351,18 @@ public abstract class Sncp { { // toString() mv = new AsmMethodVisitor(cw.visitMethod(ACC_PUBLIC, "toString", "()Ljava/lang/String;", null, null)); mv.visitVarInsn(ALOAD, 0); - mv.visitFieldInsn(GETFIELD, newDynName, FIELDPREFIX + "_selfstring", "Ljava/lang/String;"); + mv.visitFieldInsn(GETFIELD, newDynName, FIELDPREFIX + "_client", clientDesc); Label l1 = new Label(); mv.visitJumpInsn(IFNONNULL, l1); mv.visitVarInsn(ALOAD, 0); - mv.visitMethodInsn(INVOKESPECIAL, "java/lang/Object", "toString", "()Ljava/lang/String;", false); + mv.visitMethodInsn(INVOKEVIRTUAL, "java/lang/Object", "getClass", "()Ljava/lang/Class;", false); + mv.visitMethodInsn(INVOKEVIRTUAL, "java/lang/Class", "getName", "()Ljava/lang/String;", false); Label l2 = new Label(); mv.visitJumpInsn(GOTO, l2); mv.visitLabel(l1); mv.visitVarInsn(ALOAD, 0); - mv.visitFieldInsn(GETFIELD, newDynName, FIELDPREFIX + "_selfstring", "Ljava/lang/String;"); + mv.visitFieldInsn(GETFIELD, newDynName, FIELDPREFIX + "_client", clientDesc); + mv.visitMethodInsn(INVOKEVIRTUAL, clientName, "toSimpleString", "()Ljava/lang/String;", false); mv.visitLabel(l2); mv.visitInsn(ARETURN); mv.visitMaxs(1, 1); @@ -639,14 +538,7 @@ public abstract class Sncp { mv.visitVarInsn(ALOAD, 0);//调用 _client mv.visitFieldInsn(GETFIELD, newDynName, FIELDPREFIX + "_client", clientDesc); - mv.visitVarInsn(ALOAD, 0); //传递 _bsonConvert - mv.visitFieldInsn(GETFIELD, newDynName, FIELDPREFIX + "_bsonConvert", bsonConvertDesc); - mv.visitVarInsn(ALOAD, 0); //传递 _jsonConvert - mv.visitFieldInsn(GETFIELD, newDynName, FIELDPREFIX + "_jsonConvert", jsonConvertDesc); - mv.visitVarInsn(ALOAD, 0); //传递 _sameGroupTransport - mv.visitFieldInsn(GETFIELD, newDynName, FIELDPREFIX + "_sameGroupTransport", transportDesc); - - final int preparams = 3; //调用selfrunnable之前的参数个数; _client/_bsonConvert/_jsonConvert/_sameGroupTransport + final int preparams = 3; //调用selfrunnable之前的参数个数; _client if (index <= 5) { //第几个 SncpAction mv.visitInsn(ICONST_0 + index); @@ -706,7 +598,7 @@ public abstract class Sncp { } mv.visitInsn(AASTORE); } - mv.visitMethodInsn(INVOKEVIRTUAL, clientName, mrun.async() ? "asyncRemoteSameGroup" : "remoteSameGroup", "(" + bsonConvertDesc + jsonConvertDesc + transportDesc + "I[Ljava/lang/Object;)V", false); + mv.visitMethodInsn(INVOKEVIRTUAL, clientName, mrun.async() ? "asyncRemoteSameGroup" : "remoteSameGroup", "(I[Ljava/lang/Object;)V", false); mv.visitLabel(sameLabel); //---------------------------- 调用diffrun --------------------------------- mv.visitVarInsn(ILOAD, 3); //读取 diffrunnable @@ -715,12 +607,6 @@ public abstract class Sncp { mv.visitVarInsn(ALOAD, 0); mv.visitFieldInsn(GETFIELD, newDynName, FIELDPREFIX + "_client", clientDesc); - mv.visitVarInsn(ALOAD, 0); - mv.visitFieldInsn(GETFIELD, newDynName, FIELDPREFIX + "_bsonConvert", bsonConvertDesc); - mv.visitVarInsn(ALOAD, 0); - mv.visitFieldInsn(GETFIELD, newDynName, FIELDPREFIX + "_jsonConvert", jsonConvertDesc); - mv.visitVarInsn(ALOAD, 0); - mv.visitFieldInsn(GETFIELD, newDynName, FIELDPREFIX + "_diffGroupTransports", transportsDesc); if (index <= 5) { //第几个 SncpAction mv.visitInsn(ICONST_0 + index); @@ -780,7 +666,7 @@ public abstract class Sncp { } mv.visitInsn(AASTORE); } - mv.visitMethodInsn(INVOKEVIRTUAL, clientName, mrun.async() ? "asyncRemoteDiffGroup" : "remoteDiffGroup", "(" + bsonConvertDesc + jsonConvertDesc + transportsDesc + "I[Ljava/lang/Object;)V", false); + mv.visitMethodInsn(INVOKEVIRTUAL, clientName, mrun.async() ? "asyncRemoteDiffGroup" : "remoteDiffGroup", "(I[Ljava/lang/Object;)V", false); mv.visitLabel(diffLabel); if (returnType == void.class) { @@ -864,46 +750,35 @@ public abstract class Sncp { } } - public static T createSimpleLocalService(final String name, final Class serviceImplClass, final InetSocketAddress clientSncpAddress, final Transport sameGroupTransport) { - return createLocalService(name, null, ResourceFactory.root(), serviceImplClass, clientSncpAddress, null, new HashSet<>(), null, sameGroupTransport, null); - } - /** * * 创建本地模式Service实例 * - * @param Service泛型 - * @param name 资源名 - * @param executor 线程池 - * @param resourceFactory 资源容器 - * @param serviceImplClass Service类 - * @param clientSncpAddress 本地IP地址 - * @param sncpGroup 自身的组节点名 可能为null - * @param groups 所有的组节点,包含自身 - * @param conf 启动配置项 - * @param sameGroupTransport 同组的通信组件 - * @param diffGroupTransports 异组的通信组件列表 + * @param Service泛型 + * @param name 资源名 + * @param serviceImplClass Service类 + * @param resourceFactory ResourceFactory + * @param transportFactory TransportFactory + * @param clientSncpAddress 本地IP地址 + * @param groups 所有的组节点,包含自身 + * @param conf 启动配置项 * * @return Service的本地模式实例 */ @SuppressWarnings("unchecked") public static T createLocalService( final String name, - final Consumer executor, - final ResourceFactory resourceFactory, final Class serviceImplClass, + final ResourceFactory resourceFactory, + final TransportFactory transportFactory, final InetSocketAddress clientSncpAddress, - final String sncpGroup, final Set groups, - final AnyValue conf, - final Transport sameGroupTransport, - final Collection diffGroupTransports) { + final AnyValue conf) { try { final Class newClazz = createLocalServiceClass(name, serviceImplClass); T rs = (T) newClazz.newInstance(); //-------------------------------------- Service remoteService = null; - Transport remoteTransport = null; { Class loop = newClazz; do { @@ -913,26 +788,7 @@ public abstract class Sncp { if (field.getAnnotation(RpcRemote.class) == null) continue; if (!field.getType().isAssignableFrom(newClazz)) continue; field.setAccessible(true); - if (remoteTransport == null) { - List list = new ArrayList<>(); - if (sameGroupTransport != null) list.add(sameGroupTransport); - if (diffGroupTransports != null) list.addAll(diffGroupTransports); - if (!list.isEmpty()) { - Transport tmp = new Transport(list); - synchronized (resourceFactory) { - Transport old = resourceFactory.find(tmp.getName(), Transport.class); - if (old != null) { - remoteTransport = old; - } else { - remoteTransport = tmp; - resourceFactory.register(tmp.getName(), tmp); - } - } - } - } - if (remoteService == null && remoteTransport != null) { - remoteService = createRemoteService(name, executor, serviceImplClass, clientSncpAddress, sncpGroup, groups, conf, remoteTransport); - } + if (remoteService == null) remoteService = createRemoteService(name, serviceImplClass, transportFactory, clientSncpAddress, groups, conf); if (remoteService != null) field.set(rs, remoteService); } } while ((loop = loop.getSuperclass()) != Object.class); @@ -942,80 +798,31 @@ public abstract class Sncp { try { Field e = newClazz.getDeclaredField(FIELDPREFIX + "_client"); e.setAccessible(true); - client = new SncpClient(name, serviceImplClass, rs, executor, false, newClazz, clientSncpAddress); + client = new SncpClient(name, serviceImplClass, rs, transportFactory, false, newClazz, clientSncpAddress); + Set diffGroups = groups == null ? new HashSet<>() : new HashSet<>(groups); + String sameGroup = transportFactory.findGroupName(clientSncpAddress); + if (sameGroup != null) diffGroups.remove(sameGroup); + client.setSameGroup(sameGroup); + client.setDiffGroups(diffGroups); + client.setSameGroupTransport(transportFactory.loadSameGroupTransport(clientSncpAddress)); + client.setDiffGroupTransports(transportFactory.loadDiffGroupTransports(clientSncpAddress, diffGroups)); e.set(rs, client); } catch (NoSuchFieldException ne) { + ne.printStackTrace(); } } - { - StringBuilder sb = new StringBuilder(); - sb.append(newClazz.getName()).append("{name = '").append(name).append("'"); - if (client != null) { - sb.append(", serviceid = ").append(client.getServiceid()); - sb.append(", serviceversion = ").append(client.getServiceversion()); - sb.append(", action.size = ").append(client.getActionCount()); -// List groups = new ArrayList<>(); -// if (sameGroupTransport != null) groups.add(sameGroupTransport.getName()); -// if (diffGroupTransports != null) { -// for (Transport t : diffGroupTransports) { -// groups.add(t.getName()); -// } -// } - sb.append(", address = ").append(clientSncpAddress).append(", groups = ").append(groups); - sb.append(", sameaddrs = ").append(sameGroupTransport == null ? null : Arrays.asList(sameGroupTransport.getRemoteAddresses())); - - List addrs = new ArrayList<>(); - if (diffGroupTransports != null) { - for (Transport t : diffGroupTransports) { - addrs.addAll(Arrays.asList(t.getRemoteAddresses())); - } - } - sb.append(", diffaddrs = ").append(addrs); - } else { - sb.append(", ").append(RpcMultiRun.class.getSimpleName().toLowerCase()).append(" = false"); - } - sb.append("}"); - Field s = newClazz.getDeclaredField(FIELDPREFIX + "_selfstring"); - s.setAccessible(true); - s.set(rs, sb.toString()); - } if (client == null) return rs; - { - Field c = newClazz.getDeclaredField(FIELDPREFIX + "_sncpGroup"); - c.setAccessible(true); - c.set(rs, sncpGroup); - } - { - Field c = newClazz.getDeclaredField(FIELDPREFIX + "_groups"); - c.setAccessible(true); - c.set(rs, groups); - } { Field c = newClazz.getDeclaredField(FIELDPREFIX + "_conf"); c.setAccessible(true); c.set(rs, conf); } - { - Field c = newClazz.getDeclaredField(FIELDPREFIX + "_sameGroupTransport"); - c.setAccessible(true); - c.set(rs, sameGroupTransport); - } - if (diffGroupTransports != null) { - Field t = newClazz.getDeclaredField(FIELDPREFIX + "_diffGroupTransports"); - t.setAccessible(true); - t.set(rs, diffGroupTransports.toArray(new Transport[diffGroupTransports.size()])); - } return rs; } catch (RuntimeException rex) { throw rex; } catch (Exception ex) { throw new RuntimeException(ex); } - - } - - public static T createSimpleRemoteService(final String name, final Class serviceTypeOrImplClass, final InetSocketAddress clientAddress, final Transport transport) { - return createRemoteService(name, null, serviceTypeOrImplClass, clientAddress, (String) null, new HashSet<>(), (AnyValue) null, transport); } /** @@ -1081,26 +888,22 @@ public abstract class Sncp { * * @param Service泛型 * @param name 资源名 - * @param executor 线程池 * @param serviceTypeOrImplClass Service类 + * @param transportFactory TransportFactory * @param clientAddress 本地IP地址 - * @param sncpGroup 自身的组节点名 可能为null * @param groups 所有的组节点,包含自身 * @param conf 启动配置项 - * @param transport 通信组件 * * @return Service的远程模式实例 */ @SuppressWarnings("unchecked") public static T createRemoteService( final String name, - final Consumer executor, final Class serviceTypeOrImplClass, + final TransportFactory transportFactory, final InetSocketAddress clientAddress, - final String sncpGroup, final Set groups, - final AnyValue conf, - final Transport transport) { + final AnyValue conf) { if (serviceTypeOrImplClass == null) return null; if (!Service.class.isAssignableFrom(serviceTypeOrImplClass)) return null; int mod = serviceTypeOrImplClass.getModifiers(); @@ -1110,36 +913,18 @@ public abstract class Sncp { final String clientName = SncpClient.class.getName().replace('.', '/'); final String clientDesc = Type.getDescriptor(SncpClient.class); final String sncpDynDesc = Type.getDescriptor(SncpDyn.class); - final String bsonConvertDesc = Type.getDescriptor(BsonConvert.class); - final String jsonConvertDesc = Type.getDescriptor(JsonConvert.class); - final String transportDesc = Type.getDescriptor(Transport.class); - final String stringDesc = Type.getDescriptor(String.class); final String anyValueDesc = Type.getDescriptor(AnyValue.class); ClassLoader loader = Sncp.class.getClassLoader(); String newDynName = supDynName.substring(0, supDynName.lastIndexOf('/') + 1) + REMOTEPREFIX + serviceTypeOrImplClass.getSimpleName(); try { Class newClazz = Class.forName(newDynName.replace('/', '.')); T rs = (T) newClazz.newInstance(); - SncpClient client = new SncpClient(name, serviceTypeOrImplClass, rs, executor, true, realed ? createLocalServiceClass(name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress); + SncpClient client = new SncpClient(name, serviceTypeOrImplClass, rs, transportFactory, true, realed ? createLocalServiceClass(name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress); + client.setRemoteGroups(groups); + client.setRemoteGroupTransport(transportFactory.loadRemoteTransport(clientAddress, groups)); Field c = newClazz.getDeclaredField(FIELDPREFIX + "_client"); c.setAccessible(true); c.set(rs, client); - Field t = newClazz.getDeclaredField(FIELDPREFIX + "_transport"); - t.setAccessible(true); - t.set(rs, transport); - { - StringBuilder sb = new StringBuilder(); - sb.append(newClazz.getName()).append("{name = '").append(name); - sb.append("', serviceid = '").append(client.getServiceid()); - sb.append("', serviceversion = ").append(client.getServiceversion()); - sb.append(", action.size = ").append(client.getActionCount()); - sb.append(", address = ").append(clientAddress).append(", groups = ").append(transport == null ? null : transport.getName()); - sb.append(", remoteaddrs = ").append(transport == null ? null : Arrays.asList(transport.getRemoteAddresses())); - sb.append("}"); - Field s = newClazz.getDeclaredField(FIELDPREFIX + "_selfstring"); - s.setAccessible(true); - s.set(rs, sb.toString()); - } return rs; } catch (Throwable ex) { } @@ -1172,54 +957,14 @@ public abstract class Sncp { visitAnnotation(cw.visitAnnotation(Type.getDescriptor(ann.annotationType()), true), ann); } } - { - fv = cw.visitField(ACC_PRIVATE + ACC_FINAL + ACC_STATIC, FIELDPREFIX + "_service_type", "Ljava/lang/Class;", null, null); - fv.visitEnd(); - } - { - fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_bsonConvert", bsonConvertDesc, null, null); - av0 = fv.visitAnnotation("Ljavax/annotation/Resource;", true); - av0.visitEnd(); - fv.visitEnd(); - } - { - fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_jsonConvert", jsonConvertDesc, null, null); - av0 = fv.visitAnnotation("Ljavax/annotation/Resource;", true); - av0.visitEnd(); - fv.visitEnd(); - } - { - fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_sncpGroup", stringDesc, null, null); - fv.visitEnd(); - } - { - fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_groups", "Ljava/util/Set;", "Ljava/util/Set;", null); - fv.visitEnd(); - } { fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_conf", anyValueDesc, null, null); fv.visitEnd(); } - { - fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_transport", transportDesc, null, null); - fv.visitEnd(); - } { fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_client", clientDesc, null, null); fv.visitEnd(); } - { - fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_selfstring", "Ljava/lang/String;", null, null); - fv.visitEnd(); - } - {//静态构造函数 - mv = new AsmMethodVisitor(cw.visitMethod(ACC_STATIC, "", "()V", null, null)); - mv.visitLdcInsn(Type.getType(Type.getDescriptor(serviceTypeOrImplClass))); - mv.visitFieldInsn(PUTSTATIC, newDynName, FIELDPREFIX + "_service_type", "Ljava/lang/Class;"); - mv.visitInsn(RETURN); - mv.visitMaxs(1, 0); - mv.visitEnd(); - } { //构造函数 mv = new AsmMethodVisitor(cw.visitMethod(ACC_PUBLIC, "", "()V", null, null)); //mv.setDebug(true); @@ -1244,16 +989,18 @@ public abstract class Sncp { { // toString() mv = new AsmMethodVisitor(cw.visitMethod(ACC_PUBLIC, "toString", "()Ljava/lang/String;", null, null)); mv.visitVarInsn(ALOAD, 0); - mv.visitFieldInsn(GETFIELD, newDynName, FIELDPREFIX + "_selfstring", "Ljava/lang/String;"); + mv.visitFieldInsn(GETFIELD, newDynName, FIELDPREFIX + "_client", clientDesc); Label l1 = new Label(); mv.visitJumpInsn(IFNONNULL, l1); mv.visitVarInsn(ALOAD, 0); - mv.visitMethodInsn(INVOKESPECIAL, "java/lang/Object", "toString", "()Ljava/lang/String;", false); + mv.visitMethodInsn(INVOKEVIRTUAL, "java/lang/Object", "getClass", "()Ljava/lang/Class;", false); + mv.visitMethodInsn(INVOKEVIRTUAL, "java/lang/Class", "getName", "()Ljava/lang/String;", false); Label l2 = new Label(); mv.visitJumpInsn(GOTO, l2); mv.visitLabel(l1); mv.visitVarInsn(ALOAD, 0); - mv.visitFieldInsn(GETFIELD, newDynName, FIELDPREFIX + "_selfstring", "Ljava/lang/String;"); + mv.visitFieldInsn(GETFIELD, newDynName, FIELDPREFIX + "_client", clientDesc); + mv.visitMethodInsn(INVOKEVIRTUAL, clientName, "toSimpleString", "()Ljava/lang/String;", false); mv.visitLabel(l2); mv.visitInsn(ARETURN); mv.visitMaxs(1, 1); @@ -1276,12 +1023,7 @@ public abstract class Sncp { } mv.visitVarInsn(ALOAD, 0); mv.visitFieldInsn(GETFIELD, newDynName, FIELDPREFIX + "_client", clientDesc); - mv.visitVarInsn(ALOAD, 0); - mv.visitFieldInsn(GETFIELD, newDynName, FIELDPREFIX + "_bsonConvert", bsonConvertDesc); - mv.visitVarInsn(ALOAD, 0); - mv.visitFieldInsn(GETFIELD, newDynName, FIELDPREFIX + "_jsonConvert", jsonConvertDesc); - mv.visitVarInsn(ALOAD, 0); - mv.visitFieldInsn(GETFIELD, newDynName, FIELDPREFIX + "_transport", transportDesc); + if (index <= 5) { mv.visitInsn(ICONST_0 + index); } else { @@ -1326,7 +1068,7 @@ public abstract class Sncp { } } - mv.visitMethodInsn(INVOKEVIRTUAL, clientName, "remote", "(" + bsonConvertDesc + jsonConvertDesc + transportDesc + "I[Ljava/lang/Object;)Ljava/lang/Object;", false); + mv.visitMethodInsn(INVOKEVIRTUAL, clientName, "remote", "(I[Ljava/lang/Object;)Ljava/lang/Object;", false); //mv.visitMethodInsn(INVOKEVIRTUAL, convertName, "convertFrom", convertFromDesc, false); if (method.getGenericReturnType() == void.class) { mv.visitInsn(POP); @@ -1369,7 +1111,9 @@ public abstract class Sncp { }.loadClass(newDynName.replace('/', '.'), bytes); try { T rs = (T) newClazz.newInstance(); - SncpClient client = new SncpClient(name, serviceTypeOrImplClass, rs, executor, true, realed ? createLocalServiceClass(name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress); + SncpClient client = new SncpClient(name, serviceTypeOrImplClass, rs, transportFactory, true, realed ? createLocalServiceClass(name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress); + client.setRemoteGroups(groups); + client.setRemoteGroupTransport(transportFactory.loadRemoteTransport(clientAddress, groups)); { Field c = newClazz.getDeclaredField(FIELDPREFIX + "_client"); c.setAccessible(true); @@ -1380,34 +1124,6 @@ public abstract class Sncp { c.setAccessible(true); c.set(rs, conf); } - { - Field c = newClazz.getDeclaredField(FIELDPREFIX + "_sncpGroup"); - c.setAccessible(true); - c.set(rs, sncpGroup); - } - { - Field c = newClazz.getDeclaredField(FIELDPREFIX + "_groups"); - c.setAccessible(true); - c.set(rs, groups); - } - { - Field t = newClazz.getDeclaredField(FIELDPREFIX + "_transport"); - t.setAccessible(true); - t.set(rs, transport); - } - { - StringBuilder sb = new StringBuilder(); - sb.append(newClazz.getName()).append("{name = '").append(name); - sb.append("', serviceid = ").append(client.getServiceid()); - sb.append(", serviceversion = ").append(client.getServiceversion()); - sb.append(", action.size = ").append(client.getActionCount()); - sb.append(", address = ").append(clientAddress).append(", groups = ").append(transport == null ? null : transport.getName()); - sb.append(", remotes = ").append(transport == null ? null : Arrays.asList(transport.getRemoteAddresses())); - sb.append("}"); - Field s = newClazz.getDeclaredField(FIELDPREFIX + "_selfstring"); - s.setAccessible(true); - s.set(rs, sb.toString()); - } return rs; } catch (Exception ex) { throw new RuntimeException(ex); diff --git a/src/org/redkale/net/sncp/SncpClient.java b/src/org/redkale/net/sncp/SncpClient.java index f1ee7d9e3..e36c39411 100644 --- a/src/org/redkale/net/sncp/SncpClient.java +++ b/src/org/redkale/net/sncp/SncpClient.java @@ -12,8 +12,8 @@ import java.nio.*; import java.nio.channels.*; import java.util.*; import java.util.concurrent.*; -import java.util.function.*; import java.util.logging.*; +import javax.annotation.Resource; import org.redkale.convert.bson.*; import org.redkale.convert.json.*; import org.redkale.net.*; @@ -31,111 +31,6 @@ import org.redkale.service.RpcCall; */ public final class SncpClient { - protected static final class SncpAction { - - protected final DLong actionid; - - protected final Method method; - - protected final Type resultTypes; //void 必须设为 null - - protected final Type[] paramTypes; - - protected final Class[] paramClass; - - protected final Attribute[] paramAttrs; // 为null表示无RpcCall处理,index=0固定为null, 其他为参数标记的RpcCall回调方法 - - protected final int handlerFuncParamIndex; - - protected final int handlerAttachParamIndex; - - protected final int addressTargetParamIndex; - - protected final int addressSourceParamIndex; - - protected final boolean boolReturnTypeFuture; // 返回结果类型是否为 CompletableFuture - - protected final Creator futureCreator; - - public SncpAction(final Class clazz, Method method, DLong actionid) { - this.actionid = actionid == null ? Sncp.hash(method) : actionid; - Type rt = method.getGenericReturnType(); - if (rt instanceof TypeVariable) { - TypeVariable tv = (TypeVariable) rt; - if (tv.getBounds().length == 1) rt = tv.getBounds()[0]; - } - this.resultTypes = rt == void.class ? null : rt; - this.boolReturnTypeFuture = CompletableFuture.class.isAssignableFrom(method.getReturnType()); - this.futureCreator = boolReturnTypeFuture ? Creator.create((Class) method.getReturnType()) : null; - this.paramTypes = method.getGenericParameterTypes(); - this.paramClass = method.getParameterTypes(); - this.method = method; - Annotation[][] anns = method.getParameterAnnotations(); - int targetAddrIndex = -1; - int sourceAddrIndex = -1; - int handlerAttachIndex = -1; - int handlerFuncIndex = -1; - boolean hasattr = false; - Attribute[] atts = new Attribute[paramTypes.length + 1]; - if (anns.length > 0) { - Class[] params = method.getParameterTypes(); - for (int i = 0; i < params.length; i++) { - if (AsyncHandler.class.isAssignableFrom(params[i])) { - if (boolReturnTypeFuture) { - throw new RuntimeException(method + " have both AsyncHandler and CompletableFuture"); - } - if (handlerFuncIndex >= 0) { - throw new RuntimeException(method + " have more than one AsyncHandler type parameter"); - } - Sncp.checkAsyncModifier(params[i], method); - handlerFuncIndex = i; - break; - } - } - for (int i = 0; i < anns.length; i++) { - if (anns[i].length > 0) { - for (Annotation ann : anns[i]) { - if (ann.annotationType() == RpcAttachment.class) { - if (handlerAttachIndex >= 0) { - throw new RuntimeException(method + " have more than one @RpcAttachment parameter"); - } - handlerAttachIndex = i; - } else if (ann.annotationType() == RpcTargetAddress.class && SocketAddress.class.isAssignableFrom(params[i])) { - targetAddrIndex = i; - } else if (ann.annotationType() == RpcSourceAddress.class && SocketAddress.class.isAssignableFrom(params[i])) { - sourceAddrIndex = i; - } - } - for (Annotation ann : anns[i]) { - if (ann.annotationType() == RpcCall.class) { - try { - atts[i + 1] = ((RpcCall) ann).value().newInstance(); - hasattr = true; - } catch (Exception e) { - logger.log(Level.SEVERE, RpcCall.class.getSimpleName() + ".attribute cannot a newInstance for" + method, e); - } - break; - } - } - } - } - } - this.addressTargetParamIndex = targetAddrIndex; - this.addressSourceParamIndex = sourceAddrIndex; - this.handlerFuncParamIndex = handlerFuncIndex; - this.handlerAttachParamIndex = handlerAttachIndex; - this.paramAttrs = hasattr ? atts : null; - if (this.handlerFuncParamIndex >= 0 && method.getReturnType() != void.class) { - throw new RuntimeException(method + " have AsyncHandler type parameter but return type is not void"); - } - } - - @Override - public String toString() { - return "{" + actionid + "," + (method == null ? "null" : method.getName()) + "}"; - } - } - protected static final Logger logger = Logger.getLogger(SncpClient.class.getSimpleName()); protected final boolean finest = logger.isLoggable(Level.FINEST); @@ -160,12 +55,36 @@ public final class SncpClient { protected final SncpAction[] actions; - protected final Consumer executor; + protected final ExecutorService executor; - public SncpClient(final String serviceName, final Class serviceTypeOrImplClass, final T service, final Consumer executor, + @Resource + protected JsonConvert jsonConvert; + + @Resource + protected BsonConvert bsonConvert; + + //远程模式 + protected Set remoteGroups; + + //远程模式 + protected Transport remoteGroupTransport; + + //本地模式 + protected String sameGroup; + + //本地模式 + protected Transport sameGroupTransport; + + //本地模式 + protected Set diffGroups; + + //本地模式 + protected Transport[] diffGroupTransports; + + public SncpClient(final String serviceName, final Class serviceTypeOrImplClass, final T service, final TransportFactory factory, final boolean remote, final Class serviceClass, final InetSocketAddress clientAddress) { this.remote = remote; - this.executor = executor; + this.executor = factory.getExecutor(); this.serviceClass = serviceClass; this.serviceversion = 0; this.clientAddress = clientAddress; @@ -209,6 +128,54 @@ public final class SncpClient { return actions.length; } + public Set getRemoteGroups() { + return remoteGroups; + } + + public void setRemoteGroups(Set remoteGroups) { + this.remoteGroups = remoteGroups; + } + + public Transport getRemoteGroupTransport() { + return remoteGroupTransport; + } + + public void setRemoteGroupTransport(Transport remoteGroupTransport) { + this.remoteGroupTransport = remoteGroupTransport; + } + + public String getSameGroup() { + return sameGroup; + } + + public void setSameGroup(String sameGroup) { + this.sameGroup = sameGroup; + } + + public Transport getSameGroupTransport() { + return sameGroupTransport; + } + + public void setSameGroupTransport(Transport sameGroupTransport) { + this.sameGroupTransport = sameGroupTransport; + } + + public Set getDiffGroups() { + return diffGroups; + } + + public void setDiffGroups(Set diffGroups) { + this.diffGroups = diffGroups; + } + + public Transport[] getDiffGroupTransports() { + return diffGroupTransports; + } + + public void setDiffGroupTransports(Transport[] diffGroupTransports) { + this.diffGroupTransports = diffGroupTransports; + } + @Override public String toString() { String service = serviceClass.getName(); @@ -218,6 +185,26 @@ public final class SncpClient { + ", actions.size = " + actions.length + ")"; } + public String toSimpleString() { //给Sncp产生的Service用 + String service = serviceClass.getName(); + if (remote) service = service.replace(Sncp.LOCALPREFIX, Sncp.REMOTEPREFIX); + List diffaddrs = new ArrayList<>(); + if (diffGroupTransports != null && diffGroupTransports.length > 1) { + for (Transport t : diffGroupTransports) { + diffaddrs.addAll(Utility.ofList(t.getRemoteAddresses())); + } + } + return service + "(name = '" + name + "', serviceid = " + serviceid + ", serviceversion = " + serviceversion + + ", clientaddr = " + (clientAddress == null ? "" : (clientAddress.getHostString() + ":" + clientAddress.getPort())) + + ((sameGroup == null || sameGroup.isEmpty()) ? "" : ", sameGroup = " + sameGroup) + + (sameGroupTransport == null ? "" : ", sameGroupTransport = " + Arrays.toString(sameGroupTransport.getRemoteAddresses())) + + ((diffGroups == null || diffGroups.isEmpty()) ? "" : ", diffGroups = " + diffGroups) + + ((diffGroupTransports == null || diffGroupTransports.length < 1) ? "" : ", diffGroupTransports = " + diffaddrs) + + ((remoteGroups == null || remoteGroups.isEmpty()) ? "" : ", remoteGroups = " + remoteGroups) + + (remoteGroupTransport == null ? "" : ", remoteGroupTransport = " + Arrays.toString(remoteGroupTransport.getRemoteAddresses())) + + ", actions.size = " + actions.length + ")"; + } + public static List parseMethod(final Class serviceClass) { final List list = new ArrayList<>(); final List multis = new ArrayList<>(); @@ -260,53 +247,53 @@ public final class SncpClient { return multis; } - public void remoteSameGroup(final BsonConvert bsonConvert, final JsonConvert jsonConvert, Transport transport, final int index, final Object... params) { - if (transport == null) return; + public void remoteSameGroup(final int index, final Object... params) { + if (sameGroupTransport == null) return; final SncpAction action = actions[index]; if (action.handlerFuncParamIndex >= 0) params[action.handlerFuncParamIndex] = null; //不能让远程调用handler,因为之前本地方法已经调用过了 - for (InetSocketAddress addr : transport.getRemoteAddresses()) { - remote0(null, bsonConvert, jsonConvert, transport, addr, action, params); + for (InetSocketAddress addr : sameGroupTransport.getRemoteAddresses()) { + remote0(null, sameGroupTransport, addr, action, params); } } - public void asyncRemoteSameGroup(final BsonConvert bsonConvert, final JsonConvert jsonConvert, Transport transport, final int index, final Object... params) { - if (transport == null) return; + public void asyncRemoteSameGroup(final int index, final Object... params) { + if (sameGroupTransport == null) return; if (executor != null) { - executor.accept(() -> { - remoteSameGroup(bsonConvert, jsonConvert, transport, index, params); + executor.execute(() -> { + remoteSameGroup(index, params); }); } else { - remoteSameGroup(bsonConvert, jsonConvert, transport, index, params); + remoteSameGroup(index, params); } } - public void remoteDiffGroup(final BsonConvert bsonConvert, final JsonConvert jsonConvert, Transport[] transports, final int index, final Object... params) { - if (transports == null || transports.length < 1) return; + public void remoteDiffGroup(final int index, final Object... params) { + if (diffGroupTransports == null || diffGroupTransports.length < 1) return; final SncpAction action = actions[index]; if (action.handlerFuncParamIndex >= 0) params[action.handlerFuncParamIndex] = null; //不能让远程调用handler,因为之前本地方法已经调用过了 - for (Transport transport : transports) { - remote0(null, bsonConvert, jsonConvert, transport, null, action, params); + for (Transport transport : diffGroupTransports) { + remote0(null, transport, null, action, params); } } - public void asyncRemoteDiffGroup(final BsonConvert bsonConvert, final JsonConvert jsonConvert, Transport[] transports, final int index, final Object... params) { - if (transports == null || transports.length < 1) return; + public void asyncRemoteDiffGroup(final int index, final Object... params) { + if (diffGroupTransports == null || diffGroupTransports.length < 1) return; if (executor != null) { - executor.accept(() -> { - remoteDiffGroup(bsonConvert, jsonConvert, transports, index, params); + executor.execute(() -> { + remoteDiffGroup(index, params); }); } else { - remoteDiffGroup(bsonConvert, jsonConvert, transports, index, params); + remoteDiffGroup(index, params); } } //只给远程模式调用的 - public T remote(final BsonConvert bsonConvert, final JsonConvert jsonConvert, Transport transport, final int index, final Object... params) { + public T remote(final int index, final Object... params) { final SncpAction action = actions[index]; final AsyncHandler handlerFunc = action.handlerFuncParamIndex >= 0 ? (AsyncHandler) params[action.handlerFuncParamIndex] : null; if (action.handlerFuncParamIndex >= 0) params[action.handlerFuncParamIndex] = null; final BsonReader reader = bsonConvert.pollBsonReader(); - CompletableFuture future = remote0(handlerFunc, bsonConvert, jsonConvert, transport, null, action, params); + CompletableFuture future = remote0(handlerFunc, remoteGroupTransport, null, action, params); if (action.boolReturnTypeFuture) { CompletableFuture result = action.futureCreator.create(); future.whenComplete((v, e) -> { @@ -349,27 +336,19 @@ public final class SncpClient { } } - public void remote(final BsonConvert bsonConvert, final JsonConvert jsonConvert, Transport[] transports, final int index, final Object... params) { - if (transports == null || transports.length < 1) return; - remote(bsonConvert, jsonConvert, transports[0], index, params); - for (int i = 1; i < transports.length; i++) { - remote0(null, bsonConvert, jsonConvert, transports[i], null, actions[index], params); - } - } - - private CompletableFuture remote0(final AsyncHandler handler, final BsonConvert bsonConvert, final JsonConvert jsonConvert, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) { + private CompletableFuture remote0(final AsyncHandler handler, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) { if ("rest".equalsIgnoreCase(transport.getSubprotocol())) { - return remoteRest0(handler, jsonConvert, transport, addr0, action, params); + return remoteRest0(handler, transport, addr0, action, params); } - return remoteSncp0(handler, bsonConvert, transport, addr0, action, params); + return remoteSncp0(handler, transport, addr0, action, params); } //尚未实现 - private CompletableFuture remoteRest0(final AsyncHandler handler, final JsonConvert jsonConvert, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) { + private CompletableFuture remoteRest0(final AsyncHandler handler, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) { return null; } - private CompletableFuture remoteSncp0(final AsyncHandler handler, final BsonConvert bsonConvert, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) { + private CompletableFuture remoteSncp0(final AsyncHandler handler, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) { final Type[] myparamtypes = action.paramTypes; final Class[] myparamclass = action.paramClass; if (action.addressSourceParamIndex >= 0) params[action.addressSourceParamIndex] = this.clientAddress; @@ -555,4 +534,108 @@ public final class SncpClient { buffer.position(currentpos); } + protected static final class SncpAction { + + protected final DLong actionid; + + protected final Method method; + + protected final Type resultTypes; //void 必须设为 null + + protected final Type[] paramTypes; + + protected final Class[] paramClass; + + protected final Attribute[] paramAttrs; // 为null表示无RpcCall处理,index=0固定为null, 其他为参数标记的RpcCall回调方法 + + protected final int handlerFuncParamIndex; + + protected final int handlerAttachParamIndex; + + protected final int addressTargetParamIndex; + + protected final int addressSourceParamIndex; + + protected final boolean boolReturnTypeFuture; // 返回结果类型是否为 CompletableFuture + + protected final Creator futureCreator; + + public SncpAction(final Class clazz, Method method, DLong actionid) { + this.actionid = actionid == null ? Sncp.hash(method) : actionid; + Type rt = method.getGenericReturnType(); + if (rt instanceof TypeVariable) { + TypeVariable tv = (TypeVariable) rt; + if (tv.getBounds().length == 1) rt = tv.getBounds()[0]; + } + this.resultTypes = rt == void.class ? null : rt; + this.boolReturnTypeFuture = CompletableFuture.class.isAssignableFrom(method.getReturnType()); + this.futureCreator = boolReturnTypeFuture ? Creator.create((Class) method.getReturnType()) : null; + this.paramTypes = method.getGenericParameterTypes(); + this.paramClass = method.getParameterTypes(); + this.method = method; + Annotation[][] anns = method.getParameterAnnotations(); + int targetAddrIndex = -1; + int sourceAddrIndex = -1; + int handlerAttachIndex = -1; + int handlerFuncIndex = -1; + boolean hasattr = false; + Attribute[] atts = new Attribute[paramTypes.length + 1]; + if (anns.length > 0) { + Class[] params = method.getParameterTypes(); + for (int i = 0; i < params.length; i++) { + if (AsyncHandler.class.isAssignableFrom(params[i])) { + if (boolReturnTypeFuture) { + throw new RuntimeException(method + " have both AsyncHandler and CompletableFuture"); + } + if (handlerFuncIndex >= 0) { + throw new RuntimeException(method + " have more than one AsyncHandler type parameter"); + } + Sncp.checkAsyncModifier(params[i], method); + handlerFuncIndex = i; + break; + } + } + for (int i = 0; i < anns.length; i++) { + if (anns[i].length > 0) { + for (Annotation ann : anns[i]) { + if (ann.annotationType() == RpcAttachment.class) { + if (handlerAttachIndex >= 0) { + throw new RuntimeException(method + " have more than one @RpcAttachment parameter"); + } + handlerAttachIndex = i; + } else if (ann.annotationType() == RpcTargetAddress.class && SocketAddress.class.isAssignableFrom(params[i])) { + targetAddrIndex = i; + } else if (ann.annotationType() == RpcSourceAddress.class && SocketAddress.class.isAssignableFrom(params[i])) { + sourceAddrIndex = i; + } + } + for (Annotation ann : anns[i]) { + if (ann.annotationType() == RpcCall.class) { + try { + atts[i + 1] = ((RpcCall) ann).value().newInstance(); + hasattr = true; + } catch (Exception e) { + logger.log(Level.SEVERE, RpcCall.class.getSimpleName() + ".attribute cannot a newInstance for" + method, e); + } + break; + } + } + } + } + } + this.addressTargetParamIndex = targetAddrIndex; + this.addressSourceParamIndex = sourceAddrIndex; + this.handlerFuncParamIndex = handlerFuncIndex; + this.handlerAttachParamIndex = handlerAttachIndex; + this.paramAttrs = hasattr ? atts : null; + if (this.handlerFuncParamIndex >= 0 && method.getReturnType() != void.class) { + throw new RuntimeException(method + " have AsyncHandler type parameter but return type is not void"); + } + } + + @Override + public String toString() { + return "{" + actionid + "," + (method == null ? "null" : method.getName()) + "}"; + } + } }