TcpAioAsyncConnection去掉队列写

This commit is contained in:
Redkale
2020-01-17 13:57:41 +08:00
parent f5f3c48f38
commit 0f52d32424

View File

@@ -24,7 +24,7 @@ import javax.net.ssl.SSLContext;
*/
public class TcpAioAsyncConnection extends AsyncConnection {
private final Semaphore semaphore = new Semaphore(1);
//private final Semaphore semaphore = new Semaphore(1);
private int readTimeoutSeconds;
@@ -103,35 +103,35 @@ public class TcpAioAsyncConnection extends AsyncConnection {
}
}
private <A> void nextWrite(Throwable exc, A attachment) {
BlockingQueue<WriteEntry> queue = this.writeQueue;
if (queue != null && exc != null && !isOpen()) {
WriteEntry entry;
while ((entry = queue.poll()) != null) {
try {
entry.writeHandler.failed(exc, entry.writeAttachment);
} catch (Throwable e) {
e.printStackTrace(System.err);
}
}
return;
}
WriteEntry entry = queue == null ? null : queue.poll();
if (entry != null) {
try {
if (entry.writeOneBuffer == null) {
write(false, entry.writeBuffers, entry.writeOffset, entry.writeLength, entry.writeAttachment, entry.writeHandler);
} else {
write(false, entry.writeOneBuffer, entry.writeAttachment, entry.writeHandler);
}
} catch (Exception e) {
entry.writeHandler.failed(e, entry.writeAttachment);
}
} else {
semaphore.release();
}
}
// private <A> void nextWrite(Throwable exc, A attachment) {
// BlockingQueue<WriteEntry> queue = this.writeQueue;
// if (queue != null && exc != null && !isOpen()) {
// WriteEntry entry;
// while ((entry = queue.poll()) != null) {
// try {
// entry.writeHandler.failed(exc, entry.writeAttachment);
// } catch (Throwable e) {
// e.printStackTrace(System.err);
// }
// }
// return;
// }
// WriteEntry entry = queue == null ? null : queue.poll();
//
// if (entry != null) {
// try {
// if (entry.writeOneBuffer == null) {
// write(false, entry.writeBuffers, entry.writeOffset, entry.writeLength, entry.writeAttachment, entry.writeHandler);
// } else {
// write(false, entry.writeOneBuffer, entry.writeAttachment, entry.writeHandler);
// }
// } catch (Exception e) {
// entry.writeHandler.failed(e, entry.writeAttachment);
// }
// } else {
// semaphore.release();
// }
// }
@Override
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
@@ -139,17 +139,17 @@ public class TcpAioAsyncConnection extends AsyncConnection {
}
private <A> void write(boolean acquire, ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (acquire && !semaphore.tryAcquire()) {
if (this.writeQueue == null) {
synchronized (semaphore) {
if (this.writeQueue == null) {
this.writeQueue = new LinkedBlockingDeque<>();
}
}
}
this.writeQueue.add(new WriteEntry(src, attachment, handler));
return;
}
// if (acquire && !semaphore.tryAcquire()) {
// if (this.writeQueue == null) {
// synchronized (semaphore) {
// if (this.writeQueue == null) {
// this.writeQueue = new LinkedBlockingDeque<>();
// }
// }
// }
// this.writeQueue.add(new WriteEntry(src, attachment, handler));
// return;
// }
WriteOneCompletionHandler newHandler = new WriteOneCompletionHandler(src, handler);
if (!channel.isOpen()) {
newHandler.failed(new ClosedChannelException(), attachment);
@@ -173,17 +173,17 @@ public class TcpAioAsyncConnection extends AsyncConnection {
}
private <A> void write(boolean acquire, ByteBuffer[] srcs, int offset, int length, A attachment, final CompletionHandler<Integer, ? super A> handler) {
if (acquire && !semaphore.tryAcquire()) {
if (this.writeQueue == null) {
synchronized (semaphore) {
if (this.writeQueue == null) {
this.writeQueue = new LinkedBlockingDeque<>();
}
}
}
this.writeQueue.add(new WriteEntry(srcs, offset, length, attachment, handler));
return;
}
// if (acquire && !semaphore.tryAcquire()) {
// if (this.writeQueue == null) {
// synchronized (semaphore) {
// if (this.writeQueue == null) {
// this.writeQueue = new LinkedBlockingDeque<>();
// }
// }
// }
// this.writeQueue.add(new WriteEntry(srcs, offset, length, attachment, handler));
// return;
// }
WriteMoreCompletionHandler newHandler = new WriteMoreCompletionHandler(srcs, offset, length, handler);
if (!channel.isOpen()) {
newHandler.failed(new ClosedChannelException(), attachment);
@@ -317,27 +317,27 @@ public class TcpAioAsyncConnection extends AsyncConnection {
failed(e, attachment);
return;
}
try {
// try {
writeHandler.completed(writeCount, attachment);
} finally {
nextWrite(null, attachment);
}
// } finally {
// nextWrite(null, attachment);
// }
} else {
try {
// try {
writeHandler.completed(result.intValue(), attachment);
} finally {
nextWrite(null, attachment);
}
// } finally {
// nextWrite(null, attachment);
// }
}
}
@Override
public void failed(Throwable exc, A attachment) {
try {
// try {
writeHandler.failed(exc, attachment);
} finally {
nextWrite(exc, attachment);
}
// } finally {
// nextWrite(exc, attachment);
// }
}
}
@@ -364,21 +364,21 @@ public class TcpAioAsyncConnection extends AsyncConnection {
failed(e, attachment);
return;
}
try {
// try {
writeHandler.completed(result, attachment);
} finally {
nextWrite(null, attachment);
}
// } finally {
// nextWrite(null, attachment);
// }
}
@Override
public void failed(Throwable exc, A attachment) {
try {
// try {
writeHandler.failed(exc, attachment);
} finally {
nextWrite(exc, attachment);
}
// } finally {
// nextWrite(exc, attachment);
// }
}
}