diff --git a/src/META-INF/application-template.xml b/src/META-INF/application-template.xml index c5574a940..bfb6d4e80 100644 --- a/src/META-INF/application-template.xml +++ b/src/META-INF/application-template.xml @@ -86,6 +86,7 @@ load: 加载文件,多个用;隔开。 默认置入的system.property.的有: System.setProperty("net.transport.pinginterval", "30"); + System.setProperty("net.transport.checkinterval", "30"); System.setProperty("convert.json.tiny", "true"); System.setProperty("convert.bson.tiny", "true"); System.setProperty("convert.json.pool.size", "128"); diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index 9ad5df31b..72e60d766 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -315,7 +315,8 @@ public final class Application { }); } this.sncpTransportFactory = TransportFactory.create(transportExec, transportPool, transportGroup, (SSLContext) null, readTimeoutSecond, writeTimeoutSecond, strategy); - DefaultAnyValue tarnsportConf = DefaultAnyValue.create(TransportFactory.NAME_PINGINTERVAL, System.getProperty("net.transport.pinginterval", "30")); + DefaultAnyValue tarnsportConf = DefaultAnyValue.create(TransportFactory.NAME_PINGINTERVAL, System.getProperty("net.transport.pinginterval", "30")) + .addValue(TransportFactory.NAME_CHECKINTERVAL, System.getProperty("net.transport.checkinterval", "30")); this.sncpTransportFactory.init(tarnsportConf, Sncp.PING_BUFFER, Sncp.PONG_BUFFER.remaining()); Thread.currentThread().setContextClassLoader(this.classLoader); this.serverClassLoader = new RedkaleClassLoader(this.classLoader); diff --git a/src/org/redkale/net/Transport.java b/src/org/redkale/net/Transport.java index e72842404..76cbe6c25 100644 --- a/src/org/redkale/net/Transport.java +++ b/src/org/redkale/net/Transport.java @@ -5,13 +5,14 @@ */ package org.redkale.net; +import java.io.IOException; import java.lang.ref.WeakReference; import java.net.*; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.*; import java.util.function.Supplier; import java.util.logging.Level; import javax.net.ssl.SSLContext; @@ -46,6 +47,8 @@ public final class Transport { supportTcpNoDelay = tcpNoDelay; } + protected final AtomicInteger seq = new AtomicInteger(-1); + protected final TransportFactory factory; protected final String name; //即的name属性 @@ -245,29 +248,29 @@ public final class Transport { } //---------------------随机取地址------------------------ - //从连接池里取 + int enablecount = 0; + final TransportAddress[] newtaddrs = new TransportAddress[taddrs.length]; for (final TransportAddress taddr : taddrs) { if (taddr.disabletime > 0) continue; - final BlockingQueue queue = taddr.conns; + newtaddrs[enablecount++] = taddr; + } + final long now = System.currentTimeMillis(); + if (enablecount > 0) { //存在可用的地址 + final TransportAddress one = newtaddrs[Math.abs(seq.incrementAndGet()) % enablecount]; + final BlockingQueue queue = one.conns; if (!queue.isEmpty()) { AsyncConnection conn; while ((conn = queue.poll()) != null) { if (conn.isOpen()) return CompletableFuture.completedFuture(conn); } } - } - //从可用/不可用的地址列表中创建连接 - AtomicInteger count = new AtomicInteger(taddrs.length); - CompletableFuture future = new CompletableFuture(); - final long now = System.currentTimeMillis(); - for (final TransportAddress taddr : taddrs) { - if (future.isDone()) return future; + CompletableFuture future = new CompletableFuture(); final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group); if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true); - channel.connect(taddr.address, taddr, new CompletionHandler() { + channel.connect(one.address, one, new CompletionHandler() { @Override public void completed(Void result, TransportAddress attachment) { - taddr.disabletime = 0; + attachment.disabletime = 0; AsyncConnection asyncConn = AsyncConnection.create(channel, attachment.address, factory.readTimeoutSecond, factory.writeTimeoutSecond); if (future.isDone()) { if (!attachment.conns.offer(asyncConn)) asyncConn.dispose(); @@ -278,23 +281,70 @@ public final class Transport { @Override public void failed(Throwable exc, TransportAddress attachment) { - taddr.disabletime = now; - if (count.decrementAndGet() < 1) { - future.completeExceptionally(exc); - } + attachment.disabletime = now; try { channel.close(); } catch (Exception e) { } + try { + pollConnection0(taddrs, one, now).whenComplete((r, t) -> { + if (t != null) { + future.completeExceptionally(t); + } else { + future.complete(r); + } + }); + + } catch (Exception e) { + future.completeExceptionally(e); + } } }); + return future; } - return future; + return pollConnection0(taddrs, null, now); } catch (Exception ex) { throw new RuntimeException("transport address = " + addr, ex); } } + private CompletableFuture pollConnection0(TransportAddress[] taddrs, TransportAddress exclude, long now) throws IOException { + //从可用/不可用的地址列表中创建连接 + AtomicInteger count = new AtomicInteger(taddrs.length); + CompletableFuture future = new CompletableFuture(); + for (final TransportAddress taddr : taddrs) { + if (taddr == exclude) continue; + if (future.isDone()) return future; + final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group); + if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true); + channel.connect(taddr.address, taddr, new CompletionHandler() { + @Override + public void completed(Void result, TransportAddress attachment) { + attachment.disabletime = 0; + AsyncConnection asyncConn = AsyncConnection.create(channel, attachment.address, factory.readTimeoutSecond, factory.writeTimeoutSecond); + if (future.isDone()) { + if (!attachment.conns.offer(asyncConn)) asyncConn.dispose(); + } else { + future.complete(asyncConn); + } + } + + @Override + public void failed(Throwable exc, TransportAddress attachment) { + attachment.disabletime = now; + if (count.decrementAndGet() < 1) { + future.completeExceptionally(exc); + } + try { + channel.close(); + } catch (Exception e) { + } + } + }); + } + return future; + } + public void offerConnection(final boolean forceClose, AsyncConnection conn) { if (this.strategy != null && strategy.offerConnection(forceClose, conn)) return; if (!forceClose && conn.isTCP()) { diff --git a/src/org/redkale/net/TransportFactory.java b/src/org/redkale/net/TransportFactory.java index 383bee65c..0b70a30b5 100644 --- a/src/org/redkale/net/TransportFactory.java +++ b/src/org/redkale/net/TransportFactory.java @@ -7,7 +7,7 @@ package org.redkale.net; import java.io.IOException; import java.lang.ref.WeakReference; -import java.net.InetSocketAddress; +import java.net.*; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; @@ -22,6 +22,7 @@ import org.redkale.util.*; /** * System.getProperty("net.transport.pinginterval", "30") 心跳周期,默认30秒 + * System.getProperty("net.transport.checkinterval", "30") 检查不可用地址周期,默认30秒 * *

