移除ChannelContext

This commit is contained in:
redkale
2023-02-06 19:08:20 +08:00
parent 0aa8e8653c
commit 5df83b2649
25 changed files with 130 additions and 213 deletions

View File

@@ -111,7 +111,7 @@ public class ArrayEncoder<T> implements Encodeable<Writer, T[]> {
final Type comp = this.componentType; final Type comp = this.componentType;
for (int i = 0;; i++) { for (int i = 0;; i++) {
Object v = value[i]; Object v = value[i];
writeMemberValue(out, member, ((v != null && (v.getClass() == comp || out.specify() == comp)) ? itemEncoder : anyEncoder), v, i); writeMemberValue(out, member, ((v != null && (v.getClass() == comp || out.specificObjectType() == comp)) ? itemEncoder : anyEncoder), v, i);
if (i == iMax) { if (i == iMax) {
break; break;
} }

View File

@@ -279,7 +279,7 @@ public class ObjectEncoder<W extends Writer, T> implements Encodeable<W, T> {
lock.unlock(); lock.unlock();
} }
} }
if (value.getClass() != this.typeClass && !this.type.equals(out.specify())) { if (value.getClass() != this.typeClass && !this.type.equals(out.specificObjectType())) {
final Class clz = value.getClass(); final Class clz = value.getClass();
if (out.needWriteClassName()) { if (out.needWriteClassName()) {
out.writeClassName(factory.getEntityAlias(clz)); out.writeClassName(factory.getEntityAlias(clz));

View File

@@ -23,7 +23,7 @@ public abstract class Writer {
protected boolean comma; protected boolean comma;
//convertTo时是否以指定Type的ObjectEncoder进行处理 //convertTo时是否以指定Type的ObjectEncoder进行处理
protected Type specify; protected Type specificObjectType;
//对某个key值进行动态处理仅供MapEncoder使用 //对某个key值进行动态处理仅供MapEncoder使用
protected BiFunction<Object, Object, Object> mapFieldFunc; protected BiFunction<Object, Object, Object> mapFieldFunc;
@@ -35,32 +35,36 @@ public abstract class Writer {
protected Function<Object, ConvertField[]> objExtFunc; protected Function<Object, ConvertField[]> objExtFunc;
/** /**
* 设置specify * 设置specificObjectType
* *
* @param value Type * @param value Type
*/ */
public void specify(Type value) { public void specificObjectType(Type value) {
if (value instanceof GenericArrayType) { if (value instanceof GenericArrayType) {
this.specify = ((GenericArrayType) value).getGenericComponentType(); this.specificObjectType = ((GenericArrayType) value).getGenericComponentType();
} else if (value instanceof Class && ((Class) value).isArray()) { } else if (value instanceof Class && ((Class) value).isArray()) {
this.specify = ((Class) value).getComponentType(); this.specificObjectType = ((Class) value).getComponentType();
} else { } else {
this.specify = value; this.specificObjectType = value;
} }
} }
protected boolean recycle() { protected boolean recycle() {
this.comma = false;
this.specificObjectType = null;
this.mapFieldFunc = null;
this.objFieldFunc = null; this.objFieldFunc = null;
this.objExtFunc = null;
return true; return true;
} }
/** /**
* 返回specify * 返回specificObjectType
* *
* @return int * @return int
*/ */
public Type specify() { public Type specificObjectType() {
return this.specify; return this.specificObjectType;
} }
/** /**
@@ -127,12 +131,18 @@ public abstract class Writer {
} else { } else {
value = objFieldFunc.apply(member.attribute, obj); value = objFieldFunc.apply(member.attribute, obj);
} }
if (value == null) return; if (value == null) {
return;
}
if (tiny()) { if (tiny()) {
if (member.string) { if (member.string) {
if (((CharSequence) value).length() == 0) return; if (((CharSequence) value).length() == 0) {
return;
}
} else if (member.bool) { } else if (member.bool) {
if (!((Boolean) value)) return; if (!((Boolean) value)) {
return;
}
} }
} }
Attribute attr = member.getAttribute(); Attribute attr = member.getAttribute();
@@ -153,14 +163,22 @@ public abstract class Writer {
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void writeObjectField(final String fieldName, Type fieldType, int fieldPos, Encodeable anyEncoder, Object value) { public void writeObjectField(final String fieldName, Type fieldType, int fieldPos, Encodeable anyEncoder, Object value) {
if (value == null) return; if (value == null) {
if (fieldType == null) fieldType = value.getClass(); return;
}
if (fieldType == null) {
fieldType = value.getClass();
}
if (tiny() && fieldType instanceof Class) { if (tiny() && fieldType instanceof Class) {
Class clazz = (Class) fieldType; Class clazz = (Class) fieldType;
if (CharSequence.class.isAssignableFrom(clazz)) { if (CharSequence.class.isAssignableFrom(clazz)) {
if (((CharSequence) value).length() == 0) return; if (((CharSequence) value).length() == 0) {
return;
}
} else if (clazz == boolean.class || clazz == Boolean.class) { } else if (clazz == boolean.class || clazz == Boolean.class) {
if (!((Boolean) value)) return; if (!((Boolean) value)) {
return;
}
} }
} }
this.writeFieldName(null, fieldName, fieldType, fieldPos); this.writeFieldName(null, fieldName, fieldType, fieldPos);

View File

@@ -139,7 +139,7 @@ public class BsonByteBufferWriter extends BsonWriter {
protected boolean recycle() { protected boolean recycle() {
super.recycle(); super.recycle();
this.index = 0; this.index = 0;
this.specify = null; this.specificObjectType = null;
this.buffers = null; this.buffers = null;
return false; return false;
} }

View File

@@ -153,7 +153,7 @@ public class BsonWriter extends Writer implements ByteTuple {
protected boolean recycle() { protected boolean recycle() {
super.recycle(); super.recycle();
this.count = 0; this.count = 0;
this.specify = null; this.specificObjectType = null;
if (this.content != null && this.content.length > defaultSize) { if (this.content != null && this.content.length > defaultSize) {
this.content = new byte[defaultSize]; this.content = new byte[defaultSize];
} }

View File

@@ -42,10 +42,10 @@ public class JsonByteBufferReader extends JsonReader {
@Override @Override
protected boolean recycle() { protected boolean recycle() {
super.recycle(); // this.position 初始化值为-1 super.recycle(); // this.position 初始化值为-1
this.currentIndex = 0;
this.currentChar = 0; this.currentChar = 0;
this.currentBuffer = null;
this.buffers = null; this.buffers = null;
this.currentIndex = 0;
this.currentBuffer = null;
this.mask = null; this.mask = null;
return false; return false;
} }

View File

@@ -53,10 +53,9 @@ public class JsonByteBufferWriter extends JsonWriter {
@Override @Override
protected boolean recycle() { protected boolean recycle() {
super.recycle(); super.recycle();
this.index = 0;
this.specify = null;
this.charset = null; this.charset = null;
this.buffers = null; this.buffers = null;
this.index = 0;
return false; return false;
} }

View File

@@ -72,6 +72,16 @@ public class JsonBytesWriter extends JsonWriter implements ByteTuple {
return newdata; return newdata;
} }
@Override
public boolean recycle() {
super.recycle();
this.count = 0;
if (this.content != null && this.content.length > defaultSize * 100) {
this.content = new byte[defaultSize];
}
return true;
}
@Override @Override
public byte[] content() { public byte[] content() {
return content; return content;
@@ -333,17 +343,6 @@ public class JsonBytesWriter extends JsonWriter implements ByteTuple {
return this; return this;
} }
@Override
public boolean recycle() {
super.recycle();
this.count = 0;
this.specify = null;
if (this.content != null && this.content.length > defaultSize * 100) {
this.content = new byte[defaultSize];
}
return true;
}
/** /**
* 直接获取全部数据, 实际数据需要根据count长度来截取 * 直接获取全部数据, 实际数据需要根据count长度来截取
* *

View File

@@ -293,7 +293,7 @@ public class JsonCharsWriter extends JsonWriter {
protected boolean recycle() { protected boolean recycle() {
super.recycle(); super.recycle();
this.count = 0; this.count = 0;
this.specify = null; this.specificObjectType = null;
if (this.content != null && this.content.length > defaultSize) { if (this.content != null && this.content.length > defaultSize) {
this.content = new char[defaultSize]; this.content = new char[defaultSize];
} }

View File

@@ -304,7 +304,7 @@ public class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
this.lastConvertEncodeable = encoder; this.lastConvertEncodeable = encoder;
} }
if (encoder.specifyable()) { if (encoder.specifyable()) {
writer.specify(type); writer.specificObjectType(type);
} }
encoder.convertTo(writer, value); encoder.convertTo(writer, value);
@@ -336,7 +336,7 @@ public class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
this.lastConvertEncodeable = encoder; this.lastConvertEncodeable = encoder;
} }
if (encoder.specifyable()) { if (encoder.specifyable()) {
writer.specify(type); writer.specificObjectType(type);
} }
encoder.convertTo(writer, value); encoder.convertTo(writer, value);
@@ -362,7 +362,7 @@ public class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
this.lastConvertEncodeable = encoder; this.lastConvertEncodeable = encoder;
} }
if (encoder.specifyable()) { if (encoder.specifyable()) {
writer.specify(type); writer.specificObjectType(type);
} }
encoder.convertTo(writer, value); encoder.convertTo(writer, value);
} }
@@ -386,7 +386,7 @@ public class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
this.lastConvertEncodeable = encoder; this.lastConvertEncodeable = encoder;
} }
if (encoder.specifyable()) { if (encoder.specifyable()) {
writer.specify(type); writer.specificObjectType(type);
} }
encoder.convertTo(writer, value); encoder.convertTo(writer, value);
} }
@@ -415,7 +415,7 @@ public class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
this.lastConvertEncodeable = encoder; this.lastConvertEncodeable = encoder;
} }
if (encoder.specifyable()) { if (encoder.specifyable()) {
writer.specify(type); writer.specificObjectType(type);
} }
encoder.convertTo(writer, value); encoder.convertTo(writer, value);
} }
@@ -444,7 +444,7 @@ public class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
if (value == null) { if (value == null) {
out.writeNull(); out.writeNull();
} else { } else {
out.specify(type); out.specificObjectType(type);
factory.loadEncoder(type).convertTo(out, value); factory.loadEncoder(type).convertTo(out, value);
} }
return out.toBuffers(); return out.toBuffers();
@@ -462,7 +462,7 @@ public class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
this.lastConvertEncodeable = encoder; this.lastConvertEncodeable = encoder;
} }
if (encoder.specifyable()) { if (encoder.specifyable()) {
writer.specify(type); writer.specificObjectType(type);
} }
encoder.convertTo(writer, value); encoder.convertTo(writer, value);
} }
@@ -482,7 +482,7 @@ public class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
this.lastConvertEncodeable = encoder; this.lastConvertEncodeable = encoder;
} }
if (encoder.specifyable()) { if (encoder.specifyable()) {
writer.specify(type); writer.specificObjectType(type);
} }
encoder.convertTo(writer, value); encoder.convertTo(writer, value);
} }

View File

@@ -106,7 +106,7 @@ public class HttpMessageClusterClient extends HttpMessageClient {
clientHeaders.put(Rest.REST_HEADER_RESP_CONVERT_TYPE, req.getRespConvertType().toString()); clientHeaders.put(Rest.REST_HEADER_RESP_CONVERT_TYPE, req.getRespConvertType().toString());
} }
if (userid != null) { if (userid != null) {
clientHeaders.put(Rest.REST_HEADER_CURRUSERID_NAME, "" + userid); clientHeaders.put(Rest.REST_HEADER_CURRUSERID, "" + userid);
} }
if (headers != null) { if (headers != null) {
headers.forEach((n, v) -> { headers.forEach((n, v) -> {
@@ -186,7 +186,7 @@ public class HttpMessageClusterClient extends HttpMessageClient {
clientHeaders.put(Rest.REST_HEADER_RESP_CONVERT_TYPE, req.getRespConvertType().toString()); clientHeaders.put(Rest.REST_HEADER_RESP_CONVERT_TYPE, req.getRespConvertType().toString());
} }
if (userid != null) { if (userid != null) {
clientHeaders.put(Rest.REST_HEADER_CURRUSERID_NAME, "" + userid); clientHeaders.put(Rest.REST_HEADER_CURRUSERID, "" + userid);
} }
if (headers != null) { if (headers != null) {
boolean ws = headers.containsKey("Sec-WebSocket-Key"); boolean ws = headers.containsKey("Sec-WebSocket-Key");
@@ -265,7 +265,7 @@ public class HttpMessageClusterClient extends HttpMessageClient {
// if (req.isFrombody()) builder.header(Rest.REST_HEADER_PARAM_FROM_BODY, "true"); // if (req.isFrombody()) builder.header(Rest.REST_HEADER_PARAM_FROM_BODY, "true");
// if (req.getReqConvertType() != null) builder.header(Rest.REST_HEADER_REQ_CONVERT_TYPE, req.getReqConvertType().toString()); // if (req.getReqConvertType() != null) builder.header(Rest.REST_HEADER_REQ_CONVERT_TYPE, req.getReqConvertType().toString());
// if (req.getRespConvertType() != null) builder.header(Rest.REST_HEADER_RESP_CONVERT_TYPE, req.getRespConvertType().toString()); // if (req.getRespConvertType() != null) builder.header(Rest.REST_HEADER_RESP_CONVERT_TYPE, req.getRespConvertType().toString());
// if (userid != 0) builder.header(Rest.REST_HEADER_CURRUSERID_NAME, "" + userid); // if (userid != 0) builder.header(Rest.REST_HEADER_CURRUSERID, "" + userid);
// if (headers != null) headers.forEach((n, v) -> { // if (headers != null) headers.forEach((n, v) -> {
// if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase())) builder.header(n, v); // if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase())) builder.header(n, v);
// }); // });
@@ -313,7 +313,7 @@ public class HttpMessageClusterClient extends HttpMessageClient {
// if (req.isFrombody()) builder.header(Rest.REST_HEADER_PARAM_FROM_BODY, "true"); // if (req.isFrombody()) builder.header(Rest.REST_HEADER_PARAM_FROM_BODY, "true");
// if (req.getReqConvertType() != null) builder.header(Rest.REST_HEADER_REQ_CONVERT_TYPE, req.getReqConvertType().toString()); // if (req.getReqConvertType() != null) builder.header(Rest.REST_HEADER_REQ_CONVERT_TYPE, req.getReqConvertType().toString());
// if (req.getRespConvertType() != null) builder.header(Rest.REST_HEADER_RESP_CONVERT_TYPE, req.getRespConvertType().toString()); // if (req.getRespConvertType() != null) builder.header(Rest.REST_HEADER_RESP_CONVERT_TYPE, req.getRespConvertType().toString());
// if (userid != 0) builder.header(Rest.REST_HEADER_CURRUSERID_NAME, "" + userid); // if (userid != 0) builder.header(Rest.REST_HEADER_CURRUSERID, "" + userid);
// if (headers != null) headers.forEach((n, v) -> { // if (headers != null) headers.forEach((n, v) -> {
// if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase())) builder.header(n, v); // if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase())) builder.header(n, v);
// }); // });

View File

@@ -26,7 +26,7 @@ import org.redkale.util.*;
* *
* @author zhangjx * @author zhangjx
*/ */
public abstract class AsyncConnection implements ChannelContext, Channel, AutoCloseable { public abstract class AsyncConnection implements Channel, AutoCloseable {
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0); private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
@@ -814,7 +814,6 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
this.subobject = value; this.subobject = value;
} }
@Override
public void setAttribute(String name, Object value) { public void setAttribute(String name, Object value) {
if (this.attributes == null) { if (this.attributes == null) {
this.attributes = new HashMap<>(); this.attributes = new HashMap<>();
@@ -822,25 +821,21 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
this.attributes.put(name, value); this.attributes.put(name, value);
} }
@Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public final <T> T getAttribute(String name) { public final <T> T getAttribute(String name) {
return (T) (this.attributes == null ? null : this.attributes.get(name)); return (T) (this.attributes == null ? null : this.attributes.get(name));
} }
@Override
public final void removeAttribute(String name) { public final void removeAttribute(String name) {
if (this.attributes != null) { if (this.attributes != null) {
this.attributes.remove(name); this.attributes.remove(name);
} }
} }
@Override
public final Map<String, Object> getAttributes() { public final Map<String, Object> getAttributes() {
return this.attributes; return this.attributes;
} }
@Override
public final void clearAttribute() { public final void clearAttribute() {
if (this.attributes != null) { if (this.attributes != null) {
this.attributes.clear(); this.attributes.clear();

View File

@@ -1,31 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net;
import java.util.*;
/**
* 当前一个Request绑定的AsyncConnection 类似Session但概念上不同于sessionid对应的对象
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
* @since 2.4.0
*/
public interface ChannelContext {
public void setAttribute(String name, Object value);
@SuppressWarnings("unchecked")
public <T> T getAttribute(String name);
public void removeAttribute(String name);
public Map<String, Object> getAttributes();
public void clearAttribute();
}

View File

@@ -151,10 +151,6 @@ public abstract class Request<C extends Context> {
return attributes; return attributes;
} }
public ChannelContext getChannelContext() {
return channel;
}
public C getContext() { public C getContext() {
return this.context; return this.context;
} }

View File

@@ -206,28 +206,14 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
if (request.workThread == null) { if (request.workThread == null) {
request.workThread = WorkThread.currWorkThread(); request.workThread = WorkThread.currWorkThread();
} }
return connect(null).thenCompose(conn -> writeChannel(conn, request)); return connect().thenCompose(conn -> writeChannel(conn, request));
} }
public final <T> CompletableFuture<T> sendAsync(R request, Function<P, T> respTransfer) { public final <T> CompletableFuture<T> sendAsync(R request, Function<P, T> respTransfer) {
if (request.workThread == null) { if (request.workThread == null) {
request.workThread = WorkThread.currWorkThread(); request.workThread = WorkThread.currWorkThread();
} }
return connect(null).thenCompose(conn -> writeChannel(conn, request, respTransfer)); return connect().thenCompose(conn -> writeChannel(conn, request, respTransfer));
}
public final CompletableFuture<P> sendAsync(ChannelContext context, R request) {
if (request.workThread == null) {
request.workThread = WorkThread.currWorkThread();
}
return connect(context).thenCompose(conn -> writeChannel(conn, request));
}
public final <T> CompletableFuture<T> sendAsync(ChannelContext context, R request, Function<P, T> respTransfer) {
if (request.workThread == null) {
request.workThread = WorkThread.currWorkThread();
}
return connect(context).thenCompose(conn -> writeChannel(conn, request, respTransfer));
} }
protected CompletableFuture<P> writeChannel(ClientConnection conn, R request) { protected CompletableFuture<P> writeChannel(ClientConnection conn, R request) {
@@ -239,38 +225,16 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
} }
protected CompletableFuture<C> connect() { protected CompletableFuture<C> connect() {
return connect(null);
}
protected CompletableFuture<C> connect(final ChannelContext context) {
final boolean cflag = context != null && connectionContextName != null;
if (cflag) {
C cc = context.getAttribute(connectionContextName);
if (cc != null && cc.isOpen()) {
return CompletableFuture.completedFuture(cc);
}
}
final int size = this.connArray.length; final int size = this.connArray.length;
int connIndex = (int) Math.abs(connIndexSeq.getAndIncrement()) % size; final int connIndex = (int) Math.abs(connIndexSeq.getAndIncrement()) % size;
// WorkThread workThread = WorkThread.currWorkThread();
// if (workThread != null && workThread.threads() == size) {
// connIndex = workThread.index();
// } else {
// connIndex = (int) Math.abs(Thread.currentThread().getId() % size);
// }
// if (connIndex >= 0) {
C cc = (C) this.connArray[connIndex]; C cc = (C) this.connArray[connIndex];
if (cc != null && cc.isOpen()) { if (cc != null && cc.isOpen()) {
if (cflag) {
context.setAttribute(connectionContextName, cc);
}
return CompletableFuture.completedFuture(cc); return CompletableFuture.completedFuture(cc);
} }
final int index = connIndex; final Queue<CompletableFuture<C>> waitQueue = this.connAcquireWaitings[connIndex];
final Queue<CompletableFuture<C>> waitQueue = this.connAcquireWaitings[index]; if (this.connOpenStates[connIndex].compareAndSet(false, true)) {
if (this.connOpenStates[index].compareAndSet(false, true)) {
CompletableFuture<C> future = address.createClient(tcp, group, readTimeoutSeconds, writeTimeoutSeconds) CompletableFuture<C> future = address.createClient(tcp, group, readTimeoutSeconds, writeTimeoutSeconds)
.thenApply(c -> (C) createClientConnection(index, c).setMaxPipelines(maxPipelines)); .thenApply(c -> (C) createClientConnection(connIndex, c).setMaxPipelines(maxPipelines));
R virtualReq = createVirtualRequestAfterConnect(); R virtualReq = createVirtualRequestAfterConnect();
if (virtualReq != null) { if (virtualReq != null) {
future = future.thenCompose(conn -> conn.writeVirtualRequest(virtualReq).thenApply(v -> conn)); future = future.thenCompose(conn -> conn.writeVirtualRequest(virtualReq).thenApply(v -> conn));
@@ -282,18 +246,15 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
} }
return future.thenApply(c -> { return future.thenApply(c -> {
c.setAuthenticated(true); c.setAuthenticated(true);
this.connArray[index] = c; this.connArray[connIndex] = c;
CompletableFuture<C> f; CompletableFuture<C> f;
if (cflag) {
context.setAttribute(connectionContextName, c);
}
while ((f = waitQueue.poll()) != null) { while ((f = waitQueue.poll()) != null) {
f.complete(c); f.complete(c);
} }
return c; return c;
}).whenComplete((r, t) -> { }).whenComplete((r, t) -> {
if (t != null) { if (t != null) {
this.connOpenStates[index].set(false); this.connOpenStates[connIndex].set(false);
} }
}); });
} else { } else {
@@ -327,13 +288,9 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
// if (minRunningConn != null) { // && minRunningConn.runningCount() < maxPipelines // if (minRunningConn != null) { // && minRunningConn.runningCount() < maxPipelines
// return CompletableFuture.completedFuture(minRunningConn); // return CompletableFuture.completedFuture(minRunningConn);
// } // }
// return waitClientConnection(); // CompletableFuture rs = Utility.orTimeout(new CompletableFuture(), 6, TimeUnit.SECONDS);
} // connAcquireWaitings[connSeqno.getAndIncrement() % this.connLimit].offer(rs);
// return rs;
protected CompletableFuture<C> waitClientConnection() {
CompletableFuture rs = Utility.orTimeout(new CompletableFuture(), 6, TimeUnit.SECONDS);
connAcquireWaitings[connSeqno.getAndIncrement() % this.connLimit].offer(rs);
return rs;
} }
protected long getRespWaitingCount() { protected long getRespWaitingCount() {

View File

@@ -121,19 +121,33 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
respFuture.cancelTimeout(); respFuture.cancelTimeout();
//if (client.finest) client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + ClientConnection.this + ", 回调处理, req=" + request + ", message=" + rs.message); //if (client.finest) client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + ClientConnection.this + ", 回调处理, req=" + request + ", message=" + rs.message);
connection.preComplete(message, (R) request, exc); connection.preComplete(message, (R) request, exc);
boolean reqInIO = workThread != null && workThread.inIO();
if (workThread == null || workThread.getWorkExecutor() == null) { if (workThread == null || workThread.getWorkExecutor() == null) {
workThread = connection.channel.getReadIOThread(); workThread = connection.channel.getReadIOThread();
} }
if (exc != null) { if (exc != null) {
if (reqInIO) { //request在IO线程中发送请求说明request是在异步模式中
if (request != null) {
Traces.currTraceid(request.traceid);
}
respFuture.completeExceptionally(exc);
} else {
workThread.runWork(() -> { workThread.runWork(() -> {
if (request != null) { if (request != null) {
Traces.currTraceid(request.traceid); Traces.currTraceid(request.traceid);
} }
respFuture.completeExceptionally(exc); respFuture.completeExceptionally(exc);
}); });
}
} else { } else {
final Object rs = request == null || request.respTransfer == null ? message : request.respTransfer.apply(message); final Object rs = request == null || request.respTransfer == null ? message : request.respTransfer.apply(message);
if (reqInIO) { //request在IO线程中发送请求说明request是在异步模式中
if (request != null) {
Traces.currTraceid(request.traceid);
}
((ClientFuture) respFuture).complete(rs);
} else {
workThread.runWork(() -> { workThread.runWork(() -> {
if (request != null) { if (request != null) {
Traces.currTraceid(request.traceid); Traces.currTraceid(request.traceid);
@@ -141,8 +155,9 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
((ClientFuture) respFuture).complete(rs); ((ClientFuture) respFuture).complete(rs);
}); });
} }
}
} catch (Throwable t) { } catch (Throwable t) {
if (workThread == null) { if (workThread == null || workThread.inIO()) {
if (request != null) { if (request != null) {
Traces.currTraceid(request.traceid); Traces.currTraceid(request.traceid);
} }

View File

@@ -41,12 +41,11 @@ public class HttpContext extends Context {
protected final AnyValue rpcAuthenticatorConfig; protected final AnyValue rpcAuthenticatorConfig;
//所有Servlet方法都不需要读取http-headerlazyHeaders=true //所有Servlet方法都不需要读取http-header且不存在HttpFilter的情况下lazyHeaders=true
protected boolean lazyHeaders; //存在动态改值 protected boolean lazyHeaders; //存在动态改值
Function<WebSocket, WebSocketWriteIOThread> webSocketWriterIOThreadFunc; Function<WebSocket, WebSocketWriteIOThread> webSocketWriterIOThreadFunc;
// protected RequestURINode[] uriCacheNodes;
public HttpContext(HttpContextConfig config) { public HttpContext(HttpContextConfig config) {
super(config); super(config);
this.remoteAddrHeader = config.remoteAddrHeader; this.remoteAddrHeader = config.remoteAddrHeader;
@@ -57,19 +56,6 @@ public class HttpContext extends Context {
random.setSeed(Math.abs(System.nanoTime())); random.setSeed(Math.abs(System.nanoTime()));
} }
// protected RequestURINode[] getUriCacheNodes() {
// return uriCacheNodes;
// }
//
// protected void addRequestURINode(String path) {
// RequestURINode node = new RequestURINode(path);
// if (this.uriCacheNodes != null) {
// for (int i = 0; i < uriCacheNodes.length; i++) {
// if (uriCacheNodes[i].path.equals(path)) return;
// }
// }
// this.uriCacheNodes = Utility.append(this.uriCacheNodes, node);
// }
@Override @Override
protected void updateReadIOThread(AsyncConnection conn, AsyncIOThread ioReadThread) { protected void updateReadIOThread(AsyncConnection conn, AsyncIOThread ioReadThread) {
super.updateReadIOThread(conn, ioReadThread); super.updateReadIOThread(conn, ioReadThread);

View File

@@ -12,7 +12,8 @@ import java.util.function.*;
import java.util.logging.*; import java.util.logging.*;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.redkale.net.DispatcherServlet; import org.redkale.net.*;
import org.redkale.net.Filter;
import org.redkale.net.http.Rest.RestDynSourceType; import org.redkale.net.http.Rest.RestDynSourceType;
import org.redkale.service.Service; import org.redkale.service.Service;
import org.redkale.util.AnyValue.DefaultAnyValue; import org.redkale.util.AnyValue.DefaultAnyValue;
@@ -47,6 +48,7 @@ public class HttpDispatcherServlet extends DispatcherServlet<String, HttpContext
protected HttpContext context; protected HttpContext context;
//所有Servlet方法都不需要读取http-header且不存在HttpFilter的情况下lazyHeaders=true
protected boolean lazyHeaders = true; protected boolean lazyHeaders = true;
private Map<String, BiPredicate<String, String>> forbidURIMaps; //禁用的URL的正则表达式, 必须与 forbidURIPredicates 保持一致 private Map<String, BiPredicate<String, String>> forbidURIMaps; //禁用的URL的正则表达式, 必须与 forbidURIPredicates 保持一致
@@ -363,6 +365,15 @@ public class HttpDispatcherServlet extends DispatcherServlet<String, HttpContext
} }
} }
@Override
public void addFilter(Filter<HttpContext, HttpRequest, HttpResponse> filter, AnyValue conf) {
super.addFilter(filter, conf);
this.lazyHeaders = false;
if (context != null) {
context.lazyHeaders = this.lazyHeaders; //启动后运行过程中执行addFilter
}
}
/** /**
* 添加HttpServlet * 添加HttpServlet
* *
@@ -385,10 +396,10 @@ public class HttpDispatcherServlet extends DispatcherServlet<String, HttpContext
} }
} }
} }
if (lazyHeaders && !Rest.isSimpleRestDyn(servlet)) { if (this.lazyHeaders && !Rest.isSimpleRestDyn(servlet)) {
lazyHeaders = false; this.lazyHeaders = false;
if (context != null) { if (context != null) {
context.lazyHeaders = false; //启动后运行过程中执行addServlet context.lazyHeaders = this.lazyHeaders; //启动后运行过程中执行addServlet
} }
} }
allMapLock.lock(); allMapLock.lock();

View File

@@ -227,10 +227,10 @@ public class HttpRequest extends Request<HttpContext> {
req.setBody(array.length() == 0 ? null : array.getBytes()); req.setBody(array.length() == 0 ? null : array.getBytes());
if (!getHeaders().isEmpty()) { if (!getHeaders().isEmpty()) {
if (headers.containsKey(Rest.REST_HEADER_RPC) if (headers.containsKey(Rest.REST_HEADER_RPC)
|| headers.containsKey(Rest.REST_HEADER_CURRUSERID_NAME)) { //外部request不能包含RPC的header信息 || headers.containsKey(Rest.REST_HEADER_CURRUSERID)) { //外部request不能包含RPC的header信息
req.setHeaders(new HashMap<>(headers)); req.setHeaders(new HashMap<>(headers));
req.removeHeader(Rest.REST_HEADER_RPC); req.removeHeader(Rest.REST_HEADER_RPC);
req.removeHeader(Rest.REST_HEADER_CURRUSERID_NAME); req.removeHeader(Rest.REST_HEADER_CURRUSERID);
} else { } else {
req.setHeaders(headers); req.setHeaders(headers);
} }
@@ -745,7 +745,7 @@ public class HttpRequest extends Request<HttpContext> {
this.rpc = "true".equalsIgnoreCase(value); this.rpc = "true".equalsIgnoreCase(value);
headers.put(name, value); headers.put(name, value);
break; break;
case Rest.REST_HEADER_CURRUSERID_NAME: case Rest.REST_HEADER_CURRUSERID:
value = bytes.toString(charset); value = bytes.toString(charset);
this.hashid = value.hashCode(); this.hashid = value.hashCode();
this.currentUserid = value; this.currentUserid = value;

View File

@@ -6,15 +6,12 @@
package org.redkale.net.http; package org.redkale.net.http;
import java.io.Serializable; import java.io.Serializable;
import java.lang.reflect.Type;
import java.net.HttpCookie; import java.net.HttpCookie;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.*; import java.util.function.*;
import org.redkale.convert.*; import org.redkale.convert.*;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
import org.redkale.persistence.Transient; import org.redkale.persistence.Transient;
import org.redkale.util.TypeToken;
/** /**
* HTTP输出引擎的对象域 <br> * HTTP输出引擎的对象域 <br>
@@ -44,9 +41,6 @@ public class HttpScope {
public static final Object NIL = new Object(); public static final Object NIL = new Object();
static final Type FUTRU_TYPE = new TypeToken<CompletableFuture<HttpScope>>() {
}.getType();
@ConvertColumn(index = 1) @ConvertColumn(index = 1)
protected String referid; protected String referid;

View File

@@ -38,16 +38,22 @@ import org.redkale.util.*;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public final class Rest { public final class Rest {
//请求所需的RestService的资源名值类型: 字符串
public static final String REST_HEADER_RESOURCE_NAME = "rest-resource-name"; public static final String REST_HEADER_RESOURCE_NAME = "rest-resource-name";
//请求是否为rpc协议值类型: 布尔取值为true、false
public static final String REST_HEADER_RPC = "rest-rpc"; public static final String REST_HEADER_RPC = "rest-rpc";
public static final String REST_HEADER_CURRUSERID_NAME = "rest-curruserid-name"; //当前用户ID值值类型: 字符串
public static final String REST_HEADER_CURRUSERID = "rest-curruserid";
public static final String REST_HEADER_PARAM_FROM_BODY = "rest-paramfrombody"; //参数是否从body中获取值类型: 布尔取值为true、false
public static final String REST_HEADER_PARAM_FROM_BODY = "rest-param-from-body";
//请求参数的反序列化种类,值类型: 字符串取值为ConvertType枚举值名
public static final String REST_HEADER_REQ_CONVERT_TYPE = "rest-req-convert-type"; public static final String REST_HEADER_REQ_CONVERT_TYPE = "rest-req-convert-type";
//响应结果的序列化种类,值类型: 字符串取值为ConvertType枚举值名
public static final String REST_HEADER_RESP_CONVERT_TYPE = "rest-resp-convert-type"; public static final String REST_HEADER_RESP_CONVERT_TYPE = "rest-resp-convert-type";
static final String REST_TOSTRINGOBJ_FIELD_NAME = "_redkale_tostringsupplier"; static final String REST_TOSTRINGOBJ_FIELD_NAME = "_redkale_tostringsupplier";
@@ -574,10 +580,6 @@ public final class Rest {
MethodDebugVisitor.pushInt(mv, rws.wsmaxbody()); MethodDebugVisitor.pushInt(mv, rws.wsmaxbody());
mv.visitFieldInsn(PUTFIELD, newDynName, "wsmaxbody", "I"); mv.visitFieldInsn(PUTFIELD, newDynName, "wsmaxbody", "I");
mv.visitVarInsn(ALOAD, 0);
mv.visitInsn(rws.mergemsg() ? ICONST_1 : ICONST_0);
mv.visitFieldInsn(PUTFIELD, newDynName, "mergemsg", "Z");
mv.visitVarInsn(ALOAD, 0); mv.visitVarInsn(ALOAD, 0);
mv.visitInsn(rws.single() ? ICONST_1 : ICONST_0); mv.visitInsn(rws.single() ? ICONST_1 : ICONST_0);
mv.visitFieldInsn(PUTFIELD, newDynName, "single", "Z"); mv.visitFieldInsn(PUTFIELD, newDynName, "single", "Z");
@@ -1053,7 +1055,6 @@ public final class Rest {
final String httpScopeDesc = Type.getDescriptor(HttpScope.class); final String httpScopeDesc = Type.getDescriptor(HttpScope.class);
final String stageDesc = Type.getDescriptor(CompletionStage.class); final String stageDesc = Type.getDescriptor(CompletionStage.class);
final String flipperDesc = Type.getDescriptor(Flipper.class); final String flipperDesc = Type.getDescriptor(Flipper.class);
final String channelDesc = Type.getDescriptor(ChannelContext.class);
final String httpServletName = HttpServlet.class.getName().replace('.', '/'); final String httpServletName = HttpServlet.class.getName().replace('.', '/');
final String actionEntryName = HttpServlet.ActionEntry.class.getName().replace('.', '/'); final String actionEntryName = HttpServlet.ActionEntry.class.getName().replace('.', '/');
final String attrDesc = Type.getDescriptor(org.redkale.util.Attribute.class); final String attrDesc = Type.getDescriptor(org.redkale.util.Attribute.class);
@@ -1330,7 +1331,6 @@ public final class Rest {
} else if ("&".equals(pname) && ptype == userType) { //当前用户对象的类名 } else if ("&".equals(pname) && ptype == userType) { //当前用户对象的类名
} else if (ptype.isPrimitive()) { } else if (ptype.isPrimitive()) {
} else if (ptype == String.class) { } else if (ptype == String.class) {
} else if (ptype == ChannelContext.class) {
} else if (ptype == Flipper.class) { } else if (ptype == Flipper.class) {
} else { //其他Json对象 } else { //其他Json对象
//构建 RestHeader、RestCookie、RestAddress 等赋值操作 //构建 RestHeader、RestCookie、RestAddress 等赋值操作
@@ -1561,7 +1561,7 @@ public final class Rest {
av0.visit("value", Type.getType(Type.getDescriptor(serviceType))); av0.visit("value", Type.getType(Type.getDescriptor(serviceType)));
av0.visitEnd(); av0.visitEnd();
} }
boolean dynsimple = true; boolean dynsimple = baseServletType != HttpServlet.class; //有自定义的BaseServlet会存在读取header的操作
//获取所有可以转换成HttpMapping的方法 //获取所有可以转换成HttpMapping的方法
int methodidex = 0; int methodidex = 0;
final MessageMultiConsumer mmc = serviceType.getAnnotation(MessageMultiConsumer.class); final MessageMultiConsumer mmc = serviceType.getAnnotation(MessageMultiConsumer.class);
@@ -2729,11 +2729,6 @@ public final class Rest {
mv.visitMethodInsn(INVOKEVIRTUAL, reqInternalName, iscookie ? "getCookie" : (ishead ? "getHeader" : "getParameter"), "(Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;", false); mv.visitMethodInsn(INVOKEVIRTUAL, reqInternalName, iscookie ? "getCookie" : (ishead ? "getHeader" : "getParameter"), "(Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;", false);
mv.visitVarInsn(ASTORE, maxLocals); mv.visitVarInsn(ASTORE, maxLocals);
varInsns.add(new int[]{ALOAD, maxLocals}); varInsns.add(new int[]{ALOAD, maxLocals});
} else if (ptype == ChannelContext.class) {
mv.visitVarInsn(ALOAD, 1);
mv.visitMethodInsn(INVOKEVIRTUAL, reqInternalName, "getChannelContext", "()" + channelDesc, false);
mv.visitVarInsn(ASTORE, maxLocals);
varInsns.add(new int[]{ALOAD, maxLocals});
} else if (ptype == Flipper.class) { } else if (ptype == Flipper.class) {
mv.visitVarInsn(ALOAD, 1); mv.visitVarInsn(ALOAD, 1);
mv.visitMethodInsn(INVOKEVIRTUAL, reqInternalName, "getFlipper", "()" + flipperDesc, false); mv.visitMethodInsn(INVOKEVIRTUAL, reqInternalName, "getFlipper", "()" + flipperDesc, false);

View File

@@ -2,9 +2,9 @@
*/ */
package org.redkale.net.http; package org.redkale.net.http;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.*; import java.lang.annotation.*;
import static java.lang.annotation.ElementType.*; import static java.lang.annotation.ElementType.*;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
/** /**
* 只能注解于Service类的方法的String参数或参数内的String字段 * 只能注解于Service类的方法的String参数或参数内的String字段
@@ -24,7 +24,7 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
public @interface RestLocale { public @interface RestLocale {
/** /**
* 备注描述, 对应&#64;HttpParam.comment * 备注描述
* *
* @return String * @return String
*/ */

View File

@@ -5,9 +5,9 @@
*/ */
package org.redkale.net.http; package org.redkale.net.http;
import java.lang.annotation.*;
import static java.lang.annotation.ElementType.TYPE; import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME; import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.*;
import org.redkale.net.Cryptor; import org.redkale.net.Cryptor;
/** /**
@@ -60,13 +60,6 @@ public @interface RestWebSocket {
*/ */
boolean anyuser() default false; boolean anyuser() default false;
/**
* 接收客户端的分包(last=false)消息时是否自动合并包
*
* @return 默认true
*/
boolean mergemsg() default true;
/** /**
* WebScoket服务器给客户端进行ping操作的间隔时间, 单位: 秒, 默认值15秒 * WebScoket服务器给客户端进行ping操作的间隔时间, 单位: 秒, 默认值15秒
* *

View File

@@ -76,14 +76,11 @@ public class WebSocketEngine {
@Comment("最大消息体长度, 小于1表示无限制") @Comment("最大消息体长度, 小于1表示无限制")
protected int wsMaxBody; protected int wsMaxBody;
@Comment("接收客户端的分包(last=false)消息时是否自动合并包")
protected boolean mergeMode = true;
@Comment("加密解密器") @Comment("加密解密器")
protected Cryptor cryptor; protected Cryptor cryptor;
protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveInterval, int wsMaxConns, protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveInterval,
int wsThreads, int wsMaxBody, boolean mergeMode, Cryptor cryptor, WebSocketNode node, Convert sendConvert, Logger logger) { int wsMaxConns, int wsThreads, int wsMaxBody, Cryptor cryptor, WebSocketNode node, Convert sendConvert, Logger logger) {
this.engineid = engineid; this.engineid = engineid;
this.single = single; this.single = single;
this.context = context; this.context = context;
@@ -93,7 +90,6 @@ public class WebSocketEngine {
this.wsMaxConns = wsMaxConns; this.wsMaxConns = wsMaxConns;
this.wsThreads = wsThreads; this.wsThreads = wsThreads;
this.wsMaxBody = wsMaxBody; this.wsMaxBody = wsMaxBody;
this.mergeMode = mergeMode;
this.cryptor = cryptor; this.cryptor = cryptor;
this.logger = logger; this.logger = logger;
this.index = sequence.getAndIncrement(); this.index = sequence.getAndIncrement();

View File

@@ -60,9 +60,6 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
@Comment("最大消息体长度, 小于1表示无限制") @Comment("最大消息体长度, 小于1表示无限制")
public static final String WEBPARAM__WSMAXBODY = "wsmaxbody"; public static final String WEBPARAM__WSMAXBODY = "wsmaxbody";
@Comment("接收客户端的分包(last=false)消息时是否自动合并包")
public static final String WEBPARAM__WSMERGEMSG = "wsmergemsg";
@Comment("加密解密器") @Comment("加密解密器")
public static final String WEBPARAM__CRYPTOR = "cryptor"; public static final String WEBPARAM__CRYPTOR = "cryptor";
@@ -93,9 +90,6 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
//同RestWebSocket.anyuser //同RestWebSocket.anyuser
protected boolean anyuser = false; protected boolean anyuser = false;
//同RestWebSocket.mergemsg
protected boolean mergemsg = true;
//同RestWebSocket.cryptor, 变量名不可改, 被Rest.createRestWebSocketServlet用到 //同RestWebSocket.cryptor, 变量名不可改, 被Rest.createRestWebSocketServlet用到
protected Cryptor cryptor; protected Cryptor cryptor;
@@ -197,7 +191,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
} }
//存在WebSocketServlet则此WebSocketNode必须是本地模式Service //存在WebSocketServlet则此WebSocketNode必须是本地模式Service
this.webSocketNode.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", this.webSocketNode.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]",
this.single, context, liveinterval, wsmaxconns, wsthreads, wsmaxbody, mergemsg, this.cryptor, this.webSocketNode, this.sendConvert, logger); this.single, context, liveinterval, wsmaxconns, wsthreads, wsmaxbody, this.cryptor, this.webSocketNode, this.sendConvert, logger);
this.webSocketNode.init(conf); this.webSocketNode.init(conf);
this.webSocketNode.localEngine.init(conf); this.webSocketNode.localEngine.init(conf);