From d6050458586fde9d6588bc8144431e030a663cb8 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Fri, 18 Jan 2019 16:55:48 +0800 Subject: [PATCH] --- .../redkale/net/TcpAioAsyncConnection.java | 68 +++++++++++++------ 1 file changed, 46 insertions(+), 22 deletions(-) diff --git a/src/org/redkale/net/TcpAioAsyncConnection.java b/src/org/redkale/net/TcpAioAsyncConnection.java index da1c13ef7..703f599bd 100644 --- a/src/org/redkale/net/TcpAioAsyncConnection.java +++ b/src/org/redkale/net/TcpAioAsyncConnection.java @@ -112,20 +112,28 @@ public class TcpAioAsyncConnection extends AsyncConnection { private void nextWrite(Throwable exc, A attachment) { BlockingQueue queue = this.writeQueue; - WriteEntry entry = queue == null ? null : queue.poll(); - if (entry != null) { - if (exc == null) { + if (exc != null && !isOpen()) { + WriteEntry entry; + while ((entry = queue.poll()) != null) { try { - if (entry.writeOneBuffer == null) { - write(false, entry.writeBuffers, entry.writeOffset, entry.writeLength, entry.writeAttachment, entry.writeHandler); - } else { - write(false, entry.writeOneBuffer, entry.writeAttachment, entry.writeHandler); - } - } catch (Exception e) { - entry.writeHandler.failed(e, entry.writeAttachment); + entry.writeHandler.failed(exc, entry.writeAttachment); + } catch (Throwable e) { + e.printStackTrace(System.err); } - } else { //当连接已经关掉了,不需要调用write方法,直接报异常 - entry.writeHandler.failed(exc, entry.writeAttachment); + } + return; + } + WriteEntry entry = queue == null ? null : queue.poll(); + + if (entry != null) { + try { + if (entry.writeOneBuffer == null) { + write(false, entry.writeBuffers, entry.writeOffset, entry.writeLength, entry.writeAttachment, entry.writeHandler); + } else { + write(false, entry.writeOneBuffer, entry.writeAttachment, entry.writeHandler); + } + } catch (Exception e) { + entry.writeHandler.failed(e, entry.writeAttachment); } } else { semaphore.release(); @@ -316,18 +324,27 @@ public class TcpAioAsyncConnection extends AsyncConnection { failed(e, attachment); return; } - nextWrite(null, attachment); - writeHandler.completed(writeCount, attachment); + try { + writeHandler.completed(writeCount, attachment); + } finally { + nextWrite(null, attachment); + } } else { - nextWrite(null, attachment); - writeHandler.completed(result.intValue(), attachment); + try { + writeHandler.completed(result.intValue(), attachment); + } finally { + nextWrite(null, attachment); + } } } @Override public void failed(Throwable exc, A attachment) { - nextWrite(isOpen() ? null : exc, attachment); - writeHandler.failed(exc, attachment); + try { + writeHandler.failed(exc, attachment); + } finally { + nextWrite(exc, attachment); + } } } @@ -354,14 +371,21 @@ public class TcpAioAsyncConnection extends AsyncConnection { failed(e, attachment); return; } - nextWrite(null, attachment); - writeHandler.completed(result, attachment); + try { + writeHandler.completed(result, attachment); + } finally { + nextWrite(null, attachment); + } + } @Override public void failed(Throwable exc, A attachment) { - nextWrite(isOpen() ? null : exc, attachment); - writeHandler.failed(exc, attachment); + try { + writeHandler.failed(exc, attachment); + } finally { + nextWrite(exc, attachment); + } } }