client优化

This commit is contained in:
redkale
2023-03-31 12:07:39 +08:00
parent 00829cf198
commit 6b92748b7c
4 changed files with 40 additions and 21 deletions

View File

@@ -81,8 +81,8 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
final Response response = createResponse();
try {
decode(buffer, response, 0, null);
} catch (Throwable t) { //此处不可 context.offerBuffer(buffer); 以免prepare.prepare内部异常导致重复 offerBuffer
context.logger.log(Level.WARNING, "prepare servlet abort, force to close channel ", t);
} catch (Throwable t) { //此处不可 context.offerBuffer(buffer); 以免dispatcher.dispatch内部异常导致重复 offerBuffer
context.logger.log(Level.WARNING, "dispatch servlet abort, force to close channel ", t);
response.error(t);
}
}
@@ -103,7 +103,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
try {
decode(data, response, 0, null);
} catch (Throwable t) {
context.logger.log(Level.WARNING, "prepare servlet abort, force to close channel ", t);
context.logger.log(Level.WARNING, "dispatch servlet abort, force to close channel ", t);
response.error(t);
}
return;
@@ -124,7 +124,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
try {
decode(data, response, 0, null);
} catch (Throwable t) {
context.logger.log(Level.WARNING, "prepare servlet abort, force to close channel ", t);
context.logger.log(Level.WARNING, "dispatch servlet abort, force to close channel ", t);
response.error(t);
}
return;
@@ -144,11 +144,11 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
final Request request = response.request;
final int rs = request.readHeader(buffer, lastReq);
if (rs < 0) { //表示数据格式不正确
final DispatcherServlet preparer = context.dispatcher;
preparer.incrExecuteCounter();
final DispatcherServlet dispatcher = context.dispatcher;
dispatcher.incrExecuteCounter();
channel.offerReadBuffer(buffer);
if (rs != Integer.MIN_VALUE) {
preparer.incrIllegalRequestCounter();
dispatcher.incrIllegalRequestCounter();
}
response.error(null);
if (context.logger.isLoggable(Level.FINEST)) {
@@ -181,8 +181,8 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
final Response pipelineResponse = createResponse();
try {
decode(buffer, pipelineResponse, pindex + 1, hreq);
} catch (Throwable t) { //此处不可 offerBuffer(buffer); 以免prepare.prepare内部异常导致重复 offerBuffer
context.logger.log(Level.WARNING, "prepare pipeline servlet abort, force to close channel ", t);
} catch (Throwable t) { //此处不可 offerBuffer(buffer); 以免dispatcher.dispatch内部异常导致重复 offerBuffer
context.logger.log(Level.WARNING, "dispatch pipeline servlet abort, force to close channel ", t);
pipelineResponse.error(t);
}
}

View File

@@ -7,6 +7,7 @@ package org.redkale.net.client;
import java.io.Serializable;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.*;
@@ -46,6 +47,8 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
protected final ByteArray writeArray = new ByteArray();
protected final ByteBuffer writeBuffer;
final AtomicBoolean pauseWriting = new AtomicBoolean();
final ConcurrentLinkedQueue<ClientFuture> pauseRequests = new ConcurrentLinkedQueue<>();
@@ -79,6 +82,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
this.connEntry = index == -2 ? null : (index >= 0 ? null : client.connAddrEntrys.get(channel.getRemoteAddress()));
this.respWaitingCounter = index == -2 ? new LongAdder() : (index >= 0 ? client.connRespWaitings[index] : this.connEntry.connRespWaiting);
this.channel = channel.beforeCloseListener(this);
this.writeBuffer = channel.pollWriteBuffer();
}
protected abstract ClientCodec createCodec();
@@ -102,7 +106,14 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
request.writeTo(this, writeArray);
doneRequestCounter.increment();
if (writeArray.length() > 0) {
channel.write(writeArray, this, writeHandler);
if (writeBuffer.capacity() >= writeArray.length()) {
writeBuffer.clear();
writeBuffer.put(writeArray.content(), 0, writeArray.length());
writeBuffer.flip();
channel.write(writeBuffer, this, writeHandler);
} else {
channel.write(writeArray, this, writeHandler);
}
}
} else {
if (channel.inCurrWriteThread()) {
@@ -134,7 +145,14 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
currHalfWriteFuture = respFuture;
}
if (writeArray.length() > 0) {
channel.write(writeArray, this, writeHandler);
if (writeBuffer.capacity() >= writeArray.length()) {
writeBuffer.clear();
writeBuffer.put(writeArray.content(), 0, writeArray.length());
writeBuffer.flip();
channel.write(writeBuffer, this, writeHandler);
} else {
channel.write(writeArray, this, writeHandler);
}
}
}
@@ -194,6 +212,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
}
public void dispose(Throwable exc) {
channel.offerWriteBuffer(writeBuffer);
channel.dispose();
Throwable e = exc == null ? new ClosedChannelException() : exc;
CompletableFuture f;

View File

@@ -6,17 +6,17 @@
package org.redkale.net.http;
import java.io.*;
import java.lang.annotation.*;
import java.lang.annotation.Annotation;
import java.net.*;
import java.nio.*;
import java.nio.ByteBuffer;
import java.nio.charset.*;
import java.util.*;
import java.util.function.*;
import java.util.logging.*;
import java.util.function.Supplier;
import java.util.logging.Level;
import org.redkale.annotation.Comment;
import org.redkale.convert.*;
import org.redkale.convert.json.*;
import org.redkale.net.*;
import org.redkale.convert.json.JsonConvert;
import org.redkale.net.Request;
import org.redkale.util.*;
/**
@@ -292,7 +292,7 @@ public class HttpRequest extends Request<HttpContext> {
final ByteBuffer buffer = buf;
ByteArray bytes = array;
if (this.readState == READ_STATE_ROUTE) {
int rs = readMethodLine(buffer);
int rs = readMethodUriLine(buffer);
if (rs != 0) {
return rs;
}
@@ -455,7 +455,7 @@ public class HttpRequest extends Request<HttpContext> {
}
//解析 GET /xxx HTTP/1.1
private int readMethodLine(final ByteBuffer buf) {
private int readMethodUriLine(final ByteBuffer buf) {
final ByteBuffer buffer = buf;
Charset charset = this.context.getCharset();
int remain = buffer.remaining();

View File

@@ -32,10 +32,10 @@ public class MapIgnoreColumnTest {
String rs = "{\"bbb\":[1,2]}";
if (!main) Assertions.assertEquals(rs, convert.convertTo(map));
System.out.println(convert.convertTo(map));
JsonConvert convert2 = JsonConvert.root().newConvert((k, v) -> {
JsonConvert convert2 = JsonConvert.root().newConvert(null, (k, v) -> {
if ("bbb".equals(k)) return null;
return v;
}, null, null);
}, null);
if (!main) Assertions.assertEquals("{\"aaa\":\"123\",\"bbb\":null}", convert2.convertTo(map));
System.out.println(convert2.convertTo(map));
}