AsyncConnection.write调整

This commit is contained in:
Redkale
2018-06-12 09:15:32 +08:00
parent 3c645b4c7f
commit 58eb3c5d64
3 changed files with 234 additions and 47 deletions

View File

@@ -5,6 +5,7 @@
*/
package org.redkale.net;
import java.io.IOException;
import java.nio.*;
import java.nio.channels.*;
import java.util.concurrent.TimeUnit;
@@ -20,7 +21,7 @@ import org.redkale.util.*;
* @author zhangjx
*/
@SuppressWarnings("unchecked")
public final class PrepareRunner implements Runnable {
public class PrepareRunner implements Runnable {
private final AsyncConnection channel;
@@ -105,4 +106,39 @@ public final class PrepareRunner implements Runnable {
}
}
protected void prepare(ByteBuffer buffer, Request request, Response response) throws IOException {
context.prepare.prepare(buffer, request, response);
}
protected void initResponse(Response response, AsyncConnection channel) {
response.init(channel);
}
protected Response pollResponse() {
return context.responsePool.get();
}
protected Request pollRequest(Response response) {
return response.request;
}
protected AsyncConnection removeChannel(Response response) {
return response.removeChannel();
}
protected ByteBuffer pollReadBuffer(Request request) {
return request.pollReadBuffer();
}
protected ByteBuffer pollReadBuffer(Response response) {
return response.request.pollReadBuffer();
}
protected void offerReadBuffer(Request request, ByteBuffer buffer) {
request.offerReadBuffer(buffer);
}
protected void offerReadBuffer(Response response, ByteBuffer buffer) {
response.request.offerReadBuffer(buffer);
}
}

View File

