This commit is contained in:
Redkale
2017-06-03 16:50:40 +08:00
parent 53a35e6397
commit 10e22b0873
12 changed files with 317 additions and 40 deletions

View File

@@ -37,4 +37,9 @@ public class NodeWatchServer extends NodeHttpServer {
protected ClassFilter<Servlet> createServletClassFilter() {
return createClassFilter(null, WebServlet.class, WatchServlet.class, null, null, "servlets", "servlet");
}
@Override
protected ClassFilter createOtherClassFilter() {
return null;
}
}

View File

@@ -0,0 +1,18 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.boot.watch;
import org.redkale.net.http.RestService;
import org.redkale.watch.WatchService;
/**
*
* @author zhangjx
*/
@RestService(name = "filter", catalog = "watch", repair = false)
public class FilterWatchService implements WatchService {
}

View File

@@ -0,0 +1,18 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.boot.watch;
import org.redkale.net.http.RestService;
import org.redkale.watch.WatchService;
/**
*
* @author zhangjx
*/
@RestService(name = "server", catalog = "watch", repair = false)
public class ServerWatchService implements WatchService {
}

View File

@@ -0,0 +1,40 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.boot.watch;
import javax.annotation.Resource;
import org.redkale.boot.Application;
import org.redkale.net.TransportFactory;
import org.redkale.net.http.*;
import org.redkale.watch.WatchService;
/**
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
@RestService(name = "service", catalog = "watch", repair = false)
public class ServiceWatchService implements WatchService {
@Resource
private Application application;
@Resource
private TransportFactory transportFactory;
// @RestMapping(name = "load", auth = false, comment = "动态增加Service")
// public RetResult loadService(String type, @RestUploadFile(maxLength = 10 * 1024 * 1024, fileNameReg = "\\.jar$") byte[] jar) {
// //待开发
// return RetResult.success();
// }
//
// @RestMapping(name = "stop", auth = false, comment = "动态停止Service")
// public RetResult stopService(String name, String type) {
// //待开发
// return RetResult.success();
// }
}

View File

@@ -0,0 +1,40 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.boot.watch;
import javax.annotation.Resource;
import org.redkale.boot.Application;
import org.redkale.net.TransportFactory;
import org.redkale.net.http.*;
import org.redkale.watch.WatchService;
/**
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
@RestService(name = "servlet", catalog = "watch", repair = false)
public class ServletWatchService implements WatchService {
@Resource
private Application application;
@Resource
private TransportFactory transportFactory;
//
// @RestMapping(name = "load", auth = false, comment = "动态增加Servlet")
// public RetResult loadServlet(String type, @RestUploadFile(maxLength = 10 * 1024 * 1024, fileNameReg = "\\.jar$") byte[] jar) {
// //待开发
// return RetResult.success();
// }
//
// @RestMapping(name = "stop", auth = false, comment = "动态停止Servlet")
// public RetResult stopServlet(String type) {
// //待开发
// return RetResult.success();
// }
}

View File

@@ -0,0 +1,119 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.boot.watch;
import java.net.*;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.redkale.boot.Application;
import org.redkale.net.*;
import org.redkale.net.http.*;
import org.redkale.net.sncp.*;
import org.redkale.service.*;
import org.redkale.util.Comment;
import org.redkale.watch.WatchService;
/**
*
* @author zhangjx
*/
@RestService(name = "transport", catalog = "watch", repair = false)
public class TransportWatchService implements WatchService {
@Comment("不存在的Group节点")
public static final int RET_NO_GROUP = 1605_0001;
@Comment("非法的Node节点IP地址")
public static final int RET_ADDR_ILLEGAL = 1605_0002;
@Comment("Node节点IP地址已存在")
public static final int RET_ADDR_EXISTS = 1605_0003;
@Resource
private Application application;
@Resource
private TransportFactory transportFactory;
@RestMapping(name = "addnode", auth = false, comment = "动态增加指定Group的Node节点")
public RetResult addNode(
@RestParam(name = "group", comment = "Group节点名") final String group,
@RestParam(name = "addr", comment = "节点IP") final String addr,
@RestParam(name = "port", comment = "节点端口") final int port) {
InetSocketAddress address;
try {
address = new InetSocketAddress(addr, port);
AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
channel.connect(address).get(2, TimeUnit.SECONDS); //连接超时2秒
channel.close();
} catch (Exception e) {
e.printStackTrace();
return new RetResult(RET_ADDR_ILLEGAL, "InetSocketAddress(addr=" + addr + ", port=" + port + ") is illegal");
}
if (transportFactory.findGroupName(address) != null) return new RetResult(RET_ADDR_ILLEGAL, "InetSocketAddress(addr=" + addr + ", port=" + port + ") is exists");
synchronized (this) {
if (transportFactory.findGroupInfo(group) == null) {
return new RetResult(RET_NO_GROUP, "not found group (" + group + ")");
}
transportFactory.addGroupInfo(group, address);
for (Service service : transportFactory.getServices()) {
if (!Sncp.isSncpDyn(service)) continue;
SncpClient client = Sncp.getSncpClient(service);
if (Sncp.isRemote(service)) {
if (client.getRemoteGroups() != null && client.getRemoteGroups().contains(group)) {
client.getRemoteGroupTransport().addRemoteAddresses(address);
}
} else {
if (group.equals(client.getSameGroup())) {
client.getSameGroupTransport().addRemoteAddresses(address);
}
if (client.getDiffGroups() != null && client.getDiffGroups().contains(group)) {
for (Transport transport : client.getDiffGroupTransports()) {
transport.addRemoteAddresses(address);
}
}
}
}
}
return RetResult.success();
}
@RestMapping(name = "removenode", auth = false, comment = "动态删除指定Group的Node节点")
public RetResult removeNode(
@RestParam(name = "group", comment = "Group节点名") final String group,
@RestParam(name = "addr", comment = "节点IP") final String addr,
@RestParam(name = "port", comment = "节点端口") final int port) {
if (group == null) return new RetResult(RET_NO_GROUP, "not found group (" + group + ")");
final InetSocketAddress address = new InetSocketAddress(addr, port);
if (!group.equals(transportFactory.findGroupName(address))) return new RetResult(RET_ADDR_ILLEGAL, "InetSocketAddress(addr=" + addr + ", port=" + port + ") not belong to group(" + group + ")");
synchronized (this) {
if (transportFactory.findGroupInfo(group) == null) {
return new RetResult(RET_NO_GROUP, "not found group (" + group + ")");
}
transportFactory.removeGroupInfo(group, address);
for (Service service : transportFactory.getServices()) {
if (!Sncp.isSncpDyn(service)) continue;
SncpClient client = Sncp.getSncpClient(service);
if (Sncp.isRemote(service)) {
if (client.getRemoteGroups() != null && client.getRemoteGroups().contains(group)) {
client.getRemoteGroupTransport().removeRemoteAddresses(address);
}
} else {
if (group.equals(client.getSameGroup())) {
client.getSameGroupTransport().removeRemoteAddresses(address);
}
if (client.getDiffGroups() != null && client.getDiffGroups().contains(group)) {
for (Transport transport : client.getDiffGroupTransports()) {
transport.removeRemoteAddresses(address);
}
}
}
}
}
return RetResult.success();
}
}

