This commit is contained in:
Redkale
2018-05-19 14:58:31 +08:00
parent 00034981ef
commit 8d1022f70f

View File

@@ -127,45 +127,67 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
conn.beforeCloseListener((c) -> usingCounter.decrementAndGet());
CompletableFuture<AsyncConnection> future = new CompletableFuture();
final ByteBuffer buffer = reqConnectBuffer(conn);
conn.write(buffer, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment1) {
if (result < 0) {
failed(new SQLException("Write Buffer Error"), attachment1);
return;
if (buffer == null) {
final ByteBuffer rbuffer = bufferPool.get();
conn.read(rbuffer, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment2) {
if (result < 0) {
failed(new SQLException("Read Buffer Error"), attachment2);
return;
}
rbuffer.flip();
respConnectBuffer(rbuffer, future, conn);
}
if (buffer.hasRemaining()) {
conn.write(buffer, attachment1, this);
return;
@Override
public void failed(Throwable exc, Void attachment2) {
bufferPool.accept(rbuffer);
future.completeExceptionally(exc);
conn.dispose();
}
buffer.clear();
conn.read(buffer, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment2) {
if (result < 0) {
failed(new SQLException("Read Buffer Error"), attachment2);
return;
});
} else {
conn.write(buffer, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment1) {
if (result < 0) {
failed(new SQLException("Write Buffer Error"), attachment1);
return;
}
if (buffer.hasRemaining()) {
conn.write(buffer, attachment1, this);
return;
}
buffer.clear();
conn.read(buffer, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment2) {
if (result < 0) {
failed(new SQLException("Read Buffer Error"), attachment2);
return;
}
buffer.flip();
respConnectBuffer(buffer, future, conn);
}
buffer.flip();
respConnectBuffer(buffer, future, conn);
}
@Override
public void failed(Throwable exc, Void attachment2) {
bufferPool.accept(buffer);
future.completeExceptionally(exc);
conn.dispose();
}
});
}
@Override
public void failed(Throwable exc, Void attachment2) {
bufferPool.accept(buffer);
future.completeExceptionally(exc);
conn.dispose();
}
});
}
@Override
public void failed(Throwable exc, Void attachment1) {
bufferPool.accept(buffer);
future.completeExceptionally(exc);
conn.dispose();
}
});
@Override
public void failed(Throwable exc, Void attachment1) {
bufferPool.accept(buffer);
future.completeExceptionally(exc);
conn.dispose();
}
});
}
return future;
}).whenComplete((c, t) -> {
if (t == null) {