This commit is contained in:
Redkale
2017-06-05 09:18:05 +08:00
parent 2bdf0e4a50
commit 08060a8c86
9 changed files with 60 additions and 49 deletions

View File

@@ -96,7 +96,7 @@
</resources> </resources>
<!-- <!--
protocol: required server所启动的协议Redkale内置的有HTTP、SNCP、WATCHSNCP使用TCP实现; protocol: required server所启动的协议Redkale内置的有HTTP、SNCP、WATCH。协议均使用TCP实现; WATCH服务只能存在一个。
name: 服务的名称用于监控识别一个配置文件中的server.name不能重复,命名规则: 字母、数字、下划线 name: 服务的名称用于监控识别一个配置文件中的server.name不能重复,命名规则: 字母、数字、下划线
host: 服务所占address 默认: 0.0.0.0 host: 服务所占address 默认: 0.0.0.0
port: required 服务所占端口 port: required 服务所占端口

View File

@@ -568,13 +568,12 @@ public final class Application {
others.add(entry); others.add(entry);
} }
} }
if (watchs.size() > 1) throw new RuntimeException("Found one more WATCH Server");
this.watching = !watchs.isEmpty(); this.watching = !watchs.isEmpty();
//单向SNCP服务不需要对等group
//if (!sncps.isEmpty() && globalNodes.isEmpty()) throw new RuntimeException("found SNCP Server node but not found <group> node info.");
runServers(timecd, sncps); //必须确保sncp都启动后再启动其他协议 runServers(timecd, sncps); //必须确保SNCP服务都启动后再启动其他服务
runServers(timecd, others); runServers(timecd, others);
runServers(timecd, watchs); //必须在所有server都启动后再启动 runServers(timecd, watchs); //必须在所有服务都启动后再启动WATCH服务
timecd.await(); timecd.await();
logger.info(this.getClass().getSimpleName() + " started in " + (System.currentTimeMillis() - startTime) + " ms\r\n"); logger.info(this.getClass().getSimpleName() + " started in " + (System.currentTimeMillis() - startTime) + " ms\r\n");
if (!singletonrun) this.serversLatch.await(); if (!singletonrun) this.serversLatch.await();

View File

