Response优化finish方法

This commit is contained in:
Redkale
2023-01-15 21:11:59 +08:00
parent 39ade6f3ab
commit 19c8ffb79d
11 changed files with 40 additions and 66 deletions

View File

@@ -323,7 +323,7 @@ public class HttpMessageLocalClient extends HttpMessageClient {
} }
@Override @Override
public void finish(boolean kill, ByteBuffer buffer) { public void finishBuffer(boolean kill, ByteBuffer buffer) {
if (future == null) { if (future == null) {
return; return;
} }
@@ -333,7 +333,7 @@ public class HttpMessageLocalClient extends HttpMessageClient {
} }
@Override @Override
public void finish(boolean kill, ByteBuffer... buffers) { public void finishBuffers(boolean kill, ByteBuffer... buffers) {
if (future == null) { if (future == null) {
return; return;
} }

View File

@@ -293,7 +293,7 @@ public class HttpMessageResponse extends HttpResponse {
} }
@Override @Override
public void finish(boolean kill, ByteBuffer buffer) { public void finishBuffer(boolean kill, ByteBuffer buffer) {
if (message.isEmptyRespTopic()) { if (message.isEmptyRespTopic()) {
if (callback != null) { if (callback != null) {
callback.run(); callback.run();
@@ -306,7 +306,7 @@ public class HttpMessageResponse extends HttpResponse {
} }
@Override @Override
public void finish(boolean kill, ByteBuffer... buffers) { public void finishBuffers(boolean kill, ByteBuffer... buffers) {
if (message.isEmptyRespTopic()) { if (message.isEmptyRespTopic()) {
if (callback != null) { if (callback != null) {
callback.run(); callback.run();

View File

@@ -190,7 +190,6 @@ public class AsyncIOThread extends WorkThread {
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT); key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
conn.doConnect(); conn.doConnect();
} else if (conn.readCompletionHandler != null && key.isReadable()) { } else if (conn.readCompletionHandler != null && key.isReadable()) {
conn.currReadInvoker = 0;
key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
conn.doRead(true); conn.doRead(true);
} else if (conn.writeCompletionHandler != null && key.isWritable()) { } else if (conn.writeCompletionHandler != null && key.isWritable()) {
@@ -199,7 +198,6 @@ public class AsyncIOThread extends WorkThread {
} }
} else { } else {
if (conn.readCompletionHandler != null && key.isReadable()) { if (conn.readCompletionHandler != null && key.isReadable()) {
conn.currReadInvoker = 0;
key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); //不放开这行在CompletableFuture时容易ReadPending key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); //不放开这行在CompletableFuture时容易ReadPending
conn.doRead(true); conn.doRead(true);
} else if (conn.writeCompletionHandler != null && key.isWritable()) { } else if (conn.writeCompletionHandler != null && key.isWritable()) {

View File

@@ -26,8 +26,6 @@ import org.redkale.util.ByteBufferWriter;
*/ */
abstract class AsyncNioConnection extends AsyncConnection { abstract class AsyncNioConnection extends AsyncConnection {
protected static final int MAX_INVOKER_ONSTACK = Integer.getInteger("redkale.net.invoker.max.onstack", 16);
final AsyncIOThread connectThread; final AsyncIOThread connectThread;
protected SocketAddress remoteAddress; protected SocketAddress remoteAddress;
@@ -46,8 +44,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
protected int readTimeoutSeconds; protected int readTimeoutSeconds;
int currReadInvoker;
protected ByteBuffer readByteBuffer; protected ByteBuffer readByteBuffer;
protected CompletionHandler<Integer, ByteBuffer> readCompletionHandler; protected CompletionHandler<Integer, ByteBuffer> readCompletionHandler;
@@ -134,7 +130,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
@Override @Override
protected void startRead(CompletionHandler<Integer, ByteBuffer> handler) { protected void startRead(CompletionHandler<Integer, ByteBuffer> handler) {
currReadInvoker = MAX_INVOKER_ONSTACK;
read(handler); read(handler);
} }
@@ -159,11 +154,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
this.readCompletionHandler = handler; this.readCompletionHandler = handler;
} }
doRead(this.ioReadThread.inCurrThread()); doRead(this.ioReadThread.inCurrThread());
// if (client) {
// doRead(this.ioReadThread.inCurrThread());
// } else {
// doRead(this.ioReadThread.inCurrThread() || currReadInvoker < MAX_INVOKER_ONSTACK); //同一线程中Selector.wakeup无效
// }
} }
@Override @Override
@@ -264,7 +254,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
this.readtime = System.currentTimeMillis(); this.readtime = System.currentTimeMillis();
int readCount = 0; int readCount = 0;
if (direct) { if (direct) {
currReadInvoker++;
if (this.readByteBuffer == null) { if (this.readByteBuffer == null) {
this.readByteBuffer = sslEngine == null ? pollReadBuffer() : pollReadSSLBuffer(); this.readByteBuffer = sslEngine == null ? pollReadBuffer() : pollReadSSLBuffer();
if (this.readTimeoutSeconds > 0) { if (this.readTimeoutSeconds > 0) {

View File

@@ -134,7 +134,7 @@ public class Context {
} }
} catch (Throwable t) { } catch (Throwable t) {
response.context.logger.log(Level.WARNING, "execute servlet abort, force to close channel ", t); response.context.logger.log(Level.WARNING, "execute servlet abort, force to close channel ", t);
response.error(); response.error(t);
} }
}); });
} else if (workExecutor != null) { } else if (workExecutor != null) {
@@ -144,7 +144,7 @@ public class Context {
servlet.execute(request, response); servlet.execute(request, response);
} catch (Throwable t) { } catch (Throwable t) {
response.context.logger.log(Level.WARNING, "execute servlet abort, force to close channel ", t); response.context.logger.log(Level.WARNING, "execute servlet abort, force to close channel ", t);
response.error(); response.error(t);
} }
}); });
} else { } else {
@@ -153,7 +153,7 @@ public class Context {
servlet.execute(request, response); servlet.execute(request, response);
} catch (Throwable t) { } catch (Throwable t) {
response.context.logger.log(Level.WARNING, "execute servlet abort, force to close channel ", t); response.context.logger.log(Level.WARNING, "execute servlet abort, force to close channel ", t);
response.error(); response.error(t);
} }
} }

View File

@@ -254,7 +254,7 @@ public abstract class DispatcherServlet<K extends Serializable, C extends Contex
response.nextEvent(); response.nextEvent();
} catch (Throwable t) { } catch (Throwable t) {
response.context.logger.log(Level.WARNING, "prepare servlet abort, force to close channel ", t); response.context.logger.log(Level.WARNING, "prepare servlet abort, force to close channel ", t);
response.error(); response.error(t);
} }
} }

