From 979a263c88c3254ca883f6a21442b268569570ab Mon Sep 17 00:00:00 2001 From: Redkale Date: Tue, 3 Jan 2023 12:08:42 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96DEFAULT=5FMAX=5FPIPELINES?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/redkale/net/DispatcherServlet.java | 25 ++++++++++++++--- .../java/org/redkale/net/ProtocolCodec.java | 28 ++++++------------- .../java/org/redkale/net/client/Client.java | 12 ++++---- .../redkale/net/client/ClientConnection.java | 5 +++- .../org/redkale/net/http/HttpRequest.java | 6 ++-- 5 files changed, 43 insertions(+), 33 deletions(-) diff --git a/src/main/java/org/redkale/net/DispatcherServlet.java b/src/main/java/org/redkale/net/DispatcherServlet.java index 41a7cd29f..5a80aa7a2 100644 --- a/src/main/java/org/redkale/net/DispatcherServlet.java +++ b/src/main/java/org/redkale/net/DispatcherServlet.java @@ -31,11 +31,9 @@ import org.redkale.util.*; */ public abstract class DispatcherServlet, P extends Response, S extends Servlet> extends Servlet { - protected final LongAdder executeCounter = new LongAdder(); //执行请求次数 + private final LongAdder executeCounter = new LongAdder(); //执行请求次数 - protected final LongAdder illRequestCounter = new LongAdder(); //错误请求次数 - - protected Application application; + private final LongAdder illRequestCounter = new LongAdder(); //错误请求次数 private final Object servletLock = new Object(); @@ -47,8 +45,18 @@ public abstract class DispatcherServlet> filters = new ArrayList<>(); + protected Application application; + protected Filter headFilter; + protected void incrExecuteCounter() { + executeCounter.increment(); + } + + protected void incrIllRequestCounter() { + illRequestCounter.increment(); + } + protected void putServlet(S servlet) { synchronized (servletLock) { Set newservlets = new HashSet<>(servlets); @@ -265,4 +273,13 @@ public abstract class DispatcherServlet servletStream() { return servlets.stream(); } + + public Long getExecuteCounter() { + return executeCounter.longValue(); + } + + public Long getIllRequestCounter() { + return illRequestCounter.longValue(); + } + } diff --git a/src/main/java/org/redkale/net/ProtocolCodec.java b/src/main/java/org/redkale/net/ProtocolCodec.java index 595c12c86..1a3442dd3 100644 --- a/src/main/java/org/redkale/net/ProtocolCodec.java +++ b/src/main/java/org/redkale/net/ProtocolCodec.java @@ -8,7 +8,6 @@ package org.redkale.net; import java.net.SocketException; import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; -import java.util.concurrent.atomic.LongAdder; import java.util.function.*; import java.util.logging.Level; @@ -129,33 +128,26 @@ class ProtocolCodec implements CompletionHandler { } } - protected void decode(final ByteBuffer buffer, final Response response, final int pipelineIndex, final Request lastreq) { + protected void decode(final ByteBuffer buffer, final Response response, final int pipelineIndex, final Request lastReq) { response.init(channel); final Request request = response.request; - final int rs = request.readHeader(buffer, lastreq); + final int rs = request.readHeader(buffer, lastReq); if (rs < 0) { //表示数据格式不正确 final DispatcherServlet preparer = context.prepare; - LongAdder ec = preparer.executeCounter; - if (ec != null) { - ec.increment(); - } + preparer.incrExecuteCounter(); channel.offerBuffer(buffer); - if (rs != Integer.MIN_VALUE && preparer.illRequestCounter != null) { - preparer.illRequestCounter.increment(); + if (rs != Integer.MIN_VALUE) { + preparer.incrIllRequestCounter(); } response.finish(true); if (context.logger.isLoggable(Level.FINEST)) { context.logger.log(Level.FINEST, "request.readHeader erroneous (" + rs + "), force to close channel "); } } else if (rs == 0) { - final DispatcherServlet preparer = context.prepare; - LongAdder ec = preparer.executeCounter; - if (ec != null) { - ec.increment(); - } + context.prepare.incrExecuteCounter(); int pindex = pipelineIndex; boolean pipeline = false; - Request hreq = lastreq; + Request hreq = lastReq; if (buffer.hasRemaining()) { pipeline = true; if (pindex == 0) { @@ -191,14 +183,12 @@ class ProtocolCodec implements CompletionHandler { return; } attachment.flip(); - decode(attachment, response, pipelineIndex, lastreq); + decode(attachment, response, pipelineIndex, lastReq); } @Override public void failed(Throwable exc, ByteBuffer attachment) { - if (context.prepare.illRequestCounter != null) { - context.prepare.illRequestCounter.increment(); - } + context.prepare.incrIllRequestCounter(); channel.offerBuffer(attachment); response.finish(true); if (exc != null) { diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index b981bf5c0..dd57f099a 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -26,7 +26,7 @@ import org.redkale.util.*; */ public abstract class Client { - public static final int DEFAULT_MAX_PIPELINES = 256; + public static final int DEFAULT_MAX_PIPELINES = 128; protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); @@ -77,15 +77,15 @@ public abstract class Client { protected Function, CompletableFuture> authenticate; protected Client(AsyncGroup group, ClientAddress address) { - this(group, true, address, Utility.cpus(), 16, null, null, null); + this(group, true, address, Utility.cpus(), DEFAULT_MAX_PIPELINES, null, null, null); } protected Client(AsyncGroup group, boolean tcp, ClientAddress address) { - this(group, tcp, address, Utility.cpus(), 16, null, null, null); + this(group, tcp, address, Utility.cpus(), DEFAULT_MAX_PIPELINES, null, null, null); } protected Client(AsyncGroup group, boolean tcp, ClientAddress address, int maxconns) { - this(group, tcp, address, maxconns, 16, null, null, null); + this(group, tcp, address, maxconns, DEFAULT_MAX_PIPELINES, null, null, null); } protected Client(AsyncGroup group, boolean tcp, ClientAddress address, int maxconns, int maxPipelines) { @@ -94,12 +94,12 @@ public abstract class Client { protected Client(AsyncGroup group, boolean tcp, ClientAddress address, int maxconns, Function, CompletableFuture> authenticate) { - this(group, tcp, address, maxconns, 16, null, null, authenticate); + this(group, tcp, address, maxconns, DEFAULT_MAX_PIPELINES, null, null, authenticate); } protected Client(AsyncGroup group, boolean tcp, ClientAddress address, int maxconns, R closeRequest, Function, CompletableFuture> authenticate) { - this(group, tcp, address, maxconns, 16, null, closeRequest, authenticate); + this(group, tcp, address, maxconns, DEFAULT_MAX_PIPELINES, null, closeRequest, authenticate); } @SuppressWarnings("OverridableMethodCallInConstructor") diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index d8c90526f..c0777cae6 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -63,7 +63,9 @@ public abstract class ClientConnection implements Co public void completed(Integer result, Void attachment) { if (writeLastRequest != null && writeLastRequest == client.closeRequest) { if (closeFuture != null) { - closeFuture.complete(null); + channel.getAsyncIOThread().runWork(() -> { + closeFuture.complete(null); + }); } closeFuture = null; return; @@ -103,6 +105,7 @@ public abstract class ClientConnection implements Co this.pauseWriting.set(false); } + //有写入数据返回true,否则返回false private boolean sendWrite(boolean must) { ClientConnection conn = this; ByteArray rw = conn.writeArray; diff --git a/src/main/java/org/redkale/net/http/HttpRequest.java b/src/main/java/org/redkale/net/http/HttpRequest.java index 50cb86a99..82450381d 100644 --- a/src/main/java/org/redkale/net/http/HttpRequest.java +++ b/src/main/java/org/redkale/net/http/HttpRequest.java @@ -305,14 +305,14 @@ public class HttpRequest extends Request { if (last != null && ((HttpRequest) last).headerLength > 0) { final HttpRequest httplast = (HttpRequest) last; int bufremain = buffer.remaining(); - int remainhalf = httplast.headerLength - this.headerHalfLen; - if (remainhalf > bufremain) { + int remainHalf = httplast.headerLength - this.headerHalfLen; + if (remainHalf > bufremain) { bytes.put(buffer); this.headerHalfLen += bufremain; buffer.clear(); return 1; } - buffer.position(buffer.position() + remainhalf); + buffer.position(buffer.position() + remainHalf); this.contentType = httplast.contentType; this.contentLength = httplast.contentLength; this.host = httplast.host;