优化命名

This commit is contained in:
Redkale
2023-01-01 13:02:54 +08:00
parent de089072fa
commit 391352e077
5 changed files with 37 additions and 40 deletions

View File

@@ -110,7 +110,7 @@ public class Context {
}
}
protected void executeDispatcher(Request request, Response response) {
protected void executeDispatch(Request request, Response response) {
if (workHashExecutor != null) {
workHashExecutor.execute(request.getHashid(), () -> prepare.prepare(request, response));
} else if (workExecutor != null) {

View File

@@ -169,7 +169,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
request.pipeline(pindex, pindex);
channel.setReadBuffer((ByteBuffer) buffer.clear());
}
context.executeDispatcher(request, response);
context.executeDispatch(request, response);
if (pipeline) {
final Response pipelineResponse = createResponse();
try {

View File

@@ -23,7 +23,7 @@ import org.redkale.util.ByteArray;
*/
public abstract class ClientCodec<R extends ClientRequest, P> {
protected final List<ClientResult<P>> results = new ArrayList<>();
protected final List<ClientResponse<P>> results = new ArrayList<>();
protected final ClientConnection connection;
@@ -32,17 +32,14 @@ public abstract class ClientCodec<R extends ClientRequest, P> {
}
//返回true: array会clear, 返回false: buffer会clear
public abstract boolean codecResult(ByteBuffer buffer, ByteArray array);
public abstract boolean decodeMessages(ByteBuffer buffer, ByteArray array);
protected Queue<ClientFuture> responseQueue() {
return connection.responseQueue;
}
public List<ClientResult<P>> removeResults() {
if (results.isEmpty()) {
return null;
}
List<ClientResult<P>> rs = new ArrayList<>(results);
public List<ClientResponse<P>> pollMessages() {
List<ClientResponse<P>> rs = new ArrayList<>(results);
this.results.clear();
return rs;
}
@@ -51,12 +48,12 @@ public abstract class ClientCodec<R extends ClientRequest, P> {
return connection;
}
public void addResult(P result) {
this.results.add(new ClientResult<>(result));
public void addMessage(P result) {
this.results.add(new ClientResponse<>(result));
}
public void addResult(Throwable exc) {
this.results.add(new ClientResult<>(exc));
public void addMessage(Throwable exc) {
this.results.add(new ClientResponse<>(exc));
}
public void reset() {

View File

@@ -64,7 +64,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
closeFuture = null;
return;
}
if (continueWrite(false)) {
if (sendWrite(false)) {
return;
}
writePending.compareAndSet(true, false);
@@ -95,7 +95,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
this.pauseWriting.set(false);
}
private boolean continueWrite(boolean must) {
private boolean sendWrite(boolean must) {
ClientConnection conn = this;
ByteArray rw = conn.writeArray;
rw.clear();
@@ -168,14 +168,14 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
}
try {
attachment.flip();
codecResponse(attachment);
decodeResponse(attachment);
} catch (Throwable e) {
channel.setReadBuffer(attachment);
dispose(e);
}
}
protected void completeResponse(ClientResult<P> rs, ClientFuture respFuture) {
protected void completeResponse(ClientResponse<P> rs, ClientFuture respFuture) {
if (respFuture != null) {
if (!respFuture.request.isCompleted()) {
if (rs.exc == null) {
@@ -196,8 +196,8 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
respFuture.timeout.cancel(true);
}
ClientRequest request = respFuture.request;
//if (client.finest) client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + ClientConnection.this + ", 回调处理, req=" + request + ", result=" + rs.result);
preComplete(rs.result, (R) request, rs.exc);
//if (client.finest) client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + ClientConnection.this + ", 回调处理, req=" + request + ", message=" + rs.message);
preComplete(rs.message, (R) request, rs.exc);
WorkThread workThread = null;
if (request != null) {
workThread = request.workThread;
@@ -224,13 +224,13 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
// if (request != null) {
// Traces.currTraceid(request.traceid);
// }
// respFuture.complete(rs.result);
// respFuture.complete(rs.message);
// } else {
// workThread.execute(() -> {
// if (request != null) {
// Traces.currTraceid(request.traceid);
// }
// respFuture.complete(rs.result);
// respFuture.complete(rs.message);
// });
// }
// }
@@ -249,7 +249,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
if (request != null) {
Traces.currTraceid(request.traceid);
}
respFuture.complete(rs.result);
respFuture.complete(rs.message);
});
}
} catch (Throwable t) {
@@ -258,12 +258,12 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
}
}
public void codecResponse(ByteBuffer buffer) {
if (codec.codecResult(buffer, readArray)) { //成功了
public void decodeResponse(ByteBuffer buffer) {
if (codec.decodeMessages(buffer, readArray)) { //成功了
readArray.clear();
List<ClientResult<P>> results = codec.removeResults();
List<ClientResponse<P>> results = codec.pollMessages();
if (results != null) {
for (ClientResult<P> rs : results) {
for (ClientResponse<P> rs : results) {
ClientFuture respFuture = responseQueue.poll();
if (respFuture != null) {
int mergeCount = respFuture.mergeCount;
@@ -281,7 +281,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
}
if (buffer.hasRemaining()) {
codecResponse(buffer);
decodeResponse(buffer);
} else if (responseQueue.isEmpty()) { //队列都已处理完了
buffer.clear();
channel.setReadBuffer(buffer);
@@ -293,7 +293,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
} else { //还有消息需要读取
if ((!requestQueue.isEmpty() || lastHalfRequest != null) && writePending.compareAndSet(false, true)) {
//先写后读取
if (!continueWrite(true)) {
if (!sendWrite(true)) {
writePending.compareAndSet(true, false);
}
}
@@ -366,7 +366,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
client.reqWritedCounter.increment();
}
if (writePending.compareAndSet(false, true)) {
continueWrite(true);
sendWrite(true);
}
}

View File

@@ -8,28 +8,28 @@ package org.redkale.net.client;
/**
*
* @author zhangjx
* @param <P> result
* @param <P> message
*/
public class ClientResult<P> {
public class ClientResponse<P> {
protected P result;
protected P message;
protected Throwable exc;
public ClientResult(P result) {
this.result = result;
public ClientResponse(P result) {
this.message = result;
}
public ClientResult(Throwable exc) {
public ClientResponse(Throwable exc) {
this.exc = exc;
}
public P getResult() {
return result;
public P getMessage() {
return message;
}
public void setResult(P result) {
this.result = result;
public void setMessage(P message) {
this.message = message;
}
public Throwable getExc() {
@@ -45,6 +45,6 @@ public class ClientResult<P> {
if (exc != null) {
return "{\"exc\":" + exc + "}";
}
return "{\"result\":" + result + "}";
return "{\"message\":" + message + "}";
}
}