diff --git a/src/main/java/org/redkale/net/AsyncIOGroup.java b/src/main/java/org/redkale/net/AsyncIOGroup.java index af8c2c0bb..77ee2a93d 100644 --- a/src/main/java/org/redkale/net/AsyncIOGroup.java +++ b/src/main/java/org/redkale/net/AsyncIOGroup.java @@ -302,6 +302,7 @@ public class AsyncIOGroup extends AsyncGroup { private AsyncNioUdpConnection newUDPClientConnection(final SocketAddress address) throws IOException { DatagramChannel channel = DatagramChannel.open(); + channel.configureBlocking(false); AsyncIOThread readThread = null; AsyncIOThread writeThread = null; AsyncIOThread currThread = AsyncIOThread.currAsyncIOThread(); diff --git a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java index 22737b3ae..cf09d5933 100644 --- a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java @@ -153,7 +153,7 @@ class AsyncNioTcpProtocolServer extends ProtocolServer { if (++writeIndex >= writes) { writeIndex = 0; } - accept(key, ioReadThreads[readIndex], ioWriteThreads[writeIndex]); + accept(ioReadThreads[readIndex], ioWriteThreads[writeIndex]); } } keys.clear(); @@ -166,7 +166,7 @@ class AsyncNioTcpProtocolServer extends ProtocolServer { this.acceptThread.start(); } - private void accept(SelectionKey key, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread) throws IOException { + private void accept(AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread) throws IOException { SocketChannel channel = this.serverChannel.accept(); channel.configureBlocking(false); channel.setOption(StandardSocketOptions.TCP_NODELAY, true); @@ -194,6 +194,11 @@ class AsyncNioTcpProtocolServer extends ProtocolServer { } } + @Override + public SocketAddress getLocalAddress() throws IOException { + return this.serverChannel.getLocalAddress(); + } + @Override public void close() throws IOException { if (this.closed) { diff --git a/src/main/java/org/redkale/net/AsyncNioUdpConnection.java b/src/main/java/org/redkale/net/AsyncNioUdpConnection.java index a98b7e4e3..48b412be4 100644 --- a/src/main/java/org/redkale/net/AsyncNioUdpConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioUdpConnection.java @@ -100,10 +100,11 @@ class AsyncNioUdpConnection extends AsyncNioConnection { @Override public boolean isConnected() { - if (!clientMode) { + if (clientMode) { + return this.channel.isConnected(); + } else { return true; } - return this.channel.isConnected(); } @Override @@ -113,7 +114,11 @@ class AsyncNioUdpConnection extends AsyncNioConnection { @Override protected int implRead(ByteBuffer dst) throws IOException { - return this.channel.read(dst); + if (clientMode) { + return this.channel.read(dst); + } else { + return 0; + } } @Override diff --git a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java index f24b292b8..0ae373d52 100644 --- a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java @@ -12,6 +12,7 @@ import java.nio.channels.*; import java.util.Set; import java.util.concurrent.atomic.LongAdder; import java.util.function.*; +import java.util.logging.Level; import org.redkale.boot.Application; import org.redkale.util.*; @@ -110,7 +111,6 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { this.ioGroup = new AsyncIOGroup(false, threadNameFormat, null, server.bufferCapacity, safeBufferPool); this.ioGroup.start(); this.serverChannel.register(this.selector, SelectionKey.OP_READ); - this.acceptThread = new Thread() { { setName(String.format(threadNameFormat, "Accept")); @@ -124,21 +124,41 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { final int writes = ioWriteThreads.length; int readIndex = -1; int writeIndex = -1; + Set keys = null; + final Selector sel = selector; ObjectPool unsafeBufferPool = ObjectPool.createUnsafePool(null, 512, safeBufferPool); while (!closed) { - final ByteBuffer buffer = unsafeBufferPool.get(); try { - SocketAddress address = serverChannel.receive(buffer); - buffer.flip(); - if (++readIndex >= reads) { - readIndex = 0; + int count = sel.select(); + if (count == 0) { + continue; } - if (++writeIndex >= writes) { - writeIndex = 0; + if (keys == null) { + keys = selector.selectedKeys(); + } + for (SelectionKey key : keys) { + if (key.isReadable()) { + final ByteBuffer buffer = unsafeBufferPool.get(); + try { + SocketAddress address = serverChannel.receive(buffer); + buffer.flip(); + if (++readIndex >= reads) { + readIndex = 0; + } + if (++writeIndex >= writes) { + writeIndex = 0; + } + accept(address, buffer, ioReadThreads[readIndex], ioWriteThreads[writeIndex]); + } catch (Throwable t) { + unsafeBufferPool.accept(buffer); + } + } + } + keys.clear(); + } catch (Throwable ex) { + if (!closed) { + server.logger.log(Level.FINE, getName() + " selector run failed", ex); } - accept(address, buffer, ioReadThreads[readIndex], ioWriteThreads[writeIndex]); - } catch (Throwable t) { - unsafeBufferPool.accept(buffer); } } } @@ -167,6 +187,11 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { } } + @Override + public SocketAddress getLocalAddress() throws IOException { + return this.serverChannel.getLocalAddress(); + } + @Override public void close() throws IOException { if (this.closed) { diff --git a/src/main/java/org/redkale/net/ProtocolCodec.java b/src/main/java/org/redkale/net/ProtocolCodec.java index 2e64c265d..04eb7412a 100644 --- a/src/main/java/org/redkale/net/ProtocolCodec.java +++ b/src/main/java/org/redkale/net/ProtocolCodec.java @@ -113,7 +113,7 @@ class ProtocolCodec implements CompletionHandler { } catch (Exception te) { channel.dispose();// response.init(channel); 在调用之前异常 if (context.logger.isLoggable(Level.FINEST)) { - context.logger.log(Level.FINEST, "Servlet read channel erroneous, force to close channel ", te); + context.logger.log(Level.FINEST, "Servlet start read channel erroneous, force to close channel ", te); } } } @@ -134,7 +134,7 @@ class ProtocolCodec implements CompletionHandler { } catch (Exception te) { channel.dispose();// response.init(channel); 在调用之前异常 if (context.logger.isLoggable(Level.FINEST)) { - context.logger.log(Level.FINEST, "Servlet read channel erroneous, force to close channel ", te); + context.logger.log(Level.FINEST, "Servlet run read channel erroneous, force to close channel ", te); } } } @@ -227,7 +227,7 @@ class ProtocolCodec implements CompletionHandler { channel.offerReadBuffer(attachment); response.error(exc); if (exc != null) { - request.context.logger.log(Level.FINER, "Servlet read channel erroneous, force to close channel ", exc); + request.context.logger.log(Level.FINER, "Servlet continue read channel erroneous, force to close channel ", exc); } } } diff --git a/src/main/java/org/redkale/net/ProtocolServer.java b/src/main/java/org/redkale/net/ProtocolServer.java index c95689e8f..9c824afc8 100644 --- a/src/main/java/org/redkale/net/ProtocolServer.java +++ b/src/main/java/org/redkale/net/ProtocolServer.java @@ -7,7 +7,7 @@ package org.redkale.net; import java.io.IOException; import java.net.*; -import java.util.*; +import java.util.Set; import org.redkale.annotation.Resource; import org.redkale.boot.Application; import org.redkale.util.AnyValue; @@ -40,6 +40,8 @@ public abstract class ProtocolServer { public abstract void accept(Application application, Server server) throws IOException; + public abstract SocketAddress getLocalAddress() throws IOException; + public abstract void close() throws IOException; protected ProtocolServer(Context context) { diff --git a/src/main/java/org/redkale/net/Server.java b/src/main/java/org/redkale/net/Server.java index fbc387684..0c2d16c44 100644 --- a/src/main/java/org/redkale/net/Server.java +++ b/src/main/java/org/redkale/net/Server.java @@ -305,6 +305,10 @@ public abstract class Server { //---------------------head---------------------------------- if (this.readState == READ_STATE_ROUTE) { if (buffer.remaining() < HEADER_SIZE) { - return 1; //小于60 + return HEADER_SIZE - buffer.remaining(); //小于60 } this.seqid = buffer.getLong(); //8 if (buffer.getChar() != HEADER_SIZE) { //2 diff --git a/src/test/java/org/redkale/test/sncp/SncpTest.java b/src/test/java/org/redkale/test/sncp/SncpTest.java index 502758907..5ceef9cb7 100644 --- a/src/test/java/org/redkale/test/sncp/SncpTest.java +++ b/src/test/java/org/redkale/test/sncp/SncpTest.java @@ -25,13 +25,11 @@ import org.redkale.util.*; */ public class SncpTest { - private static final String serviceName = ""; - private static final String myhost = Utility.localInetAddress().getHostAddress(); - private static final int port = 4040; + private static int port = 0; - private static final int port2 = 4240; + private static int port2 = 4240; private static final String protocol = "SNCP.TCP"; @@ -182,6 +180,7 @@ public class SncpTest { System.out.println(service); server.init(conf); server.start(); + port = server.getSocketAddress().getPort(); cdl.countDown(); } catch (Exception e) { e.printStackTrace(); @@ -206,7 +205,7 @@ public class SncpTest { try { AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue(); conf.addValue("host", "0.0.0.0"); - conf.addValue("port", "" + port2); + conf.addValue("port", "" + (port2 < 10 ? 0 : port2)); conf.addValue("protocol", protocol); SncpServer server = new SncpServer(null, System.currentTimeMillis(), conf, factory); Set set = new LinkedHashSet<>(); @@ -218,6 +217,7 @@ public class SncpTest { server.addSncpServlet(service); server.init(conf); server.start(); + port2 = server.getSocketAddress().getPort(); cdl.countDown(); } catch (Exception e) { e.printStackTrace();