优化DeParser

This commit is contained in:
redkale
2023-09-09 22:39:43 +08:00
parent 30213e9b0b
commit 42bf6ec73b
2 changed files with 369 additions and 369 deletions

View File

@@ -1,250 +1,250 @@
/* /*
* To change this license header, choose License Headers in Project Properties. * To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates * To change this template file, choose Tools | Templates
* and open the template in the editor. * and open the template in the editor.
*/ */
package org.redkale.net.client; package org.redkale.net.client;
import java.io.Serializable; import java.io.Serializable;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.*; import java.util.*;
import java.util.logging.Level; import java.util.logging.Level;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
import org.redkale.net.*; import org.redkale.net.*;
import org.redkale.util.*; import org.redkale.util.*;
/** /**
* 每个ClientConnection绑定一个独立的ClientCodec实例, 只会同一读线程ReadIOThread里运行 * 每个ClientConnection绑定一个独立的ClientCodec实例, 只会同一读线程ReadIOThread里运行
* *
* <p> * <p>
* 详情见: https://redkale.org * 详情见: https://redkale.org
* *
* @author zhangjx * @author zhangjx
* @since 2.3.0 * @since 2.3.0
* @param <R> ClientRequest * @param <R> ClientRequest
* @param <P> 响应对象 * @param <P> 响应对象
*/ */
public abstract class ClientCodec<R extends ClientRequest, P> implements CompletionHandler<Integer, ByteBuffer> { public abstract class ClientCodec<R extends ClientRequest, P> implements CompletionHandler<Integer, ByteBuffer> {
private final List<ClientResponse<R, P>> respResults = new ArrayList<>(); private final List<ClientResponse<R, P>> respResults = new ArrayList<>();
private final ByteArray readArray = new ByteArray(); private final ByteArray readArray = new ByteArray();
private final ObjectPool<ClientResponse<R, P>> respPool = ObjectPool.createUnsafePool(256, t -> new ClientResponse(), ClientResponse::prepare, ClientResponse::recycle); private final ObjectPool<ClientResponse<R, P>> respPool = ObjectPool.createUnsafePool(256, t -> new ClientResponse(), ClientResponse::prepare, ClientResponse::recycle);
protected final ClientConnection<R, P> connection; protected final ClientConnection<R, P> connection;
protected ClientMessageListener messageListener; protected ClientMessageListener messageListener;
public ClientCodec(ClientConnection<R, P> connection) { public ClientCodec(ClientConnection<R, P> connection) {
Objects.requireNonNull(connection); Objects.requireNonNull(connection);
this.connection = connection; this.connection = connection;
} }
public abstract void decodeMessages(ByteBuffer buffer, ByteArray array); public abstract void decodeMessages(ByteBuffer buffer, ByteArray array);
public ClientCodec<R, P> withMessageListener(ClientMessageListener listener) { public ClientCodec<R, P> withMessageListener(ClientMessageListener listener) {
this.messageListener = listener; this.messageListener = listener;
return this; return this;
} }
@Override @Override
public final void completed(Integer count, ByteBuffer attachment) { public final void completed(Integer count, ByteBuffer attachment) {
AsyncConnection channel = connection.channel; AsyncConnection channel = connection.channel;
if (count < 1) { if (count < 1) {
channel.setReadBuffer(attachment); channel.setReadBuffer(attachment);
connection.dispose(new NonReadableChannelException()); connection.dispose(new NonReadableChannelException());
return; return;
} }
try { try {
attachment.flip(); attachment.flip();
decodeResponse(attachment); decodeResponse(attachment);
} catch (Throwable e) { } catch (Throwable e) {
channel.setReadBuffer(attachment); channel.setReadBuffer(attachment);
connection.dispose(e); connection.dispose(e);
} }
} }
private void decodeResponse(ByteBuffer buffer) { private void decodeResponse(ByteBuffer buffer) {
AsyncConnection channel = connection.channel; AsyncConnection channel = connection.channel;
connection.currRespIterator = null; connection.currRespIterator = null;
decodeMessages(buffer, readArray); decodeMessages(buffer, readArray);
if (!respResults.isEmpty()) { //存在解析结果 if (!respResults.isEmpty()) { //存在解析结果
connection.currRespIterator = null; connection.currRespIterator = null;
readArray.clear(); readArray.clear();
for (ClientResponse<R, P> cr : respResults) { for (ClientResponse<R, P> cr : respResults) {
connection.doneResponseCounter.increment(); connection.doneResponseCounter.increment();
if (cr.isError()) { if (cr.isError()) {
connection.dispose(cr.cause); connection.dispose(cr.cause);
return; return;
} else if (messageListener != null) { } else if (messageListener != null) {
messageListener.onMessage(connection, cr); messageListener.onMessage(connection, cr);
respPool.accept(cr); respPool.accept(cr);
} else { } else {
ClientFuture<R, P> respFuture = connection.pollRespFuture(cr.getRequestid()); ClientFuture<R, P> respFuture = connection.pollRespFuture(cr.getRequestid());
if (respFuture != null) { if (respFuture != null) {
if (respFuture.request != cr.request) { if (respFuture.request != cr.request) {
connection.dispose(new RedkaleException("request pipeline error")); connection.dispose(new RedkaleException("request pipeline error"));
return; return;
} }
responseComplete(false, respFuture, cr.message, cr.cause); responseComplete(false, respFuture, cr.message, cr.cause);
} }
respPool.accept(cr); respPool.accept(cr);
} }
} }
respResults.clear(); respResults.clear();
if (buffer.hasRemaining()) { //还有响应数据包 if (buffer.hasRemaining()) { //还有响应数据包
decodeResponse(buffer); decodeResponse(buffer);
} else { //队列都已处理完了 } else { //队列都已处理完了
buffer.clear(); buffer.clear();
channel.setReadBuffer(buffer); channel.setReadBuffer(buffer);
channel.readRegister(this); channel.readRegister(this);
} }
} else { //数据不全, 继续读 } else { //数据不全, 继续读
connection.currRespIterator = null; connection.currRespIterator = null;
buffer.clear(); buffer.clear();
channel.setReadBuffer(buffer); channel.setReadBuffer(buffer);
channel.read(this); channel.read(this);
} }
} }
void responseComplete(boolean halfCompleted, ClientFuture<R, P> respFuture, P message, Throwable exc) { void responseComplete(boolean halfCompleted, ClientFuture<R, P> respFuture, P message, Throwable exc) {
R request = respFuture.request; R request = respFuture.request;
AsyncIOThread readThread = connection.channel.getReadIOThread(); AsyncIOThread readThread = connection.channel.getReadIOThread();
final WorkThread workThread = request.workThread; final WorkThread workThread = request.workThread;
try { try {
if (!halfCompleted && !request.isCompleted()) { if (!halfCompleted && !request.isCompleted()) {
if (exc == null) { if (exc == null) {
connection.sendHalfWriteInReadThread(request, exc); connection.sendHalfWriteInReadThread(request, exc);
//request没有发送完respFuture需要再次接收 //request没有发送完respFuture需要再次接收
return; return;
} else { } else {
connection.sendHalfWriteInReadThread(request, exc); connection.sendHalfWriteInReadThread(request, exc);
//异常了需要清掉半包 //异常了需要清掉半包
} }
} }
connection.respWaitingCounter.decrement(); connection.respWaitingCounter.decrement();
if (connection.isAuthenticated()) { if (connection.isAuthenticated()) {
connection.client.incrRespDoneCounter(); connection.client.incrRespDoneCounter();
} }
respFuture.cancelTimeout(); respFuture.cancelTimeout();
// if (connection.client.debug) { // if (connection.client.debug) {
// connection.client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + connection + ", 回调处理, req=" + request + ", message=" + message, cause); // connection.client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + connection + ", 回调处理, req=" + request + ", message=" + message, cause);
// } // }
connection.preComplete(message, (R) request, exc); connection.preComplete(message, (R) request, exc);
if (exc == null) { if (exc == null) {
final P rs = request.respTransfer == null ? message : (P) request.respTransfer.apply(message); final P rs = request.respTransfer == null ? message : (P) request.respTransfer.apply(message);
if (workThread == null) { if (workThread == null) {
readThread.runWork(() -> { readThread.runWork(() -> {
Traces.computeIfAbsent(request.traceid); Traces.computeIfAbsent(request.traceid);
respFuture.complete(rs); respFuture.complete(rs);
}); });
} else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
if (workThread.inIO()) { if (workThread.inIO()) {
Traces.computeIfAbsent(request.traceid); Traces.computeIfAbsent(request.traceid);
respFuture.complete(rs); respFuture.complete(rs);
} else { } else {
workThread.execute(() -> { workThread.execute(() -> {
Traces.computeIfAbsent(request.traceid); Traces.computeIfAbsent(request.traceid);
respFuture.complete(rs); respFuture.complete(rs);
}); });
} }
} else { } else {
workThread.runWork(() -> { workThread.runWork(() -> {
Traces.computeIfAbsent(request.traceid); Traces.computeIfAbsent(request.traceid);
respFuture.complete(rs); respFuture.complete(rs);
}); });
} }
} else { //异常 } else { //异常
if (workThread == null) { if (workThread == null) {
readThread.runWork(() -> { readThread.runWork(() -> {
Traces.computeIfAbsent(request.traceid); Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(exc); respFuture.completeExceptionally(exc);
}); });
} else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
if (workThread.inIO()) { if (workThread.inIO()) {
Traces.computeIfAbsent(request.traceid); Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(exc); respFuture.completeExceptionally(exc);
} else { } else {
workThread.execute(() -> { workThread.execute(() -> {
Traces.computeIfAbsent(request.traceid); Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(exc); respFuture.completeExceptionally(exc);
}); });
} }
} else { } else {
workThread.runWork(() -> { workThread.runWork(() -> {
Traces.computeIfAbsent(request.traceid); Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(exc); respFuture.completeExceptionally(exc);
}); });
} }
} }
} catch (Throwable t) { } catch (Throwable t) {
if (workThread == null) { if (workThread == null) {
readThread.runWork(() -> { readThread.runWork(() -> {
Traces.computeIfAbsent(request.traceid); Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(t); respFuture.completeExceptionally(t);
}); });
} else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
if (workThread.inIO()) { if (workThread.inIO()) {
Traces.computeIfAbsent(request.traceid); Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(t); respFuture.completeExceptionally(t);
} else { } else {
workThread.execute(() -> { workThread.execute(() -> {
Traces.computeIfAbsent(request.traceid); Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(t); respFuture.completeExceptionally(t);
}); });
} }
} else { } else {
workThread.runWork(() -> { workThread.runWork(() -> {
Traces.computeIfAbsent(request.traceid); Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(t); respFuture.completeExceptionally(t);
}); });
} }
connection.client.logger.log(Level.INFO, "Complete result error, request: " + respFuture.request, t); connection.client.logger.log(Level.INFO, "Complete result error, request: " + respFuture.request, t);
} }
} }
@Override @Override
public final void failed(Throwable t, ByteBuffer attachment) { public final void failed(Throwable t, ByteBuffer attachment) {
connection.dispose(t); connection.dispose(t);
} }
public ClientMessageListener getMessageListener() { public ClientMessageListener getMessageListener() {
return messageListener; return messageListener;
} }
protected R nextRequest() { protected R nextRequest() {
return connection.findRequest(null); return connection.findRequest(null);
} }
protected R findRequest(Serializable requestid) { protected R findRequest(Serializable requestid) {
return connection.findRequest(requestid); return connection.findRequest(requestid);
} }
protected ClientResponse<R, P> getLastMessage() { protected ClientResponse<R, P> getLastMessage() {
List<ClientResponse<R, P>> results = this.respResults; List<ClientResponse<R, P>> results = this.respResults;
int size = results.size(); int size = results.size();
return size == 0 ? null : results.get(size - 1); return size == 0 ? null : results.get(size - 1);
} }
public void addMessage(R request, P result) { public void addMessage(R request, P result) {
this.respResults.add(respPool.get().set(request, result)); this.respResults.add(respPool.get().success(request, result));
} }
public void addMessage(R request, Throwable exc) { public void addMessage(R request, Throwable exc) {
this.respResults.add(respPool.get().set(request, exc)); this.respResults.add(respPool.get().fail(request, exc));
} }
public void occurError(R request, Throwable exc) { public void occurError(R request, Throwable exc) {
this.respResults.add(new ClientResponse.ClientErrorResponse<>(request, exc)); this.respResults.add(new ClientResponse.ClientErrorResponse<>(request, exc));
} }
@Override @Override
public String toString() { public String toString() {
return JsonConvert.root().convertTo(this); return JsonConvert.root().convertTo(this);
} }
} }

