diff --git a/src/main/java/org/redkale/net/Response.java b/src/main/java/org/redkale/net/Response.java index 4de1440ed..b7928d71b 100644 --- a/src/main/java/org/redkale/net/Response.java +++ b/src/main/java/org/redkale/net/Response.java @@ -293,7 +293,7 @@ public abstract class Response> { AsyncConnection conn = removeChannel(); if (conn != null && conn.protocolCodec != null) { this.responseConsumer.accept(this); - conn.readInIOThreadSafe(conn.protocolCodec); + conn.readRegisterInIOThreadSafe(conn.protocolCodec); } else { Supplier poolSupplier = this.responseSupplier; Consumer poolConsumer = this.responseConsumer; diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java index 5d0f94ebf..b9fcd6667 100644 --- a/src/main/java/org/redkale/net/client/ClientCodec.java +++ b/src/main/java/org/redkale/net/client/ClientCodec.java @@ -81,12 +81,12 @@ public abstract class ClientCodec implements Complet } respResults.clear(); - if (buffer.hasRemaining()) { + if (buffer.hasRemaining()) { //还有响应数据包 decodeResponse(buffer); } else { //队列都已处理完了 buffer.clear(); channel.setReadBuffer(buffer); - channel.read(this); + channel.readRegister(this); } } else { //数据不全, 继续读 connection.currRespIterator = null; @@ -100,9 +100,9 @@ public abstract class ClientCodec implements Complet if (respFuture != null) { R request = respFuture.request; AsyncIOThread readThread = connection.channel.getReadIOThread(); - final WorkThread workThread = request == null ? readThread : request.removeWorkThread(readThread); + final WorkThread workThread = request.removeWorkThread(readThread); try { - if (!halfCompleted && request != null && !request.isCompleted()) { + if (!halfCompleted && !request.isCompleted()) { if (exc == null) { connection.sendHalfWrite(request, exc); //request没有发送完,respFuture需要再次接收 @@ -125,33 +125,25 @@ public abstract class ClientCodec implements Complet if (exc != null) { if (workThread.inIO()) { workThread.execute(() -> { - if (request != null) { - Traces.currTraceid(request.traceid); - } + Traces.currTraceid(request.traceid); respFuture.completeExceptionally(exc); }); } else { workThread.runWork(() -> { - if (request != null) { - Traces.currTraceid(request.traceid); - } + Traces.currTraceid(request.traceid); respFuture.completeExceptionally(exc); }); } } else { - final Object rs = request == null || request.respTransfer == null ? message : request.respTransfer.apply(message); + final Object rs = request.respTransfer == null ? message : request.respTransfer.apply(message); if (workThread.inIO()) { workThread.execute(() -> { - if (request != null) { - Traces.currTraceid(request.traceid); - } + Traces.currTraceid(request.traceid); ((ClientFuture) respFuture).complete(rs); }); } else { workThread.runWork(() -> { - if (request != null) { - Traces.currTraceid(request.traceid); - } + Traces.currTraceid(request.traceid); ((ClientFuture) respFuture).complete(rs); }); } @@ -159,16 +151,12 @@ public abstract class ClientCodec implements Complet } catch (Throwable t) { if (workThread.inIO()) { workThread.execute(() -> { - if (request != null) { - Traces.currTraceid(request.traceid); - } + Traces.currTraceid(request.traceid); respFuture.completeExceptionally(t); }); } else { workThread.runWork(() -> { - if (request != null) { - Traces.currTraceid(request.traceid); - } + Traces.currTraceid(request.traceid); respFuture.completeExceptionally(t); }); } diff --git a/src/main/java/org/redkale/source/EntityCache.java b/src/main/java/org/redkale/source/EntityCache.java index e92614325..b04b9541d 100644 --- a/src/main/java/org/redkale/source/EntityCache.java +++ b/src/main/java/org/redkale/source/EntityCache.java @@ -5,11 +5,11 @@ */ package org.redkale.source; -import java.io.*; +import java.io.Serializable; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.*; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.*; import java.util.logging.*; import java.util.stream.*; @@ -131,7 +131,12 @@ public final class EntityCache { public void fullLoad() { CompletableFuture> future = fullLoadAsync(); if (future != null) { - future.join(); + //future.join(); + try { + future.get(1, TimeUnit.SECONDS); + } catch (Exception e) { + e.printStackTrace(); + } } }