AsyncConnection区分read/write

This commit is contained in:
Redkale
2023-01-03 14:46:28 +08:00
parent 979a263c88
commit 44500b6500
17 changed files with 285 additions and 170 deletions

View File

@@ -44,7 +44,9 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
protected final AsyncGroup ioGroup;
protected final AsyncIOThread ioThread;
protected final AsyncIOThread ioReadThread;
protected final AsyncIOThread ioWriteThread;
protected final boolean client;
@@ -89,7 +91,8 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
Objects.requireNonNull(bufferConsumer);
this.client = client;
this.ioGroup = ioGroup;
this.ioThread = ioThread;
this.ioReadThread = ioThread;
this.ioWriteThread = ioThread;
this.bufferCapacity = bufferCapacity;
this.bufferSupplier = bufferSupplier;
this.bufferConsumer = bufferConsumer;
@@ -110,11 +113,19 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
}
}
public Supplier<ByteBuffer> getBufferSupplier() {
public Supplier<ByteBuffer> getReadBufferSupplier() {
return this.bufferSupplier;
}
public Consumer<ByteBuffer> getBufferConsumer() {
public Consumer<ByteBuffer> getReadBufferConsumer() {
return this.bufferConsumer;
}
public Supplier<ByteBuffer> getWriteBufferSupplier() {
return this.bufferSupplier;
}
public Consumer<ByteBuffer> getWriteBufferConsumer() {
return this.bufferConsumer;
}
@@ -138,24 +149,44 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
return eventing.decrementAndGet();
}
public final void execute(Runnable command) {
ioThread.execute(command);
public final void executeRead(Runnable command) {
ioReadThread.execute(command);
}
public final void execute(Runnable... commands) {
ioThread.execute(commands);
public final void executeRead(Runnable... commands) {
ioReadThread.execute(commands);
}
public final void execute(Collection<Runnable> commands) {
ioThread.execute(commands);
public final void executeRead(Collection<Runnable> commands) {
ioReadThread.execute(commands);
}
public final boolean inCurrThread() {
return ioThread.inCurrThread();
public final void executeWrite(Runnable command) {
ioWriteThread.execute(command);
}
public final AsyncIOThread getAsyncIOThread() {
return ioThread;
public final void executeWrite(Runnable... commands) {
ioWriteThread.execute(commands);
}
public final void executeWrite(Collection<Runnable> commands) {
ioWriteThread.execute(commands);
}
public final boolean inCurrReadThread() {
return ioReadThread.inCurrThread();
}
public final boolean inCurrWriteThread() {
return ioWriteThread.inCurrThread();
}
public final AsyncIOThread getReadIOThread() {
return ioReadThread;
}
public final AsyncIOThread getWriteIOThread() {
return ioWriteThread;
}
@Override
@@ -196,10 +227,10 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
}
public final void startReadInIOThread(CompletionHandler<Integer, ByteBuffer> handler) {
if (inCurrThread()) {
if (inCurrReadThread()) {
startRead(handler);
} else {
execute(() -> startRead(handler));
executeRead(() -> startRead(handler));
}
}
@@ -212,10 +243,10 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
}
public final void readInIOThread(CompletionHandler<Integer, ByteBuffer> handler) {
if (inCurrThread()) {
if (inCurrReadThread()) {
read(handler);
} else {
execute(() -> read(handler));
executeRead(() -> read(handler));
}
}
@@ -294,13 +325,13 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
CompletionHandler<Integer, Void> newhandler = new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
offerBuffer(buffer);
offerWriteBuffer(buffer);
handler.completed(result, attachment);
}
@Override
public void failed(Throwable exc, Void attachment) {
offerBuffer(buffer);
offerWriteBuffer(buffer);
handler.failed(exc, attachment);
}
};
@@ -318,13 +349,13 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
CompletionHandler<Integer, Void> newhandler = new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
offerBuffer(buffers);
offerWriteBuffer(buffers);
handler.completed(result, attachment);
}
@Override
public void failed(Throwable exc, Void attachment) {
offerBuffer(buffers);
offerWriteBuffer(buffers);
handler.failed(exc, attachment);
}
};
@@ -358,13 +389,13 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
CompletionHandler<Integer, ? super A> newhandler = new CompletionHandler<Integer, A>() {
@Override
public void completed(Integer result, A attachment) {
offerBuffer(srcs);
offerWriteBuffer(srcs);
handler.completed(result, attachment);
}
@Override
public void failed(Throwable exc, A attachment) {
offerBuffer(srcs);
offerWriteBuffer(srcs);
handler.failed(exc, attachment);
}
};
@@ -386,7 +417,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
synchronized (this) {
ByteBufferWriter writer = this.pipelineWriter;
if (writer == null) {
writer = ByteBufferWriter.create(getBufferSupplier());
writer = ByteBufferWriter.create(getWriteBufferSupplier());
this.pipelineWriter = writer;
}
if (this.pipelineDataNode == null && pipelineIndex == writer.getWriteBytesCounter() + 1) {
@@ -424,7 +455,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
synchronized (this) {
ByteBufferWriter writer = this.pipelineWriter;
if (writer == null) {
writer = ByteBufferWriter.create(getBufferSupplier());
writer = ByteBufferWriter.create(getWriteBufferSupplier());
this.pipelineWriter = writer;
}
if (this.pipelineDataNode == null && pipelineIndex == writer.getWriteBytesCounter() + 1) {
@@ -564,14 +595,31 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
return bufferSupplier.get();
}
public void offerBuffer(ByteBuffer buffer) {
public void offerReadBuffer(ByteBuffer buffer) {
if (buffer == null) {
return;
}
bufferConsumer.accept(buffer);
}
public void offerBuffer(ByteBuffer... buffers) {
public void offerReadBuffer(ByteBuffer... buffers) {
if (buffers == null) {
return;
}
Consumer<ByteBuffer> consumer = this.bufferConsumer;
for (ByteBuffer buffer : buffers) {
consumer.accept(buffer);
}
}
public void offerWriteBuffer(ByteBuffer buffer) {
if (buffer == null) {
return;
}
bufferConsumer.accept(buffer);
}
public void offerWriteBuffer(ByteBuffer... buffers) {
if (buffers == null) {
return;
}
@@ -712,8 +760,8 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
SSLEngineResult engineResult = engine.unwrap(netBuffer, appBuffer);
if (engineResult.getStatus() == SSLEngineResult.Status.CLOSED
&& (engineResult.getHandshakeStatus() == NOT_HANDSHAKING || engineResult.getHandshakeStatus() == FINISHED)) {
offerBuffer(netBuffer);
offerBuffer(appBuffer);
offerReadBuffer(netBuffer);
offerReadBuffer(appBuffer);
return null;
}
hss = engineResult.getHandshakeStatus();
@@ -750,7 +798,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
return; //CLOSEDnetBuffer已被回收
}
if (AsyncConnection.this.readSSLHalfBuffer != netBuffer) {
offerBuffer(netBuffer);
offerReadBuffer(netBuffer);
}
if (AsyncConnection.this.readBuffer != null) {
ByteBuffer rsBuffer = AsyncConnection.this.readBuffer;
@@ -758,7 +806,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
appBuffer.flip();
if (rsBuffer.remaining() >= appBuffer.remaining()) {
rsBuffer.put(appBuffer);
offerBuffer(appBuffer);
offerReadBuffer(appBuffer);
appBuffer = rsBuffer;
} else {
while (rsBuffer.hasRemaining()) rsBuffer.put(appBuffer.get());
@@ -860,13 +908,13 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
writeImpl(netBuffers[0], null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer count, Void attachment) {
offerBuffer(netBuffers[0]);
offerWriteBuffer(netBuffers[0]);
callback.accept(null);
}
@Override
public void failed(Throwable t, Void attachment) {
offerBuffer(netBuffers[0]);
offerWriteBuffer(netBuffers[0]);
callback.accept(t);
}
});
@@ -874,20 +922,20 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
writeImpl(netBuffers, 0, netBuffers.length, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer count, Void attachment) {
offerBuffer(netBuffers);
offerWriteBuffer(netBuffers);
callback.accept(null);
}
@Override
public void failed(Throwable t, Void attachment) {
offerBuffer(netBuffers);
offerWriteBuffer(netBuffers);
callback.accept(t);
}
});
}
return true;
} else {
offerBuffer(netBuffers);
offerWriteBuffer(netBuffers);
return false;
}
}
@@ -899,13 +947,13 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
writeImpl(netBuffers[0], null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer count, Void attachment) {
offerBuffer(netBuffers[0]);
offerWriteBuffer(netBuffers[0]);
callback.accept(null);
}
@Override
public void failed(Throwable t, Void attachment) {
offerBuffer(netBuffers[0]);
offerWriteBuffer(netBuffers[0]);
callback.accept(t);
}
});
@@ -913,20 +961,20 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
writeImpl(netBuffers, 0, netBuffers.length, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer count, Void attachment) {
offerBuffer(netBuffers);
offerWriteBuffer(netBuffers);
callback.accept(null);
}
@Override
public void failed(Throwable t, Void attachment) {
offerBuffer(netBuffers);
offerWriteBuffer(netBuffers);
callback.accept(t);
}
});
}
return true;
} else {
offerBuffer(netBuffers);
offerWriteBuffer(netBuffers);
return false;
}
}
@@ -974,7 +1022,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
if (count < 1) {
callback.accept(new IOException("read data error"));
} else {
offerBuffer(attachment);
offerReadBuffer(attachment);
doHandshake(callback);
}
}