@@ -23,6 +23,8 @@ import javax.net.ssl.SSLContext;
*/
public class TcpAioAsyncConnection extends AsyncConnection {
private final Semaphore semaphore = new Semaphore(1);
private int readTimeoutSeconds;
private int writeTimeoutSeconds;
@@ -31,6 +33,8 @@ public class TcpAioAsyncConnection extends AsyncConnection {
private final SocketAddress remoteAddress;
private BlockingQueue<WriteEntry> writeQueue;
public TcpAioAsyncConnection(final AsynchronousSocketChannel ch, SSLContext sslContext,
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
@@ -102,33 +106,78 @@ public class TcpAioAsyncConnection extends AsyncConnection {
channel.read(dst, timeout < 0 ? 0 : timeout, unit, attachment, handler);
}
private <A> void nextWrite(A attachment) {
BlockingQueue<WriteEntry> queue = this.writeQueue;
WriteEntry entry = queue == null ? null : queue.poll();
if (entry != null) {
try {
if (entry.writeOneBuffer == null) {
write(false, entry.writeBuffers, entry.writeOffset, entry.writeLength, entry.writeAttachment, entry.writeHandler);
} else {
write(false, entry.writeOneBuffer, entry.writeAttachment, entry.writeHandler);
}
} catch (Exception e) {
entry.writeHandler.failed(e, entry.writeAttachment);
}
} else {
semaphore.release();
}
}
@Override
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
write(true, src, attachment, handler);
}
private <A> void write(boolean acquire, ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (acquire && !semaphore.tryAcquire()) {
if (this.writeQueue == null) {
synchronized (semaphore) {
if (this.writeQueue == null) {
this.writeQueue = new LinkedBlockingDeque<>();
}
}
}
this.writeQueue.add(new WriteEntry(src, attachment, handler));
return;
}
WriteOneCompletionHandler newHandler = new WriteOneCompletionHandler(src, handler);
if (!channel.isOpen()) {
newHandler.failed(new ClosedChannelException(), attachment);
return;
}
this.writetime = System.currentTimeMillis();
if (writeTimeoutSeconds > 0) {
channel.write(src, writeTimeoutSeconds, TimeUnit.SECONDS, attachment, handler);
channel.write(src, writeTimeoutSeconds, TimeUnit.SECONDS, attachment, newHandler);
} else {
channel.write(src, attachment, handler);
channel.write(src, attachment, newHandler);
}
}
@Override
public <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, final CompletionHandler<Integer, ? super A> handler) {
write(true, srcs, offset, length, attachment, handler);
}
private <A> void write(boolean acquire, ByteBuffer[] srcs, int offset, int length, A attachment, final CompletionHandler<Integer, ? super A> handler) {
if (acquire && !semaphore.tryAcquire()) {
if (this.writeQueue == null) {
synchronized (semaphore) {
if (this.writeQueue == null) {
this.writeQueue = new LinkedBlockingDeque<>();
}
}
}
this.writeQueue.add(new WriteEntry(srcs, offset, length, attachment, handler));
return;
}
WriteMoreCompletionHandler newHandler = new WriteMoreCompletionHandler(srcs, offset, length, handler);
if (!channel.isOpen()) {
newHandler.failed(new ClosedChannelException(), attachment);
return;
}
this.writetime = System.currentTimeMillis();
channel.write(srcs, offset, length, writeTimeoutSeconds > 0 ? writeTimeoutSeconds : 60, TimeUnit.SECONDS,
attachment, new CompletionHandler<Long, A>() {
@Override
public void completed(Long result, A attachment) {
handler.completed(result.intValue(), attachment);
}
@Override
public void failed(Throwable exc, A attachment) {
handler.failed(exc, attachment);
}
});
channel.write(srcs, offset, length, writeTimeoutSeconds > 0 ? writeTimeoutSeconds : 60, TimeUnit.SECONDS, attachment, newHandler);
}
@Override
@@ -179,6 +228,17 @@ public class TcpAioAsyncConnection extends AsyncConnection {
public final void close() throws IOException {
super.close();
channel.close();
BlockingQueue<WriteEntry> queue = this.writeQueue;
if (queue == null) return;
WriteEntry entry;
Exception ex = null;
while ((entry = queue.poll()) != null) {
if (ex == null) ex = new ClosedChannelException();
try {
entry.writeHandler.failed(ex, entry.writeAttachment);
} catch (Exception e) {
}
}
}
@Override
@@ -191,4 +251,125 @@ public class TcpAioAsyncConnection extends AsyncConnection {
return true;
}
private class WriteMoreCompletionHandler<A> implements CompletionHandler<Long, A> {
private final CompletionHandler<Integer, A> writeHandler;
private final ByteBuffer[] writeBuffers;
private int writeOffset;
private int writeLength;
private int writeCount;
public WriteMoreCompletionHandler(ByteBuffer[] buffers, int offset, int length, CompletionHandler handler) {
this.writeBuffers = buffers;
this.writeOffset = offset;
this.writeLength = length;
this.writeHandler = handler;
}
@Override
public void completed(Long result, A attachment) {
if (result >= 0) {
writeCount += result;
try {
int index = -1;
for (int i = writeOffset; i < (writeOffset + writeLength); i++) {
if (writeBuffers[i].hasRemaining()) {
index = i;
break;
}
}
if (index >= 0) {
writeOffset += index;
writeLength -= index;
channel.write(writeBuffers, writeOffset, writeLength, writeTimeoutSeconds > 0 ? writeTimeoutSeconds : 60, TimeUnit.SECONDS, attachment, this);
return;
}
} catch (Exception e) {
failed(e, attachment);
return;
}
nextWrite(attachment);
writeHandler.completed(writeCount, attachment);
} else {
nextWrite(attachment);
writeHandler.completed(result.intValue(), attachment);
}
}
@Override
public void failed(Throwable exc, A attachment) {
nextWrite(attachment);
writeHandler.failed(exc, attachment);
}
}
private class WriteOneCompletionHandler<A> implements CompletionHandler<Integer, A> {
private final CompletionHandler writeHandler;
private final ByteBuffer writeOneBuffer;
public WriteOneCompletionHandler(ByteBuffer buffer, CompletionHandler handler) {
this.writeOneBuffer = buffer;
this.writeHandler = handler;
}
@Override
public void completed(Integer result, A attachment) {
try {
if (writeOneBuffer.hasRemaining()) {
channel.write(writeOneBuffer, attachment, this);
return;
}
} catch (Exception e) {
failed(e, attachment);
return;
}
nextWrite(attachment);
writeHandler.completed(result, attachment);
}
@Override
public void failed(Throwable exc, A attachment) {
nextWrite(attachment);
writeHandler.failed(exc, attachment);
}
}
private static class WriteEntry {
ByteBuffer writeOneBuffer;
ByteBuffer[] writeBuffers;
int writingCount;
int writeOffset;
int writeLength;
Object writeAttachment;
CompletionHandler writeHandler;
public WriteEntry(ByteBuffer writeOneBuffer, Object writeAttachment, CompletionHandler writeHandler) {
this.writeOneBuffer = writeOneBuffer;
this.writeAttachment = writeAttachment;
this.writeHandler = writeHandler;
}
public WriteEntry(ByteBuffer[] writeBuffers, int writeOffset, int writeLength, Object writeAttachment, CompletionHandler writeHandler) {
this.writeBuffers = writeBuffers;
this.writeOffset = writeOffset;
this.writeLength = writeLength;
this.writeAttachment = writeAttachment;
this.writeHandler = writeHandler;
}
}
}

View File

@@ -13,7 +13,6 @@ import java.nio.channels.*;
import java.util.*;
import java.util.AbstractMap.SimpleEntry;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.logging.*;
@@ -39,10 +38,6 @@ class WebSocketRunner implements Runnable {
volatile boolean closed = false;
private final AtomicBoolean writing = new AtomicBoolean();
private final BlockingQueue<QueueEntry> writeQueue = new ArrayBlockingQueue(512);
private final BiConsumer<WebSocket, Object> restMessageConsumer; //主要供RestWebSocket使用
protected long lastSendTime;
@@ -228,12 +223,6 @@ class WebSocketRunner implements Runnable {
//System.out.println("推送消息");
final CompletableFuture<Integer> futureResult = new CompletableFuture<>();
try {
synchronized (writing) {
if (writing.getAndSet(true)) {
writeQueue.add(new QueueEntry(futureResult, packet));
return futureResult;
}
}
ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(this.context.getBufferSupplier(), this.context.getBufferConsumer(), webSocket._engine.cryptor);
//if (debug) context.getLogger().log(Level.FINEST, "wsrunner.sending websocket message: " + packet);
@@ -277,18 +266,6 @@ class WebSocketRunner implements Runnable {
}
}
}
QueueEntry entry = null;
synchronized (writing) {
entry = writeQueue.poll();
if (entry == null) writing.set(false);
}
if (entry != null) {
future = entry.future;
ByteBuffer[] buffers = entry.packet.sendBuffers != null ? entry.packet.duplicateSendBuffers() : entry.packet.encode(context.getBufferSupplier(), context.getBufferConsumer(), webSocket._engine.cryptor);
lastSendTime = System.currentTimeMillis();
//if (debug) context.getLogger().log(Level.FINEST, "wsrunner.sending websocket message: " + entry.packet);
channel.write(buffers, buffers, this);
}
} catch (Exception e) {
future.complete(RETCODE_SENDEXCEPTION);
closeRunner(RETCODE_SENDEXCEPTION, "websocket send message failed on rewrite");
@@ -298,7 +275,6 @@ class WebSocketRunner implements Runnable {
@Override
public void failed(Throwable exc, ByteBuffer[] attachments) {
writing.set(false);
future.complete(RETCODE_SENDEXCEPTION);
closeRunner(RETCODE_SENDEXCEPTION, "websocket send message failed on CompletionHandler");
if (exc != null) {
@@ -308,7 +284,6 @@ class WebSocketRunner implements Runnable {
}
});
} catch (Exception t) {
writing.set(false);
futureResult.complete(RETCODE_SENDEXCEPTION);
closeRunner(RETCODE_SENDEXCEPTION, "websocket send message failed on channel.write");
context.getLogger().log(Level.FINE, "WebSocket sendMessage abort, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", t);
@@ -331,11 +306,6 @@ class WebSocketRunner implements Runnable {
readBuffer = null;
engine.removeThenClose(webSocket);
webSocket.onClose(code, reason);
QueueEntry entry = writeQueue.poll();
while (entry != null) {
entry.future.complete(RETCODE_WSOCKET_CLOSED);
entry = writeQueue.poll();
}
}
}