优化ProtocolCodec.start

This commit is contained in:
redkale
2023-05-27 22:10:50 +08:00
parent 2202465e31
commit 29d45b1e3d

View File

@@ -1,234 +1,234 @@
/* /*
* To change this license header, choose License Headers in Project Properties. * To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates * To change this template file, choose Tools | Templates
* and open the template in the editor. * and open the template in the editor.
*/ */
package org.redkale.net; package org.redkale.net;
import java.net.SocketException; import java.net.SocketException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler; import java.nio.channels.CompletionHandler;
import java.util.function.*; import java.util.function.*;
import java.util.logging.Level; import java.util.logging.Level;
/** /**
* 一个AsyncConnection绑定一个ProtocolCodec实例, 只会在读IOThread中运行 * 一个AsyncConnection绑定一个ProtocolCodec实例, 只会在读IOThread中运行
* *
* @author zhangjx * @author zhangjx
*/ */
class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> { class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
private final Context context; private final Context context;
private final Supplier<Response> responseSupplier; private final Supplier<Response> responseSupplier;
private final Consumer<Response> responseConsumer; private final Consumer<Response> responseConsumer;
private final ReadCompletionHandler readHandler = new ReadCompletionHandler(); private final ReadCompletionHandler readHandler = new ReadCompletionHandler();
private AsyncConnection channel; private AsyncConnection channel;
private Response resp; private Response resp;
public ProtocolCodec(Context context, Supplier<Response> responseSupplier, public ProtocolCodec(Context context, Supplier<Response> responseSupplier,
Consumer<Response> responseConsumer, AsyncConnection channel) { Consumer<Response> responseConsumer, AsyncConnection channel) {
this.context = context; this.context = context;
this.channel = channel; this.channel = channel;
this.responseSupplier = responseSupplier; this.responseSupplier = responseSupplier;
this.responseConsumer = responseConsumer; this.responseConsumer = responseConsumer;
} }
public void prepare() { public void prepare() {
} }
protected boolean recycle() { protected boolean recycle() {
this.channel = null; this.channel = null;
this.resp = null; this.resp = null;
return true; return true;
} }
public ProtocolCodec response(Response resp) { public ProtocolCodec response(Response resp) {
this.resp = resp; this.resp = resp;
return this; return this;
} }
private Response createResponse() { private Response createResponse() {
Response response = resp; Response response = resp;
if (response == null) { if (response == null) {
response = responseSupplier.get(); response = responseSupplier.get();
} else { } else {
response.prepare(); response.prepare();
} }
response.responseSupplier = responseSupplier; response.responseSupplier = responseSupplier;
response.responseConsumer = responseConsumer; response.responseConsumer = responseConsumer;
return response; return response;
} }
@Override @Override
public void completed(Integer count, ByteBuffer buffer) { public void completed(Integer count, ByteBuffer buffer) {
if (count < 1) { if (count < 1) {
channel.offerReadBuffer(buffer); channel.offerReadBuffer(buffer);
channel.dispose(); // response.init(channel); 在调用之前异常 channel.dispose(); // response.init(channel); 在调用之前异常
return; return;
} }
// { //测试 // { //测试
// buffer.flip(); // buffer.flip();
// byte[] bs = new byte[buffer.remaining()]; // byte[] bs = new byte[buffer.remaining()];
// buffer.get(bs); // buffer.get(bs);
// System.println(new String(bs)); // System.println(new String(bs));
// } // }
buffer.flip(); buffer.flip();
final Response response = createResponse(); final Response response = createResponse();
try { try {
decode(buffer, response, 0, null); decode(buffer, response, 0, null);
} catch (Throwable t) { //此处不可 context.offerBuffer(buffer); 以免dispatcher.dispatch内部异常导致重复 offerBuffer } catch (Throwable t) { //此处不可 context.offerBuffer(buffer); 以免dispatcher.dispatch内部异常导致重复 offerBuffer
context.logger.log(Level.WARNING, "dispatch servlet abort, force to close channel ", t); context.logger.log(Level.WARNING, "dispatch servlet abort, force to close channel ", t);
response.error(t); response.error(t);
} }
} }
@Override @Override
public void failed(Throwable exc, ByteBuffer buffer) { public void failed(Throwable exc, ByteBuffer buffer) {
channel.offerReadBuffer(buffer); channel.offerReadBuffer(buffer);
channel.dispose();// response.init(channel); 在调用之前异常 channel.dispose();// response.init(channel); 在调用之前异常
if (exc != null && context.logger.isLoggable(Level.FINEST) if (exc != null && context.logger.isLoggable(Level.FINEST)
&& !(exc instanceof SocketException && "Connection reset".equals(exc.getMessage()))) { && !(exc instanceof SocketException && "Connection reset".equals(exc.getMessage()))) {
context.logger.log(Level.FINEST, "Servlet Handler read channel erroneous, force to close channel ", exc); context.logger.log(Level.FINEST, "Servlet Handler read channel erroneous, force to close channel ", exc);
} }
} }
public void start(final ByteBuffer data) { public void start(final ByteBuffer data) {
if (data != null) { //pipeline模式或UDP连接创建AsyncConnection时已经获取到ByteBuffer数据了 if (data != null) { //pipeline模式或UDP连接创建AsyncConnection时已经获取到ByteBuffer数据了
final Response response = createResponse(); final Response response = createResponse();
try { try {
decode(data, response, 0, null); decode(data, response, 0, null);
} catch (Throwable t) { } catch (Throwable t) {
context.logger.log(Level.WARNING, "dispatch servlet abort, force to close channel ", t); context.logger.log(Level.WARNING, "dispatch servlet abort, force to close channel ", t);
response.error(t); response.error(t);
} }
return; return;
} }
try { try {
channel.startRead(this); channel.readRegisterInIOThread(this);
} catch (Exception te) { } catch (Exception te) {
channel.dispose();// response.init(channel); 在调用之前异常 channel.dispose();// response.init(channel); 在调用之前异常
if (context.logger.isLoggable(Level.FINEST)) { if (context.logger.isLoggable(Level.FINEST)) {
context.logger.log(Level.FINEST, "Servlet start read channel erroneous, force to close channel ", te); context.logger.log(Level.FINEST, "Servlet start read channel erroneous, force to close channel ", te);
} }
} }
} }
public void run(final ByteBuffer data) { public void run(final ByteBuffer data) {
if (data != null) { //pipeline模式或UDP连接创建AsyncConnection时已经获取到ByteBuffer数据了 if (data != null) { //pipeline模式或UDP连接创建AsyncConnection时已经获取到ByteBuffer数据了
final Response response = createResponse(); final Response response = createResponse();
try { try {
decode(data, response, 0, null); decode(data, response, 0, null);
} catch (Throwable t) { } catch (Throwable t) {
context.logger.log(Level.WARNING, "dispatch servlet abort, force to close channel ", t); context.logger.log(Level.WARNING, "dispatch servlet abort, force to close channel ", t);
response.error(t); response.error(t);
} }
return; return;
} }
try { try {
channel.read(this); channel.read(this);
} catch (Exception te) { } catch (Exception te) {
channel.dispose();// response.init(channel); 在调用之前异常 channel.dispose();// response.init(channel); 在调用之前异常
if (context.logger.isLoggable(Level.FINEST)) { if (context.logger.isLoggable(Level.FINEST)) {
context.logger.log(Level.FINEST, "Servlet run read channel erroneous, force to close channel ", te); context.logger.log(Level.FINEST, "Servlet run read channel erroneous, force to close channel ", te);
} }
} }
} }
protected void decode(final ByteBuffer buffer, final Response response, final int pipelineIndex, final Request lastReq) { protected void decode(final ByteBuffer buffer, final Response response, final int pipelineIndex, final Request lastReq) {
response.init(channel); response.init(channel);
final Request request = response.request; final Request request = response.request;
final int rs = request.readHeader(buffer, lastReq); final int rs = request.readHeader(buffer, lastReq);
if (rs < 0) { //表示数据格式不正确 if (rs < 0) { //表示数据格式不正确
final DispatcherServlet dispatcher = context.dispatcher; final DispatcherServlet dispatcher = context.dispatcher;
dispatcher.incrExecuteCounter(); dispatcher.incrExecuteCounter();
channel.offerReadBuffer(buffer); channel.offerReadBuffer(buffer);
if (rs != Integer.MIN_VALUE) { if (rs != Integer.MIN_VALUE) {
dispatcher.incrIllegalRequestCounter(); dispatcher.incrIllegalRequestCounter();
} }
response.error(null); response.error(null);
if (context.logger.isLoggable(Level.FINEST)) { if (context.logger.isLoggable(Level.FINEST)) {
context.logger.log(Level.FINEST, "request.readHeader erroneous (" + rs + "), force to close channel "); context.logger.log(Level.FINEST, "request.readHeader erroneous (" + rs + "), force to close channel ");
} }
} else if (rs == 0) { } else if (rs == 0) {
context.dispatcher.incrExecuteCounter(); context.dispatcher.incrExecuteCounter();
int pindex = pipelineIndex; int pindex = pipelineIndex;
boolean pipeline = false; boolean pipeline = false;
Request hreq = lastReq; Request hreq = lastReq;
if (buffer.hasRemaining()) { if (buffer.hasRemaining()) {
pipeline = true; pipeline = true;
if (pindex == 0) { if (pindex == 0) {
pindex++; pindex++;
} }
if (request.getRequestid() == null) { //存在requestid则无视pipeline模式 if (request.getRequestid() == null) { //存在requestid则无视pipeline模式
request.pipeline(pindex, pindex + 1); request.pipeline(pindex, pindex + 1);
} }
if (hreq == null) { if (hreq == null) {
hreq = request.copyHeader(); hreq = request.copyHeader();
} }
} else { } else {
if (request.getRequestid() == null) { //存在requestid则无视pipeline模式 if (request.getRequestid() == null) { //存在requestid则无视pipeline模式
request.pipeline(pindex, pindex); request.pipeline(pindex, pindex);
} }
channel.setReadBuffer(buffer.clear()); channel.setReadBuffer(buffer.clear());
} }
context.executeDispatch(request, response); context.executeDispatch(request, response);
if (pipeline) { if (pipeline) {
final Response pipelineResponse = createResponse(); final Response pipelineResponse = createResponse();
try { try {
decode(buffer, pipelineResponse, pindex + 1, hreq); decode(buffer, pipelineResponse, pindex + 1, hreq);
} catch (Throwable t) { //此处不可 offerBuffer(buffer); 以免dispatcher.dispatch内部异常导致重复 offerBuffer } catch (Throwable t) { //此处不可 offerBuffer(buffer); 以免dispatcher.dispatch内部异常导致重复 offerBuffer
context.logger.log(Level.WARNING, "dispatch pipeline servlet abort, force to close channel ", t); context.logger.log(Level.WARNING, "dispatch pipeline servlet abort, force to close channel ", t);
pipelineResponse.error(t); pipelineResponse.error(t);
} }
} }
} else { } else {
channel.setReadBuffer(buffer); channel.setReadBuffer(buffer);
channel.read(readHandler.prepare(request, response, pipelineIndex, lastReq)); channel.read(readHandler.prepare(request, response, pipelineIndex, lastReq));
} }
} }
private class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> { private class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
private Request request; private Request request;
private Response response; private Response response;
private int pipelineIndex; private int pipelineIndex;
private Request lastReq; private Request lastReq;
public ReadCompletionHandler prepare(Request request, Response response, int pipelineIndex, Request lastReq) { public ReadCompletionHandler prepare(Request request, Response response, int pipelineIndex, Request lastReq) {
this.request = request; this.request = request;
this.response = response; this.response = response;
this.pipelineIndex = pipelineIndex; this.pipelineIndex = pipelineIndex;
this.lastReq = lastReq; this.lastReq = lastReq;
return this; return this;
} }
@Override @Override
public void completed(Integer count, ByteBuffer attachment) { public void completed(Integer count, ByteBuffer attachment) {
if (count < 1) { if (count < 1) {
channel.offerReadBuffer(attachment); channel.offerReadBuffer(attachment);
channel.dispose(); channel.dispose();
return; return;
} }
attachment.flip(); attachment.flip();
decode(attachment, response, pipelineIndex, lastReq); decode(attachment, response, pipelineIndex, lastReq);
} }
@Override @Override
public void failed(Throwable exc, ByteBuffer attachment) { public void failed(Throwable exc, ByteBuffer attachment) {
context.dispatcher.incrIllegalRequestCounter(); context.dispatcher.incrIllegalRequestCounter();
channel.offerReadBuffer(attachment); channel.offerReadBuffer(attachment);
response.error(exc); response.error(exc);
if (exc != null) { if (exc != null) {
request.context.logger.log(Level.FINER, "Servlet continue read channel erroneous, force to close channel ", exc); request.context.logger.log(Level.FINER, "Servlet continue read channel erroneous, force to close channel ", exc);
} }
} }
} }
} }