udp优化

This commit is contained in:
redkale
2023-02-07 16:43:47 +08:00
parent bab857778b
commit 1d6b76b182
16 changed files with 142 additions and 81 deletions

View File

@@ -6,9 +6,8 @@
package org.redkale.net; package org.redkale.net;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.*; import java.util.concurrent.*;
import org.redkale.util.ObjectPool; import org.redkale.util.*;
/** /**
* Client模式的AsyncConnection连接构造器 * Client模式的AsyncConnection连接构造器
@@ -22,36 +21,38 @@ import org.redkale.util.ObjectPool;
*/ */
public abstract class AsyncGroup { public abstract class AsyncGroup {
public static final int UDP_BUFFER_CAPACITY = Integer.getInteger("redkale.udp.buffer.apacity", 1350);
public static AsyncGroup create(String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { public static AsyncGroup create(String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) {
return new AsyncIOGroup(true, threadNameFormat, workExecutor, bufferCapacity, bufferPoolSize); return new AsyncIOGroup(true, threadNameFormat, workExecutor, bufferCapacity, bufferPoolSize);
} }
public static AsyncGroup create(String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, ObjectPool<ByteBuffer> safeBufferPool) { public static AsyncGroup create(String threadNameFormat, ExecutorService workExecutor, final ByteBufferPool safeBufferPool) {
return new AsyncIOGroup(true, threadNameFormat, workExecutor, bufferCapacity, safeBufferPool); return new AsyncIOGroup(true, threadNameFormat, workExecutor, safeBufferPool);
} }
public static AsyncGroup create(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { public static AsyncGroup create(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) {
return new AsyncIOGroup(clientMode, threadNameFormat, workExecutor, bufferCapacity, bufferPoolSize); return new AsyncIOGroup(clientMode, threadNameFormat, workExecutor, bufferCapacity, bufferPoolSize);
} }
public static AsyncGroup create(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, int bufferCapacity, ObjectPool<ByteBuffer> safeBufferPool) { public static AsyncGroup create(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, ByteBufferPool safeBufferPool) {
return new AsyncIOGroup(clientMode, threadNameFormat, workExecutor, bufferCapacity, safeBufferPool); return new AsyncIOGroup(clientMode, threadNameFormat, workExecutor, safeBufferPool);
} }
public static AsyncGroup create(String threadNameFormat, int threads, ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { public static AsyncGroup create(String threadNameFormat, int threads, ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) {
return new AsyncIOGroup(true, threadNameFormat, threads, workExecutor, bufferCapacity, bufferPoolSize); return new AsyncIOGroup(true, threadNameFormat, threads, workExecutor, bufferCapacity, bufferPoolSize);
} }
public static AsyncGroup create(String threadNameFormat, int threads, ExecutorService workExecutor, final int bufferCapacity, ObjectPool<ByteBuffer> safeBufferPool) { public static AsyncGroup create(String threadNameFormat, int threads, ExecutorService workExecutor, final ByteBufferPool safeBufferPool) {
return new AsyncIOGroup(true, threadNameFormat, threads, workExecutor, bufferCapacity, safeBufferPool); return new AsyncIOGroup(true, threadNameFormat, threads, workExecutor, safeBufferPool);
} }
public static AsyncGroup create(boolean clientMode, String threadNameFormat, int threads, ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { public static AsyncGroup create(boolean clientMode, String threadNameFormat, int threads, ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) {
return new AsyncIOGroup(clientMode, threadNameFormat, threads, workExecutor, bufferCapacity, bufferPoolSize); return new AsyncIOGroup(clientMode, threadNameFormat, threads, workExecutor, bufferCapacity, bufferPoolSize);
} }
public static AsyncGroup create(boolean clientMode, String threadNameFormat, int threads, ExecutorService workExecutor, int bufferCapacity, ObjectPool<ByteBuffer> safeBufferPool) { public static AsyncGroup create(boolean clientMode, String threadNameFormat, int threads, ExecutorService workExecutor, ByteBufferPool safeBufferPool) {
return new AsyncIOGroup(clientMode, threadNameFormat, threads, workExecutor, bufferCapacity, safeBufferPool); return new AsyncIOGroup(clientMode, threadNameFormat, threads, workExecutor, safeBufferPool);
} }
public CompletableFuture<AsyncConnection> createTCPClient(final SocketAddress address) { public CompletableFuture<AsyncConnection> createTCPClient(final SocketAddress address) {

View File

@@ -7,7 +7,6 @@ package org.redkale.net;
import java.io.IOException; import java.io.IOException;
import java.net.*; import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.*; import java.util.concurrent.*;
@@ -67,24 +66,17 @@ public class AsyncIOGroup extends AsyncGroup {
} }
public AsyncIOGroup(boolean clientMode, String threadNameFormat, int threads, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { public AsyncIOGroup(boolean clientMode, String threadNameFormat, int threads, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) {
this(clientMode, threadNameFormat, threads, workExecutor, bufferCapacity, ObjectPool.createSafePool(null, null, bufferPoolSize, this(clientMode, threadNameFormat, threads, workExecutor, ByteBufferPool.createSafePool(bufferPoolSize, bufferCapacity));
(Object... params) -> ByteBuffer.allocateDirect(bufferCapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != bufferCapacity) {
return false;
}
e.clear();
return true;
}));
} }
@SuppressWarnings("OverridableMethodCallInConstructor") @SuppressWarnings("OverridableMethodCallInConstructor")
public AsyncIOGroup(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, ObjectPool<ByteBuffer> safeBufferPool) { public AsyncIOGroup(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, final ByteBufferPool safeBufferPool) {
this(clientMode, threadNameFormat, Utility.cpus(), workExecutor, bufferCapacity, safeBufferPool); this(clientMode, threadNameFormat, Utility.cpus(), workExecutor, safeBufferPool);
} }
@SuppressWarnings("OverridableMethodCallInConstructor") @SuppressWarnings("OverridableMethodCallInConstructor")
public AsyncIOGroup(boolean clientMode, String threadNameFormat, int threads, ExecutorService workExecutor, final int bufferCapacity, ObjectPool<ByteBuffer> safeBufferPool) { public AsyncIOGroup(boolean clientMode, String threadNameFormat, int threads, ExecutorService workExecutor, final ByteBufferPool safeBufferPool) {
this.bufferCapacity = bufferCapacity; this.bufferCapacity = safeBufferPool.getBufferCapacity();
this.ioReadThreads = new AsyncIOThread[threads]; this.ioReadThreads = new AsyncIOThread[threads];
this.ioWriteThreads = new AsyncIOThread[threads]; this.ioWriteThreads = new AsyncIOThread[threads];
final ThreadGroup g = new ThreadGroup(String.format(threadNameFormat, "Group")); final ThreadGroup g = new ThreadGroup(String.format(threadNameFormat, "Group"));
@@ -115,15 +107,15 @@ public class AsyncIOGroup extends AsyncGroup {
} }
} }
protected AsyncIOThread createAsyncIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ObjectPool<ByteBuffer> safeBufferPool) throws IOException { protected AsyncIOThread createAsyncIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ByteBufferPool safeBufferPool) throws IOException {
return new AsyncIOThread(g, name, index, threads, workExecutor, safeBufferPool); return new AsyncIOThread(g, name, index, threads, workExecutor, safeBufferPool);
} }
protected AsyncIOThread createClientReadIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ObjectPool<ByteBuffer> safeBufferPool) throws IOException { protected AsyncIOThread createClientReadIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ByteBufferPool safeBufferPool) throws IOException {
return new ClientReadIOThread(g, name, index, threads, workExecutor, safeBufferPool); return new ClientReadIOThread(g, name, index, threads, workExecutor, safeBufferPool);
} }
protected AsyncIOThread createClientWriteIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ObjectPool<ByteBuffer> safeBufferPool) throws IOException { protected AsyncIOThread createClientWriteIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ByteBufferPool safeBufferPool) throws IOException {
return new ClientWriteIOThread(g, name, index, threads, workExecutor, safeBufferPool); return new ClientWriteIOThread(g, name, index, threads, workExecutor, safeBufferPool);
} }

