sncp优化

This commit is contained in:
redkale
2023-10-19 08:26:58 +08:00
parent 8cba160af8
commit 3d8dcc0026
5 changed files with 33 additions and 14 deletions

View File

@@ -83,7 +83,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
decode(buffer, response, 0, null); decode(buffer, response, 0, null);
} catch (Throwable t) { //此处不可 context.offerBuffer(buffer); 以免dispatcher.dispatch内部异常导致重复 offerBuffer } catch (Throwable t) { //此处不可 context.offerBuffer(buffer); 以免dispatcher.dispatch内部异常导致重复 offerBuffer
context.logger.log(Level.WARNING, "dispatch servlet abort, force to close channel ", t); context.logger.log(Level.WARNING, "dispatch servlet abort, force to close channel ", t);
response.errorInIOCodec(t); response.codecError(t);
} }
} }
@@ -104,7 +104,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
decode(data, response, 0, null); decode(data, response, 0, null);
} catch (Throwable t) { } catch (Throwable t) {
context.logger.log(Level.WARNING, "dispatch servlet abort, force to close channel ", t); context.logger.log(Level.WARNING, "dispatch servlet abort, force to close channel ", t);
response.errorInIOCodec(t); response.codecError(t);
} }
return; return;
} }
@@ -125,7 +125,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
decode(data, response, 0, null); decode(data, response, 0, null);
} catch (Throwable t) { } catch (Throwable t) {
context.logger.log(Level.WARNING, "dispatch servlet abort, force to close channel ", t); context.logger.log(Level.WARNING, "dispatch servlet abort, force to close channel ", t);
response.errorInIOCodec(t); response.codecError(t);
} }
return; return;
} }
@@ -150,7 +150,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
if (rs != Integer.MIN_VALUE) { if (rs != Integer.MIN_VALUE) {
dispatcher.incrIllegalRequestCounter(); dispatcher.incrIllegalRequestCounter();
} }
response.errorInIOCodec(null); response.codecError(null);
if (context.logger.isLoggable(Level.FINEST)) { if (context.logger.isLoggable(Level.FINEST)) {
context.logger.log(Level.FINEST, "request.readHeader erroneous (" + rs + "), force to close channel "); context.logger.log(Level.FINEST, "request.readHeader erroneous (" + rs + "), force to close channel ");
} }
@@ -183,7 +183,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
decode(buffer, pipelineResponse, pindex + 1, hreq); decode(buffer, pipelineResponse, pindex + 1, hreq);
} catch (Throwable t) { //此处不可 offerBuffer(buffer); 以免dispatcher.dispatch内部异常导致重复 offerBuffer } catch (Throwable t) { //此处不可 offerBuffer(buffer); 以免dispatcher.dispatch内部异常导致重复 offerBuffer
context.logger.log(Level.WARNING, "dispatch pipeline servlet abort, force to close channel ", t); context.logger.log(Level.WARNING, "dispatch pipeline servlet abort, force to close channel ", t);
pipelineResponse.errorInIOCodec(t); pipelineResponse.codecError(t);
} }
} }
} else { } else {
@@ -225,7 +225,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
public void failed(Throwable exc, ByteBuffer attachment) { public void failed(Throwable exc, ByteBuffer attachment) {
context.dispatcher.incrIllegalRequestCounter(); context.dispatcher.incrIllegalRequestCounter();
channel.offerReadBuffer(attachment); channel.offerReadBuffer(attachment);
response.errorInIOCodec(exc); response.codecError(exc);
if (exc != null) { if (exc != null) {
request.context.logger.log(Level.FINER, "Servlet continue read channel erroneous, force to close channel ", exc); request.context.logger.log(Level.FINER, "Servlet continue read channel erroneous, force to close channel ", exc);
} }

View File

@@ -291,7 +291,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
} }
protected void defaultError(Throwable t) { protected void defaultError(Throwable t) {
errorInIOCodec(t); codecError(t);
} }
/** /**
@@ -299,7 +299,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
* *
* @param t Throwable * @param t Throwable
*/ */
protected void errorInIOCodec(Throwable t) { protected void codecError(Throwable t) {
completeInIOThread(true); completeInIOThread(true);
} }

View File

@@ -337,7 +337,12 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
CompletableFuture<C> f; CompletableFuture<C> f;
while ((f = waitQueue.poll()) != null) { while ((f = waitQueue.poll()) != null) {
if (!f.isDone()) { if (!f.isDone()) {
f.complete(c); if (workThread != null) {
CompletableFuture<C> fs = f;
workThread.execute(() -> fs.complete(c));
} else {
f.complete(c);
}
} }
} }
} }
@@ -374,6 +379,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
if (pool && ec != null && ec.isOpen()) { if (pool && ec != null && ec.isOpen()) {
return CompletableFuture.completedFuture(ec); return CompletableFuture.completedFuture(ec);
} }
WorkThread workThread = WorkThread.currentWorkThread();
final Queue<CompletableFuture<C>> waitQueue = entry.connAcquireWaitings; final Queue<CompletableFuture<C>> waitQueue = entry.connAcquireWaitings;
if (!pool || entry.connOpenState.compareAndSet(false, true)) { if (!pool || entry.connOpenState.compareAndSet(false, true)) {
CompletableFuture<C> future = group.createClient(tcp, addr, readTimeoutSeconds, writeTimeoutSeconds) CompletableFuture<C> future = group.createClient(tcp, addr, readTimeoutSeconds, writeTimeoutSeconds)
@@ -397,7 +403,12 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
CompletableFuture<C> f; CompletableFuture<C> f;
while ((f = waitQueue.poll()) != null) { while ((f = waitQueue.poll()) != null) {
if (!f.isDone()) { if (!f.isDone()) {
f.complete(c); if (workThread != null) {
CompletableFuture<C> fs = f;
workThread.execute(() -> fs.complete(c));
} else {
f.complete(c);
}
} }
} }
} }

