This commit is contained in:
Redkale
2018-12-20 11:38:11 +08:00
parent 61a5420d48
commit 2921478a0a
4 changed files with 69 additions and 76 deletions

View File

@@ -9,6 +9,7 @@ import java.io.IOException;
import java.nio.*;
import java.nio.channels.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.*;
import org.redkale.util.*;
@@ -41,15 +42,14 @@ public class PrepareRunner implements Runnable {
@Override
public void run() {
final boolean keepalive = response != null;
final PrepareServlet prepare = context.prepare;
final ObjectPool<? extends Response> responsePool = context.responsePool;
if (data != null) { //BIO模式的UDP连接创建AsyncConnection时已经获取到ByteBuffer数据了
if (response == null) response = responsePool.get();
try {
response.init(channel);
prepare.prepare(data, response.request, response);
prepare(data, response.request, response);
} catch (Throwable t) {
context.logger.log(Level.WARNING, "prepare servlet abort, forece to close channel ", t);
context.logger.log(Level.WARNING, "prepare servlet abort, force to close channel ", t);
response.finish(true);
}
return;
@@ -76,9 +76,9 @@ public class PrepareRunner implements Runnable {
buffer.flip();
response.init(channel);
try {
prepare.prepare(buffer, response.request, response);
prepare(buffer, response.request, response);
} catch (Throwable t) { //此处不可 context.offerBuffer(buffer); 以免prepare.prepare内部异常导致重复 offerBuffer
context.logger.log(Level.WARNING, "prepare servlet abort, forece to close channel ", t);
context.logger.log(Level.WARNING, "prepare servlet abort, force to close channel ", t);
response.finish(true);
}
}
@@ -90,7 +90,7 @@ public class PrepareRunner implements Runnable {
response.removeChannel();
response.finish(true);
if (exc != null && context.logger.isLoggable(Level.FINEST)) {
context.logger.log(Level.FINEST, "Servlet Handler read channel erroneous, forece to close channel ", exc);
context.logger.log(Level.FINEST, "Servlet Handler read channel erroneous, force to close channel ", exc);
}
}
});
@@ -99,13 +99,64 @@ public class PrepareRunner implements Runnable {
response.removeChannel();
response.finish(true);
if (te != null && context.logger.isLoggable(Level.FINEST)) {
context.logger.log(Level.FINEST, "Servlet read channel erroneous, forece to close channel ", te);
context.logger.log(Level.FINEST, "Servlet read channel erroneous, force to close channel ", te);
}
}
}
protected void prepare(ByteBuffer buffer, Request request, Response response) throws IOException {
context.prepare.prepare(buffer, request, response);
protected void prepare(final ByteBuffer buffer, final Request request, final Response response) throws IOException {
final PrepareServlet preparer = context.prepare;
preparer.executeCounter.incrementAndGet();
final int rs = request.readHeader(buffer);
if (rs < 0) { //表示数据格式不正确
channel.offerBuffer(buffer);
if (rs != Integer.MIN_VALUE) preparer.illRequestCounter.incrementAndGet();
response.finish(true);
} else if (rs == 0) {
if (buffer.hasRemaining()) {
request.setMoredata(buffer);
} else {
channel.offerBuffer(buffer);
}
preparer.prepare(request, response);
} else {
buffer.clear();
channel.setReadBuffer(buffer);
final AtomicInteger ai = new AtomicInteger(rs);
channel.read(new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
ai.addAndGet(-request.readBody(attachment));
if (ai.get() > 0) {
attachment.clear();
channel.setReadBuffer(attachment);
channel.read(this);
} else {
if (attachment.hasRemaining()) {
request.setMoredata(attachment);
} else {
channel.offerBuffer(attachment);
}
try {
preparer.prepare(request, response);
} catch (Throwable t) { //此处不可 context.offerBuffer(buffer); 以免preparer.prepare内部异常导致重复 offerBuffer
context.logger.log(Level.WARNING, "prepare servlet abort, force to close channel ", t);
response.finish(true);
}
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
preparer.illRequestCounter.incrementAndGet();
channel.offerBuffer(attachment);
response.finish(true);
if (exc != null) request.context.logger.log(Level.FINER, "Servlet read channel erroneous, force to close channel ", exc);
}
});
}
}
protected void initResponse(Response response, AsyncConnection channel) {

View File

@@ -6,12 +6,9 @@
package org.redkale.net;
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.atomic.*;
import java.util.function.Predicate;
import java.util.logging.*;
import org.redkale.util.*;
/**
@@ -210,66 +207,11 @@ public abstract class PrepareServlet<K extends Serializable, C extends Context,
@SuppressWarnings("unchecked")
public abstract void addServlet(S servlet, Object attachment, AnyValue conf, K... mappings);
public void prepare(final ByteBuffer buffer, final R request, final P response) throws IOException {
executeCounter.incrementAndGet();
final int rs = request.readHeader(buffer);
final AsyncConnection channel = request.channel;
if (rs < 0) { //表示数据格式不正确
channel.offerBuffer(buffer);
if (rs != Integer.MIN_VALUE) illRequestCounter.incrementAndGet();
response.finish(true);
} else if (rs == 0) {
if (buffer.hasRemaining()) {
request.setMoredata(buffer);
} else {
channel.offerBuffer(buffer);
}
request.prepare();
response.filter = this.headFilter;
response.servlet = this;
response.nextEvent();
} else {
buffer.clear();
channel.setReadBuffer(buffer);
final AtomicInteger ai = new AtomicInteger(rs);
channel.read(new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
ai.addAndGet(-request.readBody(attachment));
if (ai.get() > 0) {
attachment.clear();
channel.setReadBuffer(attachment);
channel.read(this);
} else {
if (attachment.hasRemaining()) {
request.setMoredata(attachment);
} else {
channel.offerBuffer(attachment);
}
request.prepare();
try {
response.filter = PrepareServlet.this.headFilter;
response.servlet = PrepareServlet.this;
response.nextEvent();
} catch (Exception e) {
illRequestCounter.incrementAndGet();
response.finish(true);
request.context.logger.log(Level.WARNING, "prepare servlet abort, forece to close channel ", e);
}
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
illRequestCounter.incrementAndGet();
channel.offerBuffer(attachment);
response.finish(true);
if (exc != null) request.context.logger.log(Level.FINER, "Servlet read channel erroneous, forece to close channel ", exc);
}
});
}
public final void prepare(final R request, final P response) throws IOException {
request.prepare();
response.filter = this.headFilter;
response.servlet = this;
response.nextEvent();
}
protected AnyValue getServletConf(Servlet servlet) {

View File

@@ -311,7 +311,7 @@ public class HttpPrepareServlet extends PrepareServlet<String, HttpContext, Http
}
servlet.execute(request, response);
} catch (Exception e) {
request.getContext().getLogger().log(Level.WARNING, "Servlet occur, forece to close channel. request = " + request, e);
request.getContext().getLogger().log(Level.WARNING, "Servlet occur, force to close channel. request = " + request, e);
response.finish(500, null);
}
}

View File

@@ -259,7 +259,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
return Utility.createAsyncHandler((v, a) -> {
finish(v);
}, (t, a) -> {
context.getLogger().log(Level.WARNING, "Servlet occur, forece to close channel. request = " + request + ", result is CompletionHandler", (Throwable) t);
context.getLogger().log(Level.WARNING, "Servlet occur, force to close channel. request = " + request + ", result is CompletionHandler", (Throwable) t);
finish(500, null);
});
}
@@ -471,7 +471,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
} else if (obj instanceof CompletableFuture) {
((CompletableFuture) obj).whenComplete((v, e) -> {
if (e != null) {
context.getLogger().log(Level.WARNING, "Servlet occur, forece to close channel. request = " + request + ", result is CompletableFuture", (Throwable) e);
context.getLogger().log(Level.WARNING, "Servlet occur, force to close channel. request = " + request + ", result is CompletableFuture", (Throwable) e);
finish(500, null);
return;
}
@@ -489,7 +489,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
try {
finish((File) obj);
} catch (IOException e) {
context.getLogger().log(Level.WARNING, "HttpServlet finish File occur, forece to close channel. request = " + getRequest(), e);
context.getLogger().log(Level.WARNING, "HttpServlet finish File occur, force to close channel. request = " + getRequest(), e);
finish(500, null);
}
} else if (obj instanceof HttpResult) {