View File

@@ -30,6 +30,7 @@ public class AsyncIOThread extends WorkThread {
final Selector selector;
//如果有read/write两IOThread只记readThread
final AtomicInteger connCounter = new AtomicInteger();
private final Supplier<ByteBuffer> bufferSupplier;

View File

@@ -22,6 +22,8 @@ class AsyncNioCompletionHandler<A> implements CompletionHandler<Integer, A>, Run
private final AsyncNioConnection conn;
private final boolean readMode;
private CompletionHandler<Integer, A> handler;
private A attachment;
@@ -32,8 +34,9 @@ class AsyncNioCompletionHandler<A> implements CompletionHandler<Integer, A>, Run
private ByteBuffer buffer;
public AsyncNioCompletionHandler(AsyncNioConnection conn) {
public AsyncNioCompletionHandler(boolean readFlag, AsyncNioConnection conn) {
this.conn = conn;
this.readMode = readFlag;
}
public void handler(CompletionHandler<Integer, A> handler, A attachment) {
@@ -70,9 +73,17 @@ class AsyncNioCompletionHandler<A> implements CompletionHandler<Integer, A>, Run
}
if (conn != null) {
if (buffers != null) {
conn.offerBuffer(buffers);
if (readMode) {
conn.offerReadBuffer(buffers);
} else {
conn.offerWriteBuffer(buffers);
}
} else if (buffer != null) {
conn.offerBuffer(buffer);
if (readMode) {
conn.offerReadBuffer(buffer);
} else {
conn.offerWriteBuffer(buffer);
}
}
}
CompletionHandler<Integer, A> handler0 = handler;
@@ -90,9 +101,17 @@ class AsyncNioCompletionHandler<A> implements CompletionHandler<Integer, A>, Run
}
if (conn != null) {
if (buffers != null) {
conn.offerBuffer(buffers);
if (readMode) {
conn.offerReadBuffer(buffers);
} else {
conn.offerWriteBuffer(buffers);
}
} else if (buffer != null) {
conn.offerBuffer(buffer);
if (readMode) {
conn.offerReadBuffer(buffer);
} else {
conn.offerWriteBuffer(buffer);
}
}
}
CompletionHandler<Integer, A> handler0 = handler;
@@ -105,9 +124,17 @@ class AsyncNioCompletionHandler<A> implements CompletionHandler<Integer, A>, Run
public void run() {
if (conn != null) {
if (buffers != null) {
conn.offerBuffer(buffers);
if (readMode) {
conn.offerReadBuffer(buffers);
} else {
conn.offerWriteBuffer(buffers);
}
} else if (buffer != null) {
conn.offerBuffer(buffer);
if (readMode) {
conn.offerReadBuffer(buffer);
} else {
conn.offerWriteBuffer(buffer);
}
}
}
CompletionHandler<Integer, A> handler0 = handler;

View File

@@ -43,7 +43,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
protected SelectionKey connectKey;
//-------------------------------- 读操作 --------------------------------------
protected final AsyncNioCompletionHandler<ByteBuffer> readTimeoutCompletionHandler = new AsyncNioCompletionHandler<>(this);
protected final AsyncNioCompletionHandler<ByteBuffer> readTimeoutCompletionHandler = new AsyncNioCompletionHandler<>(true, this);
protected int readTimeoutSeconds;
@@ -58,7 +58,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
protected SelectionKey readKey;
//-------------------------------- 写操作 --------------------------------------
protected final AsyncNioCompletionHandler<Object> writeTimeoutCompletionHandler = new AsyncNioCompletionHandler<>(this);
protected final AsyncNioCompletionHandler<Object> writeTimeoutCompletionHandler = new AsyncNioCompletionHandler<>(false, this);
protected int writeTimeoutSeconds;
@@ -138,7 +138,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
@Override
protected void startHandshake(final Consumer<Throwable> callback) {
((AsyncIOThread) ioThread).register(t -> super.startHandshake(callback));
ioReadThread.register(t -> super.startHandshake(callback));
}
@Override
@@ -168,9 +168,9 @@ abstract class AsyncNioConnection extends AsyncConnection {
this.readCompletionHandler = handler;
}
if (client) {
doRead(this.ioThread.inCurrThread());
doRead(this.ioReadThread.inCurrThread());
} else {
doRead(currReadInvoker < MAX_INVOKER_ONSTACK || this.ioThread.inCurrThread()); //同一线程中Selector.wakeup无效
doRead(currReadInvoker < MAX_INVOKER_ONSTACK || this.ioReadThread.inCurrThread()); //同一线程中Selector.wakeup无效
}
}
@@ -285,7 +285,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
if (readCount != 0) {
handleRead(readCount, null);
} else if (readKey == null) {
((AsyncIOThread) ioThread).register(selector -> {
ioReadThread.register(selector -> {
try {
readKey = implRegister(selector, SelectionKey.OP_READ);
readKey.attach(this);
@@ -294,7 +294,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
}
});
} else {
((AsyncIOGroup) ioGroup).interestOpsOr((AsyncIOThread) ioThread, readKey, SelectionKey.OP_READ);
((AsyncIOGroup) ioGroup).interestOpsOr(ioReadThread, readKey, SelectionKey.OP_READ);
}
} catch (Exception e) {
handleRead(0, e);
@@ -333,7 +333,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
writeByteTuple2Callback = null;
writeByteTuple2Attachment = null;
} else {
ByteBufferWriter writer = ByteBufferWriter.create(getBufferSupplier(), buffer);
ByteBufferWriter writer = ByteBufferWriter.create(getWriteBufferSupplier(), buffer);
writer.put(writeByteTuple1Array, writeByteTuple1Offset, writeByteTuple1Length);
if (writeByteTuple2Length > 0) {
writer.put(writeByteTuple2Array, writeByteTuple2Offset, writeByteTuple2Length);
@@ -399,7 +399,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
if (writeOver && (totalCount != 0 || !hasRemain)) {
handleWrite(writeTotal + totalCount, null);
} else if (writeKey == null) {
((AsyncIOThread) ioThread).register(selector -> {
ioWriteThread.register(selector -> {
try {
writeKey = implRegister(selector, SelectionKey.OP_WRITE);
writeKey.attach(this);
@@ -408,7 +408,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
}
});
} else {
((AsyncIOGroup) ioGroup).interestOpsOr((AsyncIOThread) ioThread, writeKey, SelectionKey.OP_WRITE);
((AsyncIOGroup) ioGroup).interestOpsOr(ioWriteThread, writeKey, SelectionKey.OP_WRITE);
}
} catch (IOException e) {
handleWrite(0, e);
@@ -428,14 +428,14 @@ abstract class AsyncNioConnection extends AsyncConnection {
this.connectPending = false;//必须放最后
if (handler != null) {
if (!client || inCurrThread()) { //client模式下必须保证read、write在ioThread内运行
if (!client || inCurrWriteThread()) { //client模式下必须保证read、write在ioThread内运行
if (t == null) {
handler.completed(null, attach);
} else {
handler.failed(t, attach);
}
} else {
ioThread.execute(() -> {
ioWriteThread.execute(() -> {
if (t == null) {
handler.completed(null, attach);
} else {
@@ -537,7 +537,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
@Override
public void close() throws IOException {
if (bb != null) {
offerBuffer(bb);
offerReadBuffer(bb);
bb = null;
}
reader.close();

View File

@@ -11,7 +11,6 @@ import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Set;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.*;
import javax.net.ssl.SSLContext;
import org.redkale.util.ByteBufferReader;
@@ -44,21 +43,6 @@ class AsyncNioTcpConnection extends AsyncNioConnection {
ioThread.connCounter.incrementAndGet();
}
public AsyncNioTcpConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioThread, AsyncIOThread connectThread, Supplier<ByteBuffer> bufferSupplier, Consumer<ByteBuffer> bufferConsumer,
SocketChannel ch, SSLBuilder sslBuilder, SSLContext sslContext, final SocketAddress addr0, LongAdder livingCounter, LongAdder closedCounter) {
super(client, ioGroup, ioThread, connectThread, ioGroup.bufferCapacity, bufferSupplier, bufferConsumer, sslBuilder, sslContext, livingCounter, closedCounter);
this.channel = ch;
SocketAddress addr = addr0;
if (addr == null) {
try {
addr = ch.getRemoteAddress();
} catch (Exception e) {
//do nothing
}
}
this.remoteAddress = addr;
}
@Override
public boolean isOpen() {
return this.channel.isOpen();
@@ -113,6 +97,7 @@ class AsyncNioTcpConnection extends AsyncNioConnection {
}
}
@Override
public ReadableByteChannel readableByteChannel() {
if (this.sslEngine == null) {
return this.channel;
@@ -131,7 +116,7 @@ class AsyncNioTcpConnection extends AsyncNioConnection {
bb.put(halfBuffer.get());
}
if (!halfBuffer.hasRemaining()) {
offerBuffer(halfBuffer);
offerReadBuffer(halfBuffer);
halfBuffer = null;
}
return bb.position() - pos;
@@ -150,11 +135,11 @@ class AsyncNioTcpConnection extends AsyncNioConnection {
if (appBuffer.hasRemaining()) {
halfBuffer = appBuffer;
} else {
offerBuffer(appBuffer);
offerReadBuffer(appBuffer);
}
return bb.position() - pos;
} else {
offerBuffer(netBuffer);
offerReadBuffer(netBuffer);
return 0;
}
}
@@ -167,7 +152,7 @@ class AsyncNioTcpConnection extends AsyncNioConnection {
@Override
public void close() throws IOException {
if (halfBuffer != null) {
offerBuffer(halfBuffer);
offerReadBuffer(halfBuffer);
halfBuffer = null;
}
self.close();
@@ -199,7 +184,7 @@ class AsyncNioTcpConnection extends AsyncNioConnection {
channel.write(netBuffers);
}
}
offerBuffer(netBuffers);
offerWriteBuffer(netBuffers);
return len;
}
@@ -285,7 +270,7 @@ class AsyncNioTcpConnection extends AsyncNioConnection {
@Override
public final void close() throws IOException {
super.close();
((AsyncIOThread) ioThread).connCounter.decrementAndGet();
ioReadThread.connCounter.decrementAndGet();
channel.shutdownInput();
channel.shutdownOutput();
channel.close();

View File

@@ -107,7 +107,7 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
return pool == null ? safeResponsePool.get() : pool.get();
};
this.responseConsumer = (v) -> {
WorkThread thread = v.channel != null ? v.channel.getAsyncIOThread() : v.thread;
WorkThread thread = v.channel != null ? v.channel.getWriteIOThread() : v.thread;
if (thread != null && !thread.inCurrThread()) {
thread.execute(() -> {
ObjectPool<Response> pool = localResponsePool.get();

View File

@@ -11,7 +11,6 @@ import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Set;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.*;
import javax.net.ssl.SSLContext;
/**
@@ -40,23 +39,6 @@ class AsyncNioUdpConnection extends AsyncNioConnection {
this.remoteAddress = addr;
}
public AsyncNioUdpConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioThread, AsyncIOThread connectThread,
Supplier<ByteBuffer> bufferSupplier, Consumer<ByteBuffer> bufferConsumer,
DatagramChannel ch, SSLBuilder sslBuilder, SSLContext sslContext, final SocketAddress addr0,
LongAdder livingCounter, LongAdder closedCounter) {
super(client, ioGroup, ioThread, connectThread, ioGroup.bufferCapacity, bufferSupplier, bufferConsumer, sslBuilder, sslContext, livingCounter, closedCounter);
this.channel = ch;
SocketAddress addr = addr0;
if (addr == null) {
try {
addr = ch.getRemoteAddress();
} catch (Exception e) {
//do nothing
}
}
this.remoteAddress = addr;
}
@Override
public boolean isOpen() {
return this.channel.isOpen();

View File

@@ -56,7 +56,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
@Override
public void completed(Integer count, ByteBuffer buffer) {
if (count < 1) {
channel.offerBuffer(buffer);
channel.offerReadBuffer(buffer);
channel.dispose(); // response.init(channel); 在调用之前异常
return;
}
@@ -78,7 +78,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
@Override
public void failed(Throwable exc, ByteBuffer buffer) {
channel.offerBuffer(buffer);
channel.offerReadBuffer(buffer);
channel.dispose();// response.init(channel); 在调用之前异常
if (exc != null && context.logger.isLoggable(Level.FINEST)
&& !(exc instanceof SocketException && "Connection reset".equals(exc.getMessage()))) {
@@ -135,7 +135,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
if (rs < 0) { //表示数据格式不正确
final DispatcherServlet preparer = context.prepare;
preparer.incrExecuteCounter();
channel.offerBuffer(buffer);
channel.offerReadBuffer(buffer);
if (rs != Integer.MIN_VALUE) {
preparer.incrIllRequestCounter();
}
@@ -178,7 +178,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
@Override
public void completed(Integer count, ByteBuffer attachment) {
if (count < 1) {
channel.offerBuffer(attachment);
channel.offerReadBuffer(attachment);
channel.dispose();
return;
}
@@ -189,7 +189,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
context.prepare.incrIllRequestCounter();
channel.offerBuffer(attachment);
channel.offerReadBuffer(attachment);
response.finish(true);
if (exc != null) {
request.context.logger.log(Level.FINER, "Servlet read channel erroneous, force to close channel ", exc);

View File

@@ -68,7 +68,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
@Override
public void completed(Integer result, ByteBuffer attachment) {
if (attachment != writeBuffer) {
channel.offerBuffer(attachment);
channel.offerWriteBuffer(attachment);
} else {
attachment.clear();
}
@@ -78,7 +78,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
if (attachment != writeBuffer) {
channel.offerBuffer(attachment);
channel.offerWriteBuffer(attachment);
} else {
attachment.clear();
}
@@ -93,7 +93,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
public void completed(final Integer result, final ByteBuffer[] attachments) {
if (attachments != null) {
for (ByteBuffer attachment : attachments) {
channel.offerBuffer(attachment);
channel.offerWriteBuffer(attachment);
}
}
finish();
@@ -103,7 +103,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
public void failed(Throwable exc, final ByteBuffer[] attachments) {
if (attachments != null) {
for (ByteBuffer attachment : attachments) {
channel.offerBuffer(attachment);
channel.offerWriteBuffer(attachment);
}
}
finish(true);
@@ -402,7 +402,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
@Override
public void completed(Integer result, A attachment) {
if (buffer != writeBuffer) {
channel.offerBuffer(buffer);
channel.offerWriteBuffer(buffer);
} else {
buffer.clear();
}
@@ -414,7 +414,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
@Override
public void failed(Throwable exc, A attachment) {
if (buffer != writeBuffer) {
channel.offerBuffer(buffer);
channel.offerWriteBuffer(buffer);
} else {
buffer.clear();
}
@@ -431,7 +431,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
@Override
public void completed(Integer result, A attachment) {
channel.offerBuffer(buffers);
channel.offerWriteBuffer(buffers);
if (handler != null) {
handler.completed(result, attachment);
}
@@ -440,7 +440,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
@Override
public void failed(Throwable exc, A attachment) {
for (ByteBuffer buffer : buffers) {
channel.offerBuffer(buffer);
channel.offerWriteBuffer(buffer);
}
if (handler != null) {
handler.failed(exc, attachment);

View File

@@ -365,13 +365,13 @@ public final class Transport {
if (handler != null) {
handler.completed(result, att);
}
conn.offerBuffer(attachment);
conn.offerWriteBuffer(attachment);
offerConnection(false, conn);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
conn.offerBuffer(attachment);
conn.offerWriteBuffer(attachment);
offerConnection(true, conn);
}
});
@@ -380,7 +380,7 @@ public final class Transport {
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
conn.offerBuffer(attachment);
conn.offerWriteBuffer(attachment);
offerConnection(true, conn);
}
});

View File

@@ -347,7 +347,7 @@ public class TransportFactory {
@Override
public void completed(Integer result, ByteBuffer pongBuffer) {
if (counter > 3) {
localconn.offerBuffer(pongBuffer);
localconn.offerWriteBuffer(pongBuffer);
localconn.dispose();
return;
}
@@ -357,13 +357,13 @@ public class TransportFactory {
localconn.read(this);
return;
}
localconn.offerBuffer(pongBuffer);
localconn.offerWriteBuffer(pongBuffer);
localqueue.offer(localconn);
}
@Override
public void failed(Throwable exc, ByteBuffer pongBuffer) {
localconn.offerBuffer(pongBuffer);
localconn.offerWriteBuffer(pongBuffer);
localconn.dispose();
}
});

View File

@@ -63,7 +63,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
public void completed(Integer result, Void attachment) {
if (writeLastRequest != null && writeLastRequest == client.closeRequest) {
if (closeFuture != null) {
channel.getAsyncIOThread().runWork(() -> {
channel.getWriteIOThread().runWork(() -> {
closeFuture.complete(null);
});
}
@@ -251,7 +251,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
// }
// }
if (workThread == null || workThread.getWorkExecutor() == null) {
workThread = channel.getAsyncIOThread();
workThread = channel.getReadIOThread();
}
if (rs.exc != null) {
workThread.runWork(() -> {
@@ -362,10 +362,10 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
}
}
respWaitingCounter.increment(); //放在writeChannelInThread计数会延迟导致不准确
if (channel.inCurrThread()) {
if (channel.inCurrWriteThread()) {
writeChannelInThread(request, respFuture);
} else {
channel.execute(() -> writeChannelInThread(request, respFuture));
channel.executeWrite(() -> writeChannelInThread(request, respFuture));
}
return respFuture;
}
@@ -427,7 +427,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
Throwable e = exc == null ? new ClosedChannelException() : exc;
CompletableFuture f;
respWaitingCounter.reset();
WorkThread thread = channel.getAsyncIOThread();
WorkThread thread = channel.getReadIOThread();
if (!responseQueue.isEmpty()) {
while ((f = responseQueue.poll()) != null) {
CompletableFuture future = f;

View File

@@ -64,10 +64,10 @@ public class ClientFuture<T> extends CompletableFuture<T> implements Runnable {
return;
}
AsyncConnection channel = conn.getChannel();
if (channel.inCurrThread()) {
if (channel.inCurrReadThread()) {
this.runTimeout();
} else {
channel.execute(this::runTimeout);
channel.executeRead(this::runTimeout);
}
}
@@ -93,7 +93,7 @@ public class ClientFuture<T> extends CompletableFuture<T> implements Runnable {
// workThread.execute(() -> completeExceptionally(ex));
// }
if (workThread == null || workThread.getWorkExecutor() == null) {
workThread = conn.getChannel().getAsyncIOThread();
workThread = conn.getChannel().getReadIOThread();
}
workThread.runWork(() -> completeExceptionally(ex));
}

View File

@@ -1331,7 +1331,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
try {
if (fileChannel != null && sends >= limit) {
if (buffer != null) {
channel.offerBuffer(buffer);
channel.offerWriteBuffer(buffer);
}
try {
fileChannel.close();
@@ -1377,7 +1377,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
@Override
public void failed(Throwable exc, Void attachment) {
if (buffer != null) {
channel.offerBuffer(buffer);
channel.offerWriteBuffer(buffer);
}
if (logger.isLoggable(Level.FINER)) {
logger.log(Level.FINER, "finishFile error", exc);

View File

@@ -120,7 +120,9 @@ public class HttpSimpleClient {
});
}
array.put((byte) '\r', (byte) '\n');
if (body != null) array.put(body);
if (body != null) {
array.put(body);
}
final CompletableFuture<HttpResult<byte[]>> future = new CompletableFuture();
conn.write(array, new CompletionHandler<Integer, Void>() {
@Override
@@ -213,14 +215,16 @@ public class HttpSimpleClient {
return;
}
}
if (buffer.hasRemaining()) array.put(buffer, buffer.remaining());
if (buffer.hasRemaining()) {
array.put(buffer, buffer.remaining());
}
this.readState = READ_STATE_END;
}
if (responseResult.getStatus() <= 200) {
this.responseResult.setResult(array.getBytes());
}
this.future.complete(this.responseResult);
conn.offerBuffer(buffer);
conn.offerReadBuffer(buffer);
conn.dispose();
}
@@ -240,7 +244,9 @@ public class HttpSimpleClient {
buffer.put((byte) '\r');
return 1;
}
if (buffer.get() != '\n') return -1;
if (buffer.get() != '\n') {
return -1;
}
break;
}
bytes.put(b);
@@ -273,7 +279,9 @@ public class HttpSimpleClient {
remain--;
byte b1 = buffer.get();
byte b2 = buffer.get();
if (b1 == '\r' && b2 == '\n') return 0;
if (b1 == '\r' && b2 == '\n') {
return 0;
}
bytes.put(b1, b2);
for (;;) { // name
if (remain-- < 1) {
@@ -282,7 +290,9 @@ public class HttpSimpleClient {
return 1;
}
byte b = buffer.get();
if (b == ':') break;
if (b == ':') {
break;
}
bytes.put(b);
}
String name = parseHeaderName(bytes, null);
@@ -317,7 +327,9 @@ public class HttpSimpleClient {
buffer.put((byte) '\r');
return 1;
}
if (buffer.get() != '\n') return -1;
if (buffer.get() != '\n') {
return -1;
}
break;
}
if (first) {
@@ -346,7 +358,7 @@ public class HttpSimpleClient {
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
conn.offerBuffer(attachment);
conn.offerReadBuffer(attachment);
conn.dispose();
future.completeExceptionally(exc);
}

View File

@@ -139,7 +139,9 @@ public abstract class WebSocket<G extends Serializable, T> {
}
public final CompletableFuture<Integer> sendPing(byte[] data) {
if (data == null) return sendPing();
if (data == null) {
return sendPing();
}
this.lastPingTime = System.currentTimeMillis();
return sendPacket(new WebSocketPacket(FrameType.PING, data));
}
@@ -237,7 +239,9 @@ public abstract class WebSocket<G extends Serializable, T> {
*/
CompletableFuture<Integer> sendPacket(WebSocketPacket packet) {
if (this._writeHandler == null) {
if (delayPackets == null) delayPackets = new ArrayList<>();
if (delayPackets == null) {
delayPackets = new ArrayList<>();
}
delayPackets.add(packet);
return CompletableFuture.completedFuture(RETCODE_DEAYSEND);
}
@@ -355,12 +359,16 @@ public abstract class WebSocket<G extends Serializable, T> {
* @return 为0表示成功 其他值表示异常
*/
public final CompletableFuture<Integer> sendMessage(final Convert convert, Object message, boolean last, Serializable... userids) {
if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL);
if (_engine.node == null) {
return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL);
}
if (message instanceof CompletableFuture) {
return ((CompletableFuture) message).thenCompose((json) -> _engine.node.sendMessage(convert, json, last, userids));
}
CompletableFuture<Integer> rs = _engine.node.sendMessage(convert, message, last, userids);
if (_engine.logger.isLoggable(Level.FINER)) _engine.logger.finer("userids:" + Arrays.toString(userids) + " send websocket message(" + message + ")");
if (_engine.logger.isLoggable(Level.FINER)) {
_engine.logger.finer("userids:" + Arrays.toString(userids) + " send websocket message(" + message + ")");
}
return rs;
}
@@ -461,12 +469,16 @@ public abstract class WebSocket<G extends Serializable, T> {
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastMessage(final WebSocketRange wsrange, final Convert convert, final Object message, final boolean last) {
if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL);
if (_engine.node == null) {
return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL);
}
if (message instanceof CompletableFuture) {
return ((CompletableFuture) message).thenCompose((json) -> _engine.node.broadcastMessage(wsrange, convert, json, last));
}
CompletableFuture<Integer> rs = _engine.node.broadcastMessage(wsrange, convert, message, last);
if (_engine.logger.isLoggable(Level.FINER)) _engine.logger.finer("broadcast send websocket message(" + message + ")");
if (_engine.logger.isLoggable(Level.FINER)) {
_engine.logger.finer("broadcast send websocket message(" + message + ")");
}
return rs;
}
@@ -479,9 +491,13 @@ public abstract class WebSocket<G extends Serializable, T> {
* @return 为0表示成功 其他值表示异常
*/
public final CompletableFuture<Integer> sendAction(final WebSocketAction action, Serializable... userids) {
if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL);
if (_engine.node == null) {
return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL);
}
CompletableFuture<Integer> rs = _engine.node.sendAction(action, userids);
if (_engine.logger.isLoggable(Level.FINER)) _engine.logger.finer("userids:" + Arrays.toString(userids) + " send websocket action(" + action + ")");
if (_engine.logger.isLoggable(Level.FINER)) {
_engine.logger.finer("userids:" + Arrays.toString(userids) + " send websocket action(" + action + ")");
}
return rs;
}
@@ -493,9 +509,13 @@ public abstract class WebSocket<G extends Serializable, T> {
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastAction(final WebSocketAction action) {
if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL);
if (_engine.node == null) {
return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL);
}
CompletableFuture<Integer> rs = _engine.node.broadcastAction(action);
if (_engine.logger.isLoggable(Level.FINER)) _engine.logger.finer("broadcast send websocket action(" + action + ")");
if (_engine.logger.isLoggable(Level.FINER)) {
_engine.logger.finer("broadcast send websocket action(" + action + ")");
}
return rs;
}
@@ -508,7 +528,9 @@ public abstract class WebSocket<G extends Serializable, T> {
* @return 地址列表
*/
public CompletableFuture<Set<WebSocketAddress>> getRpcNodeAddresses(final Serializable userid) {
if (_engine.node == null) return CompletableFuture.completedFuture(null);
if (_engine.node == null) {
return CompletableFuture.completedFuture(null);
}
return _engine.node.getRpcNodeAddresses(userid);
}
@@ -522,7 +544,9 @@ public abstract class WebSocket<G extends Serializable, T> {
* @return 地址集合
*/
public CompletableFuture<Map<WebSocketAddress, List<String>>> getRpcNodeWebSocketAddresses(final Serializable userid) {
if (_engine.node == null) return CompletableFuture.completedFuture(null);
if (_engine.node == null) {
return CompletableFuture.completedFuture(null);
}
return _engine.node.getRpcNodeWebSocketAddresses(userid);
}
@@ -534,7 +558,9 @@ public abstract class WebSocket<G extends Serializable, T> {
* @return CompletableFuture
*/
public CompletableFuture<Void> changeUserid(final G newuserid) {
if (newuserid == null) throw new NullPointerException("newuserid is null");
if (newuserid == null) {
throw new NullPointerException("newuserid is null");
}
return _engine.changeLocalUserid(this, newuserid);
}
@@ -593,7 +619,9 @@ public abstract class WebSocket<G extends Serializable, T> {
* @param value 属性值
*/
public final void setAttribute(String name, Object value) {
if (attributes == null) attributes = new HashMap<>();
if (attributes == null) {
attributes = new HashMap<>();
}
attributes.put(name, value);
}
@@ -696,8 +724,8 @@ public abstract class WebSocket<G extends Serializable, T> {
*
* @return Supplier
*/
protected Supplier<ByteBuffer> getBufferSupplier() {
return this._channel.getBufferSupplier();
protected Supplier<ByteBuffer> getReadBufferSupplier() {
return this._channel.getReadBufferSupplier();
}
/**
@@ -705,8 +733,26 @@ public abstract class WebSocket<G extends Serializable, T> {
*
* @return Consumer
*/
protected Consumer<ByteBuffer> getBufferConsumer() {
return this._channel.getBufferConsumer();
protected Consumer<ByteBuffer> getReadBufferConsumer() {
return this._channel.getReadBufferConsumer();
}
/**
* 获取ByteBuffer生成器
*
* @return Supplier
*/
protected Supplier<ByteBuffer> getWriteBufferSupplier() {
return this._channel.getWriteBufferSupplier();
}
/**
* 获取ByteBuffer回收器
*
* @return Consumer
*/
protected Consumer<ByteBuffer> getWriteBufferConsumer() {
return this._channel.getWriteBufferConsumer();
}
//-------------------------------------------------------------------
@@ -896,23 +942,37 @@ public abstract class WebSocket<G extends Serializable, T> {
* 显式地关闭WebSocket
*/
public final void close() {
if (this.deflater != null) this.deflater.end();
if (this.inflater != null) this.inflater.end();
if (this.deflater != null) {
this.deflater.end();
}
if (this.inflater != null) {
this.inflater.end();
}
CompletableFuture<Void> future = kill(CLOSECODE_SERVERCLOSE, "user close");
if (future != null) future.join();
if (future != null) {
future.join();
}
}
//closeRunner
CompletableFuture<Void> kill(int code, String reason) {
if (closed) return null;
if (closed) {
return null;
}
synchronized (this) {
if (closed) return null;
if (closed) {
return null;
}
closed = true;
if (_channel == null) return null;
if (_channel == null) {
return null;
}
CompletableFuture<Void> future = _engine.removeLocalThenDisconnect(this);
_channel.dispose();
CompletableFuture closeFuture = onClose(code, reason);
if (closeFuture == null) return future;
if (closeFuture == null) {
return future;
}
return CompletableFuture.allOf(future, closeFuture);
}
}

View File

@@ -381,7 +381,7 @@ public final class SncpClient {
try {
if (count < 1 && buffer.remaining() == buffer.limit()) { //没有数据可读
future.completeExceptionally(new RpcRemoteException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote no response data, params=" + JsonConvert.root().convertTo(params)));
conn.offerBuffer(buffer);
conn.offerReadBuffer(buffer);
transport.offerConnection(true, conn);
return;
}
@@ -400,7 +400,7 @@ public final class SncpClient {
conn.setReadBuffer(buffer);
conn.read(this);
} else {
conn.offerBuffer(buffer);
conn.offerReadBuffer(buffer);
success();
}
return;
@@ -424,7 +424,7 @@ public final class SncpClient {
} else {
this.body = new byte[respBodyLength];
buffer.get(body, 0, respBodyLength);
conn.offerBuffer(buffer);
conn.offerReadBuffer(buffer);
success();
}
} catch (Throwable e) {
@@ -465,7 +465,7 @@ public final class SncpClient {
@Override
public void failed(Throwable exc, ByteBuffer attachment2) {
future.completeExceptionally(new RpcRemoteException(action.method + " sncp remote exec failed, params=" + JsonConvert.root().convertTo(params)));
conn.offerBuffer(attachment2);
conn.offerReadBuffer(attachment2);
transport.offerConnection(true, conn);
if (handler != null) {
final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null;