View File

@@ -23,14 +23,14 @@ public class SncpHeader {
private Uint128 serviceid; private Uint128 serviceid;
private String serviceName; String serviceName;
//sncp协议版本 //sncp协议版本
private int sncpVersion; private int sncpVersion;
private Uint128 actionid; private Uint128 actionid;
private String methodName; String methodName;
//SncpRequest的值是clientSncpAddressSncpResponse的值是serverSncpAddress //SncpRequest的值是clientSncpAddressSncpResponse的值是serverSncpAddress
private byte[] addrBytes; private byte[] addrBytes;

View File

@@ -12,8 +12,8 @@ import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.logging.Level; import java.util.logging.Level;
import org.redkale.annotation.NonBlocking; import org.redkale.annotation.NonBlocking;
import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES;
import org.redkale.asm.*; import org.redkale.asm.*;
import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES;
import static org.redkale.asm.Opcodes.*; import static org.redkale.asm.Opcodes.*;
import org.redkale.asm.Type; import org.redkale.asm.Type;
import org.redkale.convert.*; import org.redkale.convert.*;
@@ -42,6 +42,10 @@ public class SncpServlet extends Servlet<SncpContext, SncpRequest, SncpResponse>
private final HashMap<Uint128, SncpActionServlet> actions = new HashMap<>(); private final HashMap<Uint128, SncpActionServlet> actions = new HashMap<>();
private SncpServlet(String resourceName, Class resourceType, Service service, Uint128 serviceid) { private SncpServlet(String resourceName, Class resourceType, Service service, Uint128 serviceid) {
Objects.requireNonNull(resourceName);
Objects.requireNonNull(resourceType);
Objects.requireNonNull(service);
Objects.requireNonNull(serviceid);
this.resourceName = resourceName; this.resourceName = resourceName;
this.resourceType = resourceType; this.resourceType = resourceType;
this.service = service; this.service = service;
@@ -70,12 +74,15 @@ public class SncpServlet extends Servlet<SncpContext, SncpRequest, SncpResponse>
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void execute(SncpRequest request, SncpResponse response) throws IOException { public void execute(SncpRequest request, SncpResponse response) throws IOException {
final SncpActionServlet action = actions.get(request.getHeader().getActionid()); SncpHeader reqHeader = request.getHeader();
final SncpActionServlet action = actions.get(reqHeader.getActionid());
//logger.log(Level.FINEST, "sncpdyn.execute: " + request + ", " + (action == null ? "null" : action.method)); //logger.log(Level.FINEST, "sncpdyn.execute: " + request + ", " + (action == null ? "null" : action.method));
if (action == null) { if (action == null) {
response.finish(SncpResponse.RETCODE_ILLACTIONID, null); //无效actionid response.finish(SncpResponse.RETCODE_ILLACTIONID, null); //无效actionid
} else { } else {
try { try {
reqHeader.serviceName = action.resourceType.getName();
reqHeader.methodName = action.method.getName();
if (response.inNonBlocking()) { if (response.inNonBlocking()) {
if (action.nonBlocking) { if (action.nonBlocking) {
action.execute(request, response); action.execute(request, response);
@@ -198,6 +205,7 @@ public class SncpServlet extends Servlet<SncpContext, SncpRequest, SncpResponse>
protected SncpActionServlet(String resourceName, Class resourceType, Service service, Uint128 serviceid, Uint128 actionid, final Method method) { protected SncpActionServlet(String resourceName, Class resourceType, Service service, Uint128 serviceid, Uint128 actionid, final Method method) {
super(resourceName, resourceType, service, serviceid); super(resourceName, resourceType, service, serviceid);
Objects.requireNonNull(method);
this.actionid = actionid; this.actionid = actionid;
this.method = method; this.method = method;