AsyncConnection移除超时参数

This commit is contained in:
redkale
2024-06-10 13:21:43 +08:00
parent 4478c521e6
commit 2ee9dcfeb2
11 changed files with 90 additions and 209 deletions

View File

@@ -217,14 +217,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
public abstract SocketAddress getLocalAddress();
public abstract int getReadTimeoutSeconds();
public abstract int getWriteTimeoutSeconds();
public abstract void setReadTimeoutSeconds(int readTimeoutSeconds);
public abstract void setWriteTimeoutSeconds(int writeTimeoutSeconds);
// public abstract <A> AsyncConnection fastHandler(CompletionHandler<Integer, ? super A> handler);
//
// public abstract <A> void fastWrite(byte[] data);
@@ -430,8 +422,8 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
};
write(buffer, handlerAttachment, newHandler);
} else {
ByteBufferWriter writer = ByteBufferWriter.create(
sslEngine == null ? writeBufferSupplier : () -> pollWriteSSLBuffer(), buffer);
ByteBufferWriter writer =
ByteBufferWriter.create(sslEngine == null ? writeBufferSupplier : this::pollWriteSSLBuffer, buffer);
writer.put(headerContent, headerOffset, headerLength);
if (bodyLength > 0) {
writer.put(bodyContent, bodyOffset, bodyLength);

View File

@@ -35,60 +35,44 @@ public abstract class AsyncGroup {
}
public CompletableFuture<AsyncConnection> createTCPClient(final SocketAddress address) {
return createTCPClient(address, 0, 0, 0);
return createTCPClient(address, 0);
}
/**
* 创建TCP连接
*
* @see org.redkale.net.AsyncIOGroup#createTCPClient(java.net.SocketAddress, int, int, int)
* @see org.redkale.net.AsyncIOGroup#createTCPClient(java.net.SocketAddress, int)
*
* @param address 地址
* @param connectTimeoutSeconds 连接超时
* @param readTimeoutSeconds 读超时
* @param writeTimeoutSeconds 写超时
* @return AsyncConnection
*/
public abstract CompletableFuture<AsyncConnection> createTCPClient(
final SocketAddress address,
final int connectTimeoutSeconds,
final int readTimeoutSeconds,
final int writeTimeoutSeconds);
SocketAddress address, int connectTimeoutSeconds);
public CompletableFuture<AsyncConnection> createUDPClient(final SocketAddress address) {
return createUDPClient(address, 0, 0, 0);
return createUDPClient(address, 0);
}
/**
* 创建UDP连接
*
* @see org.redkale.net.AsyncIOGroup#createUDPClient(java.net.SocketAddress, int, int, int)
* @see org.redkale.net.AsyncIOGroup#createUDPClient(java.net.SocketAddress, int)
*
* @param address 地址
* @param connectTimeoutSeconds 连接超时
* @param readTimeoutSeconds 读超时
* @param writeTimeoutSeconds 写超时
* @return AsyncConnection
*/
public abstract CompletableFuture<AsyncConnection> createUDPClient(
final SocketAddress address,
final int connectTimeoutSeconds,
final int readTimeoutSeconds,
final int writeTimeoutSeconds);
SocketAddress address, int connectTimeoutSeconds);
public CompletableFuture<AsyncConnection> createClient(final boolean tcp, final SocketAddress address) {
return tcp ? createTCPClient(address) : createUDPClient(address);
}
public CompletableFuture<AsyncConnection> createClient(
final boolean tcp,
final SocketAddress address,
final int connectTimeoutSeconds,
final int readTimeoutSeconds,
final int writeTimeoutSeconds) {
return tcp
? createTCPClient(address, connectTimeoutSeconds, readTimeoutSeconds, writeTimeoutSeconds)
: createUDPClient(address, connectTimeoutSeconds, readTimeoutSeconds, writeTimeoutSeconds);
boolean tcp, SocketAddress address, int connectTimeoutSeconds) {
return tcp ? createTCPClient(address, connectTimeoutSeconds) : createUDPClient(address, connectTimeoutSeconds);
}
/**

View File

@@ -246,11 +246,7 @@ public class AsyncIOGroup extends AsyncGroup {
}
@Override
public CompletableFuture<AsyncConnection> createTCPClient(
final SocketAddress address,
final int connectTimeoutSeconds,
final int readTimeoutSeconds,
final int writeTimeoutSeconds) {
public CompletableFuture<AsyncConnection> createTCPClient(SocketAddress address, int connectTimeoutSeconds) {
Objects.requireNonNull(address);
AsyncNioTcpConnection conn;
try {
@@ -262,8 +258,6 @@ public class AsyncIOGroup extends AsyncGroup {
conn.connect(address, null, new CompletionHandler<Void, Void>() {
@Override
public void completed(Void result, Void attachment) {
conn.setReadTimeoutSeconds(readTimeoutSeconds);
conn.setWriteTimeoutSeconds(writeTimeoutSeconds);
connCreateCounter.increment();
connLivingCounter.increment();
if (conn.sslEngine == null) {
@@ -328,11 +322,7 @@ public class AsyncIOGroup extends AsyncGroup {
}
@Override
public CompletableFuture<AsyncConnection> createUDPClient(
final SocketAddress address,
final int connectTimeoutSeconds,
final int readTimeoutSeconds,
final int writeTimeoutSeconds) {
public CompletableFuture<AsyncConnection> createUDPClient(SocketAddress address, int connectTimeoutSeconds) {
AsyncNioUdpConnection conn;
try {
conn = newUDPClientConnection(address);

View File

@@ -12,9 +12,12 @@ import java.util.concurrent.*;
/**
* 详情见: https://redkale.org
*
* @deprecated
*
* @author zhangjx
* @since 2.1.0
*/
@Deprecated(since = "2.8.0")
class AsyncNioCompletionHandler<A> implements CompletionHandler<Integer, A>, Runnable {
private final AsyncNioConnection conn;

View File

@@ -36,12 +36,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
protected SelectionKey connectKey;
// -------------------------------- 读操作 --------------------------------------
protected final AsyncNioCompletionHandler<ByteBuffer> readTimeoutCompletionHandler =
new AsyncNioCompletionHandler<>(true, this);
// 值大于0才有效
protected int readTimeoutSeconds;
protected ByteBuffer readByteBuffer;
protected CompletionHandler<Integer, ByteBuffer> readCompletionHandler;
@@ -49,12 +43,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
protected SelectionKey readKey;
// -------------------------------- 写操作 --------------------------------------
protected final AsyncNioCompletionHandler<Object> writeTimeoutCompletionHandler =
new AsyncNioCompletionHandler<>(false, this);
// 值大于0才有效
protected int writeTimeoutSeconds;
protected byte[] writeByteTuple1Array;
protected int writeByteTuple1Offset;
@@ -101,26 +89,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
return remoteAddress;
}
@Override
public void setReadTimeoutSeconds(int readTimeoutSeconds) {
this.readTimeoutSeconds = readTimeoutSeconds;
}
@Override
public void setWriteTimeoutSeconds(int writeTimeoutSeconds) {
this.writeTimeoutSeconds = writeTimeoutSeconds;
}
@Override
public int getReadTimeoutSeconds() {
return this.readTimeoutSeconds;
}
@Override
public int getWriteTimeoutSeconds() {
return this.writeTimeoutSeconds;
}
// @Override
// public <A> AsyncConnection fastHandler(CompletionHandler<Integer, ? super A> handler) {
// Objects.requireNonNull(handler);
@@ -144,22 +112,13 @@ abstract class AsyncNioConnection extends AsyncConnection {
handler.failed(new NotYetConnectedException(), null);
return;
}
if (handler != readCompletionHandler && handler != readTimeoutCompletionHandler.handler) {
if (handler != readCompletionHandler) { // 如果是Codec无需重复赋值
if (this.readPending) {
handler.failed(new ReadPendingException(), null);
return;
}
this.readPending = true;
if (this.readTimeoutSeconds > 0) {
AsyncNioCompletionHandler newHandler = this.readTimeoutCompletionHandler;
newHandler.handler(
handler, this.readByteBuffer); // new AsyncNioCompletionHandler(handler, this.readByteBuffer);
this.readCompletionHandler = newHandler;
newHandler.timeoutFuture =
ioGroup.scheduleTimeout(newHandler, this.readTimeoutSeconds, TimeUnit.SECONDS);
} else {
this.readCompletionHandler = handler;
}
this.readCompletionHandler = handler;
} else {
this.readPending = true;
}
@@ -200,15 +159,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
return;
}
this.readPending = true;
if (this.readTimeoutSeconds > 0) {
AsyncNioCompletionHandler newHandler = this.readTimeoutCompletionHandler;
newHandler.handler(
handler, this.readByteBuffer); // new AsyncNioCompletionHandler(handler, this.readByteBuffer);
this.readCompletionHandler = newHandler;
newHandler.timeoutFuture = ioGroup.scheduleTimeout(newHandler, this.readTimeoutSeconds, TimeUnit.SECONDS);
} else {
this.readCompletionHandler = handler;
}
this.readCompletionHandler = handler;
doRead(this.ioReadThread.inCurrThread());
}
@@ -244,16 +195,28 @@ abstract class AsyncNioConnection extends AsyncConnection {
this.writeByteTuple2Offset = bodyOffset;
this.writeByteTuple2Length = bodyLength;
this.writeAttachment = null;
if (this.writeTimeoutSeconds > 0) {
AsyncNioCompletionHandler newHandler = this.writeTimeoutCompletionHandler;
newHandler.handler(handler, null); // new AsyncNioCompletionHandler(handler, null);
this.writeCompletionHandler = newHandler;
newHandler.timeoutFuture = ioGroup.scheduleTimeout(newHandler, this.writeTimeoutSeconds, TimeUnit.SECONDS);
} else {
AsyncNioCompletionHandler newHandler = this.writeTimeoutCompletionHandler;
newHandler.handler(handler, null); // new AsyncNioCompletionHandler(handler, null);
this.writeCompletionHandler = newHandler;
}
CompletionHandler<Integer, Void> newHandler = new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
if (writeByteBuffers != null) {
offerWriteBuffers(writeByteBuffers);
} else {
offerWriteBuffer(writeByteBuffer);
}
handler.completed(result, attachment);
}
@Override
public void failed(Throwable exc, Void attachment) {
if (writeByteBuffers != null) {
offerWriteBuffers(writeByteBuffers);
} else {
offerWriteBuffer(writeByteBuffer);
}
handler.failed(exc, attachment);
}
};
this.writeCompletionHandler = (CompletionHandler) newHandler;
doWrite(); // 如果不是true则bodyCallback的执行可能会切换线程
}
@@ -272,14 +235,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
this.writePending = true;
this.writeByteBuffer = src;
this.writeAttachment = attachment;
if (this.writeTimeoutSeconds > 0) {
AsyncNioCompletionHandler newHandler = this.writeTimeoutCompletionHandler;
newHandler.handler(handler, attachment); // new AsyncNioCompletionHandler(handler, attachment);
this.writeCompletionHandler = newHandler;
newHandler.timeoutFuture = ioGroup.scheduleTimeout(newHandler, this.writeTimeoutSeconds, TimeUnit.SECONDS);
} else {
this.writeCompletionHandler = (CompletionHandler) handler;
}
this.writeCompletionHandler = (CompletionHandler) handler;
doWrite();
}
@@ -301,14 +257,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
this.writeBuffersOffset = offset;
this.writeBuffersLength = length;
this.writeAttachment = attachment;
if (this.writeTimeoutSeconds > 0) {
AsyncNioCompletionHandler newHandler = this.writeTimeoutCompletionHandler;
newHandler.handler(handler, attachment); // new AsyncNioCompletionHandler(handler, attachment);
this.writeCompletionHandler = newHandler;
newHandler.timeoutFuture = ioGroup.scheduleTimeout(newHandler, this.writeTimeoutSeconds, TimeUnit.SECONDS);
} else {
this.writeCompletionHandler = (CompletionHandler) handler;
}
this.writeCompletionHandler = (CompletionHandler) handler;
doWrite();
}
@@ -357,9 +306,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
if (direct) {
if (this.readByteBuffer == null) {
this.readByteBuffer = sslEngine == null ? pollReadBuffer() : pollReadSSLBuffer();
if (this.readTimeoutSeconds > 0) {
this.readTimeoutCompletionHandler.attachment(this.readByteBuffer);
}
}
readCount = implRead(readByteBuffer);
}
@@ -465,13 +411,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
this.writeByteTuple2Offset = 0;
this.writeByteTuple2Length = 0;
}
if (this.writeCompletionHandler == this.writeTimeoutCompletionHandler) {
if (writeByteBuffer == null) {
this.writeTimeoutCompletionHandler.buffers(writeByteBuffers);
} else {
this.writeTimeoutCompletionHandler.buffer(writeByteBuffer);
}
}
}
int writeCount;
if (writeByteBuffer != null) {

View File

@@ -289,18 +289,14 @@ public final class Transport {
try {
if (!tcp) { // UDP
SocketAddress udpaddr = rand ? nodes[0].address : addr;
return asyncGroup.createUDPClient(udpaddr, 6, factory.readTimeoutSeconds, factory.writeTimeoutSeconds);
return asyncGroup.createUDPClient(udpaddr, 6);
}
if (!rand) { // 指定地址
TransportNode node = findTransportNode(addr);
if (node == null) {
return asyncGroup.createTCPClient(addr, 6, factory.readTimeoutSeconds, factory.writeTimeoutSeconds);
return asyncGroup.createTCPClient(addr, 6);
}
return pollAsync(
node,
addr,
() -> asyncGroup.createTCPClient(
addr, 6, factory.readTimeoutSeconds, factory.writeTimeoutSeconds));
return pollAsync(node, addr, () -> asyncGroup.createTCPClient(addr, 6));
}
// ---------------------随机取地址------------------------
@@ -327,11 +323,9 @@ public final class Transport {
}
}
return pollAsync(one, one.getAddress(), () -> {
return asyncGroup
.createTCPClient(one.address, 6, factory.readTimeoutSeconds, factory.writeTimeoutSeconds)
.whenComplete((c, t) -> {
one.disabletime = t == null ? 0 : System.currentTimeMillis();
});
return asyncGroup.createTCPClient(one.address, 6).whenComplete((c, t) -> {
one.disabletime = t == null ? 0 : System.currentTimeMillis();
});
});
}
return pollConnection0(nodes, null, now);
@@ -351,14 +345,12 @@ public final class Transport {
if (future.isDone()) {
return future;
}
asyncGroup
.createTCPClient(node.address, 6, factory.readTimeoutSeconds, factory.writeTimeoutSeconds)
.whenComplete((c, t) -> {
if (c != null && !future.complete(c)) {
node.connQueue.offer(c);
}
node.disabletime = t == null ? 0 : System.currentTimeMillis();
});
asyncGroup.createTCPClient(node.address, 6).whenComplete((c, t) -> {
if (c != null && !future.complete(c)) {
node.connQueue.offer(c);
}
node.disabletime = t == null ? 0 : System.currentTimeMillis();
});
}
return future;
}

