Transport.pollConnection的负载均衡策略改成轮询

This commit is contained in:
Redkale
2018-03-29 20:21:31 +08:00
parent a72c689f07
commit 8c1aba5608
5 changed files with 147 additions and 42 deletions

View File

@@ -86,6 +86,7 @@
load: 加载文件,多个用;隔开。
默认置入的system.property.的有:
System.setProperty("net.transport.pinginterval", "30");
System.setProperty("net.transport.checkinterval", "30");
System.setProperty("convert.json.tiny", "true");
System.setProperty("convert.bson.tiny", "true");
System.setProperty("convert.json.pool.size", "128");

View File

@@ -315,7 +315,8 @@ public final class Application {
});
}
this.sncpTransportFactory = TransportFactory.create(transportExec, transportPool, transportGroup, (SSLContext) null, readTimeoutSecond, writeTimeoutSecond, strategy);
DefaultAnyValue tarnsportConf = DefaultAnyValue.create(TransportFactory.NAME_PINGINTERVAL, System.getProperty("net.transport.pinginterval", "30"));
DefaultAnyValue tarnsportConf = DefaultAnyValue.create(TransportFactory.NAME_PINGINTERVAL, System.getProperty("net.transport.pinginterval", "30"))
.addValue(TransportFactory.NAME_CHECKINTERVAL, System.getProperty("net.transport.checkinterval", "30"));
this.sncpTransportFactory.init(tarnsportConf, Sncp.PING_BUFFER, Sncp.PONG_BUFFER.remaining());
Thread.currentThread().setContextClassLoader(this.classLoader);
this.serverClassLoader = new RedkaleClassLoader(this.classLoader);

View File

