临时调整writeInLock

This commit is contained in:
redkale
2024-11-06 21:18:30 +08:00
parent 39b1ea3e4e
commit 550f955678
5 changed files with 100 additions and 361 deletions

View File

@@ -5,11 +5,8 @@
*/ */
package org.redkale.cluster.spi; package org.redkale.cluster.spi;
import static org.redkale.util.Utility.isEmpty;
import java.io.Serializable; import java.io.Serializable;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.logging.Level; import java.util.logging.Level;
@@ -21,6 +18,7 @@ import org.redkale.convert.json.JsonConvert;
import org.redkale.net.http.*; import org.redkale.net.http.*;
import org.redkale.util.RedkaleException; import org.redkale.util.RedkaleException;
import org.redkale.util.Traces; import org.redkale.util.Traces;
import static org.redkale.util.Utility.isEmpty;
/** /**
* 没有配置MQ且也没有ClusterAgent的情况下实现的默认HttpMessageClient实例 * 没有配置MQ且也没有ClusterAgent的情况下实现的默认HttpMessageClient实例
@@ -318,34 +316,5 @@ public class HttpLocalRpcClient extends HttpRpcClient {
byte[] rs = (offset == 0 && bs.length == length) ? bs : Arrays.copyOfRange(bs, offset, offset + length); byte[] rs = (offset == 0 && bs.length == length) ? bs : Arrays.copyOfRange(bs, offset, offset + length);
future.complete(rs); future.complete(rs);
} }
@Override
public void finishBuffer(boolean kill, ByteBuffer buffer) {
if (future == null) {
return;
}
byte[] bs = new byte[buffer.remaining()];
buffer.get(bs);
future.complete(bs);
}
@Override
public void finishBuffers(boolean kill, ByteBuffer... buffers) {
if (future == null) {
return;
}
int size = 0;
for (ByteBuffer buf : buffers) {
size += buf.remaining();
}
byte[] bs = new byte[size];
int index = 0;
for (ByteBuffer buf : buffers) {
int r = buf.remaining();
buf.get(bs, index, r);
index += r;
}
future.complete(bs);
}
} }
} }

View File

@@ -5,15 +5,13 @@
*/ */
package org.redkale.mq.spi; package org.redkale.mq.spi;
import static org.redkale.mq.spi.MessageRecord.CTYPE_HTTP_RESULT;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
import java.util.function.*; import java.util.function.*;
import java.util.logging.Level; import java.util.logging.Level;
import org.redkale.convert.Convert; import org.redkale.convert.Convert;
import static org.redkale.mq.spi.MessageRecord.CTYPE_HTTP_RESULT;
import org.redkale.net.Response; import org.redkale.net.Response;
import org.redkale.net.http.*; import org.redkale.net.http.*;
import org.redkale.service.RetResult; import org.redkale.service.RetResult;
@@ -238,33 +236,4 @@ public class HttpMessageResponse extends HttpResponse {
byte[] rs = (offset == 0 && bs.length == length) ? bs : Arrays.copyOfRange(bs, offset, offset + length); byte[] rs = (offset == 0 && bs.length == length) ? bs : Arrays.copyOfRange(bs, offset, offset + length);
finishHttpResult(null, new HttpResult(rs).contentType(contentType)); finishHttpResult(null, new HttpResult(rs).contentType(contentType));
} }
@Override
public void finishBuffer(boolean kill, ByteBuffer buffer) {
if (message.isEmptyRespTopic()) {
return;
}
byte[] bs = new byte[buffer.remaining()];
buffer.get(bs);
finishHttpResult(null, new HttpResult(bs));
}
@Override
public void finishBuffers(boolean kill, ByteBuffer... buffers) {
if (message.isEmptyRespTopic()) {
return;
}
int size = 0;
for (ByteBuffer buf : buffers) {
size += buf.remaining();
}
byte[] bs = new byte[size];
int index = 0;
for (ByteBuffer buf : buffers) {
int r = buf.remaining();
buf.get(bs, index, r);
index += r;
}
finishHttpResult(null, new HttpResult(bs));
}
} }

