client优化
This commit is contained in:
@@ -225,7 +225,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
|
|||||||
return connect().thenCompose(conn -> writeChannel(conn, request));
|
return connect().thenCompose(conn -> writeChannel(conn, request));
|
||||||
}
|
}
|
||||||
|
|
||||||
public final <T> CompletableFuture<T> sendAsync(R request, Function<P, T> respTransfer) {
|
public final <T> CompletableFuture<T> sendAsync(R request, BiFunction<C, P, T> respTransfer) {
|
||||||
if (request.workThread == null) {
|
if (request.workThread == null) {
|
||||||
request.workThread = WorkThread.currWorkThread();
|
request.workThread = WorkThread.currWorkThread();
|
||||||
}
|
}
|
||||||
@@ -239,7 +239,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
|
|||||||
return connect(addr).thenCompose(conn -> writeChannel(conn, request));
|
return connect(addr).thenCompose(conn -> writeChannel(conn, request));
|
||||||
}
|
}
|
||||||
|
|
||||||
public final <T> CompletableFuture<T> sendAsync(SocketAddress addr, R request, Function<P, T> respTransfer) {
|
public final <T> CompletableFuture<T> sendAsync(SocketAddress addr, R request, BiFunction<C, P, T> respTransfer) {
|
||||||
if (request.workThread == null) {
|
if (request.workThread == null) {
|
||||||
request.workThread = WorkThread.currWorkThread();
|
request.workThread = WorkThread.currWorkThread();
|
||||||
}
|
}
|
||||||
@@ -250,18 +250,10 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
|
|||||||
return conn.writeChannel(request);
|
return conn.writeChannel(request);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected <T> CompletableFuture<T> writeChannel(ClientConnection conn, R request, Function<P, T> respTransfer) {
|
protected <T> CompletableFuture<T> writeChannel(ClientConnection conn, R request, BiFunction<C, P, T> respTransfer) {
|
||||||
return conn.writeChannel(request, respTransfer);
|
return conn.writeChannel(request, respTransfer);
|
||||||
}
|
}
|
||||||
|
|
||||||
//是否采用ThreadLocal连接池模式
|
|
||||||
//支持ThreadLocal连接池模式的最基本要求:
|
|
||||||
// 1) 只能调用connect()获取连接,不能调用connect(SocketAddress addr)
|
|
||||||
// 2) request必须一次性输出,不能出现写入request后request.isCompleted()=false的情况
|
|
||||||
protected boolean isThreadLocalConnMode() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
private C createConnection(int index, AsyncConnection channel) {
|
private C createConnection(int index, AsyncConnection channel) {
|
||||||
C conn = createClientConnection(index, channel);
|
C conn = createClientConnection(index, channel);
|
||||||
if (!channel.isReadPending()) {
|
if (!channel.isReadPending()) {
|
||||||
|
|||||||
@@ -135,7 +135,7 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
final Object rs = request.respTransfer == null ? message : request.respTransfer.apply(message);
|
final Object rs = request.respTransfer == null ? message : request.respTransfer.apply(connection, message);
|
||||||
if (workThread.inIO() && workThread.getState() == Thread.State.RUNNABLE) {
|
if (workThread.inIO() && workThread.getState() == Thread.State.RUNNABLE) {
|
||||||
workThread.execute(() -> {
|
workThread.execute(() -> {
|
||||||
Traces.currTraceid(request.traceid);
|
Traces.currTraceid(request.traceid);
|
||||||
@@ -178,6 +178,12 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
|
|||||||
return connection.findRequest(requestid);
|
return connection.findRequest(requestid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected ClientResponse<R, P> getLastMessage() {
|
||||||
|
List<ClientResponse<R, P>> results = this.respResults;
|
||||||
|
int size = results.size();
|
||||||
|
return size == 0 ? null : results.get(size - 1);
|
||||||
|
}
|
||||||
|
|
||||||
public void addMessage(R request, P result) {
|
public void addMessage(R request, P result) {
|
||||||
this.respResults.add(respPool.get().set(request, result));
|
this.respResults.add(respPool.get().set(request, result));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -106,7 +106,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
|
|||||||
}
|
}
|
||||||
|
|
||||||
//respTransfer只会在ClientCodec的读线程里调用
|
//respTransfer只会在ClientCodec的读线程里调用
|
||||||
protected final <T> CompletableFuture<T> writeChannel(R request, Function<P, T> respTransfer) {
|
protected final <T> CompletableFuture<T> writeChannel(R request, BiFunction<? extends ClientConnection<R, P>, P, T> respTransfer) {
|
||||||
request.respTransfer = respTransfer;
|
request.respTransfer = respTransfer;
|
||||||
ClientFuture respFuture = createClientFuture(request);
|
ClientFuture respFuture = createClientFuture(request);
|
||||||
int rts = this.channel.getReadTimeoutSeconds();
|
int rts = this.channel.getReadTimeoutSeconds();
|
||||||
|
|||||||
@@ -29,8 +29,6 @@ public class ClientFuture<R extends ClientRequest, T> extends CompletableFuture<
|
|||||||
|
|
||||||
private ScheduledFuture timeout;
|
private ScheduledFuture timeout;
|
||||||
|
|
||||||
Boolean resumeHalfRequestFlag;
|
|
||||||
|
|
||||||
ClientFuture(ClientConnection conn, R request) {
|
ClientFuture(ClientConnection conn, R request) {
|
||||||
super();
|
super();
|
||||||
Objects.requireNonNull(conn);
|
Objects.requireNonNull(conn);
|
||||||
|
|||||||
@@ -6,7 +6,7 @@
|
|||||||
package org.redkale.net.client;
|
package org.redkale.net.client;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.function.Function;
|
import java.util.function.BiFunction;
|
||||||
import org.redkale.net.WorkThread;
|
import org.redkale.net.WorkThread;
|
||||||
import org.redkale.util.*;
|
import org.redkale.util.*;
|
||||||
|
|
||||||
@@ -28,7 +28,7 @@ public abstract class ClientRequest {
|
|||||||
protected String traceid;
|
protected String traceid;
|
||||||
|
|
||||||
//只会在ClientCodec的读线程里调用
|
//只会在ClientCodec的读线程里调用
|
||||||
Function respTransfer;
|
BiFunction respTransfer;
|
||||||
|
|
||||||
public abstract void writeTo(ClientConnection conn, ByteArray array);
|
public abstract void writeTo(ClientConnection conn, ByteArray array);
|
||||||
|
|
||||||
|
|||||||
@@ -230,11 +230,13 @@ public abstract class AbstractDataSqlSource extends AbstractDataSource implement
|
|||||||
}
|
}
|
||||||
|
|
||||||
protected void slowLog(long startTime, String... sqls) {
|
protected void slowLog(long startTime, String... sqls) {
|
||||||
|
if (slowmsError > 0 || slowmsWarn > 0) {
|
||||||
long cost = System.currentTimeMillis() - startTime;
|
long cost = System.currentTimeMillis() - startTime;
|
||||||
if (slowmsError > 0 && cost > slowmsError) {
|
if (slowmsError > 0 && cost > slowmsError) {
|
||||||
logger.log(Level.SEVERE, DataSource.class.getSimpleName() + "(name='" + resourceName() + "') slow sql cost " + cost + " ms, content: " + Arrays.toString(sqls));
|
logger.log(Level.SEVERE, DataSource.class.getSimpleName() + "(name='" + resourceName() + "') very slow sql cost " + cost + " ms, content: " + Arrays.toString(sqls));
|
||||||
} else if (slowmsWarn > 0 && cost > slowmsWarn) {
|
} else if (slowmsWarn > 0 && cost > slowmsWarn) {
|
||||||
logger.log(Level.WARNING, DataSource.class.getSimpleName() + "(name='" + resourceName() + "') very slow sql cost " + cost + " ms, content: " + Arrays.toString(sqls));
|
logger.log(Level.WARNING, DataSource.class.getSimpleName() + "(name='" + resourceName() + "') slow sql cost " + cost + " ms, content: " + Arrays.toString(sqls));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user