This commit is contained in:
Redkale
2018-05-21 11:54:52 +08:00
parent 324d4bd94e
commit f440b2d639
2 changed files with 135 additions and 44 deletions

View File

@@ -219,7 +219,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
return future;
}
static class NIOTCPAsyncConnection extends AsyncConnection {
static class AsyncNIOTCPConnection extends AsyncConnection {
private int readTimeoutSeconds;
@@ -251,7 +251,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
CompletionHandler writeHandler;
public NIOTCPAsyncConnection(final SocketChannel ch, SocketAddress addr0,
public AsyncNIOTCPConnection(final SocketChannel ch, SocketAddress addr0,
final Selector selector,
final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
@@ -306,6 +306,24 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
}
}
CompletionHandler removeReadHandler() {
CompletionHandler handler = this.readHandler;
this.readHandler = null;
return handler;
}
ByteBuffer removeReadBuffer() {
ByteBuffer buffer = this.readBuffer;
this.readBuffer = null;
return buffer;
}
Object removeReadAttachment() {
Object attach = this.readAttachment;
this.readAttachment = null;
return attach;
}
void completeRead(int rs) {
Object attach = this.readAttachment;
CompletionHandler handler = this.readHandler;
@@ -324,6 +342,42 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
handler.failed(t, attach);
}
CompletionHandler removeWriteHandler() {
CompletionHandler handler = this.writeHandler;
this.writeHandler = null;
return handler;
}
ByteBuffer removeWriteOneBuffer() {
ByteBuffer buffer = this.writeOneBuffer;
this.writeOneBuffer = null;
return buffer;
}
ByteBuffer[] removeWriteBuffers() {
ByteBuffer[] buffers = this.writeBuffers;
this.writeBuffers = null;
return buffers;
}
int removeWriteOffset() {
int rs = this.writeOffset;
this.writeOffset = 0;
return rs;
}
int removeWriteLength() {
int rs = this.writeLength;
this.writeLength = 0;
return rs;
}
Object removeWriteAttachment() {
Object attach = this.writeAttachment;
this.writeAttachment = null;
return attach;
}
void completeWrite(int rs) {
Object attach = this.writeAttachment;
CompletionHandler handler = this.writeHandler;
@@ -466,20 +520,20 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, final Selector selector,
final int readTimeoutSeconds0, final int writeTimeoutSeconds0) {
return new NIOTCPAsyncConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, null, null);
return new AsyncNIOTCPConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, null, null);
}
public static AsyncConnection create(final SocketChannel ch, final SocketAddress addr0, final Selector selector, final Context context) {
return new NIOTCPAsyncConnection(ch, addr0, selector, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null);
return new AsyncNIOTCPConnection(ch, addr0, selector, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null);
}
public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, final Selector selector,
final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new NIOTCPAsyncConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter);
return new AsyncNIOTCPConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter);
}
private static class BIOUDPAsyncConnection extends AsyncConnection {
private static class AsyncBIOUDPConnection extends AsyncConnection {
private int readTimeoutSeconds;
@@ -491,7 +545,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
private final boolean client;
public BIOUDPAsyncConnection(final DatagramChannel ch, SocketAddress addr0,
public AsyncBIOUDPConnection(final DatagramChannel ch, SocketAddress addr0,
final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
this.channel = ch;
@@ -628,16 +682,16 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
public static AsyncConnection create(final DatagramChannel ch, SocketAddress addr,
final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0) {
return new BIOUDPAsyncConnection(ch, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, null, null);
return new AsyncBIOUDPConnection(ch, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, null, null);
}
public static AsyncConnection create(final DatagramChannel ch, SocketAddress addr,
final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new BIOUDPAsyncConnection(ch, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter);
return new AsyncBIOUDPConnection(ch, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter);
}
private static class BIOTCPAsyncConnection extends AsyncConnection {
private static class AsyncBIOTCPConnection extends AsyncConnection {
private int readTimeoutSeconds;
@@ -651,7 +705,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
private final SocketAddress remoteAddress;
public BIOTCPAsyncConnection(final Socket socket, final SocketAddress addr0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
public AsyncBIOTCPConnection(final Socket socket, final SocketAddress addr0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
this.socket = socket;
ReadableByteChannel rc = null;
@@ -802,15 +856,15 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
}
public static AsyncConnection create(final Socket socket, final SocketAddress addr0, final int readTimeoutSecond0, final int writeTimeoutSecond0) {
return new BIOTCPAsyncConnection(socket, addr0, readTimeoutSecond0, writeTimeoutSecond0, null, null);
return new AsyncBIOTCPConnection(socket, addr0, readTimeoutSecond0, writeTimeoutSecond0, null, null);
}
public static AsyncConnection create(final Socket socket, final SocketAddress addr0, final int readTimeoutSecond0,
final int writeTimeoutSecond0, final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new BIOTCPAsyncConnection(socket, addr0, readTimeoutSecond0, writeTimeoutSecond0, livingCounter, closedCounter);
return new AsyncBIOTCPConnection(socket, addr0, readTimeoutSecond0, writeTimeoutSecond0, livingCounter, closedCounter);
}
private static class AIOTCPAsyncConnection extends AsyncConnection {
private static class AsyncAIOTCPConnection extends AsyncConnection {
private int readTimeoutSeconds;
@@ -820,7 +874,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
private final SocketAddress remoteAddress;
public AIOTCPAsyncConnection(final AsynchronousSocketChannel ch, SSLContext sslContext,
public AsyncAIOTCPConnection(final AsynchronousSocketChannel ch, SSLContext sslContext,
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
this.channel = ch;
@@ -952,29 +1006,29 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
}
public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
return new AIOTCPAsyncConnection(ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null);
return new AsyncAIOTCPConnection(ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null);
}
public static AsyncConnection create(final AsynchronousSocketChannel ch, SSLContext sslContext, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
return new AIOTCPAsyncConnection(ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null);
return new AsyncAIOTCPConnection(ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null);
}
public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final Context context) {
return new AIOTCPAsyncConnection(ch, context.sslContext, addr0, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null);
return new AsyncAIOTCPConnection(ch, context.sslContext, addr0, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null);
}
public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final int readTimeoutSeconds,
final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new AIOTCPAsyncConnection(ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter);
return new AsyncAIOTCPConnection(ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter);
}
public static AsyncConnection create(final AsynchronousSocketChannel ch, SSLContext sslContext, final SocketAddress addr0, final int readTimeoutSeconds,
final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new AIOTCPAsyncConnection(ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter);
return new AsyncAIOTCPConnection(ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter);
}
public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0,
final Context context, final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new AIOTCPAsyncConnection(ch, context.sslContext, addr0, context.readTimeoutSeconds, context.writeTimeoutSeconds, livingCounter, closedCounter);
return new AsyncAIOTCPConnection(ch, context.sslContext, addr0, context.readTimeoutSeconds, context.writeTimeoutSeconds, livingCounter, closedCounter);
}
}

View File

@@ -12,7 +12,7 @@ import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import org.redkale.net.AsyncConnection.NIOTCPAsyncConnection;
import org.redkale.net.AsyncConnection.AsyncNIOTCPConnection;
import org.redkale.util.AnyValue;
/**
@@ -85,8 +85,8 @@ public abstract class ProtocolServer {
//---------------------------------------------------------------------
public static ProtocolServer create(String protocol, Context context) {
if ("TCP".equalsIgnoreCase(protocol)) return new ProtocolAIOTCPServer(context);
if ("UDP".equalsIgnoreCase(protocol)) return new ProtocolUDPServer(context);
if ("TCP".equalsIgnoreCase(protocol)) return new ProtocolNIOTCPServer(context);
if ("UDP".equalsIgnoreCase(protocol)) return new ProtocolBIOUDPServer(context);
throw new RuntimeException("ProtocolServer not support protocol " + protocol);
}
@@ -98,7 +98,7 @@ public abstract class ProtocolServer {
return supportTcpKeepAlive;
}
static final class ProtocolUDPServer extends ProtocolServer {
static final class ProtocolBIOUDPServer extends ProtocolServer {
private boolean running;
@@ -106,7 +106,7 @@ public abstract class ProtocolServer {
private DatagramChannel serverChannel;
public ProtocolUDPServer(Context context) {
public ProtocolBIOUDPServer(Context context) {
this.context = context;
}
@@ -451,8 +451,16 @@ public abstract class ProtocolServer {
private void processKey(SelectionKey key) {
if (key == null || !key.isValid()) return;
SocketChannel socket = (SocketChannel) key.channel();
NIOTCPAsyncConnection conn = (NIOTCPAsyncConnection) key.attachment();
if(conn == null) return;
AsyncNIOTCPConnection conn = (AsyncNIOTCPConnection) key.attachment();
if (!socket.isOpen()) {
if (conn == null) {
key.cancel();
} else {
conn.dispose();
}
return;
}
if (conn == null) return;
if (key.isReadable()) {
if (conn.readHandler != null) readOP(key, socket, conn);
} else if (key.isWritable()) {
@@ -460,23 +468,47 @@ public abstract class ProtocolServer {
}
}
private void readOP(SelectionKey key, SocketChannel socket, NIOTCPAsyncConnection conn) {
private void readOP(SelectionKey key, SocketChannel socket, AsyncNIOTCPConnection conn) {
final CompletionHandler handler = conn.removeReadHandler();
final ByteBuffer buffer = conn.removeReadBuffer();
final Object attach = conn.removeReadAttachment();
//System.out.println(conn + "------readbuf:" + buffer + "-------handler:" + handler);
if (handler == null || buffer == null) return;
try {
final int rs = socket.read(conn.readBuffer);
//System.out.println(conn + "------readbuf:" + conn.readBuffer + "-------handler:" + conn.readHandler + "-------read: " + rs);
context.runAsync(() -> conn.completeRead(rs));
final int rs = socket.read(buffer);
{ //测试
buffer.flip();
byte[] bs = new byte[buffer.remaining()];
buffer.get(bs);
//System.out.println(conn + "------readbuf:" + buffer + "-------handler:" + handler + "-------读内容: " + new String(bs));
}
//System.out.println(conn + "------readbuf:" + buffer + "-------handler:" + handler + "-------read: " + rs);
context.runAsync(() -> {
try {
handler.completed(rs, attach);
} catch (Throwable e) {
handler.failed(e, attach);
}
});
} catch (Throwable t) {
context.runAsync(() -> conn.faileRead(t));
context.runAsync(() -> handler.failed(t, attach));
}
}
private void writeOP(SelectionKey key, SocketChannel socket, NIOTCPAsyncConnection conn) {
private void writeOP(SelectionKey key, SocketChannel socket, AsyncNIOTCPConnection conn) {
final CompletionHandler handler = conn.removeWriteHandler();
final ByteBuffer oneBuffer = conn.removeWriteOneBuffer();
final ByteBuffer[] buffers = conn.removeWriteBuffers();
final Object attach = conn.removeWriteAttachment();
final int writeOffset = conn.removeWriteOffset();
final int writeLength = conn.removeWriteLength();
if (handler == null || (oneBuffer == null && buffers == null)) return;
//System.out.println(conn + "------buffers:" + Arrays.toString(buffers) + "---onebuf:" + oneBuffer + "-------handler:" + handler);
try {
int rs = 0;
if (conn.writeOneBuffer == null) {
final ByteBuffer[] buffers = conn.writeBuffers;
int offset = conn.writeOffset;
int length = conn.writeLength;
if (oneBuffer == null) {
int offset = writeOffset;
int length = writeLength;
for (;;) {
long sr = socket.write(buffers, offset, length);
if (sr > 0) rs += sr;
@@ -492,15 +524,20 @@ public abstract class ProtocolServer {
if (over) break;
}
} else {
final ByteBuffer buffer = conn.writeOneBuffer;
while (buffer.hasRemaining()) rs += socket.write(buffer);
while (oneBuffer.hasRemaining()) rs += socket.write(oneBuffer);
}
key.interestOps(SelectionKey.OP_READ);
key.interestOps(SelectionKey.OP_READ); //OP_CONNECT
final int rs0 = rs;
//System.out.println(conn + "------buffers:" + conn.writeBuffers + "---onebuf:" + conn.writeOneBuffer + "-------handler:" + conn.writeHandler + "-------write: " + rs);
context.runAsync(() -> conn.completeWrite(rs0));
//System.out.println(conn + "------buffers:" + Arrays.toString(buffers) + "---onebuf:" + oneBuffer + "-------handler:" + handler + "-------write: " + rs);
context.runAsync(() -> {
try {
handler.completed(rs0, attach);
} catch (Throwable e) {
handler.failed(e, attach);
}
});
} catch (Throwable t) {
context.runAsync(() -> conn.faileWrite(t));
context.runAsync(() -> handler.failed(t, attach));
}
}