@@ -6,9 +6,9 @@
package org.redkale.boot.watch; package org.redkale.boot.watch;
import java.io.IOException; import java.io.IOException;
import java.util.List;
import javax.annotation.Resource; import javax.annotation.Resource;
import org.redkale.boot.*; import org.redkale.boot.*;
import org.redkale.net.TransportFactory;
import org.redkale.net.http.*; import org.redkale.net.http.*;
import org.redkale.service.RetResult; import org.redkale.service.RetResult;
import org.redkale.util.Comment; import org.redkale.util.Comment;
@@ -20,32 +20,30 @@ import org.redkale.util.Comment;
@RestService(name = "filter", catalog = "watch", repair = false) @RestService(name = "filter", catalog = "watch", repair = false)
public class FilterWatchService extends AbstractWatchService { public class FilterWatchService extends AbstractWatchService {
@Comment("不存在的Group节点")
public static final int RET_NO_GROUP = 1601_0001;
@Comment("Filter类名不存在") @Comment("Filter类名不存在")
public static final int RET_FILTER_TYPE_ILLEGAL = 1601_0002; public static final int RET_FILTER_TYPE_NOT_EXISTS = 1601_0002;
@Comment("Node节点IP地址已存在") @Comment("Filter类名不合法")
public static final int RET_FILTER_EXISTS = 1601_0003; public static final int RET_FILTER_TYPE_ILLEGAL = 1601_0003;
@Comment("Node节点IP地址已存在") @Comment("Filter类名已存在")
public static final int RET_JAR_ILLEGAL = 1601_0004; public static final int RET_FILTER_EXISTS = 1601_0004;
@Comment("Filter的JAR包不存在")
public static final int RET_FILTER_JAR_ILLEGAL = 1601_0005;
@Resource @Resource
private Application application; private Application application;
@Resource
private TransportFactory transportFactory;
@RestMapping(name = "addfilter", auth = false, comment = "动态增加Filter") @RestMapping(name = "addfilter", auth = false, comment = "动态增加Filter")
public RetResult addFilter(@RestUploadFile(maxLength = 10 * 1024 * 1024, fileNameReg = "\\.jar$") byte[] jar, public RetResult addFilter(@RestUploadFile(maxLength = 10 * 1024 * 1024, fileNameReg = "\\.jar$") byte[] jar,
@RestParam(name = "server", comment = "Server节点名, 不指定名称则所有符合条件的Server都会增加Filter") final String serverName, @RestParam(name = "server", comment = "Server节点名") final String serverName,
@RestParam(name = "type", comment = "Filter类名") final String filterType) throws IOException { @RestParam(name = "type", comment = "Filter类名") final String filterType) throws IOException {
if (filterType == null) return new RetResult(RET_FILTER_TYPE_ILLEGAL, "Filter Type (" + filterType + ") is illegal"); if (filterType == null) return new RetResult(RET_FILTER_TYPE_NOT_EXISTS, "Not found Filter Type (" + filterType + ")");
if (jar == null) return new RetResult(RET_JAR_ILLEGAL, "Not found jar file"); if (jar == null) return new RetResult(RET_FILTER_JAR_ILLEGAL, "Not found jar file");
for (NodeServer node : application.getNodeServers()) { List<NodeServer> nodes = application.getNodeServers();
if (node.getServer().containsFilter(filterType)) return new RetResult(RET_JAR_ILLEGAL, "Filter(" + filterType + ") exists"); for (NodeServer node : nodes) {
if (node.getServer().containsFilter(filterType)) return new RetResult(RET_FILTER_EXISTS, "Filter(" + filterType + ") exists");
} }
return RetResult.success(); return RetResult.success();
} }

View File

@@ -27,13 +27,13 @@ import org.redkale.util.AnyValue.DefaultAnyValue;
public class TransportWatchService extends AbstractWatchService { public class TransportWatchService extends AbstractWatchService {
@Comment("不存在的Group节点") @Comment("不存在的Group节点")
public static final int RET_NO_GROUP = 1606_0001; public static final int RET_TRANSPORT_GROUP_NOT_EXISTS = 1606_0001;
@Comment("非法的Node节点IP地址") @Comment("非法的Node节点IP地址")
public static final int RET_ADDR_ILLEGAL = 1606_0002; public static final int RET_TRANSPORT_ADDR_ILLEGAL = 1606_0002;
@Comment("Node节点IP地址已存在") @Comment("Node节点IP地址已存在")
public static final int RET_ADDR_EXISTS = 1606_0003; public static final int RET_TRANSPORT_ADDR_EXISTS = 1606_0003;
@Resource @Resource
private Application application; private Application application;
@@ -41,14 +41,13 @@ public class TransportWatchService extends AbstractWatchService {
@Resource @Resource
private TransportFactory transportFactory; private TransportFactory transportFactory;
@RestMapping(name = "nodes", auth = false, comment = "获取所有Node节点") @RestMapping(name = "listnodes", auth = false, comment = "获取所有Node节点")
public RetResult<List<TransportGroupInfo>> addNode() { public List<TransportGroupInfo> listNodes() {
return new RetResult<>(transportFactory.getGroupInfos()); return transportFactory.getGroupInfos();
} }
@RestMapping(name = "addnode", auth = false, comment = "动态增加指定Group的Node节点") @RestMapping(name = "addnode", auth = false, comment = "动态增加指定Group的Node节点")
public RetResult addNode( public RetResult addNode(@RestParam(name = "group", comment = "Group节点名") final String group,
@RestParam(name = "group", comment = "Group节点名") final String group,
@RestParam(name = "addr", comment = "节点IP") final String addr, @RestParam(name = "addr", comment = "节点IP") final String addr,
@RestParam(name = "port", comment = "节点端口") final int port) throws IOException { @RestParam(name = "port", comment = "节点端口") final int port) throws IOException {
InetSocketAddress address; InetSocketAddress address;
@@ -58,12 +57,12 @@ public class TransportWatchService extends AbstractWatchService {
channel.connect(address).get(2, TimeUnit.SECONDS); //连接超时2秒 channel.connect(address).get(2, TimeUnit.SECONDS); //连接超时2秒
channel.close(); channel.close();
} catch (Exception e) { } catch (Exception e) {
return new RetResult(RET_ADDR_ILLEGAL, "InetSocketAddress(addr=" + addr + ", port=" + port + ") is illegal or cannot connect"); return new RetResult(RET_TRANSPORT_ADDR_ILLEGAL, "InetSocketAddress(addr=" + addr + ", port=" + port + ") is illegal or cannot connect");
} }
if (transportFactory.findGroupName(address) != null) return new RetResult(RET_ADDR_ILLEGAL, "InetSocketAddress(addr=" + addr + ", port=" + port + ") is exists"); if (transportFactory.findGroupName(address) != null) return new RetResult(RET_TRANSPORT_ADDR_ILLEGAL, "InetSocketAddress(addr=" + addr + ", port=" + port + ") is exists");
synchronized (this) { synchronized (this) {
if (transportFactory.findGroupInfo(group) == null) { if (transportFactory.findGroupInfo(group) == null) {
return new RetResult(RET_NO_GROUP, "not found group (" + group + ")"); return new RetResult(RET_TRANSPORT_GROUP_NOT_EXISTS, "not found group (" + group + ")");
} }
transportFactory.addGroupInfo(group, address); transportFactory.addGroupInfo(group, address);
for (Service service : transportFactory.getServices()) { for (Service service : transportFactory.getServices()) {
@@ -97,16 +96,15 @@ public class TransportWatchService extends AbstractWatchService {
} }
@RestMapping(name = "removenode", auth = false, comment = "动态删除指定Group的Node节点") @RestMapping(name = "removenode", auth = false, comment = "动态删除指定Group的Node节点")
public RetResult removeNode( public RetResult removeNode(@RestParam(name = "group", comment = "Group节点名") final String group,
@RestParam(name = "group", comment = "Group节点名") final String group,
@RestParam(name = "addr", comment = "节点IP") final String addr, @RestParam(name = "addr", comment = "节点IP") final String addr,
@RestParam(name = "port", comment = "节点端口") final int port) throws IOException { @RestParam(name = "port", comment = "节点端口") final int port) throws IOException {
if (group == null) return new RetResult(RET_NO_GROUP, "not found group (" + group + ")"); if (group == null) return new RetResult(RET_TRANSPORT_GROUP_NOT_EXISTS, "not found group (" + group + ")");
final InetSocketAddress address = new InetSocketAddress(addr, port); final InetSocketAddress address = new InetSocketAddress(addr, port);
if (!group.equals(transportFactory.findGroupName(address))) return new RetResult(RET_ADDR_ILLEGAL, "InetSocketAddress(addr=" + addr + ", port=" + port + ") not belong to group(" + group + ")"); if (!group.equals(transportFactory.findGroupName(address))) return new RetResult(RET_TRANSPORT_ADDR_ILLEGAL, "InetSocketAddress(addr=" + addr + ", port=" + port + ") not belong to group(" + group + ")");
synchronized (this) { synchronized (this) {
if (transportFactory.findGroupInfo(group) == null) { if (transportFactory.findGroupInfo(group) == null) {
return new RetResult(RET_NO_GROUP, "not found group (" + group + ")"); return new RetResult(RET_TRANSPORT_GROUP_NOT_EXISTS, "not found group (" + group + ")");
} }
transportFactory.removeGroupInfo(group, address); transportFactory.removeGroupInfo(group, address);
for (Service service : transportFactory.getServices()) { for (Service service : transportFactory.getServices()) {

View File

@@ -150,6 +150,7 @@ public abstract class PrepareServlet<K extends Serializable, C extends Context,
filter._conf = conf; filter._conf = conf;
synchronized (filters) { synchronized (filters) {
this.filters.add(filter); this.filters.add(filter);
Collections.sort(this.filters);
} }
} }
@@ -197,6 +198,10 @@ public abstract class PrepareServlet<K extends Serializable, C extends Context,
} }
} }
public <T extends Filter<C, R, P>> List<T> getFilters() {
return (List) new ArrayList<>(filters);
}
public abstract void addServlet(S servlet, Object attachment, AnyValue conf, K... mappings); public abstract void addServlet(S servlet, Object attachment, AnyValue conf, K... mappings);
public final void prepare(final ByteBuffer buffer, final R request, final P response) throws IOException { public final void prepare(final ByteBuffer buffer, final R request, final P response) throws IOException {
@@ -258,7 +263,7 @@ public abstract class PrepareServlet<K extends Serializable, C extends Context,
servlet._conf = conf; servlet._conf = conf;
} }
public Set<S> getServlets() { public List<S> getServlets() {
return new LinkedHashSet<>(servlets); return new ArrayList<>(servlets);
} }
} }

View File

@@ -38,6 +38,14 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
super.init(config); super.init(config);
} }
public List<HttpServlet> getHttpServlets() {
return this.prepare.getServlets();
}
public List<HttpFilter> getHttpFilters() {
return this.prepare.getFilters();
}
/** /**
* 获取静态资源HttpServlet * 获取静态资源HttpServlet
* *

View File

@@ -9,7 +9,6 @@ import org.redkale.net.PrepareServlet;
import org.redkale.util.AnyValue; import org.redkale.util.AnyValue;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.*;
import org.redkale.service.Service; import org.redkale.service.Service;
import org.redkale.util.*; import org.redkale.util.*;
@@ -55,10 +54,6 @@ public class SncpPrepareServlet extends PrepareServlet<DLong, SncpContext, SncpR
return rs; return rs;
} }
public List<SncpServlet> getSncpServlets() {
return new ArrayList<>(getServlets());
}
@Override @Override
public void init(SncpContext context, AnyValue config) { public void init(SncpContext context, AnyValue config) {
super.init(context, config); //必须要执行 super.init(context, config); //必须要执行

View File

@@ -37,6 +37,14 @@ public class SncpServer extends Server<DLong, SncpContext, SncpRequest, SncpResp
super.init(config); super.init(config);
} }
public List<SncpServlet> getSncpServlets() {
return this.prepare.getServlets();
}
public List<SncpFilter> getSncpFilters() {
return this.prepare.getFilters();
}
/** /**
* 删除SncpFilter * 删除SncpFilter
* *
@@ -78,10 +86,6 @@ public class SncpServer extends Server<DLong, SncpContext, SncpRequest, SncpResp
this.prepare.addServlet(sds, null, Sncp.getConf(sncpService)); this.prepare.addServlet(sds, null, Sncp.getConf(sncpService));
} }
public List<SncpServlet> getSncpServlets() {
return ((SncpPrepareServlet) this.prepare).getSncpServlets();
}
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected SncpContext createContext() { protected SncpContext createContext() {

View File

@@ -40,6 +40,10 @@ public abstract class SncpServlet extends Servlet<SncpContext, SncpRequest, Sncp
return serviceName; return serviceName;
} }
public Class getServiceType() {
return type;
}
public abstract DLong getServiceid(); public abstract DLong getServiceid();
protected ExecutorService getExecutor() { protected ExecutorService getExecutor() {