From f440b2d639ab32d108fba1be4f051aec06880e20 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Mon, 21 May 2018 11:54:52 +0800 Subject: [PATCH] --- src/org/redkale/net/AsyncConnection.java | 96 ++++++++++++++++++------ src/org/redkale/net/ProtocolServer.java | 83 ++++++++++++++------ 2 files changed, 135 insertions(+), 44 deletions(-) diff --git a/src/org/redkale/net/AsyncConnection.java b/src/org/redkale/net/AsyncConnection.java index 85c460acc..93f8382f9 100644 --- a/src/org/redkale/net/AsyncConnection.java +++ b/src/org/redkale/net/AsyncConnection.java @@ -219,7 +219,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl return future; } - static class NIOTCPAsyncConnection extends AsyncConnection { + static class AsyncNIOTCPConnection extends AsyncConnection { private int readTimeoutSeconds; @@ -251,7 +251,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl CompletionHandler writeHandler; - public NIOTCPAsyncConnection(final SocketChannel ch, SocketAddress addr0, + public AsyncNIOTCPConnection(final SocketChannel ch, SocketAddress addr0, final Selector selector, final int readTimeoutSeconds0, final int writeTimeoutSeconds0, final AtomicLong livingCounter, final AtomicLong closedCounter) { @@ -306,6 +306,24 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl } } + CompletionHandler removeReadHandler() { + CompletionHandler handler = this.readHandler; + this.readHandler = null; + return handler; + } + + ByteBuffer removeReadBuffer() { + ByteBuffer buffer = this.readBuffer; + this.readBuffer = null; + return buffer; + } + + Object removeReadAttachment() { + Object attach = this.readAttachment; + this.readAttachment = null; + return attach; + } + void completeRead(int rs) { Object attach = this.readAttachment; CompletionHandler handler = this.readHandler; @@ -324,6 +342,42 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl handler.failed(t, attach); } + CompletionHandler removeWriteHandler() { + CompletionHandler handler = this.writeHandler; + this.writeHandler = null; + return handler; + } + + ByteBuffer removeWriteOneBuffer() { + ByteBuffer buffer = this.writeOneBuffer; + this.writeOneBuffer = null; + return buffer; + } + + ByteBuffer[] removeWriteBuffers() { + ByteBuffer[] buffers = this.writeBuffers; + this.writeBuffers = null; + return buffers; + } + + int removeWriteOffset() { + int rs = this.writeOffset; + this.writeOffset = 0; + return rs; + } + + int removeWriteLength() { + int rs = this.writeLength; + this.writeLength = 0; + return rs; + } + + Object removeWriteAttachment() { + Object attach = this.writeAttachment; + this.writeAttachment = null; + return attach; + } + void completeWrite(int rs) { Object attach = this.writeAttachment; CompletionHandler handler = this.writeHandler; @@ -466,20 +520,20 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, final Selector selector, final int readTimeoutSeconds0, final int writeTimeoutSeconds0) { - return new NIOTCPAsyncConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, null, null); + return new AsyncNIOTCPConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, null, null); } public static AsyncConnection create(final SocketChannel ch, final SocketAddress addr0, final Selector selector, final Context context) { - return new NIOTCPAsyncConnection(ch, addr0, selector, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null); + return new AsyncNIOTCPConnection(ch, addr0, selector, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null); } public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, final Selector selector, final int readTimeoutSeconds0, final int writeTimeoutSeconds0, final AtomicLong livingCounter, final AtomicLong closedCounter) { - return new NIOTCPAsyncConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter); + return new AsyncNIOTCPConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter); } - private static class BIOUDPAsyncConnection extends AsyncConnection { + private static class AsyncBIOUDPConnection extends AsyncConnection { private int readTimeoutSeconds; @@ -491,7 +545,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl private final boolean client; - public BIOUDPAsyncConnection(final DatagramChannel ch, SocketAddress addr0, + public AsyncBIOUDPConnection(final DatagramChannel ch, SocketAddress addr0, final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0, final AtomicLong livingCounter, final AtomicLong closedCounter) { this.channel = ch; @@ -628,16 +682,16 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl public static AsyncConnection create(final DatagramChannel ch, SocketAddress addr, final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0) { - return new BIOUDPAsyncConnection(ch, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, null, null); + return new AsyncBIOUDPConnection(ch, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, null, null); } public static AsyncConnection create(final DatagramChannel ch, SocketAddress addr, final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0, final AtomicLong livingCounter, final AtomicLong closedCounter) { - return new BIOUDPAsyncConnection(ch, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter); + return new AsyncBIOUDPConnection(ch, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter); } - private static class BIOTCPAsyncConnection extends AsyncConnection { + private static class AsyncBIOTCPConnection extends AsyncConnection { private int readTimeoutSeconds; @@ -651,7 +705,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl private final SocketAddress remoteAddress; - public BIOTCPAsyncConnection(final Socket socket, final SocketAddress addr0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0, + public AsyncBIOTCPConnection(final Socket socket, final SocketAddress addr0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0, final AtomicLong livingCounter, final AtomicLong closedCounter) { this.socket = socket; ReadableByteChannel rc = null; @@ -802,15 +856,15 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl } public static AsyncConnection create(final Socket socket, final SocketAddress addr0, final int readTimeoutSecond0, final int writeTimeoutSecond0) { - return new BIOTCPAsyncConnection(socket, addr0, readTimeoutSecond0, writeTimeoutSecond0, null, null); + return new AsyncBIOTCPConnection(socket, addr0, readTimeoutSecond0, writeTimeoutSecond0, null, null); } public static AsyncConnection create(final Socket socket, final SocketAddress addr0, final int readTimeoutSecond0, final int writeTimeoutSecond0, final AtomicLong livingCounter, final AtomicLong closedCounter) { - return new BIOTCPAsyncConnection(socket, addr0, readTimeoutSecond0, writeTimeoutSecond0, livingCounter, closedCounter); + return new AsyncBIOTCPConnection(socket, addr0, readTimeoutSecond0, writeTimeoutSecond0, livingCounter, closedCounter); } - private static class AIOTCPAsyncConnection extends AsyncConnection { + private static class AsyncAIOTCPConnection extends AsyncConnection { private int readTimeoutSeconds; @@ -820,7 +874,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl private final SocketAddress remoteAddress; - public AIOTCPAsyncConnection(final AsynchronousSocketChannel ch, SSLContext sslContext, + public AsyncAIOTCPConnection(final AsynchronousSocketChannel ch, SSLContext sslContext, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) { this.channel = ch; @@ -952,29 +1006,29 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl } public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) { - return new AIOTCPAsyncConnection(ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null); + return new AsyncAIOTCPConnection(ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null); } public static AsyncConnection create(final AsynchronousSocketChannel ch, SSLContext sslContext, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) { - return new AIOTCPAsyncConnection(ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null); + return new AsyncAIOTCPConnection(ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null); } public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final Context context) { - return new AIOTCPAsyncConnection(ch, context.sslContext, addr0, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null); + return new AsyncAIOTCPConnection(ch, context.sslContext, addr0, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null); } public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) { - return new AIOTCPAsyncConnection(ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter); + return new AsyncAIOTCPConnection(ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter); } public static AsyncConnection create(final AsynchronousSocketChannel ch, SSLContext sslContext, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) { - return new AIOTCPAsyncConnection(ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter); + return new AsyncAIOTCPConnection(ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter); } public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final Context context, final AtomicLong livingCounter, final AtomicLong closedCounter) { - return new AIOTCPAsyncConnection(ch, context.sslContext, addr0, context.readTimeoutSeconds, context.writeTimeoutSeconds, livingCounter, closedCounter); + return new AsyncAIOTCPConnection(ch, context.sslContext, addr0, context.readTimeoutSeconds, context.writeTimeoutSeconds, livingCounter, closedCounter); } } diff --git a/src/org/redkale/net/ProtocolServer.java b/src/org/redkale/net/ProtocolServer.java index 85d014304..29caf918f 100644 --- a/src/org/redkale/net/ProtocolServer.java +++ b/src/org/redkale/net/ProtocolServer.java @@ -12,7 +12,7 @@ import java.nio.channels.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; -import org.redkale.net.AsyncConnection.NIOTCPAsyncConnection; +import org.redkale.net.AsyncConnection.AsyncNIOTCPConnection; import org.redkale.util.AnyValue; /** @@ -85,8 +85,8 @@ public abstract class ProtocolServer { //--------------------------------------------------------------------- public static ProtocolServer create(String protocol, Context context) { - if ("TCP".equalsIgnoreCase(protocol)) return new ProtocolAIOTCPServer(context); - if ("UDP".equalsIgnoreCase(protocol)) return new ProtocolUDPServer(context); + if ("TCP".equalsIgnoreCase(protocol)) return new ProtocolNIOTCPServer(context); + if ("UDP".equalsIgnoreCase(protocol)) return new ProtocolBIOUDPServer(context); throw new RuntimeException("ProtocolServer not support protocol " + protocol); } @@ -98,7 +98,7 @@ public abstract class ProtocolServer { return supportTcpKeepAlive; } - static final class ProtocolUDPServer extends ProtocolServer { + static final class ProtocolBIOUDPServer extends ProtocolServer { private boolean running; @@ -106,7 +106,7 @@ public abstract class ProtocolServer { private DatagramChannel serverChannel; - public ProtocolUDPServer(Context context) { + public ProtocolBIOUDPServer(Context context) { this.context = context; } @@ -451,8 +451,16 @@ public abstract class ProtocolServer { private void processKey(SelectionKey key) { if (key == null || !key.isValid()) return; SocketChannel socket = (SocketChannel) key.channel(); - NIOTCPAsyncConnection conn = (NIOTCPAsyncConnection) key.attachment(); - if(conn == null) return; + AsyncNIOTCPConnection conn = (AsyncNIOTCPConnection) key.attachment(); + if (!socket.isOpen()) { + if (conn == null) { + key.cancel(); + } else { + conn.dispose(); + } + return; + } + if (conn == null) return; if (key.isReadable()) { if (conn.readHandler != null) readOP(key, socket, conn); } else if (key.isWritable()) { @@ -460,23 +468,47 @@ public abstract class ProtocolServer { } } - private void readOP(SelectionKey key, SocketChannel socket, NIOTCPAsyncConnection conn) { + private void readOP(SelectionKey key, SocketChannel socket, AsyncNIOTCPConnection conn) { + final CompletionHandler handler = conn.removeReadHandler(); + final ByteBuffer buffer = conn.removeReadBuffer(); + final Object attach = conn.removeReadAttachment(); + //System.out.println(conn + "------readbuf:" + buffer + "-------handler:" + handler); + if (handler == null || buffer == null) return; try { - final int rs = socket.read(conn.readBuffer); - //System.out.println(conn + "------readbuf:" + conn.readBuffer + "-------handler:" + conn.readHandler + "-------read: " + rs); - context.runAsync(() -> conn.completeRead(rs)); + final int rs = socket.read(buffer); + { //测试 + buffer.flip(); + byte[] bs = new byte[buffer.remaining()]; + buffer.get(bs); + //System.out.println(conn + "------readbuf:" + buffer + "-------handler:" + handler + "-------读内容: " + new String(bs)); + } + //System.out.println(conn + "------readbuf:" + buffer + "-------handler:" + handler + "-------read: " + rs); + context.runAsync(() -> { + try { + handler.completed(rs, attach); + } catch (Throwable e) { + handler.failed(e, attach); + } + }); } catch (Throwable t) { - context.runAsync(() -> conn.faileRead(t)); + context.runAsync(() -> handler.failed(t, attach)); } } - private void writeOP(SelectionKey key, SocketChannel socket, NIOTCPAsyncConnection conn) { + private void writeOP(SelectionKey key, SocketChannel socket, AsyncNIOTCPConnection conn) { + final CompletionHandler handler = conn.removeWriteHandler(); + final ByteBuffer oneBuffer = conn.removeWriteOneBuffer(); + final ByteBuffer[] buffers = conn.removeWriteBuffers(); + final Object attach = conn.removeWriteAttachment(); + final int writeOffset = conn.removeWriteOffset(); + final int writeLength = conn.removeWriteLength(); + if (handler == null || (oneBuffer == null && buffers == null)) return; + //System.out.println(conn + "------buffers:" + Arrays.toString(buffers) + "---onebuf:" + oneBuffer + "-------handler:" + handler); try { int rs = 0; - if (conn.writeOneBuffer == null) { - final ByteBuffer[] buffers = conn.writeBuffers; - int offset = conn.writeOffset; - int length = conn.writeLength; + if (oneBuffer == null) { + int offset = writeOffset; + int length = writeLength; for (;;) { long sr = socket.write(buffers, offset, length); if (sr > 0) rs += sr; @@ -492,15 +524,20 @@ public abstract class ProtocolServer { if (over) break; } } else { - final ByteBuffer buffer = conn.writeOneBuffer; - while (buffer.hasRemaining()) rs += socket.write(buffer); + while (oneBuffer.hasRemaining()) rs += socket.write(oneBuffer); } - key.interestOps(SelectionKey.OP_READ); + key.interestOps(SelectionKey.OP_READ); //OP_CONNECT final int rs0 = rs; - //System.out.println(conn + "------buffers:" + conn.writeBuffers + "---onebuf:" + conn.writeOneBuffer + "-------handler:" + conn.writeHandler + "-------write: " + rs); - context.runAsync(() -> conn.completeWrite(rs0)); + //System.out.println(conn + "------buffers:" + Arrays.toString(buffers) + "---onebuf:" + oneBuffer + "-------handler:" + handler + "-------write: " + rs); + context.runAsync(() -> { + try { + handler.completed(rs0, attach); + } catch (Throwable e) { + handler.failed(e, attach); + } + }); } catch (Throwable t) { - context.runAsync(() -> conn.faileWrite(t)); + context.runAsync(() -> handler.failed(t, attach)); } }