From 3d8dcc0026ac28605cf5e947bc26a6bc2fd6253c Mon Sep 17 00:00:00 2001 From: redkale Date: Thu, 19 Oct 2023 08:26:58 +0800 Subject: [PATCH] =?UTF-8?q?sncp=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/org/redkale/net/ProtocolCodec.java | 12 ++++++------ src/main/java/org/redkale/net/Response.java | 4 ++-- src/main/java/org/redkale/net/client/Client.java | 15 +++++++++++++-- .../java/org/redkale/net/sncp/SncpHeader.java | 4 ++-- .../java/org/redkale/net/sncp/SncpServlet.java | 12 ++++++++++-- 5 files changed, 33 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/redkale/net/ProtocolCodec.java b/src/main/java/org/redkale/net/ProtocolCodec.java index 5245b2e17..8e787e814 100644 --- a/src/main/java/org/redkale/net/ProtocolCodec.java +++ b/src/main/java/org/redkale/net/ProtocolCodec.java @@ -83,7 +83,7 @@ class ProtocolCodec implements CompletionHandler { decode(buffer, response, 0, null); } catch (Throwable t) { //此处不可 context.offerBuffer(buffer); 以免dispatcher.dispatch内部异常导致重复 offerBuffer context.logger.log(Level.WARNING, "dispatch servlet abort, force to close channel ", t); - response.errorInIOCodec(t); + response.codecError(t); } } @@ -104,7 +104,7 @@ class ProtocolCodec implements CompletionHandler { decode(data, response, 0, null); } catch (Throwable t) { context.logger.log(Level.WARNING, "dispatch servlet abort, force to close channel ", t); - response.errorInIOCodec(t); + response.codecError(t); } return; } @@ -125,7 +125,7 @@ class ProtocolCodec implements CompletionHandler { decode(data, response, 0, null); } catch (Throwable t) { context.logger.log(Level.WARNING, "dispatch servlet abort, force to close channel ", t); - response.errorInIOCodec(t); + response.codecError(t); } return; } @@ -150,7 +150,7 @@ class ProtocolCodec implements CompletionHandler { if (rs != Integer.MIN_VALUE) { dispatcher.incrIllegalRequestCounter(); } - response.errorInIOCodec(null); + response.codecError(null); if (context.logger.isLoggable(Level.FINEST)) { context.logger.log(Level.FINEST, "request.readHeader erroneous (" + rs + "), force to close channel "); } @@ -183,7 +183,7 @@ class ProtocolCodec implements CompletionHandler { decode(buffer, pipelineResponse, pindex + 1, hreq); } catch (Throwable t) { //此处不可 offerBuffer(buffer); 以免dispatcher.dispatch内部异常导致重复 offerBuffer context.logger.log(Level.WARNING, "dispatch pipeline servlet abort, force to close channel ", t); - pipelineResponse.errorInIOCodec(t); + pipelineResponse.codecError(t); } } } else { @@ -225,7 +225,7 @@ class ProtocolCodec implements CompletionHandler { public void failed(Throwable exc, ByteBuffer attachment) { context.dispatcher.incrIllegalRequestCounter(); channel.offerReadBuffer(attachment); - response.errorInIOCodec(exc); + response.codecError(exc); if (exc != null) { request.context.logger.log(Level.FINER, "Servlet continue read channel erroneous, force to close channel ", exc); } diff --git a/src/main/java/org/redkale/net/Response.java b/src/main/java/org/redkale/net/Response.java index c92047127..2e4750e92 100644 --- a/src/main/java/org/redkale/net/Response.java +++ b/src/main/java/org/redkale/net/Response.java @@ -291,7 +291,7 @@ public abstract class Response> { } protected void defaultError(Throwable t) { - errorInIOCodec(t); + codecError(t); } /** @@ -299,7 +299,7 @@ public abstract class Response> { * * @param t Throwable */ - protected void errorInIOCodec(Throwable t) { + protected void codecError(Throwable t) { completeInIOThread(true); } diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index e4fff8d76..7f9b01920 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -337,7 +337,12 @@ public abstract class Client, R extends ClientR CompletableFuture f; while ((f = waitQueue.poll()) != null) { if (!f.isDone()) { - f.complete(c); + if (workThread != null) { + CompletableFuture fs = f; + workThread.execute(() -> fs.complete(c)); + } else { + f.complete(c); + } } } } @@ -374,6 +379,7 @@ public abstract class Client, R extends ClientR if (pool && ec != null && ec.isOpen()) { return CompletableFuture.completedFuture(ec); } + WorkThread workThread = WorkThread.currentWorkThread(); final Queue> waitQueue = entry.connAcquireWaitings; if (!pool || entry.connOpenState.compareAndSet(false, true)) { CompletableFuture future = group.createClient(tcp, addr, readTimeoutSeconds, writeTimeoutSeconds) @@ -397,7 +403,12 @@ public abstract class Client, R extends ClientR CompletableFuture f; while ((f = waitQueue.poll()) != null) { if (!f.isDone()) { - f.complete(c); + if (workThread != null) { + CompletableFuture fs = f; + workThread.execute(() -> fs.complete(c)); + } else { + f.complete(c); + } } } } diff --git a/src/main/java/org/redkale/net/sncp/SncpHeader.java b/src/main/java/org/redkale/net/sncp/SncpHeader.java index 46f735343..89dda5bc1 100644 --- a/src/main/java/org/redkale/net/sncp/SncpHeader.java +++ b/src/main/java/org/redkale/net/sncp/SncpHeader.java @@ -23,14 +23,14 @@ public class SncpHeader { private Uint128 serviceid; - private String serviceName; + String serviceName; //sncp协议版本 private int sncpVersion; private Uint128 actionid; - private String methodName; + String methodName; //SncpRequest的值是clientSncpAddress,SncpResponse的值是serverSncpAddress private byte[] addrBytes; diff --git a/src/main/java/org/redkale/net/sncp/SncpServlet.java b/src/main/java/org/redkale/net/sncp/SncpServlet.java index dbe0a6944..aaf449b47 100644 --- a/src/main/java/org/redkale/net/sncp/SncpServlet.java +++ b/src/main/java/org/redkale/net/sncp/SncpServlet.java @@ -12,8 +12,8 @@ import java.util.*; import java.util.concurrent.*; import java.util.logging.Level; import org.redkale.annotation.NonBlocking; -import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES; import org.redkale.asm.*; +import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES; import static org.redkale.asm.Opcodes.*; import org.redkale.asm.Type; import org.redkale.convert.*; @@ -42,6 +42,10 @@ public class SncpServlet extends Servlet private final HashMap actions = new HashMap<>(); private SncpServlet(String resourceName, Class resourceType, Service service, Uint128 serviceid) { + Objects.requireNonNull(resourceName); + Objects.requireNonNull(resourceType); + Objects.requireNonNull(service); + Objects.requireNonNull(serviceid); this.resourceName = resourceName; this.resourceType = resourceType; this.service = service; @@ -70,12 +74,15 @@ public class SncpServlet extends Servlet @Override @SuppressWarnings("unchecked") public void execute(SncpRequest request, SncpResponse response) throws IOException { - final SncpActionServlet action = actions.get(request.getHeader().getActionid()); + SncpHeader reqHeader = request.getHeader(); + final SncpActionServlet action = actions.get(reqHeader.getActionid()); //logger.log(Level.FINEST, "sncpdyn.execute: " + request + ", " + (action == null ? "null" : action.method)); if (action == null) { response.finish(SncpResponse.RETCODE_ILLACTIONID, null); //无效actionid } else { try { + reqHeader.serviceName = action.resourceType.getName(); + reqHeader.methodName = action.method.getName(); if (response.inNonBlocking()) { if (action.nonBlocking) { action.execute(request, response); @@ -198,6 +205,7 @@ public class SncpServlet extends Servlet protected SncpActionServlet(String resourceName, Class resourceType, Service service, Uint128 serviceid, Uint128 actionid, final Method method) { super(resourceName, resourceType, service, serviceid); + Objects.requireNonNull(method); this.actionid = actionid; this.method = method;