EntityCache临时屏蔽

This commit is contained in:
redkale
2023-03-28 23:18:02 +08:00
parent 49662a7d5f
commit 3bb287b3ef
3 changed files with 20 additions and 27 deletions

View File

@@ -293,7 +293,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
AsyncConnection conn = removeChannel();
if (conn != null && conn.protocolCodec != null) {
this.responseConsumer.accept(this);
conn.readInIOThreadSafe(conn.protocolCodec);
conn.readRegisterInIOThreadSafe(conn.protocolCodec);
} else {
Supplier<Response> poolSupplier = this.responseSupplier;
Consumer<Response> poolConsumer = this.responseConsumer;

View File

@@ -81,12 +81,12 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
}
respResults.clear();
if (buffer.hasRemaining()) {
if (buffer.hasRemaining()) { //还有响应数据包
decodeResponse(buffer);
} else { //队列都已处理完了
buffer.clear();
channel.setReadBuffer(buffer);
channel.read(this);
channel.readRegister(this);
}
} else { //数据不全, 继续读
connection.currRespIterator = null;
@@ -100,9 +100,9 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
if (respFuture != null) {
R request = respFuture.request;
AsyncIOThread readThread = connection.channel.getReadIOThread();
final WorkThread workThread = request == null ? readThread : request.removeWorkThread(readThread);
final WorkThread workThread = request.removeWorkThread(readThread);
try {
if (!halfCompleted && request != null && !request.isCompleted()) {
if (!halfCompleted && !request.isCompleted()) {
if (exc == null) {
connection.sendHalfWrite(request, exc);
//request没有发送完respFuture需要再次接收
@@ -125,33 +125,25 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
if (exc != null) {
if (workThread.inIO()) {
workThread.execute(() -> {
if (request != null) {
Traces.currTraceid(request.traceid);
}
Traces.currTraceid(request.traceid);
respFuture.completeExceptionally(exc);
});
} else {
workThread.runWork(() -> {
if (request != null) {
Traces.currTraceid(request.traceid);
}
Traces.currTraceid(request.traceid);
respFuture.completeExceptionally(exc);
});
}
} else {
final Object rs = request == null || request.respTransfer == null ? message : request.respTransfer.apply(message);
final Object rs = request.respTransfer == null ? message : request.respTransfer.apply(message);
if (workThread.inIO()) {
workThread.execute(() -> {
if (request != null) {
Traces.currTraceid(request.traceid);
}
Traces.currTraceid(request.traceid);
((ClientFuture) respFuture).complete(rs);
});
} else {
workThread.runWork(() -> {
if (request != null) {
Traces.currTraceid(request.traceid);
}
Traces.currTraceid(request.traceid);
((ClientFuture) respFuture).complete(rs);
});
}
@@ -159,16 +151,12 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
} catch (Throwable t) {
if (workThread.inIO()) {
workThread.execute(() -> {
if (request != null) {
Traces.currTraceid(request.traceid);
}
Traces.currTraceid(request.traceid);
respFuture.completeExceptionally(t);
});
} else {
workThread.runWork(() -> {
if (request != null) {
Traces.currTraceid(request.traceid);
}
Traces.currTraceid(request.traceid);
respFuture.completeExceptionally(t);
});
}

View File

@@ -5,11 +5,11 @@
*/
package org.redkale.source;
import java.io.*;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.*;
import java.util.logging.*;
import java.util.stream.*;
@@ -131,7 +131,12 @@ public final class EntityCache<T> {
public void fullLoad() {
CompletableFuture<List<T>> future = fullLoadAsync();
if (future != null) {
future.join();
//future.join();
try {
future.get(1, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
}
}
}