View File

@@ -11,8 +11,7 @@ import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.redkale.util.ObjectPool;
import org.redkale.util.*;
/**
* 传输客户端
@@ -76,30 +75,6 @@ public final class Transport {
updateRemoteAddresses(addresses);
}
public Transport(final Collection<Transport> transports) {
Transport first = null;
List<String> tmpgroup = new ArrayList<>();
if (transports != null) {
for (Transport t : transports) {
if (first == null) first = t;
tmpgroup.add(t.name);
}
}
if (first == null) throw new NullPointerException("Collection<Transport> is null or empty");
//必须按字母排列顺序确保相同内容的transport列表组合的name相同而不会因为list的顺序不同产生不同的name
this.name = tmpgroup.stream().sorted().collect(Collectors.joining(";"));
//this.watch = first.watch;
this.subprotocol = first.subprotocol;
this.protocol = first.protocol;
this.tcp = "TCP".equalsIgnoreCase(first.protocol);
this.group = first.group;
this.bufferPool = first.bufferPool;
this.clientAddress = first.clientAddress;
Set<InetSocketAddress> addrs = new HashSet<>();
transports.forEach(t -> addrs.addAll(Arrays.asList(t.getRemoteAddresses())));
updateRemoteAddresses(addrs);
}
public final InetSocketAddress[] updateRemoteAddresses(final Collection<InetSocketAddress> addresses) {
InetSocketAddress[] oldAddresses = this.remoteAddres;
List<InetSocketAddress> list = new ArrayList<>();
@@ -113,6 +88,30 @@ public final class Transport {
return oldAddresses;
}
public final boolean addRemoteAddresses(final InetSocketAddress addr) {
if (addr == null) return false;
synchronized (this) {
if (this.remoteAddres == null) {
this.remoteAddres = new InetSocketAddress[]{addr};
} else {
for (InetSocketAddress i : this.remoteAddres) {
if (addr.equals(i)) return false;
}
this.remoteAddres = Utility.append(remoteAddres, addr);
}
return true;
}
}
public final boolean removeRemoteAddresses(InetSocketAddress addr) {
if (addr == null) return false;
if (this.remoteAddres == null) return false;
synchronized (this) {
this.remoteAddres = Utility.remove(remoteAddres, addr);
}
return true;
}
public String getName() {
return name;
}

View File

@@ -12,6 +12,7 @@ import java.nio.channels.AsynchronousChannelGroup;
import java.util.*;
import java.util.concurrent.*;
import java.util.logging.*;
import java.util.stream.Collectors;
import org.redkale.service.Service;
import org.redkale.util.ObjectPool;
@@ -53,14 +54,24 @@ public class TransportFactory {
return groupAddrs.get(addr);
}
public TransportGroupInfo findGroupInfo2(String group) {
public TransportGroupInfo findGroupInfo(String group) {
if (group == null) return null;
return groupInfos.get(group);
}
public TransportFactory addGroupInfo(String name, InetSocketAddress... addrs) {
addGroupInfo(new TransportGroupInfo(name, addrs));
return this;
public boolean addGroupInfo(String groupName, InetSocketAddress... addrs) {
addGroupInfo(new TransportGroupInfo(groupName, addrs));
return true;
}
public boolean removeGroupInfo(String groupName, InetSocketAddress addr) {
if (groupName == null || groupName.isEmpty() || addr == null) return false;
if (!groupName.equals(groupAddrs.get(addr))) return false;
TransportGroupInfo group = groupInfos.get(groupName);
if (group == null) return false;
group.removeAddress(addr);
groupAddrs.remove(addr);
return true;
}
public TransportFactory addGroupInfo(String name, Set<InetSocketAddress> addrs) {
@@ -116,7 +127,7 @@ public class TransportFactory {
}
if (info == null) return null;
if (sncpAddress != null) addresses.remove(sncpAddress);
return new Transport("remotes", info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, addresses);
return new Transport(groups.stream().sorted().collect(Collectors.joining(";")), info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, addresses);
}
private Transport loadTransport(final String groupName, InetSocketAddress sncpAddress) {

View File

@@ -90,20 +90,34 @@ public class TransportGroupInfo {
}
public boolean containsAddress(InetSocketAddress addr) {
if (this.addresses == null) return false;
return this.addresses.contains(addr);
synchronized (this) {
if (this.addresses == null) return false;
return this.addresses.contains(addr);
}
}
public void removeAddress(InetSocketAddress addr) {
if (addr == null) return;
synchronized (this) {
if (this.addresses == null) return;
this.addresses.remove(addr);
}
}
public void putAddress(InetSocketAddress addr) {
if (addr == null) return;
if (this.addresses == null) this.addresses = new HashSet<>();
this.addresses.add(addr);
synchronized (this) {
if (this.addresses == null) this.addresses = new HashSet<>();
this.addresses.add(addr);
}
}
public void putAddress(Set<InetSocketAddress> addrs) {
if (addrs == null) return;
if (this.addresses == null) this.addresses = new HashSet<>();
this.addresses.addAll(addrs);
synchronized (this) {
if (this.addresses == null) this.addresses = new HashSet<>();
this.addresses.addAll(addrs);
}
}
@Override

View File

@@ -27,9 +27,9 @@ import org.redkale.util.*;
*/
public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse> {
public static final int RET_SERVER_ERROR = 1800_0001;
public static final int RET_SERVER_ERROR = 1200_0001;
public static final int RET_METHOD_ERROR = 1800_0002;
public static final int RET_METHOD_ERROR = 1200_0002;
String _prefix = ""; //当前HttpServlet的path前缀

View File

@@ -861,7 +861,7 @@ public final class Rest {
}
}
av0 = mv.visitAnnotation(mappingDesc, true);
String url = "/" + defmodulename.toLowerCase() + "/" + entry.name + (reqpath ? "/" : "");
String url = (catalog.isEmpty() ? "/" : ("/" + catalog + "/")) + defmodulename.toLowerCase() + "/" + entry.name + (reqpath ? "/" : "");
av0.visit("url", url);
av0.visit("auth", entry.auth);
av0.visit("cacheseconds", entry.cacheseconds);

View File

@@ -277,6 +277,19 @@ public final class Utility {
return news;
}
/**
* 将元素从数组中删除
*
* @param <T> 泛型
* @param array 原数组
* @param item 元素
*
* @return 新数组
*/
public static <T> T[] remove(final T[] array, final T item) {
return remove(array, (i) -> item.equals(item));
}
/**
* 将符合条件的元素从数组中删除
*