优化DEFAULT_MAX_PIPELINES

This commit is contained in:
Redkale
2023-01-03 12:08:42 +08:00
parent c72d0bec8c
commit 979a263c88
5 changed files with 43 additions and 33 deletions

View File

@@ -31,11 +31,9 @@ import org.redkale.util.*;
*/ */
public abstract class DispatcherServlet<K extends Serializable, C extends Context, R extends Request<C>, P extends Response<C, R>, S extends Servlet<C, R, P>> extends Servlet<C, R, P> { public abstract class DispatcherServlet<K extends Serializable, C extends Context, R extends Request<C>, P extends Response<C, R>, S extends Servlet<C, R, P>> extends Servlet<C, R, P> {
protected final LongAdder executeCounter = new LongAdder(); //执行请求次数 private final LongAdder executeCounter = new LongAdder(); //执行请求次数
protected final LongAdder illRequestCounter = new LongAdder(); //错误请求次数 private final LongAdder illRequestCounter = new LongAdder(); //错误请求次数
protected Application application;
private final Object servletLock = new Object(); private final Object servletLock = new Object();
@@ -47,8 +45,18 @@ public abstract class DispatcherServlet<K extends Serializable, C extends Contex
private final List<Filter<C, R, P>> filters = new ArrayList<>(); private final List<Filter<C, R, P>> filters = new ArrayList<>();
protected Application application;
protected Filter<C, R, P> headFilter; protected Filter<C, R, P> headFilter;
protected void incrExecuteCounter() {
executeCounter.increment();
}
protected void incrIllRequestCounter() {
illRequestCounter.increment();
}
protected void putServlet(S servlet) { protected void putServlet(S servlet) {
synchronized (servletLock) { synchronized (servletLock) {
Set<S> newservlets = new HashSet<>(servlets); Set<S> newservlets = new HashSet<>(servlets);
@@ -265,4 +273,13 @@ public abstract class DispatcherServlet<K extends Serializable, C extends Contex
protected Stream<S> servletStream() { protected Stream<S> servletStream() {
return servlets.stream(); return servlets.stream();
} }
public Long getExecuteCounter() {
return executeCounter.longValue();
}
public Long getIllRequestCounter() {
return illRequestCounter.longValue();
}
} }

View File

@@ -8,7 +8,6 @@ package org.redkale.net;
import java.net.SocketException; import java.net.SocketException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler; import java.nio.channels.CompletionHandler;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.*; import java.util.function.*;
import java.util.logging.Level; import java.util.logging.Level;
@@ -129,33 +128,26 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
} }
} }
protected void decode(final ByteBuffer buffer, final Response response, final int pipelineIndex, final Request lastreq) { protected void decode(final ByteBuffer buffer, final Response response, final int pipelineIndex, final Request lastReq) {
response.init(channel); response.init(channel);
final Request request = response.request; final Request request = response.request;
final int rs = request.readHeader(buffer, lastreq); final int rs = request.readHeader(buffer, lastReq);
if (rs < 0) { //表示数据格式不正确 if (rs < 0) { //表示数据格式不正确
final DispatcherServlet preparer = context.prepare; final DispatcherServlet preparer = context.prepare;
LongAdder ec = preparer.executeCounter; preparer.incrExecuteCounter();
if (ec != null) {
ec.increment();
}
channel.offerBuffer(buffer); channel.offerBuffer(buffer);
if (rs != Integer.MIN_VALUE && preparer.illRequestCounter != null) { if (rs != Integer.MIN_VALUE) {
preparer.illRequestCounter.increment(); preparer.incrIllRequestCounter();
} }
response.finish(true); response.finish(true);
if (context.logger.isLoggable(Level.FINEST)) { if (context.logger.isLoggable(Level.FINEST)) {
context.logger.log(Level.FINEST, "request.readHeader erroneous (" + rs + "), force to close channel "); context.logger.log(Level.FINEST, "request.readHeader erroneous (" + rs + "), force to close channel ");
} }
} else if (rs == 0) { } else if (rs == 0) {
final DispatcherServlet preparer = context.prepare; context.prepare.incrExecuteCounter();
LongAdder ec = preparer.executeCounter;
if (ec != null) {
ec.increment();
}
int pindex = pipelineIndex; int pindex = pipelineIndex;
boolean pipeline = false; boolean pipeline = false;
Request hreq = lastreq; Request hreq = lastReq;
if (buffer.hasRemaining()) { if (buffer.hasRemaining()) {
pipeline = true; pipeline = true;
if (pindex == 0) { if (pindex == 0) {
@@ -191,14 +183,12 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
return; return;
} }
attachment.flip(); attachment.flip();
decode(attachment, response, pipelineIndex, lastreq); decode(attachment, response, pipelineIndex, lastReq);
} }
@Override @Override
public void failed(Throwable exc, ByteBuffer attachment) { public void failed(Throwable exc, ByteBuffer attachment) {
if (context.prepare.illRequestCounter != null) { context.prepare.incrIllRequestCounter();
context.prepare.illRequestCounter.increment();
}
channel.offerBuffer(attachment); channel.offerBuffer(attachment);
response.finish(true); response.finish(true);
if (exc != null) { if (exc != null) {

View File

@@ -26,7 +26,7 @@ import org.redkale.util.*;
*/ */
public abstract class Client<R extends ClientRequest, P> { public abstract class Client<R extends ClientRequest, P> {
public static final int DEFAULT_MAX_PIPELINES = 256; public static final int DEFAULT_MAX_PIPELINES = 128;
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
@@ -77,15 +77,15 @@ public abstract class Client<R extends ClientRequest, P> {
protected Function<CompletableFuture<ClientConnection>, CompletableFuture<ClientConnection>> authenticate; protected Function<CompletableFuture<ClientConnection>, CompletableFuture<ClientConnection>> authenticate;
protected Client(AsyncGroup group, ClientAddress address) { protected Client(AsyncGroup group, ClientAddress address) {
this(group, true, address, Utility.cpus(), 16, null, null, null); this(group, true, address, Utility.cpus(), DEFAULT_MAX_PIPELINES, null, null, null);
} }
protected Client(AsyncGroup group, boolean tcp, ClientAddress address) { protected Client(AsyncGroup group, boolean tcp, ClientAddress address) {
this(group, tcp, address, Utility.cpus(), 16, null, null, null); this(group, tcp, address, Utility.cpus(), DEFAULT_MAX_PIPELINES, null, null, null);
} }
protected Client(AsyncGroup group, boolean tcp, ClientAddress address, int maxconns) { protected Client(AsyncGroup group, boolean tcp, ClientAddress address, int maxconns) {
this(group, tcp, address, maxconns, 16, null, null, null); this(group, tcp, address, maxconns, DEFAULT_MAX_PIPELINES, null, null, null);
} }
protected Client(AsyncGroup group, boolean tcp, ClientAddress address, int maxconns, int maxPipelines) { protected Client(AsyncGroup group, boolean tcp, ClientAddress address, int maxconns, int maxPipelines) {
@@ -94,12 +94,12 @@ public abstract class Client<R extends ClientRequest, P> {
protected Client(AsyncGroup group, boolean tcp, ClientAddress address, int maxconns, protected Client(AsyncGroup group, boolean tcp, ClientAddress address, int maxconns,
Function<CompletableFuture<ClientConnection>, CompletableFuture<ClientConnection>> authenticate) { Function<CompletableFuture<ClientConnection>, CompletableFuture<ClientConnection>> authenticate) {
this(group, tcp, address, maxconns, 16, null, null, authenticate); this(group, tcp, address, maxconns, DEFAULT_MAX_PIPELINES, null, null, authenticate);
} }
protected Client(AsyncGroup group, boolean tcp, ClientAddress address, int maxconns, protected Client(AsyncGroup group, boolean tcp, ClientAddress address, int maxconns,
R closeRequest, Function<CompletableFuture<ClientConnection>, CompletableFuture<ClientConnection>> authenticate) { R closeRequest, Function<CompletableFuture<ClientConnection>, CompletableFuture<ClientConnection>> authenticate) {
this(group, tcp, address, maxconns, 16, null, closeRequest, authenticate); this(group, tcp, address, maxconns, DEFAULT_MAX_PIPELINES, null, closeRequest, authenticate);
} }
@SuppressWarnings("OverridableMethodCallInConstructor") @SuppressWarnings("OverridableMethodCallInConstructor")

View File

@@ -63,7 +63,9 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
public void completed(Integer result, Void attachment) { public void completed(Integer result, Void attachment) {
if (writeLastRequest != null && writeLastRequest == client.closeRequest) { if (writeLastRequest != null && writeLastRequest == client.closeRequest) {
if (closeFuture != null) { if (closeFuture != null) {
channel.getAsyncIOThread().runWork(() -> {
closeFuture.complete(null); closeFuture.complete(null);
});
} }
closeFuture = null; closeFuture = null;
return; return;
@@ -103,6 +105,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
this.pauseWriting.set(false); this.pauseWriting.set(false);
} }
//有写入数据返回true否则返回false
private boolean sendWrite(boolean must) { private boolean sendWrite(boolean must) {
ClientConnection conn = this; ClientConnection conn = this;
ByteArray rw = conn.writeArray; ByteArray rw = conn.writeArray;

View File

@@ -305,14 +305,14 @@ public class HttpRequest extends Request<HttpContext> {
if (last != null && ((HttpRequest) last).headerLength > 0) { if (last != null && ((HttpRequest) last).headerLength > 0) {
final HttpRequest httplast = (HttpRequest) last; final HttpRequest httplast = (HttpRequest) last;
int bufremain = buffer.remaining(); int bufremain = buffer.remaining();
int remainhalf = httplast.headerLength - this.headerHalfLen; int remainHalf = httplast.headerLength - this.headerHalfLen;
if (remainhalf > bufremain) { if (remainHalf > bufremain) {
bytes.put(buffer); bytes.put(buffer);
this.headerHalfLen += bufremain; this.headerHalfLen += bufremain;
buffer.clear(); buffer.clear();
return 1; return 1;
} }
buffer.position(buffer.position() + remainhalf); buffer.position(buffer.position() + remainHalf);
this.contentType = httplast.contentType; this.contentType = httplast.contentType;
this.contentLength = httplast.contentLength; this.contentLength = httplast.contentLength;
this.host = httplast.host; this.host = httplast.host;