readpending

This commit is contained in:
redkale
2024-10-09 22:46:32 +08:00
parent 0d84ef0541
commit 227e94b546
11 changed files with 27 additions and 20 deletions

View File

@@ -187,7 +187,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
}
channel.setReadBuffer(buffer.clear());
context.executeDispatch(request, response);
if (!response.readRegistered) {
if (request.readCompleted) {
channel.readRegister(this);
}
}

View File

@@ -46,8 +46,6 @@ public abstract class Response<C extends Context, R extends Request<C>> {
protected boolean inNonBlocking = true;
protected boolean readRegistered;
// 输出的结果对象
protected Object output;
@@ -141,7 +139,6 @@ public abstract class Response<C extends Context, R extends Request<C>> {
protected void prepare() {
inited = true;
inNonBlocking = true;
readRegistered = false;
request.prepare();
}
@@ -182,7 +179,6 @@ public abstract class Response<C extends Context, R extends Request<C>> {
protected void refuseAlive() {
this.request.keepAlive = false;
this.readRegistered = true;
}
protected void init(AsyncConnection channel) {
@@ -353,7 +349,6 @@ public abstract class Response<C extends Context, R extends Request<C>> {
this.responseConsumer.accept(this);
if (!request.readCompleted) {
conn.readRegister(conn.protocolCodec);
this.readRegistered = true;
}
} else {
Supplier<Response> poolSupplier = this.responseSupplier;
@@ -362,7 +357,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
new ProtocolCodec(context, poolSupplier, poolConsumer, conn)
.response(this)
.run(null);
this.readRegistered = true;
request.readCompleted = false;
}
} else {
this.responseConsumer.accept(this);

View File

@@ -401,7 +401,7 @@ public class HttpRequest extends Request<HttpContext> {
this.keepAlive = false;
}
// readCompleted=true时ProtocolCodec会继续读下一个request
this.readCompleted = !this.boundary;
this.readCompleted = !this.boundary && !maybews;
this.bodyBytes.clear();
this.readState = READ_STATE_BODY;
}

View File

@@ -1037,10 +1037,6 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
this.responseConsumer.accept(this);
}
void skipReadRegistered() {
this.readRegistered = true;
}
/** 以304状态码输出 */
public void finish304() {
skipHeader();

View File

@@ -7,6 +7,7 @@ package org.redkale.net.http;
import java.io.Serializable;
import java.util.Map;
import org.redkale.convert.ConvertColumn;
import org.redkale.convert.json.JsonConvert;
/**
@@ -18,8 +19,10 @@ import org.redkale.convert.json.JsonConvert;
*/
public class WebSocketAction implements Serializable {
@ConvertColumn(index = 1)
protected String action;
@ConvertColumn(index = 2)
protected Map<String, String> attach;
public WebSocketAction() {}

View File

@@ -8,6 +8,7 @@ package org.redkale.net.http;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.Objects;
import org.redkale.convert.ConvertColumn;
import org.redkale.convert.json.JsonConvert;
/**
@@ -20,8 +21,10 @@ import org.redkale.convert.json.JsonConvert;
*/
public class WebSocketAddress implements Serializable {
@ConvertColumn(index = 1)
protected InetSocketAddress addr;
@ConvertColumn(index = 2)
protected String topic;
public WebSocketAddress() {}

View File

@@ -7,6 +7,7 @@ package org.redkale.net.http;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import org.redkale.convert.ConvertColumn;
import org.redkale.net.http.WebSocketPacket.FrameType;
import org.redkale.util.ByteArray;
@@ -65,10 +66,13 @@ public final class WebSocketPacket {
}
}
@ConvertColumn(index = 1)
protected FrameType type;
@ConvertColumn(index = 2)
protected byte[] payload;
@ConvertColumn(index = 3)
protected boolean last = true;
public WebSocketPacket() {}

View File

@@ -7,6 +7,7 @@ package org.redkale.net.http;
import java.io.Serializable;
import java.util.Map;
import org.redkale.convert.ConvertColumn;
import org.redkale.convert.json.JsonConvert;
/**
@@ -18,8 +19,10 @@ import org.redkale.convert.json.JsonConvert;
*/
public class WebSocketRange implements Serializable {
@ConvertColumn(index = 1)
protected String wskey;
@ConvertColumn(index = 2)
protected Map<String, String> attach;
public WebSocketRange() {}

View File

@@ -287,7 +287,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
webSocket.deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
webSocket.inflater = new Inflater(true);
}
response.skipReadRegistered();
initRestWebSocket(webSocket);
CompletableFuture<String> sessionFuture = webSocket.onOpen(request);
if (sessionFuture == null) {

View File

@@ -8,6 +8,7 @@ package org.redkale.net.http;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.Collection;
import org.redkale.convert.ConvertColumn;
import org.redkale.convert.json.JsonConvert;
/**
@@ -41,10 +42,13 @@ public interface WebSocketUserAddress extends Serializable {
public static class SimpleWebSocketUserAddress implements WebSocketUserAddress {
@ConvertColumn(index = 1)
private Serializable userid;
@ConvertColumn(index = 2)
private WebSocketAddress address;
@ConvertColumn(index = 3)
private Collection<WebSocketAddress> addresses;
public SimpleWebSocketUserAddress() {}

View File

@@ -5,9 +5,6 @@
*/
package org.redkale.net.sncp;
import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES;
import static org.redkale.asm.Opcodes.*;
import java.io.IOException;
import java.lang.reflect.*;
import java.nio.channels.CompletionHandler;
@@ -16,6 +13,8 @@ import java.util.concurrent.*;
import java.util.logging.Level;
import org.redkale.annotation.NonBlocking;
import org.redkale.asm.*;
import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES;
import static org.redkale.asm.Opcodes.*;
import org.redkale.asm.Type;
import org.redkale.convert.*;
import org.redkale.convert.pb.ProtobufFactory;
@@ -155,7 +154,7 @@ public class SncpServlet extends Servlet<SncpContext, SncpRequest, SncpResponse>
return 1;
}
SncpServlet o = other;
int rs = 0;
int rs;
if (this.resourceType == null) {
rs = o.resourceType == null ? 0 : -1;
} else if (o.resourceType == null) {
@@ -489,7 +488,6 @@ public class SncpServlet extends Servlet<SncpContext, SncpRequest, SncpResponse>
final Class serviceClass = service.getClass();
final String supDynName = SncpActionServlet.class.getName().replace('.', '/');
final String resourceTypeName = resourceType.getName().replace('.', '/');
final String serviceImpTypeName = serviceImplClass.getName().replace('.', '/');
final String convertName = Convert.class.getName().replace('.', '/');
final String uint128Desc = Type.getDescriptor(Uint128.class);
@@ -521,7 +519,6 @@ public class SncpServlet extends Servlet<SncpContext, SncpRequest, SncpResponse>
if (newClazz == null) {
// -------------------------------------------------------------
ClassWriter cw = new ClassWriter(COMPUTE_FRAMES);
FieldVisitor fv;
MethodDebugVisitor mv;
cw.visit(V11, ACC_PUBLIC + ACC_FINAL + ACC_SUPER, newDynName, null, supDynName, null);
@@ -761,7 +758,9 @@ public class SncpServlet extends Servlet<SncpContext, SncpRequest, SncpResponse>
// do nothing
}
for (java.lang.reflect.Type t : originalParamTypes) {
if (t.toString().startsWith("java.lang.")) {
if (t == java.io.Serializable.class
|| t == java.io.Serializable[].class
|| t.toString().startsWith("java.lang.")) {
continue;
}
ProtobufFactory.root().loadDecoder(t);