View File

@@ -41,11 +41,11 @@ public class AsyncIOThread extends WorkThread {
private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicBoolean closed = new AtomicBoolean();
public AsyncIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ObjectPool<ByteBuffer> safeBufferPool) throws IOException { public AsyncIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ByteBufferPool safeBufferPool) throws IOException {
super(g, name, index, threads, workExecutor, null); super(g, name, index, threads, workExecutor, null);
this.selector = Selector.open(); this.selector = Selector.open();
this.setDaemon(true); this.setDaemon(true);
ObjectPool<ByteBuffer> unsafeBufferPool = ObjectPool.createUnsafePool(this, 512, safeBufferPool); ByteBufferPool unsafeBufferPool = ByteBufferPool.createUnsafePool(this, 512, safeBufferPool);
this.bufferSupplier = unsafeBufferPool; this.bufferSupplier = unsafeBufferPool;
this.bufferConsumer = unsafeBufferPool; this.bufferConsumer = unsafeBufferPool;
} }

View File

@@ -7,7 +7,6 @@ package org.redkale.net;
import java.io.IOException; import java.io.IOException;
import java.net.*; import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.atomic.LongAdder;
@@ -92,7 +91,7 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
LongAdder createResponseCounter = new LongAdder(); LongAdder createResponseCounter = new LongAdder();
LongAdder cycleResponseCounter = new LongAdder(); LongAdder cycleResponseCounter = new LongAdder();
ObjectPool<ByteBuffer> safeBufferPool = server.createSafeBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize); ByteBufferPool safeBufferPool = server.createSafeBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize);
ObjectPool<Response> safeResponsePool = server.createSafeResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize); ObjectPool<Response> safeResponsePool = server.createSafeResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize);
final int respPoolMax = server.getResponsePoolSize(); final int respPoolMax = server.getResponsePoolSize();
ThreadLocal<ObjectPool<Response>> localResponsePool = ThreadLocal.withInitial(() -> { ThreadLocal<ObjectPool<Response>> localResponsePool = ThreadLocal.withInitial(() -> {
@@ -119,7 +118,7 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
(pool == null ? safeResponsePool : pool).accept(v); (pool == null ? safeResponsePool : pool).accept(v);
}; };
final String threadNameFormat = server.name == null || server.name.isEmpty() ? "Redkale-IOServletThread-%s" : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s"); final String threadNameFormat = server.name == null || server.name.isEmpty() ? "Redkale-IOServletThread-%s" : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s");
this.ioGroup = new AsyncIOGroup(false, threadNameFormat, null, server.bufferCapacity, safeBufferPool); this.ioGroup = new AsyncIOGroup(false, threadNameFormat, null, safeBufferPool);
this.ioGroup.start(); this.ioGroup.start();
this.acceptThread = new Thread() { this.acceptThread = new Thread() {

View File

@@ -92,7 +92,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
LongAdder createResponseCounter = new LongAdder(); LongAdder createResponseCounter = new LongAdder();
LongAdder cycleResponseCounter = new LongAdder(); LongAdder cycleResponseCounter = new LongAdder();
ObjectPool<ByteBuffer> safeBufferPool = server.createSafeBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize); ByteBufferPool safeBufferPool = server.createSafeBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize);
ObjectPool<Response> safeResponsePool = server.createSafeResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize); ObjectPool<Response> safeResponsePool = server.createSafeResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize);
ThreadLocal<ObjectPool<Response>> localResponsePool = ThreadLocal.withInitial(() -> { ThreadLocal<ObjectPool<Response>> localResponsePool = ThreadLocal.withInitial(() -> {
if (!(Thread.currentThread() instanceof WorkThread)) { if (!(Thread.currentThread() instanceof WorkThread)) {
@@ -110,7 +110,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
(pool == null ? safeResponsePool : pool).accept(v); (pool == null ? safeResponsePool : pool).accept(v);
}; };
final String threadNameFormat = server.name == null || server.name.isEmpty() ? "Redkale-IOServletThread-%s" : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s"); final String threadNameFormat = server.name == null || server.name.isEmpty() ? "Redkale-IOServletThread-%s" : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s");
this.ioGroup = new AsyncIOGroup(false, threadNameFormat, null, server.bufferCapacity, safeBufferPool); this.ioGroup = new AsyncIOGroup(false, threadNameFormat, null, safeBufferPool);
this.ioGroup.start(); this.ioGroup.start();
udpServerChannel.serverChannel.register(this.selector, SelectionKey.OP_READ); udpServerChannel.serverChannel.register(this.selector, SelectionKey.OP_READ);
this.acceptThread = new Thread() { this.acceptThread = new Thread() {
@@ -120,7 +120,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
@Override @Override
public void run() { public void run() {
udpServerChannel.unsafeBufferPool = ObjectPool.createUnsafePool(Thread.currentThread(), 512, safeBufferPool); udpServerChannel.unsafeBufferPool = ByteBufferPool.createUnsafePool(Thread.currentThread(), 512, safeBufferPool);
final AsyncIOThread[] ioReadThreads = ioGroup.ioReadThreads; final AsyncIOThread[] ioReadThreads = ioGroup.ioReadThreads;
final AsyncIOThread[] ioWriteThreads = ioGroup.ioWriteThreads; final AsyncIOThread[] ioWriteThreads = ioGroup.ioWriteThreads;
final int reads = ioReadThreads.length; final int reads = ioReadThreads.length;
@@ -130,7 +130,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
Set<SelectionKey> keys = null; Set<SelectionKey> keys = null;
final Selector sel = selector; final Selector sel = selector;
final DatagramChannel serverChannel = udpServerChannel.serverChannel; final DatagramChannel serverChannel = udpServerChannel.serverChannel;
final ObjectPool<ByteBuffer> unsafeBufferPool = udpServerChannel.unsafeBufferPool; final ByteBufferPool unsafeBufferPool = udpServerChannel.unsafeBufferPool;
while (!closed) { while (!closed) {
try { try {
int count = sel.select(); int count = sel.select();
@@ -238,7 +238,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
DatagramChannel serverChannel; DatagramChannel serverChannel;
ObjectPool<ByteBuffer> unsafeBufferPool; ByteBufferPool unsafeBufferPool;
ConcurrentHashMap<SocketAddress, AsyncNioUdpConnection> connections = new ConcurrentHashMap<>(); ConcurrentHashMap<SocketAddress, AsyncNioUdpConnection> connections = new ConcurrentHashMap<>();

View File

@@ -7,13 +7,13 @@ package org.redkale.net;
import java.io.*; import java.io.*;
import java.net.*; import java.net.*;
import java.nio.ByteBuffer;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.*; import java.util.*;
import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.atomic.LongAdder;
import java.util.logging.*; import java.util.logging.*;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import org.redkale.boot.Application; import org.redkale.boot.Application;
import static org.redkale.net.AsyncGroup.UDP_BUFFER_CAPACITY;
import org.redkale.net.Filter; import org.redkale.net.Filter;
import org.redkale.util.*; import org.redkale.util.*;
@@ -128,7 +128,7 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
this.writeTimeoutSeconds = config.getIntValue("writeTimeoutSeconds", 0); this.writeTimeoutSeconds = config.getIntValue("writeTimeoutSeconds", 0);
this.backlog = parseLenth(config.getValue("backlog"), 1024); this.backlog = parseLenth(config.getValue("backlog"), 1024);
this.maxBody = parseLenth(config.getValue("maxbody"), "UDP".equalsIgnoreCase(netprotocol) ? 16 * 1024 : 64 * 1024); this.maxBody = parseLenth(config.getValue("maxbody"), "UDP".equalsIgnoreCase(netprotocol) ? 16 * 1024 : 64 * 1024);
int bufCapacity = parseLenth(config.getValue("bufferCapacity"), "UDP".equalsIgnoreCase(netprotocol) ? 8 * 1024 : 32 * 1024); int bufCapacity = parseLenth(config.getValue("bufferCapacity"), "UDP".equalsIgnoreCase(netprotocol) ? UDP_BUFFER_CAPACITY : 32 * 1024);
this.bufferCapacity = "UDP".equalsIgnoreCase(netprotocol) ? bufCapacity : (bufCapacity < 1024 ? 1024 : bufCapacity); this.bufferCapacity = "UDP".equalsIgnoreCase(netprotocol) ? bufCapacity : (bufCapacity < 1024 ? 1024 : bufCapacity);
this.bufferPoolSize = config.getIntValue("bufferPoolSize", Utility.cpus() * 8); this.bufferPoolSize = config.getIntValue("bufferPoolSize", Utility.cpus() * 8);
this.responsePoolSize = config.getIntValue("responsePoolSize", 1024); this.responsePoolSize = config.getIntValue("responsePoolSize", 1024);
@@ -433,7 +433,7 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
} }
//必须在 createContext()之后调用 //必须在 createContext()之后调用
protected abstract ObjectPool<ByteBuffer> createSafeBufferPool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize); protected abstract ByteBufferPool createSafeBufferPool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize);
//必须在 createContext()之后调用 //必须在 createContext()之后调用
protected abstract ObjectPool<P> createSafeResponsePool(LongAdder createCounter, LongAdder cycleCounter, int responsePoolSize); protected abstract ObjectPool<P> createSafeResponsePool(LongAdder createCounter, LongAdder cycleCounter, int responsePoolSize);

View File

@@ -4,10 +4,9 @@
package org.redkale.net.client; package org.redkale.net.client;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import org.redkale.net.AsyncIOThread; import org.redkale.net.AsyncIOThread;
import org.redkale.util.ObjectPool; import org.redkale.util.ByteBufferPool;
/** /**
* 客户端IO读线程 * 客户端IO读线程
@@ -22,7 +21,7 @@ import org.redkale.util.ObjectPool;
public class ClientReadIOThread extends AsyncIOThread { public class ClientReadIOThread extends AsyncIOThread {
public ClientReadIOThread(ThreadGroup g, String name, int index, int threads, public ClientReadIOThread(ThreadGroup g, String name, int index, int threads,
ExecutorService workExecutor, ObjectPool<ByteBuffer> safeBufferPool) throws IOException { ExecutorService workExecutor, ByteBufferPool safeBufferPool) throws IOException {
super(g, name, index, threads, workExecutor, safeBufferPool); super(g, name, index, threads, workExecutor, safeBufferPool);
} }

View File

@@ -26,7 +26,7 @@ public class ClientWriteIOThread extends AsyncIOThread {
private final BlockingQueue<ClientFuture> requestQueue = new LinkedBlockingQueue<>(); private final BlockingQueue<ClientFuture> requestQueue = new LinkedBlockingQueue<>();
public ClientWriteIOThread(ThreadGroup g, String name, int index, int threads, public ClientWriteIOThread(ThreadGroup g, String name, int index, int threads,
ExecutorService workExecutor, ObjectPool<ByteBuffer> safeBufferPool) throws IOException { ExecutorService workExecutor, ByteBufferPool safeBufferPool) throws IOException {
super(g, name, index, threads, workExecutor, safeBufferPool); super(g, name, index, threads, workExecutor, safeBufferPool);
} }

View File

@@ -7,7 +7,6 @@ package org.redkale.net.http;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.net.HttpCookie; import java.net.HttpCookie;
import java.nio.ByteBuffer;
import java.text.*; import java.text.*;
import java.time.ZoneId; import java.time.ZoneId;
import static java.time.format.DateTimeFormatter.RFC_1123_DATE_TIME; import static java.time.format.DateTimeFormatter.RFC_1123_DATE_TIME;
@@ -42,7 +41,7 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
private HttpResponseConfig respConfig; private HttpResponseConfig respConfig;
private ObjectPool<ByteBuffer> safeBufferPool; private ByteBufferPool safeBufferPool;
private final ReentrantLock groupLock = new ReentrantLock(); private final ReentrantLock groupLock = new ReentrantLock();
@@ -550,7 +549,7 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
groupLock.lock(); groupLock.lock();
try { try {
if (asyncGroup == null) { if (asyncGroup == null) {
WebSocketAsyncGroup g = new WebSocketAsyncGroup("Redkale-HTTP:" + address.getPort() + "-WebSocketWriteIOThread-%s", workExecutor, bufferCapacity, safeBufferPool); WebSocketAsyncGroup g = new WebSocketAsyncGroup("Redkale-HTTP:" + address.getPort() + "-WebSocketWriteIOThread-%s", workExecutor, safeBufferPool);
g.start(); g.start();
asyncGroup = g; asyncGroup = g;
} }
@@ -572,18 +571,9 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
} }
@Override @Override
protected ObjectPool<ByteBuffer> createSafeBufferPool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize) { protected ByteBufferPool createSafeBufferPool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize) {
final int rcapacity = this.bufferCapacity; this.safeBufferPool = ByteBufferPool.createSafePool(createCounter, cycleCounter, bufferPoolSize, this.bufferCapacity);
ObjectPool<ByteBuffer> bufferPool = ObjectPool.createSafePool(createCounter, cycleCounter, bufferPoolSize, return this.safeBufferPool;
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != rcapacity) {
return false;
}
e.clear();
return true;
});
this.safeBufferPool = bufferPool;
return bufferPool;
} }
@Override @Override

View File

@@ -4,10 +4,9 @@
package org.redkale.net.http; package org.redkale.net.http;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import org.redkale.net.*; import org.redkale.net.*;
import org.redkale.util.ObjectPool; import org.redkale.util.*;
/** /**
* WebSocket只写版的AsyncIOGroup <br> * WebSocket只写版的AsyncIOGroup <br>
@@ -23,12 +22,12 @@ import org.redkale.util.ObjectPool;
*/ */
class WebSocketAsyncGroup extends AsyncIOGroup { class WebSocketAsyncGroup extends AsyncIOGroup {
public WebSocketAsyncGroup(String threadNameFormat, ExecutorService workExecutor, int bufferCapacity, ObjectPool<ByteBuffer> safeBufferPool) { public WebSocketAsyncGroup(String threadNameFormat, ExecutorService workExecutor, ByteBufferPool safeBufferPool) {
super(false, threadNameFormat, workExecutor, bufferCapacity, safeBufferPool); super(false, threadNameFormat, workExecutor, safeBufferPool);
} }
@Override @Override
protected AsyncIOThread createAsyncIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ObjectPool<ByteBuffer> safeBufferPool) throws IOException { protected AsyncIOThread createAsyncIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ByteBufferPool safeBufferPool) throws IOException {
return new WebSocketWriteIOThread(this.timeoutExecutor, g, name, index, threads, workExecutor, safeBufferPool); return new WebSocketWriteIOThread(this.timeoutExecutor, g, name, index, threads, workExecutor, safeBufferPool);
} }

View File

@@ -28,7 +28,7 @@ public class WebSocketWriteIOThread extends AsyncIOThread {
private final BlockingDeque<WebSocketFuture> requestQueue = new LinkedBlockingDeque<>(); private final BlockingDeque<WebSocketFuture> requestQueue = new LinkedBlockingDeque<>();
public WebSocketWriteIOThread(ScheduledThreadPoolExecutor timeoutExecutor, ThreadGroup g, String name, int index, int threads, public WebSocketWriteIOThread(ScheduledThreadPoolExecutor timeoutExecutor, ThreadGroup g, String name, int index, int threads,
ExecutorService workExecutor, ObjectPool<ByteBuffer> safeBufferPool) throws IOException { ExecutorService workExecutor, ByteBufferPool safeBufferPool) throws IOException {
super(g, name, index, threads, workExecutor, safeBufferPool); super(g, name, index, threads, workExecutor, safeBufferPool);
Objects.requireNonNull(timeoutExecutor); Objects.requireNonNull(timeoutExecutor);
this.timeoutExecutor = timeoutExecutor; this.timeoutExecutor = timeoutExecutor;

View File

@@ -5,7 +5,6 @@
*/ */
package org.redkale.net.sncp; package org.redkale.net.sncp;
import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
import org.redkale.boot.Application; import org.redkale.boot.Application;
@@ -128,17 +127,8 @@ public class SncpServer extends Server<Uint128, SncpContext, SncpRequest, SncpRe
} }
@Override @Override
protected ObjectPool<ByteBuffer> createSafeBufferPool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize) { protected ByteBufferPool createSafeBufferPool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize) {
final int rcapacity = this.bufferCapacity; return ByteBufferPool.createSafePool(createCounter, cycleCounter, bufferPoolSize, this.bufferCapacity);
ObjectPool<ByteBuffer> bufferPool = ObjectPool.createSafePool(createCounter, cycleCounter, bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != rcapacity) {
return false;
}
e.clear();
return true;
});
return bufferPool;
} }
@Override @Override

View File

@@ -0,0 +1,88 @@
/*
*
*/
package org.redkale.util;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.LongAdder;
/**
* ByteBuffer的对象池 <br>
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
* @since 2.8.0
*/
public class ByteBufferPool extends ObjectPool<ByteBuffer> {
private final int bufferCapacity;
protected ByteBufferPool(ObjectPool<ByteBuffer> parent, LongAdder creatCounter, LongAdder cycleCounter, Thread unsafeThread, int max, int bufferCapacity, Queue<ByteBuffer> queue) {
super(parent, creatCounter, cycleCounter, unsafeThread, max, (Object... params) -> ByteBuffer.allocateDirect(bufferCapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != bufferCapacity) {
return false;
}
e.clear();
return true;
}, queue);
this.bufferCapacity = bufferCapacity;
}
//非线程安全版
public static ByteBufferPool createUnsafePool(int max, int bufferCapacity) {
return createUnsafePool(null, null, max, bufferCapacity);
}
//非线程安全版
public static ByteBufferPool createUnsafePool(LongAdder creatCounter, LongAdder cycleCounter, int max, int bufferCapacity) {
return createUnsafePool(null, creatCounter, cycleCounter, max, bufferCapacity);
}
//非线程安全版
public static ByteBufferPool createUnsafePool(ByteBufferPool parent, int bufferCapacity) {
return createUnsafePool(parent, 2, bufferCapacity);
}
//非线程安全版
public static ByteBufferPool createUnsafePool(ByteBufferPool parent, int max, int bufferCapacity) {
return createUnsafePool(parent, null, null, max, bufferCapacity);
}
//非线程安全版
public static ByteBufferPool createUnsafePool(ByteBufferPool parent, LongAdder creatCounter, LongAdder cycleCounter, int max, int bufferCapacity) {
return new ByteBufferPool(parent, creatCounter, cycleCounter, null, Math.max(Utility.cpus(), max), bufferCapacity, new ArrayDeque<>(Math.max(Utility.cpus(), max)));
}
//非线程安全版
public static ByteBufferPool createUnsafePool(Thread unsafeThread, int max, ByteBufferPool safePool) {
return createUnsafePool(safePool, safePool.getCreatCounter(), safePool.getCycleCounter(), unsafeThread, max, safePool.getBufferCapacity());
}
//非线程安全版
public static ByteBufferPool createUnsafePool(ByteBufferPool parent, LongAdder creatCounter, LongAdder cycleCounter, Thread unsafeThread, int max, int bufferCapacity) {
return new ByteBufferPool(parent, creatCounter, cycleCounter, unsafeThread, Math.max(Utility.cpus(), max), bufferCapacity, new ArrayDeque<>(Math.max(Utility.cpus(), max)));
}
//线程安全版
public static ByteBufferPool createSafePool(int bufferCapacity) {
return createSafePool(2, bufferCapacity);
}
//线程安全版
public static ByteBufferPool createSafePool(int max, int bufferCapacity) {
return createSafePool(null, null, max, bufferCapacity);
}
//线程安全版
public static ByteBufferPool createSafePool(LongAdder creatCounter, LongAdder cycleCounter, int max, int bufferCapacity) {
return new ByteBufferPool(null, creatCounter, cycleCounter, null, Math.max(Utility.cpus(), max), bufferCapacity, new LinkedBlockingQueue<>(Math.max(Utility.cpus(), max)));
}
public int getBufferCapacity() {
return bufferCapacity;
}
}

View File

@@ -51,18 +51,20 @@ public class SncpClientCodecTest {
ByteArray writeArray = new ByteArray(); ByteArray writeArray = new ByteArray();
request.prepare(header, 1, "", new byte[20]); request.prepare(header, 1, "", new byte[20]);
System.out.println("request.1 = " + request); System.out.println("request.1 = " + request);
writeArray.put(new byte[SncpHeader.HEADER_SIZE]);
request.writeTo(conn, writeArray); request.writeTo(conn, writeArray);
request.prepare(header, 2, "", new byte[25]); request.prepare(header, 2, "", new byte[25]);
System.out.println("request.2 = " + request); System.out.println("request.2 = " + request);
writeArray.put(new byte[SncpHeader.HEADER_SIZE]);
request.writeTo(conn, writeArray); request.writeTo(conn, writeArray);
System.out.println(writeArray.getBytes().length); System.out.println(writeArray.getBytes().length);
realBuf = ByteBuffer.wrap(writeArray.getBytes()); realBuf = ByteBuffer.wrap(writeArray.getBytes());
} }
System.out.println("sncp.realBuf = " + realBuf.remaining()); System.out.println("sncp.realBuf = " + realBuf.remaining());
codec.decodeMessages(realBuf, new ByteArray()); codec.decodeMessages(realBuf, new ByteArray());
System.out.println("respResults.size = " + respResults.size());
if (!main) { if (!main) {
Assertions.assertEquals(2, respResults.size()); Assertions.assertEquals(2, respResults.size());
} }
System.out.println(respResults);
} }
} }

