This commit is contained in:
wentch
2015-12-17 10:42:10 +08:00
parent e30113cb38
commit d4bd020210
12 changed files with 340 additions and 92 deletions

View File

@@ -77,7 +77,9 @@ public final class Application {
protected final InetAddress localAddress;
protected final List<DataSource> sources = new CopyOnWriteArrayList<>();
protected final List<CacheSource> cacheSources = new CopyOnWriteArrayList<>();
protected final List<DataSource> dataSources = new CopyOnWriteArrayList<>();
protected final List<NodeServer> servers = new CopyOnWriteArrayList<>();
@@ -527,13 +529,21 @@ public final class Application {
serversLatch.countDown();
}
});
for (DataSource source : sources) {
for (DataSource source : dataSources) {
try {
source.getClass().getMethod("close").invoke(source);
} catch (Exception e) {
logger.log(Level.FINER, "close DataSource erroneous", e);
}
}
for (CacheSource source : cacheSources) {
try {
source.getClass().getMethod("close").invoke(source);
} catch (Exception e) {
logger.log(Level.FINER, "close CacheSource erroneous", e);
}
}
}
private static AnyValue load(final InputStream in0) {

View File

@@ -66,19 +66,16 @@ public final class NodeHttpServer extends NodeServer {
private void initWebSocketService() {
final NodeServer self = this;
final ResourceFactory regFactory = application.getResourceFactory();
factory.add(WebSocketNode.class, (ResourceFactory rf, final Object src, Field field, Object attachment) -> {
factory.add(WebSocketNode.class, (ResourceFactory rf, final Object src, final String resourceName, Field field, Object attachment) -> {
try {
Resource rs = field.getAnnotation(Resource.class);
if (rs == null) return;
if (field.getAnnotation(Resource.class) == null) return;
if (!(src instanceof WebSocketServlet)) return;
String rcname = rs.name();
if (rcname.contains(ResourceFactory.RESOURCE_PARENT_NAME)) rcname = rcname.replace(ResourceFactory.RESOURCE_PARENT_NAME, ((WebSocketServlet) src).name());
synchronized (regFactory) {
Service nodeService = (Service) rf.find(rcname, WebSocketNode.class);
Service nodeService = (Service) rf.find(resourceName, WebSocketNode.class);
if (nodeService == null) {
nodeService = Sncp.createLocalService(rcname, getExecutor(), (Class<? extends Service>) WebSocketNodeService.class,
nodeService = Sncp.createLocalService(resourceName, getExecutor(), (Class<? extends Service>) WebSocketNodeService.class,
getSncpAddress(), sncpDefaultGroups, sncpSameGroupTransports, sncpDiffGroupTransports);
regFactory.register(rcname, WebSocketNode.class, nodeService);
regFactory.register(resourceName, WebSocketNode.class, nodeService);
factory.inject(nodeService, self);
logger.fine("[" + Thread.currentThread().getName() + "] Load " + nodeService);
if (getSncpAddress() != null) {
@@ -88,7 +85,7 @@ public final class NodeHttpServer extends NodeServer {
sncpServer = (NodeSncpServer) node;
}
}
ServiceWrapper wrapper = new ServiceWrapper(WebSocketNodeService.class, nodeService, rcname, getSncpGroup(), sncpDefaultGroups, null);
ServiceWrapper wrapper = new ServiceWrapper(WebSocketNodeService.class, nodeService, resourceName, getSncpGroup(), sncpDefaultGroups, null);
sncpServer.getSncpServer().addService(wrapper);
}
}

View File

@@ -150,14 +150,13 @@ public abstract class NodeServer {
final NodeServer self = this;
//---------------------------------------------------------------------------------------------
final ResourceFactory regFactory = application.getResourceFactory();
factory.add(DataSource.class, (ResourceFactory rf, final Object src, Field field, final Object attachment) -> {
factory.add(DataSource.class, (ResourceFactory rf, final Object src, String resourceName, Field field, final Object attachment) -> {
try {
Resource rs = field.getAnnotation(Resource.class);
if (rs == null) return;
if (field.getAnnotation(Resource.class) == null) return;
if ((src instanceof Service) && Sncp.isRemote((Service) src)) return; //远程模式不得注入 DataSource
DataSource source = new DataDefaultSource(rs.name());
application.sources.add(source);
regFactory.register(rs.name(), DataSource.class, source);
DataSource source = new DataDefaultSource(resourceName);
application.dataSources.add(source);
regFactory.register(resourceName, DataSource.class, source);
List<Transport> sameGroupTransports = sncpSameGroupTransports;
List<Transport> diffGroupTransports = sncpDiffGroupTransports;
try {
@@ -173,10 +172,10 @@ public abstract class NodeServer {
} catch (Exception e) {
//src 不含 MultiRun 方法
}
if (factory.find(rs.name(), DataCacheListener.class) == null) {
Service cacheListenerService = Sncp.createLocalService(rs.name(), getExecutor(), DataCacheListenerService.class, this.sncpAddress, sncpDefaultGroups, sameGroupTransports, diffGroupTransports);
regFactory.register(rs.name(), DataCacheListener.class, cacheListenerService);
ServiceWrapper wrapper = new ServiceWrapper(DataCacheListenerService.class, cacheListenerService, rs.name(), sncpGroup, sncpDefaultGroups, null);
if (factory.find(resourceName, DataCacheListener.class) == null) {
Service cacheListenerService = Sncp.createLocalService(resourceName, getExecutor(), DataCacheListenerService.class, this.sncpAddress, sncpDefaultGroups, sameGroupTransports, diffGroupTransports);
regFactory.register(resourceName, DataCacheListener.class, cacheListenerService);
ServiceWrapper wrapper = new ServiceWrapper(DataCacheListenerService.class, cacheListenerService, resourceName, sncpGroup, sncpDefaultGroups, null);
localServiceWrappers.add(wrapper);
if (consumer != null) consumer.accept(wrapper);
rf.inject(cacheListenerService, self);
@@ -187,6 +186,35 @@ public abstract class NodeServer {
logger.log(Level.SEVERE, "DataSource inject error", e);
}
});
factory.add(CacheSource.class, (ResourceFactory rf, final Object src, final String resourceName, Field field, final Object attachment) -> {
try {
if (field.getAnnotation(Resource.class) == null) return;
if ((src instanceof Service) && Sncp.isRemote((Service) src)) return; //远程模式不得注入 CacheSource
List<Transport> sameGroupTransports = sncpSameGroupTransports;
List<Transport> diffGroupTransports = sncpDiffGroupTransports;
try {
Field ts = src.getClass().getDeclaredField("_sameGroupTransports");
ts.setAccessible(true);
Transport[] lts = (Transport[]) ts.get(src);
sameGroupTransports = Arrays.asList(lts);
ts = src.getClass().getDeclaredField("_diffGroupTransports");
ts.setAccessible(true);
lts = (Transport[]) ts.get(src);
diffGroupTransports = Arrays.asList(lts);
} catch (Exception e) {
//src 不含 MultiRun 方法
}
CacheSource source = Sncp.createLocalService(resourceName, getExecutor(), CacheSourceService.class, this.sncpAddress, sncpDefaultGroups, sameGroupTransports, diffGroupTransports);
application.cacheSources.add(source);
regFactory.register(resourceName, CacheSource.class, source);
field.set(src, source);
rf.inject(source, self); //
((Service) source).init(null);
} catch (Exception e) {
logger.log(Level.SEVERE, "DataSource inject error", e);
}
});
}
private void initGroup() {

View File

@@ -13,6 +13,7 @@ import java.util.concurrent.*;
import java.util.logging.*;
import javax.annotation.*;
import org.redkale.net.sncp.*;
import org.redkale.source.*;
import org.redkale.util.*;
/**
@@ -33,7 +34,8 @@ public abstract class WebSocketNode {
protected WebSocketNode remoteNode;
//存放所有用户分布在节点上的队列信息,Set<InetSocketAddress> 为 sncpnode 的集合
protected final ConcurrentHashMap<Serializable, LinkedHashSet<InetSocketAddress>> dataNodes = new ConcurrentHashMap();
@Resource(name = "$_nodeaddress_source")
protected CacheSource source;
//存放本地节点上所有在线用户的队列信息,Set<String> 为 engineid 的集合
protected final ConcurrentHashMap<Serializable, Set<String>> localNodes = new ConcurrentHashMap();
@@ -41,23 +43,7 @@ public abstract class WebSocketNode {
protected final ConcurrentHashMap<String, WebSocketEngine> engines = new ConcurrentHashMap();
public void init(AnyValue conf) {
if (remoteNode != null) {
new Thread() {
{
setDaemon(true);
}
@Override
public void run() {
try {
Map<Serializable, LinkedHashSet<InetSocketAddress>> map = remoteNode.getDataNodes();
if (map != null) dataNodes.putAll(map);
} catch (Exception e) {
logger.log(Level.INFO, WebSocketNode.class.getSimpleName() + "(" + localSncpAddress + ") not load data nodes ", e);
}
}
}.start();
}
}
public void destroy(AnyValue conf) {
@@ -69,10 +55,6 @@ public abstract class WebSocketNode {
});
}
public Map<Serializable, LinkedHashSet<InetSocketAddress>> getDataNodes() {
return dataNodes;
}
protected abstract int sendMessage(@SncpParam(SncpParamType.TargetAddress) InetSocketAddress targetAddress, Serializable groupid, boolean recent, Serializable message, boolean last);
protected abstract void connect(Serializable groupid, InetSocketAddress addr);
@@ -124,7 +106,7 @@ public abstract class WebSocketNode {
}
}
if ((recent && rscode == 0) || remoteNode == null) return rscode;
LinkedHashSet<InetSocketAddress> addrs = dataNodes.get(groupid);
LinkedHashSet<InetSocketAddress> addrs = source.get(groupid);
if (addrs != null && !addrs.isEmpty()) { //对方连接在远程节点
if (recent) {
InetSocketAddress one = null;

View File

@@ -18,23 +18,23 @@ import org.redkale.util.*;
/**
* 当WebSocketServlet接收一个TCP连接后进行协议判断如果成功就会创建一个WebSocket。
*
* WebSocketServlet
* |
* |
* WebSocketEngine
* / \
* / \
* / \
* WebSocketGroup1 WebSocketGroup2
* / \ / \
* / \ / \
* WebSocket1 WebSocket2 WebSocket3 WebSocket4
*
* WebSocketServlet
* |
* |
* WebSocketEngine
* / \
* / \
* / \
* WebSocketGroup1 WebSocketGroup2
* / \ / \
* / \ / \
* WebSocket1 WebSocket2 WebSocket3 WebSocket4
*
* @see http://www.redkale.org
* @author zhangjx
*/
public abstract class WebSocketServlet extends HttpServlet implements Nameable {
public abstract class WebSocketServlet extends HttpServlet {
public static final String WEBPARAM__LIVEINTERVAL = "liveinterval";
@@ -76,7 +76,6 @@ public abstract class WebSocketServlet extends HttpServlet implements Nameable {
engine.close();
}
@Override
public String name() {
return this.getClass().getSimpleName().replace("Servlet", "").replace("WebSocket", "").toLowerCase();
}

View File

@@ -0,0 +1,249 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.service;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.*;
import java.util.logging.*;
import org.redkale.convert.json.*;
import org.redkale.source.*;
import org.redkale.util.*;
/**
*
* @author zhangjx
*/
@AutoLoad(false)
public class CacheSourceService implements CacheSource, Service {
private ScheduledThreadPoolExecutor scheduler;
private Consumer<CacheEntry> expireHandler;
private final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
protected final ConcurrentHashMap<Serializable, CacheEntry> container = new ConcurrentHashMap<>();
@Override
public void init(AnyValue conf) {
final CacheSourceService self = this;
AnyValue prop = conf == null ? null : conf.getAnyValue("property");
String expireHandlerClass = prop == null ? null : prop.getValue("expirehandler");
if (expireHandlerClass != null) {
try {
this.expireHandler = (Consumer<CacheEntry>) Class.forName(expireHandlerClass).newInstance();
} catch (Exception e) {
logger.log(Level.SEVERE, self.getClass().getSimpleName() + " new expirehandler class (" + expireHandlerClass + ") instance error", e);
}
}
if (scheduler == null) {
this.scheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> {
final Thread t = new Thread(r, self.getClass().getSimpleName() + "-Expirer-Thread");
t.setDaemon(true);
return t;
});
final List<Serializable> keys = new ArrayList<>();
scheduler.scheduleWithFixedDelay(() -> {
keys.clear();
int now = (int) (System.currentTimeMillis() / 1000);
container.forEach((k, x) -> {
if (x.expireSeconds > 0 && (now > (x.lastAccessed + x.expireSeconds))) {
keys.add(x.key);
}
});
for (Serializable key : keys) {
CacheEntry entry = container.remove(key);
if (expireHandler != null && entry != null) expireHandler.accept(entry);
}
}, 10, 10, TimeUnit.SECONDS);
logger.finest(self.getClass().getSimpleName() + ":" + self.name() + " start schedule expire executor");
}
}
public void close() { //给Application 关闭时调用
if (scheduler != null) scheduler.shutdownNow();
}
@Override
public void destroy(AnyValue conf) {
if (scheduler != null) scheduler.shutdownNow();
}
@Override
public boolean exists(Serializable key) {
if (key == null) return false;
return container.containsKey(key);
}
@Override
public <T> T get(Serializable key) {
if (key == null) return null;
CacheEntry entry = container.get(key);
if (entry == null) return null;
return (T) entry.getValue();
}
@Override
@MultiRun
public <T> T refreshAndGet(Serializable key) {
if (key == null) return null;
CacheEntry entry = container.get(key);
if (entry == null) return null;
entry.lastAccessed = (int) (System.currentTimeMillis() / 1000);
return (T) entry.getValue();
}
@Override
@MultiRun
public void refresh(Serializable key) {
if (key == null) return;
CacheEntry entry = container.get(key);
if (entry == null) return;
entry.lastAccessed = (int) (System.currentTimeMillis() / 1000);
}
@Override
@MultiRun
public <T> void set(Serializable key, T value) {
if (key == null) return;
CacheEntry entry = container.get(key);
if (entry == null) {
entry = new CacheEntry(key, value);
container.putIfAbsent(key, entry);
} else {
entry.value = value;
}
}
@Override
@MultiRun
public void setExpireSeconds(Serializable key, int expireSeconds) {
if (key == null) return;
CacheEntry entry = container.get(key);
if (entry == null) return;
entry.expireSeconds = expireSeconds;
}
@Override
@MultiRun
public <T> void set(int expireSeconds, Serializable key, T value) {
if (key == null) return;
CacheEntry entry = container.get(key);
if (entry == null) {
entry = new CacheEntry(expireSeconds, key, value);
container.putIfAbsent(key, entry);
} else {
if (expireSeconds > 0) entry.expireSeconds = expireSeconds;
entry.value = value;
}
}
@Override
@MultiRun
public void remove(Serializable key) {
if (key == null) return;
container.remove(key);
}
@Override
@MultiRun
public <V> void appendListItem(Serializable key, V value) {
if (key == null) return;
CacheEntry entry = container.get(key);
if (entry == null || !(entry.value instanceof List)) {
List<V> list = new CopyOnWriteArrayList<>();
entry = new CacheEntry(key, list);
CacheEntry old = container.putIfAbsent(key, entry);
if (old != null) list = (List) old.value;
list.add(value);
} else {
((List) entry.getValue()).add(value);
}
}
@Override
@MultiRun
public <V> void removeListItem(Serializable key, V value) {
if (key == null) return;
CacheEntry entry = container.get(key);
if (entry == null || !(entry.value instanceof List)) return;
((List) entry.getValue()).remove(value);
}
@Override
public <V> void appendSetItem(Serializable key, V value) {
if (key == null) return;
CacheEntry entry = container.get(key);
if (entry == null || !(entry.value instanceof Set)) {
Set<V> set = new CopyOnWriteArraySet();
entry = new CacheEntry(key, set);
CacheEntry old = container.putIfAbsent(key, entry);
if (old != null) set = (Set) old.value;
set.add(value);
} else {
((Set) entry.getValue()).add(value);
}
}
@Override
@MultiRun
public <V> void removeSetItem(Serializable key, V value) {
if (key == null) return;
CacheEntry entry = container.get(key);
if (entry == null || !(entry.value instanceof Set)) return;
((Set) entry.getValue()).remove(value);
}
public static final class CacheEntry<T> {
private final int createTime; //创建时间
private volatile int lastAccessed; //最后刷新时间
//<=0表示永久保存
private int expireSeconds;
private T value;
private final Serializable key;
public CacheEntry(Serializable key, T value) {
this(0, key, value);
}
public CacheEntry(int expireSecond, Serializable key, T value) {
this.expireSeconds = expireSecond;
this.createTime = (int) (System.currentTimeMillis() / 1000);
this.lastAccessed = this.createTime;
this.key = key;
this.value = value;
}
@Override
public String toString() {
return JsonFactory.root().getConvert().convertTo(this);
}
public long getCreateTime() {
return createTime;
}
public long getLastAccessed() {
return lastAccessed;
}
public T getValue() {
return value;
}
public Serializable getKey() {
return key;
}
}
}

View File

@@ -19,7 +19,7 @@ import org.redkale.util.*;
* @see http://www.redkale.org
* @author zhangjx
*/
public interface Service extends Nameable {
public interface Service {
/**
* 该方法必须是可以重复调用, 当reload时需要重复调用init方法
@@ -39,7 +39,6 @@ public interface Service extends Nameable {
* <p>
* @return
*/
@Override
default String name() {
return "";
}

View File

@@ -54,24 +54,14 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
@Override
@MultiRun
public void connect(Serializable groupid, InetSocketAddress addr) {
LinkedHashSet<InetSocketAddress> addrs = dataNodes.get(groupid);
if (addrs == null) {
addrs = new LinkedHashSet<>();
dataNodes.put(groupid, addrs);
}
addrs.add(addr);
source.appendSetItem(groupid, addr);
if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " connect from " + addr);
}
@Override
@MultiRun
public void disconnect(Serializable groupid, InetSocketAddress addr) {
Set<InetSocketAddress> addrs = dataNodes.get(groupid);
if (addrs == null) return;
synchronized (addrs) {
addrs.remove(addr);
}
if (addrs.isEmpty()) dataNodes.remove(groupid);
source.removeSetItem(groupid, addr);
if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " disconnect from " + addr);
}
}

View File

@@ -30,6 +30,8 @@ public interface CacheSource {
public <T> void set(final int expireSeconds, final Serializable key, final T value);
public void setExpireSeconds(Serializable key, int expireSeconds);
public void remove(final Serializable key);
public <V> void appendListItem(final Serializable key, final V value);

View File

@@ -25,7 +25,7 @@ import org.redkale.util.*;
* @author zhangjx
*/
@SuppressWarnings("unchecked")
public final class DataDefaultSource implements DataSource, Nameable, Function<Class, EntityInfo> {
public final class DataDefaultSource implements DataSource, Function<Class, EntityInfo> {
public static final String DATASOURCE_CONFPATH = "DATASOURCE_CONFPATH";
@@ -264,7 +264,6 @@ public final class DataDefaultSource implements DataSource, Nameable, Function<C
return (ConnectionPoolDataSource) pdsource;
}
@Override
public final String name() {
return name;
}

View File

@@ -1,16 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.util;
/**
*
* @see http://www.redkale.org
* @author zhangjx
*/
public interface Nameable {
String name();
}

View File

@@ -182,7 +182,16 @@ public final class ResourceFactory {
continue;
}
if (Modifier.isFinal(field.getModifiers())) continue;
final String rcname = (rc.name().contains(RESOURCE_PARENT_NAME) && src instanceof Nameable) ? rc.name().replace(RESOURCE_PARENT_NAME, ((Nameable) src).name()) : rc.name();
String tname = rc.name();
if (tname.contains(RESOURCE_PARENT_NAME)) {
try {
String srcname = (String) src.getClass().getMethod("name").invoke(src);
tname = tname.replace(RESOURCE_PARENT_NAME, srcname);
} catch (Exception e) { // 获取src中的name()方法的值, 异常则忽略
logger.log(Level.SEVERE, src.getClass().getName() + " not found [public String name()] method", e);
}
}
final String rcname = tname;
Object rs = genctype == classtype ? null : find(rcname, genctype);
if (rs == null) {
if (Map.class.isAssignableFrom(classtype)) {
@@ -195,7 +204,7 @@ public final class ResourceFactory {
}
if (rs == null) {
Intercepter it = findIntercepter(field.getGenericType(), field);
if (it != null) it.invoke(this, src, field, attachment);
if (it != null) it.invoke(this, src, rcname, field, attachment);
continue;
}
if (!rs.getClass().isPrimitive() && classtype.isPrimitive()) {
@@ -239,7 +248,7 @@ public final class ResourceFactory {
public static interface Intercepter {
public void invoke(ResourceFactory factory, Object src, Field field, Object attachment);
public void invoke(ResourceFactory factory, Object src, String resourceName, Field field, Object attachment);
}
}