优化sncp.udp

This commit is contained in:
redkale
2023-02-07 14:08:13 +08:00
parent 89f9baee46
commit 35fddb48de
11 changed files with 97 additions and 56 deletions

View File

@@ -158,7 +158,7 @@
threads【已废弃】 线程数, 默认: CPU核数*2最小8个【已废弃 @since 2.3.0】
maxconns 最大连接数, 小于1表示无限制 默认: 0
maxbody: request.body最大值 默认: 64K
bufferCapacity: ByteBuffer的初始化大小 TCP默认: 32K; (HTTP 2.0、WebSocket必须要16k以上); UDP默认: 1350B
bufferCapacity: ByteBuffer的初始化大小 TCP默认: 32K; (HTTP 2.0、WebSocket必须要16k以上); UDP默认: 8K
bufferPoolSize ByteBuffer池的大小默认: 线程数*4
responsePoolSize Response池的大小默认: 1024
aliveTimeoutSeconds: KeepAlive读操作超时秒数 默认30 0表示永久不超时; -1表示禁止KeepAlive

View File

@@ -139,6 +139,12 @@ public class BsonWriter extends Writer implements ByteTuple {
content[count++] = ch;
}
//类似writeTo(new byte[length])
public void writePlaceholderTo(final int length) {
expand(length);
count += length;
}
public final void writeTo(final byte... chs) {
writeTo(chs, 0, chs.length);
}

View File