View File

@@ -32,7 +32,7 @@ public class SncpTest {
private static final String protocol = "SNCP.UDP"; private static final String protocol = "SNCP.UDP";
private static final int clientCapacity = protocol.endsWith(".UDP") ? 1350 : 8192; private static final int clientCapacity = protocol.endsWith(".UDP") ? AsyncGroup.UDP_BUFFER_CAPACITY : 8192;
private static final ResourceFactory factory = ResourceFactory.create(); private static final ResourceFactory factory = ResourceFactory.create();
@@ -94,7 +94,8 @@ public class SncpTest {
callbean = service.insert(callbean); callbean = service.insert(callbean);
System.out.println("bean " + callbean); System.out.println("bean " + callbean);
System.out.println("---------------------------------------------------"); System.out.println("---------------------------------------------------");
final int count = 10; Thread.sleep(200);
final int count = 1;
final CountDownLatch cld = new CountDownLatch(count); final CountDownLatch cld = new CountDownLatch(count);
final AtomicInteger ai = new AtomicInteger(); final AtomicInteger ai = new AtomicInteger();
long s = System.currentTimeMillis(); long s = System.currentTimeMillis();
@@ -110,7 +111,7 @@ public class SncpTest {
bean.setContent("数据: " + (k < 10 ? "0" : "") + k); bean.setContent("数据: " + (k < 10 ? "0" : "") + k);
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append(k).append("------"); sb.append(k).append("------");
for (int i = 0; i < 120; i++) { for (int i = 0; i < 900; i++) {
sb.append("_").append(i).append("_").append(k).append("_0123456789"); sb.append("_").append(i).append("_").append(k).append("_0123456789");
} }
bean.setContent(sb.toString()); bean.setContent(sb.toString());

View File

@@ -65,7 +65,7 @@ public class SncpTestServiceImpl implements SncpTestIService {
@Override @Override
public String queryResult(SncpTestBean bean) { public String queryResult(SncpTestBean bean) {
System.out.println(Thread.currentThread().getName() + " 运行了queryResult方法"); System.out.println(Thread.currentThread().getName() + " 运行了queryResult方法");
return "result: " + bean.getId(); return "result: " + bean.getContent();
} }
public void queryResult(CompletionHandler<String, SncpTestBean> handler, @RpcAttachment SncpTestBean bean) { public void queryResult(CompletionHandler<String, SncpTestBean> handler, @RpcAttachment SncpTestBean bean) {