移除writeLock
This commit is contained in:
@@ -5,6 +5,8 @@
|
|||||||
*/
|
*/
|
||||||
package org.redkale.cluster.spi;
|
package org.redkale.cluster.spi;
|
||||||
|
|
||||||
|
import static org.redkale.util.Utility.isEmpty;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.lang.reflect.Type;
|
import java.lang.reflect.Type;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
@@ -18,7 +20,6 @@ import org.redkale.convert.json.JsonConvert;
|
|||||||
import org.redkale.net.http.*;
|
import org.redkale.net.http.*;
|
||||||
import org.redkale.util.RedkaleException;
|
import org.redkale.util.RedkaleException;
|
||||||
import org.redkale.util.Traces;
|
import org.redkale.util.Traces;
|
||||||
import static org.redkale.util.Utility.isEmpty;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 没有配置MQ且也没有ClusterAgent的情况下实现的默认HttpMessageClient实例
|
* 没有配置MQ且也没有ClusterAgent的情况下实现的默认HttpMessageClient实例
|
||||||
|
|||||||
@@ -5,13 +5,14 @@
|
|||||||
*/
|
*/
|
||||||
package org.redkale.mq.spi;
|
package org.redkale.mq.spi;
|
||||||
|
|
||||||
|
import static org.redkale.mq.spi.MessageRecord.CTYPE_HTTP_RESULT;
|
||||||
|
|
||||||
import java.lang.reflect.Type;
|
import java.lang.reflect.Type;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.function.*;
|
import java.util.function.*;
|
||||||
import java.util.logging.Level;
|
import java.util.logging.Level;
|
||||||
import org.redkale.convert.Convert;
|
import org.redkale.convert.Convert;
|
||||||
import static org.redkale.mq.spi.MessageRecord.CTYPE_HTTP_RESULT;
|
|
||||||
import org.redkale.net.Response;
|
import org.redkale.net.Response;
|
||||||
import org.redkale.net.http.*;
|
import org.redkale.net.http.*;
|
||||||
import org.redkale.service.RetResult;
|
import org.redkale.service.RetResult;
|
||||||
|
|||||||
@@ -264,23 +264,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
|
|||||||
protected abstract <A> void writeImpl(
|
protected abstract <A> void writeImpl(
|
||||||
ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler);
|
ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler);
|
||||||
|
|
||||||
public abstract <A> void writeInLock(
|
|
||||||
ByteBuffer src, Consumer<ByteBuffer> consumer, A attachment, CompletionHandler<Integer, ? super A> handler);
|
|
||||||
|
|
||||||
public abstract <A> void writeInLock(
|
|
||||||
ByteBuffer[] srcs,
|
|
||||||
int offset,
|
|
||||||
int length,
|
|
||||||
Consumer<ByteBuffer> consumer,
|
|
||||||
A attachment,
|
|
||||||
CompletionHandler<Integer, ? super A> handler);
|
|
||||||
|
|
||||||
public abstract <A> void writeInLock(
|
|
||||||
Supplier<ByteBuffer> supplier,
|
|
||||||
Consumer<ByteBuffer> consumer,
|
|
||||||
A attachment,
|
|
||||||
CompletionHandler<Integer, ? super A> handler);
|
|
||||||
|
|
||||||
protected void startRead(CompletionHandler<Integer, ByteBuffer> handler) {
|
protected void startRead(CompletionHandler<Integer, ByteBuffer> handler) {
|
||||||
read(handler);
|
read(handler);
|
||||||
}
|
}
|
||||||
@@ -301,14 +284,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public final void readRegisterInIOThread(CompletionHandler<Integer, ByteBuffer> handler) {
|
|
||||||
if (inCurrReadThread()) {
|
|
||||||
readRegister(handler);
|
|
||||||
} else {
|
|
||||||
executeRead(() -> readRegister(handler));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public final void read(CompletionHandler<Integer, ByteBuffer> handler) {
|
public final void read(CompletionHandler<Integer, ByteBuffer> handler) {
|
||||||
if (sslEngine == null) {
|
if (sslEngine == null) {
|
||||||
readImpl(handler);
|
readImpl(handler);
|
||||||
@@ -537,67 +512,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public final <A> void writeInLock(
|
|
||||||
ByteBuffer[] srcs,
|
|
||||||
Consumer<ByteBuffer> consumer,
|
|
||||||
A attachment,
|
|
||||||
CompletionHandler<Integer, ? super A> handler) {
|
|
||||||
writeInLock(srcs, 0, srcs.length, consumer, attachment, handler);
|
|
||||||
}
|
|
||||||
|
|
||||||
public final void writeInLock(byte[] bytes, CompletionHandler<Integer, Void> handler) {
|
|
||||||
writeInLock(ByteBuffer.wrap(bytes), (Consumer) null, null, handler);
|
|
||||||
}
|
|
||||||
|
|
||||||
public final void writeInLock(ByteTuple array, CompletionHandler<Integer, Void> handler) {
|
|
||||||
writeInLock(ByteBuffer.wrap(array.content(), array.offset(), array.length()), null, null, handler);
|
|
||||||
}
|
|
||||||
|
|
||||||
public final void writeInLock(byte[] bytes, int offset, int length, CompletionHandler<Integer, Void> handler) {
|
|
||||||
writeInLock(ByteBuffer.wrap(bytes, offset, length), null, null, handler);
|
|
||||||
}
|
|
||||||
|
|
||||||
public final void writeInLock(ByteTuple header, ByteTuple body, CompletionHandler<Integer, Void> handler) {
|
|
||||||
if (body == null) {
|
|
||||||
writeInLock(ByteBuffer.wrap(header.content(), header.offset(), header.length()), null, null, handler);
|
|
||||||
} else if (header == null) {
|
|
||||||
writeInLock(ByteBuffer.wrap(body.content(), body.offset(), body.length()), null, null, handler);
|
|
||||||
} else {
|
|
||||||
writeInLock(
|
|
||||||
new ByteBuffer[] {
|
|
||||||
ByteBuffer.wrap(header.content(), header.offset(), header.length()),
|
|
||||||
ByteBuffer.wrap(body.content(), body.offset(), body.length())
|
|
||||||
},
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
handler);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public final void writeInLock(
|
|
||||||
byte[] headerContent,
|
|
||||||
int headerOffset,
|
|
||||||
int headerLength,
|
|
||||||
byte[] bodyContent,
|
|
||||||
int bodyOffset,
|
|
||||||
int bodyLength,
|
|
||||||
CompletionHandler<Integer, Void> handler) {
|
|
||||||
if (bodyContent == null) {
|
|
||||||
writeInLock(ByteBuffer.wrap(headerContent, headerOffset, headerLength), null, null, handler);
|
|
||||||
} else if (headerContent == null) {
|
|
||||||
writeInLock(ByteBuffer.wrap(bodyContent, bodyOffset, bodyLength), null, null, handler);
|
|
||||||
} else {
|
|
||||||
writeInLock(
|
|
||||||
new ByteBuffer[] {
|
|
||||||
ByteBuffer.wrap(headerContent, headerOffset, headerLength),
|
|
||||||
ByteBuffer.wrap(bodyContent, bodyOffset, bodyLength)
|
|
||||||
},
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
handler);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setReadBuffer(ByteBuffer buffer) {
|
public void setReadBuffer(ByteBuffer buffer) {
|
||||||
if (this.readBuffer != null) {
|
if (this.readBuffer != null) {
|
||||||
throw new RedkaleException("repeat AsyncConnection.setReadBuffer");
|
throw new RedkaleException("repeat AsyncConnection.setReadBuffer");
|
||||||
@@ -611,44 +525,33 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public final void writePipeline(CompletionHandler<Integer, Void> handler) {
|
public final void writePipeline(CompletionHandler<Integer, Void> handler) {
|
||||||
|
writePipeline(null, handler);
|
||||||
|
}
|
||||||
|
|
||||||
|
public <A> void writePipeline(A attachment, CompletionHandler<Integer, ? super A> handler) {
|
||||||
ByteBufferWriter writer = this.pipelineWriter;
|
ByteBufferWriter writer = this.pipelineWriter;
|
||||||
this.pipelineWriter = null;
|
this.pipelineWriter = null;
|
||||||
if (writer == null) {
|
if (writer == null) {
|
||||||
handler.completed(0, null);
|
handler.completed(0, attachment);
|
||||||
} else {
|
} else {
|
||||||
ByteBuffer[] srcs = writer.toBuffers();
|
ByteBuffer[] srcs = writer.toBuffers();
|
||||||
CompletionHandler<Integer, Void> newHandler = new CompletionHandler<Integer, Void>() {
|
CompletionHandler<Integer, ? super A> newHandler = new CompletionHandler<Integer, A>() {
|
||||||
@Override
|
@Override
|
||||||
public void completed(Integer result, Void attachment) {
|
public void completed(Integer result, A attachment) {
|
||||||
offerWriteBuffers(srcs);
|
offerWriteBuffers(srcs);
|
||||||
handler.completed(result, attachment);
|
handler.completed(result, attachment);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void failed(Throwable exc, Void attachment) {
|
public void failed(Throwable exc, A attachment) {
|
||||||
offerWriteBuffers(srcs);
|
offerWriteBuffers(srcs);
|
||||||
handler.failed(exc, attachment);
|
handler.failed(exc, attachment);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
if (srcs.length == 1) {
|
if (srcs.length == 1) {
|
||||||
write(srcs[0], null, newHandler);
|
write(srcs[0], attachment, newHandler);
|
||||||
} else {
|
} else {
|
||||||
write(srcs, null, newHandler);
|
write(srcs, attachment, newHandler);
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void writePipelineInLock(CompletionHandler<Integer, Void> handler) {
|
|
||||||
ByteBufferWriter writer = this.pipelineWriter;
|
|
||||||
this.pipelineWriter = null;
|
|
||||||
if (writer == null) {
|
|
||||||
handler.completed(0, null);
|
|
||||||
} else {
|
|
||||||
ByteBuffer[] srcs = writer.toBuffers();
|
|
||||||
if (srcs.length == 1) {
|
|
||||||
writeInLock(srcs[0], this.writeBufferConsumer, null, handler);
|
|
||||||
} else {
|
|
||||||
writeInLock(srcs, this.writeBufferConsumer, null, handler);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -661,6 +564,14 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public final <A> void writePipelineInIOThread(A attachment, CompletionHandler<Integer, ? super A> handler) {
|
||||||
|
if (inCurrWriteThread()) {
|
||||||
|
writePipeline(attachment, handler);
|
||||||
|
} else {
|
||||||
|
executeWrite(() -> writePipeline(attachment, handler));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// 返回pipelineCount个数数据是否全部写入完毕
|
// 返回pipelineCount个数数据是否全部写入完毕
|
||||||
public final boolean appendPipeline(int pipelineIndex, int pipelineCount, ByteTuple array) {
|
public final boolean appendPipeline(int pipelineIndex, int pipelineCount, ByteTuple array) {
|
||||||
return appendPipeline(pipelineIndex, pipelineCount, array.content(), array.offset(), array.length());
|
return appendPipeline(pipelineIndex, pipelineCount, array.content(), array.offset(), array.length());
|
||||||
@@ -688,7 +599,65 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
|
|||||||
dataNode.pipelineCount = pipelineCount;
|
dataNode.pipelineCount = pipelineCount;
|
||||||
}
|
}
|
||||||
dataNode.put(pipelineIndex, bs, offset, length);
|
dataNode.put(pipelineIndex, bs, offset, length);
|
||||||
if (writer.getWriteBytesCounter() + dataNode.size == dataNode.pipelineCount) {
|
if (writer.getWriteBytesCounter() + dataNode.itemsize == dataNode.pipelineCount) {
|
||||||
|
for (PipelineDataItem item : dataNode.arrayItems()) {
|
||||||
|
writer.put(item.data);
|
||||||
|
}
|
||||||
|
this.pipelineDataNode = null;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
writeLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 返回pipelineCount个数数据是否全部写入完毕
|
||||||
|
public final boolean appendPipeline(int pipelineIndex, int pipelineCount, ByteTuple header, ByteTuple body) {
|
||||||
|
return appendPipeline(
|
||||||
|
pipelineIndex,
|
||||||
|
pipelineCount,
|
||||||
|
header.content(),
|
||||||
|
header.offset(),
|
||||||
|
header.length(),
|
||||||
|
body == null ? null : body.content(),
|
||||||
|
body == null ? 0 : body.offset(),
|
||||||
|
body == null ? 0 : body.length());
|
||||||
|
}
|
||||||
|
|
||||||
|
// 返回pipelineCount个数数据是否全部写入完毕
|
||||||
|
public boolean appendPipeline(
|
||||||
|
int pipelineIndex,
|
||||||
|
int pipelineCount,
|
||||||
|
byte[] headerContent,
|
||||||
|
int headerOffset,
|
||||||
|
int headerLength,
|
||||||
|
byte[] bodyContent,
|
||||||
|
int bodyOffset,
|
||||||
|
int bodyLength) {
|
||||||
|
writeLock.lock();
|
||||||
|
try {
|
||||||
|
ByteBufferWriter writer = this.pipelineWriter;
|
||||||
|
if (writer == null) {
|
||||||
|
writer = ByteBufferWriter.create(getWriteBufferSupplier());
|
||||||
|
this.pipelineWriter = writer;
|
||||||
|
}
|
||||||
|
if (this.pipelineDataNode == null && pipelineIndex == writer.getWriteBytesCounter() + 1) {
|
||||||
|
writer.put(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength);
|
||||||
|
return (pipelineIndex == pipelineCount);
|
||||||
|
} else {
|
||||||
|
PipelineDataNode dataNode = this.pipelineDataNode;
|
||||||
|
if (dataNode == null) {
|
||||||
|
dataNode = new PipelineDataNode();
|
||||||
|
this.pipelineDataNode = dataNode;
|
||||||
|
}
|
||||||
|
if (pipelineIndex == pipelineCount) { // 此时pipelineCount为最大值
|
||||||
|
dataNode.pipelineCount = pipelineCount;
|
||||||
|
}
|
||||||
|
dataNode.put(
|
||||||
|
pipelineIndex, headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength);
|
||||||
|
if (writer.getWriteBytesCounter() + dataNode.itemsize == dataNode.pipelineCount) {
|
||||||
for (PipelineDataItem item : dataNode.arrayItems()) {
|
for (PipelineDataItem item : dataNode.arrayItems()) {
|
||||||
writer.put(item.data);
|
writer.put(item.data);
|
||||||
}
|
}
|
||||||
@@ -706,14 +675,14 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
|
|||||||
|
|
||||||
public int pipelineCount;
|
public int pipelineCount;
|
||||||
|
|
||||||
public int size;
|
public int itemsize;
|
||||||
|
|
||||||
private PipelineDataItem head;
|
private PipelineDataItem head;
|
||||||
|
|
||||||
private PipelineDataItem tail;
|
private PipelineDataItem tail;
|
||||||
|
|
||||||
public PipelineDataItem[] arrayItems() {
|
public PipelineDataItem[] arrayItems() {
|
||||||
PipelineDataItem[] items = new PipelineDataItem[size];
|
PipelineDataItem[] items = new PipelineDataItem[itemsize];
|
||||||
PipelineDataItem item = head;
|
PipelineDataItem item = head;
|
||||||
int i = 0;
|
int i = 0;
|
||||||
while (item != null) {
|
while (item != null) {
|
||||||
@@ -735,7 +704,28 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
|
|||||||
tail.next = item;
|
tail.next = item;
|
||||||
tail = item;
|
tail = item;
|
||||||
}
|
}
|
||||||
size++;
|
itemsize++;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void put(
|
||||||
|
int pipelineIndex,
|
||||||
|
byte[] headerContent,
|
||||||
|
int headerOffset,
|
||||||
|
int headerLength,
|
||||||
|
byte[] bodyContent,
|
||||||
|
int bodyOffset,
|
||||||
|
int bodyLength) {
|
||||||
|
if (tail == null) {
|
||||||
|
head = new PipelineDataItem(
|
||||||
|
pipelineIndex, headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength);
|
||||||
|
tail = head;
|
||||||
|
} else {
|
||||||
|
PipelineDataItem item = new PipelineDataItem(
|
||||||
|
pipelineIndex, headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength);
|
||||||
|
tail.next = item;
|
||||||
|
tail = item;
|
||||||
|
}
|
||||||
|
itemsize++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -749,9 +739,34 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
|
|||||||
|
|
||||||
public PipelineDataItem(int index, byte[] bs, int offset, int length) {
|
public PipelineDataItem(int index, byte[] bs, int offset, int length) {
|
||||||
this.index = index;
|
this.index = index;
|
||||||
byte[] result = new byte[length];
|
this.data = Arrays.copyOfRange(bs, offset, offset + length);
|
||||||
System.arraycopy(bs, offset, result, 0, length);
|
}
|
||||||
this.data = result;
|
|
||||||
|
public PipelineDataItem(
|
||||||
|
int index,
|
||||||
|
byte[] headerContent,
|
||||||
|
int headerOffset,
|
||||||
|
int headerLength,
|
||||||
|
byte[] bodyContent,
|
||||||
|
int bodyOffset,
|
||||||
|
int bodyLength) {
|
||||||
|
this.index = index;
|
||||||
|
this.data = bodyLength > 0
|
||||||
|
? copyOfRange(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength)
|
||||||
|
: Arrays.copyOfRange(headerContent, headerOffset, headerOffset + headerLength);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static byte[] copyOfRange(
|
||||||
|
byte[] headerContent,
|
||||||
|
int headerOffset,
|
||||||
|
int headerLength,
|
||||||
|
byte[] bodyContent,
|
||||||
|
int bodyOffset,
|
||||||
|
int bodyLength) {
|
||||||
|
byte[] result = new byte[headerLength + bodyLength];
|
||||||
|
System.arraycopy(headerContent, headerOffset, result, 0, headerLength);
|
||||||
|
System.arraycopy(bodyContent, bodyOffset, result, headerLength, bodyLength);
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -12,7 +12,6 @@ import java.nio.channels.*;
|
|||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.ConcurrentLinkedDeque;
|
import java.util.concurrent.ConcurrentLinkedDeque;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Supplier;
|
|
||||||
import javax.net.ssl.SSLContext;
|
import javax.net.ssl.SSLContext;
|
||||||
import org.redkale.util.ByteBufferWriter;
|
import org.redkale.util.ByteBufferWriter;
|
||||||
|
|
||||||
@@ -324,149 +323,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public <A> void writeInLock(
|
|
||||||
ByteBuffer buffer,
|
|
||||||
Consumer<ByteBuffer> consumer,
|
|
||||||
A attachment,
|
|
||||||
CompletionHandler<Integer, ? super A> handler) {
|
|
||||||
int total = 0;
|
|
||||||
Exception t = null;
|
|
||||||
lockWrite();
|
|
||||||
try {
|
|
||||||
if (this.writePending) {
|
|
||||||
handler.failed(new WritePendingException(), attachment);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
this.writePending = true;
|
|
||||||
while (buffer.hasRemaining()) { // 必须要将buffer写完为止
|
|
||||||
int c = implWrite(buffer);
|
|
||||||
if (c < 0) {
|
|
||||||
t = new ClosedChannelException();
|
|
||||||
total = c;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
total += c;
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
t = e;
|
|
||||||
} finally {
|
|
||||||
this.writePending = false;
|
|
||||||
unlockWrite();
|
|
||||||
}
|
|
||||||
if (consumer != null) {
|
|
||||||
consumer.accept(buffer);
|
|
||||||
}
|
|
||||||
if (t != null) {
|
|
||||||
handler.failed(t, attachment);
|
|
||||||
} else {
|
|
||||||
handler.completed(total, attachment);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public <A> void writeInLock(
|
|
||||||
ByteBuffer[] srcs,
|
|
||||||
int offset,
|
|
||||||
int length,
|
|
||||||
Consumer<ByteBuffer> consumer,
|
|
||||||
A attachment,
|
|
||||||
CompletionHandler<Integer, ? super A> handler) {
|
|
||||||
int total = 0;
|
|
||||||
Exception t = null;
|
|
||||||
int batchOffset = offset;
|
|
||||||
int batchLength = length;
|
|
||||||
ByteBuffer[] batchBuffers = srcs;
|
|
||||||
lockWrite();
|
|
||||||
try {
|
|
||||||
if (this.writePending) {
|
|
||||||
handler.failed(new WritePendingException(), attachment);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
this.writePending = true;
|
|
||||||
boolean hasRemain = true;
|
|
||||||
while (hasRemain) { // 必须要将buffer写完为止
|
|
||||||
int c = implWrite(batchBuffers, batchOffset, batchLength);
|
|
||||||
if (c < 0) {
|
|
||||||
t = new ClosedChannelException();
|
|
||||||
total = c;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
boolean remain = false;
|
|
||||||
for (int i = 0; i < batchLength; i++) {
|
|
||||||
if (batchBuffers[batchOffset + i].hasRemaining()) {
|
|
||||||
remain = true;
|
|
||||||
batchOffset += i;
|
|
||||||
batchLength -= i;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
hasRemain = remain;
|
|
||||||
total += c;
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
t = e;
|
|
||||||
} finally {
|
|
||||||
this.writePending = false;
|
|
||||||
unlockWrite();
|
|
||||||
}
|
|
||||||
if (consumer != null) {
|
|
||||||
for (int i = 0; i < length; i++) {
|
|
||||||
consumer.accept(srcs[offset + i]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (t != null) {
|
|
||||||
handler.failed(t, attachment);
|
|
||||||
} else {
|
|
||||||
handler.completed(total, attachment);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public <A> void writeInLock(
|
|
||||||
Supplier<ByteBuffer> supplier,
|
|
||||||
Consumer<ByteBuffer> consumer,
|
|
||||||
A attachment,
|
|
||||||
CompletionHandler<Integer, ? super A> handler) {
|
|
||||||
int total = 0;
|
|
||||||
Exception t = null;
|
|
||||||
lockWrite();
|
|
||||||
try {
|
|
||||||
ByteBuffer buffer = supplier.get();
|
|
||||||
if (buffer == null || !buffer.hasRemaining()) {
|
|
||||||
handler.completed(total, attachment);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (this.writePending) {
|
|
||||||
handler.failed(new WritePendingException(), attachment);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
this.writePending = true;
|
|
||||||
while (buffer.hasRemaining()) { // 必须要将buffer写完为止
|
|
||||||
int c = implWrite(buffer);
|
|
||||||
if (c < 0) {
|
|
||||||
t = new ClosedChannelException();
|
|
||||||
total = c;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
total += c;
|
|
||||||
}
|
|
||||||
if (consumer != null) {
|
|
||||||
consumer.accept(buffer);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
t = e;
|
|
||||||
} finally {
|
|
||||||
this.writePending = false;
|
|
||||||
unlockWrite();
|
|
||||||
}
|
|
||||||
if (t != null) {
|
|
||||||
handler.failed(t, attachment);
|
|
||||||
} else {
|
|
||||||
handler.completed(total, attachment);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void doWrite() {
|
public void doWrite() {
|
||||||
try {
|
try {
|
||||||
this.writeTime = System.currentTimeMillis();
|
this.writeTime = System.currentTimeMillis();
|
||||||
|
|||||||
@@ -61,7 +61,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
|
|||||||
|
|
||||||
private final ByteBuffer writeBuffer;
|
private final ByteBuffer writeBuffer;
|
||||||
|
|
||||||
private final CompletionHandler<Integer, Void> finishVoidIOThreadHandler = new CompletionHandler<Integer, Void>() {
|
private final CompletionHandler finishBytesIOThreadHandler = new CompletionHandler<Integer, Void>() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void completed(Integer result, Void attachment) {
|
public void completed(Integer result, Void attachment) {
|
||||||
@@ -74,6 +74,29 @@ public abstract class Response<C extends Context, R extends Request<C>> {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
private final CompletionHandler finishBufferIOThreadHandler = new CompletionHandler<Integer, ByteBuffer>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void completed(Integer result, ByteBuffer attachment) {
|
||||||
|
if (attachment != writeBuffer) {
|
||||||
|
channel.offerWriteBuffer(attachment);
|
||||||
|
} else {
|
||||||
|
attachment.clear();
|
||||||
|
}
|
||||||
|
completeInIOThread(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void failed(Throwable exc, ByteBuffer attachment) {
|
||||||
|
if (attachment != writeBuffer) {
|
||||||
|
channel.offerWriteBuffer(attachment);
|
||||||
|
} else {
|
||||||
|
attachment.clear();
|
||||||
|
}
|
||||||
|
completeInIOThread(true);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
protected Response(C context, final R request) {
|
protected Response(C context, final R request) {
|
||||||
this.context = context;
|
this.context = context;
|
||||||
this.request = request;
|
this.request = request;
|
||||||
@@ -82,6 +105,10 @@ public abstract class Response<C extends Context, R extends Request<C>> {
|
|||||||
this.workExecutor = context.workExecutor == null ? ForkJoinPool.commonPool() : context.workExecutor;
|
this.workExecutor = context.workExecutor == null ? ForkJoinPool.commonPool() : context.workExecutor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected ByteBuffer writeBuffer() {
|
||||||
|
return writeBuffer;
|
||||||
|
}
|
||||||
|
|
||||||
protected AsyncConnection removeChannel() {
|
protected AsyncConnection removeChannel() {
|
||||||
AsyncConnection ch = this.channel;
|
AsyncConnection ch = this.channel;
|
||||||
this.channel = null;
|
this.channel = null;
|
||||||
@@ -299,15 +326,27 @@ public abstract class Response<C extends Context, R extends Request<C>> {
|
|||||||
}
|
}
|
||||||
if (request.keepAlive && (request.pipelineIndex == 0 || request.pipelineCompleted)) {
|
if (request.keepAlive && (request.pipelineIndex == 0 || request.pipelineCompleted)) {
|
||||||
AsyncConnection conn = removeChannel();
|
AsyncConnection conn = removeChannel();
|
||||||
if (conn != null) {
|
if (conn != null && conn.protocolCodec != null) {
|
||||||
this.responseConsumer.accept(this);
|
this.responseConsumer.accept(this);
|
||||||
if (!request.readCompleted) {
|
if (!request.readCompleted) {
|
||||||
conn.readRegisterInIOThread(conn.protocolCodec);
|
conn.readRegister(conn.protocolCodec);
|
||||||
}
|
}
|
||||||
return;
|
} else {
|
||||||
|
Supplier<Response> poolSupplier = this.responseSupplier;
|
||||||
|
Consumer<Response> poolConsumer = this.responseConsumer;
|
||||||
|
this.recycle();
|
||||||
|
new ProtocolCodec(context, poolSupplier, poolConsumer, conn)
|
||||||
|
.response(this)
|
||||||
|
.run(null);
|
||||||
|
request.readCompleted = false;
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
this.responseConsumer.accept(this);
|
||||||
}
|
}
|
||||||
this.responseConsumer.accept(this);
|
}
|
||||||
|
|
||||||
|
protected void writeInIOThread(ByteBuffer buffer) {
|
||||||
|
this.channel.writeInIOThread(buffer, buffer, finishBufferIOThreadHandler);
|
||||||
}
|
}
|
||||||
|
|
||||||
public final void finish(final byte[] bs) {
|
public final void finish(final byte[] bs) {
|
||||||
@@ -339,27 +378,50 @@ public abstract class Response<C extends Context, R extends Request<C>> {
|
|||||||
this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs, offset, length);
|
this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs, offset, length);
|
||||||
if (allCompleted) {
|
if (allCompleted) {
|
||||||
request.pipelineCompleted = true;
|
request.pipelineCompleted = true;
|
||||||
this.channel.writePipelineInLock(this.finishVoidIOThreadHandler);
|
this.channel.writePipelineInIOThread(this.finishBytesIOThreadHandler);
|
||||||
} else {
|
} else {
|
||||||
removeChannel();
|
removeChannel();
|
||||||
this.responseConsumer.accept(this);
|
this.responseConsumer.accept(this);
|
||||||
}
|
}
|
||||||
} else if (this.channel.hasPipelineData()) {
|
} else if (this.channel.hasPipelineData()) {
|
||||||
this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs, offset, length);
|
this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs, offset, length);
|
||||||
this.channel.writePipelineInLock(this.finishVoidIOThreadHandler);
|
this.channel.writePipelineInIOThread(this.finishBytesIOThreadHandler);
|
||||||
} else {
|
} else {
|
||||||
ByteBuffer buffer = this.writeBuffer;
|
ByteBuffer buffer = this.writeBuffer;
|
||||||
if (buffer != null && buffer.capacity() >= length) {
|
if (buffer != null && buffer.capacity() >= length) {
|
||||||
buffer.clear();
|
buffer.clear();
|
||||||
buffer.put(bs, offset, length);
|
buffer.put(bs, offset, length);
|
||||||
buffer.flip();
|
buffer.flip();
|
||||||
this.channel.writeInLock(buffer, (Consumer) null, null, finishVoidIOThreadHandler);
|
this.channel.writeInIOThread(buffer, buffer, finishBufferIOThreadHandler);
|
||||||
} else {
|
} else {
|
||||||
this.channel.writeInLock(bs, offset, length, finishVoidIOThreadHandler);
|
this.channel.writeInIOThread(bs, offset, length, finishBytesIOThreadHandler);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void finish(boolean kill, byte[] bs1, int offset1, int length1, byte[] bs2, int offset2, int length2) {
|
||||||
|
if (kill) {
|
||||||
|
refuseAlive();
|
||||||
|
}
|
||||||
|
if (request.pipelineIndex > 0) {
|
||||||
|
boolean allCompleted = this.channel.appendPipeline(
|
||||||
|
request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2);
|
||||||
|
if (allCompleted) {
|
||||||
|
request.pipelineCompleted = true;
|
||||||
|
this.channel.writePipelineInIOThread(this.finishBytesIOThreadHandler);
|
||||||
|
} else {
|
||||||
|
removeChannel();
|
||||||
|
this.responseConsumer.accept(this);
|
||||||
|
}
|
||||||
|
} else if (this.channel.hasPipelineData()) {
|
||||||
|
this.channel.appendPipeline(
|
||||||
|
request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2);
|
||||||
|
this.channel.writePipelineInIOThread(this.finishBytesIOThreadHandler);
|
||||||
|
} else {
|
||||||
|
this.channel.writeInIOThread(bs1, offset1, length1, bs2, offset2, length2, finishBytesIOThreadHandler);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protected void send(final ByteTuple array, final CompletionHandler<Integer, Void> handler) {
|
protected void send(final ByteTuple array, final CompletionHandler<Integer, Void> handler) {
|
||||||
ByteBuffer buffer = this.writeBuffer;
|
ByteBuffer buffer = this.writeBuffer;
|
||||||
if (buffer != null && buffer.capacity() >= array.length()) {
|
if (buffer != null && buffer.capacity() >= array.length()) {
|
||||||
|
|||||||
Reference in New Issue
Block a user