From 42bf6ec73b19663aec2b0be1552c10a4af8150f4 Mon Sep 17 00:00:00 2001 From: redkale Date: Sat, 9 Sep 2023 22:39:43 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96DeParser?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/redkale/net/client/ClientCodec.java | 500 +++++++++--------- .../redkale/net/client/ClientResponse.java | 238 ++++----- 2 files changed, 369 insertions(+), 369 deletions(-) diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java index 2fc68e45f..59efe05a3 100644 --- a/src/main/java/org/redkale/net/client/ClientCodec.java +++ b/src/main/java/org/redkale/net/client/ClientCodec.java @@ -1,250 +1,250 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.net.client; - -import java.io.Serializable; -import java.nio.ByteBuffer; -import java.nio.channels.*; -import java.util.*; -import java.util.logging.Level; -import org.redkale.convert.json.JsonConvert; -import org.redkale.net.*; -import org.redkale.util.*; - -/** - * 每个ClientConnection绑定一个独立的ClientCodec实例, 只会同一读线程ReadIOThread里运行 - * - *

- * 详情见: https://redkale.org - * - * @author zhangjx - * @since 2.3.0 - * @param ClientRequest - * @param

响应对象 - */ -public abstract class ClientCodec implements CompletionHandler { - - private final List> respResults = new ArrayList<>(); - - private final ByteArray readArray = new ByteArray(); - - private final ObjectPool> respPool = ObjectPool.createUnsafePool(256, t -> new ClientResponse(), ClientResponse::prepare, ClientResponse::recycle); - - protected final ClientConnection connection; - - protected ClientMessageListener messageListener; - - public ClientCodec(ClientConnection connection) { - Objects.requireNonNull(connection); - this.connection = connection; - } - - public abstract void decodeMessages(ByteBuffer buffer, ByteArray array); - - public ClientCodec withMessageListener(ClientMessageListener listener) { - this.messageListener = listener; - return this; - } - - @Override - public final void completed(Integer count, ByteBuffer attachment) { - AsyncConnection channel = connection.channel; - if (count < 1) { - channel.setReadBuffer(attachment); - connection.dispose(new NonReadableChannelException()); - return; - } - try { - attachment.flip(); - decodeResponse(attachment); - } catch (Throwable e) { - channel.setReadBuffer(attachment); - connection.dispose(e); - } - } - - private void decodeResponse(ByteBuffer buffer) { - AsyncConnection channel = connection.channel; - connection.currRespIterator = null; - decodeMessages(buffer, readArray); - if (!respResults.isEmpty()) { //存在解析结果 - connection.currRespIterator = null; - readArray.clear(); - for (ClientResponse cr : respResults) { - connection.doneResponseCounter.increment(); - if (cr.isError()) { - connection.dispose(cr.cause); - return; - } else if (messageListener != null) { - messageListener.onMessage(connection, cr); - respPool.accept(cr); - } else { - ClientFuture respFuture = connection.pollRespFuture(cr.getRequestid()); - if (respFuture != null) { - if (respFuture.request != cr.request) { - connection.dispose(new RedkaleException("request pipeline error")); - return; - } - responseComplete(false, respFuture, cr.message, cr.cause); - } - respPool.accept(cr); - } - } - respResults.clear(); - - if (buffer.hasRemaining()) { //还有响应数据包 - decodeResponse(buffer); - } else { //队列都已处理完了 - buffer.clear(); - channel.setReadBuffer(buffer); - channel.readRegister(this); - } - } else { //数据不全, 继续读 - connection.currRespIterator = null; - buffer.clear(); - channel.setReadBuffer(buffer); - channel.read(this); - } - } - - void responseComplete(boolean halfCompleted, ClientFuture respFuture, P message, Throwable exc) { - R request = respFuture.request; - AsyncIOThread readThread = connection.channel.getReadIOThread(); - final WorkThread workThread = request.workThread; - try { - if (!halfCompleted && !request.isCompleted()) { - if (exc == null) { - connection.sendHalfWriteInReadThread(request, exc); - //request没有发送完,respFuture需要再次接收 - return; - } else { - connection.sendHalfWriteInReadThread(request, exc); - //异常了需要清掉半包 - } - } - connection.respWaitingCounter.decrement(); - if (connection.isAuthenticated()) { - connection.client.incrRespDoneCounter(); - } - respFuture.cancelTimeout(); -// if (connection.client.debug) { -// connection.client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + connection + ", 回调处理, req=" + request + ", message=" + message, cause); -// } - connection.preComplete(message, (R) request, exc); - - if (exc == null) { - final P rs = request.respTransfer == null ? message : (P) request.respTransfer.apply(message); - if (workThread == null) { - readThread.runWork(() -> { - Traces.computeIfAbsent(request.traceid); - respFuture.complete(rs); - }); - } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE - if (workThread.inIO()) { - Traces.computeIfAbsent(request.traceid); - respFuture.complete(rs); - } else { - workThread.execute(() -> { - Traces.computeIfAbsent(request.traceid); - respFuture.complete(rs); - }); - } - } else { - workThread.runWork(() -> { - Traces.computeIfAbsent(request.traceid); - respFuture.complete(rs); - }); - } - } else { //异常 - if (workThread == null) { - readThread.runWork(() -> { - Traces.computeIfAbsent(request.traceid); - respFuture.completeExceptionally(exc); - }); - } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE - if (workThread.inIO()) { - Traces.computeIfAbsent(request.traceid); - respFuture.completeExceptionally(exc); - } else { - workThread.execute(() -> { - Traces.computeIfAbsent(request.traceid); - respFuture.completeExceptionally(exc); - }); - } - } else { - workThread.runWork(() -> { - Traces.computeIfAbsent(request.traceid); - respFuture.completeExceptionally(exc); - }); - } - } - } catch (Throwable t) { - if (workThread == null) { - readThread.runWork(() -> { - Traces.computeIfAbsent(request.traceid); - respFuture.completeExceptionally(t); - }); - } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE - if (workThread.inIO()) { - Traces.computeIfAbsent(request.traceid); - respFuture.completeExceptionally(t); - } else { - workThread.execute(() -> { - Traces.computeIfAbsent(request.traceid); - respFuture.completeExceptionally(t); - }); - } - } else { - workThread.runWork(() -> { - Traces.computeIfAbsent(request.traceid); - respFuture.completeExceptionally(t); - }); - } - connection.client.logger.log(Level.INFO, "Complete result error, request: " + respFuture.request, t); - } - } - - @Override - public final void failed(Throwable t, ByteBuffer attachment) { - connection.dispose(t); - } - - public ClientMessageListener getMessageListener() { - return messageListener; - } - - protected R nextRequest() { - return connection.findRequest(null); - } - - protected R findRequest(Serializable requestid) { - return connection.findRequest(requestid); - } - - protected ClientResponse getLastMessage() { - List> results = this.respResults; - int size = results.size(); - return size == 0 ? null : results.get(size - 1); - } - - public void addMessage(R request, P result) { - this.respResults.add(respPool.get().set(request, result)); - } - - public void addMessage(R request, Throwable exc) { - this.respResults.add(respPool.get().set(request, exc)); - } - - public void occurError(R request, Throwable exc) { - this.respResults.add(new ClientResponse.ClientErrorResponse<>(request, exc)); - } - - @Override - public String toString() { - return JsonConvert.root().convertTo(this); - } - -} +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.net.client; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.channels.*; +import java.util.*; +import java.util.logging.Level; +import org.redkale.convert.json.JsonConvert; +import org.redkale.net.*; +import org.redkale.util.*; + +/** + * 每个ClientConnection绑定一个独立的ClientCodec实例, 只会同一读线程ReadIOThread里运行 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * @since 2.3.0 + * @param ClientRequest + * @param

响应对象 + */ +public abstract class ClientCodec implements CompletionHandler { + + private final List> respResults = new ArrayList<>(); + + private final ByteArray readArray = new ByteArray(); + + private final ObjectPool> respPool = ObjectPool.createUnsafePool(256, t -> new ClientResponse(), ClientResponse::prepare, ClientResponse::recycle); + + protected final ClientConnection connection; + + protected ClientMessageListener messageListener; + + public ClientCodec(ClientConnection connection) { + Objects.requireNonNull(connection); + this.connection = connection; + } + + public abstract void decodeMessages(ByteBuffer buffer, ByteArray array); + + public ClientCodec withMessageListener(ClientMessageListener listener) { + this.messageListener = listener; + return this; + } + + @Override + public final void completed(Integer count, ByteBuffer attachment) { + AsyncConnection channel = connection.channel; + if (count < 1) { + channel.setReadBuffer(attachment); + connection.dispose(new NonReadableChannelException()); + return; + } + try { + attachment.flip(); + decodeResponse(attachment); + } catch (Throwable e) { + channel.setReadBuffer(attachment); + connection.dispose(e); + } + } + + private void decodeResponse(ByteBuffer buffer) { + AsyncConnection channel = connection.channel; + connection.currRespIterator = null; + decodeMessages(buffer, readArray); + if (!respResults.isEmpty()) { //存在解析结果 + connection.currRespIterator = null; + readArray.clear(); + for (ClientResponse cr : respResults) { + connection.doneResponseCounter.increment(); + if (cr.isError()) { + connection.dispose(cr.cause); + return; + } else if (messageListener != null) { + messageListener.onMessage(connection, cr); + respPool.accept(cr); + } else { + ClientFuture respFuture = connection.pollRespFuture(cr.getRequestid()); + if (respFuture != null) { + if (respFuture.request != cr.request) { + connection.dispose(new RedkaleException("request pipeline error")); + return; + } + responseComplete(false, respFuture, cr.message, cr.cause); + } + respPool.accept(cr); + } + } + respResults.clear(); + + if (buffer.hasRemaining()) { //还有响应数据包 + decodeResponse(buffer); + } else { //队列都已处理完了 + buffer.clear(); + channel.setReadBuffer(buffer); + channel.readRegister(this); + } + } else { //数据不全, 继续读 + connection.currRespIterator = null; + buffer.clear(); + channel.setReadBuffer(buffer); + channel.read(this); + } + } + + void responseComplete(boolean halfCompleted, ClientFuture respFuture, P message, Throwable exc) { + R request = respFuture.request; + AsyncIOThread readThread = connection.channel.getReadIOThread(); + final WorkThread workThread = request.workThread; + try { + if (!halfCompleted && !request.isCompleted()) { + if (exc == null) { + connection.sendHalfWriteInReadThread(request, exc); + //request没有发送完,respFuture需要再次接收 + return; + } else { + connection.sendHalfWriteInReadThread(request, exc); + //异常了需要清掉半包 + } + } + connection.respWaitingCounter.decrement(); + if (connection.isAuthenticated()) { + connection.client.incrRespDoneCounter(); + } + respFuture.cancelTimeout(); +// if (connection.client.debug) { +// connection.client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + connection + ", 回调处理, req=" + request + ", message=" + message, cause); +// } + connection.preComplete(message, (R) request, exc); + + if (exc == null) { + final P rs = request.respTransfer == null ? message : (P) request.respTransfer.apply(message); + if (workThread == null) { + readThread.runWork(() -> { + Traces.computeIfAbsent(request.traceid); + respFuture.complete(rs); + }); + } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE + if (workThread.inIO()) { + Traces.computeIfAbsent(request.traceid); + respFuture.complete(rs); + } else { + workThread.execute(() -> { + Traces.computeIfAbsent(request.traceid); + respFuture.complete(rs); + }); + } + } else { + workThread.runWork(() -> { + Traces.computeIfAbsent(request.traceid); + respFuture.complete(rs); + }); + } + } else { //异常 + if (workThread == null) { + readThread.runWork(() -> { + Traces.computeIfAbsent(request.traceid); + respFuture.completeExceptionally(exc); + }); + } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE + if (workThread.inIO()) { + Traces.computeIfAbsent(request.traceid); + respFuture.completeExceptionally(exc); + } else { + workThread.execute(() -> { + Traces.computeIfAbsent(request.traceid); + respFuture.completeExceptionally(exc); + }); + } + } else { + workThread.runWork(() -> { + Traces.computeIfAbsent(request.traceid); + respFuture.completeExceptionally(exc); + }); + } + } + } catch (Throwable t) { + if (workThread == null) { + readThread.runWork(() -> { + Traces.computeIfAbsent(request.traceid); + respFuture.completeExceptionally(t); + }); + } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE + if (workThread.inIO()) { + Traces.computeIfAbsent(request.traceid); + respFuture.completeExceptionally(t); + } else { + workThread.execute(() -> { + Traces.computeIfAbsent(request.traceid); + respFuture.completeExceptionally(t); + }); + } + } else { + workThread.runWork(() -> { + Traces.computeIfAbsent(request.traceid); + respFuture.completeExceptionally(t); + }); + } + connection.client.logger.log(Level.INFO, "Complete result error, request: " + respFuture.request, t); + } + } + + @Override + public final void failed(Throwable t, ByteBuffer attachment) { + connection.dispose(t); + } + + public ClientMessageListener getMessageListener() { + return messageListener; + } + + protected R nextRequest() { + return connection.findRequest(null); + } + + protected R findRequest(Serializable requestid) { + return connection.findRequest(requestid); + } + + protected ClientResponse getLastMessage() { + List> results = this.respResults; + int size = results.size(); + return size == 0 ? null : results.get(size - 1); + } + + public void addMessage(R request, P result) { + this.respResults.add(respPool.get().success(request, result)); + } + + public void addMessage(R request, Throwable exc) { + this.respResults.add(respPool.get().fail(request, exc)); + } + + public void occurError(R request, Throwable exc) { + this.respResults.add(new ClientResponse.ClientErrorResponse<>(request, exc)); + } + + @Override + public String toString() { + return JsonConvert.root().convertTo(this); + } + +} diff --git a/src/main/java/org/redkale/net/client/ClientResponse.java b/src/main/java/org/redkale/net/client/ClientResponse.java index efab94e98..3267ad66a 100644 --- a/src/main/java/org/redkale/net/client/ClientResponse.java +++ b/src/main/java/org/redkale/net/client/ClientResponse.java @@ -1,119 +1,119 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.net.client; - -import java.io.Serializable; - -/** - * - *

- * 详情见: https://redkale.org - * - * @author zhangjx - * - * @since 2.3.0 - * - * @param 请求对象 - * @param

message - */ -public class ClientResponse { - - protected R request; //服务端返回一个不存在的requestid,可能为null - - protected P message; - - protected Throwable cause; - - public ClientResponse() { - } - - public ClientResponse(R request, P message) { - this.request = request; - this.message = message; - } - - public ClientResponse(R request, Throwable exc) { - this.request = request; - this.cause = exc; - } - - public Serializable getRequestid() { - return request == null ? null : request.getRequestid(); - } - - public ClientResponse set(R request, P message) { - this.request = request; - this.message = message; - return this; - } - - public ClientResponse set(R request, Throwable exc) { - this.request = request; - this.cause = exc; - return this; - } - - protected void prepare() { - this.request = null; - this.message = null; - this.cause = null; - } - - protected boolean recycle() { - this.request = null; - this.message = null; - this.cause = null; - return true; - } - - public R getRequest() { - return request; - } - - public void setRequest(R request) { - this.request = request; - } - - public P getMessage() { - return message; - } - - public void setMessage(P message) { - this.message = message; - } - - public Throwable getCause() { - return cause; - } - - public void setCause(Throwable cause) { - this.cause = cause; - } - - @Override - public String toString() { - if (cause != null) { - return "{\"exc\":" + cause + "}"; - } - return "{\"message\":" + message + "}"; - } - - boolean isError() { - return false; - } - - static class ClientErrorResponse extends ClientResponse { - - public ClientErrorResponse(R request, Throwable exc) { - super(request, exc); - } - - @Override - boolean isError() { - return true; - } - } -} +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.net.client; + +import java.io.Serializable; + +/** + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.3.0 + * + * @param 请求对象 + * @param

message + */ +public class ClientResponse { + + protected R request; //服务端返回一个不存在的requestid,可能为null + + protected P message; + + protected Throwable cause; + + public ClientResponse() { + } + + public ClientResponse(R request, P message) { + this.request = request; + this.message = message; + } + + public ClientResponse(R request, Throwable exc) { + this.request = request; + this.cause = exc; + } + + public Serializable getRequestid() { + return request == null ? null : request.getRequestid(); + } + + public ClientResponse success(R request, P message) { + this.request = request; + this.message = message; + return this; + } + + public ClientResponse fail(R request, Throwable exc) { + this.request = request; + this.cause = exc; + return this; + } + + protected void prepare() { + this.request = null; + this.message = null; + this.cause = null; + } + + protected boolean recycle() { + this.request = null; + this.message = null; + this.cause = null; + return true; + } + + public R getRequest() { + return request; + } + + public void setRequest(R request) { + this.request = request; + } + + public P getMessage() { + return message; + } + + public void setMessage(P message) { + this.message = message; + } + + public Throwable getCause() { + return cause; + } + + public void setCause(Throwable cause) { + this.cause = cause; + } + + @Override + public String toString() { + if (cause != null) { + return "{\"exc\":" + cause + "}"; + } + return "{\"message\":" + message + "}"; + } + + boolean isError() { + return false; + } + + static class ClientErrorResponse extends ClientResponse { + + public ClientErrorResponse(R request, Throwable exc) { + super(request, exc); + } + + @Override + boolean isError() { + return true; + } + } +}