diff --git a/src/main/java/org/redkale/net/ProtocolCodec.java b/src/main/java/org/redkale/net/ProtocolCodec.java index dc27212b1..ff1786b42 100644 --- a/src/main/java/org/redkale/net/ProtocolCodec.java +++ b/src/main/java/org/redkale/net/ProtocolCodec.java @@ -81,8 +81,8 @@ class ProtocolCodec implements CompletionHandler { final Response response = createResponse(); try { decode(buffer, response, 0, null); - } catch (Throwable t) { //此处不可 context.offerBuffer(buffer); 以免prepare.prepare内部异常导致重复 offerBuffer - context.logger.log(Level.WARNING, "prepare servlet abort, force to close channel ", t); + } catch (Throwable t) { //此处不可 context.offerBuffer(buffer); 以免dispatcher.dispatch内部异常导致重复 offerBuffer + context.logger.log(Level.WARNING, "dispatch servlet abort, force to close channel ", t); response.error(t); } } @@ -103,7 +103,7 @@ class ProtocolCodec implements CompletionHandler { try { decode(data, response, 0, null); } catch (Throwable t) { - context.logger.log(Level.WARNING, "prepare servlet abort, force to close channel ", t); + context.logger.log(Level.WARNING, "dispatch servlet abort, force to close channel ", t); response.error(t); } return; @@ -124,7 +124,7 @@ class ProtocolCodec implements CompletionHandler { try { decode(data, response, 0, null); } catch (Throwable t) { - context.logger.log(Level.WARNING, "prepare servlet abort, force to close channel ", t); + context.logger.log(Level.WARNING, "dispatch servlet abort, force to close channel ", t); response.error(t); } return; @@ -144,11 +144,11 @@ class ProtocolCodec implements CompletionHandler { final Request request = response.request; final int rs = request.readHeader(buffer, lastReq); if (rs < 0) { //表示数据格式不正确 - final DispatcherServlet preparer = context.dispatcher; - preparer.incrExecuteCounter(); + final DispatcherServlet dispatcher = context.dispatcher; + dispatcher.incrExecuteCounter(); channel.offerReadBuffer(buffer); if (rs != Integer.MIN_VALUE) { - preparer.incrIllegalRequestCounter(); + dispatcher.incrIllegalRequestCounter(); } response.error(null); if (context.logger.isLoggable(Level.FINEST)) { @@ -181,8 +181,8 @@ class ProtocolCodec implements CompletionHandler { final Response pipelineResponse = createResponse(); try { decode(buffer, pipelineResponse, pindex + 1, hreq); - } catch (Throwable t) { //此处不可 offerBuffer(buffer); 以免prepare.prepare内部异常导致重复 offerBuffer - context.logger.log(Level.WARNING, "prepare pipeline servlet abort, force to close channel ", t); + } catch (Throwable t) { //此处不可 offerBuffer(buffer); 以免dispatcher.dispatch内部异常导致重复 offerBuffer + context.logger.log(Level.WARNING, "dispatch pipeline servlet abort, force to close channel ", t); pipelineResponse.error(t); } } diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index cd95559dc..7e8399283 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -7,6 +7,7 @@ package org.redkale.net.client; import java.io.Serializable; import java.net.SocketAddress; +import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; import java.util.concurrent.*; @@ -46,6 +47,8 @@ public abstract class ClientConnection implements Co protected final ByteArray writeArray = new ByteArray(); + protected final ByteBuffer writeBuffer; + final AtomicBoolean pauseWriting = new AtomicBoolean(); final ConcurrentLinkedQueue pauseRequests = new ConcurrentLinkedQueue<>(); @@ -79,6 +82,7 @@ public abstract class ClientConnection implements Co this.connEntry = index == -2 ? null : (index >= 0 ? null : client.connAddrEntrys.get(channel.getRemoteAddress())); this.respWaitingCounter = index == -2 ? new LongAdder() : (index >= 0 ? client.connRespWaitings[index] : this.connEntry.connRespWaiting); this.channel = channel.beforeCloseListener(this); + this.writeBuffer = channel.pollWriteBuffer(); } protected abstract ClientCodec createCodec(); @@ -102,7 +106,14 @@ public abstract class ClientConnection implements Co request.writeTo(this, writeArray); doneRequestCounter.increment(); if (writeArray.length() > 0) { - channel.write(writeArray, this, writeHandler); + if (writeBuffer.capacity() >= writeArray.length()) { + writeBuffer.clear(); + writeBuffer.put(writeArray.content(), 0, writeArray.length()); + writeBuffer.flip(); + channel.write(writeBuffer, this, writeHandler); + } else { + channel.write(writeArray, this, writeHandler); + } } } else { if (channel.inCurrWriteThread()) { @@ -134,7 +145,14 @@ public abstract class ClientConnection implements Co currHalfWriteFuture = respFuture; } if (writeArray.length() > 0) { - channel.write(writeArray, this, writeHandler); + if (writeBuffer.capacity() >= writeArray.length()) { + writeBuffer.clear(); + writeBuffer.put(writeArray.content(), 0, writeArray.length()); + writeBuffer.flip(); + channel.write(writeBuffer, this, writeHandler); + } else { + channel.write(writeArray, this, writeHandler); + } } } @@ -194,6 +212,7 @@ public abstract class ClientConnection implements Co } public void dispose(Throwable exc) { + channel.offerWriteBuffer(writeBuffer); channel.dispose(); Throwable e = exc == null ? new ClosedChannelException() : exc; CompletableFuture f; diff --git a/src/main/java/org/redkale/net/http/HttpRequest.java b/src/main/java/org/redkale/net/http/HttpRequest.java index 01e8ecd32..78a17a97c 100644 --- a/src/main/java/org/redkale/net/http/HttpRequest.java +++ b/src/main/java/org/redkale/net/http/HttpRequest.java @@ -6,17 +6,17 @@ package org.redkale.net.http; import java.io.*; -import java.lang.annotation.*; +import java.lang.annotation.Annotation; import java.net.*; -import java.nio.*; +import java.nio.ByteBuffer; import java.nio.charset.*; import java.util.*; -import java.util.function.*; -import java.util.logging.*; +import java.util.function.Supplier; +import java.util.logging.Level; import org.redkale.annotation.Comment; import org.redkale.convert.*; -import org.redkale.convert.json.*; -import org.redkale.net.*; +import org.redkale.convert.json.JsonConvert; +import org.redkale.net.Request; import org.redkale.util.*; /** @@ -292,7 +292,7 @@ public class HttpRequest extends Request { final ByteBuffer buffer = buf; ByteArray bytes = array; if (this.readState == READ_STATE_ROUTE) { - int rs = readMethodLine(buffer); + int rs = readMethodUriLine(buffer); if (rs != 0) { return rs; } @@ -455,7 +455,7 @@ public class HttpRequest extends Request { } //解析 GET /xxx HTTP/1.1 - private int readMethodLine(final ByteBuffer buf) { + private int readMethodUriLine(final ByteBuffer buf) { final ByteBuffer buffer = buf; Charset charset = this.context.getCharset(); int remain = buffer.remaining(); diff --git a/src/test/java/org/redkale/test/convert/MapIgnoreColumnTest.java b/src/test/java/org/redkale/test/convert/MapIgnoreColumnTest.java index 9f229b779..620f737c3 100644 --- a/src/test/java/org/redkale/test/convert/MapIgnoreColumnTest.java +++ b/src/test/java/org/redkale/test/convert/MapIgnoreColumnTest.java @@ -32,10 +32,10 @@ public class MapIgnoreColumnTest { String rs = "{\"bbb\":[1,2]}"; if (!main) Assertions.assertEquals(rs, convert.convertTo(map)); System.out.println(convert.convertTo(map)); - JsonConvert convert2 = JsonConvert.root().newConvert((k, v) -> { + JsonConvert convert2 = JsonConvert.root().newConvert(null, (k, v) -> { if ("bbb".equals(k)) return null; return v; - }, null, null); + }, null); if (!main) Assertions.assertEquals("{\"aaa\":\"123\",\"bbb\":null}", convert2.convertTo(map)); System.out.println(convert2.convertTo(map)); }