View File

@@ -81,7 +81,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
decode(buffer, response, 0, null); decode(buffer, response, 0, null);
} catch (Throwable t) { //此处不可 context.offerBuffer(buffer); 以免prepare.prepare内部异常导致重复 offerBuffer } catch (Throwable t) { //此处不可 context.offerBuffer(buffer); 以免prepare.prepare内部异常导致重复 offerBuffer
context.logger.log(Level.WARNING, "prepare servlet abort, force to close channel ", t); context.logger.log(Level.WARNING, "prepare servlet abort, force to close channel ", t);
response.finish(true); response.error(t);
} }
} }
@@ -102,7 +102,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
decode(data, response, 0, null); decode(data, response, 0, null);
} catch (Throwable t) { } catch (Throwable t) {
context.logger.log(Level.WARNING, "prepare servlet abort, force to close channel ", t); context.logger.log(Level.WARNING, "prepare servlet abort, force to close channel ", t);
response.finish(true); response.error(t);
} }
return; return;
} }
@@ -123,7 +123,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
decode(data, response, 0, null); decode(data, response, 0, null);
} catch (Throwable t) { } catch (Throwable t) {
context.logger.log(Level.WARNING, "prepare servlet abort, force to close channel ", t); context.logger.log(Level.WARNING, "prepare servlet abort, force to close channel ", t);
response.finish(true); response.error(t);
} }
return; return;
} }
@@ -148,7 +148,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
if (rs != Integer.MIN_VALUE) { if (rs != Integer.MIN_VALUE) {
preparer.incrIllegalRequestCounter(); preparer.incrIllegalRequestCounter();
} }
response.finish(true); response.error(null);
if (context.logger.isLoggable(Level.FINEST)) { if (context.logger.isLoggable(Level.FINEST)) {
context.logger.log(Level.FINEST, "request.readHeader erroneous (" + rs + "), force to close channel "); context.logger.log(Level.FINEST, "request.readHeader erroneous (" + rs + "), force to close channel ");
} }
@@ -177,7 +177,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
decode(buffer, pipelineResponse, pindex + 1, hreq); decode(buffer, pipelineResponse, pindex + 1, hreq);
} catch (Throwable t) { //此处不可 offerBuffer(buffer); 以免prepare.prepare内部异常导致重复 offerBuffer } catch (Throwable t) { //此处不可 offerBuffer(buffer); 以免prepare.prepare内部异常导致重复 offerBuffer
context.logger.log(Level.WARNING, "prepare pipeline servlet abort, force to close channel ", t); context.logger.log(Level.WARNING, "prepare pipeline servlet abort, force to close channel ", t);
pipelineResponse.finish(true); pipelineResponse.error(t);
} }
} }
} else { } else {
@@ -199,7 +199,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
public void failed(Throwable exc, ByteBuffer attachment) { public void failed(Throwable exc, ByteBuffer attachment) {
context.prepare.incrIllegalRequestCounter(); context.prepare.incrIllegalRequestCounter();
channel.offerReadBuffer(attachment); channel.offerReadBuffer(attachment);
response.finish(true); response.error(exc);
if (exc != null) { if (exc != null) {
request.context.logger.log(Level.FINER, "Servlet read channel erroneous, force to close channel ", exc); request.context.logger.log(Level.FINER, "Servlet read channel erroneous, force to close channel ", exc);
} }

View File

@@ -49,21 +49,21 @@ public abstract class Response<C extends Context, R extends Request<C>> {
private final ByteBuffer writeBuffer; private final ByteBuffer writeBuffer;
private final CompletionHandler finishBytesHandler = new CompletionHandler<Integer, Void>() { protected final CompletionHandler finishBytesHandler = new CompletionHandler<Integer, Void>() {
@Override @Override
public void completed(Integer result, Void attachment) { public void completed(Integer result, Void attachment) {
finish(); finishInIOThread();
} }
@Override @Override
public void failed(Throwable exc, Void attachment) { public void failed(Throwable exc, Void attachment) {
finish(true); finishInIOThread(true);
} }
}; };
private final CompletionHandler finishBufferHandler = new CompletionHandler<Integer, ByteBuffer>() { protected final CompletionHandler finishBufferHandler = new CompletionHandler<Integer, ByteBuffer>() {
@Override @Override
public void completed(Integer result, ByteBuffer attachment) { public void completed(Integer result, ByteBuffer attachment) {
@@ -72,7 +72,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
} else { } else {
attachment.clear(); attachment.clear();
} }
finish(); finishInIOThread();
} }
@Override @Override
@@ -82,7 +82,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
} else { } else {
attachment.clear(); attachment.clear();
} }
finish(true); finishInIOThread(true);
} }
}; };
@@ -96,7 +96,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
channel.offerWriteBuffer(attachment); channel.offerWriteBuffer(attachment);
} }
} }
finish(); finishInIOThread();
} }
@Override @Override
@@ -106,7 +106,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
channel.offerWriteBuffer(attachment); channel.offerWriteBuffer(attachment);
} }
} }
finish(true); finishInIOThread(true);
} }
}; };
@@ -201,15 +201,15 @@ public abstract class Response<C extends Context, R extends Request<C>> {
return !this.inited; return !this.inited;
} }
public void finish() { private void finishInIOThread() {
this.finish(false); this.finishInIOThread(false);
} }
protected void error() { protected void error(Throwable t) {
finish(true); finishInIOThread(true);
} }
public void finish(boolean kill) { private void finishInIOThread(boolean kill) {
if (!this.inited) { if (!this.inited) {
return; //避免重复关闭 return; //避免重复关闭
} //System.println("耗时: " + (System.currentTimeMillis() - request.createtime)); } //System.println("耗时: " + (System.currentTimeMillis() - request.createtime));
@@ -318,15 +318,15 @@ public abstract class Response<C extends Context, R extends Request<C>> {
} }
} }
protected final void finish(ByteBuffer buffer) { protected final void finishBuffer(ByteBuffer buffer) {
finish(false, buffer); finishBuffers(false, buffer);
} }
protected final void finish(ByteBuffer... buffers) { protected final void finishBuffers(ByteBuffer... buffers) {
finish(false, buffers); finishBuffers(false, buffers);
} }
protected void finish(boolean kill, ByteBuffer buffer) { protected void finishBuffer(boolean kill, ByteBuffer buffer) {
if (!this.inited) { if (!this.inited) {
return; //避免重复关闭 return; //避免重复关闭
} }
@@ -351,7 +351,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
} }
} }
protected void finish(boolean kill, ByteBuffer... buffers) { protected void finishBuffers(boolean kill, ByteBuffer... buffers) {
if (!this.inited) { if (!this.inited) {
return; //避免重复关闭 return; //避免重复关闭
} }

View File

@@ -177,19 +177,6 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
private final JsonBytesWriter jsonWriter = new JsonBytesWriter(); private final JsonBytesWriter jsonWriter = new JsonBytesWriter();
protected final CompletionHandler<Integer, Void> pipelineWriteHandler = new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
finish();
}
@Override
public void failed(Throwable exc, Void attachment) {
finish(true);
}
};
@SuppressWarnings("Convert2Lambda") @SuppressWarnings("Convert2Lambda")
protected final ConvertBytesHandler convertHandler = new ConvertBytesHandler() { protected final ConvertBytesHandler convertHandler = new ConvertBytesHandler() {
@Override @Override
@@ -964,7 +951,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
boolean over = this.channel.writePipelineData(pipelineIndex, request.getPipelineCount(), data); boolean over = this.channel.writePipelineData(pipelineIndex, request.getPipelineCount(), data);
if (over) { if (over) {
request.setPipelineCompleted(true); request.setPipelineCompleted(true);
this.channel.flushPipelineData(this.pipelineWriteHandler); this.channel.flushPipelineData(this.finishBytesHandler);
} else { } else {
removeChannel(); removeChannel();
this.responseConsumer.accept(this); this.responseConsumer.accept(this);
@@ -972,7 +959,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
} else { } else {
if (this.channel.hasPipelineData()) { if (this.channel.hasPipelineData()) {
this.channel.writePipelineData(pipelineIndex, request.getPipelineCount(), data); this.channel.writePipelineData(pipelineIndex, request.getPipelineCount(), data);
this.channel.flushPipelineData(this.pipelineWriteHandler); this.channel.flushPipelineData(this.finishBytesHandler);
} else { } else {
//不能用finish(boolean kill, final ByteTuple array) 否则会调this.finish //不能用finish(boolean kill, final ByteTuple array) 否则会调this.finish
super.finish(false, data.content(), 0, data.length()); super.finish(false, data.content(), 0, data.length());
@@ -981,7 +968,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
} }
@Override @Override
protected void error() { protected void error(Throwable t) {
finish500(); finish500();
} }
@@ -1337,7 +1324,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
fileChannel.close(); fileChannel.close();
} catch (IOException ie) { } catch (IOException ie) {
} }
finish(); finishBytesHandler.completed(result, attachment);
return; return;
} }
if (fileChannel == null) { if (fileChannel == null) {

View File

@@ -70,7 +70,7 @@ public class SncpDispatcherServlet extends DispatcherServlet<Uint128, SncpContex
@Override @Override
public void execute(SncpRequest request, SncpResponse response) throws IOException { public void execute(SncpRequest request, SncpResponse response) throws IOException {
if (request.isPing()) { if (request.isPing()) {
response.finish(false, Sncp.PONG_BUFFER.duplicate()); response.finishBuffer(false, Sncp.PONG_BUFFER.duplicate());
return; return;
} }
SncpServlet servlet = (SncpServlet) mappingServlet(request.getServiceid()); SncpServlet servlet = (SncpServlet) mappingServlet(request.getServiceid());

View File

@@ -68,8 +68,8 @@ public class SncpResponse extends Response<SncpContext, SncpRequest> {
} }
@Override @Override
protected void finish(boolean kill, ByteBuffer buffer) { protected void finishBuffer(boolean kill, ByteBuffer buffer) {
super.finish(kill, buffer); super.finishBuffer(kill, buffer);
} }
public void finish(final int retcode, final BsonWriter out) { public void finish(final int retcode, final BsonWriter out) {