UDP优化

This commit is contained in:
redkale
2023-01-31 10:24:52 +08:00
parent e47abd6417
commit e55d991bce
7 changed files with 43 additions and 14 deletions

View File

@@ -30,14 +30,30 @@ public abstract class AsyncGroup {
return new AsyncIOGroup(true, threadNameFormat, workExecutor, bufferCapacity, safeBufferPool); return new AsyncIOGroup(true, threadNameFormat, workExecutor, bufferCapacity, safeBufferPool);
} }
public static AsyncGroup create(boolean clientMode, String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { public static AsyncGroup create(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) {
return new AsyncIOGroup(clientMode, threadNameFormat, workExecutor, bufferCapacity, bufferPoolSize); return new AsyncIOGroup(clientMode, threadNameFormat, workExecutor, bufferCapacity, bufferPoolSize);
} }
public static AsyncGroup create(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, ObjectPool<ByteBuffer> safeBufferPool) { public static AsyncGroup create(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, int bufferCapacity, ObjectPool<ByteBuffer> safeBufferPool) {
return new AsyncIOGroup(clientMode, threadNameFormat, workExecutor, bufferCapacity, safeBufferPool); return new AsyncIOGroup(clientMode, threadNameFormat, workExecutor, bufferCapacity, safeBufferPool);
} }
public static AsyncGroup create(String threadNameFormat, int threads, ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) {
return new AsyncIOGroup(true, threadNameFormat, threads, workExecutor, bufferCapacity, bufferPoolSize);
}
public static AsyncGroup create(String threadNameFormat, int threads, ExecutorService workExecutor, final int bufferCapacity, ObjectPool<ByteBuffer> safeBufferPool) {
return new AsyncIOGroup(true, threadNameFormat, threads, workExecutor, bufferCapacity, safeBufferPool);
}
public static AsyncGroup create(boolean clientMode, String threadNameFormat, int threads, ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) {
return new AsyncIOGroup(clientMode, threadNameFormat, threads, workExecutor, bufferCapacity, bufferPoolSize);
}
public static AsyncGroup create(boolean clientMode, String threadNameFormat, int threads, ExecutorService workExecutor, int bufferCapacity, ObjectPool<ByteBuffer> safeBufferPool) {
return new AsyncIOGroup(clientMode, threadNameFormat, threads, workExecutor, bufferCapacity, safeBufferPool);
}
public CompletableFuture<AsyncConnection> createTCPClient(final SocketAddress address) { public CompletableFuture<AsyncConnection> createTCPClient(final SocketAddress address) {
return createTCPClient(address, 0, 0); return createTCPClient(address, 0, 0);
} }

View File

@@ -59,11 +59,15 @@ public class AsyncIOGroup extends AsyncGroup {
protected final ScheduledThreadPoolExecutor timeoutExecutor; protected final ScheduledThreadPoolExecutor timeoutExecutor;
public AsyncIOGroup(final int bufferCapacity, final int bufferPoolSize) { public AsyncIOGroup(final int bufferCapacity, final int bufferPoolSize) {
this(true, "Redkale-AnonymousClient-IOThread-%s", null, bufferCapacity, bufferPoolSize); this(true, "Redkale-AnonymousClient-IOThread-%s", Utility.cpus(), null, bufferCapacity, bufferPoolSize);
} }
public AsyncIOGroup(boolean clientMode, String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { public AsyncIOGroup(boolean clientMode, String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) {
this(clientMode, threadNameFormat, workExecutor, bufferCapacity, ObjectPool.createSafePool(null, null, bufferPoolSize, this(clientMode, threadNameFormat, Utility.cpus(), workExecutor, bufferCapacity, bufferPoolSize);
}
public AsyncIOGroup(boolean clientMode, String threadNameFormat, int threads, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) {
this(clientMode, threadNameFormat, threads, workExecutor, bufferCapacity, ObjectPool.createSafePool(null, null, bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(bufferCapacity), null, (e) -> { (Object... params) -> ByteBuffer.allocateDirect(bufferCapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != bufferCapacity) { if (e == null || e.isReadOnly() || e.capacity() != bufferCapacity) {
return false; return false;
@@ -75,8 +79,12 @@ public class AsyncIOGroup extends AsyncGroup {
@SuppressWarnings("OverridableMethodCallInConstructor") @SuppressWarnings("OverridableMethodCallInConstructor")
public AsyncIOGroup(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, ObjectPool<ByteBuffer> safeBufferPool) { public AsyncIOGroup(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, ObjectPool<ByteBuffer> safeBufferPool) {
this(clientMode, threadNameFormat, Utility.cpus(), workExecutor, bufferCapacity, safeBufferPool);
}
@SuppressWarnings("OverridableMethodCallInConstructor")
public AsyncIOGroup(boolean clientMode, String threadNameFormat, int threads, ExecutorService workExecutor, final int bufferCapacity, ObjectPool<ByteBuffer> safeBufferPool) {
this.bufferCapacity = bufferCapacity; this.bufferCapacity = bufferCapacity;
final int threads = Utility.cpus();
this.ioReadThreads = new AsyncIOThread[threads]; this.ioReadThreads = new AsyncIOThread[threads];
this.ioWriteThreads = new AsyncIOThread[threads]; this.ioWriteThreads = new AsyncIOThread[threads];
final ThreadGroup g = new ThreadGroup(String.format(threadNameFormat, "Group")); final ThreadGroup g = new ThreadGroup(String.format(threadNameFormat, "Group"));

View File

@@ -61,10 +61,10 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
this.serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); this.serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
} }
if (options.contains(StandardSocketOptions.SO_RCVBUF)) { if (options.contains(StandardSocketOptions.SO_RCVBUF)) {
this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 32 * 1024);
} }
if (options.contains(StandardSocketOptions.SO_SNDBUF)) { if (options.contains(StandardSocketOptions.SO_SNDBUF)) {
this.serverChannel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); this.serverChannel.setOption(StandardSocketOptions.SO_SNDBUF, 32 * 1024);
} }
} }

View File

@@ -60,10 +60,10 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
this.serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); this.serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
} }
if (options.contains(StandardSocketOptions.SO_RCVBUF)) { if (options.contains(StandardSocketOptions.SO_RCVBUF)) {
this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 32 * 1024);
} }
if (options.contains(StandardSocketOptions.SO_SNDBUF)) { if (options.contains(StandardSocketOptions.SO_SNDBUF)) {
this.serverChannel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); this.serverChannel.setOption(StandardSocketOptions.SO_SNDBUF, 32 * 1024);
} }
} }

View File

@@ -127,7 +127,7 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
this.readTimeoutSeconds = config.getIntValue("readTimeoutSeconds", 0); this.readTimeoutSeconds = config.getIntValue("readTimeoutSeconds", 0);
this.writeTimeoutSeconds = config.getIntValue("writeTimeoutSeconds", 0); this.writeTimeoutSeconds = config.getIntValue("writeTimeoutSeconds", 0);
this.backlog = parseLenth(config.getValue("backlog"), 1024); this.backlog = parseLenth(config.getValue("backlog"), 1024);
this.maxBody = parseLenth(config.getValue("maxbody"), 64 * 1024); this.maxBody = parseLenth(config.getValue("maxbody"), "UDP".equalsIgnoreCase(netprotocol) ? 16 * 1024 : 64 * 1024);
int bufCapacity = parseLenth(config.getValue("bufferCapacity"), "UDP".equalsIgnoreCase(netprotocol) ? 1350 : 32 * 1024); int bufCapacity = parseLenth(config.getValue("bufferCapacity"), "UDP".equalsIgnoreCase(netprotocol) ? 1350 : 32 * 1024);
this.bufferCapacity = "UDP".equalsIgnoreCase(netprotocol) ? bufCapacity : (bufCapacity < 1024 ? 1024 : bufCapacity); this.bufferCapacity = "UDP".equalsIgnoreCase(netprotocol) ? bufCapacity : (bufCapacity < 1024 ? 1024 : bufCapacity);
this.bufferPoolSize = config.getIntValue("bufferPoolSize", Utility.cpus() * 8); this.bufferPoolSize = config.getIntValue("bufferPoolSize", Utility.cpus() * 8);
@@ -212,6 +212,9 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
if (value % 1024 == 0) { if (value % 1024 == 0) {
return value / (1024) + "K"; return value / (1024) + "K";
} }
if (value >= 1000) {
return "" + value;
}
return value + "B"; return value + "B";
} }

