优化SNCP.UDP

This commit is contained in:
redkale
2023-01-30 17:52:58 +08:00
parent 6d1cf4b6bf
commit 737d36be67
9 changed files with 68 additions and 26 deletions

View File

@@ -302,6 +302,7 @@ public class AsyncIOGroup extends AsyncGroup {
private AsyncNioUdpConnection newUDPClientConnection(final SocketAddress address) throws IOException {
DatagramChannel channel = DatagramChannel.open();
channel.configureBlocking(false);
AsyncIOThread readThread = null;
AsyncIOThread writeThread = null;
AsyncIOThread currThread = AsyncIOThread.currAsyncIOThread();

View File

@@ -153,7 +153,7 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
if (++writeIndex >= writes) {
writeIndex = 0;
}
accept(key, ioReadThreads[readIndex], ioWriteThreads[writeIndex]);
accept(ioReadThreads[readIndex], ioWriteThreads[writeIndex]);
}
}
keys.clear();
@@ -166,7 +166,7 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
this.acceptThread.start();
}
private void accept(SelectionKey key, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread) throws IOException {
private void accept(AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread) throws IOException {
SocketChannel channel = this.serverChannel.accept();
channel.configureBlocking(false);
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
@@ -194,6 +194,11 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
}
}
@Override
public SocketAddress getLocalAddress() throws IOException {
return this.serverChannel.getLocalAddress();
}
@Override
public void close() throws IOException {
if (this.closed) {

View File

@@ -100,10 +100,11 @@ class AsyncNioUdpConnection extends AsyncNioConnection {
@Override
public boolean isConnected() {
if (!clientMode) {
if (clientMode) {
return this.channel.isConnected();
} else {
return true;
}
return this.channel.isConnected();
}
@Override
@@ -113,7 +114,11 @@ class AsyncNioUdpConnection extends AsyncNioConnection {
@Override
protected int implRead(ByteBuffer dst) throws IOException {
return this.channel.read(dst);
if (clientMode) {
return this.channel.read(dst);
} else {
return 0;
}
}
@Override

View File

@@ -12,6 +12,7 @@ import java.nio.channels.*;
import java.util.Set;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.*;
import java.util.logging.Level;
import org.redkale.boot.Application;
import org.redkale.util.*;
@@ -110,7 +111,6 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
this.ioGroup = new AsyncIOGroup(false, threadNameFormat, null, server.bufferCapacity, safeBufferPool);
this.ioGroup.start();
this.serverChannel.register(this.selector, SelectionKey.OP_READ);
this.acceptThread = new Thread() {
{
setName(String.format(threadNameFormat, "Accept"));
@@ -124,21 +124,41 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
final int writes = ioWriteThreads.length;
int readIndex = -1;
int writeIndex = -1;
Set<SelectionKey> keys = null;
final Selector sel = selector;
ObjectPool<ByteBuffer> unsafeBufferPool = ObjectPool.createUnsafePool(null, 512, safeBufferPool);
while (!closed) {
final ByteBuffer buffer = unsafeBufferPool.get();
try {
SocketAddress address = serverChannel.receive(buffer);
buffer.flip();
if (++readIndex >= reads) {
readIndex = 0;
int count = sel.select();
if (count == 0) {
continue;
}
if (++writeIndex >= writes) {
writeIndex = 0;
if (keys == null) {
keys = selector.selectedKeys();
}
for (SelectionKey key : keys) {
if (key.isReadable()) {
final ByteBuffer buffer = unsafeBufferPool.get();
try {
SocketAddress address = serverChannel.receive(buffer);
buffer.flip();
if (++readIndex >= reads) {
readIndex = 0;
}
if (++writeIndex >= writes) {
writeIndex = 0;
}
accept(address, buffer, ioReadThreads[readIndex], ioWriteThreads[writeIndex]);
} catch (Throwable t) {
unsafeBufferPool.accept(buffer);
}
}
}
keys.clear();
} catch (Throwable ex) {
if (!closed) {
server.logger.log(Level.FINE, getName() + " selector run failed", ex);
}
accept(address, buffer, ioReadThreads[readIndex], ioWriteThreads[writeIndex]);
} catch (Throwable t) {
unsafeBufferPool.accept(buffer);
}
}
}
@@ -167,6 +187,11 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
}
}
@Override
public SocketAddress getLocalAddress() throws IOException {
return this.serverChannel.getLocalAddress();
}
@Override
public void close() throws IOException {
if (this.closed) {

View File

@@ -113,7 +113,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
} catch (Exception te) {
channel.dispose();// response.init(channel); 在调用之前异常
if (context.logger.isLoggable(Level.FINEST)) {
context.logger.log(Level.FINEST, "Servlet read channel erroneous, force to close channel ", te);
context.logger.log(Level.FINEST, "Servlet start read channel erroneous, force to close channel ", te);
}
}
}
@@ -134,7 +134,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
} catch (Exception te) {
channel.dispose();// response.init(channel); 在调用之前异常
if (context.logger.isLoggable(Level.FINEST)) {
context.logger.log(Level.FINEST, "Servlet read channel erroneous, force to close channel ", te);
context.logger.log(Level.FINEST, "Servlet run read channel erroneous, force to close channel ", te);
}
}
}
@@ -227,7 +227,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
channel.offerReadBuffer(attachment);
response.error(exc);
if (exc != null) {
request.context.logger.log(Level.FINER, "Servlet read channel erroneous, force to close channel ", exc);
request.context.logger.log(Level.FINER, "Servlet continue read channel erroneous, force to close channel ", exc);
}
}
}

View File

@@ -7,7 +7,7 @@ package org.redkale.net;
import java.io.IOException;
import java.net.*;
import java.util.*;
import java.util.Set;
import org.redkale.annotation.Resource;
import org.redkale.boot.Application;
import org.redkale.util.AnyValue;
@@ -40,6 +40,8 @@ public abstract class ProtocolServer {
public abstract void accept(Application application, Server server) throws IOException;
public abstract SocketAddress getLocalAddress() throws IOException;
public abstract void close() throws IOException;
protected ProtocolServer(Context context) {

View File

@@ -305,6 +305,10 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
}
this.serverChannel.open(config);
serverChannel.bind(address, backlog);
SocketAddress localAddress = serverChannel.getLocalAddress();
if (localAddress instanceof InetSocketAddress) {
this.address = (InetSocketAddress) localAddress;
}
serverChannel.accept(application, this);
postStart();
logger.info(this.getClass().getSimpleName() + ("TCP".equalsIgnoreCase(netprotocol) ? "" : ("." + netprotocol)) + " listen: " + (address.getHostString() + ":" + address.getPort())

View File

@@ -65,7 +65,7 @@ public class SncpRequest extends Request<SncpContext> {
//---------------------head----------------------------------
if (this.readState == READ_STATE_ROUTE) {
if (buffer.remaining() < HEADER_SIZE) {
return 1; //小于60
return HEADER_SIZE - buffer.remaining(); //小于60
}
this.seqid = buffer.getLong(); //8
if (buffer.getChar() != HEADER_SIZE) { //2

View File

@@ -25,13 +25,11 @@ import org.redkale.util.*;
*/
public class SncpTest {
private static final String serviceName = "";
private static final String myhost = Utility.localInetAddress().getHostAddress();
private static final int port = 4040;
private static int port = 0;
private static final int port2 = 4240;
private static int port2 = 4240;
private static final String protocol = "SNCP.TCP";
@@ -182,6 +180,7 @@ public class SncpTest {
System.out.println(service);
server.init(conf);
server.start();
port = server.getSocketAddress().getPort();
cdl.countDown();
} catch (Exception e) {
e.printStackTrace();
@@ -206,7 +205,7 @@ public class SncpTest {
try {
AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue();
conf.addValue("host", "0.0.0.0");
conf.addValue("port", "" + port2);
conf.addValue("port", "" + (port2 < 10 ? 0 : port2));
conf.addValue("protocol", protocol);
SncpServer server = new SncpServer(null, System.currentTimeMillis(), conf, factory);
Set<InetSocketAddress> set = new LinkedHashSet<>();
@@ -218,6 +217,7 @@ public class SncpTest {
server.addSncpServlet(service);
server.init(conf);
server.start();
port2 = server.getSocketAddress().getPort();
cdl.countDown();
} catch (Exception e) {
e.printStackTrace();