@@ -5,13 +5,14 @@
*/
package org.redkale.net;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.*;
import java.util.function.Supplier;
import java.util.logging.Level;
import javax.net.ssl.SSLContext;
@@ -46,6 +47,8 @@ public final class Transport {
supportTcpNoDelay = tcpNoDelay;
}
protected final AtomicInteger seq = new AtomicInteger(-1);
protected final TransportFactory factory;
protected final String name; //即<group>的name属性
@@ -245,29 +248,29 @@ public final class Transport {
}
//---------------------随机取地址------------------------
//从连接池里取
int enablecount = 0;
final TransportAddress[] newtaddrs = new TransportAddress[taddrs.length];
for (final TransportAddress taddr : taddrs) {
if (taddr.disabletime > 0) continue;
final BlockingQueue<AsyncConnection> queue = taddr.conns;
newtaddrs[enablecount++] = taddr;
}
final long now = System.currentTimeMillis();
if (enablecount > 0) { //存在可用的地址
final TransportAddress one = newtaddrs[Math.abs(seq.incrementAndGet()) % enablecount];
final BlockingQueue<AsyncConnection> queue = one.conns;
if (!queue.isEmpty()) {
AsyncConnection conn;
while ((conn = queue.poll()) != null) {
if (conn.isOpen()) return CompletableFuture.completedFuture(conn);
}
}
}
//从可用/不可用的地址列表中创建连接
AtomicInteger count = new AtomicInteger(taddrs.length);
CompletableFuture future = new CompletableFuture();
final long now = System.currentTimeMillis();
for (final TransportAddress taddr : taddrs) {
if (future.isDone()) return future;
CompletableFuture future = new CompletableFuture();
final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group);
if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.connect(taddr.address, taddr, new CompletionHandler<Void, TransportAddress>() {
channel.connect(one.address, one, new CompletionHandler<Void, TransportAddress>() {
@Override
public void completed(Void result, TransportAddress attachment) {
taddr.disabletime = 0;
attachment.disabletime = 0;
AsyncConnection asyncConn = AsyncConnection.create(channel, attachment.address, factory.readTimeoutSecond, factory.writeTimeoutSecond);
if (future.isDone()) {
if (!attachment.conns.offer(asyncConn)) asyncConn.dispose();
@@ -278,23 +281,70 @@ public final class Transport {
@Override
public void failed(Throwable exc, TransportAddress attachment) {
taddr.disabletime = now;
if (count.decrementAndGet() < 1) {
future.completeExceptionally(exc);
}
attachment.disabletime = now;
try {
channel.close();
} catch (Exception e) {
}
try {
pollConnection0(taddrs, one, now).whenComplete((r, t) -> {
if (t != null) {
future.completeExceptionally(t);
} else {
future.complete(r);
}
});
} catch (Exception e) {
future.completeExceptionally(e);
}
}
});
return future;
}
return future;
return pollConnection0(taddrs, null, now);
} catch (Exception ex) {
throw new RuntimeException("transport address = " + addr, ex);
}
}
private CompletableFuture<AsyncConnection> pollConnection0(TransportAddress[] taddrs, TransportAddress exclude, long now) throws IOException {
//从可用/不可用的地址列表中创建连接
AtomicInteger count = new AtomicInteger(taddrs.length);
CompletableFuture future = new CompletableFuture();
for (final TransportAddress taddr : taddrs) {
if (taddr == exclude) continue;
if (future.isDone()) return future;
final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group);
if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.connect(taddr.address, taddr, new CompletionHandler<Void, TransportAddress>() {
@Override
public void completed(Void result, TransportAddress attachment) {
attachment.disabletime = 0;
AsyncConnection asyncConn = AsyncConnection.create(channel, attachment.address, factory.readTimeoutSecond, factory.writeTimeoutSecond);
if (future.isDone()) {
if (!attachment.conns.offer(asyncConn)) asyncConn.dispose();
} else {
future.complete(asyncConn);
}
}
@Override
public void failed(Throwable exc, TransportAddress attachment) {
attachment.disabletime = now;
if (count.decrementAndGet() < 1) {
future.completeExceptionally(exc);
}
try {
channel.close();
} catch (Exception e) {
}
}
});
}
return future;
}
public void offerConnection(final boolean forceClose, AsyncConnection conn) {
if (this.strategy != null && strategy.offerConnection(forceClose, conn)) return;
if (!forceClose && conn.isTCP()) {

View File

@@ -7,7 +7,7 @@ package org.redkale.net;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.net.InetSocketAddress;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
@@ -22,6 +22,7 @@ import org.redkale.util.*;
/**
* System.getProperty("net.transport.pinginterval", "30") 心跳周期默认30秒
* System.getProperty("net.transport.checkinterval", "30") 检查不可用地址周期默认30秒
*
* <p>
* 详情见: https://redkale.org
@@ -38,6 +39,8 @@ public class TransportFactory {
public static final String NAME_PINGINTERVAL = "pinginterval";
public static final String NAME_CHECKINTERVAL = "checkinterval";
protected static final Logger logger = Logger.getLogger(TransportFactory.class.getSimpleName());
//传输端的线程池
@@ -62,14 +65,17 @@ public class TransportFactory {
//心跳周期, 单位:秒
protected int pinginterval;
//检查不可用地址周期, 单位:秒
protected int checkinterval = Integer.getInteger("net.transport.checkinterval", 30);
//TCP读取超时秒数
protected int readTimeoutSecond;
//TCP写入超时秒数
protected int writeTimeoutSecond;
//ping的定时器
private ScheduledThreadPoolExecutor pingScheduler;
//ping和检查的定时器
private ScheduledThreadPoolExecutor scheduler;
protected SSLContext sslContext;
@@ -101,17 +107,23 @@ public class TransportFactory {
public void init(AnyValue conf, ByteBuffer pingBuffer, int pongLength) {
if (conf != null) {
this.pinginterval = conf.getIntValue(NAME_PINGINTERVAL, 0);
this.checkinterval = conf.getIntValue(NAME_CHECKINTERVAL, this.checkinterval);
}
this.scheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> {
final Thread t = new Thread(r, this.getClass().getSimpleName() + "-TransportFactoryTask-Thread");
t.setDaemon(true);
return t;
});
this.scheduler.scheduleAtFixedRate(() -> {
checks();
}, checkinterval, checkinterval, TimeUnit.SECONDS);
if (this.pinginterval > 0) {
if (this.pingScheduler == null && pingBuffer != null) {
if (pingBuffer != null) {
this.pingBuffer = pingBuffer.asReadOnlyBuffer();
this.pongLength = pongLength;
this.pingScheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> {
final Thread t = new Thread(r, this.getClass().getSimpleName() + "-TransportFactoryPingTask-Thread");
t.setDaemon(true);
return t;
});
pingScheduler.scheduleAtFixedRate(() -> {
scheduler.scheduleAtFixedRate(() -> {
pings();
}, pinginterval, pinginterval, TimeUnit.SECONDS);
}
@@ -309,7 +321,7 @@ public class TransportFactory {
}
public void shutdownNow() {
if (this.pingScheduler != null) this.pingScheduler.shutdownNow();
if (this.scheduler != null) this.scheduler.shutdownNow();
try {
this.channelGroup.shutdownNow();
} catch (Exception e) {
@@ -317,8 +329,7 @@ public class TransportFactory {
}
}
private void pings() {
long timex = System.currentTimeMillis() - (this.pinginterval < 15 ? this.pinginterval : (this.pinginterval - 3)) * 1000;
private void checks() {
List<WeakReference> nulllist = new ArrayList<>();
for (WeakReference<Transport> ref : transportReferences) {
Transport transport = ref.get();
@@ -327,6 +338,36 @@ public class TransportFactory {
continue;
}
Transport.TransportAddress[] taddrs = transport.getTransportAddresses();
for (final Transport.TransportAddress taddr : taddrs) {
if (taddr.disabletime < 1) continue; //可用
try {
final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(transport.group);
channel.connect(taddr.address, taddr, new CompletionHandler<Void, Transport.TransportAddress>() {
@Override
public void completed(Void result, Transport.TransportAddress attachment) {
attachment.disabletime = 0;
}
@Override
public void failed(Throwable exc, Transport.TransportAddress attachment) {
attachment.disabletime = System.currentTimeMillis();
}
});
} catch (Exception e) {
}
}
}
for (WeakReference ref : nulllist) {
transportReferences.remove(ref);
}
}
private void pings() {
long timex = System.currentTimeMillis() - (this.pinginterval < 15 ? this.pinginterval : (this.pinginterval - 3)) * 1000;
for (WeakReference<Transport> ref : transportReferences) {
Transport transport = ref.get();
if (transport == null) continue;
Transport.TransportAddress[] taddrs = transport.getTransportAddresses();
for (final Transport.TransportAddress taddr : taddrs) {
final BlockingQueue<AsyncConnection> queue = taddr.conns;
AsyncConnection conn;
@@ -380,9 +421,6 @@ public class TransportFactory {
}
}
}
for (WeakReference ref : nulllist) {
transportReferences.remove(ref);
}
}
private static boolean checkName(String name) { //不能含特殊字符

View File

@@ -7,6 +7,7 @@ package org.redkale.test.net;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import org.redkale.net.*;
import org.redkale.net.http.HttpServer;
import org.redkale.net.sncp.Sncp;
@@ -21,7 +22,7 @@ public class TransportTest {
private static final String format = "%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%tL";
public static void main(String[] args) throws Throwable {
System.setProperty("net.transport.checkinterval", "2");
List<InetSocketAddress> addrs = new ArrayList<>();
addrs.add(new InetSocketAddress("127.0.0.1", 22001));
addrs.add(new InetSocketAddress("127.0.0.1", 22002));
@@ -42,13 +43,27 @@ public class TransportTest {
Transport transport = factory.createTransportTCP("", null, addrs);
System.out.println(String.format(format, System.currentTimeMillis()));
try {
AsyncConnection firstconn = transport.pollConnection(null).join();
System.out.println(firstconn);
if (firstconn != null) transport.offerConnection(false, firstconn);
AsyncConnection conn = transport.pollConnection(null).join();
System.out.println(conn + "-------应该与前值相同");
conn = transport.pollConnection(null).join();
System.out.println(conn + "-------应该与前值不同");
CountDownLatch cdl = new CountDownLatch(20);
for (int i = 0; i < 20; i++) {
transport.pollConnection(null).whenComplete((r, t) -> {
cdl.countDown();
System.out.println("连接: " + r.getRemoteAddress());
});
}
cdl.await();
HttpServer server = new HttpServer();
DefaultAnyValue servconf = DefaultAnyValue.create("port", 22005);
server.init(servconf);
server.start();
Thread.sleep(4000);
CountDownLatch cdl2 = new CountDownLatch(20);
for (int i = 0; i < 20; i++) {
transport.pollConnection(null).whenComplete((r, t) -> {
cdl2.countDown();
System.out.println("连接: " + r.getRemoteAddress());
});
}
cdl2.await();
} finally {
System.out.println(String.format(format, System.currentTimeMillis()));
}