This commit is contained in:
地平线
2015-08-14 16:37:39 +08:00
parent b150fbc9f7
commit 38fdee11ac
11 changed files with 309 additions and 452 deletions

View File

@@ -28,12 +28,6 @@
<!-- 所有服务所需的资源 -->
<resources>
<!-- 设置系统的 DataCacheListener 的Service实现值[none]表示不需要启动DataCacheListener同步 默认为系统自带实现类 -->
<datacachelistener service="none"/>
<!-- 设置系统的 WebSocketNode 的Service实现 值[none]表示不需要启动WebSocketNode同步默认为系统自带实现类 -->
<websocketnode service="xxxx"/>
<!--
一个组包含多个NODE 同一Service服务可以由多个进程提供这些进程称为一个GROUP且同一GROUP内的进程必须在同一机房或局域网内
name: 服务组ID长度不能超过11个字节. 默认为空字符串。

View File

@@ -39,31 +39,32 @@ import org.w3c.dom.*;
public final class Application {
//当前进程启动的时间, 类型: long
public static final String RESNAME_TIME = "APP_TIME";
public static final String RESNAME_APP_TIME = "APP_TIME";
//当前进程的根目录, 类型String
public static final String RESNAME_HOME = "APP_HOME";
//当前进程节点的name 类型String
public static final String RESNAME_NODE = "APP_NODE";
//当前进程节点的IP地址 类型InetAddress、String
public static final String RESNAME_ADDR = "APP_ADDR";
public static final String RESNAME_APP_HOME = "APP_HOME";
//application.xml 文件中resources节点的内容 类型: AnyValue
public static final String RESNAME_GRES = "APP_GRES";
public static final String RESNAME_APP_GRES = "APP_GRES";
//当前SNCP Server所属的组 类型: String
public static final String RESNAME_SNCP_GROUP = "SNCP_GROUP";
//当前进程节点的name 类型String
public static final String RESNAME_APP_NODE = "APP_NODE";
//当前Service所属的组 类型: Set<String>、String[]
public static final String RESNAME_SNCP_GROUPS = Sncp.RESNAME_SNCP_GROUPS; //SNCP_GROUPS
//当前SNCP Server的IP地址+端口 类型: SocketAddress、InetSocketAddress、String
public static final String RESNAME_SNCP_NODE = "SNCP_NODE";
//当前进程节点的IP地址 类型InetAddress、String
public static final String RESNAME_APP_ADDR = "APP_ADDR";
//当前SNCP Server的IP地址+端口集合 类型: Map<InetSocketAddress, String>、HashMap<InetSocketAddress, String>
public static final String RESNAME_SNCP_NODES = "SNCP_NODES";
public static final String RESNAME_APP_NODES = "APP_NODES";
//当前Service的IP地址+端口 类型: SocketAddress、InetSocketAddress、String
public static final String RESNAME_SERVER_ADDR = "SERVER_ADDR"; // SERVER_ADDR
//当前SNCP Server所属的组 类型: String
public static final String RESNAME_SERVER_GROUP = "SERVER_GROUP";
//当前Service所属的组 类型: Set<String>、String[]
public static final String RESNAME_SNCP_GROUPS = Sncp.RESNAME_SNCP_GROUPS; // SNCP_GROUPS
protected final ResourceFactory factory = ResourceFactory.root();
@@ -77,10 +78,6 @@ public final class Application {
protected final InetAddress localAddress;
protected Class<? extends DataCacheListener> dataCacheListenerClass = DataCacheListenerService.class;
protected Class<? extends WebSocketNode> webSocketNodeClass = WebSocketNodeService.class;
protected final List<DataSource> sources = new CopyOnWriteArrayList<>();
protected final List<NodeServer> servers = new CopyOnWriteArrayList<>();
@@ -99,20 +96,20 @@ public final class Application {
private Application(final AnyValue config) {
this.config = config;
final File root = new File(System.getProperty(RESNAME_HOME));
this.factory.register(RESNAME_TIME, long.class, this.startTime);
this.factory.register(RESNAME_HOME, Path.class, root.toPath());
this.factory.register(RESNAME_HOME, File.class, root);
final File root = new File(System.getProperty(RESNAME_APP_HOME));
this.factory.register(RESNAME_APP_TIME, long.class, this.startTime);
this.factory.register(RESNAME_APP_HOME, Path.class, root.toPath());
this.factory.register(RESNAME_APP_HOME, File.class, root);
try {
this.factory.register(RESNAME_HOME, root.getCanonicalPath());
this.factory.register(RESNAME_APP_HOME, root.getCanonicalPath());
this.home = root.getCanonicalFile();
} catch (IOException e) {
throw new RuntimeException(e);
}
String localaddr = config.getValue("address", "").trim();
this.localAddress = localaddr.isEmpty() ? Utility.localInetAddress() : new InetSocketAddress(localaddr, 0).getAddress();
Application.this.factory.register(RESNAME_ADDR, Application.this.localAddress.getHostAddress());
Application.this.factory.register(RESNAME_ADDR, InetAddress.class, Application.this.localAddress);
Application.this.factory.register(RESNAME_APP_ADDR, Application.this.localAddress.getHostAddress());
Application.this.factory.register(RESNAME_APP_ADDR, InetAddress.class, Application.this.localAddress);
{
StringBuilder sb = new StringBuilder();
byte[] bs = this.localAddress.getAddress();
@@ -123,8 +120,8 @@ public final class Application {
if (v2 <= 0xf) sb.append('0');
sb.append(Integer.toHexString(v2));
String node = sb.toString();
Application.this.factory.register(RESNAME_NODE, node);
System.setProperty(RESNAME_NODE, node);
Application.this.factory.register(RESNAME_APP_NODE, node);
System.setProperty(RESNAME_APP_NODE, node);
}
//以下是初始化日志配置
final File logconf = new File(root, "conf/logging.properties");
@@ -194,7 +191,7 @@ public final class Application {
File persist = new File(this.home, "conf/persistence.xml");
final String homepath = this.home.getCanonicalPath();
if (persist.isFile()) System.setProperty(DataDefaultSource.DATASOURCE_CONFPATH, persist.getCanonicalPath());
logger.log(Level.INFO, RESNAME_HOME + "=" + homepath + "\r\n" + RESNAME_ADDR + "=" + this.localAddress.getHostAddress());
logger.log(Level.INFO, RESNAME_APP_HOME + "=" + homepath + "\r\n" + RESNAME_APP_ADDR + "=" + this.localAddress.getHostAddress());
String lib = config.getValue("lib", "").trim().replace("${APP_HOME}", homepath);
lib = lib.isEmpty() ? (homepath + "/conf") : (lib + ";" + homepath + "/conf");
Server.loadLib(logger, lib);
@@ -207,7 +204,7 @@ public final class Application {
//------------------------------------------------------------------------
final AnyValue resources = config.getAnyValue("resources");
if (resources != null) {
factory.register(RESNAME_GRES, AnyValue.class, resources);
factory.register(RESNAME_APP_GRES, AnyValue.class, resources);
final AnyValue properties = resources.getAnyValue("properties");
if (properties != null) {
String dfloads = properties.getValue("load");
@@ -249,38 +246,6 @@ public final class Application {
final AnyValue resources = config.getAnyValue("resources");
if (resources != null) {
//------------------------------------------------------------------------
AnyValue datacachelistenerConf = resources.getAnyValue("datacachelistener");
if (datacachelistenerConf != null) {
String val = datacachelistenerConf.getValue("service", "");
if (!val.isEmpty()) {
if ("none".equalsIgnoreCase(val)) {
this.dataCacheListenerClass = null;
} else {
Class clazz = Class.forName(val);
if (!DataCacheListener.class.isAssignableFrom(clazz) || !Service.class.isAssignableFrom(clazz)) {
throw new RuntimeException("datacachelistener service (" + val + ") is illegal");
}
this.dataCacheListenerClass = clazz;
}
}
}
//------------------------------------------------------------------------
AnyValue websocketnodeConf = resources.getAnyValue("websocketnode");
if (websocketnodeConf != null) {
String val = websocketnodeConf.getValue("service", "");
if (!val.isEmpty()) {
if ("none".equalsIgnoreCase(val)) {
this.webSocketNodeClass = null;
} else {
Class clazz = Class.forName(val);
if (!WebSocketNode.class.isAssignableFrom(clazz) || !Service.class.isAssignableFrom(clazz)) {
throw new RuntimeException("websocketnode service (" + val + ") is illegal");
}
this.webSocketNodeClass = clazz;
}
}
}
//------------------------------------------------------------------------
for (AnyValue conf : resources.getAnyValues("group")) {
final String group = conf.getValue("name", "");
@@ -390,14 +355,14 @@ public final class Application {
}
if (!sncps.isEmpty() && globalNodes.isEmpty()) throw new RuntimeException("found SNCP Server node bug not found <group> node info.");
factory.register(RESNAME_SNCP_NODES, new TypeToken<Map<InetSocketAddress, String>>() {
factory.register(RESNAME_APP_NODES, new TypeToken<Map<InetSocketAddress, String>>() {
}.getType(), globalNodes);
factory.register(RESNAME_SNCP_NODES, new TypeToken<HashMap<InetSocketAddress, String>>() {
factory.register(RESNAME_APP_NODES, new TypeToken<HashMap<InetSocketAddress, String>>() {
}.getType(), globalNodes);
factory.register(RESNAME_SNCP_NODES, new TypeToken<Map<String, Set<InetSocketAddress>>>() {
factory.register(RESNAME_APP_NODES, new TypeToken<Map<String, Set<InetSocketAddress>>>() {
}.getType(), globalGroups);
factory.register(RESNAME_SNCP_NODES, new TypeToken<HashMap<String, Set<InetSocketAddress>>>() {
factory.register(RESNAME_APP_NODES, new TypeToken<HashMap<String, Set<InetSocketAddress>>>() {
}.getType(), globalGroups);
runServers(timecd, sncps); //必须确保sncp都启动后再启动其他协议
@@ -463,7 +428,8 @@ public final class Application {
public static <T extends Service> T singleton(Class<T> serviceClass, boolean remote) throws Exception {
final Application application = Application.create();
T service = remote ? Sncp.createRemoteService("", serviceClass, null, null) : Sncp.createLocalService("", serviceClass, null, new LinkedHashSet<>(), null, null);
T service = remote ? Sncp.createRemoteService("", serviceClass, null, new LinkedHashSet<>(), null)
: Sncp.createLocalService("", serviceClass, null, new LinkedHashSet<>(), null, null);
application.init();
application.factory.register(service);
new NodeSncpServer(application, new CountDownLatch(1), null).init(application.config);
@@ -472,8 +438,8 @@ public final class Application {
}
private static Application create() throws IOException {
final String home = new File(System.getProperty(RESNAME_HOME, "")).getCanonicalPath();
System.setProperty(RESNAME_HOME, home);
final String home = new File(System.getProperty(RESNAME_APP_HOME, "")).getCanonicalPath();
System.setProperty(RESNAME_APP_HOME, home);
File appfile = new File(home, "conf/application.xml");
//System.setProperty(DataConnection.PERSIST_FILEPATH, appfile.getCanonicalPath());
return new Application(load(new FileInputStream(appfile)));
@@ -537,7 +503,7 @@ public final class Application {
}
private static void load(final DefaultAnyValue any, final Node root) {
final String home = System.getProperty(RESNAME_HOME);
final String home = System.getProperty(RESNAME_APP_HOME);
NamedNodeMap nodes = root.getAttributes();
if (nodes == null) return;
for (int i = 0; i < nodes.getLength(); i++) {

View File

@@ -127,7 +127,7 @@ public final class ClassFilter<T> {
}
}
}
entrys.add(new FilterEntry(clazz, property));
entrys.add(new FilterEntry(clazz, autoscan, property));
} catch (Throwable cfe) {
}
}
@@ -260,11 +260,18 @@ public final class ClassFilter<T> {
private final AnyValue property;
private final boolean autoload;
public FilterEntry(Class<T> type, AnyValue property) {
this(type, false, property);
}
public FilterEntry(Class<T> type, final boolean autoload, AnyValue property) {
this.type = type;
String str = property == null ? null : property.getValue("groups");
if (str != null) groups.addAll(Arrays.asList(str.split(";")));
this.property = property;
this.autoload = autoload;
this.name = property == null ? "" : property.getValue("name", "");
}
@@ -302,6 +309,10 @@ public final class ClassFilter<T> {
return groups;
}
public boolean isAutoload() {
return autoload;
}
}
/**

View File

@@ -41,29 +41,21 @@ public final class NodeHttpServer extends NodeServer {
}
@Override
public void prepare(AnyValue config) throws Exception {
ClassFilter<HttpServlet> httpFilter = createClassFilter(null, config, WebServlet.class, HttpServlet.class, null, "servlets", "servlet");
ClassFilter<Service> serviceFilter = createServiceClassFilter(config);
public void prepare() throws Exception {
ClassFilter<HttpServlet> httpFilter = createClassFilter(null, WebServlet.class, HttpServlet.class, null, "servlets", "servlet");
ClassFilter<Service> serviceFilter = createServiceClassFilter();
long s = System.currentTimeMillis();
ClassFilter.Loader.load(application.getHome(), serviceFilter, httpFilter);
long e = System.currentTimeMillis() - s;
logger.info(this.getClass().getSimpleName() + " load filter class in " + e + " ms");
loadService(serviceFilter); //必须在servlet之前
initWebSocketService();
if (server != null) loadHttpServlet(config.getAnyValue("servlets"), httpFilter);
if (server != null) loadHttpServlet(this.nodeConf.getAnyValue("servlets"), httpFilter);
}
private void initWebSocketService() {
NodeSncpServer sncpServer0 = null;
for (NodeServer ns : application.servers) {
if (!ns.isSNCP()) continue;
if (sncpServer0 == null) sncpServer0 = (NodeSncpServer) ns;
if (ns.getSncpGroup().equals(getSncpGroup())) {
sncpServer0 = (NodeSncpServer) ns;
break;
}
}
final NodeSncpServer sncpServer = sncpServer0;
final boolean fine = logger.isLoggable(Level.FINE);
final ResourceFactory regFactory = application.factory;
factory.add(WebSocketNode.class, (ResourceFactory rf, final Object src, Field field) -> {
try {
@@ -75,19 +67,19 @@ public final class NodeHttpServer extends NodeServer {
synchronized (regFactory) {
Service nodeService = (Service) rf.find(rcname, WebSocketNode.class);
if (nodeService == null) {
Class<? extends Service> sc = (Class<? extends Service>) application.webSocketNodeClass;
nodeService = Sncp.createLocalService(rcname, (Class<? extends Service>) (sc == null ? WebSocketNodeService.class : sc),
getSncpAddress(), new LinkedHashSet<>(), (sc == null ? null : sncpSameGroupTransports), (sc == null ? null : sncpDiffGroupTransports));
nodeService = Sncp.createLocalService(rcname, (Class<? extends Service>) WebSocketNodeService.class,
getSncpAddress(), sncpDefaultGroups, sncpSameGroupTransports, sncpDiffGroupTransports);
regFactory.register(rcname, WebSocketNode.class, nodeService);
WebSocketNode wsn = (WebSocketNode) nodeService;
wsn.setLocalSncpAddress(getSncpAddress());
final Set<InetSocketAddress> alladdrs = new HashSet<>();
application.globalNodes.forEach((k, v) -> alladdrs.add(k));
alladdrs.remove(getSncpAddress());
final Class<? extends Service> serviceType = (sc == null ? WebSocketNodeService.class : sc);
factory.inject(nodeService);
if (sncpServer != null) {
ServiceWrapper wrapper = new ServiceWrapper(serviceType, nodeService, rcname, getSncpGroup(), new LinkedHashSet<>(), null);
logger.fine("[" + Thread.currentThread().getName() + "] Load " + nodeService);
if (getSncpAddress() != null) {
NodeSncpServer sncpServer = null;
for (NodeServer node : application.servers) {
if (node.isSNCP() && getSncpAddress().equals(node.getSncpAddress())) {
sncpServer = (NodeSncpServer) node;
}
}
ServiceWrapper wrapper = new ServiceWrapper(WebSocketNodeService.class, nodeService, rcname, getSncpGroup(), sncpDefaultGroups, null);
sncpServer.getSncpServer().addService(wrapper);
}
}

View File

@@ -6,15 +6,15 @@
package com.wentch.redkale.boot;
import static com.wentch.redkale.boot.Application.*;
import com.wentch.redkale.boot.ClassFilter.FilterEntry;
import com.wentch.redkale.net.sncp.ServiceWrapper;
import com.wentch.redkale.net.Server;
import com.wentch.redkale.net.sncp.Sncp;
import com.wentch.redkale.service.Service;
import com.wentch.redkale.util.AnyValue;
import com.wentch.redkale.util.Ignore;
import com.wentch.redkale.boot.ClassFilter.FilterEntry;
import com.wentch.redkale.net.*;
import com.wentch.redkale.net.http.*;
import com.wentch.redkale.service.*;
import com.wentch.redkale.source.*;
import com.wentch.redkale.util.*;
import com.wentch.redkale.util.AnyValue.DefaultAnyValue;
@@ -38,6 +38,8 @@ public abstract class NodeServer {
protected final Logger logger;
protected final boolean fine;
protected final Application application;
protected final ResourceFactory factory;
@@ -46,23 +48,25 @@ public abstract class NodeServer {
private final Server server;
private InetSocketAddress sncpAddress; //HttpServer中的sncpAddress 为所属group对应的SncpServer, 为null表示只是单节点没有分布式结构
private String sncpGroup = null; //当前Server的SNCP协议的组
private AnyValue nodeConf;
private String nodeProtocol = Sncp.DEFAULT_PROTOCOL;
private InetSocketAddress sncpAddress; //HttpServer中的sncpAddress 为所属group对应的SncpServer, 为null表示只是单节点没有分布式结构
protected Consumer<ServiceWrapper> consumer;
protected AnyValue nodeConf;
protected final HashSet<String> sncpDefaultGroups = new LinkedHashSet<>();
protected final List<Transport> sncpSameGroupTransports = new ArrayList<>();
protected final List<Transport> sncpDiffGroupTransports = new ArrayList<>();
protected final Set<ServiceWrapper> localServices = new LinkedHashSet<>();
protected final Set<ServiceWrapper> localServiceWrappers = new LinkedHashSet<>();
protected final Set<ServiceWrapper> remoteServices = new LinkedHashSet<>();
protected final Set<ServiceWrapper> remoteServiceWrappers = new LinkedHashSet<>();
public NodeServer(Application application, ResourceFactory factory, CountDownLatch servicecdl, Server server) {
this.application = application;
@@ -70,9 +74,10 @@ public abstract class NodeServer {
this.servicecdl = servicecdl;
this.server = server;
this.logger = Logger.getLogger(this.getClass().getSimpleName());
this.fine = logger.isLoggable(Level.FINE);
}
protected abstract void prepare(final AnyValue config) throws Exception;
protected abstract void prepare() throws Exception;
public void init(AnyValue config) throws Exception {
this.nodeConf = config == null ? AnyValue.create() : config;
@@ -83,35 +88,27 @@ public abstract class NodeServer {
if (this.sncpGroup == null) throw new RuntimeException("Server (" + String.valueOf(config).replaceAll("\\s+", " ") + ") not found <group> info");
if (server != null) this.nodeProtocol = server.getProtocol();
}
if (this.sncpAddress != null) { // 无分布式结构下 HTTP协议的sncpAddress 为 null
this.factory.register(RESNAME_SNCP_NODE, SocketAddress.class, this.sncpAddress);
this.factory.register(RESNAME_SNCP_NODE, InetSocketAddress.class, this.sncpAddress);
this.factory.register(RESNAME_SNCP_NODE, String.class, this.sncpAddress.getAddress().getHostAddress());
this.factory.register(RESNAME_SNCP_GROUP, this.sncpGroup);
}
initGroup();
if (this.sncpAddress != null) this.factory.register(RESNAME_SERVER_ADDR, this.sncpAddress);
if (this.sncpGroup != null) this.factory.register(RESNAME_SERVER_GROUP, this.sncpGroup);
{
//设置root文件夹
String webroot = config.getValue("root", "root");
File myroot = new File(webroot);
if (!webroot.contains(":") && !webroot.startsWith("/")) {
myroot = new File(System.getProperty(Application.RESNAME_HOME), webroot);
myroot = new File(System.getProperty(Application.RESNAME_APP_HOME), webroot);
}
final String homepath = myroot.getCanonicalPath();
Server.loadLib(logger, config.getValue("lib", "") + ";" + homepath + "/lib/*;" + homepath + "/classes");
if (server != null) server.init(config);
}
initResource();
prepare(config);
prepare();
}
private void initResource() {
final List<Transport>[] transportses = parseTransport(this.nodeConf.getValue("group", "").split(";"));
this.sncpSameGroupTransports.addAll(transportses[0]);
this.sncpDiffGroupTransports.addAll(transportses[1]);
//---------------------------------------------------------------------------------------------
final ResourceFactory regFactory = application.factory;
final HashSet<String> defGroups = new LinkedHashSet<>();
factory.add(DataSource.class, (ResourceFactory rf, final Object src, Field field) -> {
try {
Resource rs = field.getAnnotation(Resource.class);
@@ -120,12 +117,11 @@ public abstract class NodeServer {
DataSource source = DataSourceFactory.create(rs.name());
application.sources.add(source);
regFactory.register(rs.name(), DataSource.class, source);
Class<? extends Service> sc = (Class<? extends Service>) application.dataCacheListenerClass;
if (sc != null) {
Service cacheListenerService = Sncp.createLocalService(rs.name(), sc, this.sncpAddress, defGroups, sncpSameGroupTransports, sncpDiffGroupTransports);
if (factory.find(rs.name(), DataCacheListener.class) == null) {
Service cacheListenerService = Sncp.createLocalService(rs.name(), DataCacheListenerService.class, this.sncpAddress, sncpDefaultGroups, sncpSameGroupTransports, sncpDiffGroupTransports);
regFactory.register(rs.name(), DataCacheListener.class, cacheListenerService);
ServiceWrapper wrapper = new ServiceWrapper(sc, cacheListenerService, rs.name(), sncpGroup, defGroups, null);
localServices.add(wrapper);
ServiceWrapper wrapper = new ServiceWrapper(DataCacheListenerService.class, cacheListenerService, rs.name(), sncpGroup, sncpDefaultGroups, null);
localServiceWrappers.add(wrapper);
if (consumer != null) consumer.accept(wrapper);
rf.inject(cacheListenerService);
}
@@ -137,7 +133,35 @@ public abstract class NodeServer {
});
}
protected List<Transport>[] parseTransport(final String[] groups) {
private void initGroup() {
final AnyValue[] services = this.nodeConf.getAnyValues("services");
final String[] groups = services.length < 1 ? new String[]{""} : services[0].getValue("groups", "").split(";");
this.sncpDefaultGroups.addAll(Arrays.asList(groups));
if (!isSNCP()) {
NodeSncpServer sncpServer = null;
for (NodeServer node : application.servers) {
if (!node.isSNCP()) continue;
if (!this.sncpDefaultGroups.contains(node.sncpGroup)) continue;
sncpServer = (NodeSncpServer) node;
break;
}
if (sncpServer == null && (groups.length == 1 && groups[0].isEmpty())) {
for (NodeServer node : application.servers) {
if (!node.isSNCP()) continue;
sncpServer = (NodeSncpServer) node;
break;
}
}
if (sncpServer != null) {
this.sncpAddress = sncpServer.getSncpAddress();
this.sncpGroup = sncpServer.getSncpGroup();
this.sncpDefaultGroups.clear();
this.sncpDefaultGroups.addAll(sncpServer.sncpDefaultGroups);
this.sncpSameGroupTransports.addAll(sncpServer.sncpSameGroupTransports);
this.sncpDiffGroupTransports.addAll(sncpServer.sncpDiffGroupTransports);
return;
}
}
final Set<InetSocketAddress> sameGroupAddrs = application.findGlobalGroup(this.sncpGroup);
final Map<String, Set<InetSocketAddress>> diffGroupAddrs = new HashMap<>();
for (String groupitem : groups) {
@@ -145,16 +169,13 @@ public abstract class NodeServer {
if (addrs == null || groupitem.equals(this.sncpGroup)) continue;
diffGroupAddrs.put(groupitem, addrs);
}
final List<Transport> sameGroupTransports0 = new ArrayList<>();
if (sameGroupAddrs != null) {
sameGroupAddrs.remove(this.sncpAddress);
for (InetSocketAddress iaddr : sameGroupAddrs) {
sameGroupTransports0.add(loadTransport(this.sncpGroup, getNodeProtocol(), iaddr));
sncpSameGroupTransports.add(loadTransport(this.sncpGroup, getNodeProtocol(), iaddr));
}
}
final List<Transport> diffGroupTransports0 = new ArrayList<>();
diffGroupAddrs.forEach((k, v) -> diffGroupTransports0.add(loadTransport(k, getNodeProtocol(), v)));
return new List[]{sameGroupTransports0, diffGroupTransports0};
diffGroupAddrs.forEach((k, v) -> sncpDiffGroupTransports.add(loadTransport(k, getNodeProtocol(), v)));
}
@SuppressWarnings("unchecked")
@@ -198,6 +219,7 @@ public abstract class NodeServer {
}
Service service = Sncp.createLocalService(entry.getName(), type, this.sncpAddress, groups, sameGroupTransports, diffGroupTransports);
wrapper = new ServiceWrapper(type, service, this.sncpGroup, entry);
if (fine) logger.fine("[" + Thread.currentThread().getName() + "] Load " + service);
} else {
sameGroupAddrs.remove(this.sncpAddress);
StringBuilder g = new StringBuilder();
@@ -207,8 +229,9 @@ public abstract class NodeServer {
sameGroupAddrs.addAll(v);
});
if (sameGroupAddrs.isEmpty()) throw new RuntimeException(type.getName() + " has no remote address on group (" + groups + ")");
Service service = Sncp.createRemoteService(entry.getName(), type, this.sncpAddress, loadTransport(g.toString(), server.getProtocol(), sameGroupAddrs));
Service service = Sncp.createRemoteService(entry.getName(), type, this.sncpAddress, groups, loadTransport(g.toString(), server.getProtocol(), sameGroupAddrs));
wrapper = new ServiceWrapper(type, service, "", entry);
if (fine) logger.fine("[" + Thread.currentThread().getName() + "] Load " + service);
}
if (factory.find(wrapper.getName(), wrapper.getType()) == null) {
regFactory.register(wrapper.getName(), wrapper.getType(), wrapper.getService());
@@ -222,9 +245,9 @@ public abstract class NodeServer {
regFactory.register(wrapper.getName(), WebSocketNode.class, wrapper.getService());
}
if (wrapper.isRemote()) {
remoteServices.add(wrapper);
remoteServiceWrappers.add(wrapper);
} else {
localServices.add(wrapper);
localServiceWrappers.add(wrapper);
if (consumer != null) consumer.accept(wrapper);
}
} else if (isSNCP()) {
@@ -236,14 +259,14 @@ public abstract class NodeServer {
final StringBuilder sb = logger.isLoggable(Level.INFO) ? new StringBuilder() : null;
//---------------- inject ----------------
new HashSet<>(localServices).forEach(y -> {
new HashSet<>(localServiceWrappers).forEach(y -> {
factory.inject(y.getService());
});
remoteServices.forEach(y -> {
remoteServiceWrappers.forEach(y -> {
factory.inject(y.getService());
});
//----------------- init -----------------
localServices.parallelStream().forEach(y -> {
localServiceWrappers.parallelStream().forEach(y -> {
long s = System.currentTimeMillis();
y.getService().init(y.getConf());
long e = System.currentTimeMillis() - s;
@@ -280,16 +303,16 @@ public abstract class NodeServer {
return transport;
}
protected ClassFilter<Service> createServiceClassFilter(final AnyValue config) {
return createClassFilter(this.sncpGroup, config, null, Service.class, Annotation.class, "services", "service");
protected ClassFilter<Service> createServiceClassFilter() {
return createClassFilter(this.sncpGroup, null, Service.class, Annotation.class, "services", "service");
}
protected static ClassFilter createClassFilter(final String localGroup, final AnyValue config, Class<? extends Annotation> ref,
protected ClassFilter createClassFilter(final String localGroup, Class<? extends Annotation> ref,
Class inter, Class<? extends Annotation> ref2, String properties, String property) {
ClassFilter cf = new ClassFilter(ref, inter, null);
if (properties == null && properties == null) return cf;
if (config == null) return cf;
AnyValue[] proplist = config.getAnyValues(properties);
if (this.nodeConf == null) return cf;
AnyValue[] proplist = this.nodeConf.getAnyValues(properties);
if (proplist == null || proplist.length < 1) return cf;
cf = null;
for (AnyValue list : proplist) {
@@ -343,7 +366,7 @@ public abstract class NodeServer {
public void shutdown() throws IOException {
final StringBuilder sb = logger.isLoggable(Level.INFO) ? new StringBuilder() : null;
localServices.forEach(y -> {
localServiceWrappers.forEach(y -> {
long s = System.currentTimeMillis();
y.getService().destroy(y.getConf());
long e = System.currentTimeMillis() - s;

View File

@@ -6,7 +6,6 @@
package com.wentch.redkale.boot;
import com.wentch.redkale.net.sncp.*;
import com.wentch.redkale.util.AnyValue;
import com.wentch.redkale.service.Service;
import java.net.*;
import java.util.concurrent.CountDownLatch;
@@ -32,8 +31,8 @@ public final class NodeSncpServer extends NodeServer {
}
@Override
public void prepare(AnyValue config) throws Exception {
ClassFilter<Service> serviceFilter = createServiceClassFilter(config);
public void prepare() throws Exception {
ClassFilter<Service> serviceFilter = createServiceClassFilter();
long s = System.currentTimeMillis();
ClassFilter.Loader.load(application.getHome(), serviceFilter);
long e = System.currentTimeMillis() - s;

View File

@@ -95,6 +95,10 @@ public final class Transport {
return true;
}
public InetSocketAddress[] getRemoteAddress() {
return remoteAddres;
}
@Override
public String toString() {
return Transport.class.getSimpleName() + "{name=" + name + ",protocol=" + protocol + ",remoteAddres=" + Arrays.toString(remoteAddres) + "}";

View File

@@ -12,6 +12,7 @@ import java.net.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.logging.*;
import javax.annotation.*;
/**
*
@@ -31,6 +32,7 @@ public abstract class WebSocketNode {
protected final boolean finest = logger.isLoggable(Level.FINEST);
@Resource(name = "SERVER_ADDR")
protected InetSocketAddress localSncpAddress; //为SncpServer的服务address
@SncpRemote
@@ -106,10 +108,6 @@ public abstract class WebSocketNode {
}
}
public final void setLocalSncpAddress(InetSocketAddress localSncpAddress) {
this.localSncpAddress = localSncpAddress;
}
public final void putWebSocketEngine(WebSocketEngine engine) {
engines.put(engine.getEngineid(), engine);
}

View File

@@ -25,6 +25,9 @@ import jdk.internal.org.objectweb.asm.Type;
*/
public abstract class Sncp {
//当前SNCP Server的IP地址+端口 类型: SocketAddress、InetSocketAddress、String
public static final String RESNAME_SNCP_ADDR = "SNCP_ADDR";
//当前Service所属的组 类型: Set<String>、String[]
public static final String RESNAME_SNCP_GROUPS = "SNCP_GROUPS";
@@ -160,12 +163,19 @@ public abstract class Sncp {
*
* private SncpClient _client;
*
* private String _selfstring;
*
* @Override
* public final String name() {
* return "";
* }
*
* @Override
* public String toString() {
* return _selfstring == null ? super.toString() : _selfstring;
* }
*
* @Override
* public String updateSomeThing(String id){
* return _updateSomeThing(true, true, id);
* }
@@ -255,6 +265,10 @@ public abstract class Sncp {
fv.visitEnd();
}
}
{
fv = cw.visitField(ACC_PRIVATE, "_selfstring", "Ljava/lang/String;", null, null);
fv.visitEnd();
}
{ //构造函数
mv = new DebugMethodVisitor(cw.visitMethod(ACC_PUBLIC, "<init>", "()V", null, null));
//mv.setDebug(true);
@@ -271,6 +285,24 @@ public abstract class Sncp {
mv.visitMaxs(1, 1);
mv.visitEnd();
}
{ // toString()
mv = new DebugMethodVisitor(cw.visitMethod(ACC_PUBLIC, "toString", "()Ljava/lang/String;", null, null));
mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynName, "_selfstring", "Ljava/lang/String;");
Label l1 = new Label();
mv.visitJumpInsn(IFNONNULL, l1);
mv.visitVarInsn(ALOAD, 0);
mv.visitMethodInsn(INVOKESPECIAL, "java/lang/Object", "toString", "()Ljava/lang/String;", false);
Label l2 = new Label();
mv.visitJumpInsn(GOTO, l2);
mv.visitLabel(l1);
mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynName, "_selfstring", "Ljava/lang/String;");
mv.visitLabel(l2);
mv.visitInsn(ARETURN);
mv.visitMaxs(1, 1);
mv.visitEnd();
}
int i = - 1;
for (final Method method : methods) {
final MultiRun mrun = method.getAnnotation(MultiRun.class);
@@ -592,33 +624,69 @@ public abstract class Sncp {
if (!list.isEmpty()) remoteTransport = new Transport(list.get(0), clientAddress, list);
}
if (field.getType().isAssignableFrom(newClazz) && remoteTransport != null) {
field.set(rs, createRemoteService(name, serviceClass, clientAddress, remoteTransport));
field.set(rs, createRemoteService(name, serviceClass, clientAddress, groups, remoteTransport));
}
continue;
}
Resource res = field.getAnnotation(Resource.class);
if (res == null || !res.name().equals(RESNAME_SNCP_GROUPS)) continue;
if (res == null) continue;
field.setAccessible(true);
if (groups == null) groups = new LinkedHashSet<>();
if (groupArray == null) groupArray = groups.toArray(new String[groups.size()]);
if (field.getGenericType().equals(GROUPS_TYPE1)) {
field.set(rs, groups);
} else if (field.getGenericType().equals(GROUPS_TYPE2)) {
field.set(rs, groupArray);
if (res.name().equals(RESNAME_SNCP_GROUPS)) {
if (groups == null) groups = new LinkedHashSet<>();
if (groupArray == null) groupArray = groups.toArray(new String[groups.size()]);
if (field.getGenericType().equals(GROUPS_TYPE1)) {
field.set(rs, groups);
} else if (field.getGenericType().equals(GROUPS_TYPE2)) {
field.set(rs, groupArray);
}
} else if (res.name().endsWith(RESNAME_SNCP_ADDR)) {
if (field.getType() == String.class) {
field.set(rs, clientAddress == null ? null : (clientAddress.getHostString() + ":" + clientAddress.getPort()));
} else {
field.set(rs, clientAddress);
}
}
}
} while ((loop = loop.getSuperclass()) != Object.class);
}
SncpClient client = null;
{
Field e;
try {
e = newClazz.getDeclaredField("_client");
Field e = newClazz.getDeclaredField("_client");
e.setAccessible(true);
client = new SncpClient(name, hash(serviceClass), false, newClazz, true, clientAddress, groups);
e.set(rs, client);
} catch (NoSuchFieldException ne) {
return rs;
}
e.setAccessible(true);
e.set(rs, new SncpClient(name, hash(serviceClass), false, newClazz, true, clientAddress));
}
{
StringBuilder sb = new StringBuilder();
sb.append(newClazz.getName()).append("{name = ").append(name);
if (client != null) {
sb.append(", nameid = ").append(client.getNameid()).append(", serviceid = ").append(client.getServiceid());
sb.append(", action.size = ").append(client.getActionCount());
sb.append(", address = ").append(clientAddress).append(", groups = ").append(groups);
List<InetSocketAddress> addrs = new ArrayList<>();
for (Transport t : sameGroupTransports) {
addrs.addAll(Arrays.asList(t.getRemoteAddress()));
}
sb.append(", samegroups = ").append(addrs);
addrs.clear();
for (Transport t : diffGroupTransports) {
addrs.addAll(Arrays.asList(t.getRemoteAddress()));
}
sb.append(", diffgroups = ").append(addrs);
} else {
sb.append(", ").append(MultiRun.class.getSimpleName().toLowerCase()).append(" = false");
}
sb.append("}");
Field s = newClazz.getDeclaredField("_selfstring");
s.setAccessible(true);
s.set(rs, sb.toString());
}
if (client == null) return rs;
{
Field c = newClazz.getDeclaredField("_sameGroupTransports");
c.setAccessible(true);
@@ -648,12 +716,19 @@ public abstract class Sncp {
*
* private SncpClient _client;
*
* private String _selfstring;
*
* @Override
* public final String name() {
* return "";
* }
*
* @Override
* public String toString() {
* return _selfstring == null ? super.toString() : _selfstring;
* }
*
* @Override
* public boolean testChange(TestBean bean) {
* return _client.remote(_convert, _transport, 0, bean);
* }
@@ -674,12 +749,13 @@ public abstract class Sncp {
* @param <T>
* @param name
* @param serviceClass
* @param clientAddress
* @param clientAddress
* @param groups
* @param transport
* @return
*/
@SuppressWarnings("unchecked")
public static <T extends Service> T createRemoteService(final String name, final Class<T> serviceClass, final InetSocketAddress clientAddress, final Transport transport) {
public static <T extends Service> T createRemoteService(final String name, final Class<T> serviceClass, final InetSocketAddress clientAddress, HashSet<String> groups, final Transport transport) {
if (serviceClass == null) return null;
if (!Service.class.isAssignableFrom(serviceClass)) return null;
int mod = serviceClass.getModifiers();
@@ -694,7 +770,7 @@ public abstract class Sncp {
final String anyValueDesc = Type.getDescriptor(AnyValue.class);
ClassLoader loader = Sncp.class.getClassLoader();
String newDynName = supDynName.substring(0, supDynName.lastIndexOf('/') + 1) + REMOTEPREFIX + serviceClass.getSimpleName();
final SncpClient client = new SncpClient(name, hash(serviceClass), true, createLocalServiceClass(name, serviceClass), false, clientAddress);
final SncpClient client = new SncpClient(name, hash(serviceClass), true, createLocalServiceClass(name, serviceClass), false, clientAddress, groups);
try {
Class newClazz = Class.forName(newDynName.replace('/', '.'));
T rs = (T) newClazz.newInstance();
@@ -704,11 +780,23 @@ public abstract class Sncp {
Field t = newClazz.getDeclaredField("_transport");
t.setAccessible(true);
t.set(rs, transport);
{
StringBuilder sb = new StringBuilder();
sb.append(newClazz.getName()).append("{name = ").append(name);
sb.append(", nameid = ").append(client.getNameid()).append(", serviceid = ").append(client.getServiceid());
sb.append(", action.size = ").append(client.getActionCount());
sb.append(", address = ").append(clientAddress).append(", groups = ").append(groups);
sb.append(", remotes = ").append(transport == null ? null : Arrays.asList(transport.getRemoteAddress()));
sb.append("}");
Field s = newClazz.getDeclaredField("_selfstring");
s.setAccessible(true);
s.set(rs, sb.toString());
}
return rs;
} catch (Exception ex) {
}
//------------------------------------------------------------------------------
ClassWriter cw = new ClassWriter(0);
ClassWriter cw = new ClassWriter(COMPUTE_FRAMES);
FieldVisitor fv;
DebugMethodVisitor mv;
AnnotationVisitor av0;
@@ -732,6 +820,10 @@ public abstract class Sncp {
fv = cw.visitField(ACC_PRIVATE, "_client", clientDesc, null, null);
fv.visitEnd();
}
{
fv = cw.visitField(ACC_PRIVATE, "_selfstring", "Ljava/lang/String;", null, null);
fv.visitEnd();
}
{ //构造函数
mv = new DebugMethodVisitor(cw.visitMethod(ACC_PUBLIC, "<init>", "()V", null, null));
//mv.setDebug(true);
@@ -760,6 +852,24 @@ public abstract class Sncp {
mv.visitMaxs(1, 1);
mv.visitEnd();
}
{ // toString()
mv = new DebugMethodVisitor(cw.visitMethod(ACC_PUBLIC, "toString", "()Ljava/lang/String;", null, null));
mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynName, "_selfstring", "Ljava/lang/String;");
Label l1 = new Label();
mv.visitJumpInsn(IFNONNULL, l1);
mv.visitVarInsn(ALOAD, 0);
mv.visitMethodInsn(INVOKESPECIAL, "java/lang/Object", "toString", "()Ljava/lang/String;", false);
Label l2 = new Label();
mv.visitJumpInsn(GOTO, l2);
mv.visitLabel(l1);
mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynName, "_selfstring", "Ljava/lang/String;");
mv.visitLabel(l2);
mv.visitInsn(ARETURN);
mv.visitMaxs(1, 1);
mv.visitEnd();
}
int i = -1;
for (final SncpAction entry : client.actions) {
final int index = ++i;
@@ -866,6 +976,18 @@ public abstract class Sncp {
Field t = newClazz.getDeclaredField("_transport");
t.setAccessible(true);
t.set(rs, transport);
{
StringBuilder sb = new StringBuilder();
sb.append(newClazz.getName()).append("{name = ").append(name);
sb.append(", nameid = ").append(client.getNameid()).append(", serviceid = ").append(client.getServiceid());
sb.append(", action.size = ").append(client.getActionCount());
sb.append(", address = ").append(clientAddress).append(", groups = ").append(groups);
sb.append(", remotes = ").append(transport == null ? null : Arrays.asList(transport.getRemoteAddress()));
sb.append("}");
Field s = newClazz.getDeclaredField("_selfstring");
s.setAccessible(true);
s.set(rs, sb.toString());
}
return rs;
} catch (Exception ex) {
throw new RuntimeException(ex);

View File

@@ -23,10 +23,6 @@ import java.util.logging.*;
*/
public final class SncpClient {
private final Logger logger = Logger.getLogger(SncpClient.class.getSimpleName());
private final boolean debug = logger.isLoggable(Level.FINEST);
protected static final class SncpAction {
protected final DLong actionid;
@@ -73,12 +69,20 @@ public final class SncpClient {
}
}
private final Logger logger = Logger.getLogger(SncpClient.class.getSimpleName());
private final boolean debug = logger.isLoggable(Level.FINEST);
protected final String name;
protected final boolean remote;
private final Class serviceClass;
protected final InetSocketAddress address;
protected final HashSet<String> groups;
private final byte[] addrBytes;
private final int addrPort;
@@ -91,10 +95,13 @@ public final class SncpClient {
protected final BlockingQueue<Runnable> queue = new ArrayBlockingQueue(1024 * 64);
public SncpClient(final String serviceName, final long serviceid0, boolean remote, final Class serviceClass, boolean onlySncpDyn, final InetSocketAddress clientAddress) {
public SncpClient(final String serviceName, final long serviceid0, boolean remote, final Class serviceClass,
boolean onlySncpDyn, final InetSocketAddress clientAddress, final HashSet<String> groups) {
if (serviceName.length() > 10) throw new RuntimeException(serviceClass + " @Resource name(" + serviceName + ") too long , must less 11");
this.remote = remote;
this.serviceClass = serviceClass;
this.address = clientAddress;
this.groups = groups;
//if (subLocalClass != null && !serviceClass.isAssignableFrom(subLocalClass)) throw new RuntimeException(subLocalClass + " is not " + serviceClass + " sub class ");
this.name = serviceName;
this.nameid = Sncp.hash(serviceName);
@@ -108,7 +115,6 @@ public final class SncpClient {
this.actions = methodens.toArray(new SncpAction[methodens.size()]);
this.addrBytes = clientAddress == null ? new byte[4] : clientAddress.getAddress().getAddress();
this.addrPort = clientAddress == null ? 0 : clientAddress.getPort();
logger.fine("[" + Thread.currentThread().getName() + "] Load " + this);
new Thread() {
{
setName(SncpClient.class.getSimpleName() + serviceClass.getSimpleName() + "-" + serviceName + "-Thread");
@@ -129,11 +135,25 @@ public final class SncpClient {
}.start();
}
public long getNameid() {
return nameid;
}
public long getServiceid() {
return serviceid;
}
public int getActionCount() {
return actions.length;
}
@Override
public String toString() {
String service = serviceClass.getName();
if (remote) service = service.replace(Sncp.LOCALPREFIX, Sncp.REMOTEPREFIX);
return this.getClass().getSimpleName() + "(service = " + service + ", serviceid = " + serviceid + ", name = " + name + ", nameid = " + nameid + ", actions.size = " + actions.length + ")";
return this.getClass().getSimpleName() + "(service = " + service + ", serviceid = " + serviceid
+ ", name = " + name + ", nameid = " + nameid + ", address = " + (address == null ? "" : (address.getHostString() + ":" + address.getPort()))
+ ", groups = " + groups + ", actions.size = " + actions.length + ")";
}
public static List<Method> parseMethod(final Class serviceClass, boolean onlySncpDyn) {

View File

@@ -1,272 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package com.wentch.redkale.service;
import com.wentch.redkale.net.http.*;
import com.wentch.redkale.util.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.logging.*;
import javax.annotation.*;
/**
*
* @author zhangjx
*/
@AutoLoad(false)
public class WebSocketNodeService2 implements Service {
public static final int RETCODE_ENGINE_NULL = 5001;
public static final int RETCODE_NODESERVICE_NULL = 5002;
public static final int RETCODE_GROUP_EMPTY = 5005;
public static final int RETCODE_WSOFFLINE = 5011;
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
protected final boolean finest = logger.isLoggable(Level.FINEST);
@Resource(name = "APP_NODE")
protected String localNodeName = "";
@Resource
protected HashMap<String, WebSocketNodeService2> nodemaps;
//用户分布在节点上的队列信息,只保存远程节点的用户分布信息
protected final ConcurrentHashMap<Serializable, Set<String>> usernodes = new ConcurrentHashMap();
protected final ConcurrentHashMap<String, WebSocketEngine> engines = new ConcurrentHashMap();
public void initUserNodes() {
if (this.nodemaps == null || this.nodemaps.isEmpty()) return;
new Thread() {
{
setDaemon(true);
}
@Override
public void run() {
usernodes.putAll(queryNodes());
}
}.start();
}
public final void addWebSocketEngine(WebSocketEngine engine) {
engines.put(engine.getEngineid(), engine);
}
////@RemoteOn
public Map<Serializable, Set<String>> queryNodes() {
Map<Serializable, Set<String>> rs = new HashMap<>();
this.nodemaps.forEach((x, y) -> {
if (!rs.isEmpty()) return;
try {
rs.putAll(y.queryNodes());
} catch (Exception e) {
logger.log(Level.WARNING, this.getClass().getSimpleName() + " query error (" + x + ")", e);
}
});
return rs;
}
public final Map<Serializable, Set<String>> onQueryNodes() {
Map<Serializable, Set<String>> rs = new HashMap<>();
rs.putAll(this.usernodes);
return rs;
}
public void connectSelf(Serializable userid) {
connect(this.localNodeName, userid);
}
public void disconnectSelf(Serializable userid) {
disconnect(this.localNodeName, userid);
}
////@RemoteOn
public void connect(String nodeid, Serializable userid) {
onConnect(nodeid, userid);
if (this.nodemaps == null) return;
this.nodemaps.forEach((x, y) -> {
try {
if (finest) logger.finest("Node(" + localNodeName + "->" + x + ") send websocket connect event (" + userid + " on " + nodeid + ")");
y.connect(nodeid, userid);
} catch (Exception e) {
logger.log(Level.WARNING, "Node(" + localNodeName + "->" + x + ") send websocket connect event (" + userid + " on " + nodeid + ")", e);
}
});
}
public final void onConnect(String nodeid, Serializable userid) {
if (finest) logger.finest("Node (" + localNodeName + ") receive websocket connect event (" + userid + " on " + nodeid + ").");
Set<String> userNodelist = usernodes.get(userid);
if (userNodelist == null) {
userNodelist = new CopyOnWriteArraySet<>();
usernodes.put(userid, userNodelist);
}
userNodelist.add(nodeid);
}
////@RemoteOn
public void disconnect(String nodeid, Serializable userid) {
onDisconnect(nodeid, userid);
if (this.nodemaps == null) return;
this.nodemaps.forEach((x, y) -> {
try {
if (finest) logger.finest("Node(" + localNodeName + "->" + x + ") send websocket disconnect event (" + userid + " on " + nodeid + ")");
y.disconnect(nodeid, userid);
} catch (Exception e) {
logger.log(Level.WARNING, "Node(" + localNodeName + "->" + x + ") send websocket disconnect event (" + userid + " on " + nodeid + ")", e);
}
});
}
public final void onDisconnect(String nodeid, Serializable userid) {
if (finest) logger.finest("Node (" + localNodeName + ") receive websocket disconnect event (" + userid + " on " + nodeid + ").");
Set<String> userNodelist = usernodes.get(userid);
if (userNodelist == null) return;
userNodelist.remove(nodeid);
if (userNodelist.isEmpty()) usernodes.remove(userid);
}
//@RemoteOn
public int send(String engineid, Serializable groupid, String text) {
return send(engineid, groupid, text, true);
}
public final int onSend(String engineid, Serializable groupid, String text) {
return onSend(engineid, groupid, text, true);
}
//@RemoteOn
public int send(String engineid, Serializable groupid, String text, boolean last) {
return send0(engineid, groupid, false, text, last);
}
public final int onSend(String engineid, Serializable groupid, String text, boolean last) {
return onSend0(engineid, groupid, false, text, last);
}
//@RemoteOn
public int send(String engineid, Serializable groupid, boolean recent, String text) {
return send0(engineid, groupid, recent, text, true);
}
public final int onSend(String engineid, Serializable groupid, boolean recent, String text) {
return onSend0(engineid, groupid, recent, text, true);
}
//@RemoteOn
public int send(String engineid, Serializable groupid, boolean recent, String text, boolean last) {
return send0(engineid, groupid, recent, text, last);
}
public final int onSend(String engineid, Serializable groupid, boolean recent, String text, boolean last) {
return onSend0(engineid, groupid, recent, text, last);
}
//@RemoteOn
public int send(String engineid, Serializable groupid, byte[] data) {
return send(engineid, groupid, data, true);
}
public final int onSend(String engineid, Serializable groupid, byte[] data) {
return onSend(engineid, groupid, data, true);
}
//@RemoteOn
public int send(String engineid, Serializable groupid, byte[] data, boolean last) {
return send0(engineid, groupid, false, data, last);
}
public final int onSend(String engineid, Serializable groupid, byte[] data, boolean last) {
return onSend0(engineid, groupid, false, data, last);
}
//@RemoteOn
public int send(String engineid, Serializable groupid, boolean recent, byte[] data) {
return send0(engineid, groupid, recent, data, true);
}
public final int onSend(String engineid, Serializable groupid, boolean recent, byte[] data) {
return onSend0(engineid, groupid, recent, data, true);
}
//@RemoteOn
public int send(String engineid, Serializable groupid, boolean recent, byte[] data, boolean last) {
return send0(engineid, groupid, recent, data, last);
}
public final int onSend(String engineid, Serializable groupid, boolean recent, byte[] data, boolean last) {
return onSend0(engineid, groupid, recent, data, last);
}
private int send0(String engineid, Serializable groupid, boolean recent, Serializable text, boolean last) {
final Set<String> nodes = usernodes.get(groupid);
if (nodes == null) return RETCODE_WSOFFLINE; //未登录
int rs = 0;
if (nodes.contains(this.localNodeName)) rs = onSend0(engineid, groupid, recent, text, last);
if (nodemaps == null) return rs;
this.nodemaps.forEach((x, y) -> {
if (nodes.contains(x)) {
int irs = -1;
try {
if (text != null && text.getClass() == byte[].class) {
irs = y.send(engineid, groupid, (byte[]) text, last);
} else {
irs = y.send(engineid, groupid, (String) text, last);
}
if (finest) logger.finest("Node(" + localNodeName + "->" + x + ") send websocket message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + text + "'} finish and result is " + irs);
} catch (Exception e) {
onDisconnect(x, groupid);
logger.log(Level.WARNING, "Node(" + localNodeName + "->" + x + ") send websocket message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + text + "'} failed and result is " + irs, e);
}
}
});
return rs;
}
/**
* 消息接受者存在WebSocket并发送成功返回true 否则返回false
*
* @param engineid
* @param groupid 接收方
* @param recent 是否只发送最近的WebSocket端
* @param text
* @return
*/
private int onSend0(String engineid, Serializable groupid, boolean recent, Serializable text, boolean last) {
WebSocketEngine webSocketEngine = engines.get(engineid);
if (webSocketEngine == null) {
if (finest) logger.finest("Node(" + localNodeName + ") receive websocket message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + text + "'} but result is " + RETCODE_ENGINE_NULL);
return RETCODE_ENGINE_NULL;
}
WebSocketGroup group = webSocketEngine.getWebSocketGroup(groupid);
if (group == null || group.isEmpty()) {
if (finest) logger.finest("Node(" + localNodeName + ") receive websocket message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + text + "'} but result is " + RETCODE_GROUP_EMPTY);
return RETCODE_GROUP_EMPTY;
}
if (finest) logger.finest("Node (" + localNodeName + ") receive websocket message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + text + "'}.");
if (text != null && text.getClass() == byte[].class) {
if (recent) {
group.getRecentWebSocket().send((byte[]) text, last);
} else {
group.getWebSockets().forEach(x -> x.send((byte[]) text, last));
}
} else {
if (recent) {
group.getRecentWebSocket().send(text.toString(), last);
} else {
group.getWebSockets().forEach(x -> x.send(text.toString(), last));
}
}
return 0;
}
}