From 6c2ddf49a7c3f89bdd588e1218d63d2ed0616146 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=9C=B0=E5=B9=B3=E7=BA=BF?= <22250530@qq.com> Date: Thu, 13 Aug 2015 10:55:22 +0800 Subject: [PATCH] --- src/META-INF/application-template.xml | 8 +- src/com/wentch/redkale/boot/Application.java | 2 + .../wentch/redkale/boot/NodeHttpServer.java | 12 +-- src/com/wentch/redkale/boot/NodeServer.java | 95 +++++++++---------- .../wentch/redkale/net/sncp/SncpRequest.java | 1 + 5 files changed, 60 insertions(+), 58 deletions(-) diff --git a/src/META-INF/application-template.xml b/src/META-INF/application-template.xml index ba993210d..5ea766724 100644 --- a/src/META-INF/application-template.xml +++ b/src/META-INF/application-template.xml @@ -38,7 +38,7 @@ 一个组包含多个NODE, 同一Service服务可以由多个进程提供,这些进程称为一个GROUP,且同一GROUP内的进程必须在同一机房或局域网内 name: 服务组ID,长度不能超过11个字节. 默认为空字符串。 protocol:值只能是UDP TCP, 默认TCP - 注意: 一个node只能所属一个group。 + 注意: 一个node只能所属一个group。只要存在protocol == SNCP的Server节点信息, 就必须有group节点信息。 --> diff --git a/src/com/wentch/redkale/boot/Application.java b/src/com/wentch/redkale/boot/Application.java index 0ee49a529..06caaf91a 100644 --- a/src/com/wentch/redkale/boot/Application.java +++ b/src/com/wentch/redkale/boot/Application.java @@ -385,6 +385,8 @@ public final class Application { others.add(entry); } } + if (!sncps.isEmpty() && globalNodes.isEmpty()) throw new RuntimeException("found SNCP Server node bug not found node info."); + factory.register(RESNAME_SNCP_NODES, new TypeToken>() { }.getType(), globalNodes); factory.register(RESNAME_SNCP_NODES, new TypeToken>() { diff --git a/src/com/wentch/redkale/boot/NodeHttpServer.java b/src/com/wentch/redkale/boot/NodeHttpServer.java index 2bb42436a..85196883d 100644 --- a/src/com/wentch/redkale/boot/NodeHttpServer.java +++ b/src/com/wentch/redkale/boot/NodeHttpServer.java @@ -58,7 +58,7 @@ public final class NodeHttpServer extends NodeServer { for (NodeServer ns : application.servers) { if (!ns.isSNCP()) continue; if (sncpServer0 == null) sncpServer0 = (NodeSncpServer) ns; - if (ns.getNodeGroup().equals(getNodeGroup())) { + if (ns.getSncpGroup().equals(getSncpGroup())) { sncpServer0 = (NodeSncpServer) ns; break; } @@ -77,20 +77,20 @@ public final class NodeHttpServer extends NodeServer { if (nodeService == null) { Class sc = (Class) application.webSocketNodeClass; nodeService = Sncp.createLocalService(rcname, (Class) (sc == null ? WebSocketNodeService.class : sc), - getNodeAddress(), (sc == null ? null : nodeSameGroupTransports), (sc == null ? null : nodeDiffGroupTransports)); + getSncpAddress(), (sc == null ? null : nodeSameGroupTransports), (sc == null ? null : nodeDiffGroupTransports)); regFactory.register(rcname, WebSocketNode.class, nodeService); WebSocketNode wsn = (WebSocketNode) nodeService; - wsn.setLocalSncpAddress(getNodeAddress()); + wsn.setLocalSncpAddress(getSncpAddress()); final Set alladdrs = new HashSet<>(); application.globalNodes.forEach((k, v) -> alladdrs.add(k)); - alladdrs.remove(getNodeAddress()); + alladdrs.remove(getSncpAddress()); WebSocketNode remoteNode = (WebSocketNode) Sncp.createRemoteService(rcname, (Class) (sc == null ? WebSocketNodeService.class : sc), - getNodeAddress(), (sc == null ? null : loadTransport(getNodeGroup(), getNodeProtocol(), alladdrs))); + getSncpAddress(), (sc == null ? null : loadTransport(getSncpGroup(), getNodeProtocol(), alladdrs))); wsn.setRemoteWebSocketNode(remoteNode); factory.inject(nodeService); factory.inject(remoteNode); if (sncpServer != null) { - ServiceWrapper wrapper = new ServiceWrapper((Class) (sc == null ? WebSocketNodeService.class : sc), nodeService, getNodeGroup(), rcname, null); + ServiceWrapper wrapper = new ServiceWrapper((Class) (sc == null ? WebSocketNodeService.class : sc), nodeService, getSncpGroup(), rcname, null); sncpServer.getSncpServer().addService(wrapper); } } diff --git a/src/com/wentch/redkale/boot/NodeServer.java b/src/com/wentch/redkale/boot/NodeServer.java index 5f4cf11ce..1442f226f 100644 --- a/src/com/wentch/redkale/boot/NodeServer.java +++ b/src/com/wentch/redkale/boot/NodeServer.java @@ -45,9 +45,9 @@ public abstract class NodeServer { private final Server server; - private InetSocketAddress nodeAddress; //HttpServer中的nodeAddress 为所属group对应的SncpServer, 为null表示没有分布式结构 + private InetSocketAddress sncpAddress; //HttpServer中的sncpAddress 为所属group对应的SncpServer, 为null表示只是单节点,没有分布式结构 - private String nodeGroup = ""; //当前Server的SNCP协议的组 + private String sncpGroup = ""; //当前Server的SNCP协议的组 private AnyValue nodeConf; @@ -77,31 +77,34 @@ public abstract class NodeServer { this.nodeConf = config == null ? AnyValue.create() : config; if (isSNCP()) { // SNCP协议 String host = this.nodeConf.getValue("host", "0.0.0.0").replace("0.0.0.0", ""); - this.nodeAddress = new InetSocketAddress(host.isEmpty() ? application.localAddress.getHostAddress() : host, this.nodeConf.getIntValue("port")); - this.nodeGroup = application.globalNodes.getOrDefault(this.nodeAddress, ""); + this.sncpAddress = new InetSocketAddress(host.isEmpty() ? application.localAddress.getHostAddress() : host, this.nodeConf.getIntValue("port")); + this.sncpGroup = application.globalNodes.getOrDefault(this.sncpAddress, ""); if (server != null) this.nodeProtocol = server.getProtocol(); } else { // HTTP协议 + this.sncpAddress = null; + this.sncpGroup = ""; + this.nodeProtocol = Sncp.DEFAULT_PROTOCOL; String mbgroup = this.nodeConf.getValue("group", ""); NodeServer sncpServer = null; //有匹配的就取匹配的, 没有且SNCP只有一个,则取此SNCP。 for (NodeServer ns : application.servers) { if (!ns.isSNCP()) continue; if (sncpServer == null) sncpServer = ns; - if (ns.getNodeGroup().equals(mbgroup)) { + if (ns.getSncpGroup().equals(mbgroup)) { sncpServer = ns; break; } } if (sncpServer != null) { - this.nodeAddress = sncpServer.getNodeAddress(); - this.nodeGroup = sncpServer.getNodeGroup(); + this.sncpAddress = sncpServer.getSncpAddress(); + this.sncpGroup = sncpServer.getSncpGroup(); this.nodeProtocol = sncpServer.getNodeProtocol(); } } - if (this.nodeAddress != null) { // 无分布式结构下 HTTP协议的nodeAddress 为 null - this.factory.register(RESNAME_SNCP_NODE, SocketAddress.class, this.nodeAddress); - this.factory.register(RESNAME_SNCP_NODE, InetSocketAddress.class, this.nodeAddress); - this.factory.register(RESNAME_SNCP_NODE, String.class, this.nodeAddress.getAddress().getHostAddress()); - this.factory.register(RESNAME_SNCP_GROUP, this.nodeGroup); + if (this.sncpAddress != null) { // 无分布式结构下 HTTP协议的sncpAddress 为 null + this.factory.register(RESNAME_SNCP_NODE, SocketAddress.class, this.sncpAddress); + this.factory.register(RESNAME_SNCP_NODE, InetSocketAddress.class, this.sncpAddress); + this.factory.register(RESNAME_SNCP_NODE, String.class, this.sncpAddress.getAddress().getHostAddress()); + this.factory.register(RESNAME_SNCP_GROUP, this.sncpGroup); } { //设置root文件夹 @@ -135,9 +138,9 @@ public abstract class NodeServer { regFactory.register(rs.name(), DataSource.class, source); Class sc = (Class) application.dataCacheListenerClass; if (sc != null) { - Service cacheListenerService = Sncp.createLocalService(rs.name(), sc, this.nodeAddress, nodeSameGroupTransports, nodeDiffGroupTransports); + Service cacheListenerService = Sncp.createLocalService(rs.name(), sc, this.sncpAddress, nodeSameGroupTransports, nodeDiffGroupTransports); regFactory.register(rs.name(), DataCacheListener.class, cacheListenerService); - ServiceWrapper wrapper = new ServiceWrapper(sc, cacheListenerService, nodeGroup, rs.name(), null); + ServiceWrapper wrapper = new ServiceWrapper(sc, cacheListenerService, sncpGroup, rs.name(), null); localServices.add(wrapper); if (consumer != null) consumer.accept(wrapper); rf.inject(cacheListenerService); @@ -151,18 +154,18 @@ public abstract class NodeServer { } protected List[] parseTransport(final String[] groups) { - final Set sameGroupAddrs = application.findGlobalGroup(this.nodeGroup); + final Set sameGroupAddrs = application.findGlobalGroup(this.sncpGroup); final Map> diffGroupAddrs = new HashMap<>(); for (String groupitem : groups) { final Set addrs = application.findGlobalGroup(groupitem); - if (addrs == null || groupitem.equals(this.nodeGroup)) continue; + if (addrs == null || groupitem.equals(this.sncpGroup)) continue; diffGroupAddrs.put(groupitem, addrs); } final List sameGroupTransports0 = new ArrayList<>(); if (sameGroupAddrs != null) { - sameGroupAddrs.remove(this.nodeAddress); + sameGroupAddrs.remove(this.sncpAddress); for (InetSocketAddress iaddr : sameGroupAddrs) { - sameGroupTransports0.add(loadTransport(this.nodeGroup, getNodeProtocol(), iaddr)); + sameGroupTransports0.add(loadTransport(this.sncpGroup, getNodeProtocol(), iaddr)); } } final List diffGroupTransports0 = new ArrayList<>(); @@ -174,12 +177,12 @@ public abstract class NodeServer { public abstract boolean isSNCP(); - public InetSocketAddress getNodeAddress() { - return nodeAddress; + public InetSocketAddress getSncpAddress() { + return sncpAddress; } - public String getNodeGroup() { - return nodeGroup; + public String getSncpGroup() { + return sncpGroup; } public String getNodeProtocol() { @@ -235,7 +238,7 @@ public abstract class NodeServer { if (serviceFilter == null) return; final String threadName = "[" + Thread.currentThread().getName() + "] "; final Set> entrys = serviceFilter.getFilterEntrys(); - final String defgroup = nodeConf == null ? "" : nodeConf.getValue("group", ""); //Server节点获取group信息 + final String defgroups = nodeConf == null ? "" : nodeConf.getValue("group", ""); //Server节点获取group信息 ResourceFactory regFactory = isSNCP() ? application.factory : factory; for (FilterEntry entry : entrys) { //service实现类 final Class type = entry.getType(); @@ -245,50 +248,44 @@ public abstract class NodeServer { if (Modifier.isAbstract(type.getModifiers())) continue; if (type.getAnnotation(Ignore.class) != null) continue; if (!isSNCP() && factory.find(entry.getName(), type) != null) continue; - String group = entry.getGroup(); - if (group == null || group.isEmpty()) group = defgroup; + String groups = entry.getGroup(); + if (groups == null || groups.isEmpty()) groups = defgroups; final Set sameGroupAddrs = new LinkedHashSet<>(); final Map> diffGroupAddrs = new HashMap<>(); - for (String str : group.split(";")) { - application.globalNodes.forEach((k, v) -> { - if (v.equals(str)) { - if (v.equals(this.nodeGroup)) { - sameGroupAddrs.add(k); - } else { - Set set = diffGroupAddrs.get(v); - if (set == null) { - set = new LinkedHashSet<>(); - diffGroupAddrs.put(v, set); - } - set.add(k); - } - } - }); + for (String g : groups.split(";")) { + if (g.isEmpty()) continue; + Set set = application.findGlobalGroup(g); + if (set == null) throw new RuntimeException(type.getName() + " has illegal group (" + groups + ")"); + if (g.equals(this.sncpGroup)) { + sameGroupAddrs.addAll(set); + } else { + diffGroupAddrs.put(g, set); + } } - final boolean localable = sameGroupAddrs.contains(this.nodeAddress); + final boolean localable = this.sncpAddress == null || sameGroupAddrs.contains(this.sncpAddress); Service service; List diffGroupTransports = new ArrayList<>(); diffGroupAddrs.forEach((k, v) -> diffGroupTransports.add(loadTransport(k, server.getProtocol(), v))); if (localable || (sameGroupAddrs.isEmpty() && diffGroupTransports.isEmpty())) { - sameGroupAddrs.remove(this.nodeAddress); + sameGroupAddrs.remove(this.sncpAddress); List sameGroupTransports = new ArrayList<>(); for (InetSocketAddress iaddr : sameGroupAddrs) { Set tset = new HashSet<>(); tset.add(iaddr); - sameGroupTransports.add(loadTransport(this.nodeGroup, server.getProtocol(), tset)); + sameGroupTransports.add(loadTransport(this.sncpGroup, server.getProtocol(), tset)); } - service = Sncp.createLocalService(entry.getName(), type, this.nodeAddress, sameGroupTransports, diffGroupTransports); + service = Sncp.createLocalService(entry.getName(), type, this.sncpAddress, sameGroupTransports, diffGroupTransports); } else { - StringBuilder g = new StringBuilder(this.nodeGroup); + StringBuilder g = new StringBuilder(this.sncpGroup); diffGroupAddrs.forEach((k, v) -> { if (g.length() > 0) g.append(';'); g.append(k); sameGroupAddrs.addAll(v); }); - if (sameGroupAddrs.isEmpty()) throw new RuntimeException(type + ":" + group); - service = Sncp.createRemoteService(entry.getName(), type, this.nodeAddress, loadTransport(g.toString(), server.getProtocol(), sameGroupAddrs)); + if (sameGroupAddrs.isEmpty()) throw new RuntimeException(type.getName() + " has no remote address on group (" + groups + ")"); + service = Sncp.createRemoteService(entry.getName(), type, this.sncpAddress, loadTransport(g.toString(), server.getProtocol(), sameGroupAddrs)); } ServiceWrapper wrapper = new ServiceWrapper(type, service, entry); if (factory.find(wrapper.getName(), wrapper.getType()) == null) { @@ -300,7 +297,7 @@ public abstract class NodeServer { if (consumer != null) consumer.accept(wrapper); } } else if (isSNCP()) { - throw new RuntimeException(ServiceWrapper.class.getSimpleName() + "(class:" + type.getName() + ", name:" + entry.getName() + ", group:" + group + ") is repeat."); + throw new RuntimeException(ServiceWrapper.class.getSimpleName() + "(class:" + type.getName() + ", name:" + entry.getName() + ", group:" + groups + ") is repeat."); } } servicecdl.countDown(); @@ -327,7 +324,7 @@ public abstract class NodeServer { } protected ClassFilter createServiceClassFilter(final AnyValue config) { - return createClassFilter(this.nodeGroup, config, null, Service.class, Annotation.class, "services", "service"); + return createClassFilter(this.sncpGroup, config, null, Service.class, Annotation.class, "services", "service"); } protected static ClassFilter createClassFilter(final String localGroup, final AnyValue config, Class ref, diff --git a/src/com/wentch/redkale/net/sncp/SncpRequest.java b/src/com/wentch/redkale/net/sncp/SncpRequest.java index a2315f4c2..0544a74cd 100644 --- a/src/com/wentch/redkale/net/sncp/SncpRequest.java +++ b/src/com/wentch/redkale/net/sncp/SncpRequest.java @@ -112,6 +112,7 @@ public final class SncpRequest extends Request { @Override protected void prepare() { + this.keepAlive = true; if (this.body == null) return; byte[] bytes = this.body; int pos = 0;