From 7a762563178733acfe0d8808eae69526e56780bd Mon Sep 17 00:00:00 2001 From: redkale Date: Sat, 9 Nov 2024 23:35:35 +0800 Subject: [PATCH] =?UTF-8?q?Response=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/net/AsyncConnection.java | 260 +++++---------- .../org/redkale/net/AsyncNioConnection.java | 302 ++++++++++-------- src/main/java/org/redkale/net/Response.java | 112 +------ .../org/redkale/net/http/HttpResponse.java | 24 +- .../redkale/net/http/WebSocketServlet.java | 3 +- 5 files changed, 258 insertions(+), 443 deletions(-) diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index 877f25bc3..dcbd580a4 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -256,6 +256,39 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { protected abstract void writeImpl( ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler); + /** + * src写完才会回调 + * + * @see org.redkale.net.AsyncNioConnection#writeImpl(java.nio.ByteBuffer, java.util.function.Consumer, java.lang.Object, java.nio.channels.CompletionHandler) + * @param A + * @param src ByteBuffer + * @param consumer Consumer + * @param attachment A + * @param handler CompletionHandler + */ + protected abstract void writeImpl( + ByteBuffer src, Consumer consumer, A attachment, CompletionHandler handler); + + /** + * srcs写完才会回调 + * + * @see org.redkale.net.AsyncNioConnection#writeImpl(java.nio.ByteBuffer[], int, int, java.util.function.Consumer, java.lang.Object, java.nio.channels.CompletionHandler) + * @param A + * @param srcs ByteBuffer[] + * @param offset offset + * @param length length + * @param consumer Consumer + * @param attachment A + * @param handler CompletionHandler + */ + protected abstract void writeImpl( + ByteBuffer[] srcs, + int offset, + int length, + Consumer consumer, + A attachment, + CompletionHandler handler); + protected void startRead(CompletionHandler handler) { read(handler); } @@ -301,7 +334,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { * @param handler 回调函数 */ public final void write(ByteTuple array, CompletionHandler handler) { - write(array.content(), array.offset(), array.length(), (byte[]) null, 0, 0, handler); + write(array.content(), array.offset(), array.length(), null, handler); } /** @@ -318,7 +351,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { void write(ByteBuffer src, A attachment, CompletionHandler handler) { if (sslEngine == null) { - writeImpl(src, attachment, handler); + writeImpl(src, (Consumer) null, attachment, handler); } else { try { int remain = src.remaining(); @@ -338,7 +371,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { void write( ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { if (sslEngine == null) { - writeImpl(srcs, offset, length, attachment, handler); + writeImpl(srcs, offset, length, (Consumer) null, attachment, handler); } else { try { int remain = ByteBufferReader.remaining(srcs, offset, length); @@ -355,44 +388,10 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } } - void write(ByteBuffer[] srcs, A attachment, CompletionHandler handler) { - write(srcs, 0, srcs.length, attachment, handler); - } - - void write(byte[] bytes, CompletionHandler handler) { - write(bytes, 0, bytes.length, (byte[]) null, 0, 0, handler); - } - - void write(byte[] bytes, int offset, int length, CompletionHandler handler) { - write(bytes, offset, length, (byte[]) null, 0, 0, handler); - } - - void write( - byte[] headerContent, - int headerOffset, - int headerLength, - byte[] bodyContent, - int bodyOffset, - int bodyLength, - CompletionHandler handler) { - write(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength, null, handler); - } - - void write( - byte[] headerContent, - int headerOffset, - int headerLength, - byte[] bodyContent, - int bodyOffset, - int bodyLength, - Object handlerAttachment, - CompletionHandler handler) { + void write(byte[] bytes, int offset, int length, Object attachment, CompletionHandler handler) { final ByteBuffer buffer = sslEngine == null ? pollWriteBuffer() : pollWriteSSLBuffer(); - if (buffer.remaining() >= headerLength + bodyLength) { - buffer.put(headerContent, headerOffset, headerLength); - if (bodyLength > 0) { - buffer.put(bodyContent, bodyOffset, bodyLength); - } + if (buffer.remaining() >= length) { + buffer.put(bytes, offset, length); buffer.flip(); CompletionHandler newHandler = new CompletionHandler() { @Override @@ -407,14 +406,11 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { handler.failed(exc, attachment); } }; - write(buffer, handlerAttachment, newHandler); + write(buffer, attachment, newHandler); } else { ByteBufferWriter writer = ByteBufferWriter.create(sslEngine == null ? writeBufferSupplier : this::pollWriteSSLBuffer, buffer); - writer.put(headerContent, headerOffset, headerLength); - if (bodyLength > 0) { - writer.put(bodyContent, bodyOffset, bodyLength); - } + writer.put(bytes, offset, length); final ByteBuffer[] buffers = writer.toBuffers(); CompletionHandler newHandler = new CompletionHandler() { @Override @@ -429,11 +425,19 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { handler.failed(exc, attachment); } }; - write(buffers, handlerAttachment, newHandler); + write(buffers, 0, buffers.length, attachment, newHandler); } } - // src写完才会回调 + /** + * src写完才会回调 + * + * @see Response#finish(boolean, byte[], int, int) + * @param 泛型 + * @param src ByteBuffer + * @param attachment 附件 + * @param handler 回调函数 + */ public final void writeInIOThread(ByteBuffer src, A attachment, CompletionHandler handler) { if (inCurrWriteThread()) { write(src, attachment, handler); @@ -441,6 +445,22 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { executeWrite(() -> write(src, attachment, handler)); } } + /** + * src写完才会回调 + * + * @see Response#finish(boolean, byte[], int, int) + * @param bytes 内容 + * @param offset 培偏移量 + * @param length 长度 + * @param handler 回调函数 + */ + public final void writeInIOThread(byte[] bytes, int offset, int length, CompletionHandler handler) { + if (inCurrWriteThread()) { + write(bytes, offset, length, null, handler); + } else { + executeWrite(() -> write(bytes, offset, length, null, handler)); + } + } public final void writeInIOThread( ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { @@ -454,17 +474,17 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { public final void writeInIOThread( ByteBuffer[] srcs, A attachment, CompletionHandler handler) { if (inCurrWriteThread()) { - write(srcs, attachment, handler); + write(srcs, 0, srcs.length, attachment, handler); } else { - executeWrite(() -> write(srcs, attachment, handler)); + executeWrite(() -> write(srcs, 0, srcs.length, attachment, handler)); } } public final void writeInIOThread(byte[] bytes, CompletionHandler handler) { if (inCurrWriteThread()) { - write(bytes, handler); + write(bytes, 0, bytes.length, null, handler); } else { - executeWrite(() -> write(bytes, handler)); + executeWrite(() -> write(bytes, 0, bytes.length, null, handler)); } } @@ -476,30 +496,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } } - public final void writeInIOThread(byte[] bytes, int offset, int length, CompletionHandler handler) { - if (inCurrWriteThread()) { - write(bytes, offset, length, handler); - } else { - executeWrite(() -> write(bytes, offset, length, handler)); - } - } - - public final void writeInIOThread( - byte[] headerContent, - int headerOffset, - int headerLength, - byte[] bodyContent, - int bodyOffset, - int bodyLength, - CompletionHandler handler) { - if (inCurrWriteThread()) { - write(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength, handler); - } else { - executeWrite(() -> - write(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength, handler)); - } - } - public void setReadBuffer(ByteBuffer buffer) { if (this.readBuffer != null) { throw new RedkaleException("repeat AsyncConnection.setReadBuffer"); @@ -539,7 +535,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { if (srcs.length == 1) { write(srcs[0], attachment, newHandler); } else { - write(srcs, attachment, newHandler); + write(srcs, 0, srcs.length, attachment, newHandler); } } } @@ -566,7 +562,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } // 返回pipelineCount个数数据是否全部写入完毕 - public boolean appendPipeline(int pipelineIndex, int pipelineCount, byte[] bs, int offset, int length) { + public boolean appendPipeline(int pipelineIndex, int pipelineCount, byte[] bytes, int offset, int length) { writeLock.lock(); try { ByteBufferWriter writer = this.pipelineWriter; @@ -575,7 +571,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { this.pipelineWriter = writer; } if (this.pipelineDataNode == null && pipelineIndex == writer.getWriteBytesCounter() + 1) { - writer.put(bs, offset, length); + writer.put(bytes, offset, length); return (pipelineIndex == pipelineCount); } else { PipelineDataNode dataNode = this.pipelineDataNode; @@ -586,65 +582,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { if (pipelineIndex == pipelineCount) { // 此时pipelineCount为最大值 dataNode.pipelineCount = pipelineCount; } - dataNode.put(pipelineIndex, bs, offset, length); - if (writer.getWriteBytesCounter() + dataNode.itemsize == dataNode.pipelineCount) { - for (PipelineDataItem item : dataNode.arrayItems()) { - writer.put(item.data); - } - this.pipelineDataNode = null; - return true; - } - return false; - } - } finally { - writeLock.unlock(); - } - } - - // 返回pipelineCount个数数据是否全部写入完毕 - public final boolean appendPipeline(int pipelineIndex, int pipelineCount, ByteTuple header, ByteTuple body) { - return appendPipeline( - pipelineIndex, - pipelineCount, - header.content(), - header.offset(), - header.length(), - body == null ? null : body.content(), - body == null ? 0 : body.offset(), - body == null ? 0 : body.length()); - } - - // 返回pipelineCount个数数据是否全部写入完毕 - public boolean appendPipeline( - int pipelineIndex, - int pipelineCount, - byte[] headerContent, - int headerOffset, - int headerLength, - byte[] bodyContent, - int bodyOffset, - int bodyLength) { - writeLock.lock(); - try { - ByteBufferWriter writer = this.pipelineWriter; - if (writer == null) { - writer = ByteBufferWriter.create(getWriteBufferSupplier()); - this.pipelineWriter = writer; - } - if (this.pipelineDataNode == null && pipelineIndex == writer.getWriteBytesCounter() + 1) { - writer.put(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength); - return (pipelineIndex == pipelineCount); - } else { - PipelineDataNode dataNode = this.pipelineDataNode; - if (dataNode == null) { - dataNode = new PipelineDataNode(); - this.pipelineDataNode = dataNode; - } - if (pipelineIndex == pipelineCount) { // 此时pipelineCount为最大值 - dataNode.pipelineCount = pipelineCount; - } - dataNode.put( - pipelineIndex, headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength); + dataNode.put(pipelineIndex, bytes, offset, length); if (writer.getWriteBytesCounter() + dataNode.itemsize == dataNode.pipelineCount) { for (PipelineDataItem item : dataNode.arrayItems()) { writer.put(item.data); @@ -694,27 +632,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } itemsize++; } - - public void put( - int pipelineIndex, - byte[] headerContent, - int headerOffset, - int headerLength, - byte[] bodyContent, - int bodyOffset, - int bodyLength) { - if (tail == null) { - head = new PipelineDataItem( - pipelineIndex, headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength); - tail = head; - } else { - PipelineDataItem item = new PipelineDataItem( - pipelineIndex, headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength); - tail.next = item; - tail = item; - } - itemsize++; - } } private static class PipelineDataItem implements Comparable { @@ -730,33 +647,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { this.data = Arrays.copyOfRange(bs, offset, offset + length); } - public PipelineDataItem( - int index, - byte[] headerContent, - int headerOffset, - int headerLength, - byte[] bodyContent, - int bodyOffset, - int bodyLength) { - this.index = index; - this.data = bodyLength > 0 - ? copyOfRange(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength) - : Arrays.copyOfRange(headerContent, headerOffset, headerOffset + headerLength); - } - - private static byte[] copyOfRange( - byte[] headerContent, - int headerOffset, - int headerLength, - byte[] bodyContent, - int bodyOffset, - int bodyLength) { - byte[] result = new byte[headerLength + bodyLength]; - System.arraycopy(headerContent, headerOffset, result, 0, headerLength); - System.arraycopy(bodyContent, bodyOffset, result, headerLength, bodyLength); - return result; - } - @Override public int compareTo(PipelineDataItem o) { return this.index - o.index; diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index b0bd94d84..0a831ed40 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -152,61 +152,160 @@ abstract class AsyncNioConnection extends AsyncConnection { doRead(this.ioReadThread.inCurrThread()); } + public void doRead(boolean direct) { + try { + this.readTime = System.currentTimeMillis(); + int readCount = 0; + if (direct) { + if (this.readByteBuffer == null) { + this.readByteBuffer = sslEngine == null ? pollReadBuffer() : pollReadSSLBuffer(); + } + readCount = implRead(readByteBuffer); + } + + if (readCount != 0) { + handleRead(readCount, null); + } else if (readKey == null) { + ioReadThread.register(selector -> { + try { + if (readKey == null) { + readKey = keyFor(selector); + } + if (readKey == null) { + readKey = implRegister(selector, SelectionKey.OP_READ); + readKey.attach(this); + } else { + readKey.interestOps(readKey.interestOps() | SelectionKey.OP_READ); + } + } catch (ClosedChannelException e) { + handleRead(0, e); + } + }); + } else { + ioReadThread.interestOpsOr(readKey, SelectionKey.OP_READ); + } + } catch (Exception e) { + handleRead(0, e); + } + } + + protected void handleRead(final int totalCount, Throwable t) { + CompletionHandler handler = this.readCompletionHandler; + ByteBuffer attach = this.readByteBuffer; + // 清空读参数 + this.readCompletionHandler = null; + this.readByteBuffer = null; + this.readPending = false; // 必须放最后 + + if (handler == null) { + if (t == null) { + protocolCodec.completed(totalCount, attach); + } else { + protocolCodec.failed(t, attach); + } + } else { + if (t == null) { + handler.completed(totalCount, attach); + } else { + handler.failed(t, attach); + } + } + } + @Override - public void write( - byte[] headerContent, - int headerOffset, - int headerLength, - byte[] bodyContent, - int bodyOffset, - int bodyLength, - CompletionHandler handler) { - - if (sslEngine != null) { - super.write(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength, handler); - return; - } - Objects.requireNonNull(headerContent); + public void writeImpl( + ByteBuffer src, + Consumer consumer, + A attachment, + CompletionHandler handler) { + Objects.requireNonNull(src); Objects.requireNonNull(handler); - if (!this.isConnected()) { - handler.failed(new NotYetConnectedException(), null); - return; - } - if (this.writePending) { - handler.failed(new WritePendingException(), null); - return; - } - this.writePending = true; - this.writeByteTuple1Array = headerContent; - this.writeByteTuple1Offset = headerOffset; - this.writeByteTuple1Length = headerLength; - this.writeByteTuple2Array = bodyContent; - this.writeByteTuple2Offset = bodyOffset; - this.writeByteTuple2Length = bodyLength; - this.writeAttachment = null; - CompletionHandler newHandler = new CompletionHandler() { - @Override - public void completed(Integer result, Void attachment) { - if (writeByteBuffers != null) { - offerWriteBuffers(writeByteBuffers); - } else { - offerWriteBuffer(writeByteBuffer); - } - handler.completed(result, attachment); + int total = 0; + Exception t = null; + try { + if (this.writePending) { + handler.failed(new WritePendingException(), attachment); + return; } + this.writePending = true; + while (src.hasRemaining()) { // 必须要将buffer写完为止 + int c = implWrite(src); + if (c < 0) { + t = new ClosedChannelException(); + total = c; + break; + } + total += c; + } + } catch (Exception e) { + t = e; + } + this.writePending = false; + if (consumer != null) { + consumer.accept(src); + } + if (t != null) { + handler.failed(t, attachment); + } else { + handler.completed(total, attachment); + } + } - @Override - public void failed(Throwable exc, Void attachment) { - if (writeByteBuffers != null) { - offerWriteBuffers(writeByteBuffers); - } else { - offerWriteBuffer(writeByteBuffer); - } - handler.failed(exc, attachment); + @Override + public void writeImpl( + ByteBuffer[] srcs, + int offset, + int length, + Consumer consumer, + A attachment, + CompletionHandler handler) { + Objects.requireNonNull(srcs); + Objects.requireNonNull(handler); + int total = 0; + Exception t = null; + int batchOffset = offset; + int batchLength = length; + ByteBuffer[] batchBuffers = srcs; + try { + if (this.writePending) { + handler.failed(new WritePendingException(), attachment); + return; } - }; - this.writeCompletionHandler = newHandler; - doWrite(); // 如果不是true,则bodyCallback的执行可能会切换线程 + this.writePending = true; + boolean hasRemain = true; + while (hasRemain) { // 必须要将buffer写完为止 + int c = implWrite(batchBuffers, batchOffset, batchLength); + if (c < 0) { + t = new ClosedChannelException(); + total = c; + break; + } + boolean remain = false; + for (int i = 0; i < batchLength; i++) { + if (batchBuffers[batchOffset + i].hasRemaining()) { + remain = true; + batchOffset += i; + batchLength -= i; + break; + } + } + hasRemain = remain; + total += c; + } + } catch (Exception e) { + t = e; + } + this.writePending = false; + if (consumer != null) { + for (int i = 0; i < length; i++) { + consumer.accept(srcs[offset + i]); + } + } + if (t != null) { + handler.failed(t, attachment); + } else { + handler.completed(total, attachment); + } } @Override @@ -250,43 +349,6 @@ abstract class AsyncNioConnection extends AsyncConnection { doWrite(); } - public void doRead(boolean direct) { - try { - this.readTime = System.currentTimeMillis(); - int readCount = 0; - if (direct) { - if (this.readByteBuffer == null) { - this.readByteBuffer = sslEngine == null ? pollReadBuffer() : pollReadSSLBuffer(); - } - readCount = implRead(readByteBuffer); - } - - if (readCount != 0) { - handleRead(readCount, null); - } else if (readKey == null) { - ioReadThread.register(selector -> { - try { - if (readKey == null) { - readKey = keyFor(selector); - } - if (readKey == null) { - readKey = implRegister(selector, SelectionKey.OP_READ); - readKey.attach(this); - } else { - readKey.interestOps(readKey.interestOps() | SelectionKey.OP_READ); - } - } catch (ClosedChannelException e) { - handleRead(0, e); - } - }); - } else { - ioReadThread.interestOpsOr(readKey, SelectionKey.OP_READ); - } - } catch (Exception e) { - handleRead(0, e); - } - } - public void doWrite() { try { this.writeTime = System.currentTimeMillis(); @@ -372,9 +434,6 @@ abstract class AsyncNioConnection extends AsyncConnection { handleWrite(totalCount, new ClosedChannelException()); } else if (writeCompleted && (totalCount != 0 || !hasRemain)) { handleWrite(this.writeTotal + totalCount, null); - // if (fastWriteCount.get() > 0) { - // doWrite(); - // } } else if (writeKey == null) { ioWriteThread.register(selector -> { try { @@ -399,6 +458,26 @@ abstract class AsyncNioConnection extends AsyncConnection { } } + protected void handleWrite(final int totalCount, Throwable t) { + CompletionHandler handler = this.writeCompletionHandler; + Object attach = this.writeAttachment; + // 清空写参数 + this.writeCompletionHandler = null; + this.writeAttachment = null; + this.writeByteBuffer = null; + this.writeByteBuffers = null; + this.writeBuffersOffset = 0; + this.writeBuffersLength = 0; + this.writeTotal = 0; + this.writePending = false; // 必须放最后 + + if (t == null) { + handler.completed(totalCount, attach); + } else { + handler.failed(t, attach); + } + } + protected void handleConnect(Throwable t) { if (connectKey != null) { connectKey.cancel(); @@ -420,49 +499,6 @@ abstract class AsyncNioConnection extends AsyncConnection { } } - protected void handleRead(final int totalCount, Throwable t) { - CompletionHandler handler = this.readCompletionHandler; - ByteBuffer attach = this.readByteBuffer; - // 清空读参数 - this.readCompletionHandler = null; - this.readByteBuffer = null; - this.readPending = false; // 必须放最后 - - if (handler == null) { - if (t == null) { - protocolCodec.completed(totalCount, attach); - } else { - protocolCodec.failed(t, attach); - } - } else { - if (t == null) { - handler.completed(totalCount, attach); - } else { - handler.failed(t, attach); - } - } - } - - protected void handleWrite(final int totalCount, Throwable t) { - CompletionHandler handler = this.writeCompletionHandler; - Object attach = this.writeAttachment; - // 清空写参数 - this.writeCompletionHandler = null; - this.writeAttachment = null; - this.writeByteBuffer = null; - this.writeByteBuffers = null; - this.writeBuffersOffset = 0; - this.writeBuffersLength = 0; - this.writeTotal = 0; - this.writePending = false; // 必须放最后 - - if (t == null) { - handler.completed(totalCount, attach); - } else { - handler.failed(t, attach); - } - } - @Deprecated(since = "2.5.0") protected abstract ReadableByteChannel readableByteChannel(); diff --git a/src/main/java/org/redkale/net/Response.java b/src/main/java/org/redkale/net/Response.java index daafcbe18..850c9e55a 100644 --- a/src/main/java/org/redkale/net/Response.java +++ b/src/main/java/org/redkale/net/Response.java @@ -345,27 +345,27 @@ public abstract class Response> { } } - public final void finish(final byte[] bs) { + public final void finish(byte[] bs) { finish(false, bs, 0, bs.length); } - public final void finish(final byte[] bs, int offset, int length) { + public final void finish(byte[] bs, int offset, int length) { finish(false, bs, offset, length); } - public final void finish(final ByteTuple array) { + public final void finish(ByteTuple array) { finish(false, array.content(), array.offset(), array.length()); } - public final void finish(boolean kill, final byte[] bs) { + public final void finish(boolean kill, byte[] bs) { finish(kill, bs, 0, bs.length); } - public final void finish(boolean kill, final ByteTuple array) { + public final void finish(boolean kill, ByteTuple array) { finish(kill, array.content(), array.offset(), array.length()); } - public void finish(boolean kill, final byte[] bs, int offset, int length) { + public void finish(boolean kill, byte[] bs, int offset, int length) { if (kill) { refuseAlive(); } @@ -395,104 +395,8 @@ public abstract class Response> { } } - public void finish(boolean kill, byte[] bs1, int offset1, int length1, byte[] bs2, int offset2, int length2) { - if (kill) { - refuseAlive(); - } - if (request.pipelineIndex > 0) { - boolean allCompleted = this.channel.appendPipeline( - request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2); - if (allCompleted) { - request.pipelineCompleted = true; - this.channel.writePipelineInIOThread(this.finishBytesIOThreadHandler); - } else { - removeChannel(); - this.responseConsumer.accept(this); - } - } else if (this.channel.hasPipelineData()) { - this.channel.appendPipeline( - request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2); - this.channel.writePipelineInIOThread(this.finishBytesIOThreadHandler); - } else { - this.channel.writeInIOThread(bs1, offset1, length1, bs2, offset2, length2, finishBytesIOThreadHandler); - } - } - - protected void send(final ByteTuple array, final CompletionHandler handler) { - ByteBuffer buffer = this.writeBuffer; - if (buffer != null && buffer.capacity() >= array.length()) { - buffer.clear(); - buffer.put(array.content(), array.offset(), array.length()); - buffer.flip(); - this.channel.write(buffer, buffer, new CompletionHandler() { - - @Override - public void completed(Integer result, ByteBuffer attachment) { - attachment.clear(); - handler.completed(result, null); - } - - @Override - public void failed(Throwable exc, ByteBuffer attachment) { - attachment.clear(); - handler.failed(exc, null); - } - }); - } else { - this.channel.write(array, handler); - } - } - - protected void send(ByteBuffer buffer, A attachment, CompletionHandler handler) { - this.channel.write(buffer, attachment, new CompletionHandler() { - - @Override - public void completed(Integer result, A attachment) { - if (buffer != writeBuffer) { - channel.offerWriteBuffer(buffer); - } else { - buffer.clear(); - } - if (handler != null) { - handler.completed(result, attachment); - } - } - - @Override - public void failed(Throwable exc, A attachment) { - if (buffer != writeBuffer) { - channel.offerWriteBuffer(buffer); - } else { - buffer.clear(); - } - if (handler != null) { - handler.failed(exc, attachment); - } - } - }); - } - - protected void send(ByteBuffer[] buffers, A attachment, CompletionHandler handler) { - this.channel.write(buffers, attachment, new CompletionHandler() { - - @Override - public void completed(Integer result, A attachment) { - channel.offerWriteBuffers(buffers); - if (handler != null) { - handler.completed(result, attachment); - } - } - - @Override - public void failed(Throwable exc, A attachment) { - for (ByteBuffer buffer : buffers) { - channel.offerWriteBuffer(buffer); - } - if (handler != null) { - handler.failed(exc, attachment); - } - } - }); + protected void send(ByteTuple array, CompletionHandler handler) { + this.channel.writeInIOThread(array, handler); } public C getContext() { diff --git a/src/main/java/org/redkale/net/http/HttpResponse.java b/src/main/java/org/redkale/net/http/HttpResponse.java index 4e6aa525a..082fa606a 100644 --- a/src/main/java/org/redkale/net/http/HttpResponse.java +++ b/src/main/java/org/redkale/net/http/HttpResponse.java @@ -1241,28 +1241,14 @@ public class HttpResponse extends Response { } /** - * 异步输出指定内容, 供WebSocketServlet使用 + * 异步输出header * - * @param buffer 输出内容 * @param handler 异步回调函数 */ - protected void sendBody(ByteBuffer buffer, CompletionHandler handler) { - if (this.headWritedSize < 0) { - if (this.contentLength < 0) { - this.contentLength = buffer == null ? 0 : buffer.remaining(); - } - createHeader(); - if (buffer == null) { // 只发header - super.send(headerArray, handler); - } else { - ByteBuffer headBuf = channel.pollWriteBuffer(); - headBuf.put(headerArray.content(), 0, headerArray.length()); - headBuf.flip(); - super.send(new ByteBuffer[] {headBuf, buffer}, null, handler); - } - } else { - super.send(buffer, null, handler); - } + protected void sendHeader(CompletionHandler handler) { + this.contentLength = 0; + createHeader(); + super.send(headerArray, handler); } /** diff --git a/src/main/java/org/redkale/net/http/WebSocketServlet.java b/src/main/java/org/redkale/net/http/WebSocketServlet.java index 28b078400..561f6d34c 100644 --- a/src/main/java/org/redkale/net/http/WebSocketServlet.java +++ b/src/main/java/org/redkale/net/http/WebSocketServlet.java @@ -9,7 +9,6 @@ import java.io.*; import java.lang.reflect.*; import java.math.BigInteger; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; import java.security.MessageDigest; import java.util.*; @@ -322,7 +321,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl response.addHeader("Sec-WebSocket-Extensions", "permessage-deflate"); } - response.sendBody((ByteBuffer) null, new CompletionHandler() { + response.sendHeader(new CompletionHandler() { @Override public void completed(Integer result, Void attachment) {