This commit is contained in:
Redkale
2019-01-18 16:55:48 +08:00
parent cf2ab617c2
commit d605045858

View File

@@ -112,9 +112,20 @@ public class TcpAioAsyncConnection extends AsyncConnection {
private <A> void nextWrite(Throwable exc, A attachment) { private <A> void nextWrite(Throwable exc, A attachment) {
BlockingQueue<WriteEntry> queue = this.writeQueue; BlockingQueue<WriteEntry> queue = this.writeQueue;
if (exc != null && !isOpen()) {
WriteEntry entry;
while ((entry = queue.poll()) != null) {
try {
entry.writeHandler.failed(exc, entry.writeAttachment);
} catch (Throwable e) {
e.printStackTrace(System.err);
}
}
return;
}
WriteEntry entry = queue == null ? null : queue.poll(); WriteEntry entry = queue == null ? null : queue.poll();
if (entry != null) { if (entry != null) {
if (exc == null) {
try { try {
if (entry.writeOneBuffer == null) { if (entry.writeOneBuffer == null) {
write(false, entry.writeBuffers, entry.writeOffset, entry.writeLength, entry.writeAttachment, entry.writeHandler); write(false, entry.writeBuffers, entry.writeOffset, entry.writeLength, entry.writeAttachment, entry.writeHandler);
@@ -124,9 +135,6 @@ public class TcpAioAsyncConnection extends AsyncConnection {
} catch (Exception e) { } catch (Exception e) {
entry.writeHandler.failed(e, entry.writeAttachment); entry.writeHandler.failed(e, entry.writeAttachment);
} }
} else { //当连接已经关掉了不需要调用write方法直接报异常
entry.writeHandler.failed(exc, entry.writeAttachment);
}
} else { } else {
semaphore.release(); semaphore.release();
} }
@@ -316,18 +324,27 @@ public class TcpAioAsyncConnection extends AsyncConnection {
failed(e, attachment); failed(e, attachment);
return; return;
} }
nextWrite(null, attachment); try {
writeHandler.completed(writeCount, attachment); writeHandler.completed(writeCount, attachment);
} else { } finally {
nextWrite(null, attachment); nextWrite(null, attachment);
}
} else {
try {
writeHandler.completed(result.intValue(), attachment); writeHandler.completed(result.intValue(), attachment);
} finally {
nextWrite(null, attachment);
}
} }
} }
@Override @Override
public void failed(Throwable exc, A attachment) { public void failed(Throwable exc, A attachment) {
nextWrite(isOpen() ? null : exc, attachment); try {
writeHandler.failed(exc, attachment); writeHandler.failed(exc, attachment);
} finally {
nextWrite(exc, attachment);
}
} }
} }
@@ -354,14 +371,21 @@ public class TcpAioAsyncConnection extends AsyncConnection {
failed(e, attachment); failed(e, attachment);
return; return;
} }
nextWrite(null, attachment); try {
writeHandler.completed(result, attachment); writeHandler.completed(result, attachment);
} finally {
nextWrite(null, attachment);
}
} }
@Override @Override
public void failed(Throwable exc, A attachment) { public void failed(Throwable exc, A attachment) {
nextWrite(isOpen() ? null : exc, attachment); try {
writeHandler.failed(exc, attachment); writeHandler.failed(exc, attachment);
} finally {
nextWrite(exc, attachment);
}
} }
} }