diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index 0b79e1752..0af4b16fc 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -225,7 +225,7 @@ public abstract class Client, R extends ClientR return connect().thenCompose(conn -> writeChannel(conn, request)); } - public final CompletableFuture sendAsync(R request, Function respTransfer) { + public final CompletableFuture sendAsync(R request, BiFunction respTransfer) { if (request.workThread == null) { request.workThread = WorkThread.currWorkThread(); } @@ -239,7 +239,7 @@ public abstract class Client, R extends ClientR return connect(addr).thenCompose(conn -> writeChannel(conn, request)); } - public final CompletableFuture sendAsync(SocketAddress addr, R request, Function respTransfer) { + public final CompletableFuture sendAsync(SocketAddress addr, R request, BiFunction respTransfer) { if (request.workThread == null) { request.workThread = WorkThread.currWorkThread(); } @@ -250,18 +250,10 @@ public abstract class Client, R extends ClientR return conn.writeChannel(request); } - protected CompletableFuture writeChannel(ClientConnection conn, R request, Function respTransfer) { + protected CompletableFuture writeChannel(ClientConnection conn, R request, BiFunction respTransfer) { return conn.writeChannel(request, respTransfer); } - //是否采用ThreadLocal连接池模式 - //支持ThreadLocal连接池模式的最基本要求: - // 1) 只能调用connect()获取连接,不能调用connect(SocketAddress addr) - // 2) request必须一次性输出,不能出现写入request后request.isCompleted()=false的情况 - protected boolean isThreadLocalConnMode() { - return false; - } - private C createConnection(int index, AsyncConnection channel) { C conn = createClientConnection(index, channel); if (!channel.isReadPending()) { diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java index cc412f373..1fdfba0f0 100644 --- a/src/main/java/org/redkale/net/client/ClientCodec.java +++ b/src/main/java/org/redkale/net/client/ClientCodec.java @@ -135,7 +135,7 @@ public abstract class ClientCodec implements Complet }); } } else { - final Object rs = request.respTransfer == null ? message : request.respTransfer.apply(message); + final Object rs = request.respTransfer == null ? message : request.respTransfer.apply(connection, message); if (workThread.inIO() && workThread.getState() == Thread.State.RUNNABLE) { workThread.execute(() -> { Traces.currTraceid(request.traceid); @@ -178,6 +178,12 @@ public abstract class ClientCodec implements Complet return connection.findRequest(requestid); } + protected ClientResponse getLastMessage() { + List> results = this.respResults; + int size = results.size(); + return size == 0 ? null : results.get(size - 1); + } + public void addMessage(R request, P result) { this.respResults.add(respPool.get().set(request, result)); } diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index bb2225979..164bb4f1e 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -106,7 +106,7 @@ public abstract class ClientConnection implements Co } //respTransfer只会在ClientCodec的读线程里调用 - protected final CompletableFuture writeChannel(R request, Function respTransfer) { + protected final CompletableFuture writeChannel(R request, BiFunction, P, T> respTransfer) { request.respTransfer = respTransfer; ClientFuture respFuture = createClientFuture(request); int rts = this.channel.getReadTimeoutSeconds(); diff --git a/src/main/java/org/redkale/net/client/ClientFuture.java b/src/main/java/org/redkale/net/client/ClientFuture.java index 4b3289f42..b41001b49 100644 --- a/src/main/java/org/redkale/net/client/ClientFuture.java +++ b/src/main/java/org/redkale/net/client/ClientFuture.java @@ -29,8 +29,6 @@ public class ClientFuture extends CompletableFuture< private ScheduledFuture timeout; - Boolean resumeHalfRequestFlag; - ClientFuture(ClientConnection conn, R request) { super(); Objects.requireNonNull(conn); diff --git a/src/main/java/org/redkale/net/client/ClientRequest.java b/src/main/java/org/redkale/net/client/ClientRequest.java index 5dd40a0ae..760030b67 100644 --- a/src/main/java/org/redkale/net/client/ClientRequest.java +++ b/src/main/java/org/redkale/net/client/ClientRequest.java @@ -6,7 +6,7 @@ package org.redkale.net.client; import java.io.Serializable; -import java.util.function.Function; +import java.util.function.BiFunction; import org.redkale.net.WorkThread; import org.redkale.util.*; @@ -28,7 +28,7 @@ public abstract class ClientRequest { protected String traceid; //只会在ClientCodec的读线程里调用 - Function respTransfer; + BiFunction respTransfer; public abstract void writeTo(ClientConnection conn, ByteArray array); diff --git a/src/main/java/org/redkale/source/AbstractDataSqlSource.java b/src/main/java/org/redkale/source/AbstractDataSqlSource.java index f6b040b60..c358fd72a 100644 --- a/src/main/java/org/redkale/source/AbstractDataSqlSource.java +++ b/src/main/java/org/redkale/source/AbstractDataSqlSource.java @@ -230,11 +230,13 @@ public abstract class AbstractDataSqlSource extends AbstractDataSource implement } protected void slowLog(long startTime, String... sqls) { - long cost = System.currentTimeMillis() - startTime; - if (slowmsError > 0 && cost > slowmsError) { - logger.log(Level.SEVERE, DataSource.class.getSimpleName() + "(name='" + resourceName() + "') slow sql cost " + cost + " ms, content: " + Arrays.toString(sqls)); - } else if (slowmsWarn > 0 && cost > slowmsWarn) { - logger.log(Level.WARNING, DataSource.class.getSimpleName() + "(name='" + resourceName() + "') very slow sql cost " + cost + " ms, content: " + Arrays.toString(sqls)); + if (slowmsError > 0 || slowmsWarn > 0) { + long cost = System.currentTimeMillis() - startTime; + if (slowmsError > 0 && cost > slowmsError) { + logger.log(Level.SEVERE, DataSource.class.getSimpleName() + "(name='" + resourceName() + "') very slow sql cost " + cost + " ms, content: " + Arrays.toString(sqls)); + } else if (slowmsWarn > 0 && cost > slowmsWarn) { + logger.log(Level.WARNING, DataSource.class.getSimpleName() + "(name='" + resourceName() + "') slow sql cost " + cost + " ms, content: " + Arrays.toString(sqls)); + } } }