View File

@@ -1,119 +1,119 @@
/* /*
* To change this license header, choose License Headers in Project Properties. * To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates * To change this template file, choose Tools | Templates
* and open the template in the editor. * and open the template in the editor.
*/ */
package org.redkale.net.client; package org.redkale.net.client;
import java.io.Serializable; import java.io.Serializable;
/** /**
* *
* <p> * <p>
* 详情见: https://redkale.org * 详情见: https://redkale.org
* *
* @author zhangjx * @author zhangjx
* *
* @since 2.3.0 * @since 2.3.0
* *
* @param <R> 请求对象 * @param <R> 请求对象
* @param <P> message * @param <P> message
*/ */
public class ClientResponse<R extends ClientRequest, P> { public class ClientResponse<R extends ClientRequest, P> {
protected R request; //服务端返回一个不存在的requestid可能为null protected R request; //服务端返回一个不存在的requestid可能为null
protected P message; protected P message;
protected Throwable cause; protected Throwable cause;
public ClientResponse() { public ClientResponse() {
} }
public ClientResponse(R request, P message) { public ClientResponse(R request, P message) {
this.request = request; this.request = request;
this.message = message; this.message = message;
} }
public ClientResponse(R request, Throwable exc) { public ClientResponse(R request, Throwable exc) {
this.request = request; this.request = request;
this.cause = exc; this.cause = exc;
} }
public Serializable getRequestid() { public Serializable getRequestid() {
return request == null ? null : request.getRequestid(); return request == null ? null : request.getRequestid();
} }
public ClientResponse<R, P> set(R request, P message) { public ClientResponse<R, P> success(R request, P message) {
this.request = request; this.request = request;
this.message = message; this.message = message;
return this; return this;
} }
public ClientResponse<R, P> set(R request, Throwable exc) { public ClientResponse<R, P> fail(R request, Throwable exc) {
this.request = request; this.request = request;
this.cause = exc; this.cause = exc;
return this; return this;
} }
protected void prepare() { protected void prepare() {
this.request = null; this.request = null;
this.message = null; this.message = null;
this.cause = null; this.cause = null;
} }
protected boolean recycle() { protected boolean recycle() {
this.request = null; this.request = null;
this.message = null; this.message = null;
this.cause = null; this.cause = null;
return true; return true;
} }
public R getRequest() { public R getRequest() {
return request; return request;
} }
public void setRequest(R request) { public void setRequest(R request) {
this.request = request; this.request = request;
} }
public P getMessage() { public P getMessage() {
return message; return message;
} }
public void setMessage(P message) { public void setMessage(P message) {
this.message = message; this.message = message;
} }
public Throwable getCause() { public Throwable getCause() {
return cause; return cause;
} }
public void setCause(Throwable cause) { public void setCause(Throwable cause) {
this.cause = cause; this.cause = cause;
} }
@Override @Override
public String toString() { public String toString() {
if (cause != null) { if (cause != null) {
return "{\"exc\":" + cause + "}"; return "{\"exc\":" + cause + "}";
} }
return "{\"message\":" + message + "}"; return "{\"message\":" + message + "}";
} }
boolean isError() { boolean isError() {
return false; return false;
} }
static class ClientErrorResponse<R extends ClientRequest, P> extends ClientResponse<R, P> { static class ClientErrorResponse<R extends ClientRequest, P> extends ClientResponse<R, P> {
public ClientErrorResponse(R request, Throwable exc) { public ClientErrorResponse(R request, Throwable exc) {
super(request, exc); super(request, exc);
} }
@Override @Override
boolean isError() { boolean isError() {
return true; return true;
} }
} }
} }