From 10e22b0873852a2cc74a85c01a63fdca0c28b036 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Sat, 3 Jun 2017 16:50:40 +0800 Subject: [PATCH] --- src/org/redkale/boot/NodeWatchServer.java | 5 + .../boot/watch/FilterWatchService.java | 18 +++ .../boot/watch/ServerWatchService.java | 18 +++ .../boot/watch/ServiceWatchService.java | 40 ++++++ .../boot/watch/ServletWatchService.java | 40 ++++++ .../boot/watch/TransportWatchService.java | 119 ++++++++++++++++++ src/org/redkale/net/Transport.java | 51 ++++---- src/org/redkale/net/TransportFactory.java | 21 +++- src/org/redkale/net/TransportGroupInfo.java | 26 +++- src/org/redkale/net/http/HttpServlet.java | 4 +- src/org/redkale/net/http/Rest.java | 2 +- src/org/redkale/util/Utility.java | 13 ++ 12 files changed, 317 insertions(+), 40 deletions(-) create mode 100644 src/org/redkale/boot/watch/FilterWatchService.java create mode 100644 src/org/redkale/boot/watch/ServerWatchService.java create mode 100644 src/org/redkale/boot/watch/ServiceWatchService.java create mode 100644 src/org/redkale/boot/watch/ServletWatchService.java create mode 100644 src/org/redkale/boot/watch/TransportWatchService.java diff --git a/src/org/redkale/boot/NodeWatchServer.java b/src/org/redkale/boot/NodeWatchServer.java index 439fa5ae5..e1afd2861 100644 --- a/src/org/redkale/boot/NodeWatchServer.java +++ b/src/org/redkale/boot/NodeWatchServer.java @@ -37,4 +37,9 @@ public class NodeWatchServer extends NodeHttpServer { protected ClassFilter createServletClassFilter() { return createClassFilter(null, WebServlet.class, WatchServlet.class, null, null, "servlets", "servlet"); } + + @Override + protected ClassFilter createOtherClassFilter() { + return null; + } } diff --git a/src/org/redkale/boot/watch/FilterWatchService.java b/src/org/redkale/boot/watch/FilterWatchService.java new file mode 100644 index 000000000..88fbf4a65 --- /dev/null +++ b/src/org/redkale/boot/watch/FilterWatchService.java @@ -0,0 +1,18 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.boot.watch; + +import org.redkale.net.http.RestService; +import org.redkale.watch.WatchService; + +/** + * + * @author zhangjx + */ +@RestService(name = "filter", catalog = "watch", repair = false) +public class FilterWatchService implements WatchService { + +} diff --git a/src/org/redkale/boot/watch/ServerWatchService.java b/src/org/redkale/boot/watch/ServerWatchService.java new file mode 100644 index 000000000..b2580e316 --- /dev/null +++ b/src/org/redkale/boot/watch/ServerWatchService.java @@ -0,0 +1,18 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.boot.watch; + +import org.redkale.net.http.RestService; +import org.redkale.watch.WatchService; + +/** + * + * @author zhangjx + */ +@RestService(name = "server", catalog = "watch", repair = false) +public class ServerWatchService implements WatchService { + +} diff --git a/src/org/redkale/boot/watch/ServiceWatchService.java b/src/org/redkale/boot/watch/ServiceWatchService.java new file mode 100644 index 000000000..a079b1e9c --- /dev/null +++ b/src/org/redkale/boot/watch/ServiceWatchService.java @@ -0,0 +1,40 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.boot.watch; + +import javax.annotation.Resource; +import org.redkale.boot.Application; +import org.redkale.net.TransportFactory; +import org.redkale.net.http.*; +import org.redkale.watch.WatchService; + +/** + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + */ +@RestService(name = "service", catalog = "watch", repair = false) +public class ServiceWatchService implements WatchService { + + @Resource + private Application application; + + @Resource + private TransportFactory transportFactory; + +// @RestMapping(name = "load", auth = false, comment = "动态增加Service") +// public RetResult loadService(String type, @RestUploadFile(maxLength = 10 * 1024 * 1024, fileNameReg = "\\.jar$") byte[] jar) { +// //待开发 +// return RetResult.success(); +// } +// +// @RestMapping(name = "stop", auth = false, comment = "动态停止Service") +// public RetResult stopService(String name, String type) { +// //待开发 +// return RetResult.success(); +// } +} diff --git a/src/org/redkale/boot/watch/ServletWatchService.java b/src/org/redkale/boot/watch/ServletWatchService.java new file mode 100644 index 000000000..805e94d06 --- /dev/null +++ b/src/org/redkale/boot/watch/ServletWatchService.java @@ -0,0 +1,40 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.boot.watch; + +import javax.annotation.Resource; +import org.redkale.boot.Application; +import org.redkale.net.TransportFactory; +import org.redkale.net.http.*; +import org.redkale.watch.WatchService; + +/** + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + */ +@RestService(name = "servlet", catalog = "watch", repair = false) +public class ServletWatchService implements WatchService { + + @Resource + private Application application; + + @Resource + private TransportFactory transportFactory; +// +// @RestMapping(name = "load", auth = false, comment = "动态增加Servlet") +// public RetResult loadServlet(String type, @RestUploadFile(maxLength = 10 * 1024 * 1024, fileNameReg = "\\.jar$") byte[] jar) { +// //待开发 +// return RetResult.success(); +// } +// +// @RestMapping(name = "stop", auth = false, comment = "动态停止Servlet") +// public RetResult stopServlet(String type) { +// //待开发 +// return RetResult.success(); +// } +} diff --git a/src/org/redkale/boot/watch/TransportWatchService.java b/src/org/redkale/boot/watch/TransportWatchService.java new file mode 100644 index 000000000..ede14fcb8 --- /dev/null +++ b/src/org/redkale/boot/watch/TransportWatchService.java @@ -0,0 +1,119 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.boot.watch; + +import java.net.*; +import java.nio.channels.AsynchronousSocketChannel; +import java.util.concurrent.TimeUnit; +import javax.annotation.Resource; +import org.redkale.boot.Application; +import org.redkale.net.*; +import org.redkale.net.http.*; +import org.redkale.net.sncp.*; +import org.redkale.service.*; +import org.redkale.util.Comment; +import org.redkale.watch.WatchService; + +/** + * + * @author zhangjx + */ +@RestService(name = "transport", catalog = "watch", repair = false) +public class TransportWatchService implements WatchService { + + @Comment("不存在的Group节点") + public static final int RET_NO_GROUP = 1605_0001; + + @Comment("非法的Node节点IP地址") + public static final int RET_ADDR_ILLEGAL = 1605_0002; + + @Comment("Node节点IP地址已存在") + public static final int RET_ADDR_EXISTS = 1605_0003; + + @Resource + private Application application; + + @Resource + private TransportFactory transportFactory; + + @RestMapping(name = "addnode", auth = false, comment = "动态增加指定Group的Node节点") + public RetResult addNode( + @RestParam(name = "group", comment = "Group节点名") final String group, + @RestParam(name = "addr", comment = "节点IP") final String addr, + @RestParam(name = "port", comment = "节点端口") final int port) { + InetSocketAddress address; + try { + address = new InetSocketAddress(addr, port); + AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(); + channel.connect(address).get(2, TimeUnit.SECONDS); //连接超时2秒 + channel.close(); + } catch (Exception e) { + e.printStackTrace(); + return new RetResult(RET_ADDR_ILLEGAL, "InetSocketAddress(addr=" + addr + ", port=" + port + ") is illegal"); + } + if (transportFactory.findGroupName(address) != null) return new RetResult(RET_ADDR_ILLEGAL, "InetSocketAddress(addr=" + addr + ", port=" + port + ") is exists"); + synchronized (this) { + if (transportFactory.findGroupInfo(group) == null) { + return new RetResult(RET_NO_GROUP, "not found group (" + group + ")"); + } + transportFactory.addGroupInfo(group, address); + for (Service service : transportFactory.getServices()) { + if (!Sncp.isSncpDyn(service)) continue; + SncpClient client = Sncp.getSncpClient(service); + if (Sncp.isRemote(service)) { + if (client.getRemoteGroups() != null && client.getRemoteGroups().contains(group)) { + client.getRemoteGroupTransport().addRemoteAddresses(address); + } + } else { + if (group.equals(client.getSameGroup())) { + client.getSameGroupTransport().addRemoteAddresses(address); + } + if (client.getDiffGroups() != null && client.getDiffGroups().contains(group)) { + for (Transport transport : client.getDiffGroupTransports()) { + transport.addRemoteAddresses(address); + } + } + } + } + } + return RetResult.success(); + } + + @RestMapping(name = "removenode", auth = false, comment = "动态删除指定Group的Node节点") + public RetResult removeNode( + @RestParam(name = "group", comment = "Group节点名") final String group, + @RestParam(name = "addr", comment = "节点IP") final String addr, + @RestParam(name = "port", comment = "节点端口") final int port) { + if (group == null) return new RetResult(RET_NO_GROUP, "not found group (" + group + ")"); + final InetSocketAddress address = new InetSocketAddress(addr, port); + if (!group.equals(transportFactory.findGroupName(address))) return new RetResult(RET_ADDR_ILLEGAL, "InetSocketAddress(addr=" + addr + ", port=" + port + ") not belong to group(" + group + ")"); + synchronized (this) { + if (transportFactory.findGroupInfo(group) == null) { + return new RetResult(RET_NO_GROUP, "not found group (" + group + ")"); + } + transportFactory.removeGroupInfo(group, address); + for (Service service : transportFactory.getServices()) { + if (!Sncp.isSncpDyn(service)) continue; + SncpClient client = Sncp.getSncpClient(service); + if (Sncp.isRemote(service)) { + if (client.getRemoteGroups() != null && client.getRemoteGroups().contains(group)) { + client.getRemoteGroupTransport().removeRemoteAddresses(address); + } + } else { + if (group.equals(client.getSameGroup())) { + client.getSameGroupTransport().removeRemoteAddresses(address); + } + if (client.getDiffGroups() != null && client.getDiffGroups().contains(group)) { + for (Transport transport : client.getDiffGroupTransports()) { + transport.removeRemoteAddresses(address); + } + } + } + } + } + return RetResult.success(); + } +} diff --git a/src/org/redkale/net/Transport.java b/src/org/redkale/net/Transport.java index 213d5ade9..aec11106a 100644 --- a/src/org/redkale/net/Transport.java +++ b/src/org/redkale/net/Transport.java @@ -11,8 +11,7 @@ import java.nio.channels.*; import java.util.*; import java.util.concurrent.*; import java.util.function.Supplier; -import java.util.stream.Collectors; -import org.redkale.util.ObjectPool; +import org.redkale.util.*; /** * 传输客户端 @@ -76,30 +75,6 @@ public final class Transport { updateRemoteAddresses(addresses); } - public Transport(final Collection transports) { - Transport first = null; - List tmpgroup = new ArrayList<>(); - if (transports != null) { - for (Transport t : transports) { - if (first == null) first = t; - tmpgroup.add(t.name); - } - } - if (first == null) throw new NullPointerException("Collection is null or empty"); - //必须按字母排列顺序确保,相同内容的transport列表组合的name相同,而不会因为list的顺序不同产生不同的name - this.name = tmpgroup.stream().sorted().collect(Collectors.joining(";")); - //this.watch = first.watch; - this.subprotocol = first.subprotocol; - this.protocol = first.protocol; - this.tcp = "TCP".equalsIgnoreCase(first.protocol); - this.group = first.group; - this.bufferPool = first.bufferPool; - this.clientAddress = first.clientAddress; - Set addrs = new HashSet<>(); - transports.forEach(t -> addrs.addAll(Arrays.asList(t.getRemoteAddresses()))); - updateRemoteAddresses(addrs); - } - public final InetSocketAddress[] updateRemoteAddresses(final Collection addresses) { InetSocketAddress[] oldAddresses = this.remoteAddres; List list = new ArrayList<>(); @@ -113,6 +88,30 @@ public final class Transport { return oldAddresses; } + public final boolean addRemoteAddresses(final InetSocketAddress addr) { + if (addr == null) return false; + synchronized (this) { + if (this.remoteAddres == null) { + this.remoteAddres = new InetSocketAddress[]{addr}; + } else { + for (InetSocketAddress i : this.remoteAddres) { + if (addr.equals(i)) return false; + } + this.remoteAddres = Utility.append(remoteAddres, addr); + } + return true; + } + } + + public final boolean removeRemoteAddresses(InetSocketAddress addr) { + if (addr == null) return false; + if (this.remoteAddres == null) return false; + synchronized (this) { + this.remoteAddres = Utility.remove(remoteAddres, addr); + } + return true; + } + public String getName() { return name; } diff --git a/src/org/redkale/net/TransportFactory.java b/src/org/redkale/net/TransportFactory.java index 06ffa7dc3..aaf55ff3a 100644 --- a/src/org/redkale/net/TransportFactory.java +++ b/src/org/redkale/net/TransportFactory.java @@ -12,6 +12,7 @@ import java.nio.channels.AsynchronousChannelGroup; import java.util.*; import java.util.concurrent.*; import java.util.logging.*; +import java.util.stream.Collectors; import org.redkale.service.Service; import org.redkale.util.ObjectPool; @@ -53,14 +54,24 @@ public class TransportFactory { return groupAddrs.get(addr); } - public TransportGroupInfo findGroupInfo2(String group) { + public TransportGroupInfo findGroupInfo(String group) { if (group == null) return null; return groupInfos.get(group); } - public TransportFactory addGroupInfo(String name, InetSocketAddress... addrs) { - addGroupInfo(new TransportGroupInfo(name, addrs)); - return this; + public boolean addGroupInfo(String groupName, InetSocketAddress... addrs) { + addGroupInfo(new TransportGroupInfo(groupName, addrs)); + return true; + } + + public boolean removeGroupInfo(String groupName, InetSocketAddress addr) { + if (groupName == null || groupName.isEmpty() || addr == null) return false; + if (!groupName.equals(groupAddrs.get(addr))) return false; + TransportGroupInfo group = groupInfos.get(groupName); + if (group == null) return false; + group.removeAddress(addr); + groupAddrs.remove(addr); + return true; } public TransportFactory addGroupInfo(String name, Set addrs) { @@ -116,7 +127,7 @@ public class TransportFactory { } if (info == null) return null; if (sncpAddress != null) addresses.remove(sncpAddress); - return new Transport("remotes", info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, addresses); + return new Transport(groups.stream().sorted().collect(Collectors.joining(";")), info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, addresses); } private Transport loadTransport(final String groupName, InetSocketAddress sncpAddress) { diff --git a/src/org/redkale/net/TransportGroupInfo.java b/src/org/redkale/net/TransportGroupInfo.java index 747af158d..48885cf54 100644 --- a/src/org/redkale/net/TransportGroupInfo.java +++ b/src/org/redkale/net/TransportGroupInfo.java @@ -90,20 +90,34 @@ public class TransportGroupInfo { } public boolean containsAddress(InetSocketAddress addr) { - if (this.addresses == null) return false; - return this.addresses.contains(addr); + synchronized (this) { + if (this.addresses == null) return false; + return this.addresses.contains(addr); + } + } + + public void removeAddress(InetSocketAddress addr) { + if (addr == null) return; + synchronized (this) { + if (this.addresses == null) return; + this.addresses.remove(addr); + } } public void putAddress(InetSocketAddress addr) { if (addr == null) return; - if (this.addresses == null) this.addresses = new HashSet<>(); - this.addresses.add(addr); + synchronized (this) { + if (this.addresses == null) this.addresses = new HashSet<>(); + this.addresses.add(addr); + } } public void putAddress(Set addrs) { if (addrs == null) return; - if (this.addresses == null) this.addresses = new HashSet<>(); - this.addresses.addAll(addrs); + synchronized (this) { + if (this.addresses == null) this.addresses = new HashSet<>(); + this.addresses.addAll(addrs); + } } @Override diff --git a/src/org/redkale/net/http/HttpServlet.java b/src/org/redkale/net/http/HttpServlet.java index 14fd38cb8..58b8db678 100644 --- a/src/org/redkale/net/http/HttpServlet.java +++ b/src/org/redkale/net/http/HttpServlet.java @@ -27,9 +27,9 @@ import org.redkale.util.*; */ public class HttpServlet extends Servlet { - public static final int RET_SERVER_ERROR = 1800_0001; + public static final int RET_SERVER_ERROR = 1200_0001; - public static final int RET_METHOD_ERROR = 1800_0002; + public static final int RET_METHOD_ERROR = 1200_0002; String _prefix = ""; //当前HttpServlet的path前缀 diff --git a/src/org/redkale/net/http/Rest.java b/src/org/redkale/net/http/Rest.java index 8d317a6d8..0bd3cc973 100644 --- a/src/org/redkale/net/http/Rest.java +++ b/src/org/redkale/net/http/Rest.java @@ -861,7 +861,7 @@ public final class Rest { } } av0 = mv.visitAnnotation(mappingDesc, true); - String url = "/" + defmodulename.toLowerCase() + "/" + entry.name + (reqpath ? "/" : ""); + String url = (catalog.isEmpty() ? "/" : ("/" + catalog + "/")) + defmodulename.toLowerCase() + "/" + entry.name + (reqpath ? "/" : ""); av0.visit("url", url); av0.visit("auth", entry.auth); av0.visit("cacheseconds", entry.cacheseconds); diff --git a/src/org/redkale/util/Utility.java b/src/org/redkale/util/Utility.java index 94d3f5363..5d03d1e95 100644 --- a/src/org/redkale/util/Utility.java +++ b/src/org/redkale/util/Utility.java @@ -277,6 +277,19 @@ public final class Utility { return news; } + /** + * 将元素从数组中删除 + * + * @param 泛型 + * @param array 原数组 + * @param item 元素 + * + * @return 新数组 + */ + public static T[] remove(final T[] array, final T item) { + return remove(array, (i) -> item.equals(item)); + } + /** * 将符合条件的元素从数组中删除 *