AsyncConnection接口大变动

This commit is contained in:
Redkale
2018-12-18 14:04:49 +08:00
parent 8afcaa0b34
commit 25eaf6e353
17 changed files with 388 additions and 1273 deletions

View File

@@ -12,8 +12,9 @@ import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.function.Consumer;
import java.util.function.*;
import javax.net.ssl.SSLContext;
import org.redkale.util.ObjectPool;
/**
*
@@ -22,7 +23,7 @@ import javax.net.ssl.SSLContext;
*
* @author zhangjx
*/
public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCloseable {
public abstract class AsyncConnection implements ReadableByteChannel, WritableByteChannel, AutoCloseable {
protected SSLContext sslContext;
@@ -34,6 +35,12 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
protected volatile long writetime;
protected final Supplier<ByteBuffer> bufferSupplier;
protected final Consumer<ByteBuffer> bufferConsumer;
protected ByteBuffer readBuffer;
//在线数
protected AtomicLong livingCounter;
@@ -45,6 +52,22 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
//关联的事件数, 小于1表示没有事件
protected final AtomicInteger eventing = new AtomicInteger();
protected AsyncConnection(Context context) {
this(context.getBufferSupplier(), context.getBufferConsumer(), context.getSSLContext());
}
protected AsyncConnection(ObjectPool<ByteBuffer> bufferPool, SSLContext sslContext) {
this(bufferPool, bufferPool, sslContext);
}
protected AsyncConnection(Supplier<ByteBuffer> bufferSupplier, Consumer<ByteBuffer> bufferConsumer, SSLContext sslContext) {
Objects.requireNonNull(bufferSupplier);
Objects.requireNonNull(bufferConsumer);
this.bufferSupplier = bufferSupplier;
this.bufferConsumer = bufferConsumer;
this.sslContext = sslContext;
}
public final long getLastReadTime() {
return readtime;
}
@@ -61,6 +84,9 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
return eventing.decrementAndGet();
}
@Override
public abstract boolean isOpen();
public abstract boolean isTCP();
public abstract boolean shutdownInput();
@@ -84,17 +110,15 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
public abstract void setWriteTimeoutSeconds(int writeTimeoutSeconds);
@Override
public abstract Future<Integer> read(ByteBuffer dst);
public abstract int read(ByteBuffer dst) throws IOException;
public abstract void read(CompletionHandler<Integer, ByteBuffer> handler);
public abstract void read(long timeout, TimeUnit unit, CompletionHandler<Integer, ByteBuffer> handler);
@Override
public abstract <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler);
public abstract int write(ByteBuffer src) throws IOException;
public abstract <A> void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler<Integer, ? super A> handler);
@Override
public abstract Future<Integer> write(ByteBuffer src);
@Override
public abstract <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler);
public final <A> void write(ByteBuffer[] srcs, A attachment, CompletionHandler<Integer, ? super A> handler) {
@@ -103,6 +127,36 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
public abstract <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler);
public void setReadBuffer(ByteBuffer buffer) {
if (this.readBuffer != null) throw new RuntimeException("repeat AsyncConnection.setReadBuffer");
this.readBuffer = buffer;
}
public ByteBuffer pollReadBuffer() {
ByteBuffer rs = this.readBuffer;
if (rs != null) {
this.readBuffer = null;
return rs;
}
return bufferSupplier.get();
}
public void offerBuffer(ByteBuffer buffer) {
if (buffer == null) return;
bufferConsumer.accept(buffer);
}
public void offerBuffer(ByteBuffer... buffers) {
if (buffers == null) return;
for (ByteBuffer buffer : buffers) {
bufferConsumer.accept(buffer);
}
}
public ByteBuffer pollWriteBuffer() {
return bufferSupplier.get();
}
public void dispose() {//同close 只是去掉throws IOException
try {
this.close();
@@ -125,11 +179,15 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
livingCounter.decrementAndGet();
livingCounter = null;
}
if (beforeCloseListener != null)
if (beforeCloseListener != null) {
try {
beforeCloseListener.accept(this);
} catch (Exception io) {
}
}
if (this.readBuffer != null) {
bufferConsumer.accept(this.readBuffer);
}
if (attributes == null) return;
try {
for (Object obj : attributes.values()) {
@@ -174,6 +232,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
/**
* 创建TCP协议客户端连接
*
* @param bufferPool ByteBuffer对象池
* @param address 连接点子
* @param group 连接AsynchronousChannelGroup
* @param readTimeoutSeconds 读取超时秒数
@@ -181,14 +240,31 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
*
* @return 连接CompletableFuture
*/
public static CompletableFuture<AsyncConnection> createTCP(final AsynchronousChannelGroup group, final SocketAddress address,
final int readTimeoutSeconds, final int writeTimeoutSeconds) {
return createTCP(group, null, address, readTimeoutSeconds, writeTimeoutSeconds);
public static CompletableFuture<AsyncConnection> createTCP(final ObjectPool<ByteBuffer> bufferPool, final AsynchronousChannelGroup group,
final SocketAddress address, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
return createTCP(bufferPool, group, null, address, readTimeoutSeconds, writeTimeoutSeconds);
}
/**
* 创建TCP协议客户端连接
*
* @param context Context
* @param address 连接点子
* @param group 连接AsynchronousChannelGroup
* @param readTimeoutSeconds 读取超时秒数
* @param writeTimeoutSeconds 写入超时秒数
*
* @return 连接CompletableFuture
*/
public static CompletableFuture<AsyncConnection> createTCP(final Context context, final AsynchronousChannelGroup group,
final SocketAddress address, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
return createTCP(context.getBufferSupplier(), context.getBufferConsumer(), group, context.getSSLContext(), address, readTimeoutSeconds, writeTimeoutSeconds);
}
/**
* 创建TCP协议客户端连接
*
* @param bufferPool ByteBuffer对象池
* @param address 连接点子
* @param sslContext SSLContext
* @param group 连接AsynchronousChannelGroup
@@ -197,7 +273,25 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
*
* @return 连接CompletableFuture
*/
public static CompletableFuture<AsyncConnection> createTCP(final AsynchronousChannelGroup group, final SSLContext sslContext,
public static CompletableFuture<AsyncConnection> createTCP(final ObjectPool<ByteBuffer> bufferPool, final AsynchronousChannelGroup group, final SSLContext sslContext,
final SocketAddress address, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
return createTCP(bufferPool, bufferPool, group, sslContext, address, readTimeoutSeconds, writeTimeoutSeconds);
}
/**
* 创建TCP协议客户端连接
*
* @param bufferSupplier ByteBuffer生产器
* @param bufferConsumer ByteBuffer回收器
* @param address 连接点子
* @param sslContext SSLContext
* @param group 连接AsynchronousChannelGroup
* @param readTimeoutSeconds 读取超时秒数
* @param writeTimeoutSeconds 写入超时秒数
*
* @return 连接CompletableFuture
*/
public static CompletableFuture<AsyncConnection> createTCP(final Supplier<ByteBuffer> bufferSupplier, Consumer<ByteBuffer> bufferConsumer, final AsynchronousChannelGroup group, final SSLContext sslContext,
final SocketAddress address, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
final CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
try {
@@ -211,7 +305,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
channel.connect(address, null, new CompletionHandler<Void, Void>() {
@Override
public void completed(Void result, Void attachment) {
future.complete(create(channel, sslContext, address, readTimeoutSeconds, writeTimeoutSeconds));
future.complete(new TcpAioAsyncConnection(bufferSupplier, bufferConsumer, channel, sslContext, address, readTimeoutSeconds, writeTimeoutSeconds, null, null));
}
@Override
@@ -225,80 +319,109 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
return future;
}
/**
* 通常用于 ssl socket
*
* @param socket Socket对象
*
* @return 连接对象
*/
public static AsyncConnection create(final Socket socket) {
return create(socket, null, 0, 0);
}
public static AsyncConnection create(final Socket socket, final SocketAddress addr0, final int readTimeoutSecond0, final int writeTimeoutSecond0) {
return new TcpBioAsyncConnection(socket, addr0, readTimeoutSecond0, writeTimeoutSecond0, null, null);
}
public static AsyncConnection create(final Socket socket, final SocketAddress addr0, final int readTimeoutSecond0,
final int writeTimeoutSecond0, final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new TcpBioAsyncConnection(socket, addr0, readTimeoutSecond0, writeTimeoutSecond0, livingCounter, closedCounter);
}
public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, final Selector selector,
// public static AsyncConnection create(final Socket socket) {
// return create(socket, null, 0, 0);
// }
// public static AsyncConnection create(final Socket socket, final SocketAddress addr0, final int readTimeoutSecond0, final int writeTimeoutSecond0) {
// return new TcpBioAsyncConnection(socket, addr0, readTimeoutSecond0, writeTimeoutSecond0, null, null);
// }
//
// public static AsyncConnection create(final Socket socket, final SocketAddress addr0, final int readTimeoutSecond0,
// final int writeTimeoutSecond0, final AtomicLong livingCounter, final AtomicLong closedCounter) {
// return new TcpBioAsyncConnection(socket, addr0, readTimeoutSecond0, writeTimeoutSecond0, livingCounter, closedCounter);
// }
//
// public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, final Selector selector,
// final int readTimeoutSeconds0, final int writeTimeoutSeconds0) {
// return new TcpNioAsyncConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, null, null);
// }
//
// public static AsyncConnection create(final SocketChannel ch, final SocketAddress addr0, final Selector selector, final Context context) {
// return new TcpNioAsyncConnection(ch, addr0, selector, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null);
// }
//
// public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, final Selector selector,
// final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
// final AtomicLong livingCounter, final AtomicLong closedCounter) {
// return new TcpNioAsyncConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter);
// }
public static AsyncConnection create(final ObjectPool<ByteBuffer> bufferPool, final DatagramChannel ch,
SocketAddress addr, final boolean client0,
final int readTimeoutSeconds0, final int writeTimeoutSeconds0) {
return new TcpNioAsyncConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, null, null);
return new UdpBioAsyncConnection(bufferPool, bufferPool, ch, null, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, null, null);
}
public static AsyncConnection create(final SocketChannel ch, final SocketAddress addr0, final Selector selector, final Context context) {
return new TcpNioAsyncConnection(ch, addr0, selector, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null);
}
public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, final Selector selector,
public static AsyncConnection create(final ObjectPool<ByteBuffer> bufferPool, final DatagramChannel ch,
SocketAddress addr, final boolean client0,
final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new TcpNioAsyncConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter);
return new UdpBioAsyncConnection(bufferPool, bufferPool, ch, null, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter);
}
public static AsyncConnection create(final DatagramChannel ch, SocketAddress addr,
final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0) {
return new UdpBioAsyncConnection(ch, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, null, null);
public static AsyncConnection create(final ObjectPool<ByteBuffer> bufferPool, final DatagramChannel ch, SSLContext sslContext,
SocketAddress addr, final boolean client0,
final int readTimeoutSeconds0, final int writeTimeoutSeconds0) {
return new UdpBioAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, null, null);
}
public static AsyncConnection create(final DatagramChannel ch, SocketAddress addr,
final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
public static AsyncConnection create(final ObjectPool<ByteBuffer> bufferPool, final DatagramChannel ch, SSLContext sslContext,
SocketAddress addr, final boolean client0,
final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new UdpBioAsyncConnection(ch, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter);
return new UdpBioAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter);
}
public static AsyncConnection create(final AsynchronousSocketChannel ch) {
return create(ch, null, 0, 0);
public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch) {
return create(context, ch, (SocketAddress) null, 0, 0);
}
public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
return new TcpAioAsyncConnection(ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null);
public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch,
final SocketAddress addr0, final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, context.sslContext, addr0, context.readTimeoutSeconds, context.writeTimeoutSeconds, livingCounter, closedCounter);
}
public static AsyncConnection create(final AsynchronousSocketChannel ch, SSLContext sslContext, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
return new TcpAioAsyncConnection(ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null);
public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch,
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null);
}
public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final Context context) {
return new TcpAioAsyncConnection(ch, context.sslContext, addr0, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null);
public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch, SSLContext sslContext,
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null);
}
public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final int readTimeoutSeconds,
final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new TcpAioAsyncConnection(ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter);
public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch,
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter);
}
public static AsyncConnection create(final AsynchronousSocketChannel ch, SSLContext sslContext, final SocketAddress addr0, final int readTimeoutSeconds,
final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new TcpAioAsyncConnection(ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter);
public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch, SSLContext sslContext,
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter);
}
public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0,
final Context context, final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new TcpAioAsyncConnection(ch, context.sslContext, addr0, context.readTimeoutSeconds, context.writeTimeoutSeconds, livingCounter, closedCounter);
public static AsyncConnection create(final ObjectPool<ByteBuffer> bufferPool, final AsynchronousSocketChannel ch) {
return create(bufferPool, ch, null, 0, 0);
}
public static AsyncConnection create(final ObjectPool<ByteBuffer> bufferPool, final AsynchronousSocketChannel ch,
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
return new TcpAioAsyncConnection(bufferPool, bufferPool, ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null);
}
public static AsyncConnection create(final ObjectPool<ByteBuffer> bufferPool, final AsynchronousSocketChannel ch, SSLContext sslContext,
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
return new TcpAioAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null);
}
public static AsyncConnection create(final ObjectPool<ByteBuffer> bufferPool, final AsynchronousSocketChannel ch,
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new TcpAioAsyncConnection(bufferPool, bufferPool, ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter);
}
public static AsyncConnection create(final ObjectPool<ByteBuffer> bufferPool, final AsynchronousSocketChannel ch, SSLContext sslContext,
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new TcpAioAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter);
}
}

