diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java
index 2fc68e45f..59efe05a3 100644
--- a/src/main/java/org/redkale/net/client/ClientCodec.java
+++ b/src/main/java/org/redkale/net/client/ClientCodec.java
@@ -1,250 +1,250 @@
-/*
- * To change this license header, choose License Headers in Project Properties.
- * To change this template file, choose Tools | Templates
- * and open the template in the editor.
- */
-package org.redkale.net.client;
-
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.nio.channels.*;
-import java.util.*;
-import java.util.logging.Level;
-import org.redkale.convert.json.JsonConvert;
-import org.redkale.net.*;
-import org.redkale.util.*;
-
-/**
- * 每个ClientConnection绑定一个独立的ClientCodec实例, 只会同一读线程ReadIOThread里运行
- *
- *
- * 详情见: https://redkale.org
- *
- * @author zhangjx
- * @since 2.3.0
- * @param ClientRequest
- * @param 响应对象
- */
-public abstract class ClientCodec implements CompletionHandler {
-
- private final List> respResults = new ArrayList<>();
-
- private final ByteArray readArray = new ByteArray();
-
- private final ObjectPool> respPool = ObjectPool.createUnsafePool(256, t -> new ClientResponse(), ClientResponse::prepare, ClientResponse::recycle);
-
- protected final ClientConnection connection;
-
- protected ClientMessageListener messageListener;
-
- public ClientCodec(ClientConnection connection) {
- Objects.requireNonNull(connection);
- this.connection = connection;
- }
-
- public abstract void decodeMessages(ByteBuffer buffer, ByteArray array);
-
- public ClientCodec withMessageListener(ClientMessageListener listener) {
- this.messageListener = listener;
- return this;
- }
-
- @Override
- public final void completed(Integer count, ByteBuffer attachment) {
- AsyncConnection channel = connection.channel;
- if (count < 1) {
- channel.setReadBuffer(attachment);
- connection.dispose(new NonReadableChannelException());
- return;
- }
- try {
- attachment.flip();
- decodeResponse(attachment);
- } catch (Throwable e) {
- channel.setReadBuffer(attachment);
- connection.dispose(e);
- }
- }
-
- private void decodeResponse(ByteBuffer buffer) {
- AsyncConnection channel = connection.channel;
- connection.currRespIterator = null;
- decodeMessages(buffer, readArray);
- if (!respResults.isEmpty()) { //存在解析结果
- connection.currRespIterator = null;
- readArray.clear();
- for (ClientResponse cr : respResults) {
- connection.doneResponseCounter.increment();
- if (cr.isError()) {
- connection.dispose(cr.cause);
- return;
- } else if (messageListener != null) {
- messageListener.onMessage(connection, cr);
- respPool.accept(cr);
- } else {
- ClientFuture respFuture = connection.pollRespFuture(cr.getRequestid());
- if (respFuture != null) {
- if (respFuture.request != cr.request) {
- connection.dispose(new RedkaleException("request pipeline error"));
- return;
- }
- responseComplete(false, respFuture, cr.message, cr.cause);
- }
- respPool.accept(cr);
- }
- }
- respResults.clear();
-
- if (buffer.hasRemaining()) { //还有响应数据包
- decodeResponse(buffer);
- } else { //队列都已处理完了
- buffer.clear();
- channel.setReadBuffer(buffer);
- channel.readRegister(this);
- }
- } else { //数据不全, 继续读
- connection.currRespIterator = null;
- buffer.clear();
- channel.setReadBuffer(buffer);
- channel.read(this);
- }
- }
-
- void responseComplete(boolean halfCompleted, ClientFuture respFuture, P message, Throwable exc) {
- R request = respFuture.request;
- AsyncIOThread readThread = connection.channel.getReadIOThread();
- final WorkThread workThread = request.workThread;
- try {
- if (!halfCompleted && !request.isCompleted()) {
- if (exc == null) {
- connection.sendHalfWriteInReadThread(request, exc);
- //request没有发送完,respFuture需要再次接收
- return;
- } else {
- connection.sendHalfWriteInReadThread(request, exc);
- //异常了需要清掉半包
- }
- }
- connection.respWaitingCounter.decrement();
- if (connection.isAuthenticated()) {
- connection.client.incrRespDoneCounter();
- }
- respFuture.cancelTimeout();
-// if (connection.client.debug) {
-// connection.client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + connection + ", 回调处理, req=" + request + ", message=" + message, cause);
-// }
- connection.preComplete(message, (R) request, exc);
-
- if (exc == null) {
- final P rs = request.respTransfer == null ? message : (P) request.respTransfer.apply(message);
- if (workThread == null) {
- readThread.runWork(() -> {
- Traces.computeIfAbsent(request.traceid);
- respFuture.complete(rs);
- });
- } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
- if (workThread.inIO()) {
- Traces.computeIfAbsent(request.traceid);
- respFuture.complete(rs);
- } else {
- workThread.execute(() -> {
- Traces.computeIfAbsent(request.traceid);
- respFuture.complete(rs);
- });
- }
- } else {
- workThread.runWork(() -> {
- Traces.computeIfAbsent(request.traceid);
- respFuture.complete(rs);
- });
- }
- } else { //异常
- if (workThread == null) {
- readThread.runWork(() -> {
- Traces.computeIfAbsent(request.traceid);
- respFuture.completeExceptionally(exc);
- });
- } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
- if (workThread.inIO()) {
- Traces.computeIfAbsent(request.traceid);
- respFuture.completeExceptionally(exc);
- } else {
- workThread.execute(() -> {
- Traces.computeIfAbsent(request.traceid);
- respFuture.completeExceptionally(exc);
- });
- }
- } else {
- workThread.runWork(() -> {
- Traces.computeIfAbsent(request.traceid);
- respFuture.completeExceptionally(exc);
- });
- }
- }
- } catch (Throwable t) {
- if (workThread == null) {
- readThread.runWork(() -> {
- Traces.computeIfAbsent(request.traceid);
- respFuture.completeExceptionally(t);
- });
- } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
- if (workThread.inIO()) {
- Traces.computeIfAbsent(request.traceid);
- respFuture.completeExceptionally(t);
- } else {
- workThread.execute(() -> {
- Traces.computeIfAbsent(request.traceid);
- respFuture.completeExceptionally(t);
- });
- }
- } else {
- workThread.runWork(() -> {
- Traces.computeIfAbsent(request.traceid);
- respFuture.completeExceptionally(t);
- });
- }
- connection.client.logger.log(Level.INFO, "Complete result error, request: " + respFuture.request, t);
- }
- }
-
- @Override
- public final void failed(Throwable t, ByteBuffer attachment) {
- connection.dispose(t);
- }
-
- public ClientMessageListener getMessageListener() {
- return messageListener;
- }
-
- protected R nextRequest() {
- return connection.findRequest(null);
- }
-
- protected R findRequest(Serializable requestid) {
- return connection.findRequest(requestid);
- }
-
- protected ClientResponse getLastMessage() {
- List> results = this.respResults;
- int size = results.size();
- return size == 0 ? null : results.get(size - 1);
- }
-
- public void addMessage(R request, P result) {
- this.respResults.add(respPool.get().set(request, result));
- }
-
- public void addMessage(R request, Throwable exc) {
- this.respResults.add(respPool.get().set(request, exc));
- }
-
- public void occurError(R request, Throwable exc) {
- this.respResults.add(new ClientResponse.ClientErrorResponse<>(request, exc));
- }
-
- @Override
- public String toString() {
- return JsonConvert.root().convertTo(this);
- }
-
-}
+/*
+ * To change this license header, choose License Headers in Project Properties.
+ * To change this template file, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.redkale.net.client;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.channels.*;
+import java.util.*;
+import java.util.logging.Level;
+import org.redkale.convert.json.JsonConvert;
+import org.redkale.net.*;
+import org.redkale.util.*;
+
+/**
+ * 每个ClientConnection绑定一个独立的ClientCodec实例, 只会同一读线程ReadIOThread里运行
+ *
+ *
+ * 详情见: https://redkale.org
+ *
+ * @author zhangjx
+ * @since 2.3.0
+ * @param ClientRequest
+ * @param 响应对象
+ */
+public abstract class ClientCodec implements CompletionHandler {
+
+ private final List> respResults = new ArrayList<>();
+
+ private final ByteArray readArray = new ByteArray();
+
+ private final ObjectPool> respPool = ObjectPool.createUnsafePool(256, t -> new ClientResponse(), ClientResponse::prepare, ClientResponse::recycle);
+
+ protected final ClientConnection connection;
+
+ protected ClientMessageListener messageListener;
+
+ public ClientCodec(ClientConnection connection) {
+ Objects.requireNonNull(connection);
+ this.connection = connection;
+ }
+
+ public abstract void decodeMessages(ByteBuffer buffer, ByteArray array);
+
+ public ClientCodec withMessageListener(ClientMessageListener listener) {
+ this.messageListener = listener;
+ return this;
+ }
+
+ @Override
+ public final void completed(Integer count, ByteBuffer attachment) {
+ AsyncConnection channel = connection.channel;
+ if (count < 1) {
+ channel.setReadBuffer(attachment);
+ connection.dispose(new NonReadableChannelException());
+ return;
+ }
+ try {
+ attachment.flip();
+ decodeResponse(attachment);
+ } catch (Throwable e) {
+ channel.setReadBuffer(attachment);
+ connection.dispose(e);
+ }
+ }
+
+ private void decodeResponse(ByteBuffer buffer) {
+ AsyncConnection channel = connection.channel;
+ connection.currRespIterator = null;
+ decodeMessages(buffer, readArray);
+ if (!respResults.isEmpty()) { //存在解析结果
+ connection.currRespIterator = null;
+ readArray.clear();
+ for (ClientResponse cr : respResults) {
+ connection.doneResponseCounter.increment();
+ if (cr.isError()) {
+ connection.dispose(cr.cause);
+ return;
+ } else if (messageListener != null) {
+ messageListener.onMessage(connection, cr);
+ respPool.accept(cr);
+ } else {
+ ClientFuture respFuture = connection.pollRespFuture(cr.getRequestid());
+ if (respFuture != null) {
+ if (respFuture.request != cr.request) {
+ connection.dispose(new RedkaleException("request pipeline error"));
+ return;
+ }
+ responseComplete(false, respFuture, cr.message, cr.cause);
+ }
+ respPool.accept(cr);
+ }
+ }
+ respResults.clear();
+
+ if (buffer.hasRemaining()) { //还有响应数据包
+ decodeResponse(buffer);
+ } else { //队列都已处理完了
+ buffer.clear();
+ channel.setReadBuffer(buffer);
+ channel.readRegister(this);
+ }
+ } else { //数据不全, 继续读
+ connection.currRespIterator = null;
+ buffer.clear();
+ channel.setReadBuffer(buffer);
+ channel.read(this);
+ }
+ }
+
+ void responseComplete(boolean halfCompleted, ClientFuture respFuture, P message, Throwable exc) {
+ R request = respFuture.request;
+ AsyncIOThread readThread = connection.channel.getReadIOThread();
+ final WorkThread workThread = request.workThread;
+ try {
+ if (!halfCompleted && !request.isCompleted()) {
+ if (exc == null) {
+ connection.sendHalfWriteInReadThread(request, exc);
+ //request没有发送完,respFuture需要再次接收
+ return;
+ } else {
+ connection.sendHalfWriteInReadThread(request, exc);
+ //异常了需要清掉半包
+ }
+ }
+ connection.respWaitingCounter.decrement();
+ if (connection.isAuthenticated()) {
+ connection.client.incrRespDoneCounter();
+ }
+ respFuture.cancelTimeout();
+// if (connection.client.debug) {
+// connection.client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + connection + ", 回调处理, req=" + request + ", message=" + message, cause);
+// }
+ connection.preComplete(message, (R) request, exc);
+
+ if (exc == null) {
+ final P rs = request.respTransfer == null ? message : (P) request.respTransfer.apply(message);
+ if (workThread == null) {
+ readThread.runWork(() -> {
+ Traces.computeIfAbsent(request.traceid);
+ respFuture.complete(rs);
+ });
+ } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
+ if (workThread.inIO()) {
+ Traces.computeIfAbsent(request.traceid);
+ respFuture.complete(rs);
+ } else {
+ workThread.execute(() -> {
+ Traces.computeIfAbsent(request.traceid);
+ respFuture.complete(rs);
+ });
+ }
+ } else {
+ workThread.runWork(() -> {
+ Traces.computeIfAbsent(request.traceid);
+ respFuture.complete(rs);
+ });
+ }
+ } else { //异常
+ if (workThread == null) {
+ readThread.runWork(() -> {
+ Traces.computeIfAbsent(request.traceid);
+ respFuture.completeExceptionally(exc);
+ });
+ } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
+ if (workThread.inIO()) {
+ Traces.computeIfAbsent(request.traceid);
+ respFuture.completeExceptionally(exc);
+ } else {
+ workThread.execute(() -> {
+ Traces.computeIfAbsent(request.traceid);
+ respFuture.completeExceptionally(exc);
+ });
+ }
+ } else {
+ workThread.runWork(() -> {
+ Traces.computeIfAbsent(request.traceid);
+ respFuture.completeExceptionally(exc);
+ });
+ }
+ }
+ } catch (Throwable t) {
+ if (workThread == null) {
+ readThread.runWork(() -> {
+ Traces.computeIfAbsent(request.traceid);
+ respFuture.completeExceptionally(t);
+ });
+ } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
+ if (workThread.inIO()) {
+ Traces.computeIfAbsent(request.traceid);
+ respFuture.completeExceptionally(t);
+ } else {
+ workThread.execute(() -> {
+ Traces.computeIfAbsent(request.traceid);
+ respFuture.completeExceptionally(t);
+ });
+ }
+ } else {
+ workThread.runWork(() -> {
+ Traces.computeIfAbsent(request.traceid);
+ respFuture.completeExceptionally(t);
+ });
+ }
+ connection.client.logger.log(Level.INFO, "Complete result error, request: " + respFuture.request, t);
+ }
+ }
+
+ @Override
+ public final void failed(Throwable t, ByteBuffer attachment) {
+ connection.dispose(t);
+ }
+
+ public ClientMessageListener getMessageListener() {
+ return messageListener;
+ }
+
+ protected R nextRequest() {
+ return connection.findRequest(null);
+ }
+
+ protected R findRequest(Serializable requestid) {
+ return connection.findRequest(requestid);
+ }
+
+ protected ClientResponse getLastMessage() {
+ List> results = this.respResults;
+ int size = results.size();
+ return size == 0 ? null : results.get(size - 1);
+ }
+
+ public void addMessage(R request, P result) {
+ this.respResults.add(respPool.get().success(request, result));
+ }
+
+ public void addMessage(R request, Throwable exc) {
+ this.respResults.add(respPool.get().fail(request, exc));
+ }
+
+ public void occurError(R request, Throwable exc) {
+ this.respResults.add(new ClientResponse.ClientErrorResponse<>(request, exc));
+ }
+
+ @Override
+ public String toString() {
+ return JsonConvert.root().convertTo(this);
+ }
+
+}
diff --git a/src/main/java/org/redkale/net/client/ClientResponse.java b/src/main/java/org/redkale/net/client/ClientResponse.java
index efab94e98..3267ad66a 100644
--- a/src/main/java/org/redkale/net/client/ClientResponse.java
+++ b/src/main/java/org/redkale/net/client/ClientResponse.java
@@ -1,119 +1,119 @@
-/*
- * To change this license header, choose License Headers in Project Properties.
- * To change this template file, choose Tools | Templates
- * and open the template in the editor.
- */
-package org.redkale.net.client;
-
-import java.io.Serializable;
-
-/**
- *
- *
- * 详情见: https://redkale.org
- *
- * @author zhangjx
- *
- * @since 2.3.0
- *
- * @param 请求对象
- * @param message
- */
-public class ClientResponse {
-
- protected R request; //服务端返回一个不存在的requestid,可能为null
-
- protected P message;
-
- protected Throwable cause;
-
- public ClientResponse() {
- }
-
- public ClientResponse(R request, P message) {
- this.request = request;
- this.message = message;
- }
-
- public ClientResponse(R request, Throwable exc) {
- this.request = request;
- this.cause = exc;
- }
-
- public Serializable getRequestid() {
- return request == null ? null : request.getRequestid();
- }
-
- public ClientResponse set(R request, P message) {
- this.request = request;
- this.message = message;
- return this;
- }
-
- public ClientResponse set(R request, Throwable exc) {
- this.request = request;
- this.cause = exc;
- return this;
- }
-
- protected void prepare() {
- this.request = null;
- this.message = null;
- this.cause = null;
- }
-
- protected boolean recycle() {
- this.request = null;
- this.message = null;
- this.cause = null;
- return true;
- }
-
- public R getRequest() {
- return request;
- }
-
- public void setRequest(R request) {
- this.request = request;
- }
-
- public P getMessage() {
- return message;
- }
-
- public void setMessage(P message) {
- this.message = message;
- }
-
- public Throwable getCause() {
- return cause;
- }
-
- public void setCause(Throwable cause) {
- this.cause = cause;
- }
-
- @Override
- public String toString() {
- if (cause != null) {
- return "{\"exc\":" + cause + "}";
- }
- return "{\"message\":" + message + "}";
- }
-
- boolean isError() {
- return false;
- }
-
- static class ClientErrorResponse extends ClientResponse {
-
- public ClientErrorResponse(R request, Throwable exc) {
- super(request, exc);
- }
-
- @Override
- boolean isError() {
- return true;
- }
- }
-}
+/*
+ * To change this license header, choose License Headers in Project Properties.
+ * To change this template file, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.redkale.net.client;
+
+import java.io.Serializable;
+
+/**
+ *
+ *
+ * 详情见: https://redkale.org
+ *
+ * @author zhangjx
+ *
+ * @since 2.3.0
+ *
+ * @param 请求对象
+ * @param message
+ */
+public class ClientResponse {
+
+ protected R request; //服务端返回一个不存在的requestid,可能为null
+
+ protected P message;
+
+ protected Throwable cause;
+
+ public ClientResponse() {
+ }
+
+ public ClientResponse(R request, P message) {
+ this.request = request;
+ this.message = message;
+ }
+
+ public ClientResponse(R request, Throwable exc) {
+ this.request = request;
+ this.cause = exc;
+ }
+
+ public Serializable getRequestid() {
+ return request == null ? null : request.getRequestid();
+ }
+
+ public ClientResponse success(R request, P message) {
+ this.request = request;
+ this.message = message;
+ return this;
+ }
+
+ public ClientResponse fail(R request, Throwable exc) {
+ this.request = request;
+ this.cause = exc;
+ return this;
+ }
+
+ protected void prepare() {
+ this.request = null;
+ this.message = null;
+ this.cause = null;
+ }
+
+ protected boolean recycle() {
+ this.request = null;
+ this.message = null;
+ this.cause = null;
+ return true;
+ }
+
+ public R getRequest() {
+ return request;
+ }
+
+ public void setRequest(R request) {
+ this.request = request;
+ }
+
+ public P getMessage() {
+ return message;
+ }
+
+ public void setMessage(P message) {
+ this.message = message;
+ }
+
+ public Throwable getCause() {
+ return cause;
+ }
+
+ public void setCause(Throwable cause) {
+ this.cause = cause;
+ }
+
+ @Override
+ public String toString() {
+ if (cause != null) {
+ return "{\"exc\":" + cause + "}";
+ }
+ return "{\"message\":" + message + "}";
+ }
+
+ boolean isError() {
+ return false;
+ }
+
+ static class ClientErrorResponse extends ClientResponse {
+
+ public ClientErrorResponse(R request, Throwable exc) {
+ super(request, exc);
+ }
+
+ @Override
+ boolean isError() {
+ return true;
+ }
+ }
+}