diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index a71ad183b..f186eddf6 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -105,8 +105,8 @@ public final class Application { //NodeServer 资源 final List servers = new CopyOnWriteArrayList<>(); - //传输端的TransportFactory - final TransportFactory transportFactory; + //SNCP传输端的TransportFactory, 注意: 只给SNCP使用 + final TransportFactory sncpTransportFactory; //全局根ResourceFactory final ResourceFactory resourceFactory = ResourceFactory.root(); @@ -292,7 +292,9 @@ public final class Application { throw new RuntimeException(e); } } - this.transportFactory = TransportFactory.create(transportExec, transportPool, transportGroup, strategy); + this.sncpTransportFactory = TransportFactory.create(transportExec, transportPool, transportGroup, strategy); + DefaultAnyValue tarnsportConf = DefaultAnyValue.create(TransportFactory.NAME_PINGINTERVAL, System.getProperty("net.transport.pinginterval", "30")); + this.sncpTransportFactory.init(tarnsportConf, Sncp.PING_BUFFER, Sncp.PONG_BUFFER.remaining()); Thread.currentThread().setContextClassLoader(this.classLoader); this.serverClassLoader = new RedkaleClassLoader(this.classLoader); } @@ -301,8 +303,8 @@ public final class Application { return resourceFactory; } - public TransportFactory getTransportFactory() { - return transportFactory; + public TransportFactory getSncpTransportFactory() { + return sncpTransportFactory; } public RedkaleClassLoader getClassLoader() { @@ -406,7 +408,7 @@ public final class Application { } else if (type == ResourceFactory.class) { field.set(src, res.name().equalsIgnoreCase("server") ? rf : (res.name().isEmpty() ? application.resourceFactory : null)); } else if (type == TransportFactory.class) { - field.set(src, application.transportFactory); + field.set(src, application.sncpTransportFactory); } else if (type == NodeSncpServer.class) { NodeServer server = null; for (NodeServer ns : application.getNodeServers()) { @@ -472,7 +474,7 @@ public final class Application { final InetSocketAddress addr = new InetSocketAddress(node.getValue("addr"), node.getIntValue("port")); ginfo.putAddress(addr); } - transportFactory.addGroupInfo(ginfo); + sncpTransportFactory.addGroupInfo(ginfo); } } //------------------------------------------------------------------------ @@ -815,7 +817,7 @@ public final class Application { logger.log(Level.FINER, source.getClass() + " close CacheSource erroneous", e); } } - this.transportFactory.shutdownNow(); + this.sncpTransportFactory.shutdownNow(); } private static int parseLenth(String value, int defValue) { diff --git a/src/org/redkale/boot/NodeHttpServer.java b/src/org/redkale/boot/NodeHttpServer.java index 9e2d5b42e..b1dccba90 100644 --- a/src/org/redkale/boot/NodeHttpServer.java +++ b/src/org/redkale/boot/NodeHttpServer.java @@ -109,7 +109,7 @@ public class NodeHttpServer extends NodeServer { synchronized (regFactory) { Service nodeService = (Service) rf.find(resourceName, WebSocketNode.class); if (nodeService == null) { - nodeService = Sncp.createLocalService(serverClassLoader, resourceName, WebSocketNodeService.class, application.getResourceFactory(), application.getTransportFactory(), (InetSocketAddress) null, (Set) null, (AnyValue) null); + nodeService = Sncp.createLocalService(serverClassLoader, resourceName, WebSocketNodeService.class, application.getResourceFactory(), application.getSncpTransportFactory(), (InetSocketAddress) null, (Set) null, (AnyValue) null); regFactory.register(resourceName, WebSocketNode.class, nodeService); } resourceFactory.inject(nodeService, self); diff --git a/src/org/redkale/boot/NodeServer.java b/src/org/redkale/boot/NodeServer.java index 93bff70f7..aabde1098 100644 --- a/src/org/redkale/boot/NodeServer.java +++ b/src/org/redkale/boot/NodeServer.java @@ -111,7 +111,7 @@ public abstract class NodeServer { if (isSNCP()) { // SNCP协议 String host = this.serverConf.getValue("host", isWATCH() ? "127.0.0.1" : "0.0.0.0").replace("0.0.0.0", ""); this.sncpAddress = new InetSocketAddress(host.isEmpty() ? application.localAddress.getHostAddress() : host, this.serverConf.getIntValue("port")); - this.sncpGroup = application.transportFactory.findGroupName(this.sncpAddress); + this.sncpGroup = application.sncpTransportFactory.findGroupName(this.sncpAddress); //单向SNCP服务不需要对等group //if (this.sncpGroup == null) throw new RuntimeException("Server (" + String.valueOf(config).replaceAll("\\s+", " ") + ") not found info"); } @@ -171,7 +171,7 @@ public abstract class NodeServer { final NodeServer self = this; //--------------------------------------------------------------------------------------------- final ResourceFactory appResFactory = application.getResourceFactory(); - final TransportFactory appTranFactory = application.getTransportFactory(); + final TransportFactory appSncpTranFactory = application.getSncpTransportFactory(); final AnyValue resources = application.config.getAnyValue("resources"); final Map cacheResource = new HashMap<>(); final Map dataResources = new HashMap<>(); @@ -232,7 +232,7 @@ public abstract class NodeServer { final Set groups = new HashSet<>(); if (client != null && client.getSameGroup() != null) groups.add(client.getSameGroup()); if (client != null && client.getDiffGroups() != null) groups.addAll(client.getDiffGroups()); - Service cacheListenerService = Sncp.createLocalService(serverClassLoader, resourceName, DataCacheListenerService.class, appResFactory, appTranFactory, sncpAddr, groups, Sncp.getConf((Service) src)); + Service cacheListenerService = Sncp.createLocalService(serverClassLoader, resourceName, DataCacheListenerService.class, appResFactory, appSncpTranFactory, sncpAddr, groups, Sncp.getConf((Service) src)); appResFactory.register(resourceName, DataCacheListener.class, cacheListenerService); localServices.add(cacheListenerService); sncpServer.consumerAccept(cacheListenerService); @@ -266,11 +266,11 @@ public abstract class NodeServer { final Class sourceType = sourceConf == null ? CacheMemorySource.class : serverClassLoader.loadClass(sourceConf.getValue("value")); Object source; if (DataSource.class.isAssignableFrom(sourceType)) { // DataSource - source = (DataSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appTranFactory, sncpAddr, groups, Sncp.getConf(srcService)); + source = (DataSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appSncpTranFactory, sncpAddr, groups, Sncp.getConf(srcService)); application.dataSources.add((DataSource) source); appResFactory.register(resourceName, DataSource.class, source); } else { // CacheSource - source = (CacheSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appTranFactory, sncpAddr, groups, Sncp.getConf(srcService)); + source = (CacheSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appSncpTranFactory, sncpAddr, groups, Sncp.getConf(srcService)); Type genericType = field.getGenericType(); ParameterizedType pt = (genericType instanceof ParameterizedType) ? (ParameterizedType) genericType : null; Type valType = pt == null ? null : pt.getActualTypeArguments()[0]; @@ -311,7 +311,7 @@ public abstract class NodeServer { final Set> entrys = (Set) serviceFilter.getAllFilterEntrys(); ResourceFactory regFactory = isSNCP() ? application.getResourceFactory() : resourceFactory; final ResourceFactory appResourceFactory = application.getResourceFactory(); - final TransportFactory appTransportFactory = application.getTransportFactory(); + final TransportFactory appSncpTransFactory = application.getSncpTransportFactory(); for (FilterEntry entry : entrys) { //service实现类 final Class serviceImplClass = entry.getType(); if (Modifier.isFinal(serviceImplClass.getModifiers())) continue; //修饰final的类跳过 @@ -342,9 +342,9 @@ public abstract class NodeServer { Service service; boolean ws = src instanceof WebSocketServlet; if (ws || localed) { //本地模式 - service = Sncp.createLocalService(serverClassLoader, resourceName, serviceImplClass, appResourceFactory, appTransportFactory, NodeServer.this.sncpAddress, groups, entry.getProperty()); + service = Sncp.createLocalService(serverClassLoader, resourceName, serviceImplClass, appResourceFactory, appSncpTransFactory, NodeServer.this.sncpAddress, groups, entry.getProperty()); } else { - service = Sncp.createRemoteService(serverClassLoader, resourceName, serviceImplClass, appTransportFactory, NodeServer.this.sncpAddress, groups, entry.getProperty()); + service = Sncp.createRemoteService(serverClassLoader, resourceName, serviceImplClass, appSncpTransFactory, NodeServer.this.sncpAddress, groups, entry.getProperty()); } if (SncpClient.parseMethod(serviceImplClass).isEmpty() && serviceImplClass.getAnnotation(Priority.class) == null) return; //class没有可用的方法且没有标记启动优先级的, 通常为BaseService diff --git a/src/org/redkale/net/Transport.java b/src/org/redkale/net/Transport.java index 4c994e1bc..632a2ab94 100644 --- a/src/org/redkale/net/Transport.java +++ b/src/org/redkale/net/Transport.java @@ -5,6 +5,7 @@ */ package org.redkale.net; +import java.lang.ref.WeakReference; import java.net.*; import java.nio.ByteBuffer; import java.nio.channels.*; @@ -79,6 +80,7 @@ public final class Transport { this.subprotocol = subprotocol == null ? "" : subprotocol.trim(); this.protocol = protocol; this.factory = factory; + factory.transportReferences.add(new WeakReference<>(this)); this.tcp = "TCP".equalsIgnoreCase(protocol); this.group = transportChannelGroup; this.bufferPool = transportBufferPool; diff --git a/src/org/redkale/net/TransportFactory.java b/src/org/redkale/net/TransportFactory.java index 1b06b2e43..e80706950 100644 --- a/src/org/redkale/net/TransportFactory.java +++ b/src/org/redkale/net/TransportFactory.java @@ -9,16 +9,18 @@ import java.io.IOException; import java.lang.ref.WeakReference; import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousChannelGroup; +import java.nio.channels.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.logging.*; import java.util.stream.Collectors; import org.redkale.service.Service; -import org.redkale.util.ObjectPool; +import org.redkale.util.*; /** + * System.getProperty("net.transport.pinginterval", "30") 心跳周期,默认30秒 + * *

* 详情见: https://redkale.org * @@ -26,6 +28,8 @@ import org.redkale.util.ObjectPool; */ public class TransportFactory { + public static final String NAME_PINGINTERVAL = "pinginterval"; + protected static final Logger logger = Logger.getLogger(TransportFactory.class.getSimpleName()); //传输端的线程池 @@ -45,6 +49,20 @@ public class TransportFactory { protected final List> services = new CopyOnWriteArrayList<>(); + protected final List> transportReferences = new CopyOnWriteArrayList<>(); + + //心跳周期, 单位:秒 + protected int pinginterval; + + //ping的定时器 + private ScheduledThreadPoolExecutor pingScheduler; + + //ping的内容 + private ByteBuffer pingBuffer; + + //pong的数据长度, 小于0表示不进行判断 + protected int pongLength; + //负载均衡策略 protected final TransportStrategy strategy; @@ -60,6 +78,26 @@ public class TransportFactory { this(executor, bufferPool, channelGroup, null); } + public void init(AnyValue conf, ByteBuffer pingBuffer, int pongLength) { + if (conf != null) { + this.pinginterval = conf.getIntValue(NAME_PINGINTERVAL, 0); + } + if (this.pinginterval > 0) { + if (this.pingScheduler == null && pingBuffer != null) { + this.pingBuffer = pingBuffer.asReadOnlyBuffer(); + this.pongLength = pongLength; + this.pingScheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> { + final Thread t = new Thread(r, this.getClass().getSimpleName() + "-TransportFactoryPingTask-Thread"); + t.setDaemon(true); + return t; + }); + pingScheduler.scheduleAtFixedRate(() -> { + pings(); + }, pinginterval, pinginterval, TimeUnit.SECONDS); + } + } + } + public static TransportFactory create(int threads) { return create(threads, threads * 2, 8 * 1024); } @@ -220,6 +258,7 @@ public class TransportFactory { } public void shutdownNow() { + if (this.pingScheduler != null) this.pingScheduler.shutdownNow(); try { this.channelGroup.shutdownNow(); } catch (Exception e) { @@ -227,6 +266,73 @@ public class TransportFactory { } } + private void pings() { + long timex = System.currentTimeMillis() - (this.pinginterval < 15 ? this.pinginterval : (this.pinginterval - 3)) * 1000; + List nulllist = new ArrayList<>(); + for (WeakReference ref : transportReferences) { + Transport transport = ref.get(); + if (transport == null) { + nulllist.add(ref); + continue; + } + List> list = new ArrayList<>(transport.getAsyncConnectionPool().values()); + for (final BlockingQueue queue : list) { + AsyncConnection conn; + while ((conn = queue.poll()) != null) { + if (conn.getLastWriteTime() > timex && false) { //最近几秒内已经进行过IO操作 + queue.offer(conn); + } else { //超过一定时间的连接需要进行ping处理 + ByteBuffer sendBuffer = pingBuffer.duplicate(); + final AsyncConnection localconn = conn; + final BlockingQueue localqueue = queue; + localconn.write(sendBuffer, sendBuffer, new CompletionHandler() { + @Override + public void completed(Integer result, ByteBuffer buffer) { + if (buffer.hasRemaining()) { + localconn.write(buffer, buffer, this); + return; + } + ByteBuffer pongBuffer = bufferPool.get(); + localconn.read(pongBuffer, pongBuffer, new CompletionHandler() { + int counter = 0; + + @Override + public void completed(Integer result, ByteBuffer attachment) { + if (counter > 3) { + bufferPool.offer(attachment); + localconn.dispose(); + return; + } + if (pongLength > 0 && attachment.position() < pongLength) { + counter++; + localconn.read(pongBuffer, pongBuffer, this); + return; + } + bufferPool.offer(attachment); + localqueue.offer(localconn); + } + + @Override + public void failed(Throwable exc, ByteBuffer attachment) { + localconn.dispose(); + } + }); + } + + @Override + public void failed(Throwable exc, ByteBuffer buffer) { + localconn.dispose(); + } + }); + } + } + } + } + for (WeakReference ref : nulllist) { + transportReferences.remove(ref); + } + } + private static boolean checkName(String name) { //不能含特殊字符 if (name.isEmpty()) return false; if (name.charAt(0) >= '0' && name.charAt(0) <= '9') return false; diff --git a/src/org/redkale/net/sncp/Sncp.java b/src/org/redkale/net/sncp/Sncp.java index 04e6cca88..edfdb77d3 100644 --- a/src/org/redkale/net/sncp/Sncp.java +++ b/src/org/redkale/net/sncp/Sncp.java @@ -8,6 +8,7 @@ package org.redkale.net.sncp; import java.lang.annotation.Annotation; import java.lang.reflect.*; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import java.security.*; import java.util.*; import javax.annotation.Resource; @@ -31,6 +32,10 @@ import org.redkale.util.*; */ public abstract class Sncp { + public static final ByteBuffer PING_BUFFER = ByteBuffer.wrap("PING".getBytes()).asReadOnlyBuffer(); + + public static final ByteBuffer PONG_BUFFER = ByteBuffer.wrap("PONG".getBytes()).asReadOnlyBuffer(); + static final String FIELDPREFIX = "_redkale"; static final String LOCALPREFIX = "_DynLocal"; diff --git a/src/org/redkale/net/sncp/SncpPrepareServlet.java b/src/org/redkale/net/sncp/SncpPrepareServlet.java index 7e828df4d..5e991f2b4 100644 --- a/src/org/redkale/net/sncp/SncpPrepareServlet.java +++ b/src/org/redkale/net/sncp/SncpPrepareServlet.java @@ -8,7 +8,6 @@ package org.redkale.net.sncp; import org.redkale.net.PrepareServlet; import org.redkale.util.AnyValue; import java.io.IOException; -import java.nio.ByteBuffer; import org.redkale.service.Service; import org.redkale.util.*; @@ -23,7 +22,6 @@ public class SncpPrepareServlet extends PrepareServlet