View File

@@ -55,14 +55,13 @@ public class PrepareRunner implements Runnable {
return;
}
if (response == null) response = responsePool.get();
final ByteBuffer buffer = response.request.pollReadBuffer();
try {
channel.read(buffer, keepalive ? context.getAliveTimeoutSeconds() : 0, TimeUnit.SECONDS, null,
new CompletionHandler<Integer, Void>() {
channel.read(keepalive ? context.getAliveTimeoutSeconds() : 0, TimeUnit.SECONDS,
new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer count, Void attachment1) {
public void completed(Integer count, ByteBuffer buffer) {
if (count < 1) {
response.request.offerReadBuffer(buffer);
channel.offerBuffer(buffer);
channel.dispose();// response.init(channel); 在调用之前异常
response.removeChannel();
response.finish(true);
@@ -85,8 +84,8 @@ public class PrepareRunner implements Runnable {
}
@Override
public void failed(Throwable exc, Void attachment2) {
response.request.offerReadBuffer(buffer);
public void failed(Throwable exc, ByteBuffer buffer) {
channel.offerBuffer(buffer);
channel.dispose();// response.init(channel); 在调用之前异常
response.removeChannel();
response.finish(true);
@@ -96,7 +95,6 @@ public class PrepareRunner implements Runnable {
}
});
} catch (Exception te) {
response.request.offerReadBuffer(buffer);
channel.dispose();// response.init(channel); 在调用之前异常
response.removeChannel();
response.finish(true);
@@ -126,19 +124,4 @@ public class PrepareRunner implements Runnable {
return response.removeChannel();
}
protected ByteBuffer pollReadBuffer(Request request) {
return request.pollReadBuffer();
}
protected ByteBuffer pollReadBuffer(Response response) {
return response.request.pollReadBuffer();
}
protected void offerReadBuffer(Request request, ByteBuffer buffer) {
request.offerReadBuffer(buffer);
}
protected void offerReadBuffer(Response response, ByteBuffer buffer) {
response.request.offerReadBuffer(buffer);
}
}

View File

@@ -213,15 +213,16 @@ public abstract class PrepareServlet<K extends Serializable, C extends Context,
public void prepare(final ByteBuffer buffer, final R request, final P response) throws IOException {
executeCounter.incrementAndGet();
final int rs = request.readHeader(buffer);
if (rs < 0) {
request.offerReadBuffer(buffer);
final AsyncConnection channel = request.channel;
if (rs < 0) { //表示数据格式不正确
channel.offerBuffer(buffer);
if (rs != Integer.MIN_VALUE) illRequestCounter.incrementAndGet();
response.finish(true);
} else if (rs == 0) {
if (buffer.hasRemaining()) {
request.setMoredata(buffer);
} else {
request.offerReadBuffer(buffer);
channel.offerBuffer(buffer);
}
request.prepare();
response.filter = this.headFilter;
@@ -229,21 +230,23 @@ public abstract class PrepareServlet<K extends Serializable, C extends Context,
response.nextEvent();
} else {
buffer.clear();
channel.setReadBuffer(buffer);
final AtomicInteger ai = new AtomicInteger(rs);
request.channel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
channel.read(new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
buffer.flip();
ai.addAndGet(-request.readBody(buffer));
attachment.flip();
ai.addAndGet(-request.readBody(attachment));
if (ai.get() > 0) {
buffer.clear();
request.channel.read(buffer, buffer, this);
attachment.clear();
channel.setReadBuffer(attachment);
channel.read(this);
} else {
if (buffer.hasRemaining()) {
request.setMoredata(buffer);
if (attachment.hasRemaining()) {
request.setMoredata(attachment);
} else {
request.offerReadBuffer(buffer);
channel.offerBuffer(attachment);
}
request.prepare();
try {
@@ -261,7 +264,7 @@ public abstract class PrepareServlet<K extends Serializable, C extends Context,
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
illRequestCounter.incrementAndGet();
request.offerReadBuffer(buffer);
channel.offerBuffer(attachment);
response.finish(true);
if (exc != null) request.context.logger.log(Level.FINER, "Servlet read channel erroneous, forece to close channel ", exc);
}

View File

@@ -73,13 +73,13 @@ public abstract class ProtocolServer {
} else if ("aio".equalsIgnoreCase(netimpl)) {
return new TcpAioProtocolServer(context);
} else if ("nio".equalsIgnoreCase(netimpl)) {
return new TcpNioProtocolServer(context);
return null;// return new TcpNioProtocolServer(context);
}
} else if ("UDP".equalsIgnoreCase(protocol)) {
if (netimpl == null || netimpl.isEmpty()) {
return new UdpBioProtocolServer(context);
return null;// return new UdpBioProtocolServer(context);
} else if ("bio".equalsIgnoreCase(netimpl)) {
return new UdpBioProtocolServer(context);
return null;// return new UdpBioProtocolServer(context);
}
} else if (netimpl == null || netimpl.isEmpty()) {
throw new RuntimeException("ProtocolServer not support protocol " + protocol);

View File

@@ -37,8 +37,6 @@ public abstract class Request<C extends Context> {
protected AsyncConnection channel;
protected ByteBuffer readBuffer;
/**
* properties 与 attributes 的区别在于调用recycle时 attributes会被清空而properties会保留;
* properties 通常存放需要永久绑定在request里的一些对象
@@ -49,7 +47,6 @@ public abstract class Request<C extends Context> {
protected Request(C context) {
this.context = context;
this.readBuffer = context.pollBuffer();
this.bsonConvert = context.getBsonConvert();
this.jsonConvert = context.getJsonConvert();
}
@@ -64,23 +61,6 @@ public abstract class Request<C extends Context> {
return rs;
}
protected ByteBuffer pollReadBuffer() {
ByteBuffer buffer = this.readBuffer;
this.readBuffer = null;
if (buffer == null) buffer = context.pollBuffer();
return buffer;
}
protected void offerReadBuffer(ByteBuffer buffer) {
if (buffer == null) return;
if (this.readBuffer == null) {
buffer.clear();
this.readBuffer = buffer;
} else {
context.offerBuffer(buffer);
}
}
/**
* 返回值Integer.MIN_VALUE: 帧数据; -1数据不合法 0解析完毕 &gt;0: 需再读取的字节数。
*

View File

@@ -12,6 +12,7 @@ import java.nio.channels.*;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.*;
import javax.net.ssl.SSLContext;
/**
@@ -35,11 +36,12 @@ public class TcpAioAsyncConnection extends AsyncConnection {
private BlockingQueue<WriteEntry> writeQueue;
public TcpAioAsyncConnection(final AsynchronousSocketChannel ch, SSLContext sslContext,
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds,
public TcpAioAsyncConnection(Supplier<ByteBuffer> bufferSupplier, Consumer<ByteBuffer> bufferConsumer,
final AsynchronousSocketChannel ch, final SSLContext sslContext, final SocketAddress addr0,
final int readTimeoutSeconds, final int writeTimeoutSeconds,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
super(bufferSupplier, bufferConsumer, sslContext);
this.channel = ch;
this.sslContext = sslContext;
this.readTimeoutSeconds = readTimeoutSeconds;
this.writeTimeoutSeconds = writeTimeoutSeconds;
SocketAddress addr = addr0;
@@ -91,19 +93,21 @@ public class TcpAioAsyncConnection extends AsyncConnection {
}
@Override
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
public void read(CompletionHandler<Integer, ByteBuffer> handler) {
this.readtime = System.currentTimeMillis();
ByteBuffer dst = pollReadBuffer();
if (readTimeoutSeconds > 0) {
channel.read(dst, readTimeoutSeconds, TimeUnit.SECONDS, attachment, handler);
channel.read(dst, readTimeoutSeconds, TimeUnit.SECONDS, dst, handler);
} else {
channel.read(dst, attachment, handler);
channel.read(dst, dst, handler);
}
}
@Override
public <A> void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler<Integer, ? super A> handler) {
public void read(long timeout, TimeUnit unit, CompletionHandler<Integer, ByteBuffer> handler) {
this.readtime = System.currentTimeMillis();
channel.read(dst, timeout < 0 ? 0 : timeout, unit, attachment, handler);
ByteBuffer dst = pollReadBuffer();
channel.read(dst, timeout < 0 ? 0 : timeout, unit, dst, handler);
}
private <A> void nextWrite(A attachment) {
@@ -223,13 +227,21 @@ public class TcpAioAsyncConnection extends AsyncConnection {
}
@Override
public final Future<Integer> read(ByteBuffer dst) {
return channel.read(dst);
public final int read(ByteBuffer dst) throws IOException {
try {
return channel.read(dst).get();
} catch (InterruptedException | ExecutionException e) {
throw new IOException(e);
}
}
@Override
public final Future<Integer> write(ByteBuffer src) {
return channel.write(src);
public final int write(ByteBuffer src) throws IOException {
try {
return channel.write(src).get();
} catch (InterruptedException | ExecutionException e) {
throw new IOException(e);
}
}
@Override

View File

@@ -33,7 +33,7 @@ public class TcpAioProtocolServer extends ProtocolServer {
@Override
public void open(AnyValue config) throws IOException {
//group = AsynchronousChannelGroup.withThreadPool(context.executor);
group = AsynchronousChannelGroup.withFixedThreadPool(context.executor.getCorePoolSize(), context.executor.getThreadFactory());
group = AsynchronousChannelGroup.withFixedThreadPool(context.executor.getCorePoolSize(), context.executor.getThreadFactory());
this.serverChannel = AsynchronousServerSocketChannel.open(group);
final Set<SocketOption<?>> options = this.serverChannel.supportedOptions();
@@ -95,9 +95,8 @@ public class TcpAioProtocolServer extends ProtocolServer {
} catch (IOException e) {
context.logger.log(Level.INFO, channel + " setOption error", e);
}
AsyncConnection conn = new TcpAioAsyncConnection(channel, context.sslContext, null, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null);
conn.livingCounter = livingCounter;
conn.closedCounter = closedCounter;
AsyncConnection conn = new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), channel,
context.getSSLContext(), null, context.readTimeoutSeconds, context.writeTimeoutSeconds, livingCounter, closedCounter);
context.runAsync(new PrepareRunner(context, conn, null, null));
}

View File

@@ -1,240 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public class TcpBioAsyncConnection extends AsyncConnection {
static final Set<SocketOption<?>> defaultOptions = defaultOptions();
private static Set<SocketOption<?>> defaultOptions() {
HashSet<SocketOption<?>> set = new HashSet<>(5);
set.add(StandardSocketOptions.SO_SNDBUF);
set.add(StandardSocketOptions.SO_RCVBUF);
set.add(StandardSocketOptions.SO_KEEPALIVE);
set.add(StandardSocketOptions.SO_REUSEADDR);
set.add(StandardSocketOptions.TCP_NODELAY);
return Collections.unmodifiableSet(set);
}
private int readTimeoutSeconds;
private int writeTimeoutSeconds;
private final Socket socket;
private final ReadableByteChannel readChannel;
private final WritableByteChannel writeChannel;
private final SocketAddress remoteAddress;
public TcpBioAsyncConnection(final Socket socket, final SocketAddress addr0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
this.socket = socket;
ReadableByteChannel rc = null;
WritableByteChannel wc = null;
try {
socket.setSoTimeout(Math.max(readTimeoutSeconds0, writeTimeoutSeconds0));
rc = Channels.newChannel(socket.getInputStream());
wc = Channels.newChannel(socket.getOutputStream());
} catch (IOException e) {
e.printStackTrace();
}
this.readChannel = rc;
this.writeChannel = wc;
this.readTimeoutSeconds = readTimeoutSeconds0;
this.writeTimeoutSeconds = writeTimeoutSeconds0;
SocketAddress addr = addr0;
if (addr == null) {
try {
addr = socket.getRemoteSocketAddress();
} catch (Exception e) {
//do nothing
}
}
this.remoteAddress = addr;
this.livingCounter = livingCounter;
this.closedCounter = closedCounter;
}
@Override
public boolean isTCP() {
return true;
}
@Override
public SocketAddress getRemoteAddress() {
return remoteAddress;
}
@Override
public SocketAddress getLocalAddress() {
return socket.getLocalSocketAddress();
}
@Override
public int getReadTimeoutSeconds() {
return readTimeoutSeconds;
}
@Override
public int getWriteTimeoutSeconds() {
return writeTimeoutSeconds;
}
@Override
public void setReadTimeoutSeconds(int readTimeoutSeconds) {
this.readTimeoutSeconds = readTimeoutSeconds;
}
@Override
public void setWriteTimeoutSeconds(int writeTimeoutSeconds) {
this.writeTimeoutSeconds = writeTimeoutSeconds;
}
@Override
public boolean shutdownInput() {
try {
this.socket.shutdownInput();
return true;
} catch (IOException e) {
return false;
}
}
@Override
public boolean shutdownOutput() {
try {
this.socket.shutdownOutput();
return true;
} catch (IOException e) {
return false;
}
}
@Override
public <T> boolean setOption(SocketOption<T> name, T value) {
try {
if (StandardSocketOptions.SO_REUSEADDR == name) {
this.socket.setReuseAddress((Boolean) value);
return true;
}
if (StandardSocketOptions.SO_KEEPALIVE == name) {
this.socket.setKeepAlive((Boolean) value);
return true;
}
if (StandardSocketOptions.TCP_NODELAY == name) {
this.socket.setTcpNoDelay((Boolean) value);
return true;
}
if (StandardSocketOptions.SO_RCVBUF == name) {
this.socket.setReceiveBufferSize((Integer) value);
return true;
}
if (StandardSocketOptions.SO_SNDBUF == name) {
this.socket.setSendBufferSize((Integer) value);
return true;
}
} catch (IOException e) {
return false;
}
return false;
}
@Override
public Set<SocketOption<?>> supportedOptions() {
return defaultOptions;
}
@Override
public <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
try {
int rs = 0;
for (int i = offset; i < offset + length; i++) {
rs += writeChannel.write(srcs[i]);
}
this.writetime = System.currentTimeMillis();
if (handler != null) handler.completed(rs, attachment);
} catch (IOException e) {
if (handler != null) handler.failed(e, attachment);
}
}
@Override
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
try {
int rs = readChannel.read(dst);
this.readtime = System.currentTimeMillis();
if (handler != null) handler.completed(rs, attachment);
} catch (IOException e) {
if (handler != null) handler.failed(e, attachment);
}
}
@Override
public <A> void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler<Integer, ? super A> handler) {
read(dst, attachment, handler);
}
@Override
public Future<Integer> read(ByteBuffer dst) {
try {
int rs = readChannel.read(dst);
this.readtime = System.currentTimeMillis();
return CompletableFuture.completedFuture(rs);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
try {
int rs = writeChannel.write(src);
this.writetime = System.currentTimeMillis();
if (handler != null) handler.completed(rs, attachment);
} catch (IOException e) {
if (handler != null) handler.failed(e, attachment);
}
}
@Override
public Future<Integer> write(ByteBuffer src) {
try {
int rs = writeChannel.write(src);
this.writetime = System.currentTimeMillis();
return CompletableFuture.completedFuture(rs);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void close() throws IOException {
super.close();
this.socket.close();
}
@Override
public boolean isOpen() {
return !socket.isClosed();
}
}

View File

@@ -1,366 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public class TcpNioAsyncConnection extends AsyncConnection {
protected int readTimeoutSeconds;
protected int writeTimeoutSeconds;
protected final Selector selector;
protected SelectionKey key;
protected final SocketChannel channel;
protected final SocketAddress remoteAddress;
ByteBuffer readBuffer;
Object readAttachment;
CompletionHandler readHandler;
ByteBuffer writeOneBuffer;
ByteBuffer[] writeBuffers;
int writingCount;
int writeOffset;
int writeLength;
Object writeAttachment;
CompletionHandler writeHandler;
public TcpNioAsyncConnection(final SocketChannel ch, SocketAddress addr0,
final Selector selector,
final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
this.channel = ch;
this.selector = selector;
this.readTimeoutSeconds = readTimeoutSeconds0;
this.writeTimeoutSeconds = writeTimeoutSeconds0;
SocketAddress addr = addr0;
if (addr == null) {
try {
addr = ch.getRemoteAddress();
} catch (Exception e) {
//do nothing
}
}
this.remoteAddress = addr;
this.livingCounter = livingCounter;
this.closedCounter = closedCounter;
}
@Override
public void setReadTimeoutSeconds(int readTimeoutSeconds) {
this.readTimeoutSeconds = readTimeoutSeconds;
}
@Override
public void setWriteTimeoutSeconds(int writeTimeoutSeconds) {
this.writeTimeoutSeconds = writeTimeoutSeconds;
}
@Override
public int getReadTimeoutSeconds() {
return this.readTimeoutSeconds;
}
@Override
public int getWriteTimeoutSeconds() {
return this.writeTimeoutSeconds;
}
@Override
public final SocketAddress getRemoteAddress() {
return remoteAddress;
}
@Override
public SocketAddress getLocalAddress() {
try {
return channel.getLocalAddress();
} catch (IOException e) {
return null;
}
}
@Override
public boolean shutdownInput() {
try {
this.channel.shutdownInput();
return true;
} catch (IOException e) {
return false;
}
}
@Override
public boolean shutdownOutput() {
try {
this.channel.shutdownOutput();
return true;
} catch (IOException e) {
return false;
}
}
@Override
public <T> boolean setOption(SocketOption<T> name, T value) {
try {
this.channel.setOption(name, value);
return true;
} catch (IOException e) {
return false;
}
}
@Override
public Set<SocketOption<?>> supportedOptions() {
return this.channel.supportedOptions();
}
CompletionHandler removeReadHandler() {
CompletionHandler handler = this.readHandler;
this.readHandler = null;
return handler;
}
ByteBuffer removeReadBuffer() {
ByteBuffer buffer = this.readBuffer;
this.readBuffer = null;
return buffer;
}
Object removeReadAttachment() {
Object attach = this.readAttachment;
this.readAttachment = null;
return attach;
}
void completeRead(int rs) {
Object attach = this.readAttachment;
CompletionHandler handler = this.readHandler;
this.readBuffer = null;
this.readAttachment = null;
this.readHandler = null;
handler.completed(rs, attach);
}
void faileRead(Throwable t) {
Object attach = this.readAttachment;
CompletionHandler handler = this.readHandler;
this.readBuffer = null;
this.readAttachment = null;
this.readHandler = null;
handler.failed(t, attach);
}
CompletionHandler removeWriteHandler() {
CompletionHandler handler = this.writeHandler;
this.writeHandler = null;
return handler;
}
ByteBuffer removeWriteOneBuffer() {
ByteBuffer buffer = this.writeOneBuffer;
this.writeOneBuffer = null;
return buffer;
}
ByteBuffer[] removeWriteBuffers() {
ByteBuffer[] buffers = this.writeBuffers;
this.writeBuffers = null;
return buffers;
}
int removeWritingCount() {
int rs = this.writingCount;
this.writingCount = 0;
return rs;
}
int removeWriteOffset() {
int rs = this.writeOffset;
this.writeOffset = 0;
return rs;
}
int removeWriteLength() {
int rs = this.writeLength;
this.writeLength = 0;
return rs;
}
Object removeWriteAttachment() {
Object attach = this.writeAttachment;
this.writeAttachment = null;
return attach;
}
void completeWrite(int rs) {
Object attach = this.writeAttachment;
CompletionHandler handler = this.writeHandler;
this.writeOneBuffer = null;
this.writeBuffers = null;
this.writeOffset = 0;
this.writeLength = 0;
this.writeAttachment = null;
this.writeHandler = null;
handler.completed(rs, attach);
}
void faileWrite(Throwable t) {
Object attach = this.writeAttachment;
CompletionHandler handler = this.writeHandler;
this.writeOneBuffer = null;
this.writeBuffers = null;
this.writeOffset = 0;
this.writeLength = 0;
this.writeAttachment = null;
this.writeHandler = null;
handler.failed(t, attach);
}
@Override
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (this.readHandler != null) throw new RuntimeException("pending read");
try {
this.readBuffer = dst;
this.readAttachment = attachment;
this.readHandler = handler;
if (key == null) {
key = channel.register(selector, SelectionKey.OP_READ);
key.attach(this);
} else {
key.interestOps(SelectionKey.OP_READ);
}
selector.wakeup();
} catch (Exception e) {
faileRead(e);
}
}
@Override
public <A> void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler<Integer, ? super A> handler) {
read(dst, attachment, handler);
}
@Override
public Future<Integer> read(ByteBuffer dst) {
CompletableFuture future = new CompletableFuture();
read(dst, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
future.complete(result);
}
@Override
public void failed(Throwable exc, Void attachment) {
future.completeExceptionally(exc);
}
});
return future;
}
@Override
public <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (this.writeHandler != null) throw new RuntimeException("pending write");
try {
this.writeBuffers = srcs;
this.writeOffset = offset;
this.writeLength = length;
this.writingCount = 0;
this.writeAttachment = attachment;
this.writeHandler = handler;
if (key == null) {
key = channel.register(selector, SelectionKey.OP_WRITE);
key.attach(this);
} else {
key.interestOps(SelectionKey.OP_WRITE);
}
selector.wakeup();
} catch (Exception e) {
faileWrite(e);
}
}
@Override
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (this.writeHandler != null) throw new RuntimeException("pending write");
try {
this.writeOneBuffer = src;
this.writingCount = 0;
this.writeAttachment = attachment;
this.writeHandler = handler;
if (key == null) {
key = channel.register(selector, SelectionKey.OP_WRITE);
key.attach(this);
} else {
key.interestOps(SelectionKey.OP_WRITE);
}
selector.wakeup();
} catch (Exception e) {
faileWrite(e);
}
}
@Override
public Future<Integer> write(ByteBuffer src) {
CompletableFuture future = new CompletableFuture();
write(src, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
future.complete(result);
}
@Override
public void failed(Throwable exc, Void attachment) {
future.completeExceptionally(exc);
}
});
return future;
}
@Override
public final void close() throws IOException {
super.close();
channel.close();
key.cancel();
}
@Override
public final boolean isOpen() {
return channel.isOpen();
}
@Override
public final boolean isTCP() {
return true;
}
}

View File

@@ -1,370 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.*;
import org.redkale.util.AnyValue;
/**
* 协议底层Server
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public class TcpNioProtocolServer extends ProtocolServer {
private Selector acceptSelector;
private ServerSocketChannel serverChannel;
private NioThreadWorker[] workers;
private NioThreadWorker currWorker;
private boolean running;
public TcpNioProtocolServer(Context context) {
super(context);
}
@Override
public void open(AnyValue config) throws IOException {
acceptSelector = Selector.open();
this.serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
ServerSocket socket = serverChannel.socket();
socket.setReceiveBufferSize(16 * 1024);
socket.setReuseAddress(true);
final Set<SocketOption<?>> options = this.serverChannel.supportedOptions();
if (options.contains(StandardSocketOptions.TCP_NODELAY)) {
this.serverChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
}
if (options.contains(StandardSocketOptions.SO_KEEPALIVE)) {
this.serverChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
}
if (options.contains(StandardSocketOptions.SO_REUSEADDR)) {
this.serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
}
if (options.contains(StandardSocketOptions.SO_RCVBUF)) {
this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
}
if (options.contains(StandardSocketOptions.SO_SNDBUF)) {
this.serverChannel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
}
}
@Override
public void bind(SocketAddress local, int backlog) throws IOException {
this.serverChannel.bind(local, backlog);
}
@Override
public <T> Set<SocketOption<?>> supportedOptions() {
return this.serverChannel.supportedOptions();
}
@Override
public <T> void setOption(SocketOption<T> name, T value) throws IOException {
this.serverChannel.setOption(name, value);
}
@Override
public void accept() throws IOException {
this.serverChannel.register(acceptSelector, SelectionKey.OP_ACCEPT);
this.running = true;
this.workers = new NioThreadWorker[Runtime.getRuntime().availableProcessors()];
final CountDownLatch wkcdl = new CountDownLatch(workers.length);
for (int i = 0; i < workers.length; i++) {
workers[i] = new NioThreadWorker(wkcdl, i + 1, workers.length);
workers[i].setDaemon(true);
workers[i].start();
}
for (int i = 0; i < workers.length - 1; i++) { //构成环形
workers[i].next = workers[i + 1];
}
workers[workers.length - 1].next = workers[0];
currWorker = workers[0];
try {
wkcdl.await(3, TimeUnit.SECONDS);
} catch (Exception e) {
throw new IOException(e);
}
final CountDownLatch cdl = new CountDownLatch(1);
new Thread() {
@Override
public void run() {
cdl.countDown();
while (running) {
try {
acceptSelector.select();
Set<SelectionKey> selectedKeys = acceptSelector.selectedKeys();
synchronized (selectedKeys) {
Iterator<?> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = (SelectionKey) iter.next();
iter.remove();
if (key.isAcceptable()) {
try {
SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
createCounter.incrementAndGet();
livingCounter.incrementAndGet();
currWorker.addChannel(channel);
currWorker = currWorker.next;
} catch (IOException io) {
io.printStackTrace();
}
}
}
}
} catch (Throwable t) {
t.printStackTrace();
}
}
}
}.start();
try {
cdl.await(3, TimeUnit.SECONDS);
} catch (Exception e) {
throw new IOException(e);
}
}
@Override
public void close() throws IOException {
if (!this.running) return;
serverChannel.close();
acceptSelector.close();
for (NioThreadWorker worker : workers) {
worker.interrupt();
}
this.running = false;
}
class NioThreadWorker extends Thread {
final Selector selector;
final CountDownLatch cdl;
private final Queue<TcpNioAsyncConnection> connected;
private final CopyOnWriteArrayList<TcpNioAsyncConnection> done;
protected volatile Thread ownerThread;
NioThreadWorker next;
public NioThreadWorker(final CountDownLatch cdl, int idx, int count) {
this.cdl = cdl;
String idxstr = "000000" + idx;
this.setName("NioThreadWorker:" + context.getServerAddress().getPort() + "-" + idxstr.substring(idxstr.length() - ("" + count).length()));
try {
this.selector = Selector.open();
} catch (IOException e) {
throw new RuntimeException(e);
}
this.connected = new ArrayBlockingQueue<>(1000000);
this.done = new CopyOnWriteArrayList<>();
}
public boolean addChannel(SocketChannel channel) throws IOException {
TcpNioAsyncConnection conn = new TcpNioAsyncConnection(channel, null, selector, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null);
return connected.add(conn);
}
protected void processConnected() {
TcpNioAsyncConnection schannel;
try {
while ((schannel = connected.poll()) != null) {
SocketChannel channel = schannel.channel;
channel.configureBlocking(false);
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
channel.register(selector, SelectionKey.OP_READ).attach(schannel);
}
} catch (IOException e) {
// do nothing
}
synchronized (done) {
for (TcpNioAsyncConnection conn : done) {
if (conn.key != null && conn.key.isValid()) {
conn.key.interestOps(SelectionKey.OP_WRITE);
}
}
done.clear();
}
}
public boolean isSameThread() {
return this.ownerThread == Thread.currentThread();
}
@Override
public void run() {
this.ownerThread = Thread.currentThread();
if (cdl != null) cdl.countDown();
while (running) {
processConnected();
try {
selector.select(50);
} catch (IOException e) {
e.printStackTrace();
}
try {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
synchronized (selectedKeys) {
Iterator<?> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = (SelectionKey) iter.next();
iter.remove();
processKey(key);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void processKey(SelectionKey key) {
if (key == null || !key.isValid()) return;
SocketChannel socket = (SocketChannel) key.channel();
TcpNioAsyncConnection conn = (TcpNioAsyncConnection) key.attachment();
if (!socket.isOpen()) {
if (conn == null) {
key.cancel();
} else {
conn.dispose();
}
return;
}
if (conn == null) return;
if (key.isReadable()) {
if (conn.readHandler != null) readOP(key, socket, conn);
} else if (key.isWritable()) {
if (conn.writeHandler != null) writeOP(key, socket, conn);
}
}
private void closeOP(SelectionKey key) {
if (key == null) return;
TcpNioAsyncConnection conn = (TcpNioAsyncConnection) key.attachment();
try {
if (key.isValid()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.close();
key.attach(null);
key.cancel();
}
} catch (IOException e) {
}
conn.dispose();
}
private void readOP(SelectionKey key, SocketChannel socket, TcpNioAsyncConnection conn) {
final CompletionHandler handler = conn.removeReadHandler();
final ByteBuffer buffer = conn.removeReadBuffer();
final Object attach = conn.removeReadAttachment();
//System.out.println(conn + "------readbuf:" + buffer + "-------handler:" + handler);
if (handler == null || buffer == null) return;
try {
final int rs = socket.read(buffer);
{ //测试
buffer.flip();
byte[] bs = new byte[buffer.remaining()];
buffer.get(bs);
//System.out.println(conn + "------readbuf:" + buffer + "-------handler:" + handler + "-------读内容: " + new String(bs));
}
//System.out.println(conn + "------readbuf:" + buffer + "-------handler:" + handler + "-------read: " + rs);
context.runAsync(() -> {
try {
handler.completed(rs, attach);
} catch (Throwable e) {
handler.failed(e, attach);
}
});
} catch (Throwable t) {
context.runAsync(() -> handler.failed(t, attach));
}
}
private void writeOP(SelectionKey key, SocketChannel socket, TcpNioAsyncConnection conn) {
final CompletionHandler handler = conn.writeHandler;
final ByteBuffer oneBuffer = conn.removeWriteOneBuffer();
final ByteBuffer[] buffers = conn.removeWriteBuffers();
final Object attach = conn.removeWriteAttachment();
final int writingCount = conn.removeWritingCount();
final int writeOffset = conn.removeWriteOffset();
final int writeLength = conn.removeWriteLength();
if (handler == null || (oneBuffer == null && buffers == null)) return;
//System.out.println(conn + "------buffers:" + Arrays.toString(buffers) + "---onebuf:" + oneBuffer + "-------handler:" + handler);
try {
int rs = 0;
if (oneBuffer == null) {
int offset = writeOffset;
int length = writeLength;
rs = (int) socket.write(buffers, offset, length);
boolean over = true;
int end = offset + length;
for (int i = offset; i < end; i++) {
if (buffers[i].hasRemaining()) {
over = false;
length -= i - offset;
offset = i;
}
}
if (!over) {
conn.writingCount += rs;
conn.writeHandler = handler;
conn.writeAttachment = attach;
conn.writeBuffers = buffers;
conn.writeOffset = offset;
conn.writeLength = length;
key.interestOps(SelectionKey.OP_READ + SelectionKey.OP_WRITE);
key.selector().wakeup();
return;
}
} else {
rs = socket.write(oneBuffer);
if (oneBuffer.hasRemaining()) {
conn.writingCount += rs;
conn.writeHandler = handler;
conn.writeAttachment = attach;
conn.writeOneBuffer = oneBuffer;
key.interestOps(SelectionKey.OP_READ + SelectionKey.OP_WRITE);
key.selector().wakeup();
return;
}
}
conn.removeWriteHandler();
key.interestOps(SelectionKey.OP_READ); //OP_CONNECT
final int rs0 = rs + writingCount;
//System.out.println(conn + "------buffers:" + Arrays.toString(buffers) + "---onebuf:" + oneBuffer + "-------handler:" + handler + "-------write: " + rs);
context.runAsync(() -> {
try {
handler.completed(rs0, attach);
} catch (Throwable e) {
handler.failed(e, attach);
}
});
} catch (Throwable t) {
context.runAsync(() -> handler.failed(t, attach));
}
}
}
}

View File

@@ -29,41 +29,41 @@ import org.redkale.util.*;
* @author zhangjx
*/
public final class Transport {
public static final String DEFAULT_PROTOCOL = "TCP";
protected final AtomicInteger seq = new AtomicInteger(-1);
protected final TransportFactory factory;
protected final String name; //即<group>的name属性
protected final String subprotocol; //即<group>的subprotocol属性
protected final boolean tcp;
protected final String protocol;
protected final AsynchronousChannelGroup group;
protected final InetSocketAddress clientAddress;
//不可能为null
protected TransportNode[] transportNodes = new TransportNode[0];
protected final ObjectPool<ByteBuffer> bufferPool;
protected final SSLContext sslContext;
//负载均衡策略
protected final TransportStrategy strategy;
protected Transport(String name, String subprotocol, TransportFactory factory, final ObjectPool<ByteBuffer> transportBufferPool,
final AsynchronousChannelGroup transportChannelGroup, final SSLContext sslContext, final InetSocketAddress clientAddress,
final Collection<InetSocketAddress> addresses, final TransportStrategy strategy) {
this(name, DEFAULT_PROTOCOL, subprotocol, factory, transportBufferPool, transportChannelGroup, sslContext, clientAddress, addresses, strategy);
}
protected Transport(String name, String protocol, String subprotocol,
final TransportFactory factory, final ObjectPool<ByteBuffer> transportBufferPool,
final AsynchronousChannelGroup transportChannelGroup, final SSLContext sslContext, final InetSocketAddress clientAddress,
@@ -81,7 +81,7 @@ public final class Transport {
this.strategy = strategy;
updateRemoteAddresses(addresses);
}
public final InetSocketAddress[] updateRemoteAddresses(final Collection<InetSocketAddress> addresses) {
final TransportNode[] oldNodes = this.transportNodes;
synchronized (this) {
@@ -109,7 +109,7 @@ public final class Transport {
}
return rs;
}
public final boolean addRemoteAddresses(final InetSocketAddress addr) {
if (addr == null) return false;
if (clientAddress != null && clientAddress.equals(addr)) return false;
@@ -125,7 +125,7 @@ public final class Transport {
return true;
}
}
public final boolean removeRemoteAddresses(InetSocketAddress addr) {
if (addr == null) return false;
synchronized (this) {
@@ -133,15 +133,15 @@ public final class Transport {
}
return true;
}
public String getName() {
return name;
}
public String getSubprotocol() {
return subprotocol;
}
public void close() {
TransportNode[] nodes = this.transportNodes;
if (nodes == null) return;
@@ -149,22 +149,22 @@ public final class Transport {
if (node != null) node.dispose();
}
}
public InetSocketAddress getClientAddress() {
return clientAddress;
}
public TransportNode[] getTransportNodes() {
return transportNodes;
}
public TransportNode findTransportNode(SocketAddress addr) {
for (TransportNode node : this.transportNodes) {
if (node.address.equals(addr)) return node;
}
return null;
}
public InetSocketAddress[] getRemoteAddresses() {
InetSocketAddress[] rs = new InetSocketAddress[transportNodes.length];
for (int i = 0; i < rs.length; i++) {
@@ -172,36 +172,36 @@ public final class Transport {
}
return rs;
}
@Override
public String toString() {
return Transport.class.getSimpleName() + "{name = " + name + ", protocol = " + protocol + ", clientAddress = " + clientAddress + ", remoteNodes = " + Arrays.toString(transportNodes) + "}";
}
public ByteBuffer pollBuffer() {
return bufferPool.get();
}
public Supplier<ByteBuffer> getBufferSupplier() {
return bufferPool;
}
public void offerBuffer(ByteBuffer buffer) {
bufferPool.accept(buffer);
}
public void offerBuffer(ByteBuffer... buffers) {
for (ByteBuffer buffer : buffers) offerBuffer(buffer);
}
public AsynchronousChannelGroup getTransportChannelGroup() {
return group;
}
public boolean isTCP() {
return tcp;
}
public CompletableFuture<AsyncConnection> pollConnection(SocketAddress addr0) {
if (this.strategy != null) return strategy.pollConnection(addr0, this);
final TransportNode[] nodes = this.transportNodes;
@@ -215,12 +215,12 @@ public final class Transport {
DatagramChannel channel = DatagramChannel.open();
channel.configureBlocking(true);
channel.connect(udpaddr);
return CompletableFuture.completedFuture(AsyncConnection.create(channel, udpaddr, true, factory.readTimeoutSeconds, factory.writeTimeoutSeconds));
return CompletableFuture.completedFuture(AsyncConnection.create(bufferPool, channel, sslContext, udpaddr, true, factory.readTimeoutSeconds, factory.writeTimeoutSeconds));
}
if (!rand) { //指定地址
TransportNode node = findTransportNode(addr);
if (node == null) {
return AsyncConnection.createTCP(group, sslContext, addr, factory.readTimeoutSeconds, factory.writeTimeoutSeconds);
return AsyncConnection.createTCP(bufferPool, group, sslContext, addr, factory.readTimeoutSeconds, factory.writeTimeoutSeconds);
}
final BlockingQueue<AsyncConnection> queue = node.conns;
if (!queue.isEmpty()) {
@@ -233,7 +233,7 @@ public final class Transport {
}
}
}
return AsyncConnection.createTCP(group, sslContext, addr, factory.readTimeoutSeconds, factory.writeTimeoutSeconds);
return AsyncConnection.createTCP(bufferPool, group, sslContext, addr, factory.readTimeoutSeconds, factory.writeTimeoutSeconds);
}
//---------------------随机取地址------------------------
@@ -266,14 +266,14 @@ public final class Transport {
@Override
public void completed(Void result, TransportNode attachment) {
attachment.disabletime = 0;
AsyncConnection asyncConn = AsyncConnection.create(channel, attachment.address, factory.readTimeoutSeconds, factory.writeTimeoutSeconds);
AsyncConnection asyncConn = AsyncConnection.create(bufferPool, channel, attachment.address, factory.readTimeoutSeconds, factory.writeTimeoutSeconds);
if (future.isDone()) {
if (!attachment.conns.offer(asyncConn)) asyncConn.dispose();
} else {
future.complete(asyncConn);
}
}
@Override
public void failed(Throwable exc, TransportNode attachment) {
attachment.disabletime = now;
@@ -289,7 +289,7 @@ public final class Transport {
future.complete(r);
}
});
} catch (Exception e) {
future.completeExceptionally(e);
}
@@ -302,7 +302,7 @@ public final class Transport {
throw new RuntimeException("transport address = " + addr, ex);
}
}
private CompletableFuture<AsyncConnection> pollConnection0(TransportNode[] nodes, TransportNode exclude, long now) throws IOException {
//从可用/不可用的地址列表中创建连接
AtomicInteger count = new AtomicInteger(nodes.length);
@@ -319,17 +319,17 @@ public final class Transport {
public void completed(Void result, TransportNode attachment) {
try {
attachment.disabletime = 0;
AsyncConnection asyncConn = AsyncConnection.create(channel, attachment.address, factory.readTimeoutSeconds, factory.writeTimeoutSeconds);
AsyncConnection asyncConn = AsyncConnection.create(bufferPool, channel, attachment.address, factory.readTimeoutSeconds, factory.writeTimeoutSeconds);
if (future.isDone()) {
if (!attachment.conns.offer(asyncConn)) asyncConn.dispose();
} else {
future.complete(asyncConn);
}
} catch (Exception e) {
failed(e, attachment);
failed(e, attachment);
}
}
@Override
public void failed(Throwable exc, TransportNode attachment) {
attachment.disabletime = now;
@@ -345,7 +345,7 @@ public final class Transport {
}
return future;
}
public void offerConnection(final boolean forceClose, AsyncConnection conn) {
if (this.strategy != null && strategy.offerConnection(forceClose, conn)) return;
if (!forceClose && conn.isTCP()) {
@@ -359,7 +359,7 @@ public final class Transport {
conn.dispose();
}
}
public <A> void async(SocketAddress addr, final ByteBuffer buffer, A att, final CompletionHandler<Integer, A> handler) {
pollConnection(addr).whenComplete((conn, ex) -> {
if (ex != null) {
@@ -367,118 +367,119 @@ public final class Transport {
return;
}
conn.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
buffer.clear();
conn.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
conn.setReadBuffer(buffer);
conn.read(new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
if (handler != null) handler.completed(result, att);
offerBuffer(buffer);
conn.offerBuffer(attachment);
offerConnection(false, conn);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
offerBuffer(buffer);
conn.offerBuffer(attachment);
offerConnection(true, conn);
}
});
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
offerBuffer(buffer);
conn.offerBuffer(attachment);
offerConnection(true, conn);
}
});
});
}
public static class TransportNode {
protected InetSocketAddress address;
protected volatile long disabletime; //不可用时的时间, 为0表示可用
protected final BlockingQueue<AsyncConnection> conns;
protected final ConcurrentHashMap<String, Object> attributes = new ConcurrentHashMap<>();
public TransportNode(int poolmaxconns, InetSocketAddress address) {
this.address = address;
this.disabletime = 0;
this.conns = new ArrayBlockingQueue<>(poolmaxconns);
}
@ConstructorParameters({"poolmaxconns", "address", "disabletime"})
public TransportNode(int poolmaxconns, InetSocketAddress address, long disabletime) {
this.address = address;
this.disabletime = disabletime;
this.conns = new LinkedBlockingQueue<>(poolmaxconns);
}
public int getPoolmaxconns() {
return this.conns.remainingCapacity() + this.conns.size();
}
public <T> T setAttribute(String name, T value) {
attributes.put(name, value);
return value;
}
@SuppressWarnings("unchecked")
public <T> T getAttribute(String name) {
return (T) attributes.get(name);
}
@SuppressWarnings("unchecked")
public <T> T removeAttribute(String name) {
return (T) attributes.remove(name);
}
public TransportNode clearAttributes() {
attributes.clear();
return this;
}
public ConcurrentHashMap<String, Object> getAttributes() {
return attributes;
}
public void setAttributes(ConcurrentHashMap<String, Object> map) {
attributes.clear();
if (map != null) attributes.putAll(map);
}
public InetSocketAddress getAddress() {
return address;
}
public long getDisabletime() {
return disabletime;
}
@ConvertDisabled
public BlockingQueue<AsyncConnection> getConns() {
return conns;
}
public void dispose() {
AsyncConnection conn;
while ((conn = conns.poll()) != null) {
conn.dispose();
}
}
@Override
public int hashCode() {
return this.address.hashCode();
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
@@ -487,7 +488,7 @@ public final class Transport {
final TransportNode other = (TransportNode) obj;
return this.address.equals(other.address);
}
@Override
public String toString() {
return JsonConvert.root().convertTo(this);

View File

@@ -393,33 +393,34 @@ public class TransportFactory {
final BlockingQueue<AsyncConnection> localqueue = queue;
localconn.write(sendBuffer, sendBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (buffer.hasRemaining()) {
localconn.write(buffer, buffer, this);
public void completed(Integer result, ByteBuffer wbuffer) {
if (wbuffer.hasRemaining()) {
localconn.write(wbuffer, wbuffer, this);
return;
}
ByteBuffer pongBuffer = bufferPool.get();
localconn.read(pongBuffer, pongBuffer, new CompletionHandler<Integer, ByteBuffer>() {
localconn.read(new CompletionHandler<Integer, ByteBuffer>() {
int counter = 0;
@Override
public void completed(Integer result, ByteBuffer attachment) {
public void completed(Integer result, ByteBuffer pongBuffer) {
if (counter > 3) {
bufferPool.accept(attachment);
localconn.offerBuffer(pongBuffer);
localconn.dispose();
return;
}
if (pongLength > 0 && attachment.position() < pongLength) {
if (pongLength > 0 && pongBuffer.position() < pongLength) {
counter++;
localconn.read(pongBuffer, pongBuffer, this);
localconn.setReadBuffer(pongBuffer);
localconn.read(this);
return;
}
bufferPool.accept(attachment);
localconn.offerBuffer(pongBuffer);
localqueue.offer(localconn);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
public void failed(Throwable exc, ByteBuffer pongBuffer) {
localconn.offerBuffer(pongBuffer);
localconn.dispose();
}
});

View File

@@ -12,6 +12,8 @@ import java.nio.channels.*;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.*;
import javax.net.ssl.SSLContext;
/**
*
@@ -32,9 +34,11 @@ public class UdpBioAsyncConnection extends AsyncConnection {
private final boolean client;
public UdpBioAsyncConnection(final DatagramChannel ch, SocketAddress addr0,
final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
public UdpBioAsyncConnection(Supplier<ByteBuffer> bufferSupplier, Consumer<ByteBuffer> bufferConsumer,
final DatagramChannel ch, final SSLContext sslContext, SocketAddress addr0, final boolean client0,
final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
super(bufferSupplier, bufferConsumer, sslContext);
this.channel = ch;
this.client = client0;
this.readTimeoutSeconds = readTimeoutSeconds0;
@@ -127,30 +131,27 @@ public class UdpBioAsyncConnection extends AsyncConnection {
}
@Override
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
public void read(CompletionHandler<Integer, ByteBuffer> handler) {
ByteBuffer dst = pollReadBuffer();
try {
int rs = channel.read(dst);
this.readtime = System.currentTimeMillis();
if (handler != null) handler.completed(rs, attachment);
if (handler != null) handler.completed(rs, dst);
} catch (IOException e) {
if (handler != null) handler.failed(e, attachment);
if (handler != null) handler.failed(e, dst);
}
}
@Override
public <A> void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler<Integer, ? super A> handler) {
read(dst, attachment, handler);
public void read(long timeout, TimeUnit unit, CompletionHandler<Integer, ByteBuffer> handler) {
read(handler);
}
@Override
public Future<Integer> read(ByteBuffer dst) {
try {
int rs = channel.read(dst);
this.readtime = System.currentTimeMillis();
return CompletableFuture.completedFuture(rs);
} catch (IOException e) {
throw new RuntimeException(e);
}
public int read(ByteBuffer dst) throws IOException {
int rs = channel.read(dst);
this.readtime = System.currentTimeMillis();
return rs;
}
@Override
@@ -165,14 +166,10 @@ public class UdpBioAsyncConnection extends AsyncConnection {
}
@Override
public Future<Integer> write(ByteBuffer src) {
try {
int rs = channel.send(src, remoteAddress);
this.writetime = System.currentTimeMillis();
return CompletableFuture.completedFuture(rs);
} catch (IOException e) {
throw new RuntimeException(e);
}
public int write(ByteBuffer src) throws IOException {
int rs = channel.send(src, remoteAddress);
this.writetime = System.currentTimeMillis();
return rs;
}
@Override

View File

@@ -85,7 +85,8 @@ public class UdpBioProtocolServer extends ProtocolServer {
try {
SocketAddress address = serchannel.receive(buffer);
buffer.flip();
AsyncConnection conn = new UdpBioAsyncConnection(serchannel, address, false, readTimeoutSeconds, writeTimeoutSeconds, null, null);
AsyncConnection conn = new UdpBioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), serchannel,
context.getSSLContext(), address, false, readTimeoutSeconds, writeTimeoutSeconds, null, null);
context.runAsync(new PrepareRunner(context, conn, buffer, null));
} catch (Exception e) {
context.offerBuffer(buffer);

View File

@@ -34,8 +34,6 @@ class WebSocketRunner implements Runnable {
protected final HttpContext context;
private ByteBuffer readBuffer;
volatile boolean closed = false;
private final BiConsumer<WebSocket, Object> restMessageConsumer; //主要供RestWebSocket使用
@@ -50,7 +48,6 @@ class WebSocketRunner implements Runnable {
this.webSocket = webSocket;
this.restMessageConsumer = messageConsumer;
this.channel = channel;
this.readBuffer = context.pollBuffer();
}
@Override
@@ -61,7 +58,7 @@ class WebSocketRunner implements Runnable {
channel.setReadTimeoutSeconds(300); //读取超时5分钟
if (channel.isOpen()) {
final int wsmaxbody = webSocket._engine.wsmaxbody;
channel.read(readBuffer, null, new CompletionHandler<Integer, Void>() {
channel.read(new CompletionHandler<Integer, ByteBuffer>() {
//尚未解析完的数据包
private WebSocketPacket unfinishPacket;
@@ -72,31 +69,27 @@ class WebSocketRunner implements Runnable {
private final SimpleEntry<String, byte[]> halfBytes = new SimpleEntry("", null);
@Override
public void completed(Integer count, Void attachment1) {
public void completed(Integer count, ByteBuffer readBuffer) {
if (count < 1) {
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner(userid=" + webSocket.getUserid() + ") abort on read buffer count, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds");
closeRunner(CLOSECODE_ILLPACKET, "read buffer count is " + count);
return;
}
try {
ByteBuffer readBuf = readBuffer;
if (readBuf == null) return; //关闭后readBuffer为null
lastReadTime = System.currentTimeMillis();
readBuf.flip();
readBuffer.flip();
WebSocketPacket onePacket = null;
if (unfinishPacket != null) {
if (unfinishPacket.receiveBody(webSocket, readBuf)) { //已经接收完毕
if (unfinishPacket.receiveBody(webSocket, readBuffer)) { //已经接收完毕
onePacket = unfinishPacket;
unfinishPacket = null;
for (ByteBuffer b : exBuffers) {
context.offerBuffer(b);
}
exBuffers.clear();
} else { //需要继续接收
readBuf = context.pollBuffer();
readBuffer = readBuf;
channel.read(readBuf, null, this);
} else { //需要继续接收, 此处不能回收readBuffer
channel.read(this);
return;
}
}
@@ -105,37 +98,36 @@ class WebSocketRunner implements Runnable {
if (onePacket != null) packets.add(onePacket);
try {
while (true) {
WebSocketPacket packet = new WebSocketPacket().decode(context.getLogger(), webSocket, wsmaxbody, halfBytes, readBuf);
WebSocketPacket packet = new WebSocketPacket().decode(context.getLogger(), webSocket, wsmaxbody, halfBytes, readBuffer);
if (packet == WebSocketPacket.NONE) break; //解析完毕但是buffer有多余字节
if (packet != null && !packet.isReceiveFinished()) {
unfinishPacket = packet;
if (readBuf.hasRemaining()) {
exBuffers.add(readBuf);
readBuf = context.pollBuffer();
readBuffer = readBuf;
if (readBuffer.hasRemaining()) {
exBuffers.add(readBuffer);
}
break;
}
packets.add(packet);
if (packet == null || !readBuf.hasRemaining()) break;
if (packet == null || !readBuffer.hasRemaining()) break;
}
} catch (Exception e) {
context.getLogger().log(Level.SEVERE, "WebSocket parse message error", e);
webSocket.onOccurException(e, null);
}
//继续监听消息
readBuf.clear();
readBuffer.clear();
if (halfBytes.getValue() != null) {
readBuf.put(halfBytes.getValue());
readBuffer.put(halfBytes.getValue());
halfBytes.setValue(null);
}
channel.read(readBuf, null, this);
channel.setReadBuffer(readBuffer);
channel.read(this);
//消息处理
for (final WebSocketPacket packet : packets) {
if (packet == null) {
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on decode WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds");
failed(null, attachment1);
failed(null, readBuffer);
return;
}
@@ -197,7 +189,7 @@ class WebSocketRunner implements Runnable {
}
@Override
public void failed(Throwable exc, Void attachment2) {
public void failed(Throwable exc, ByteBuffer attachment2) {
if (exc != null) {
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner read WebSocketPacket failed, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", exc);
closeRunner(CLOSECODE_WSEXCEPTION, "read websocket-packet failed");
@@ -302,8 +294,6 @@ class WebSocketRunner implements Runnable {
if (closed) return null;
closed = true;
channel.dispose();
context.offerBuffer(readBuffer);
readBuffer = null;
CompletableFuture<Void> future = engine.removeThenClose(webSocket);
webSocket.onClose(code, reason);
return future;

View File

@@ -370,7 +370,6 @@ public final class SncpClient {
final ByteBuffer[] sendBuffers = writer.toBuffers();
fillHeader(sendBuffers[0], seqid, actionid, reqBodyLength);
final ByteBuffer buffer = transport.pollBuffer();
conn.write(sendBuffers, sendBuffers, new CompletionHandler<Integer, ByteBuffer[]>() {
@Override
@@ -393,25 +392,25 @@ public final class SncpClient {
conn.write(newattachs, newattachs, this);
return;
}
//----------------------- 读取返回结果 -------------------------------------
buffer.clear();
conn.read(buffer, null, new CompletionHandler<Integer, Void>() {
//----------------------- 读取返回结果 -------------------------------------
conn.read(new CompletionHandler<Integer, ByteBuffer>() {
private byte[] body;
private int received;
@Override
public void completed(Integer count, Void attachment2) {
public void completed(Integer count, ByteBuffer buffer) {
try {
if (count < 1 && buffer.remaining() == buffer.limit()) { //没有数据可读
future.completeExceptionally(new RpcRemoteException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote no response data"));
transport.offerBuffer(buffer);
conn.offerBuffer(buffer);
transport.offerConnection(true, conn);
return;
}
if (received < 1 && buffer.limit() < buffer.remaining() + HEADER_SIZE) { //header都没读全
conn.read(buffer, attachment2, this);
conn.setReadBuffer(buffer);
conn.read(this);
return;
}
buffer.flip();
@@ -421,8 +420,10 @@ public final class SncpClient {
buffer.get(body, offset, Math.min(buffer.remaining(), this.body.length - offset));
if (this.received < this.body.length) {// 数据仍然不全,需要继续读取
buffer.clear();
conn.read(buffer, attachment2, this);
conn.setReadBuffer(buffer);
conn.read(this);
} else {
conn.offerBuffer(buffer);
success();
}
return;
@@ -441,10 +442,12 @@ public final class SncpClient {
this.received = buffer.remaining();
buffer.get(body, 0, this.received);
buffer.clear();
conn.read(buffer, attachment2, this);
conn.setReadBuffer(buffer);
conn.read(this);
} else {
this.body = new byte[respBodyLength];
buffer.get(body, 0, respBodyLength);
conn.offerBuffer(buffer);
success();
}
} catch (Throwable e) {
@@ -461,7 +464,6 @@ public final class SncpClient {
@SuppressWarnings("unchecked")
public void success() {
future.complete(this.body);
transport.offerBuffer(buffer);
transport.offerConnection(false, conn);
if (handler != null) {
final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null;
@@ -484,9 +486,9 @@ public final class SncpClient {
}
@Override
public void failed(Throwable exc, Void attachment2) {
public void failed(Throwable exc, ByteBuffer attachment2) {
future.completeExceptionally(new RuntimeException(action.method + " sncp remote exec failed"));
transport.offerBuffer(buffer);
conn.offerBuffer(attachment2);
transport.offerConnection(true, conn);
if (handler != null) {
final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null;
@@ -500,7 +502,6 @@ public final class SncpClient {
@Override
public void failed(Throwable exc, ByteBuffer[] attachment) {
future.completeExceptionally(new RuntimeException(action.method + " sncp remote exec failed"));
transport.offerBuffer(buffer);
transport.offerConnection(true, conn);
if (handler != null) {
final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null;

View File

@@ -133,7 +133,7 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
});
}
return AsyncConnection.createTCP(group, this.servaddr, this.readTimeoutSeconds, this.writeTimeoutSeconds).thenCompose(conn -> {
return AsyncConnection.createTCP(bufferPool, group, this.servaddr, this.readTimeoutSeconds, this.writeTimeoutSeconds).thenCompose(conn -> {
conn.beforeCloseListener((c) -> {
semaphore.release();
closeCounter.incrementAndGet();
@@ -143,12 +143,11 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
final ByteBuffer buffer = reqConnectBuffer(conn);
if (buffer == null) {
final ByteBuffer rbuffer = bufferPool.get();
conn.read(rbuffer, null, new CompletionHandler<Integer, Void>() {
conn.read(new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, Void attachment2) {
public void completed(Integer result, ByteBuffer rbuffer) {
if (result < 0) {
failed(new SQLException("Read Buffer Error"), attachment2);
failed(new SQLException("Read Buffer Error"), rbuffer);
return;
}
rbuffer.flip();
@@ -156,8 +155,8 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
}
@Override
public void failed(Throwable exc, Void attachment2) {
bufferPool.accept(rbuffer);
public void failed(Throwable exc, ByteBuffer rbuffer) {
conn.offerBuffer(rbuffer);
future.completeExceptionally(exc);
conn.dispose();
}
@@ -175,11 +174,12 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
return;
}
buffer.clear();
conn.read(buffer, null, new CompletionHandler<Integer, Void>() {
conn.setReadBuffer(buffer);
conn.read(new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, Void attachment2) {
public void completed(Integer result, ByteBuffer rbuffer) {
if (result < 0) {
failed(new SQLException("Read Buffer Error"), attachment2);
failed(new SQLException("Read Buffer Error"), rbuffer);
return;
}
buffer.flip();
@@ -187,8 +187,8 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
}
@Override
public void failed(Throwable exc, Void attachment2) {
bufferPool.accept(buffer);
public void failed(Throwable exc, ByteBuffer rbuffer) {
conn.offerBuffer(rbuffer);
future.completeExceptionally(exc);
conn.dispose();
}