diff --git a/src/org/redkale/net/TcpAioAsyncConnection.java b/src/org/redkale/net/TcpAioAsyncConnection.java index 62e2585b1..81bf3779c 100644 --- a/src/org/redkale/net/TcpAioAsyncConnection.java +++ b/src/org/redkale/net/TcpAioAsyncConnection.java @@ -24,7 +24,7 @@ import javax.net.ssl.SSLContext; */ public class TcpAioAsyncConnection extends AsyncConnection { - private final Semaphore semaphore = new Semaphore(1); + //private final Semaphore semaphore = new Semaphore(1); private int readTimeoutSeconds; @@ -103,35 +103,35 @@ public class TcpAioAsyncConnection extends AsyncConnection { } } - private void nextWrite(Throwable exc, A attachment) { - BlockingQueue queue = this.writeQueue; - if (queue != null && exc != null && !isOpen()) { - WriteEntry entry; - while ((entry = queue.poll()) != null) { - try { - entry.writeHandler.failed(exc, entry.writeAttachment); - } catch (Throwable e) { - e.printStackTrace(System.err); - } - } - return; - } - WriteEntry entry = queue == null ? null : queue.poll(); - - if (entry != null) { - try { - if (entry.writeOneBuffer == null) { - write(false, entry.writeBuffers, entry.writeOffset, entry.writeLength, entry.writeAttachment, entry.writeHandler); - } else { - write(false, entry.writeOneBuffer, entry.writeAttachment, entry.writeHandler); - } - } catch (Exception e) { - entry.writeHandler.failed(e, entry.writeAttachment); - } - } else { - semaphore.release(); - } - } +// private void nextWrite(Throwable exc, A attachment) { +// BlockingQueue queue = this.writeQueue; +// if (queue != null && exc != null && !isOpen()) { +// WriteEntry entry; +// while ((entry = queue.poll()) != null) { +// try { +// entry.writeHandler.failed(exc, entry.writeAttachment); +// } catch (Throwable e) { +// e.printStackTrace(System.err); +// } +// } +// return; +// } +// WriteEntry entry = queue == null ? null : queue.poll(); +// +// if (entry != null) { +// try { +// if (entry.writeOneBuffer == null) { +// write(false, entry.writeBuffers, entry.writeOffset, entry.writeLength, entry.writeAttachment, entry.writeHandler); +// } else { +// write(false, entry.writeOneBuffer, entry.writeAttachment, entry.writeHandler); +// } +// } catch (Exception e) { +// entry.writeHandler.failed(e, entry.writeAttachment); +// } +// } else { +// semaphore.release(); +// } +// } @Override public void write(ByteBuffer src, A attachment, CompletionHandler handler) { @@ -139,17 +139,17 @@ public class TcpAioAsyncConnection extends AsyncConnection { } private void write(boolean acquire, ByteBuffer src, A attachment, CompletionHandler handler) { - if (acquire && !semaphore.tryAcquire()) { - if (this.writeQueue == null) { - synchronized (semaphore) { - if (this.writeQueue == null) { - this.writeQueue = new LinkedBlockingDeque<>(); - } - } - } - this.writeQueue.add(new WriteEntry(src, attachment, handler)); - return; - } +// if (acquire && !semaphore.tryAcquire()) { +// if (this.writeQueue == null) { +// synchronized (semaphore) { +// if (this.writeQueue == null) { +// this.writeQueue = new LinkedBlockingDeque<>(); +// } +// } +// } +// this.writeQueue.add(new WriteEntry(src, attachment, handler)); +// return; +// } WriteOneCompletionHandler newHandler = new WriteOneCompletionHandler(src, handler); if (!channel.isOpen()) { newHandler.failed(new ClosedChannelException(), attachment); @@ -173,17 +173,17 @@ public class TcpAioAsyncConnection extends AsyncConnection { } private void write(boolean acquire, ByteBuffer[] srcs, int offset, int length, A attachment, final CompletionHandler handler) { - if (acquire && !semaphore.tryAcquire()) { - if (this.writeQueue == null) { - synchronized (semaphore) { - if (this.writeQueue == null) { - this.writeQueue = new LinkedBlockingDeque<>(); - } - } - } - this.writeQueue.add(new WriteEntry(srcs, offset, length, attachment, handler)); - return; - } +// if (acquire && !semaphore.tryAcquire()) { +// if (this.writeQueue == null) { +// synchronized (semaphore) { +// if (this.writeQueue == null) { +// this.writeQueue = new LinkedBlockingDeque<>(); +// } +// } +// } +// this.writeQueue.add(new WriteEntry(srcs, offset, length, attachment, handler)); +// return; +// } WriteMoreCompletionHandler newHandler = new WriteMoreCompletionHandler(srcs, offset, length, handler); if (!channel.isOpen()) { newHandler.failed(new ClosedChannelException(), attachment); @@ -317,27 +317,27 @@ public class TcpAioAsyncConnection extends AsyncConnection { failed(e, attachment); return; } - try { +// try { writeHandler.completed(writeCount, attachment); - } finally { - nextWrite(null, attachment); - } +// } finally { +// nextWrite(null, attachment); +// } } else { - try { +// try { writeHandler.completed(result.intValue(), attachment); - } finally { - nextWrite(null, attachment); - } +// } finally { +// nextWrite(null, attachment); +// } } } @Override public void failed(Throwable exc, A attachment) { - try { +// try { writeHandler.failed(exc, attachment); - } finally { - nextWrite(exc, attachment); - } +// } finally { +// nextWrite(exc, attachment); +// } } } @@ -364,21 +364,21 @@ public class TcpAioAsyncConnection extends AsyncConnection { failed(e, attachment); return; } - try { +// try { writeHandler.completed(result, attachment); - } finally { - nextWrite(null, attachment); - } +// } finally { +// nextWrite(null, attachment); +// } } @Override public void failed(Throwable exc, A attachment) { - try { +// try { writeHandler.failed(exc, attachment); - } finally { - nextWrite(exc, attachment); - } +// } finally { +// nextWrite(exc, attachment); +// } } }