SNCP连接增加心跳功能

This commit is contained in:
Redkale
2017-11-10 18:04:43 +08:00
parent 7780e0090c
commit 936fe8d1ab
7 changed files with 135 additions and 22 deletions

View File

@@ -105,8 +105,8 @@ public final class Application {
//NodeServer 资源
final List<NodeServer> servers = new CopyOnWriteArrayList<>();
//传输端的TransportFactory
final TransportFactory transportFactory;
//SNCP传输端的TransportFactory, 注意: 只给SNCP使用
final TransportFactory sncpTransportFactory;
//全局根ResourceFactory
final ResourceFactory resourceFactory = ResourceFactory.root();
@@ -292,7 +292,9 @@ public final class Application {
throw new RuntimeException(e);
}
}
this.transportFactory = TransportFactory.create(transportExec, transportPool, transportGroup, strategy);
this.sncpTransportFactory = TransportFactory.create(transportExec, transportPool, transportGroup, strategy);
DefaultAnyValue tarnsportConf = DefaultAnyValue.create(TransportFactory.NAME_PINGINTERVAL, System.getProperty("net.transport.pinginterval", "30"));
this.sncpTransportFactory.init(tarnsportConf, Sncp.PING_BUFFER, Sncp.PONG_BUFFER.remaining());
Thread.currentThread().setContextClassLoader(this.classLoader);
this.serverClassLoader = new RedkaleClassLoader(this.classLoader);
}
@@ -301,8 +303,8 @@ public final class Application {
return resourceFactory;
}
public TransportFactory getTransportFactory() {
return transportFactory;
public TransportFactory getSncpTransportFactory() {
return sncpTransportFactory;
}
public RedkaleClassLoader getClassLoader() {
@@ -406,7 +408,7 @@ public final class Application {
} else if (type == ResourceFactory.class) {
field.set(src, res.name().equalsIgnoreCase("server") ? rf : (res.name().isEmpty() ? application.resourceFactory : null));
} else if (type == TransportFactory.class) {
field.set(src, application.transportFactory);
field.set(src, application.sncpTransportFactory);
} else if (type == NodeSncpServer.class) {
NodeServer server = null;
for (NodeServer ns : application.getNodeServers()) {
@@ -472,7 +474,7 @@ public final class Application {
final InetSocketAddress addr = new InetSocketAddress(node.getValue("addr"), node.getIntValue("port"));
ginfo.putAddress(addr);
}
transportFactory.addGroupInfo(ginfo);
sncpTransportFactory.addGroupInfo(ginfo);
}
}
//------------------------------------------------------------------------
@@ -815,7 +817,7 @@ public final class Application {
logger.log(Level.FINER, source.getClass() + " close CacheSource erroneous", e);
}
}
this.transportFactory.shutdownNow();
this.sncpTransportFactory.shutdownNow();
}
private static int parseLenth(String value, int defValue) {

View File

@@ -109,7 +109,7 @@ public class NodeHttpServer extends NodeServer {
synchronized (regFactory) {
Service nodeService = (Service) rf.find(resourceName, WebSocketNode.class);
if (nodeService == null) {
nodeService = Sncp.createLocalService(serverClassLoader, resourceName, WebSocketNodeService.class, application.getResourceFactory(), application.getTransportFactory(), (InetSocketAddress) null, (Set<String>) null, (AnyValue) null);
nodeService = Sncp.createLocalService(serverClassLoader, resourceName, WebSocketNodeService.class, application.getResourceFactory(), application.getSncpTransportFactory(), (InetSocketAddress) null, (Set<String>) null, (AnyValue) null);
regFactory.register(resourceName, WebSocketNode.class, nodeService);
}
resourceFactory.inject(nodeService, self);

View File

@@ -111,7 +111,7 @@ public abstract class NodeServer {
if (isSNCP()) { // SNCP协议
String host = this.serverConf.getValue("host", isWATCH() ? "127.0.0.1" : "0.0.0.0").replace("0.0.0.0", "");
this.sncpAddress = new InetSocketAddress(host.isEmpty() ? application.localAddress.getHostAddress() : host, this.serverConf.getIntValue("port"));
this.sncpGroup = application.transportFactory.findGroupName(this.sncpAddress);
this.sncpGroup = application.sncpTransportFactory.findGroupName(this.sncpAddress);
//单向SNCP服务不需要对等group
//if (this.sncpGroup == null) throw new RuntimeException("Server (" + String.valueOf(config).replaceAll("\\s+", " ") + ") not found <group> info");
}
@@ -171,7 +171,7 @@ public abstract class NodeServer {
final NodeServer self = this;
//---------------------------------------------------------------------------------------------
final ResourceFactory appResFactory = application.getResourceFactory();
final TransportFactory appTranFactory = application.getTransportFactory();
final TransportFactory appSncpTranFactory = application.getSncpTransportFactory();
final AnyValue resources = application.config.getAnyValue("resources");
final Map<String, AnyValue> cacheResource = new HashMap<>();
final Map<String, AnyValue> dataResources = new HashMap<>();
@@ -232,7 +232,7 @@ public abstract class NodeServer {
final Set<String> groups = new HashSet<>();
if (client != null && client.getSameGroup() != null) groups.add(client.getSameGroup());
if (client != null && client.getDiffGroups() != null) groups.addAll(client.getDiffGroups());
Service cacheListenerService = Sncp.createLocalService(serverClassLoader, resourceName, DataCacheListenerService.class, appResFactory, appTranFactory, sncpAddr, groups, Sncp.getConf((Service) src));
Service cacheListenerService = Sncp.createLocalService(serverClassLoader, resourceName, DataCacheListenerService.class, appResFactory, appSncpTranFactory, sncpAddr, groups, Sncp.getConf((Service) src));
appResFactory.register(resourceName, DataCacheListener.class, cacheListenerService);
localServices.add(cacheListenerService);
sncpServer.consumerAccept(cacheListenerService);
@@ -266,11 +266,11 @@ public abstract class NodeServer {
final Class sourceType = sourceConf == null ? CacheMemorySource.class : serverClassLoader.loadClass(sourceConf.getValue("value"));
Object source;
if (DataSource.class.isAssignableFrom(sourceType)) { // DataSource
source = (DataSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appTranFactory, sncpAddr, groups, Sncp.getConf(srcService));
source = (DataSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appSncpTranFactory, sncpAddr, groups, Sncp.getConf(srcService));
application.dataSources.add((DataSource) source);
appResFactory.register(resourceName, DataSource.class, source);
} else { // CacheSource
source = (CacheSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appTranFactory, sncpAddr, groups, Sncp.getConf(srcService));
source = (CacheSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appSncpTranFactory, sncpAddr, groups, Sncp.getConf(srcService));
Type genericType = field.getGenericType();
ParameterizedType pt = (genericType instanceof ParameterizedType) ? (ParameterizedType) genericType : null;
Type valType = pt == null ? null : pt.getActualTypeArguments()[0];
@@ -311,7 +311,7 @@ public abstract class NodeServer {
final Set<FilterEntry<? extends Service>> entrys = (Set) serviceFilter.getAllFilterEntrys();
ResourceFactory regFactory = isSNCP() ? application.getResourceFactory() : resourceFactory;
final ResourceFactory appResourceFactory = application.getResourceFactory();
final TransportFactory appTransportFactory = application.getTransportFactory();
final TransportFactory appSncpTransFactory = application.getSncpTransportFactory();
for (FilterEntry<? extends Service> entry : entrys) { //service实现类
final Class<? extends Service> serviceImplClass = entry.getType();
if (Modifier.isFinal(serviceImplClass.getModifiers())) continue; //修饰final的类跳过
@@ -342,9 +342,9 @@ public abstract class NodeServer {
Service service;
boolean ws = src instanceof WebSocketServlet;
if (ws || localed) { //本地模式
service = Sncp.createLocalService(serverClassLoader, resourceName, serviceImplClass, appResourceFactory, appTransportFactory, NodeServer.this.sncpAddress, groups, entry.getProperty());
service = Sncp.createLocalService(serverClassLoader, resourceName, serviceImplClass, appResourceFactory, appSncpTransFactory, NodeServer.this.sncpAddress, groups, entry.getProperty());
} else {
service = Sncp.createRemoteService(serverClassLoader, resourceName, serviceImplClass, appTransportFactory, NodeServer.this.sncpAddress, groups, entry.getProperty());
service = Sncp.createRemoteService(serverClassLoader, resourceName, serviceImplClass, appSncpTransFactory, NodeServer.this.sncpAddress, groups, entry.getProperty());
}
if (SncpClient.parseMethod(serviceImplClass).isEmpty() && serviceImplClass.getAnnotation(Priority.class) == null) return; //class没有可用的方法且没有标记启动优先级的 通常为BaseService

View File

@@ -5,6 +5,7 @@
*/
package org.redkale.net;
import java.lang.ref.WeakReference;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
@@ -79,6 +80,7 @@ public final class Transport {
this.subprotocol = subprotocol == null ? "" : subprotocol.trim();
this.protocol = protocol;
this.factory = factory;
factory.transportReferences.add(new WeakReference<>(this));
this.tcp = "TCP".equalsIgnoreCase(protocol);
this.group = transportChannelGroup;
this.bufferPool = transportBufferPool;

View File

@@ -9,16 +9,18 @@ import java.io.IOException;
import java.lang.ref.WeakReference;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.logging.*;
import java.util.stream.Collectors;
import org.redkale.service.Service;
import org.redkale.util.ObjectPool;
import org.redkale.util.*;
/**
* System.getProperty("net.transport.pinginterval", "30") 心跳周期默认30秒
*
* <p>
* 详情见: https://redkale.org
*
@@ -26,6 +28,8 @@ import org.redkale.util.ObjectPool;
*/
public class TransportFactory {
public static final String NAME_PINGINTERVAL = "pinginterval";
protected static final Logger logger = Logger.getLogger(TransportFactory.class.getSimpleName());
//传输端的线程池
@@ -45,6 +49,20 @@ public class TransportFactory {
protected final List<WeakReference<Service>> services = new CopyOnWriteArrayList<>();
protected final List<WeakReference<Transport>> transportReferences = new CopyOnWriteArrayList<>();
//心跳周期, 单位:秒
protected int pinginterval;
//ping的定时器
private ScheduledThreadPoolExecutor pingScheduler;
//ping的内容
private ByteBuffer pingBuffer;
//pong的数据长度, 小于0表示不进行判断
protected int pongLength;
//负载均衡策略
protected final TransportStrategy strategy;
@@ -60,6 +78,26 @@ public class TransportFactory {
this(executor, bufferPool, channelGroup, null);
}
public void init(AnyValue conf, ByteBuffer pingBuffer, int pongLength) {
if (conf != null) {
this.pinginterval = conf.getIntValue(NAME_PINGINTERVAL, 0);
}
if (this.pinginterval > 0) {
if (this.pingScheduler == null && pingBuffer != null) {
this.pingBuffer = pingBuffer.asReadOnlyBuffer();
this.pongLength = pongLength;
this.pingScheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> {
final Thread t = new Thread(r, this.getClass().getSimpleName() + "-TransportFactoryPingTask-Thread");
t.setDaemon(true);
return t;
});
pingScheduler.scheduleAtFixedRate(() -> {
pings();
}, pinginterval, pinginterval, TimeUnit.SECONDS);
}
}
}
public static TransportFactory create(int threads) {
return create(threads, threads * 2, 8 * 1024);
}
@@ -220,6 +258,7 @@ public class TransportFactory {
}
public void shutdownNow() {
if (this.pingScheduler != null) this.pingScheduler.shutdownNow();
try {
this.channelGroup.shutdownNow();
} catch (Exception e) {
@@ -227,6 +266,73 @@ public class TransportFactory {
}
}
private void pings() {
long timex = System.currentTimeMillis() - (this.pinginterval < 15 ? this.pinginterval : (this.pinginterval - 3)) * 1000;
List<WeakReference> nulllist = new ArrayList<>();
for (WeakReference<Transport> ref : transportReferences) {
Transport transport = ref.get();
if (transport == null) {
nulllist.add(ref);
continue;
}
List<BlockingQueue<AsyncConnection>> list = new ArrayList<>(transport.getAsyncConnectionPool().values());
for (final BlockingQueue<AsyncConnection> queue : list) {
AsyncConnection conn;
while ((conn = queue.poll()) != null) {
if (conn.getLastWriteTime() > timex && false) { //最近几秒内已经进行过IO操作
queue.offer(conn);
} else { //超过一定时间的连接需要进行ping处理
ByteBuffer sendBuffer = pingBuffer.duplicate();
final AsyncConnection localconn = conn;
final BlockingQueue<AsyncConnection> localqueue = queue;
localconn.write(sendBuffer, sendBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (buffer.hasRemaining()) {
localconn.write(buffer, buffer, this);
return;
}
ByteBuffer pongBuffer = bufferPool.get();
localconn.read(pongBuffer, pongBuffer, new CompletionHandler<Integer, ByteBuffer>() {
int counter = 0;
@Override
public void completed(Integer result, ByteBuffer attachment) {
if (counter > 3) {
bufferPool.offer(attachment);
localconn.dispose();
return;
}
if (pongLength > 0 && attachment.position() < pongLength) {
counter++;
localconn.read(pongBuffer, pongBuffer, this);
return;
}
bufferPool.offer(attachment);
localqueue.offer(localconn);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
localconn.dispose();
}
});
}
@Override
public void failed(Throwable exc, ByteBuffer buffer) {
localconn.dispose();
}
});
}
}
}
}
for (WeakReference ref : nulllist) {
transportReferences.remove(ref);
}
}
private static boolean checkName(String name) { //不能含特殊字符
if (name.isEmpty()) return false;
if (name.charAt(0) >= '0' && name.charAt(0) <= '9') return false;

View File

@@ -8,6 +8,7 @@ package org.redkale.net.sncp;
import java.lang.annotation.Annotation;
import java.lang.reflect.*;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.*;
import java.util.*;
import javax.annotation.Resource;
@@ -31,6 +32,10 @@ import org.redkale.util.*;
*/
public abstract class Sncp {
public static final ByteBuffer PING_BUFFER = ByteBuffer.wrap("PING".getBytes()).asReadOnlyBuffer();
public static final ByteBuffer PONG_BUFFER = ByteBuffer.wrap("PONG".getBytes()).asReadOnlyBuffer();
static final String FIELDPREFIX = "_redkale";
static final String LOCALPREFIX = "_DynLocal";

View File

@@ -8,7 +8,6 @@ package org.redkale.net.sncp;
import org.redkale.net.PrepareServlet;
import org.redkale.util.AnyValue;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.redkale.service.Service;
import org.redkale.util.*;
@@ -23,7 +22,6 @@ public class SncpPrepareServlet extends PrepareServlet<DLong, SncpContext, SncpR
private final Object sncplock = new Object();
private static final ByteBuffer pongBuffer = ByteBuffer.wrap("PONG".getBytes()).asReadOnlyBuffer();
@Override
public void addServlet(SncpServlet servlet, Object attachment, AnyValue conf, DLong... mappings) {
@@ -69,7 +67,7 @@ public class SncpPrepareServlet extends PrepareServlet<DLong, SncpContext, SncpR
@Override
public void execute(SncpRequest request, SncpResponse response) throws IOException {
if (request.isPing()) {
response.finish(pongBuffer.duplicate());
response.finish(Sncp.PONG_BUFFER.duplicate());
return;
}
SncpServlet servlet = (SncpServlet) mappingServlet(request.getServiceid());