This commit is contained in:
@@ -37,9 +37,10 @@
|
|||||||
<!--
|
<!--
|
||||||
一个组包含多个NODE, 同一Service服务可以由多个进程提供,这些进程称为一个GROUP,且同一GROUP内的进程必须在同一机房或局域网内
|
一个组包含多个NODE, 同一Service服务可以由多个进程提供,这些进程称为一个GROUP,且同一GROUP内的进程必须在同一机房或局域网内
|
||||||
name: 服务组ID,长度不能超过11个字节. 默认为空字符串。
|
name: 服务组ID,长度不能超过11个字节. 默认为空字符串。
|
||||||
protocol:值只能是UDP TCP, 默认UDP
|
protocol:值只能是UDP TCP, 默认TCP
|
||||||
|
注意: 一个node只能所属一个group。
|
||||||
-->
|
-->
|
||||||
<group name="" protocol="UDP">
|
<group name="" protocol="TCP">
|
||||||
<!--
|
<!--
|
||||||
需要将本地node的addr与port列在此处。
|
需要将本地node的addr与port列在此处。
|
||||||
addr: required IP地址
|
addr: required IP地址
|
||||||
|
|||||||
@@ -16,7 +16,6 @@ import com.wentch.redkale.util.*;
|
|||||||
import com.wentch.redkale.util.AnyValue.DefaultAnyValue;
|
import com.wentch.redkale.util.AnyValue.DefaultAnyValue;
|
||||||
import com.wentch.redkale.watch.*;
|
import com.wentch.redkale.watch.*;
|
||||||
import java.io.*;
|
import java.io.*;
|
||||||
import java.lang.reflect.*;
|
|
||||||
import java.net.*;
|
import java.net.*;
|
||||||
import java.nio.*;
|
import java.nio.*;
|
||||||
import java.nio.channels.*;
|
import java.nio.channels.*;
|
||||||
@@ -58,22 +57,18 @@ public final class Application {
|
|||||||
public static final String RESNAME_SNCP_GROUP = "SNCP_GROUP";
|
public static final String RESNAME_SNCP_GROUP = "SNCP_GROUP";
|
||||||
|
|
||||||
//当前SNCP Server的IP地址+端口 类型: SocketAddress、InetSocketAddress、String
|
//当前SNCP Server的IP地址+端口 类型: SocketAddress、InetSocketAddress、String
|
||||||
public static final String RESNAME_SNCP_ADDR = "SNCP_ADDR";
|
public static final String RESNAME_SNCP_NODE = "SNCP_NODE";
|
||||||
|
|
||||||
//当前SNCP Server的IP地址+端口集合 类型: Map<InetSocketAddress, String>、HashMap<InetSocketAddress, String>
|
//当前SNCP Server的IP地址+端口集合 类型: Map<InetSocketAddress, String>、HashMap<InetSocketAddress, String>
|
||||||
public static final String RESNAME_SNCP_NODES = "SNCP_NODES";
|
public static final String RESNAME_SNCP_NODES = "SNCP_NODES";
|
||||||
|
|
||||||
private static final Type NODES1TYPE = new TypeToken<Map<InetSocketAddress, String>>() {
|
|
||||||
}.getType();
|
|
||||||
|
|
||||||
private static final Type NODES2TYPE = new TypeToken<HashMap<InetSocketAddress, String>>() {
|
|
||||||
}.getType();
|
|
||||||
|
|
||||||
protected final ResourceFactory factory = ResourceFactory.root();
|
protected final ResourceFactory factory = ResourceFactory.root();
|
||||||
|
|
||||||
protected final WatchFactory watch = WatchFactory.root();
|
protected final WatchFactory watch = WatchFactory.root();
|
||||||
|
|
||||||
protected final Map<InetSocketAddress, String> addrGroups = new HashMap<>();
|
protected final Map<InetSocketAddress, String> globalNodes = new HashMap<>();
|
||||||
|
|
||||||
|
private final Map<String, Set<InetSocketAddress>> globalGroups = new HashMap<>();
|
||||||
|
|
||||||
protected final List<Transport> transports = new ArrayList<>();
|
protected final List<Transport> transports = new ArrayList<>();
|
||||||
|
|
||||||
@@ -96,10 +91,11 @@ public final class Application {
|
|||||||
|
|
||||||
private final long startTime = System.currentTimeMillis();
|
private final long startTime = System.currentTimeMillis();
|
||||||
|
|
||||||
private CountDownLatch serverscdl;
|
private final CountDownLatch serversLatch;
|
||||||
|
|
||||||
private Application(final AnyValue config) {
|
private Application(final AnyValue config) {
|
||||||
this.config = config;
|
this.config = config;
|
||||||
|
|
||||||
final File root = new File(System.getProperty(RESNAME_HOME));
|
final File root = new File(System.getProperty(RESNAME_HOME));
|
||||||
this.factory.register(RESNAME_TIME, long.class, this.startTime);
|
this.factory.register(RESNAME_TIME, long.class, this.startTime);
|
||||||
this.factory.register(RESNAME_HOME, Path.class, root.toPath());
|
this.factory.register(RESNAME_HOME, Path.class, root.toPath());
|
||||||
@@ -174,7 +170,8 @@ public final class Application {
|
|||||||
Logger.getLogger(this.getClass().getSimpleName()).log(Level.WARNING, "init logger configuration error", e);
|
Logger.getLogger(this.getClass().getSimpleName()).log(Level.WARNING, "init logger configuration error", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
logger = Logger.getLogger(this.getClass().getSimpleName());
|
this.logger = Logger.getLogger(this.getClass().getSimpleName());
|
||||||
|
this.serversLatch = new CountDownLatch(config.getAnyValues("server").length + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
public File getHome() {
|
public File getHome() {
|
||||||
@@ -244,44 +241,65 @@ public final class Application {
|
|||||||
initResources();
|
initResources();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <T extends Service> T singleton(Class<T> serviceClass) throws Exception {
|
private void initResources() throws Exception {
|
||||||
return singleton(serviceClass, false);
|
//-------------------------------------------------------------------------
|
||||||
|
final AnyValue resources = config.getAnyValue("resources");
|
||||||
|
if (resources != null) {
|
||||||
|
//------------------------------------------------------------------------
|
||||||
|
AnyValue datacachelistenerConf = resources.getAnyValue("datacachelistener");
|
||||||
|
if (datacachelistenerConf != null) {
|
||||||
|
String val = datacachelistenerConf.getValue("service", "");
|
||||||
|
if (!val.isEmpty()) {
|
||||||
|
if ("none".equalsIgnoreCase(val)) {
|
||||||
|
this.dataCacheListenerClass = null;
|
||||||
|
} else {
|
||||||
|
Class clazz = Class.forName(val);
|
||||||
|
if (!DataCacheListener.class.isAssignableFrom(clazz) || !Service.class.isAssignableFrom(clazz)) {
|
||||||
|
throw new RuntimeException("datacachelistener service (" + val + ") is illegal");
|
||||||
}
|
}
|
||||||
|
this.dataCacheListenerClass = clazz;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//------------------------------------------------------------------------
|
||||||
|
AnyValue websocketnodeConf = resources.getAnyValue("websocketnode");
|
||||||
|
if (websocketnodeConf != null) {
|
||||||
|
String val = websocketnodeConf.getValue("service", "");
|
||||||
|
if (!val.isEmpty()) {
|
||||||
|
if ("none".equalsIgnoreCase(val)) {
|
||||||
|
this.webSocketNodeClass = null;
|
||||||
|
} else {
|
||||||
|
Class clazz = Class.forName(val);
|
||||||
|
if (!WebSocketNode.class.isAssignableFrom(clazz) || !Service.class.isAssignableFrom(clazz)) {
|
||||||
|
throw new RuntimeException("websocketnode service (" + val + ") is illegal");
|
||||||
|
}
|
||||||
|
this.webSocketNodeClass = clazz;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//------------------------------------------------------------------------
|
||||||
|
|
||||||
public static <T extends Service> T singleton(Class<T> serviceClass, boolean remote) throws Exception {
|
for (AnyValue conf : resources.getAnyValues("group")) {
|
||||||
final Application application = Application.create();
|
final String group = conf.getValue("name", "");
|
||||||
T service = remote ? Sncp.createRemoteService("", serviceClass, null, null) : Sncp.createLocalService("", serviceClass, null, null, null);
|
String protocol = conf.getValue("protocol", Sncp.DEFAULT_PROTOCOL).toUpperCase();
|
||||||
application.init();
|
if (!"TCP".equalsIgnoreCase(protocol) && "UDP".equalsIgnoreCase(protocol)) {
|
||||||
application.factory.register(service);
|
throw new RuntimeException("Not supported Transport Protocol " + conf.getValue("protocol"));
|
||||||
new NodeSncpServer(application, null, new CountDownLatch(1), null).init(application.config);
|
|
||||||
application.factory.inject(service);
|
|
||||||
return service;
|
|
||||||
}
|
}
|
||||||
|
Set<InetSocketAddress> addrs = globalGroups.get(group);
|
||||||
private static Application create() throws IOException {
|
if (addrs == null) {
|
||||||
final String home = new File(System.getProperty(RESNAME_HOME, "")).getCanonicalPath();
|
addrs = new LinkedHashSet<>();
|
||||||
System.setProperty(RESNAME_HOME, home);
|
globalGroups.put(group, addrs);
|
||||||
File appfile = new File(home, "conf/application.xml");
|
|
||||||
//System.setProperty(DataConnection.PERSIST_FILEPATH, appfile.getCanonicalPath());
|
|
||||||
return new Application(load(new FileInputStream(appfile)));
|
|
||||||
}
|
}
|
||||||
|
for (AnyValue node : conf.getAnyValues("node")) {
|
||||||
public static void main(String[] args) throws Exception {
|
final InetSocketAddress addr = new InetSocketAddress(node.getValue("addr"), node.getIntValue("port"));
|
||||||
//运行主程序
|
addrs.add(addr);
|
||||||
final Application application = Application.create();
|
String oldgroup = globalNodes.get(addr);
|
||||||
if (System.getProperty("SHUTDOWN") != null) {
|
if (oldgroup != null) throw new RuntimeException(addr + " had one more group " + (globalNodes.get(addr)));
|
||||||
application.sendShutDown();
|
globalNodes.put(addr, group);
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
application.init();
|
|
||||||
application.startSelfServer();
|
|
||||||
try {
|
|
||||||
application.start();
|
|
||||||
} catch (Exception e) {
|
|
||||||
application.logger.log(Level.SEVERE, "Application start error", e);
|
|
||||||
System.exit(0);
|
|
||||||
}
|
}
|
||||||
System.exit(0);
|
}
|
||||||
|
//------------------------------------------------------------------------
|
||||||
}
|
}
|
||||||
|
|
||||||
private void startSelfServer() throws Exception {
|
private void startSelfServer() throws Exception {
|
||||||
@@ -317,7 +335,7 @@ public final class Application {
|
|||||||
channel.send(buffer, address);
|
channel.send(buffer, address);
|
||||||
long e = System.currentTimeMillis() - s;
|
long e = System.currentTimeMillis() - s;
|
||||||
logger.info(application.getClass().getSimpleName() + " shutdown in " + e + " ms");
|
logger.info(application.getClass().getSimpleName() + " shutdown in " + e + " ms");
|
||||||
application.serverscdl.countDown();
|
application.serversLatch.countDown();
|
||||||
System.exit(0);
|
System.exit(0);
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
logger.log(Level.INFO, "SHUTDOWN FAIL", ex);
|
logger.log(Level.INFO, "SHUTDOWN FAIL", ex);
|
||||||
@@ -357,7 +375,6 @@ public final class Application {
|
|||||||
|
|
||||||
public void start() throws Exception {
|
public void start() throws Exception {
|
||||||
final AnyValue[] entrys = config.getAnyValues("server");
|
final AnyValue[] entrys = config.getAnyValues("server");
|
||||||
this.serverscdl = new CountDownLatch(entrys.length + 1);
|
|
||||||
CountDownLatch timecd = new CountDownLatch(entrys.length);
|
CountDownLatch timecd = new CountDownLatch(entrys.length);
|
||||||
final List<AnyValue> sncps = new ArrayList<>();
|
final List<AnyValue> sncps = new ArrayList<>();
|
||||||
final List<AnyValue> others = new ArrayList<>();
|
final List<AnyValue> others = new ArrayList<>();
|
||||||
@@ -368,20 +385,21 @@ public final class Application {
|
|||||||
others.add(entry);
|
others.add(entry);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (AnyValue sncpconf : sncps) {
|
factory.register(RESNAME_SNCP_NODES, new TypeToken<Map<InetSocketAddress, String>>() {
|
||||||
String host = sncpconf.getValue("host", "0.0.0.0").replace("0.0.0.0", "");
|
}.getType(), globalNodes);
|
||||||
InetSocketAddress addr = new InetSocketAddress(host.isEmpty() ? this.localAddress.getHostAddress() : host, sncpconf.getIntValue("port", 80));
|
factory.register(RESNAME_SNCP_NODES, new TypeToken<HashMap<InetSocketAddress, String>>() {
|
||||||
String oldgroup = addrGroups.get(addr);
|
}.getType(), globalNodes);
|
||||||
if (oldgroup != null && !((sncpconf.getValue("group", "") + ";").contains(oldgroup + ";"))) throw new RuntimeException(addr + " has one more group " + (addrGroups.get(addr)));
|
|
||||||
if (oldgroup == null) addrGroups.put(addr, "");
|
factory.register(RESNAME_SNCP_NODES, new TypeToken<Map<String, Set<InetSocketAddress>>>() {
|
||||||
}
|
}.getType(), globalGroups);
|
||||||
factory.register(RESNAME_SNCP_NODES, NODES1TYPE, new HashMap<>(addrGroups));
|
factory.register(RESNAME_SNCP_NODES, new TypeToken<HashMap<String, Set<InetSocketAddress>>>() {
|
||||||
factory.register(RESNAME_SNCP_NODES, NODES2TYPE, new HashMap<>(addrGroups));
|
}.getType(), globalGroups);
|
||||||
runServers(timecd, sncps); //确保sncp都启动后再启动其他协议
|
|
||||||
|
runServers(timecd, sncps); //必须确保sncp都启动后再启动其他协议
|
||||||
runServers(timecd, others);
|
runServers(timecd, others);
|
||||||
timecd.await();
|
timecd.await();
|
||||||
logger.info(this.getClass().getSimpleName() + " started in " + (System.currentTimeMillis() - startTime) + " ms");
|
logger.info(this.getClass().getSimpleName() + " started in " + (System.currentTimeMillis() - startTime) + " ms");
|
||||||
this.serverscdl.await();
|
this.serversLatch.await();
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@@ -392,7 +410,7 @@ public final class Application {
|
|||||||
Thread thread = new Thread() {
|
Thread thread = new Thread() {
|
||||||
{
|
{
|
||||||
String host = serconf.getValue("host", "").replace("0.0.0.0", "[0]");
|
String host = serconf.getValue("host", "").replace("0.0.0.0", "[0]");
|
||||||
setName(serconf.getValue("protocol", "Server").toUpperCase() + "-" + host + ":" + serconf.getIntValue("port", 80) + "-Thread");
|
setName(serconf.getValue("protocol", "Server").toUpperCase() + "-" + host + ":" + serconf.getIntValue("port") + "-Thread");
|
||||||
this.setDaemon(true);
|
this.setDaemon(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -409,12 +427,10 @@ public final class Application {
|
|||||||
protocol = protocol.substring(0, pos);
|
protocol = protocol.substring(0, pos);
|
||||||
}
|
}
|
||||||
NodeServer server = null;
|
NodeServer server = null;
|
||||||
String host = serconf.getValue("host", "0.0.0.0").replace("0.0.0.0", "");
|
|
||||||
InetSocketAddress addr = new InetSocketAddress(host.isEmpty() ? localAddress.getHostAddress() : host, serconf.getIntValue("port", 80));
|
|
||||||
if ("SNCP".equalsIgnoreCase(protocol)) {
|
if ("SNCP".equalsIgnoreCase(protocol)) {
|
||||||
server = new NodeSncpServer(Application.this, addr, servicecdl, new SncpServer(startTime, subprotocol, addr, watch));
|
server = new NodeSncpServer(Application.this, servicecdl, new SncpServer(startTime, subprotocol, watch));
|
||||||
} else if ("HTTP".equalsIgnoreCase(protocol)) {
|
} else if ("HTTP".equalsIgnoreCase(protocol)) {
|
||||||
server = new NodeHttpServer(Application.this, addr, servicecdl, new HttpServer(startTime, watch));
|
server = new NodeHttpServer(Application.this, servicecdl, new HttpServer(startTime, watch));
|
||||||
}
|
}
|
||||||
if (server == null) {
|
if (server == null) {
|
||||||
logger.log(Level.SEVERE, "Not found Server Class for protocol({0})", serconf.getValue("protocol"));
|
logger.log(Level.SEVERE, "Not found Server Class for protocol({0})", serconf.getValue("protocol"));
|
||||||
@@ -427,7 +443,7 @@ public final class Application {
|
|||||||
sercdl.countDown();
|
sercdl.countDown();
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
logger.log(Level.WARNING, serconf + " runServers error", ex);
|
logger.log(Level.WARNING, serconf + " runServers error", ex);
|
||||||
serverscdl.countDown();
|
Application.this.serversLatch.countDown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -436,61 +452,49 @@ public final class Application {
|
|||||||
sercdl.await();
|
sercdl.await();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initResources() throws Exception {
|
public static <T extends Service> T singleton(Class<T> serviceClass) throws Exception {
|
||||||
//-------------------------------------------------------------------------
|
return singleton(serviceClass, false);
|
||||||
final AnyValue resources = config.getAnyValue("resources");
|
|
||||||
if (resources != null) {
|
|
||||||
//------------------------------------------------------------------------
|
|
||||||
AnyValue datacachelistenerConf = resources.getAnyValue("datacachelistener");
|
|
||||||
if (datacachelistenerConf != null) {
|
|
||||||
String val = datacachelistenerConf.getValue("service", "");
|
|
||||||
if (!val.isEmpty()) {
|
|
||||||
if ("none".equalsIgnoreCase(val)) {
|
|
||||||
this.dataCacheListenerClass = null;
|
|
||||||
} else {
|
|
||||||
Class clazz = Class.forName(val);
|
|
||||||
if (!DataCacheListener.class.isAssignableFrom(clazz) || !Service.class.isAssignableFrom(clazz)) {
|
|
||||||
throw new RuntimeException("datacachelistener service (" + val + ") is illegal");
|
|
||||||
}
|
}
|
||||||
this.dataCacheListenerClass = clazz;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
//------------------------------------------------------------------------
|
|
||||||
AnyValue websocketnodeConf = resources.getAnyValue("websocketnode");
|
|
||||||
if (websocketnodeConf != null) {
|
|
||||||
String val = websocketnodeConf.getValue("service", "");
|
|
||||||
if (!val.isEmpty()) {
|
|
||||||
if ("none".equalsIgnoreCase(val)) {
|
|
||||||
this.webSocketNodeClass = null;
|
|
||||||
} else {
|
|
||||||
Class clazz = Class.forName(val);
|
|
||||||
if (!WebSocketNode.class.isAssignableFrom(clazz) || !Service.class.isAssignableFrom(clazz)) {
|
|
||||||
throw new RuntimeException("websocketnode service (" + val + ") is illegal");
|
|
||||||
}
|
|
||||||
this.webSocketNodeClass = clazz;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
//------------------------------------------------------------------------
|
|
||||||
final Map<String, Set<String>> groups = new HashMap<>();
|
|
||||||
|
|
||||||
for (AnyValue conf : resources.getAnyValues("group")) {
|
public static <T extends Service> T singleton(Class<T> serviceClass, boolean remote) throws Exception {
|
||||||
final String group = conf.getValue("name", "");
|
final Application application = Application.create();
|
||||||
String protocol = conf.getValue("protocol", Sncp.DEFAULT_PROTOCOL).toUpperCase();
|
T service = remote ? Sncp.createRemoteService("", serviceClass, null, null) : Sncp.createLocalService("", serviceClass, null, null, null);
|
||||||
if (!"TCP".equalsIgnoreCase(protocol) && !Sncp.DEFAULT_PROTOCOL.equalsIgnoreCase(protocol)) {
|
application.init();
|
||||||
throw new RuntimeException("Not supported Transport Protocol " + conf.getValue("protocol"));
|
application.factory.register(service);
|
||||||
|
new NodeSncpServer(application, new CountDownLatch(1), null).init(application.config);
|
||||||
|
application.factory.inject(service);
|
||||||
|
return service;
|
||||||
}
|
}
|
||||||
List<InetSocketAddress> addrs = new ArrayList<>();
|
|
||||||
for (AnyValue node : conf.getAnyValues("node")) {
|
private static Application create() throws IOException {
|
||||||
InetSocketAddress addr = new InetSocketAddress(node.getValue("addr"), node.getIntValue("port"));
|
final String home = new File(System.getProperty(RESNAME_HOME, "")).getCanonicalPath();
|
||||||
if (addrGroups.containsKey(addr)) throw new RuntimeException(addr + " had one more group " + (addrGroups.get(addr)));
|
System.setProperty(RESNAME_HOME, home);
|
||||||
addrGroups.put(addr, group);
|
File appfile = new File(home, "conf/application.xml");
|
||||||
addrs.add(addr);
|
//System.setProperty(DataConnection.PERSIST_FILEPATH, appfile.getCanonicalPath());
|
||||||
|
return new Application(load(new FileInputStream(appfile)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
//运行主程序
|
||||||
|
final Application application = Application.create();
|
||||||
|
if (System.getProperty("SHUTDOWN") != null) {
|
||||||
|
application.sendShutDown();
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
application.init();
|
||||||
|
application.startSelfServer();
|
||||||
|
try {
|
||||||
|
application.start();
|
||||||
|
} catch (Exception e) {
|
||||||
|
application.logger.log(Level.SEVERE, "Application start error", e);
|
||||||
|
System.exit(0);
|
||||||
}
|
}
|
||||||
//------------------------------------------------------------------------
|
System.exit(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
Set<InetSocketAddress> findGlobalGroup(String group) {
|
||||||
|
Set<InetSocketAddress> set = globalGroups.get(group);
|
||||||
|
return set == null ? null : new LinkedHashSet<>(set);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void shutdown() throws Exception {
|
private void shutdown() throws Exception {
|
||||||
@@ -500,7 +504,7 @@ public final class Application {
|
|||||||
} catch (Exception t) {
|
} catch (Exception t) {
|
||||||
logger.log(Level.WARNING, " shutdown server(" + server.getSocketAddress() + ") error", t);
|
logger.log(Level.WARNING, " shutdown server(" + server.getSocketAddress() + ") error", t);
|
||||||
} finally {
|
} finally {
|
||||||
serverscdl.countDown();
|
serversLatch.countDown();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
for (DataSource source : sources) {
|
for (DataSource source : sources) {
|
||||||
|
|||||||
@@ -10,12 +10,10 @@ import com.wentch.redkale.net.http.HttpServer;
|
|||||||
import com.wentch.redkale.net.http.HttpServlet;
|
import com.wentch.redkale.net.http.HttpServlet;
|
||||||
import com.wentch.redkale.util.AnyValue;
|
import com.wentch.redkale.util.AnyValue;
|
||||||
import com.wentch.redkale.boot.ClassFilter.FilterEntry;
|
import com.wentch.redkale.boot.ClassFilter.FilterEntry;
|
||||||
import com.wentch.redkale.net.*;
|
|
||||||
import com.wentch.redkale.net.http.*;
|
import com.wentch.redkale.net.http.*;
|
||||||
import com.wentch.redkale.net.sncp.*;
|
import com.wentch.redkale.net.sncp.*;
|
||||||
import com.wentch.redkale.service.*;
|
import com.wentch.redkale.service.*;
|
||||||
import com.wentch.redkale.util.*;
|
import com.wentch.redkale.util.*;
|
||||||
import java.io.*;
|
|
||||||
import java.lang.reflect.*;
|
import java.lang.reflect.*;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
@@ -32,12 +30,9 @@ public final class NodeHttpServer extends NodeServer {
|
|||||||
|
|
||||||
private final HttpServer server;
|
private final HttpServer server;
|
||||||
|
|
||||||
private final File home;
|
public NodeHttpServer(Application application, CountDownLatch servicecdl, HttpServer server) {
|
||||||
|
|
||||||
public NodeHttpServer(Application application, InetSocketAddress addr, CountDownLatch servicecdl, HttpServer server) {
|
|
||||||
super(application, application.factory.createChild(), servicecdl, server);
|
super(application, application.factory.createChild(), servicecdl, server);
|
||||||
this.server = server;
|
this.server = server;
|
||||||
this.home = application.getHome();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -50,7 +45,7 @@ public final class NodeHttpServer extends NodeServer {
|
|||||||
ClassFilter<HttpServlet> httpFilter = createClassFilter(null, config, WebServlet.class, HttpServlet.class, null, "servlets", "servlet");
|
ClassFilter<HttpServlet> httpFilter = createClassFilter(null, config, WebServlet.class, HttpServlet.class, null, "servlets", "servlet");
|
||||||
ClassFilter<Service> serviceFilter = createServiceClassFilter(config);
|
ClassFilter<Service> serviceFilter = createServiceClassFilter(config);
|
||||||
long s = System.currentTimeMillis();
|
long s = System.currentTimeMillis();
|
||||||
ClassFilter.Loader.load(home, serviceFilter, httpFilter);
|
ClassFilter.Loader.load(application.getHome(), serviceFilter, httpFilter);
|
||||||
long e = System.currentTimeMillis() - s;
|
long e = System.currentTimeMillis() - s;
|
||||||
logger.info(this.getClass().getSimpleName() + " load filter class in " + e + " ms");
|
logger.info(this.getClass().getSimpleName() + " load filter class in " + e + " ms");
|
||||||
loadService(serviceFilter); //必须在servlet之前
|
loadService(serviceFilter); //必须在servlet之前
|
||||||
@@ -59,31 +54,17 @@ public final class NodeHttpServer extends NodeServer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void initWebSocketService() {
|
private void initWebSocketService() {
|
||||||
String defgroup = servconf.getValue("group", ""); //Server节点获取group信息
|
NodeSncpServer sncpServer0 = null;
|
||||||
NodeSncpServer firstnss = null;
|
|
||||||
NodeSncpServer sncpServer = null;
|
|
||||||
for (NodeServer ns : application.servers) {
|
for (NodeServer ns : application.servers) {
|
||||||
if (!(ns instanceof NodeSncpServer)) continue;
|
if (!ns.isSNCP()) continue;
|
||||||
final NodeSncpServer nss = (NodeSncpServer) ns;
|
if (sncpServer0 == null) sncpServer0 = (NodeSncpServer) ns;
|
||||||
if (firstnss == null) firstnss = nss;
|
if (ns.getNodeGroup().equals(getNodeGroup())) {
|
||||||
if (defgroup.equals(nss.nodeGroup)) {
|
sncpServer0 = (NodeSncpServer) ns;
|
||||||
sncpServer = nss;
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (sncpServer == null) sncpServer = firstnss;
|
final NodeSncpServer sncpServer = sncpServer0;
|
||||||
if (defgroup.isEmpty() && sncpServer != null) {
|
|
||||||
defgroup = sncpServer.servconf.getValue("group", "");
|
|
||||||
}
|
|
||||||
final String localGroup = sncpServer == null ? this.nodeGroup : sncpServer.nodeGroup;
|
|
||||||
final InetSocketAddress localAddr = sncpServer == null ? this.servaddr : sncpServer.servaddr;
|
|
||||||
final List<Transport>[] transportses = parseTransport(defgroup, localGroup, localAddr);
|
|
||||||
final List<Transport> sameGroupTransports = transportses[0];
|
|
||||||
final List<Transport> diffGroupTransports = transportses[1];
|
|
||||||
//---------------------------------------------------------------------------------------------
|
|
||||||
final NodeSncpServer onesncpServer = sncpServer;
|
|
||||||
final ResourceFactory regFactory = application.factory;
|
final ResourceFactory regFactory = application.factory;
|
||||||
final String subprotocol = sncpServer == null ? Sncp.DEFAULT_PROTOCOL : sncpServer.getSncpServer().getProtocol();
|
|
||||||
factory.add(WebSocketNode.class, (ResourceFactory rf, final Object src, Field field) -> {
|
factory.add(WebSocketNode.class, (ResourceFactory rf, final Object src, Field field) -> {
|
||||||
try {
|
try {
|
||||||
Resource rs = field.getAnnotation(Resource.class);
|
Resource rs = field.getAnnotation(Resource.class);
|
||||||
@@ -96,21 +77,21 @@ public final class NodeHttpServer extends NodeServer {
|
|||||||
if (nodeService == null) {
|
if (nodeService == null) {
|
||||||
Class<? extends Service> sc = (Class<? extends Service>) application.webSocketNodeClass;
|
Class<? extends Service> sc = (Class<? extends Service>) application.webSocketNodeClass;
|
||||||
nodeService = Sncp.createLocalService(rcname, (Class<? extends Service>) (sc == null ? WebSocketNodeService.class : sc),
|
nodeService = Sncp.createLocalService(rcname, (Class<? extends Service>) (sc == null ? WebSocketNodeService.class : sc),
|
||||||
localAddr, (sc == null ? null : sameGroupTransports), (sc == null ? null : diffGroupTransports));
|
getNodeAddress(), (sc == null ? null : nodeSameGroupTransports), (sc == null ? null : nodeDiffGroupTransports));
|
||||||
regFactory.register(rcname, WebSocketNode.class, nodeService);
|
regFactory.register(rcname, WebSocketNode.class, nodeService);
|
||||||
WebSocketNode wsn = (WebSocketNode) nodeService;
|
WebSocketNode wsn = (WebSocketNode) nodeService;
|
||||||
wsn.setLocalSncpAddress(localAddr);
|
wsn.setLocalSncpAddress(getNodeAddress());
|
||||||
final Set<InetSocketAddress> alladdrs = new HashSet<>();
|
final Set<InetSocketAddress> alladdrs = new HashSet<>();
|
||||||
application.addrGroups.forEach((k, v) -> alladdrs.add(k));
|
application.globalNodes.forEach((k, v) -> alladdrs.add(k));
|
||||||
alladdrs.remove(localAddr);
|
alladdrs.remove(getNodeAddress());
|
||||||
WebSocketNode remoteNode = (WebSocketNode) Sncp.createRemoteService(rcname, (Class<? extends Service>) (sc == null ? WebSocketNodeService.class : sc),
|
WebSocketNode remoteNode = (WebSocketNode) Sncp.createRemoteService(rcname, (Class<? extends Service>) (sc == null ? WebSocketNodeService.class : sc),
|
||||||
localAddr, (sc == null ? null : loadTransport(localGroup, subprotocol, alladdrs)));
|
getNodeAddress(), (sc == null ? null : loadTransport(getNodeGroup(), getNodeProtocol(), alladdrs)));
|
||||||
wsn.setRemoteWebSocketNode(remoteNode);
|
wsn.setRemoteWebSocketNode(remoteNode);
|
||||||
factory.inject(nodeService);
|
factory.inject(nodeService);
|
||||||
factory.inject(remoteNode);
|
factory.inject(remoteNode);
|
||||||
if (onesncpServer != null) {
|
if (sncpServer != null) {
|
||||||
ServiceWrapper wrapper = new ServiceWrapper((Class<? extends Service>) (sc == null ? WebSocketNodeService.class : sc), nodeService, localGroup, rcname, null);
|
ServiceWrapper wrapper = new ServiceWrapper((Class<? extends Service>) (sc == null ? WebSocketNodeService.class : sc), nodeService, getNodeGroup(), rcname, null);
|
||||||
onesncpServer.getSncpServer().addService(wrapper);
|
sncpServer.getSncpServer().addService(wrapper);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
field.set(src, nodeService);
|
field.set(src, nodeService);
|
||||||
|
|||||||
@@ -5,7 +5,7 @@
|
|||||||
*/
|
*/
|
||||||
package com.wentch.redkale.boot;
|
package com.wentch.redkale.boot;
|
||||||
|
|
||||||
import static com.wentch.redkale.boot.Application.RESNAME_SNCP_GROUP;
|
import static com.wentch.redkale.boot.Application.*;
|
||||||
import com.wentch.redkale.net.sncp.ServiceWrapper;
|
import com.wentch.redkale.net.sncp.ServiceWrapper;
|
||||||
import com.wentch.redkale.net.Server;
|
import com.wentch.redkale.net.Server;
|
||||||
import com.wentch.redkale.net.sncp.Sncp;
|
import com.wentch.redkale.net.sncp.Sncp;
|
||||||
@@ -20,7 +20,7 @@ import com.wentch.redkale.util.AnyValue.DefaultAnyValue;
|
|||||||
import java.io.*;
|
import java.io.*;
|
||||||
import java.lang.annotation.Annotation;
|
import java.lang.annotation.Annotation;
|
||||||
import java.lang.reflect.*;
|
import java.lang.reflect.*;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.*;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.*;
|
import java.util.concurrent.*;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
@@ -45,14 +45,20 @@ public abstract class NodeServer {
|
|||||||
|
|
||||||
private final Server server;
|
private final Server server;
|
||||||
|
|
||||||
protected AnyValue servconf;
|
private InetSocketAddress nodeAddress; //HttpServer中的nodeAddress 为所属group对应的SncpServer, 为null表示没有分布式结构
|
||||||
|
|
||||||
protected InetSocketAddress servaddr;
|
private String nodeGroup = ""; //当前Server的SNCP协议的组
|
||||||
|
|
||||||
protected String nodeGroup = "";
|
private AnyValue nodeConf;
|
||||||
|
|
||||||
|
private String nodeProtocol = Sncp.DEFAULT_PROTOCOL;
|
||||||
|
|
||||||
protected Consumer<ServiceWrapper> consumer;
|
protected Consumer<ServiceWrapper> consumer;
|
||||||
|
|
||||||
|
protected final List<Transport> nodeSameGroupTransports = new ArrayList<>();
|
||||||
|
|
||||||
|
protected final List<Transport> nodeDiffGroupTransports = new ArrayList<>();
|
||||||
|
|
||||||
protected final Set<ServiceWrapper> localServices = new LinkedHashSet<>();
|
protected final Set<ServiceWrapper> localServices = new LinkedHashSet<>();
|
||||||
|
|
||||||
protected final Set<ServiceWrapper> remoteServices = new LinkedHashSet<>();
|
protected final Set<ServiceWrapper> remoteServices = new LinkedHashSet<>();
|
||||||
@@ -68,7 +74,36 @@ public abstract class NodeServer {
|
|||||||
protected abstract void prepare(final AnyValue config) throws Exception;
|
protected abstract void prepare(final AnyValue config) throws Exception;
|
||||||
|
|
||||||
public void init(AnyValue config) throws Exception {
|
public void init(AnyValue config) throws Exception {
|
||||||
this.servconf = config == null ? new AnyValue.DefaultAnyValue() : config;
|
this.nodeConf = config == null ? AnyValue.create() : config;
|
||||||
|
if (isSNCP()) { // SNCP协议
|
||||||
|
String host = this.nodeConf.getValue("host", "0.0.0.0").replace("0.0.0.0", "");
|
||||||
|
this.nodeAddress = new InetSocketAddress(host.isEmpty() ? application.localAddress.getHostAddress() : host, this.nodeConf.getIntValue("port"));
|
||||||
|
this.nodeGroup = application.globalNodes.getOrDefault(this.nodeAddress, "");
|
||||||
|
if (server != null) this.nodeProtocol = server.getProtocol();
|
||||||
|
} else { // HTTP协议
|
||||||
|
String mbgroup = this.nodeConf.getValue("group", "");
|
||||||
|
NodeServer sncpServer = null; //有匹配的就取匹配的, 没有且SNCP只有一个,则取此SNCP。
|
||||||
|
for (NodeServer ns : application.servers) {
|
||||||
|
if (!ns.isSNCP()) continue;
|
||||||
|
if (sncpServer == null) sncpServer = ns;
|
||||||
|
if (ns.getNodeGroup().equals(mbgroup)) {
|
||||||
|
sncpServer = ns;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (sncpServer != null) {
|
||||||
|
this.nodeAddress = sncpServer.getNodeAddress();
|
||||||
|
this.nodeGroup = sncpServer.getNodeGroup();
|
||||||
|
this.nodeProtocol = sncpServer.getNodeProtocol();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (this.nodeAddress != null) { // 无分布式结构下 HTTP协议的nodeAddress 为 null
|
||||||
|
this.factory.register(RESNAME_SNCP_NODE, SocketAddress.class, this.nodeAddress);
|
||||||
|
this.factory.register(RESNAME_SNCP_NODE, InetSocketAddress.class, this.nodeAddress);
|
||||||
|
this.factory.register(RESNAME_SNCP_NODE, String.class, this.nodeAddress.getAddress().getHostAddress());
|
||||||
|
this.factory.register(RESNAME_SNCP_GROUP, this.nodeGroup);
|
||||||
|
}
|
||||||
|
{
|
||||||
//设置root文件夹
|
//设置root文件夹
|
||||||
String webroot = config.getValue("root", "root");
|
String webroot = config.getValue("root", "root");
|
||||||
File myroot = new File(webroot);
|
File myroot = new File(webroot);
|
||||||
@@ -78,17 +113,16 @@ public abstract class NodeServer {
|
|||||||
final String homepath = myroot.getCanonicalPath();
|
final String homepath = myroot.getCanonicalPath();
|
||||||
Server.loadLib(logger, config.getValue("lib", "") + ";" + homepath + "/lib/*;" + homepath + "/classes");
|
Server.loadLib(logger, config.getValue("lib", "") + ";" + homepath + "/lib/*;" + homepath + "/classes");
|
||||||
if (server != null) server.init(config);
|
if (server != null) server.init(config);
|
||||||
|
}
|
||||||
initResource();
|
initResource();
|
||||||
prepare(config);
|
prepare(config);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initResource() {
|
private void initResource() {
|
||||||
final String defgroup = servconf.getValue("group", ""); //Server节点获取group信息
|
final List<Transport>[] transportses = parseTransport(this.nodeConf.getValue("group", "").split(";"));
|
||||||
final List<Transport>[] transportses = parseTransport(defgroup, this.nodeGroup, this.servaddr);
|
this.nodeSameGroupTransports.addAll(transportses[0]);
|
||||||
final List<Transport> sameGroupTransports = transportses[0];
|
this.nodeDiffGroupTransports.addAll(transportses[1]);
|
||||||
final List<Transport> diffGroupTransports = transportses[1];
|
|
||||||
|
|
||||||
this.factory.register(RESNAME_SNCP_GROUP, !defgroup.isEmpty() ? defgroup : (this.nodeGroup == null ? "" : this.nodeGroup));
|
|
||||||
//---------------------------------------------------------------------------------------------
|
//---------------------------------------------------------------------------------------------
|
||||||
final ResourceFactory regFactory = application.factory;
|
final ResourceFactory regFactory = application.factory;
|
||||||
factory.add(DataSource.class, (ResourceFactory rf, final Object src, Field field) -> {
|
factory.add(DataSource.class, (ResourceFactory rf, final Object src, Field field) -> {
|
||||||
@@ -101,7 +135,7 @@ public abstract class NodeServer {
|
|||||||
regFactory.register(rs.name(), DataSource.class, source);
|
regFactory.register(rs.name(), DataSource.class, source);
|
||||||
Class<? extends Service> sc = (Class<? extends Service>) application.dataCacheListenerClass;
|
Class<? extends Service> sc = (Class<? extends Service>) application.dataCacheListenerClass;
|
||||||
if (sc != null) {
|
if (sc != null) {
|
||||||
Service cacheListenerService = Sncp.createLocalService(rs.name(), sc, this.servaddr, sameGroupTransports, diffGroupTransports);
|
Service cacheListenerService = Sncp.createLocalService(rs.name(), sc, this.nodeAddress, nodeSameGroupTransports, nodeDiffGroupTransports);
|
||||||
regFactory.register(rs.name(), DataCacheListener.class, cacheListenerService);
|
regFactory.register(rs.name(), DataCacheListener.class, cacheListenerService);
|
||||||
ServiceWrapper wrapper = new ServiceWrapper(sc, cacheListenerService, nodeGroup, rs.name(), null);
|
ServiceWrapper wrapper = new ServiceWrapper(sc, cacheListenerService, nodeGroup, rs.name(), null);
|
||||||
localServices.add(wrapper);
|
localServices.add(wrapper);
|
||||||
@@ -116,41 +150,42 @@ public abstract class NodeServer {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
protected List<Transport>[] parseTransport(final String group, final String localGroup, final InetSocketAddress addr) {
|
protected List<Transport>[] parseTransport(final String[] groups) {
|
||||||
final Set<InetSocketAddress> sameGroupAddrs = new LinkedHashSet<>();
|
final Set<InetSocketAddress> sameGroupAddrs = application.findGlobalGroup(this.nodeGroup);
|
||||||
final Map<String, Set<InetSocketAddress>> diffGroupAddrs = new HashMap<>();
|
final Map<String, Set<InetSocketAddress>> diffGroupAddrs = new HashMap<>();
|
||||||
for (String str : group.split(";")) {
|
for (String groupitem : groups) {
|
||||||
application.addrGroups.forEach((k, v) -> {
|
final Set<InetSocketAddress> addrs = application.findGlobalGroup(groupitem);
|
||||||
if (v.equals(str)) {
|
if (addrs == null || groupitem.equals(this.nodeGroup)) continue;
|
||||||
if (v.equals(localGroup)) {
|
diffGroupAddrs.put(groupitem, addrs);
|
||||||
sameGroupAddrs.add(k);
|
|
||||||
} else {
|
|
||||||
Set<InetSocketAddress> set = diffGroupAddrs.get(v);
|
|
||||||
if (set == null) {
|
|
||||||
set = new LinkedHashSet<>();
|
|
||||||
diffGroupAddrs.put(v, set);
|
|
||||||
}
|
}
|
||||||
set.add(k);
|
final List<Transport> sameGroupTransports0 = new ArrayList<>();
|
||||||
}
|
if (sameGroupAddrs != null) {
|
||||||
}
|
sameGroupAddrs.remove(this.nodeAddress);
|
||||||
});
|
|
||||||
}
|
|
||||||
sameGroupAddrs.remove(addr);
|
|
||||||
final List<Transport> sameGroupTransports = new ArrayList<>();
|
|
||||||
for (InetSocketAddress iaddr : sameGroupAddrs) {
|
for (InetSocketAddress iaddr : sameGroupAddrs) {
|
||||||
Set<InetSocketAddress> tset = new HashSet<>();
|
sameGroupTransports0.add(loadTransport(this.nodeGroup, getNodeProtocol(), iaddr));
|
||||||
tset.add(iaddr);
|
|
||||||
sameGroupTransports.add(loadTransport(localGroup, server.getProtocol(), tset));
|
|
||||||
}
|
}
|
||||||
final List<Transport> diffGroupTransports = new ArrayList<>();
|
}
|
||||||
diffGroupAddrs.forEach((k, v) -> diffGroupTransports.add(loadTransport(k, server.getProtocol(), v)));
|
final List<Transport> diffGroupTransports0 = new ArrayList<>();
|
||||||
return new List[]{sameGroupTransports, diffGroupTransports};
|
diffGroupAddrs.forEach((k, v) -> diffGroupTransports0.add(loadTransport(k, getNodeProtocol(), v)));
|
||||||
|
return new List[]{sameGroupTransports0, diffGroupTransports0};
|
||||||
}
|
}
|
||||||
|
|
||||||
public abstract InetSocketAddress getSocketAddress();
|
public abstract InetSocketAddress getSocketAddress();
|
||||||
|
|
||||||
public abstract boolean isSNCP();
|
public abstract boolean isSNCP();
|
||||||
|
|
||||||
|
public InetSocketAddress getNodeAddress() {
|
||||||
|
return nodeAddress;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getNodeGroup() {
|
||||||
|
return nodeGroup;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getNodeProtocol() {
|
||||||
|
return nodeProtocol;
|
||||||
|
}
|
||||||
|
|
||||||
public void start() throws IOException {
|
public void start() throws IOException {
|
||||||
server.start();
|
server.start();
|
||||||
}
|
}
|
||||||
@@ -162,13 +197,20 @@ public abstract class NodeServer {
|
|||||||
y.getService().destroy(y.getConf());
|
y.getService().destroy(y.getConf());
|
||||||
long e = System.currentTimeMillis() - s;
|
long e = System.currentTimeMillis() - s;
|
||||||
if (e > 2 && sb != null) {
|
if (e > 2 && sb != null) {
|
||||||
sb.append("LocalServices(").append(y.getType()).append(':').append(y.getName()).append(") destroy ").append(e).append("ms").append(LINE_SEPARATOR);
|
sb.append("LocalService(").append(y.getType()).append(':').append(y.getName()).append(") destroy ").append(e).append("ms").append(LINE_SEPARATOR);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
if (sb != null && sb.length() > 0) logger.log(Level.INFO, sb.toString());
|
if (sb != null && sb.length() > 0) logger.log(Level.INFO, sb.toString());
|
||||||
server.shutdown();
|
server.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected Transport loadTransport(String group, String protocol, InetSocketAddress addr) {
|
||||||
|
if (addr == null) return null;
|
||||||
|
Set<InetSocketAddress> set = new HashSet<>();
|
||||||
|
set.add(addr);
|
||||||
|
return loadTransport(group, protocol, set);
|
||||||
|
}
|
||||||
|
|
||||||
protected Transport loadTransport(String group, String protocol, Set<InetSocketAddress> addrs) {
|
protected Transport loadTransport(String group, String protocol, Set<InetSocketAddress> addrs) {
|
||||||
Transport transport = null;
|
Transport transport = null;
|
||||||
if (!addrs.isEmpty()) {
|
if (!addrs.isEmpty()) {
|
||||||
@@ -193,7 +235,7 @@ public abstract class NodeServer {
|
|||||||
if (serviceFilter == null) return;
|
if (serviceFilter == null) return;
|
||||||
final String threadName = "[" + Thread.currentThread().getName() + "] ";
|
final String threadName = "[" + Thread.currentThread().getName() + "] ";
|
||||||
final Set<FilterEntry<Service>> entrys = serviceFilter.getFilterEntrys();
|
final Set<FilterEntry<Service>> entrys = serviceFilter.getFilterEntrys();
|
||||||
final String defgroup = servconf == null ? "" : servconf.getValue("group", ""); //Server节点获取group信息
|
final String defgroup = nodeConf == null ? "" : nodeConf.getValue("group", ""); //Server节点获取group信息
|
||||||
ResourceFactory regFactory = isSNCP() ? application.factory : factory;
|
ResourceFactory regFactory = isSNCP() ? application.factory : factory;
|
||||||
for (FilterEntry<Service> entry : entrys) { //service实现类
|
for (FilterEntry<Service> entry : entrys) { //service实现类
|
||||||
final Class<? extends Service> type = entry.getType();
|
final Class<? extends Service> type = entry.getType();
|
||||||
@@ -208,7 +250,7 @@ public abstract class NodeServer {
|
|||||||
final Set<InetSocketAddress> sameGroupAddrs = new LinkedHashSet<>();
|
final Set<InetSocketAddress> sameGroupAddrs = new LinkedHashSet<>();
|
||||||
final Map<String, Set<InetSocketAddress>> diffGroupAddrs = new HashMap<>();
|
final Map<String, Set<InetSocketAddress>> diffGroupAddrs = new HashMap<>();
|
||||||
for (String str : group.split(";")) {
|
for (String str : group.split(";")) {
|
||||||
application.addrGroups.forEach((k, v) -> {
|
application.globalNodes.forEach((k, v) -> {
|
||||||
if (v.equals(str)) {
|
if (v.equals(str)) {
|
||||||
if (v.equals(this.nodeGroup)) {
|
if (v.equals(this.nodeGroup)) {
|
||||||
sameGroupAddrs.add(k);
|
sameGroupAddrs.add(k);
|
||||||
@@ -223,21 +265,21 @@ public abstract class NodeServer {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
final boolean localable = sameGroupAddrs.contains(this.servaddr);
|
final boolean localable = sameGroupAddrs.contains(this.nodeAddress);
|
||||||
Service service;
|
Service service;
|
||||||
|
|
||||||
List<Transport> diffGroupTransports = new ArrayList<>();
|
List<Transport> diffGroupTransports = new ArrayList<>();
|
||||||
diffGroupAddrs.forEach((k, v) -> diffGroupTransports.add(loadTransport(k, server.getProtocol(), v)));
|
diffGroupAddrs.forEach((k, v) -> diffGroupTransports.add(loadTransport(k, server.getProtocol(), v)));
|
||||||
|
|
||||||
if (localable || (sameGroupAddrs.isEmpty() && diffGroupTransports.isEmpty())) {
|
if (localable || (sameGroupAddrs.isEmpty() && diffGroupTransports.isEmpty())) {
|
||||||
sameGroupAddrs.remove(this.servaddr);
|
sameGroupAddrs.remove(this.nodeAddress);
|
||||||
List<Transport> sameGroupTransports = new ArrayList<>();
|
List<Transport> sameGroupTransports = new ArrayList<>();
|
||||||
for (InetSocketAddress iaddr : sameGroupAddrs) {
|
for (InetSocketAddress iaddr : sameGroupAddrs) {
|
||||||
Set<InetSocketAddress> tset = new HashSet<>();
|
Set<InetSocketAddress> tset = new HashSet<>();
|
||||||
tset.add(iaddr);
|
tset.add(iaddr);
|
||||||
sameGroupTransports.add(loadTransport(this.nodeGroup, server.getProtocol(), tset));
|
sameGroupTransports.add(loadTransport(this.nodeGroup, server.getProtocol(), tset));
|
||||||
}
|
}
|
||||||
service = Sncp.createLocalService(entry.getName(), type, this.servaddr, sameGroupTransports, diffGroupTransports);
|
service = Sncp.createLocalService(entry.getName(), type, this.nodeAddress, sameGroupTransports, diffGroupTransports);
|
||||||
} else {
|
} else {
|
||||||
StringBuilder g = new StringBuilder(this.nodeGroup);
|
StringBuilder g = new StringBuilder(this.nodeGroup);
|
||||||
diffGroupAddrs.forEach((k, v) -> {
|
diffGroupAddrs.forEach((k, v) -> {
|
||||||
@@ -246,7 +288,7 @@ public abstract class NodeServer {
|
|||||||
sameGroupAddrs.addAll(v);
|
sameGroupAddrs.addAll(v);
|
||||||
});
|
});
|
||||||
if (sameGroupAddrs.isEmpty()) throw new RuntimeException(type + ":" + group);
|
if (sameGroupAddrs.isEmpty()) throw new RuntimeException(type + ":" + group);
|
||||||
service = Sncp.createRemoteService(entry.getName(), type, this.servaddr, loadTransport(g.toString(), server.getProtocol(), sameGroupAddrs));
|
service = Sncp.createRemoteService(entry.getName(), type, this.nodeAddress, loadTransport(g.toString(), server.getProtocol(), sameGroupAddrs));
|
||||||
}
|
}
|
||||||
ServiceWrapper wrapper = new ServiceWrapper(type, service, entry);
|
ServiceWrapper wrapper = new ServiceWrapper(type, service, entry);
|
||||||
if (factory.find(wrapper.getName(), wrapper.getType()) == null) {
|
if (factory.find(wrapper.getName(), wrapper.getType()) == null) {
|
||||||
|
|||||||
@@ -5,11 +5,9 @@
|
|||||||
*/
|
*/
|
||||||
package com.wentch.redkale.boot;
|
package com.wentch.redkale.boot;
|
||||||
|
|
||||||
import static com.wentch.redkale.boot.Application.*;
|
|
||||||
import com.wentch.redkale.net.sncp.*;
|
import com.wentch.redkale.net.sncp.*;
|
||||||
import com.wentch.redkale.util.AnyValue;
|
import com.wentch.redkale.util.AnyValue;
|
||||||
import com.wentch.redkale.service.Service;
|
import com.wentch.redkale.service.Service;
|
||||||
import java.io.*;
|
|
||||||
import java.net.*;
|
import java.net.*;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.logging.*;
|
import java.util.logging.*;
|
||||||
@@ -22,20 +20,10 @@ public final class NodeSncpServer extends NodeServer {
|
|||||||
|
|
||||||
private final SncpServer server;
|
private final SncpServer server;
|
||||||
|
|
||||||
private final File home;
|
public NodeSncpServer(Application application, CountDownLatch regcdl, SncpServer server) {
|
||||||
|
|
||||||
public NodeSncpServer(Application application, InetSocketAddress addr, CountDownLatch regcdl, SncpServer server) {
|
|
||||||
super(application, application.factory.createChild(), regcdl, server);
|
super(application, application.factory.createChild(), regcdl, server);
|
||||||
this.server = server;
|
this.server = server;
|
||||||
this.home = application.getHome();
|
|
||||||
this.servaddr = addr;
|
|
||||||
this.nodeGroup = application.addrGroups.getOrDefault(addr, "");
|
|
||||||
this.consumer = server == null ? null : x -> server.addService(x);
|
this.consumer = server == null ? null : x -> server.addService(x);
|
||||||
if (this.servaddr != null) {
|
|
||||||
this.factory.register(RESNAME_SNCP_ADDR, SocketAddress.class, this.servaddr);
|
|
||||||
this.factory.register(RESNAME_SNCP_ADDR, InetSocketAddress.class, this.servaddr);
|
|
||||||
this.factory.register(RESNAME_SNCP_ADDR, String.class, this.servaddr.getAddress().getHostAddress());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -47,7 +35,7 @@ public final class NodeSncpServer extends NodeServer {
|
|||||||
public void prepare(AnyValue config) throws Exception {
|
public void prepare(AnyValue config) throws Exception {
|
||||||
ClassFilter<Service> serviceFilter = createServiceClassFilter(config);
|
ClassFilter<Service> serviceFilter = createServiceClassFilter(config);
|
||||||
long s = System.currentTimeMillis();
|
long s = System.currentTimeMillis();
|
||||||
ClassFilter.Loader.load(home, serviceFilter);
|
ClassFilter.Loader.load(application.getHome(), serviceFilter);
|
||||||
long e = System.currentTimeMillis() - s;
|
long e = System.currentTimeMillis() - s;
|
||||||
logger.info(this.getClass().getSimpleName() + " load filter class in " + e + " ms");
|
logger.info(this.getClass().getSimpleName() + " load filter class in " + e + " ms");
|
||||||
loadService(serviceFilter); //必须在servlet之前
|
loadService(serviceFilter); //必须在servlet之前
|
||||||
|
|||||||
@@ -9,7 +9,6 @@ import com.wentch.redkale.convert.bson.*;
|
|||||||
import com.wentch.redkale.net.*;
|
import com.wentch.redkale.net.*;
|
||||||
import com.wentch.redkale.util.*;
|
import com.wentch.redkale.util.*;
|
||||||
import com.wentch.redkale.watch.*;
|
import com.wentch.redkale.watch.*;
|
||||||
import java.net.*;
|
|
||||||
import java.nio.*;
|
import java.nio.*;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.atomic.*;
|
import java.util.concurrent.atomic.*;
|
||||||
@@ -21,27 +20,17 @@ import java.util.concurrent.atomic.*;
|
|||||||
*/
|
*/
|
||||||
public final class SncpServer extends Server {
|
public final class SncpServer extends Server {
|
||||||
|
|
||||||
protected InetSocketAddress nodeAddress;
|
|
||||||
|
|
||||||
public SncpServer(String protocol) {
|
public SncpServer(String protocol) {
|
||||||
this(System.currentTimeMillis(), protocol, null, null);
|
this(System.currentTimeMillis(), protocol, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public SncpServer(long serverStartTime, String protocol, InetSocketAddress nodeAddress, final WatchFactory watch) {
|
public SncpServer(long serverStartTime, String protocol, final WatchFactory watch) {
|
||||||
super(serverStartTime, protocol, new SncpPrepareServlet(), watch);
|
super(serverStartTime, protocol, new SncpPrepareServlet(), watch);
|
||||||
this.nodeAddress = nodeAddress;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(AnyValue config) throws Exception {
|
public void init(AnyValue config) throws Exception {
|
||||||
super.init(config);
|
super.init(config);
|
||||||
if (this.nodeAddress == null) {
|
|
||||||
if ("0.0.0.0".equals(this.address.getHostString())) {
|
|
||||||
this.nodeAddress = new InetSocketAddress(Utility.localInetAddress().getHostAddress(), this.address.getPort());
|
|
||||||
} else {
|
|
||||||
this.nodeAddress = this.address;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addService(ServiceWrapper entry) {
|
public void addService(ServiceWrapper entry) {
|
||||||
@@ -52,16 +41,6 @@ public final class SncpServer extends Server {
|
|||||||
return ((SncpPrepareServlet) this.prepare).getSncpServlets();
|
return ((SncpPrepareServlet) this.prepare).getSncpServlets();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
*
|
|
||||||
* 对外的IP地址
|
|
||||||
*
|
|
||||||
@return
|
|
||||||
*/
|
|
||||||
public InetSocketAddress getNodeAddress() {
|
|
||||||
return nodeAddress;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
protected Context createContext() {
|
protected Context createContext() {
|
||||||
|
|||||||
@@ -232,6 +232,10 @@ public interface AnyValue {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static AnyValue create() {
|
||||||
|
return new DefaultAnyValue();
|
||||||
|
}
|
||||||
|
|
||||||
default String toString(int len) {
|
default String toString(int len) {
|
||||||
if (len < 0) len = 0;
|
if (len < 0) len = 0;
|
||||||
char[] chars = new char[len];
|
char[] chars = new char[len];
|
||||||
|
|||||||
Reference in New Issue
Block a user