From 2921478a0a0bb1e6416210ff63d0229c720cd3dc Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Thu, 20 Dec 2018 11:38:11 +0800 Subject: [PATCH] --- src/org/redkale/net/PrepareRunner.java | 69 ++++++++++++++++--- src/org/redkale/net/PrepareServlet.java | 68 ++---------------- .../redkale/net/http/HttpPrepareServlet.java | 2 +- src/org/redkale/net/http/HttpResponse.java | 6 +- 4 files changed, 69 insertions(+), 76 deletions(-) diff --git a/src/org/redkale/net/PrepareRunner.java b/src/org/redkale/net/PrepareRunner.java index a729f5a99..4b9e58c51 100644 --- a/src/org/redkale/net/PrepareRunner.java +++ b/src/org/redkale/net/PrepareRunner.java @@ -9,6 +9,7 @@ import java.io.IOException; import java.nio.*; import java.nio.channels.*; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.*; import org.redkale.util.*; @@ -41,15 +42,14 @@ public class PrepareRunner implements Runnable { @Override public void run() { final boolean keepalive = response != null; - final PrepareServlet prepare = context.prepare; final ObjectPool responsePool = context.responsePool; if (data != null) { //BIO模式的UDP连接创建AsyncConnection时已经获取到ByteBuffer数据了 if (response == null) response = responsePool.get(); try { response.init(channel); - prepare.prepare(data, response.request, response); + prepare(data, response.request, response); } catch (Throwable t) { - context.logger.log(Level.WARNING, "prepare servlet abort, forece to close channel ", t); + context.logger.log(Level.WARNING, "prepare servlet abort, force to close channel ", t); response.finish(true); } return; @@ -76,9 +76,9 @@ public class PrepareRunner implements Runnable { buffer.flip(); response.init(channel); try { - prepare.prepare(buffer, response.request, response); + prepare(buffer, response.request, response); } catch (Throwable t) { //此处不可 context.offerBuffer(buffer); 以免prepare.prepare内部异常导致重复 offerBuffer - context.logger.log(Level.WARNING, "prepare servlet abort, forece to close channel ", t); + context.logger.log(Level.WARNING, "prepare servlet abort, force to close channel ", t); response.finish(true); } } @@ -90,7 +90,7 @@ public class PrepareRunner implements Runnable { response.removeChannel(); response.finish(true); if (exc != null && context.logger.isLoggable(Level.FINEST)) { - context.logger.log(Level.FINEST, "Servlet Handler read channel erroneous, forece to close channel ", exc); + context.logger.log(Level.FINEST, "Servlet Handler read channel erroneous, force to close channel ", exc); } } }); @@ -99,13 +99,64 @@ public class PrepareRunner implements Runnable { response.removeChannel(); response.finish(true); if (te != null && context.logger.isLoggable(Level.FINEST)) { - context.logger.log(Level.FINEST, "Servlet read channel erroneous, forece to close channel ", te); + context.logger.log(Level.FINEST, "Servlet read channel erroneous, force to close channel ", te); } } } - protected void prepare(ByteBuffer buffer, Request request, Response response) throws IOException { - context.prepare.prepare(buffer, request, response); + protected void prepare(final ByteBuffer buffer, final Request request, final Response response) throws IOException { + final PrepareServlet preparer = context.prepare; + preparer.executeCounter.incrementAndGet(); + final int rs = request.readHeader(buffer); + if (rs < 0) { //表示数据格式不正确 + channel.offerBuffer(buffer); + if (rs != Integer.MIN_VALUE) preparer.illRequestCounter.incrementAndGet(); + response.finish(true); + } else if (rs == 0) { + if (buffer.hasRemaining()) { + request.setMoredata(buffer); + } else { + channel.offerBuffer(buffer); + } + preparer.prepare(request, response); + } else { + buffer.clear(); + channel.setReadBuffer(buffer); + final AtomicInteger ai = new AtomicInteger(rs); + channel.read(new CompletionHandler() { + + @Override + public void completed(Integer result, ByteBuffer attachment) { + attachment.flip(); + ai.addAndGet(-request.readBody(attachment)); + if (ai.get() > 0) { + attachment.clear(); + channel.setReadBuffer(attachment); + channel.read(this); + } else { + if (attachment.hasRemaining()) { + request.setMoredata(attachment); + } else { + channel.offerBuffer(attachment); + } + try { + preparer.prepare(request, response); + } catch (Throwable t) { //此处不可 context.offerBuffer(buffer); 以免preparer.prepare内部异常导致重复 offerBuffer + context.logger.log(Level.WARNING, "prepare servlet abort, force to close channel ", t); + response.finish(true); + } + } + } + + @Override + public void failed(Throwable exc, ByteBuffer attachment) { + preparer.illRequestCounter.incrementAndGet(); + channel.offerBuffer(attachment); + response.finish(true); + if (exc != null) request.context.logger.log(Level.FINER, "Servlet read channel erroneous, force to close channel ", exc); + } + }); + } } protected void initResponse(Response response, AsyncConnection channel) { diff --git a/src/org/redkale/net/PrepareServlet.java b/src/org/redkale/net/PrepareServlet.java index 65b991a7d..0bd8ebb38 100644 --- a/src/org/redkale/net/PrepareServlet.java +++ b/src/org/redkale/net/PrepareServlet.java @@ -6,12 +6,9 @@ package org.redkale.net; import java.io.*; -import java.nio.*; -import java.nio.channels.*; import java.util.*; import java.util.concurrent.atomic.*; import java.util.function.Predicate; -import java.util.logging.*; import org.redkale.util.*; /** @@ -210,66 +207,11 @@ public abstract class PrepareServlet() { - - @Override - public void completed(Integer result, ByteBuffer attachment) { - attachment.flip(); - ai.addAndGet(-request.readBody(attachment)); - if (ai.get() > 0) { - attachment.clear(); - channel.setReadBuffer(attachment); - channel.read(this); - } else { - if (attachment.hasRemaining()) { - request.setMoredata(attachment); - } else { - channel.offerBuffer(attachment); - } - request.prepare(); - try { - response.filter = PrepareServlet.this.headFilter; - response.servlet = PrepareServlet.this; - response.nextEvent(); - } catch (Exception e) { - illRequestCounter.incrementAndGet(); - response.finish(true); - request.context.logger.log(Level.WARNING, "prepare servlet abort, forece to close channel ", e); - } - } - } - - @Override - public void failed(Throwable exc, ByteBuffer attachment) { - illRequestCounter.incrementAndGet(); - channel.offerBuffer(attachment); - response.finish(true); - if (exc != null) request.context.logger.log(Level.FINER, "Servlet read channel erroneous, forece to close channel ", exc); - } - }); - } + public final void prepare(final R request, final P response) throws IOException { + request.prepare(); + response.filter = this.headFilter; + response.servlet = this; + response.nextEvent(); } protected AnyValue getServletConf(Servlet servlet) { diff --git a/src/org/redkale/net/http/HttpPrepareServlet.java b/src/org/redkale/net/http/HttpPrepareServlet.java index a0d701162..45465e0be 100644 --- a/src/org/redkale/net/http/HttpPrepareServlet.java +++ b/src/org/redkale/net/http/HttpPrepareServlet.java @@ -311,7 +311,7 @@ public class HttpPrepareServlet extends PrepareServlet { } else if (obj instanceof CompletableFuture) { ((CompletableFuture) obj).whenComplete((v, e) -> { if (e != null) { - context.getLogger().log(Level.WARNING, "Servlet occur, forece to close channel. request = " + request + ", result is CompletableFuture", (Throwable) e); + context.getLogger().log(Level.WARNING, "Servlet occur, force to close channel. request = " + request + ", result is CompletableFuture", (Throwable) e); finish(500, null); return; } @@ -489,7 +489,7 @@ public class HttpResponse extends Response { try { finish((File) obj); } catch (IOException e) { - context.getLogger().log(Level.WARNING, "HttpServlet finish File occur, forece to close channel. request = " + getRequest(), e); + context.getLogger().log(Level.WARNING, "HttpServlet finish File occur, force to close channel. request = " + getRequest(), e); finish(500, null); } } else if (obj instanceof HttpResult) {