This commit is contained in:
Redkale
2018-05-25 15:31:43 +08:00
parent 4cd5bd37d3
commit cc864e3e69
2 changed files with 46 additions and 18 deletions

View File

@@ -247,6 +247,8 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
ByteBuffer[] writeBuffers; ByteBuffer[] writeBuffers;
int writingCount;
int writeOffset; int writeOffset;
int writeLength; int writeLength;
@@ -364,6 +366,12 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
return buffers; return buffers;
} }
int removeWritingCount() {
int rs = this.writingCount;
this.writingCount = 0;
return rs;
}
int removeWriteOffset() { int removeWriteOffset() {
int rs = this.writeOffset; int rs = this.writeOffset;
this.writeOffset = 0; this.writeOffset = 0;
@@ -454,6 +462,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
this.writeBuffers = srcs; this.writeBuffers = srcs;
this.writeOffset = offset; this.writeOffset = offset;
this.writeLength = length; this.writeLength = length;
this.writingCount = 0;
this.writeAttachment = attachment; this.writeAttachment = attachment;
this.writeHandler = handler; this.writeHandler = handler;
if (key == null) { if (key == null) {
@@ -473,6 +482,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
if (this.writeHandler != null) throw new RuntimeException("pending write"); if (this.writeHandler != null) throw new RuntimeException("pending write");
try { try {
this.writeOneBuffer = src; this.writeOneBuffer = src;
this.writingCount = 0;
this.writeAttachment = attachment; this.writeAttachment = attachment;
this.writeHandler = handler; this.writeHandler = handler;
if (key == null) { if (key == null) {

View File

@@ -461,10 +461,10 @@ public abstract class ProtocolServer {
return; return;
} }
if (conn == null) return; if (conn == null) return;
if (key.isReadable()) { if (key.isWritable()) {
if (conn.readHandler != null) readOP(key, socket, conn);
} else if (key.isWritable()) {
if (conn.writeHandler != null) writeOP(key, socket, conn); if (conn.writeHandler != null) writeOP(key, socket, conn);
} else if (key.isReadable()) {
if (conn.readHandler != null) readOP(key, socket, conn);
} }
} }
@@ -496,10 +496,11 @@ public abstract class ProtocolServer {
} }
private void writeOP(SelectionKey key, SocketChannel socket, AsyncNIOTCPConnection conn) { private void writeOP(SelectionKey key, SocketChannel socket, AsyncNIOTCPConnection conn) {
final CompletionHandler handler = conn.removeWriteHandler(); final CompletionHandler handler = conn.writeHandler;
final ByteBuffer oneBuffer = conn.removeWriteOneBuffer(); final ByteBuffer oneBuffer = conn.removeWriteOneBuffer();
final ByteBuffer[] buffers = conn.removeWriteBuffers(); final ByteBuffer[] buffers = conn.removeWriteBuffers();
final Object attach = conn.removeWriteAttachment(); final Object attach = conn.removeWriteAttachment();
final int writingCount = conn.removeWritingCount();
final int writeOffset = conn.removeWriteOffset(); final int writeOffset = conn.removeWriteOffset();
final int writeLength = conn.removeWriteLength(); final int writeLength = conn.removeWriteLength();
if (handler == null || (oneBuffer == null && buffers == null)) return; if (handler == null || (oneBuffer == null && buffers == null)) return;
@@ -509,25 +510,42 @@ public abstract class ProtocolServer {
if (oneBuffer == null) { if (oneBuffer == null) {
int offset = writeOffset; int offset = writeOffset;
int length = writeLength; int length = writeLength;
for (;;) { rs = (int) socket.write(buffers, offset, length);
long sr = socket.write(buffers, offset, length); boolean over = true;
if (sr > 0) rs += sr; int end = offset + length;
boolean over = true; for (int i = offset; i < end; i++) {
int end = offset + length; if (buffers[i].hasRemaining()) {
for (int i = offset; i < end; i++) { over = false;
if (buffers[i].hasRemaining()) { length -= i - offset;
over = false; offset = i;
length -= i - offset;
offset = i;
}
} }
if (over) break; }
if (!over) {
conn.writingCount += rs;
conn.writeHandler = handler;
conn.writeAttachment = attach;
conn.writeBuffers = buffers;
conn.writeOffset = offset;
conn.writeLength = length;
key.interestOps(SelectionKey.OP_READ + SelectionKey.OP_WRITE);
key.selector().wakeup();
return;
} }
} else { } else {
while (oneBuffer.hasRemaining()) rs += socket.write(oneBuffer); rs = socket.write(oneBuffer);
if (oneBuffer.hasRemaining()) {
conn.writingCount += rs;
conn.writeHandler = handler;
conn.writeAttachment = attach;
conn.writeOneBuffer = oneBuffer;
key.interestOps(SelectionKey.OP_READ + SelectionKey.OP_WRITE);
key.selector().wakeup();
return;
}
} }
conn.removeWriteHandler();
key.interestOps(SelectionKey.OP_READ); //OP_CONNECT key.interestOps(SelectionKey.OP_READ); //OP_CONNECT
final int rs0 = rs; final int rs0 = rs + writingCount;
//System.out.println(conn + "------buffers:" + Arrays.toString(buffers) + "---onebuf:" + oneBuffer + "-------handler:" + handler + "-------write: " + rs); //System.out.println(conn + "------buffers:" + Arrays.toString(buffers) + "---onebuf:" + oneBuffer + "-------handler:" + handler + "-------write: " + rs);
context.runAsync(() -> { context.runAsync(() -> {
try { try {