From 58eb3c5d64ecd6eb86008954ab9c163d7d729fb3 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Tue, 12 Jun 2018 09:15:32 +0800 Subject: [PATCH] =?UTF-8?q?AsyncConnection.write=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/net/PrepareRunner.java | 38 +++- .../redkale/net/TcpAioAsyncConnection.java | 213 ++++++++++++++++-- src/org/redkale/net/http/WebSocketRunner.java | 30 --- 3 files changed, 234 insertions(+), 47 deletions(-) diff --git a/src/org/redkale/net/PrepareRunner.java b/src/org/redkale/net/PrepareRunner.java index e126129a7..030b68393 100644 --- a/src/org/redkale/net/PrepareRunner.java +++ b/src/org/redkale/net/PrepareRunner.java @@ -5,6 +5,7 @@ */ package org.redkale.net; +import java.io.IOException; import java.nio.*; import java.nio.channels.*; import java.util.concurrent.TimeUnit; @@ -20,7 +21,7 @@ import org.redkale.util.*; * @author zhangjx */ @SuppressWarnings("unchecked") -public final class PrepareRunner implements Runnable { +public class PrepareRunner implements Runnable { private final AsyncConnection channel; @@ -105,4 +106,39 @@ public final class PrepareRunner implements Runnable { } } + protected void prepare(ByteBuffer buffer, Request request, Response response) throws IOException { + context.prepare.prepare(buffer, request, response); + } + + protected void initResponse(Response response, AsyncConnection channel) { + response.init(channel); + } + + protected Response pollResponse() { + return context.responsePool.get(); + } + + protected Request pollRequest(Response response) { + return response.request; + } + + protected AsyncConnection removeChannel(Response response) { + return response.removeChannel(); + } + + protected ByteBuffer pollReadBuffer(Request request) { + return request.pollReadBuffer(); + } + + protected ByteBuffer pollReadBuffer(Response response) { + return response.request.pollReadBuffer(); + } + + protected void offerReadBuffer(Request request, ByteBuffer buffer) { + request.offerReadBuffer(buffer); + } + + protected void offerReadBuffer(Response response, ByteBuffer buffer) { + response.request.offerReadBuffer(buffer); + } } diff --git a/src/org/redkale/net/TcpAioAsyncConnection.java b/src/org/redkale/net/TcpAioAsyncConnection.java index bb0458090..985b272e4 100644 --- a/src/org/redkale/net/TcpAioAsyncConnection.java +++ b/src/org/redkale/net/TcpAioAsyncConnection.java @@ -23,6 +23,8 @@ import javax.net.ssl.SSLContext; */ public class TcpAioAsyncConnection extends AsyncConnection { + private final Semaphore semaphore = new Semaphore(1); + private int readTimeoutSeconds; private int writeTimeoutSeconds; @@ -31,6 +33,8 @@ public class TcpAioAsyncConnection extends AsyncConnection { private final SocketAddress remoteAddress; + private BlockingQueue writeQueue; + public TcpAioAsyncConnection(final AsynchronousSocketChannel ch, SSLContext sslContext, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) { @@ -102,33 +106,78 @@ public class TcpAioAsyncConnection extends AsyncConnection { channel.read(dst, timeout < 0 ? 0 : timeout, unit, attachment, handler); } + private void nextWrite(A attachment) { + BlockingQueue queue = this.writeQueue; + WriteEntry entry = queue == null ? null : queue.poll(); + if (entry != null) { + try { + if (entry.writeOneBuffer == null) { + write(false, entry.writeBuffers, entry.writeOffset, entry.writeLength, entry.writeAttachment, entry.writeHandler); + } else { + write(false, entry.writeOneBuffer, entry.writeAttachment, entry.writeHandler); + } + } catch (Exception e) { + entry.writeHandler.failed(e, entry.writeAttachment); + } + } else { + semaphore.release(); + } + } + @Override public void write(ByteBuffer src, A attachment, CompletionHandler handler) { + write(true, src, attachment, handler); + } + + private void write(boolean acquire, ByteBuffer src, A attachment, CompletionHandler handler) { + if (acquire && !semaphore.tryAcquire()) { + if (this.writeQueue == null) { + synchronized (semaphore) { + if (this.writeQueue == null) { + this.writeQueue = new LinkedBlockingDeque<>(); + } + } + } + this.writeQueue.add(new WriteEntry(src, attachment, handler)); + return; + } + WriteOneCompletionHandler newHandler = new WriteOneCompletionHandler(src, handler); + if (!channel.isOpen()) { + newHandler.failed(new ClosedChannelException(), attachment); + return; + } this.writetime = System.currentTimeMillis(); if (writeTimeoutSeconds > 0) { - channel.write(src, writeTimeoutSeconds, TimeUnit.SECONDS, attachment, handler); + channel.write(src, writeTimeoutSeconds, TimeUnit.SECONDS, attachment, newHandler); } else { - channel.write(src, attachment, handler); + channel.write(src, attachment, newHandler); } } @Override public void write(ByteBuffer[] srcs, int offset, int length, A attachment, final CompletionHandler handler) { + write(true, srcs, offset, length, attachment, handler); + } + + private void write(boolean acquire, ByteBuffer[] srcs, int offset, int length, A attachment, final CompletionHandler handler) { + if (acquire && !semaphore.tryAcquire()) { + if (this.writeQueue == null) { + synchronized (semaphore) { + if (this.writeQueue == null) { + this.writeQueue = new LinkedBlockingDeque<>(); + } + } + } + this.writeQueue.add(new WriteEntry(srcs, offset, length, attachment, handler)); + return; + } + WriteMoreCompletionHandler newHandler = new WriteMoreCompletionHandler(srcs, offset, length, handler); + if (!channel.isOpen()) { + newHandler.failed(new ClosedChannelException(), attachment); + return; + } this.writetime = System.currentTimeMillis(); - channel.write(srcs, offset, length, writeTimeoutSeconds > 0 ? writeTimeoutSeconds : 60, TimeUnit.SECONDS, - attachment, new CompletionHandler() { - - @Override - public void completed(Long result, A attachment) { - handler.completed(result.intValue(), attachment); - } - - @Override - public void failed(Throwable exc, A attachment) { - handler.failed(exc, attachment); - } - - }); + channel.write(srcs, offset, length, writeTimeoutSeconds > 0 ? writeTimeoutSeconds : 60, TimeUnit.SECONDS, attachment, newHandler); } @Override @@ -179,6 +228,17 @@ public class TcpAioAsyncConnection extends AsyncConnection { public final void close() throws IOException { super.close(); channel.close(); + BlockingQueue queue = this.writeQueue; + if (queue == null) return; + WriteEntry entry; + Exception ex = null; + while ((entry = queue.poll()) != null) { + if (ex == null) ex = new ClosedChannelException(); + try { + entry.writeHandler.failed(ex, entry.writeAttachment); + } catch (Exception e) { + } + } } @Override @@ -191,4 +251,125 @@ public class TcpAioAsyncConnection extends AsyncConnection { return true; } + private class WriteMoreCompletionHandler implements CompletionHandler { + + private final CompletionHandler writeHandler; + + private final ByteBuffer[] writeBuffers; + + private int writeOffset; + + private int writeLength; + + private int writeCount; + + public WriteMoreCompletionHandler(ByteBuffer[] buffers, int offset, int length, CompletionHandler handler) { + this.writeBuffers = buffers; + this.writeOffset = offset; + this.writeLength = length; + this.writeHandler = handler; + } + + @Override + public void completed(Long result, A attachment) { + if (result >= 0) { + writeCount += result; + try { + int index = -1; + for (int i = writeOffset; i < (writeOffset + writeLength); i++) { + if (writeBuffers[i].hasRemaining()) { + index = i; + break; + } + } + if (index >= 0) { + writeOffset += index; + writeLength -= index; + channel.write(writeBuffers, writeOffset, writeLength, writeTimeoutSeconds > 0 ? writeTimeoutSeconds : 60, TimeUnit.SECONDS, attachment, this); + return; + } + } catch (Exception e) { + failed(e, attachment); + return; + } + nextWrite(attachment); + writeHandler.completed(writeCount, attachment); + } else { + nextWrite(attachment); + writeHandler.completed(result.intValue(), attachment); + } + } + + @Override + public void failed(Throwable exc, A attachment) { + nextWrite(attachment); + writeHandler.failed(exc, attachment); + } + + } + + private class WriteOneCompletionHandler implements CompletionHandler { + + private final CompletionHandler writeHandler; + + private final ByteBuffer writeOneBuffer; + + public WriteOneCompletionHandler(ByteBuffer buffer, CompletionHandler handler) { + this.writeOneBuffer = buffer; + this.writeHandler = handler; + } + + @Override + public void completed(Integer result, A attachment) { + try { + if (writeOneBuffer.hasRemaining()) { + channel.write(writeOneBuffer, attachment, this); + return; + } + } catch (Exception e) { + failed(e, attachment); + return; + } + nextWrite(attachment); + writeHandler.completed(result, attachment); + } + + @Override + public void failed(Throwable exc, A attachment) { + nextWrite(attachment); + writeHandler.failed(exc, attachment); + } + + } + + private static class WriteEntry { + + ByteBuffer writeOneBuffer; + + ByteBuffer[] writeBuffers; + + int writingCount; + + int writeOffset; + + int writeLength; + + Object writeAttachment; + + CompletionHandler writeHandler; + + public WriteEntry(ByteBuffer writeOneBuffer, Object writeAttachment, CompletionHandler writeHandler) { + this.writeOneBuffer = writeOneBuffer; + this.writeAttachment = writeAttachment; + this.writeHandler = writeHandler; + } + + public WriteEntry(ByteBuffer[] writeBuffers, int writeOffset, int writeLength, Object writeAttachment, CompletionHandler writeHandler) { + this.writeBuffers = writeBuffers; + this.writeOffset = writeOffset; + this.writeLength = writeLength; + this.writeAttachment = writeAttachment; + this.writeHandler = writeHandler; + } + } } diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index d48836bb3..444f7267c 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -13,7 +13,6 @@ import java.nio.channels.*; import java.util.*; import java.util.AbstractMap.SimpleEntry; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.logging.*; @@ -39,10 +38,6 @@ class WebSocketRunner implements Runnable { volatile boolean closed = false; - private final AtomicBoolean writing = new AtomicBoolean(); - - private final BlockingQueue writeQueue = new ArrayBlockingQueue(512); - private final BiConsumer restMessageConsumer; //主要供RestWebSocket使用 protected long lastSendTime; @@ -228,12 +223,6 @@ class WebSocketRunner implements Runnable { //System.out.println("推送消息"); final CompletableFuture futureResult = new CompletableFuture<>(); try { - synchronized (writing) { - if (writing.getAndSet(true)) { - writeQueue.add(new QueueEntry(futureResult, packet)); - return futureResult; - } - } ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(this.context.getBufferSupplier(), this.context.getBufferConsumer(), webSocket._engine.cryptor); //if (debug) context.getLogger().log(Level.FINEST, "wsrunner.sending websocket message: " + packet); @@ -277,18 +266,6 @@ class WebSocketRunner implements Runnable { } } } - QueueEntry entry = null; - synchronized (writing) { - entry = writeQueue.poll(); - if (entry == null) writing.set(false); - } - if (entry != null) { - future = entry.future; - ByteBuffer[] buffers = entry.packet.sendBuffers != null ? entry.packet.duplicateSendBuffers() : entry.packet.encode(context.getBufferSupplier(), context.getBufferConsumer(), webSocket._engine.cryptor); - lastSendTime = System.currentTimeMillis(); - //if (debug) context.getLogger().log(Level.FINEST, "wsrunner.sending websocket message: " + entry.packet); - channel.write(buffers, buffers, this); - } } catch (Exception e) { future.complete(RETCODE_SENDEXCEPTION); closeRunner(RETCODE_SENDEXCEPTION, "websocket send message failed on rewrite"); @@ -298,7 +275,6 @@ class WebSocketRunner implements Runnable { @Override public void failed(Throwable exc, ByteBuffer[] attachments) { - writing.set(false); future.complete(RETCODE_SENDEXCEPTION); closeRunner(RETCODE_SENDEXCEPTION, "websocket send message failed on CompletionHandler"); if (exc != null) { @@ -308,7 +284,6 @@ class WebSocketRunner implements Runnable { } }); } catch (Exception t) { - writing.set(false); futureResult.complete(RETCODE_SENDEXCEPTION); closeRunner(RETCODE_SENDEXCEPTION, "websocket send message failed on channel.write"); context.getLogger().log(Level.FINE, "WebSocket sendMessage abort, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", t); @@ -331,11 +306,6 @@ class WebSocketRunner implements Runnable { readBuffer = null; engine.removeThenClose(webSocket); webSocket.onClose(code, reason); - QueueEntry entry = writeQueue.poll(); - while (entry != null) { - entry.future.complete(RETCODE_WSOCKET_CLOSED); - entry = writeQueue.poll(); - } } }