View File

@@ -118,8 +118,9 @@ public class SncpServer extends Server<Uint128, SncpContext, SncpRequest, SncpRe
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected SncpContext createContext() { protected SncpContext createContext() {
this.bufferCapacity = Math.max(this.bufferCapacity, 8 * 1024); if (!"UDP".equalsIgnoreCase(netprotocol)) {
this.bufferCapacity = Math.max(this.bufferCapacity, 8 * 1024);
}
final SncpContextConfig contextConfig = new SncpContextConfig(); final SncpContextConfig contextConfig = new SncpContextConfig();
initContextConfig(contextConfig); initContextConfig(contextConfig);

View File

@@ -12,7 +12,7 @@ import java.nio.channels.AsynchronousChannelGroup;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
import org.redkale.boot.LoggingBaseHandler; import org.redkale.boot.*;
import org.redkale.convert.bson.*; import org.redkale.convert.bson.*;
import org.redkale.net.*; import org.redkale.net.*;
import org.redkale.net.sncp.*; import org.redkale.net.sncp.*;
@@ -31,13 +31,14 @@ public class SncpTest {
private static int port2 = 4240; private static int port2 = 4240;
private static final String protocol = "SNCP.TCP"; private static final String protocol = "SNCP.UDP";
private static final ResourceFactory factory = ResourceFactory.create(); private static final ResourceFactory factory = ResourceFactory.create();
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
LoggingBaseHandler.initDebugLogConfig(); LoggingBaseHandler.initDebugLogConfig();
factory.register("", BsonConvert.class, BsonFactory.root().getConvert()); factory.register("", BsonConvert.class, BsonFactory.root().getConvert());
factory.register("", Application.class, Application.create(true));
if (System.getProperty("client") == null) { if (System.getProperty("client") == null) {
runServer(); runServer();
if (port2 > 0) { if (port2 > 0) {