diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index a9adaedce..df1bce67d 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -77,7 +77,9 @@ public final class Application { protected final InetAddress localAddress; - protected final List sources = new CopyOnWriteArrayList<>(); + protected final List cacheSources = new CopyOnWriteArrayList<>(); + + protected final List dataSources = new CopyOnWriteArrayList<>(); protected final List servers = new CopyOnWriteArrayList<>(); @@ -527,13 +529,21 @@ public final class Application { serversLatch.countDown(); } }); - for (DataSource source : sources) { + + for (DataSource source : dataSources) { try { source.getClass().getMethod("close").invoke(source); } catch (Exception e) { logger.log(Level.FINER, "close DataSource erroneous", e); } } + for (CacheSource source : cacheSources) { + try { + source.getClass().getMethod("close").invoke(source); + } catch (Exception e) { + logger.log(Level.FINER, "close CacheSource erroneous", e); + } + } } private static AnyValue load(final InputStream in0) { diff --git a/src/org/redkale/boot/NodeHttpServer.java b/src/org/redkale/boot/NodeHttpServer.java index d5b0c68f3..2d2544fe6 100644 --- a/src/org/redkale/boot/NodeHttpServer.java +++ b/src/org/redkale/boot/NodeHttpServer.java @@ -66,19 +66,16 @@ public final class NodeHttpServer extends NodeServer { private void initWebSocketService() { final NodeServer self = this; final ResourceFactory regFactory = application.getResourceFactory(); - factory.add(WebSocketNode.class, (ResourceFactory rf, final Object src, Field field, Object attachment) -> { + factory.add(WebSocketNode.class, (ResourceFactory rf, final Object src, final String resourceName, Field field, Object attachment) -> { try { - Resource rs = field.getAnnotation(Resource.class); - if (rs == null) return; + if (field.getAnnotation(Resource.class) == null) return; if (!(src instanceof WebSocketServlet)) return; - String rcname = rs.name(); - if (rcname.contains(ResourceFactory.RESOURCE_PARENT_NAME)) rcname = rcname.replace(ResourceFactory.RESOURCE_PARENT_NAME, ((WebSocketServlet) src).name()); synchronized (regFactory) { - Service nodeService = (Service) rf.find(rcname, WebSocketNode.class); + Service nodeService = (Service) rf.find(resourceName, WebSocketNode.class); if (nodeService == null) { - nodeService = Sncp.createLocalService(rcname, getExecutor(), (Class) WebSocketNodeService.class, + nodeService = Sncp.createLocalService(resourceName, getExecutor(), (Class) WebSocketNodeService.class, getSncpAddress(), sncpDefaultGroups, sncpSameGroupTransports, sncpDiffGroupTransports); - regFactory.register(rcname, WebSocketNode.class, nodeService); + regFactory.register(resourceName, WebSocketNode.class, nodeService); factory.inject(nodeService, self); logger.fine("[" + Thread.currentThread().getName() + "] Load " + nodeService); if (getSncpAddress() != null) { @@ -88,7 +85,7 @@ public final class NodeHttpServer extends NodeServer { sncpServer = (NodeSncpServer) node; } } - ServiceWrapper wrapper = new ServiceWrapper(WebSocketNodeService.class, nodeService, rcname, getSncpGroup(), sncpDefaultGroups, null); + ServiceWrapper wrapper = new ServiceWrapper(WebSocketNodeService.class, nodeService, resourceName, getSncpGroup(), sncpDefaultGroups, null); sncpServer.getSncpServer().addService(wrapper); } } diff --git a/src/org/redkale/boot/NodeServer.java b/src/org/redkale/boot/NodeServer.java index 399544cd0..4e47420a0 100644 --- a/src/org/redkale/boot/NodeServer.java +++ b/src/org/redkale/boot/NodeServer.java @@ -150,14 +150,13 @@ public abstract class NodeServer { final NodeServer self = this; //--------------------------------------------------------------------------------------------- final ResourceFactory regFactory = application.getResourceFactory(); - factory.add(DataSource.class, (ResourceFactory rf, final Object src, Field field, final Object attachment) -> { + factory.add(DataSource.class, (ResourceFactory rf, final Object src, String resourceName, Field field, final Object attachment) -> { try { - Resource rs = field.getAnnotation(Resource.class); - if (rs == null) return; + if (field.getAnnotation(Resource.class) == null) return; if ((src instanceof Service) && Sncp.isRemote((Service) src)) return; //远程模式不得注入 DataSource - DataSource source = new DataDefaultSource(rs.name()); - application.sources.add(source); - regFactory.register(rs.name(), DataSource.class, source); + DataSource source = new DataDefaultSource(resourceName); + application.dataSources.add(source); + regFactory.register(resourceName, DataSource.class, source); List sameGroupTransports = sncpSameGroupTransports; List diffGroupTransports = sncpDiffGroupTransports; try { @@ -173,10 +172,10 @@ public abstract class NodeServer { } catch (Exception e) { //src 不含 MultiRun 方法 } - if (factory.find(rs.name(), DataCacheListener.class) == null) { - Service cacheListenerService = Sncp.createLocalService(rs.name(), getExecutor(), DataCacheListenerService.class, this.sncpAddress, sncpDefaultGroups, sameGroupTransports, diffGroupTransports); - regFactory.register(rs.name(), DataCacheListener.class, cacheListenerService); - ServiceWrapper wrapper = new ServiceWrapper(DataCacheListenerService.class, cacheListenerService, rs.name(), sncpGroup, sncpDefaultGroups, null); + if (factory.find(resourceName, DataCacheListener.class) == null) { + Service cacheListenerService = Sncp.createLocalService(resourceName, getExecutor(), DataCacheListenerService.class, this.sncpAddress, sncpDefaultGroups, sameGroupTransports, diffGroupTransports); + regFactory.register(resourceName, DataCacheListener.class, cacheListenerService); + ServiceWrapper wrapper = new ServiceWrapper(DataCacheListenerService.class, cacheListenerService, resourceName, sncpGroup, sncpDefaultGroups, null); localServiceWrappers.add(wrapper); if (consumer != null) consumer.accept(wrapper); rf.inject(cacheListenerService, self); @@ -187,6 +186,35 @@ public abstract class NodeServer { logger.log(Level.SEVERE, "DataSource inject error", e); } }); + factory.add(CacheSource.class, (ResourceFactory rf, final Object src, final String resourceName, Field field, final Object attachment) -> { + try { + if (field.getAnnotation(Resource.class) == null) return; + if ((src instanceof Service) && Sncp.isRemote((Service) src)) return; //远程模式不得注入 CacheSource + List sameGroupTransports = sncpSameGroupTransports; + List diffGroupTransports = sncpDiffGroupTransports; + try { + Field ts = src.getClass().getDeclaredField("_sameGroupTransports"); + ts.setAccessible(true); + Transport[] lts = (Transport[]) ts.get(src); + sameGroupTransports = Arrays.asList(lts); + + ts = src.getClass().getDeclaredField("_diffGroupTransports"); + ts.setAccessible(true); + lts = (Transport[]) ts.get(src); + diffGroupTransports = Arrays.asList(lts); + } catch (Exception e) { + //src 不含 MultiRun 方法 + } + CacheSource source = Sncp.createLocalService(resourceName, getExecutor(), CacheSourceService.class, this.sncpAddress, sncpDefaultGroups, sameGroupTransports, diffGroupTransports); + application.cacheSources.add(source); + regFactory.register(resourceName, CacheSource.class, source); + field.set(src, source); + rf.inject(source, self); // + ((Service) source).init(null); + } catch (Exception e) { + logger.log(Level.SEVERE, "DataSource inject error", e); + } + }); } private void initGroup() { diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index bc7953d38..be307822d 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -13,6 +13,7 @@ import java.util.concurrent.*; import java.util.logging.*; import javax.annotation.*; import org.redkale.net.sncp.*; +import org.redkale.source.*; import org.redkale.util.*; /** @@ -33,7 +34,8 @@ public abstract class WebSocketNode { protected WebSocketNode remoteNode; //存放所有用户分布在节点上的队列信息,Set 为 sncpnode 的集合 - protected final ConcurrentHashMap> dataNodes = new ConcurrentHashMap(); + @Resource(name = "$_nodeaddress_source") + protected CacheSource source; //存放本地节点上所有在线用户的队列信息,Set 为 engineid 的集合 protected final ConcurrentHashMap> localNodes = new ConcurrentHashMap(); @@ -41,23 +43,7 @@ public abstract class WebSocketNode { protected final ConcurrentHashMap engines = new ConcurrentHashMap(); public void init(AnyValue conf) { - if (remoteNode != null) { - new Thread() { - { - setDaemon(true); - } - @Override - public void run() { - try { - Map> map = remoteNode.getDataNodes(); - if (map != null) dataNodes.putAll(map); - } catch (Exception e) { - logger.log(Level.INFO, WebSocketNode.class.getSimpleName() + "(" + localSncpAddress + ") not load data nodes ", e); - } - } - }.start(); - } } public void destroy(AnyValue conf) { @@ -69,10 +55,6 @@ public abstract class WebSocketNode { }); } - public Map> getDataNodes() { - return dataNodes; - } - protected abstract int sendMessage(@SncpParam(SncpParamType.TargetAddress) InetSocketAddress targetAddress, Serializable groupid, boolean recent, Serializable message, boolean last); protected abstract void connect(Serializable groupid, InetSocketAddress addr); @@ -124,7 +106,7 @@ public abstract class WebSocketNode { } } if ((recent && rscode == 0) || remoteNode == null) return rscode; - LinkedHashSet addrs = dataNodes.get(groupid); + LinkedHashSet addrs = source.get(groupid); if (addrs != null && !addrs.isEmpty()) { //对方连接在远程节点 if (recent) { InetSocketAddress one = null; diff --git a/src/org/redkale/net/http/WebSocketServlet.java b/src/org/redkale/net/http/WebSocketServlet.java index da02aadac..01664bb35 100644 --- a/src/org/redkale/net/http/WebSocketServlet.java +++ b/src/org/redkale/net/http/WebSocketServlet.java @@ -18,23 +18,23 @@ import org.redkale.util.*; /** * 当WebSocketServlet接收一个TCP连接后,进行协议判断,如果成功就会创建一个WebSocket。 - * - * WebSocketServlet - * | - * | - * WebSocketEngine - * / \ - * / \ - * / \ - * WebSocketGroup1 WebSocketGroup2 - * / \ / \ - * / \ / \ - * WebSocket1 WebSocket2 WebSocket3 WebSocket4 + * + * WebSocketServlet + * | + * | + * WebSocketEngine + * / \ + * / \ + * / \ + * WebSocketGroup1 WebSocketGroup2 + * / \ / \ + * / \ / \ + * WebSocket1 WebSocket2 WebSocket3 WebSocket4 * * @see http://www.redkale.org * @author zhangjx */ -public abstract class WebSocketServlet extends HttpServlet implements Nameable { +public abstract class WebSocketServlet extends HttpServlet { public static final String WEBPARAM__LIVEINTERVAL = "liveinterval"; @@ -76,7 +76,6 @@ public abstract class WebSocketServlet extends HttpServlet implements Nameable { engine.close(); } - @Override public String name() { return this.getClass().getSimpleName().replace("Servlet", "").replace("WebSocket", "").toLowerCase(); } diff --git a/src/org/redkale/service/CacheSourceService.java b/src/org/redkale/service/CacheSourceService.java new file mode 100644 index 000000000..cac0c63a4 --- /dev/null +++ b/src/org/redkale/service/CacheSourceService.java @@ -0,0 +1,249 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.service; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.function.*; +import java.util.logging.*; +import org.redkale.convert.json.*; +import org.redkale.source.*; +import org.redkale.util.*; + +/** + * + * @author zhangjx + */ +@AutoLoad(false) +public class CacheSourceService implements CacheSource, Service { + + private ScheduledThreadPoolExecutor scheduler; + + private Consumer expireHandler; + + private final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); + + protected final ConcurrentHashMap container = new ConcurrentHashMap<>(); + + @Override + public void init(AnyValue conf) { + final CacheSourceService self = this; + AnyValue prop = conf == null ? null : conf.getAnyValue("property"); + String expireHandlerClass = prop == null ? null : prop.getValue("expirehandler"); + if (expireHandlerClass != null) { + try { + this.expireHandler = (Consumer) Class.forName(expireHandlerClass).newInstance(); + } catch (Exception e) { + logger.log(Level.SEVERE, self.getClass().getSimpleName() + " new expirehandler class (" + expireHandlerClass + ") instance error", e); + } + } + if (scheduler == null) { + this.scheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> { + final Thread t = new Thread(r, self.getClass().getSimpleName() + "-Expirer-Thread"); + t.setDaemon(true); + return t; + }); + final List keys = new ArrayList<>(); + scheduler.scheduleWithFixedDelay(() -> { + keys.clear(); + int now = (int) (System.currentTimeMillis() / 1000); + container.forEach((k, x) -> { + if (x.expireSeconds > 0 && (now > (x.lastAccessed + x.expireSeconds))) { + keys.add(x.key); + } + }); + for (Serializable key : keys) { + CacheEntry entry = container.remove(key); + if (expireHandler != null && entry != null) expireHandler.accept(entry); + } + }, 10, 10, TimeUnit.SECONDS); + logger.finest(self.getClass().getSimpleName() + ":" + self.name() + " start schedule expire executor"); + } + } + + public void close() { //给Application 关闭时调用 + if (scheduler != null) scheduler.shutdownNow(); + } + + @Override + public void destroy(AnyValue conf) { + if (scheduler != null) scheduler.shutdownNow(); + } + + @Override + public boolean exists(Serializable key) { + if (key == null) return false; + return container.containsKey(key); + } + + @Override + public T get(Serializable key) { + if (key == null) return null; + CacheEntry entry = container.get(key); + if (entry == null) return null; + return (T) entry.getValue(); + } + + @Override + @MultiRun + public T refreshAndGet(Serializable key) { + if (key == null) return null; + CacheEntry entry = container.get(key); + if (entry == null) return null; + entry.lastAccessed = (int) (System.currentTimeMillis() / 1000); + return (T) entry.getValue(); + } + + @Override + @MultiRun + public void refresh(Serializable key) { + if (key == null) return; + CacheEntry entry = container.get(key); + if (entry == null) return; + entry.lastAccessed = (int) (System.currentTimeMillis() / 1000); + } + + @Override + @MultiRun + public void set(Serializable key, T value) { + if (key == null) return; + CacheEntry entry = container.get(key); + if (entry == null) { + entry = new CacheEntry(key, value); + container.putIfAbsent(key, entry); + } else { + entry.value = value; + } + } + + @Override + @MultiRun + public void setExpireSeconds(Serializable key, int expireSeconds) { + if (key == null) return; + CacheEntry entry = container.get(key); + if (entry == null) return; + entry.expireSeconds = expireSeconds; + } + + @Override + @MultiRun + public void set(int expireSeconds, Serializable key, T value) { + if (key == null) return; + CacheEntry entry = container.get(key); + if (entry == null) { + entry = new CacheEntry(expireSeconds, key, value); + container.putIfAbsent(key, entry); + } else { + if (expireSeconds > 0) entry.expireSeconds = expireSeconds; + entry.value = value; + } + } + + @Override + @MultiRun + public void remove(Serializable key) { + if (key == null) return; + container.remove(key); + } + + @Override + @MultiRun + public void appendListItem(Serializable key, V value) { + if (key == null) return; + CacheEntry entry = container.get(key); + if (entry == null || !(entry.value instanceof List)) { + List list = new CopyOnWriteArrayList<>(); + entry = new CacheEntry(key, list); + CacheEntry old = container.putIfAbsent(key, entry); + if (old != null) list = (List) old.value; + list.add(value); + } else { + ((List) entry.getValue()).add(value); + } + } + + @Override + @MultiRun + public void removeListItem(Serializable key, V value) { + if (key == null) return; + CacheEntry entry = container.get(key); + if (entry == null || !(entry.value instanceof List)) return; + ((List) entry.getValue()).remove(value); + } + + @Override + public void appendSetItem(Serializable key, V value) { + if (key == null) return; + CacheEntry entry = container.get(key); + if (entry == null || !(entry.value instanceof Set)) { + Set set = new CopyOnWriteArraySet(); + entry = new CacheEntry(key, set); + CacheEntry old = container.putIfAbsent(key, entry); + if (old != null) set = (Set) old.value; + set.add(value); + } else { + ((Set) entry.getValue()).add(value); + } + } + + @Override + @MultiRun + public void removeSetItem(Serializable key, V value) { + if (key == null) return; + CacheEntry entry = container.get(key); + if (entry == null || !(entry.value instanceof Set)) return; + ((Set) entry.getValue()).remove(value); + } + + public static final class CacheEntry { + + private final int createTime; //创建时间 + + private volatile int lastAccessed; //最后刷新时间 + + //<=0表示永久保存 + private int expireSeconds; + + private T value; + + private final Serializable key; + + public CacheEntry(Serializable key, T value) { + this(0, key, value); + } + + public CacheEntry(int expireSecond, Serializable key, T value) { + this.expireSeconds = expireSecond; + this.createTime = (int) (System.currentTimeMillis() / 1000); + this.lastAccessed = this.createTime; + this.key = key; + this.value = value; + } + + @Override + public String toString() { + return JsonFactory.root().getConvert().convertTo(this); + } + + public long getCreateTime() { + return createTime; + } + + public long getLastAccessed() { + return lastAccessed; + } + + public T getValue() { + return value; + } + + public Serializable getKey() { + return key; + } + + } +} diff --git a/src/org/redkale/service/Service.java b/src/org/redkale/service/Service.java index c95deb19a..2021b735d 100644 --- a/src/org/redkale/service/Service.java +++ b/src/org/redkale/service/Service.java @@ -19,7 +19,7 @@ import org.redkale.util.*; * @see http://www.redkale.org * @author zhangjx */ -public interface Service extends Nameable { +public interface Service { /** * 该方法必须是可以重复调用, 当reload时需要重复调用init方法 @@ -39,7 +39,6 @@ public interface Service extends Nameable { *

* @return */ - @Override default String name() { return ""; } diff --git a/src/org/redkale/service/WebSocketNodeService.java b/src/org/redkale/service/WebSocketNodeService.java index 871dd1be9..d6c197654 100644 --- a/src/org/redkale/service/WebSocketNodeService.java +++ b/src/org/redkale/service/WebSocketNodeService.java @@ -54,24 +54,14 @@ public class WebSocketNodeService extends WebSocketNode implements Service { @Override @MultiRun public void connect(Serializable groupid, InetSocketAddress addr) { - LinkedHashSet addrs = dataNodes.get(groupid); - if (addrs == null) { - addrs = new LinkedHashSet<>(); - dataNodes.put(groupid, addrs); - } - addrs.add(addr); + source.appendSetItem(groupid, addr); if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " connect from " + addr); } @Override @MultiRun public void disconnect(Serializable groupid, InetSocketAddress addr) { - Set addrs = dataNodes.get(groupid); - if (addrs == null) return; - synchronized (addrs) { - addrs.remove(addr); - } - if (addrs.isEmpty()) dataNodes.remove(groupid); + source.removeSetItem(groupid, addr); if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " disconnect from " + addr); } } diff --git a/src/org/redkale/source/CacheSource.java b/src/org/redkale/source/CacheSource.java index 03217f8ce..e9e07666b 100644 --- a/src/org/redkale/source/CacheSource.java +++ b/src/org/redkale/source/CacheSource.java @@ -30,6 +30,8 @@ public interface CacheSource { public void set(final int expireSeconds, final Serializable key, final T value); + public void setExpireSeconds(Serializable key, int expireSeconds); + public void remove(final Serializable key); public void appendListItem(final Serializable key, final V value); diff --git a/src/org/redkale/source/DataDefaultSource.java b/src/org/redkale/source/DataDefaultSource.java index d13aa49f4..8830956a9 100644 --- a/src/org/redkale/source/DataDefaultSource.java +++ b/src/org/redkale/source/DataDefaultSource.java @@ -25,7 +25,7 @@ import org.redkale.util.*; * @author zhangjx */ @SuppressWarnings("unchecked") -public final class DataDefaultSource implements DataSource, Nameable, Function { +public final class DataDefaultSource implements DataSource, Function { public static final String DATASOURCE_CONFPATH = "DATASOURCE_CONFPATH"; @@ -264,7 +264,6 @@ public final class DataDefaultSource implements DataSource, Nameable, Function