* 详情见: https://redkale.org @@ -38,6 +39,8 @@ public class TransportFactory { public static final String NAME_PINGINTERVAL = "pinginterval"; + public static final String NAME_CHECKINTERVAL = "checkinterval"; + protected static final Logger logger = Logger.getLogger(TransportFactory.class.getSimpleName()); //传输端的线程池 @@ -62,14 +65,17 @@ public class TransportFactory { //心跳周期, 单位:秒 protected int pinginterval; + //检查不可用地址周期, 单位:秒 + protected int checkinterval = Integer.getInteger("net.transport.checkinterval", 30); + //TCP读取超时秒数 protected int readTimeoutSecond; //TCP写入超时秒数 protected int writeTimeoutSecond; - //ping的定时器 - private ScheduledThreadPoolExecutor pingScheduler; + //ping和检查的定时器 + private ScheduledThreadPoolExecutor scheduler; protected SSLContext sslContext; @@ -101,17 +107,23 @@ public class TransportFactory { public void init(AnyValue conf, ByteBuffer pingBuffer, int pongLength) { if (conf != null) { this.pinginterval = conf.getIntValue(NAME_PINGINTERVAL, 0); + this.checkinterval = conf.getIntValue(NAME_CHECKINTERVAL, this.checkinterval); } + this.scheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> { + final Thread t = new Thread(r, this.getClass().getSimpleName() + "-TransportFactoryTask-Thread"); + t.setDaemon(true); + return t; + }); + this.scheduler.scheduleAtFixedRate(() -> { + checks(); + }, checkinterval, checkinterval, TimeUnit.SECONDS); + if (this.pinginterval > 0) { - if (this.pingScheduler == null && pingBuffer != null) { + if (pingBuffer != null) { this.pingBuffer = pingBuffer.asReadOnlyBuffer(); this.pongLength = pongLength; - this.pingScheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> { - final Thread t = new Thread(r, this.getClass().getSimpleName() + "-TransportFactoryPingTask-Thread"); - t.setDaemon(true); - return t; - }); - pingScheduler.scheduleAtFixedRate(() -> { + + scheduler.scheduleAtFixedRate(() -> { pings(); }, pinginterval, pinginterval, TimeUnit.SECONDS); } @@ -309,7 +321,7 @@ public class TransportFactory { } public void shutdownNow() { - if (this.pingScheduler != null) this.pingScheduler.shutdownNow(); + if (this.scheduler != null) this.scheduler.shutdownNow(); try { this.channelGroup.shutdownNow(); } catch (Exception e) { @@ -317,8 +329,7 @@ public class TransportFactory { } } - private void pings() { - long timex = System.currentTimeMillis() - (this.pinginterval < 15 ? this.pinginterval : (this.pinginterval - 3)) * 1000; + private void checks() { List nulllist = new ArrayList<>(); for (WeakReference ref : transportReferences) { Transport transport = ref.get(); @@ -327,6 +338,36 @@ public class TransportFactory { continue; } Transport.TransportAddress[] taddrs = transport.getTransportAddresses(); + for (final Transport.TransportAddress taddr : taddrs) { + if (taddr.disabletime < 1) continue; //可用 + try { + final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(transport.group); + channel.connect(taddr.address, taddr, new CompletionHandler() { + @Override + public void completed(Void result, Transport.TransportAddress attachment) { + attachment.disabletime = 0; + } + + @Override + public void failed(Throwable exc, Transport.TransportAddress attachment) { + attachment.disabletime = System.currentTimeMillis(); + } + }); + } catch (Exception e) { + } + } + } + for (WeakReference ref : nulllist) { + transportReferences.remove(ref); + } + } + + private void pings() { + long timex = System.currentTimeMillis() - (this.pinginterval < 15 ? this.pinginterval : (this.pinginterval - 3)) * 1000; + for (WeakReference ref : transportReferences) { + Transport transport = ref.get(); + if (transport == null) continue; + Transport.TransportAddress[] taddrs = transport.getTransportAddresses(); for (final Transport.TransportAddress taddr : taddrs) { final BlockingQueue queue = taddr.conns; AsyncConnection conn; @@ -380,9 +421,6 @@ public class TransportFactory { } } } - for (WeakReference ref : nulllist) { - transportReferences.remove(ref); - } } private static boolean checkName(String name) { //不能含特殊字符 diff --git a/test/org/redkale/test/net/TransportTest.java b/test/org/redkale/test/net/TransportTest.java index e13ade230..8f5c49350 100644 --- a/test/org/redkale/test/net/TransportTest.java +++ b/test/org/redkale/test/net/TransportTest.java @@ -7,6 +7,7 @@ package org.redkale.test.net; import java.net.InetSocketAddress; import java.util.*; +import java.util.concurrent.CountDownLatch; import org.redkale.net.*; import org.redkale.net.http.HttpServer; import org.redkale.net.sncp.Sncp; @@ -21,7 +22,7 @@ public class TransportTest { private static final String format = "%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%tL"; public static void main(String[] args) throws Throwable { - + System.setProperty("net.transport.checkinterval", "2"); List addrs = new ArrayList<>(); addrs.add(new InetSocketAddress("127.0.0.1", 22001)); addrs.add(new InetSocketAddress("127.0.0.1", 22002)); @@ -42,13 +43,27 @@ public class TransportTest { Transport transport = factory.createTransportTCP("", null, addrs); System.out.println(String.format(format, System.currentTimeMillis())); try { - AsyncConnection firstconn = transport.pollConnection(null).join(); - System.out.println(firstconn); - if (firstconn != null) transport.offerConnection(false, firstconn); - AsyncConnection conn = transport.pollConnection(null).join(); - System.out.println(conn + "-------应该与前值相同"); - conn = transport.pollConnection(null).join(); - System.out.println(conn + "-------应该与前值不同"); + CountDownLatch cdl = new CountDownLatch(20); + for (int i = 0; i < 20; i++) { + transport.pollConnection(null).whenComplete((r, t) -> { + cdl.countDown(); + System.out.println("连接: " + r.getRemoteAddress()); + }); + } + cdl.await(); + HttpServer server = new HttpServer(); + DefaultAnyValue servconf = DefaultAnyValue.create("port", 22005); + server.init(servconf); + server.start(); + Thread.sleep(4000); + CountDownLatch cdl2 = new CountDownLatch(20); + for (int i = 0; i < 20; i++) { + transport.pollConnection(null).whenComplete((r, t) -> { + cdl2.countDown(); + System.out.println("连接: " + r.getRemoteAddress()); + }); + } + cdl2.await(); } finally { System.out.println(String.format(format, System.currentTimeMillis())); }