View File

@@ -333,15 +333,6 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
return connect(addr, WorkThread.currentWorkThread(), false);
}
public final CompletableFuture<C> newConnection(int readTimeout, int writeTimeout) {
return connect(getAddress(null), WorkThread.currentWorkThread(), false, readTimeout, writeTimeout);
}
// 指定地址获取连接
public final CompletableFuture<C> newConnection(SocketAddress addr, int readTimeout, int writeTimeout) {
return connect(addr, WorkThread.currentWorkThread(), false, readTimeout, writeTimeout);
}
public final CompletableFuture<C> connect() {
return connect(getAddress(null), WorkThread.currentWorkThread(), true);
}
@@ -361,18 +352,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
}
// 指定地址获取连接
private CompletableFuture<C> connect(
@Nonnull final SocketAddress addr, @Nullable final WorkThread workThread, final boolean pool) {
return connect(addr, workThread, pool, readTimeoutSeconds, writeTimeoutSeconds);
}
// 指定地址获取连接
private CompletableFuture<C> connect(
@Nonnull final SocketAddress addr,
@Nullable final WorkThread workThread,
final boolean pool,
final int readTimeout,
final int writeTimeout) {
private CompletableFuture<C> connect(@Nonnull SocketAddress addr, @Nullable WorkThread workThread, boolean pool) {
if (addr == null) {
return CompletableFuture.failedFuture(new NullPointerException("address is empty"));
}
@@ -384,8 +364,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
}
final Queue<CompletableFuture<C>> waitQueue = entry.connAcquireWaitings;
if (!pool || entry.connOpenState.compareAndSet(false, true)) {
CompletableFuture<C> future = group.createClient(
tcp, addr, connectTimeoutSeconds, readTimeout, writeTimeout)
CompletableFuture<C> future = group.createClient(tcp, addr, connectTimeoutSeconds)
.thenApply(c ->
(C) createClientConnection(c).setConnEntry(entry).setMaxPipelines(maxPipelines));
R virtualReq = createVirtualRequestAfterConnect();
@@ -433,7 +412,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
}
});
} else {
int seconds = connectTimeoutSeconds > 0 ? connectTimeoutSeconds : 6;
int seconds = connectTimeoutSeconds > 0 ? connectTimeoutSeconds : 3;
CompletableFuture rs = new CompletableFuture();
waitQueue.offer(rs);
return Utility.orTimeout(rs, () -> addr + " connect timeout", seconds, TimeUnit.SECONDS);

View File

@@ -81,7 +81,7 @@ public abstract class ClientCodec<R extends ClientRequest, P extends ClientResul
if (cr.isError()) {
connection.dispose(cr.cause);
return;
} else if (messageListener != null) {
} else if (cr.request == null && messageListener != null) { // listener模式的第一次请求一般需要有resputure
messageListener.onMessage(connection, cr);
respPool.accept(cr);
} else {

View File

@@ -14,6 +14,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.*;
import java.util.logging.Level;
import org.redkale.annotation.*;
import org.redkale.net.*;
import org.redkale.util.*;
@@ -119,16 +120,13 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
// respTransfer只会在ClientCodec的读线程里调用
protected final <T> CompletableFuture<T> writeChannel(R request, Function<P, T> respTransfer) {
// if (client.debug) {
// client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ":
// "
// + this + ", 发送请求: " + request);
// }
request.respTransfer = respTransfer;
ClientFuture respFuture = createClientFuture(request);
int rts = this.channel.getReadTimeoutSeconds();
if (rts > 0 && !request.isCloseType()) {
respFuture.setTimeout(client.timeoutScheduler.schedule(respFuture, rts, TimeUnit.SECONDS));
if (client.debug) {
client.logger.log(
Level.FINEST,
Times.nowMillis() + ": " + Thread.currentThread().getName() + ": " + this + ", sendRequest: "
+ request + ", respFuture: " + respFuture);
}
respWaitingCounter.increment(); // 放在writeChannelInWriteThread计数会延迟导致不准确
writeLock.lock();
@@ -142,6 +140,14 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
} finally {
writeLock.unlock();
}
if (client.debug) {
return respFuture.whenComplete((v, t) -> {
client.logger.log(
Level.FINEST,
Times.nowMillis() + ": " + Thread.currentThread().getName() + ": " + this + ", respResult: "
+ (t != null ? t : v));
});
}
return respFuture;
}
@@ -169,22 +175,18 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
// respTransfer只会在ClientCodec的读线程里调用
protected final <T> CompletableFuture<List<T>> writeChannel(R[] requests, Function<P, T> respTransfer) {
// if (client.debug) {
// client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ":
// "
// + this + ", 发送请求: " + Arrays.toString(requests) + ", readTimeoutSeconds: " +
// this.channel.getReadTimeoutSeconds());
// }
if (client.debug) {
client.logger.log(
Level.FINEST,
Times.nowMillis() + ": " + Thread.currentThread().getName() + ": " + this + ", 发送请求: "
+ Arrays.toString(requests));
}
ClientFuture[] respFutures = new ClientFuture[requests.length];
int rts = this.channel.getReadTimeoutSeconds();
int rts = this.client.getReadTimeoutSeconds();
for (int i = 0; i < respFutures.length; i++) {
R request = requests[i];
request.respTransfer = respTransfer;
ClientFuture respFuture = createClientFuture(requests[i]);
respFutures[i] = respFuture;
if (rts > 0 && !request.isCloseType()) {
respFuture.setTimeout(client.timeoutScheduler.schedule(respFuture, rts, TimeUnit.SECONDS));
}
respFutures[i] = createClientFuture(requests[i]);
}
respWaitingCounter.add(respFutures.length); // 放在writeChannelInWriteThread计数会延迟导致不准确
@@ -310,7 +312,12 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
protected void preComplete(P resp, R req, Throwable exc) {}
protected ClientFuture<R, P> createClientFuture(R request) {
return new ClientFuture(this, request);
ClientFuture respFuture = new ClientFuture(this, request);
int rts = this.client.getReadTimeoutSeconds();
if (rts > 0 && !request.isCloseType()) {
respFuture.setTimeout(client.timeoutScheduler.schedule(respFuture, rts, TimeUnit.SECONDS));
}
return respFuture;
}
@Override // AsyncConnection.beforeCloseListener

View File

@@ -93,9 +93,9 @@ public class ClientResponse<R extends ClientRequest, P extends ClientResult> {
@Override
public String toString() {
if (cause != null) {
return "{\"exc\":" + cause + "}";
return "{\"request\":" + request + ",\"exc\":" + cause + "}";
}
return "{\"message\":" + message + "}";
return "{\"request\":" + request + ",\"message\":" + message + "}";
}
boolean isError() {

View File

@@ -5,8 +5,6 @@
*/
package org.redkale.net.http;
import static org.redkale.net.http.HttpRequest.parseHeaderName;
import java.lang.reflect.Type;
import java.net.*;
import java.nio.*;
@@ -19,6 +17,7 @@ import org.redkale.net.*;
import org.redkale.net.client.Client;
import org.redkale.net.client.ClientAddress;
import org.redkale.net.client.ClientConnection;
import static org.redkale.net.http.HttpRequest.parseHeaderName;
import org.redkale.util.*;
/**
@@ -315,11 +314,7 @@ public class WebClient extends Client<WebConnection, WebRequest, WebResult> {
protected CompletableFuture<HttpConnection> createConnection(String host, int port) {
return asyncGroup
.createTCPClient(
new InetSocketAddress(host, port),
connectTimeoutSeconds,
readTimeoutSeconds,
writeTimeoutSeconds)
.createTCPClient(new InetSocketAddress(host, port), connectTimeoutSeconds)
.thenApply(conn -> new HttpConnection(conn));
}