@@ -10,7 +10,9 @@ import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
import javax.net.ssl.SSLContext;
import org.redkale.net.AsyncNioUdpProtocolServer.AsyncNioUdpServerChannel;
/**
*
@@ -23,6 +25,10 @@ class AsyncNioUdpConnection extends AsyncNioConnection {
private final DatagramChannel channel;
private final ConcurrentLinkedDeque<ByteBuffer> revbufferQueue = new ConcurrentLinkedDeque<>();
AsyncNioUdpServerChannel udpServerChannel;
public AsyncNioUdpConnection(boolean clientMode, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread,
AsyncIOThread ioWriteThread, DatagramChannel ch, SSLBuilder sslBuilder, SSLContext sslContext, final SocketAddress address) {
super(clientMode, ioGroup, ioReadThread, ioWriteThread, ioGroup.bufferCapacity, sslBuilder, sslContext);
@@ -117,10 +123,21 @@ class AsyncNioUdpConnection extends AsyncNioConnection {
if (clientMode) {
return this.channel.read(dst);
} else {
return 0;
ByteBuffer buf = revbufferQueue.poll();
if (buf == null) {
return 0;
}
int start = dst.position();
dst.put(buf);
return dst.position() - start;
}
}
void receiveBuffer(ByteBuffer buf) {
revbufferQueue.offer(buf.flip());
doRead(true);
}
@Override
protected int implWrite(ByteBuffer src) throws IOException {
return this.channel.send(src, remoteAddress);

View File

@@ -10,6 +10,7 @@ import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.*;
import java.util.logging.Level;
@@ -26,7 +27,7 @@ import org.redkale.util.*;
*/
class AsyncNioUdpProtocolServer extends ProtocolServer {
private DatagramChannel serverChannel;
private AsyncNioUdpServerChannel udpServerChannel;
private Selector selector;
@@ -46,40 +47,41 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
@Override
public void open(AnyValue config) throws IOException {
this.serverChannel = DatagramChannel.open();
this.serverChannel.configureBlocking(false);
DatagramChannel serverChannel = DatagramChannel.open();
this.udpServerChannel = new AsyncNioUdpServerChannel(serverChannel);
serverChannel.configureBlocking(false);
this.selector = Selector.open();
final Set<SocketOption<?>> options = this.serverChannel.supportedOptions();
final Set<SocketOption<?>> options = serverChannel.supportedOptions();
if (options.contains(StandardSocketOptions.TCP_NODELAY)) {
this.serverChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
serverChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
}
if (options.contains(StandardSocketOptions.SO_KEEPALIVE)) {
this.serverChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
serverChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
}
if (options.contains(StandardSocketOptions.SO_REUSEADDR)) {
this.serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
}
if (options.contains(StandardSocketOptions.SO_RCVBUF)) {
this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 32 * 1024);
serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 32 * 1024);
}
if (options.contains(StandardSocketOptions.SO_SNDBUF)) {
this.serverChannel.setOption(StandardSocketOptions.SO_SNDBUF, 32 * 1024);
serverChannel.setOption(StandardSocketOptions.SO_SNDBUF, 32 * 1024);
}
}
@Override
public void bind(SocketAddress local, int backlog) throws IOException {
this.serverChannel.bind(local);
udpServerChannel.serverChannel.bind(local);
}
@Override
public <T> void setOption(SocketOption<T> name, T value) throws IOException {
this.serverChannel.setOption(name, value);
udpServerChannel.serverChannel.setOption(name, value);
}
@Override
public <T> Set<SocketOption<?>> supportedOptions() {
return this.serverChannel.supportedOptions();
return udpServerChannel.serverChannel.supportedOptions();
}
@Override
@@ -110,7 +112,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
final String threadNameFormat = server.name == null || server.name.isEmpty() ? "Redkale-IOServletThread-%s" : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s");
this.ioGroup = new AsyncIOGroup(false, threadNameFormat, null, server.bufferCapacity, safeBufferPool);
this.ioGroup.start();
this.serverChannel.register(this.selector, SelectionKey.OP_READ);
udpServerChannel.serverChannel.register(this.selector, SelectionKey.OP_READ);
this.acceptThread = new Thread() {
{
setName(String.format(threadNameFormat, "Accept"));
@@ -126,6 +128,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
int writeIndex = -1;
Set<SelectionKey> keys = null;
final Selector sel = selector;
final DatagramChannel serverChannel = udpServerChannel.serverChannel;
ObjectPool<ByteBuffer> unsafeBufferPool = ObjectPool.createUnsafePool(null, 512, safeBufferPool);
while (!closed) {
try {
@@ -141,14 +144,19 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
final ByteBuffer buffer = unsafeBufferPool.get();
try {
SocketAddress address = serverChannel.receive(buffer);
buffer.flip();
serverChannel.register(sel, SelectionKey.OP_READ);
if (++readIndex >= reads) {
readIndex = 0;
}
if (++writeIndex >= writes) {
writeIndex = 0;
}
accept(address, buffer, ioReadThreads[readIndex], ioWriteThreads[writeIndex]);
AsyncNioUdpConnection conn = udpServerChannel.connections.get(address);
if (conn == null) {
accept(address, buffer, ioReadThreads[readIndex], ioWriteThreads[writeIndex]);
} else {
conn.receiveBuffer(buffer);
}
} catch (Throwable t) {
unsafeBufferPool.accept(buffer);
}
@@ -169,9 +177,12 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
private void accept(SocketAddress address, ByteBuffer buffer, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread) throws IOException {
ioGroup.connCreateCounter.increment();
ioGroup.connLivingCounter.increment();
AsyncNioUdpConnection conn = new AsyncNioUdpConnection(false, ioGroup, ioReadThread, ioWriteThread, this.serverChannel, context.getSSLBuilder(), context.getSSLContext(), address);
AsyncNioUdpConnection conn = new AsyncNioUdpConnection(false, ioGroup, ioReadThread, ioWriteThread, udpServerChannel.serverChannel, context.getSSLBuilder(), context.getSSLContext(), address);
conn.udpServerChannel = udpServerChannel;
udpServerChannel.connections.put(address, conn);
ProtocolCodec codec = new ProtocolCodec(context, responseSupplier, responseConsumer, conn);
conn.protocolCodec = codec;
buffer.flip();
if (conn.sslEngine == null) {
codec.start(buffer);
} else {
@@ -189,7 +200,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
@Override
public SocketAddress getLocalAddress() throws IOException {
return this.serverChannel.getLocalAddress();
return udpServerChannel.serverChannel.getLocalAddress();
}
@Override
@@ -199,7 +210,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
}
this.closed = true;
this.ioGroup.close();
this.serverChannel.close();
udpServerChannel.serverChannel.close();
}
@Override
@@ -221,4 +232,16 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
public long getLivingConnectionCount() {
return -1;
}
static class AsyncNioUdpServerChannel {
DatagramChannel serverChannel;
ConcurrentHashMap<SocketAddress, AsyncNioUdpConnection> connections = new ConcurrentHashMap<>();
public AsyncNioUdpServerChannel(DatagramChannel serverChannel) {
this.serverChannel = serverChannel;
}
}
}

View File

@@ -128,7 +128,7 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
this.writeTimeoutSeconds = config.getIntValue("writeTimeoutSeconds", 0);
this.backlog = parseLenth(config.getValue("backlog"), 1024);
this.maxBody = parseLenth(config.getValue("maxbody"), "UDP".equalsIgnoreCase(netprotocol) ? 16 * 1024 : 64 * 1024);
int bufCapacity = parseLenth(config.getValue("bufferCapacity"), "UDP".equalsIgnoreCase(netprotocol) ? 1350 : 32 * 1024);
int bufCapacity = parseLenth(config.getValue("bufferCapacity"), "UDP".equalsIgnoreCase(netprotocol) ? 8 * 1024 : 32 * 1024);
this.bufferCapacity = "UDP".equalsIgnoreCase(netprotocol) ? bufCapacity : (bufCapacity < 1024 ? 1024 : bufCapacity);
this.bufferPoolSize = config.getIntValue("bufferPoolSize", Utility.cpus() * 8);
this.responsePoolSize = config.getIntValue("responsePoolSize", 1024);

View File

@@ -19,7 +19,7 @@ import static org.redkale.asm.Opcodes.*;
import org.redkale.asm.Type;
import org.redkale.convert.bson.*;
import org.redkale.net.sncp.SncpAsyncHandler.DefaultSncpAsyncHandler;
import static org.redkale.net.sncp.SncpRequest.DEFAULT_HEADER;
import static org.redkale.net.sncp.SncpHeader.HEADER_SIZE;
import org.redkale.service.Service;
import org.redkale.util.*;
@@ -106,7 +106,7 @@ public final class SncpDynServlet extends SncpServlet {
response.finish(SncpResponse.RETCODE_ILLACTIONID, null); //无效actionid
} else {
BsonWriter out = action.convert.pollBsonWriter();
out.writeTo(DEFAULT_HEADER);
out.writePlaceholderTo(HEADER_SIZE);
BsonReader in = action.convert.pollBsonReader();
SncpAsyncHandler handler = null;
try {

View File

@@ -19,7 +19,6 @@ import org.redkale.mq.*;
import org.redkale.net.*;
import org.redkale.net.sncp.Sncp.SncpDyn;
import static org.redkale.net.sncp.SncpHeader.HEADER_SIZE;
import static org.redkale.net.sncp.SncpRequest.*;
import org.redkale.net.sncp.SncpServiceInfo.SncpServiceAction;
import org.redkale.service.*;
import org.redkale.source.*;
@@ -226,7 +225,7 @@ public final class SncpOldClient {
bsonConvert = BsonConvert.root();
}
final BsonWriter writer = bsonConvert.pollBsonWriter(); // 将head写入
writer.writeTo(DEFAULT_HEADER);
writer.writePlaceholderTo(HEADER_SIZE);
for (int i = 0; i < params.length; i++) { //params 可能包含: 3 个 boolean
BsonConvert bcc = bsonConvert;
if (params[i] instanceof org.redkale.service.RetResult) {

View File

@@ -7,8 +7,8 @@ package org.redkale.net.sncp;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.logging.*;
import org.redkale.convert.bson.BsonConvert;
import org.redkale.net.Request;
import static org.redkale.net.sncp.SncpHeader.HEADER_SIZE;
import org.redkale.util.Uint128;
@@ -22,8 +22,6 @@ import org.redkale.util.Uint128;
*/
public class SncpRequest extends Request<SncpContext> {
public static final byte[] DEFAULT_HEADER = new byte[HEADER_SIZE];
protected static final int READ_STATE_ROUTE = 1;
protected static final int READ_STATE_HEADER = 2;
@@ -32,8 +30,6 @@ public class SncpRequest extends Request<SncpContext> {
protected static final int READ_STATE_END = 4;
protected final BsonConvert convert;
protected int readState = READ_STATE_ROUTE;
private SncpHeader header;
@@ -46,7 +42,6 @@ public class SncpRequest extends Request<SncpContext> {
protected SncpRequest(SncpContext context) {
super(context);
this.convert = context.getBsonConvert();
}
@Override //request.header与response.header数据格式保持一致
@@ -79,12 +74,14 @@ public class SncpRequest extends Request<SncpContext> {
}
return 0;
}
int len = Math.min(bodyLength, buffer.remaining());
buffer.get(body, 0, len);
this.bodyOffset = len;
int rs = bodyLength - len;
int len = Math.min(bodyLength - this.bodyOffset, buffer.remaining());
buffer.get(body, this.bodyOffset, len);
this.bodyOffset += len;
int rs = bodyLength - this.bodyOffset;
if (rs == 0) {
this.readState = READ_STATE_END;
} else {
buffer.clear();
}
return rs;
}
@@ -107,7 +104,7 @@ public class SncpRequest extends Request<SncpContext> {
@Override
public String toString() {
return SncpRequest.class.getSimpleName() + "{header=" + this.header + ",bodyOffset=" + this.bodyOffset + ",body=[" + (this.body == null ? -1 : this.body.length) + "]}";
return SncpRequest.class.getSimpleName() + "_" + Objects.hashCode(this) + "{header=" + this.header + ",bodyOffset=" + this.bodyOffset + ",body=[" + (this.body == null ? -1 : this.body.length) + "]}";
}
@Override

View File

@@ -92,6 +92,7 @@ public class SncpResponse extends Response<SncpContext, SncpRequest> {
finish(RETCODE_THROWEXCEPTION, null);
}
//调用此方法时out已写入SncpHeader
public void finish(final int retcode, final BsonWriter out) {
if (out == null) {
final ByteArray buffer = new ByteArray(HEADER_SIZE);
@@ -99,9 +100,8 @@ public class SncpResponse extends Response<SncpContext, SncpRequest> {
finish(buffer);
return;
}
final int respBodyLength = out.count(); //body总长度
final ByteArray array = out.toByteArray();
fillHeader(array, respBodyLength - HEADER_SIZE, retcode);
fillHeader(array, array.length() - HEADER_SIZE, retcode);
finish(array);
}

View File

@@ -7,11 +7,10 @@ package org.redkale.test.sncp;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.concurrent.atomic.AtomicInteger;
import org.redkale.boot.*;
import org.redkale.convert.bson.*;
import org.redkale.net.*;
@@ -33,6 +32,8 @@ public class SncpTest {
private static final String protocol = "SNCP.UDP";
private static final int clientCapacity = protocol.endsWith(".UDP") ? 1350 : 8192;
private static final ResourceFactory factory = ResourceFactory.create();
public static void main(String[] args) throws Exception {
@@ -64,17 +65,6 @@ public class SncpTest {
return AsynchronousChannelGroup.withCachedThreadPool(transportExec, 1);
}
public static ObjectPool<ByteBuffer> newBufferPool() {
return ObjectPool.createSafePool(new LongAdder(), new LongAdder(), 16,
(Object... params) -> ByteBuffer.allocateDirect(8192), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != 8192) {
return false;
}
e.clear();
return true;
});
}
private static void runClient() throws Exception {
InetSocketAddress addr = new InetSocketAddress(myhost, port);
Set<InetSocketAddress> set = new LinkedHashSet<>();
@@ -82,7 +72,7 @@ public class SncpTest {
if (port2 > 0) {
set.add(new InetSocketAddress(myhost, port2));
}
final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16);
final AsyncIOGroup asyncGroup = new AsyncIOGroup(clientCapacity, 16);
asyncGroup.start();
final TransportFactory transFactory = TransportFactory.create(asyncGroup, protocol.endsWith(".UDP") ? "UDP" : "TCP", 0, 0);
transFactory.addGroupInfo("client", set);
@@ -120,7 +110,7 @@ public class SncpTest {
bean.setContent("数据: " + (k < 10 ? "0" : "") + k);
StringBuilder sb = new StringBuilder();
sb.append(k).append("------");
for (int i = 0; i < 12; i++) {
for (int i = 0; i < 120; i++) {
sb.append("_").append(i).append("_").append(k).append("_0123456789");
}
bean.setContent(sb.toString());
@@ -139,6 +129,10 @@ public class SncpTest {
}
cld.await();
System.out.println("---并发" + count + "次耗时: " + (System.currentTimeMillis() - s) / 1000.0 + "s");
if (count == 1) {
System.exit(0);
return;
}
final CountDownLatch cld2 = new CountDownLatch(1);
long s2 = System.currentTimeMillis();
final CompletableFuture<String> future = service.queryResultAsync(callbean);
@@ -194,7 +188,7 @@ public class SncpTest {
private static void runServer2() throws Exception {
InetSocketAddress addr = new InetSocketAddress(myhost, port2);
final CountDownLatch cdl = new CountDownLatch(1);
final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16);
final AsyncIOGroup asyncGroup = new AsyncIOGroup(8196, 16);
asyncGroup.start();
new Thread() {
{

View File

@@ -57,16 +57,21 @@ public class SncpTestServiceImpl implements SncpTestIService {
return bean;
}
public SncpTestBean expand(SncpTestBean bean) {
bean.setId(System.currentTimeMillis());
return bean;
}
@Override
public String queryResult(SncpTestBean bean) {
System.out.println(Thread.currentThread().getName() + " 运行了queryResult方法");
return "result: " + bean;
return "result: " + bean.getId();
}
public void queryResult(CompletionHandler<String, SncpTestBean> handler, @RpcAttachment SncpTestBean bean) {
System.out.println(Thread.currentThread().getName() + " handler 运行了queryResult方法");
if (handler != null) {
handler.completed("result: " + bean, bean);
handler.completed("result: " + bean.getId(), bean);
}
}