View File

@@ -264,13 +264,22 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
protected abstract <A> void writeImpl( protected abstract <A> void writeImpl(
ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler); ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler);
public abstract <A> void writeInLock(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler); public abstract <A> void writeInLock(
ByteBuffer src, Consumer<ByteBuffer> consumer, A attachment, CompletionHandler<Integer, ? super A> handler);
public abstract <A> void writeInLock( public abstract <A> void writeInLock(
ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler); ByteBuffer[] srcs,
int offset,
int length,
Consumer<ByteBuffer> consumer,
A attachment,
CompletionHandler<Integer, ? super A> handler);
public abstract <A> void writeInLock( public abstract <A> void writeInLock(
Supplier<ByteBuffer> supplier, A attachment, CompletionHandler<Integer, ? super A> handler); Supplier<ByteBuffer> supplier,
Consumer<ByteBuffer> consumer,
A attachment,
CompletionHandler<Integer, ? super A> handler);
protected void startRead(CompletionHandler<Integer, ByteBuffer> handler) { protected void startRead(CompletionHandler<Integer, ByteBuffer> handler) {
read(handler); read(handler);
@@ -292,6 +301,14 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
} }
} }
public final void readRegisterInIOThread(CompletionHandler<Integer, ByteBuffer> handler) {
if (inCurrReadThread()) {
readRegister(handler);
} else {
executeRead(() -> readRegister(handler));
}
}
public final void read(CompletionHandler<Integer, ByteBuffer> handler) { public final void read(CompletionHandler<Integer, ByteBuffer> handler) {
if (sslEngine == null) { if (sslEngine == null) {
readImpl(handler); readImpl(handler);
@@ -520,27 +537,31 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
} }
} }
public final <A> void writeInLock(ByteBuffer[] srcs, A attachment, CompletionHandler<Integer, ? super A> handler) { public final <A> void writeInLock(
writeInLock(srcs, 0, srcs.length, attachment, handler); ByteBuffer[] srcs,
Consumer<ByteBuffer> consumer,
A attachment,
CompletionHandler<Integer, ? super A> handler) {
writeInLock(srcs, 0, srcs.length, consumer, attachment, handler);
} }
public final void writeInLock(byte[] bytes, CompletionHandler<Integer, Void> handler) { public final void writeInLock(byte[] bytes, CompletionHandler<Integer, Void> handler) {
writeInLock(ByteBuffer.wrap(bytes), null, handler); writeInLock(ByteBuffer.wrap(bytes), (Consumer) null, null, handler);
} }
public final void writeInLock(ByteTuple array, CompletionHandler<Integer, Void> handler) { public final void writeInLock(ByteTuple array, CompletionHandler<Integer, Void> handler) {
writeInLock(ByteBuffer.wrap(array.content(), array.offset(), array.length()), null, handler); writeInLock(ByteBuffer.wrap(array.content(), array.offset(), array.length()), null, null, handler);
} }
public final void writeInLock(byte[] bytes, int offset, int length, CompletionHandler<Integer, Void> handler) { public final void writeInLock(byte[] bytes, int offset, int length, CompletionHandler<Integer, Void> handler) {
writeInLock(ByteBuffer.wrap(bytes, offset, length), null, handler); writeInLock(ByteBuffer.wrap(bytes, offset, length), null, null, handler);
} }
public final void writeInLock(ByteTuple header, ByteTuple body, CompletionHandler<Integer, Void> handler) { public final void writeInLock(ByteTuple header, ByteTuple body, CompletionHandler<Integer, Void> handler) {
if (body == null) { if (body == null) {
writeInLock(ByteBuffer.wrap(header.content(), header.offset(), header.length()), null, handler); writeInLock(ByteBuffer.wrap(header.content(), header.offset(), header.length()), null, null, handler);
} else if (header == null) { } else if (header == null) {
writeInLock(ByteBuffer.wrap(body.content(), body.offset(), body.length()), null, handler); writeInLock(ByteBuffer.wrap(body.content(), body.offset(), body.length()), null, null, handler);
} else { } else {
writeInLock( writeInLock(
new ByteBuffer[] { new ByteBuffer[] {
@@ -548,6 +569,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
ByteBuffer.wrap(body.content(), body.offset(), body.length()) ByteBuffer.wrap(body.content(), body.offset(), body.length())
}, },
null, null,
null,
handler); handler);
} }
} }
@@ -561,9 +583,9 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
int bodyLength, int bodyLength,
CompletionHandler<Integer, Void> handler) { CompletionHandler<Integer, Void> handler) {
if (bodyContent == null) { if (bodyContent == null) {
writeInLock(ByteBuffer.wrap(headerContent, headerOffset, headerLength), null, handler); writeInLock(ByteBuffer.wrap(headerContent, headerOffset, headerLength), null, null, handler);
} else if (headerContent == null) { } else if (headerContent == null) {
writeInLock(ByteBuffer.wrap(bodyContent, bodyOffset, bodyLength), null, handler); writeInLock(ByteBuffer.wrap(bodyContent, bodyOffset, bodyLength), null, null, handler);
} else { } else {
writeInLock( writeInLock(
new ByteBuffer[] { new ByteBuffer[] {
@@ -571,6 +593,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
ByteBuffer.wrap(bodyContent, bodyOffset, bodyLength) ByteBuffer.wrap(bodyContent, bodyOffset, bodyLength)
}, },
null, null,
null,
handler); handler);
} }
} }
@@ -588,33 +611,44 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
} }
public final void writePipeline(CompletionHandler<Integer, Void> handler) { public final void writePipeline(CompletionHandler<Integer, Void> handler) {
writePipeline(null, handler);
}
public <A> void writePipeline(A attachment, CompletionHandler<Integer, ? super A> handler) {
ByteBufferWriter writer = this.pipelineWriter; ByteBufferWriter writer = this.pipelineWriter;
this.pipelineWriter = null; this.pipelineWriter = null;
if (writer == null) { if (writer == null) {
handler.completed(0, attachment); handler.completed(0, null);
} else { } else {
ByteBuffer[] srcs = writer.toBuffers(); ByteBuffer[] srcs = writer.toBuffers();
CompletionHandler<Integer, ? super A> newHandler = new CompletionHandler<Integer, A>() { CompletionHandler<Integer, Void> newHandler = new CompletionHandler<Integer, Void>() {
@Override @Override
public void completed(Integer result, A attachment) { public void completed(Integer result, Void attachment) {
offerWriteBuffers(srcs); offerWriteBuffers(srcs);
handler.completed(result, attachment); handler.completed(result, attachment);
} }
@Override @Override
public void failed(Throwable exc, A attachment) { public void failed(Throwable exc, Void attachment) {
offerWriteBuffers(srcs); offerWriteBuffers(srcs);
handler.failed(exc, attachment); handler.failed(exc, attachment);
} }
}; };
if (srcs.length == 1) { if (srcs.length == 1) {
write(srcs[0], attachment, newHandler); write(srcs[0], null, newHandler);
} else { } else {
write(srcs, attachment, newHandler); write(srcs, null, newHandler);
}
}
}
public void writePipelineInLock(CompletionHandler<Integer, Void> handler) {
ByteBufferWriter writer = this.pipelineWriter;
this.pipelineWriter = null;
if (writer == null) {
handler.completed(0, null);
} else {
ByteBuffer[] srcs = writer.toBuffers();
if (srcs.length == 1) {
writeInLock(srcs[0], this.writeBufferConsumer, null, handler);
} else {
writeInLock(srcs, this.writeBufferConsumer, null, handler);
} }
} }
} }
@@ -627,14 +661,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
} }
} }
public final <A> void writePipelineInIOThread(A attachment, CompletionHandler<Integer, ? super A> handler) {
if (inCurrWriteThread()) {
writePipeline(attachment, handler);
} else {
executeWrite(() -> writePipeline(attachment, handler));
}
}
// 返回pipelineCount个数数据是否全部写入完毕 // 返回pipelineCount个数数据是否全部写入完毕
public final boolean appendPipeline(int pipelineIndex, int pipelineCount, ByteTuple array) { public final boolean appendPipeline(int pipelineIndex, int pipelineCount, ByteTuple array) {
return appendPipeline(pipelineIndex, pipelineCount, array.content(), array.offset(), array.length()); return appendPipeline(pipelineIndex, pipelineCount, array.content(), array.offset(), array.length());
@@ -662,65 +688,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
dataNode.pipelineCount = pipelineCount; dataNode.pipelineCount = pipelineCount;
} }
dataNode.put(pipelineIndex, bs, offset, length); dataNode.put(pipelineIndex, bs, offset, length);
if (writer.getWriteBytesCounter() + dataNode.itemsize == dataNode.pipelineCount) { if (writer.getWriteBytesCounter() + dataNode.size == dataNode.pipelineCount) {
for (PipelineDataItem item : dataNode.arrayItems()) {
writer.put(item.data);
}
this.pipelineDataNode = null;
return true;
}
return false;
}
} finally {
writeLock.unlock();
}
}
// 返回pipelineCount个数数据是否全部写入完毕
public final boolean appendPipeline(int pipelineIndex, int pipelineCount, ByteTuple header, ByteTuple body) {
return appendPipeline(
pipelineIndex,
pipelineCount,
header.content(),
header.offset(),
header.length(),
body == null ? null : body.content(),
body == null ? 0 : body.offset(),
body == null ? 0 : body.length());
}
// 返回pipelineCount个数数据是否全部写入完毕
public boolean appendPipeline(
int pipelineIndex,
int pipelineCount,
byte[] headerContent,
int headerOffset,
int headerLength,
byte[] bodyContent,
int bodyOffset,
int bodyLength) {
writeLock.lock();
try {
ByteBufferWriter writer = this.pipelineWriter;
if (writer == null) {
writer = ByteBufferWriter.create(getWriteBufferSupplier());
this.pipelineWriter = writer;
}
if (this.pipelineDataNode == null && pipelineIndex == writer.getWriteBytesCounter() + 1) {
writer.put(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength);
return (pipelineIndex == pipelineCount);
} else {
PipelineDataNode dataNode = this.pipelineDataNode;
if (dataNode == null) {
dataNode = new PipelineDataNode();
this.pipelineDataNode = dataNode;
}
if (pipelineIndex == pipelineCount) { // 此时pipelineCount为最大值
dataNode.pipelineCount = pipelineCount;
}
dataNode.put(
pipelineIndex, headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength);
if (writer.getWriteBytesCounter() + dataNode.itemsize == dataNode.pipelineCount) {
for (PipelineDataItem item : dataNode.arrayItems()) { for (PipelineDataItem item : dataNode.arrayItems()) {
writer.put(item.data); writer.put(item.data);
} }
@@ -738,14 +706,14 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
public int pipelineCount; public int pipelineCount;
public int itemsize; public int size;
private PipelineDataItem head; private PipelineDataItem head;
private PipelineDataItem tail; private PipelineDataItem tail;
public PipelineDataItem[] arrayItems() { public PipelineDataItem[] arrayItems() {
PipelineDataItem[] items = new PipelineDataItem[itemsize]; PipelineDataItem[] items = new PipelineDataItem[size];
PipelineDataItem item = head; PipelineDataItem item = head;
int i = 0; int i = 0;
while (item != null) { while (item != null) {
@@ -767,28 +735,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
tail.next = item; tail.next = item;
tail = item; tail = item;
} }
itemsize++; size++;
}
public void put(
int pipelineIndex,
byte[] headerContent,
int headerOffset,
int headerLength,
byte[] bodyContent,
int bodyOffset,
int bodyLength) {
if (tail == null) {
head = new PipelineDataItem(
pipelineIndex, headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength);
tail = head;
} else {
PipelineDataItem item = new PipelineDataItem(
pipelineIndex, headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength);
tail.next = item;
tail = item;
}
itemsize++;
} }
} }
@@ -802,34 +749,9 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
public PipelineDataItem(int index, byte[] bs, int offset, int length) { public PipelineDataItem(int index, byte[] bs, int offset, int length) {
this.index = index; this.index = index;
this.data = Arrays.copyOfRange(bs, offset, offset + length); byte[] result = new byte[length];
} System.arraycopy(bs, offset, result, 0, length);
this.data = result;
public PipelineDataItem(
int index,
byte[] headerContent,
int headerOffset,
int headerLength,
byte[] bodyContent,
int bodyOffset,
int bodyLength) {
this.index = index;
this.data = bodyLength > 0
? copyOfRange(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength)
: Arrays.copyOfRange(headerContent, headerOffset, headerOffset + headerLength);
}
private static byte[] copyOfRange(
byte[] headerContent,
int headerOffset,
int headerLength,
byte[] bodyContent,
int bodyOffset,
int bodyLength) {
byte[] result = new byte[headerLength + bodyLength];
System.arraycopy(headerContent, headerOffset, result, 0, headerLength);
System.arraycopy(bodyContent, bodyOffset, result, headerLength, bodyLength);
return result;
} }
@Override @Override

View File

@@ -325,7 +325,11 @@ abstract class AsyncNioConnection extends AsyncConnection {
} }
@Override @Override
public <A> void writeInLock(ByteBuffer buffer, A attachment, CompletionHandler<Integer, ? super A> handler) { public <A> void writeInLock(
ByteBuffer buffer,
Consumer<ByteBuffer> consumer,
A attachment,
CompletionHandler<Integer, ? super A> handler) {
int total = 0; int total = 0;
Exception t = null; Exception t = null;
lockWrite(); lockWrite();
@@ -350,6 +354,9 @@ abstract class AsyncNioConnection extends AsyncConnection {
this.writePending = false; this.writePending = false;
unlockWrite(); unlockWrite();
} }
if (consumer != null) {
consumer.accept(buffer);
}
if (t != null) { if (t != null) {
handler.failed(t, attachment); handler.failed(t, attachment);
} else { } else {
@@ -359,7 +366,12 @@ abstract class AsyncNioConnection extends AsyncConnection {
@Override @Override
public <A> void writeInLock( public <A> void writeInLock(
ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) { ByteBuffer[] srcs,
int offset,
int length,
Consumer<ByteBuffer> consumer,
A attachment,
CompletionHandler<Integer, ? super A> handler) {
int total = 0; int total = 0;
Exception t = null; Exception t = null;
int batchOffset = offset; int batchOffset = offset;
@@ -398,6 +410,11 @@ abstract class AsyncNioConnection extends AsyncConnection {
this.writePending = false; this.writePending = false;
unlockWrite(); unlockWrite();
} }
if (consumer != null) {
for (int i = 0; i < length; i++) {
consumer.accept(srcs[offset + i]);
}
}
if (t != null) { if (t != null) {
handler.failed(t, attachment); handler.failed(t, attachment);
} else { } else {
@@ -407,7 +424,10 @@ abstract class AsyncNioConnection extends AsyncConnection {
@Override @Override
public <A> void writeInLock( public <A> void writeInLock(
Supplier<ByteBuffer> supplier, A attachment, CompletionHandler<Integer, ? super A> handler) { Supplier<ByteBuffer> supplier,
Consumer<ByteBuffer> consumer,
A attachment,
CompletionHandler<Integer, ? super A> handler) {
int total = 0; int total = 0;
Exception t = null; Exception t = null;
lockWrite(); lockWrite();
@@ -431,6 +451,9 @@ abstract class AsyncNioConnection extends AsyncConnection {
} }
total += c; total += c;
} }
if (consumer != null) {
consumer.accept(buffer);
}
} catch (Exception e) { } catch (Exception e) {
t = e; t = e;
} finally { } finally {

View File

@@ -61,7 +61,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
private final ByteBuffer writeBuffer; private final ByteBuffer writeBuffer;
private final CompletionHandler finishBytesIOThreadHandler = new CompletionHandler<Integer, Void>() { private final CompletionHandler<Integer, Void> finishVoidIOThreadHandler = new CompletionHandler<Integer, Void>() {
@Override @Override
public void completed(Integer result, Void attachment) { public void completed(Integer result, Void attachment) {
@@ -74,60 +74,6 @@ public abstract class Response<C extends Context, R extends Request<C>> {
} }
}; };
private final CompletionHandler finishBufferIOThreadHandler = new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
if (attachment != writeBuffer) {
channel.offerWriteBuffer(attachment);
} else {
attachment.clear();
}
completeInIOThread(false);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
if (attachment != writeBuffer) {
channel.offerWriteBuffer(attachment);
} else {
attachment.clear();
}
completeInIOThread(true);
}
};
private final CompletionHandler finishBuffersIOThreadHandler = new CompletionHandler<Integer, ByteBuffer[]>() {
@Override
public void completed(final Integer result, final ByteBuffer[] attachments) {
if (attachments != null) {
for (ByteBuffer attachment : attachments) {
if (attachment != writeBuffer) {
channel.offerWriteBuffer(attachment);
} else {
attachment.clear();
}
}
}
completeInIOThread(false);
}
@Override
public void failed(Throwable exc, final ByteBuffer[] attachments) {
if (attachments != null) {
for (ByteBuffer attachment : attachments) {
if (attachment != writeBuffer) {
channel.offerWriteBuffer(attachment);
} else {
attachment.clear();
}
}
}
completeInIOThread(true);
}
};
protected Response(C context, final R request) { protected Response(C context, final R request) {
this.context = context; this.context = context;
this.request = request; this.request = request;
@@ -136,10 +82,6 @@ public abstract class Response<C extends Context, R extends Request<C>> {
this.workExecutor = context.workExecutor == null ? ForkJoinPool.commonPool() : context.workExecutor; this.workExecutor = context.workExecutor == null ? ForkJoinPool.commonPool() : context.workExecutor;
} }
protected ByteBuffer writeBuffer() {
return writeBuffer;
}
protected AsyncConnection removeChannel() { protected AsyncConnection removeChannel() {
AsyncConnection ch = this.channel; AsyncConnection ch = this.channel;
this.channel = null; this.channel = null;
@@ -357,28 +299,16 @@ public abstract class Response<C extends Context, R extends Request<C>> {
} }
if (request.keepAlive && (request.pipelineIndex == 0 || request.pipelineCompleted)) { if (request.keepAlive && (request.pipelineIndex == 0 || request.pipelineCompleted)) {
AsyncConnection conn = removeChannel(); AsyncConnection conn = removeChannel();
if (conn != null && conn.protocolCodec != null) { if (conn != null) {
this.responseConsumer.accept(this); this.responseConsumer.accept(this);
if (!request.readCompleted) { if (!request.readCompleted) {
conn.readRegister(conn.protocolCodec); conn.readRegisterInIOThread(conn.protocolCodec);
}
return;
} }
} else {
Supplier<Response> poolSupplier = this.responseSupplier;
Consumer<Response> poolConsumer = this.responseConsumer;
this.recycle();
new ProtocolCodec(context, poolSupplier, poolConsumer, conn)
.response(this)
.run(null);
request.readCompleted = false;
} }
} else {
this.responseConsumer.accept(this); this.responseConsumer.accept(this);
} }
}
protected void writeInIOThread(ByteBuffer buffer) {
this.channel.writeInIOThread(buffer, buffer, finishBufferIOThreadHandler);
}
public final void finish(final byte[] bs) { public final void finish(final byte[] bs) {
finish(false, bs, 0, bs.length); finish(false, bs, 0, bs.length);
@@ -409,101 +339,27 @@ public abstract class Response<C extends Context, R extends Request<C>> {
this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs, offset, length); this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs, offset, length);
if (allCompleted) { if (allCompleted) {
request.pipelineCompleted = true; request.pipelineCompleted = true;
this.channel.writePipelineInIOThread(this.finishBytesIOThreadHandler); this.channel.writePipelineInLock(this.finishVoidIOThreadHandler);
} else { } else {
removeChannel(); removeChannel();
this.responseConsumer.accept(this); this.responseConsumer.accept(this);
} }
} else if (this.channel.hasPipelineData()) { } else if (this.channel.hasPipelineData()) {
this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs, offset, length); this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs, offset, length);
this.channel.writePipelineInIOThread(this.finishBytesIOThreadHandler); this.channel.writePipelineInLock(this.finishVoidIOThreadHandler);
} else { } else {
ByteBuffer buffer = this.writeBuffer; ByteBuffer buffer = this.writeBuffer;
if (buffer != null && buffer.capacity() >= length) { if (buffer != null && buffer.capacity() >= length) {
buffer.clear(); buffer.clear();
buffer.put(bs, offset, length); buffer.put(bs, offset, length);
buffer.flip(); buffer.flip();
this.channel.writeInIOThread(buffer, buffer, finishBufferIOThreadHandler); this.channel.writeInLock(buffer, (Consumer) null, null, finishVoidIOThreadHandler);
} else { } else {
this.channel.writeInIOThread(bs, offset, length, finishBytesIOThreadHandler); this.channel.writeInLock(bs, offset, length, finishVoidIOThreadHandler);
} }
} }
} }
public void finish(boolean kill, byte[] bs1, int offset1, int length1, byte[] bs2, int offset2, int length2) {
if (kill) {
refuseAlive();
}
if (request.pipelineIndex > 0) {
boolean allCompleted = this.channel.appendPipeline(
request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2);
if (allCompleted) {
request.pipelineCompleted = true;
this.channel.writePipelineInIOThread(this.finishBytesIOThreadHandler);
} else {
removeChannel();
this.responseConsumer.accept(this);
}
} else if (this.channel.hasPipelineData()) {
this.channel.appendPipeline(
request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2);
this.channel.writePipelineInIOThread(this.finishBytesIOThreadHandler);
} else {
this.channel.writeInIOThread(bs1, offset1, length1, bs2, offset2, length2, finishBytesIOThreadHandler);
}
}
protected void finishBuffers(boolean kill, ByteBuffer... buffers) {
if (kill) {
refuseAlive();
}
if (request.pipelineIndex > 0) {
ByteArray array = new ByteArray();
for (ByteBuffer buffer : buffers) {
array.put(buffer);
}
boolean allCompleted = this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, array);
if (allCompleted) {
request.pipelineCompleted = true;
this.channel.writeInIOThread(buffers, buffers, this.finishBuffersIOThreadHandler);
} else {
AsyncConnection conn = removeChannel();
if (conn != null) {
conn.offerWriteBuffers(buffers);
}
this.responseConsumer.accept(this);
}
} else if (this.channel.hasPipelineData()) {
// 先将pipeline数据写入完再写入buffers
this.channel.writePipelineInIOThread(new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
channel.write(buffers, buffers, finishBuffersIOThreadHandler);
}
@Override
public void failed(Throwable exc, Void attachment) {
finishBuffersIOThreadHandler.failed(exc, buffers);
}
});
} else {
this.channel.writeInIOThread(buffers, buffers, finishBuffersIOThreadHandler);
}
}
protected final void finishBuffer(ByteBuffer buffer) {
finishBuffers(false, buffer);
}
protected final void finishBuffers(ByteBuffer... buffers) {
finishBuffers(false, buffers);
}
protected void finishBuffer(boolean kill, ByteBuffer buffer) {
finishBuffers(kill, buffer);
}
protected void send(final ByteTuple array, final CompletionHandler<Integer, Void> handler) { protected void send(final ByteTuple array, final CompletionHandler<Integer, Void> handler) {
ByteBuffer buffer = this.writeBuffer; ByteBuffer buffer = this.writeBuffer;
if (buffer != null && buffer.capacity() >= array.length()) { if (buffer != null && buffer.capacity() >= array.length()) {