sncp优化

This commit is contained in:
redkale
2023-02-08 21:33:14 +08:00
parent 67cd0b1b46
commit 6622e88232
15 changed files with 555 additions and 291 deletions

View File

@@ -55,6 +55,14 @@ public abstract class Convert<R extends Reader, W extends Writer> {
public abstract boolean isBinary();
public abstract R pollReader();
public abstract void offerReader(final R reader);
public abstract W pollWriter();
public abstract void offerWriter(final W write);
public abstract <T> T convertFrom(final Type type, final byte[] bytes);
public abstract <T> T convertFrom(final Type type, final R reader);

View File

@@ -30,6 +30,13 @@ public abstract class Reader {
public static final short SIGN_NOLENBUTBYTES = -3; //目前只适合于protobuf的boolean[]...double[]类型
/**
* 设置Reader的内容通常结合对象池使用
*
* @param content 内容
*/
public abstract void prepare(byte[] content);
/**
* 是否还存在下个元素或字段 <br>
* 注意: 主要用于Array、Collection、Stream或Map等集合对象

View File

@@ -41,7 +41,9 @@ public class BsonConvert extends BinaryConvert<BsonReader, BsonWriter> {
private final ThreadLocal<BsonWriter> writerPool = ThreadLocal.withInitial(BsonWriter::new);
private final Consumer<BsonWriter> offerConsumer = w -> offerBsonWriter(w);
private final Consumer<BsonWriter> writerConsumer = w -> offerWriter(w);
private final ThreadLocal<BsonReader> readerPool = ThreadLocal.withInitial(BsonReader::new);
private final boolean tiny;
@@ -75,32 +77,44 @@ public class BsonConvert extends BinaryConvert<BsonReader, BsonWriter> {
}
//------------------------------ reader -----------------------------------------------------------
public BsonReader pollBsonReader(final ByteBuffer... buffers) {
public BsonReader pollReader(final ByteBuffer... buffers) {
return new BsonByteBufferReader((ConvertMask) null, buffers);
}
public BsonReader pollBsonReader(final InputStream in) {
public BsonReader pollReader(final InputStream in) {
return new BsonStreamReader(in);
}
public BsonReader pollBsonReader() {
return new BsonReader();
@Override
public BsonReader pollReader() {
BsonReader reader = readerPool.get();
if (reader == null) {
reader = new BsonReader();
} else {
readerPool.set(null);
}
return reader;
}
public void offerBsonReader(final BsonReader in) {
//无需回收
@Override
public void offerReader(final BsonReader in) {
if (in != null) {
in.recycle();
readerPool.set(in);
}
}
//------------------------------ writer -----------------------------------------------------------
public BsonByteBufferWriter pollBsonWriter(final Supplier<ByteBuffer> supplier) {
public BsonByteBufferWriter pollWriter(final Supplier<ByteBuffer> supplier) {
return configWrite(new BsonByteBufferWriter(tiny, supplier));
}
protected BsonWriter pollBsonWriter(final OutputStream out) {
protected BsonWriter pollWriter(final OutputStream out) {
return configWrite(new BsonStreamWriter(tiny, out));
}
public BsonWriter pollBsonWriter() {
@Override
public BsonWriter pollWriter() {
BsonWriter writer = writerPool.get();
if (writer == null) {
writer = new BsonWriter();
@@ -110,7 +124,8 @@ public class BsonConvert extends BinaryConvert<BsonReader, BsonWriter> {
return configWrite(writer.tiny(tiny));
}
public void offerBsonWriter(final BsonWriter out) {
@Override
public void offerWriter(final BsonWriter out) {
if (out != null) {
out.recycle();
writerPool.set(out);
@@ -179,10 +194,10 @@ public class BsonConvert extends BinaryConvert<BsonReader, BsonWriter> {
@Override
public byte[] convertTo(final Object value) {
if (value == null) {
final BsonWriter out = pollBsonWriter();
final BsonWriter out = pollWriter();
out.writeNull();
byte[] result = out.toArray();
offerBsonWriter(out);
offerWriter(out);
return result;
}
return convertTo(value.getClass(), value);
@@ -193,10 +208,10 @@ public class BsonConvert extends BinaryConvert<BsonReader, BsonWriter> {
if (type == null) {
return null;
}
final BsonWriter writer = pollBsonWriter();
final BsonWriter writer = pollWriter();
factory.loadEncoder(type).convertTo(writer, value);
byte[] result = writer.toArray();
offerBsonWriter(writer);
offerWriter(writer);
return result;
}
@@ -217,13 +232,13 @@ public class BsonConvert extends BinaryConvert<BsonReader, BsonWriter> {
@Override
public void convertToBytes(final Type type, final Object value, final ConvertBytesHandler handler) {
final BsonWriter writer = pollBsonWriter();
final BsonWriter writer = pollWriter();
if (type == null) {
writer.writeNull();
} else {
factory.loadEncoder(type).convertTo(writer, value);
}
writer.completed(handler, offerConsumer);
writer.completed(handler, writerConsumer);
}
@Override
@@ -244,9 +259,9 @@ public class BsonConvert extends BinaryConvert<BsonReader, BsonWriter> {
public void convertTo(final OutputStream out, final Object value) {
if (value == null) {
pollBsonWriter(out).writeNull();
pollWriter(out).writeNull();
} else {
factory.loadEncoder(value.getClass()).convertTo(pollBsonWriter(out), value);
factory.loadEncoder(value.getClass()).convertTo(pollWriter(out), value);
}
}
@@ -255,9 +270,9 @@ public class BsonConvert extends BinaryConvert<BsonReader, BsonWriter> {
return;
}
if (value == null) {
pollBsonWriter(out).writeNull();
pollWriter(out).writeNull();
} else {
factory.loadEncoder(type).convertTo(pollBsonWriter(out), value);
factory.loadEncoder(type).convertTo(pollWriter(out), value);
}
}
@@ -266,7 +281,7 @@ public class BsonConvert extends BinaryConvert<BsonReader, BsonWriter> {
if (supplier == null) {
return null;
}
BsonByteBufferWriter out = pollBsonWriter(supplier);
BsonByteBufferWriter out = pollWriter(supplier);
if (value == null) {
out.writeNull();
} else {
@@ -280,7 +295,7 @@ public class BsonConvert extends BinaryConvert<BsonReader, BsonWriter> {
if (supplier == null || type == null) {
return null;
}
BsonByteBufferWriter writer = pollBsonWriter(supplier);
BsonByteBufferWriter writer = pollWriter(supplier);
if (value == null) {
writer.writeNull();
} else {

View File

@@ -55,6 +55,11 @@ public class BsonReader extends Reader {
setBytes(bytes, start, len);
}
@Override
public void prepare(byte[] bytes) {
setBytes(bytes);
}
public final BsonReader setBytes(byte[] bytes) {
if (bytes == null) {
this.position = 0;

View File

@@ -37,6 +37,8 @@ public class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
private final Consumer<JsonBytesWriter> offerBytesConsumer = w -> offerJsonBytesWriter(w);
private final ThreadLocal<JsonReader> readerPool = ThreadLocal.withInitial(JsonReader::new);
private final boolean tiny;
private Encodeable lastConvertEncodeable;
@@ -81,6 +83,45 @@ public class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
};
}
@Override
public JsonReader pollReader() {
JsonReader reader = readerPool.get();
if (reader == null) {
reader = new JsonReader();
} else {
readerPool.set(null);
}
return reader;
}
@Override
public void offerReader(final JsonReader in) {
if (in != null) {
in.recycle();
readerPool.set(in);
}
}
@Override
public JsonWriter pollWriter() {
JsonBytesWriter writer = bytesWriterPool.get();
if (writer == null) {
writer = new JsonBytesWriter();
} else {
bytesWriterPool.set(null);
}
return configWrite((JsonBytesWriter) writer.tiny(tiny));
}
@Override
public void offerWriter(final JsonWriter writer) {
if (writer instanceof JsonBytesWriter) {
JsonBytesWriter bw = (JsonBytesWriter) writer;
bw.recycle();
bytesWriterPool.set(bw);
}
}
//------------------------------ writer -----------------------------------------------------------
private JsonBytesWriter pollJsonBytesWriter() {
JsonBytesWriter writer = bytesWriterPool.get();

View File

@@ -5,6 +5,7 @@
*/
package org.redkale.convert.json;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.redkale.convert.*;
import static org.redkale.convert.Reader.*;
@@ -59,6 +60,13 @@ public class JsonReader extends Reader {
this.limit = start + len - 1;
}
@Override
public void prepare(byte[] bytes) {
if (bytes != null) {
setText(new String(bytes, StandardCharsets.UTF_8));
}
}
protected boolean recycle() {
this.position = -1;
this.limit = -1;

View File

@@ -161,25 +161,21 @@ public final class OldSncpClient {
//只给远程模式调用的
public <T> T remote(final int index, final Object... params) {
final SncpServiceAction action = actions[index];
final CompletionHandler handlerFunc = action.handlerFuncParamIndex >= 0 ? (CompletionHandler) params[action.handlerFuncParamIndex] : null;
if (action.handlerFuncParamIndex >= 0) {
params[action.handlerFuncParamIndex] = null;
final CompletionHandler handlerFunc = action.paramHandlerIndex >= 0 ? (CompletionHandler) params[action.paramHandlerIndex] : null;
if (action.paramHandlerIndex >= 0) {
params[action.paramHandlerIndex] = null;
}
final BsonReader reader = bsonConvert.pollBsonReader();
final BsonReader reader = bsonConvert.pollReader();
CompletableFuture<byte[]> future = remote0(handlerFunc, remoteGroupTransport, null, action, params);
if (action.boolReturnTypeFuture) { //与handlerFuncIndex互斥
CompletableFuture result = action.futureCreator.create();
if (action.returnFutureResultType != null) { //与handlerFuncIndex互斥
CompletableFuture result = (CompletableFuture) action.returnFutureCreator.create();
future.whenComplete((v, e) -> {
try {
if (e != null) {
result.completeExceptionally(e);
} else {
reader.setBytes(v);
byte i;
while ((i = reader.readByte()) != 0) {
final Attribute attr = action.paramAttrs[i];
attr.set(params[i - 1], bsonConvert.convertFrom(attr.genericType(), reader));
}
reader.readByte(); //读掉0
Object rs = bsonConvert.convertFrom(Object.class, reader);
result.complete(rs);
@@ -187,7 +183,7 @@ public final class OldSncpClient {
} catch (Exception exp) {
result.completeExceptionally(exp);
} finally {
bsonConvert.offerBsonReader(reader);
bsonConvert.offerReader(reader);
}
}); //需要获取 Executor
return (T) result;
@@ -197,12 +193,8 @@ public final class OldSncpClient {
}
try {
reader.setBytes(future.get(5, TimeUnit.SECONDS));
byte i;
while ((i = reader.readByte()) != 0) {
final Attribute attr = action.paramAttrs[i];
attr.set(params[i - 1], bsonConvert.convertFrom(attr.genericType(), reader));
}
return bsonConvert.convertFrom(action.handlerFuncParamIndex >= 0 ? Object.class : action.returnObjectType, reader);
reader.readByte(); //读掉0
return bsonConvert.convertFrom(action.paramHandlerIndex >= 0 ? Object.class : action.returnObjectType, reader);
} catch (RpcRemoteException re) {
throw re;
} catch (TimeoutException e) {
@@ -210,21 +202,21 @@ public final class OldSncpClient {
} catch (InterruptedException | ExecutionException e) {
throw new RpcRemoteException(actions[index].method + " sncp remote error, params=" + JsonConvert.root().convertTo(params), e);
} finally {
bsonConvert.offerBsonReader(reader);
bsonConvert.offerReader(reader);
}
}
private CompletableFuture<byte[]> remote0(final CompletionHandler handler, final Transport transport, final SocketAddress addr0, final SncpServiceAction action, final Object... params) {
final String traceid = Traces.currTraceid();
final Type[] myparamtypes = action.paramTypes;
final Class[] myparamclass = action.paramClass;
if (action.addressSourceParamIndex >= 0) {
params[action.addressSourceParamIndex] = this.clientSncpAddress;
final Class[] myparamclass = action.paramClasses;
if (action.paramAddressSourceIndex >= 0) {
params[action.paramAddressSourceIndex] = this.clientSncpAddress;
}
if (bsonConvert == null) {
bsonConvert = BsonConvert.root();
}
final BsonWriter writer = bsonConvert.pollBsonWriter(); // 将head写入
final BsonWriter writer = bsonConvert.pollWriter(); // 将head写入
writer.writePlaceholderTo(HEADER_SIZE);
for (int i = 0; i < params.length; i++) { //params 可能包含: 3 个 boolean
BsonConvert bcc = bsonConvert;
@@ -242,7 +234,7 @@ public final class OldSncpClient {
if (messageAgent != null) { //MQ模式
final ByteArray reqbytes = writer.toByteArray();
fillHeader(reqbytes, action, seqid, traceid, reqBodyLength);
String targetTopic = action.topicTargetParamIndex >= 0 ? (String) params[action.topicTargetParamIndex] : this.topic;
String targetTopic = action.paramTopicTargetIndex >= 0 ? (String) params[action.paramTopicTargetIndex] : this.topic;
if (targetTopic == null) {
targetTopic = this.topic;
}
@@ -272,7 +264,7 @@ public final class OldSncpClient {
return body;
});
}
final SocketAddress addr = addr0 == null ? (action.addressTargetParamIndex >= 0 ? (SocketAddress) params[action.addressTargetParamIndex] : null) : addr0;
final SocketAddress addr = addr0 == null ? (action.paramAddressTargetIndex >= 0 ? (SocketAddress) params[action.paramAddressTargetIndex] : null) : addr0;
CompletableFuture<AsyncConnection> connFuture = transport.pollConnection(addr);
return connFuture.thenCompose(conn0 -> {
final CompletableFuture<byte[]> future = new CompletableFuture();
@@ -356,7 +348,7 @@ public final class OldSncpClient {
future.completeExceptionally(new RpcRemoteException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote response error, params=" + JsonConvert.root().convertTo(params)));
transport.offerConnection(true, conn);
if (handler != null) {
final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null;
final Object handlerAttach = action.paramHandlerAttachIndex >= 0 ? params[action.paramHandlerAttachIndex] : null;
handler.failed(e, handlerAttach);
}
logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") deal error", e);
@@ -368,21 +360,17 @@ public final class OldSncpClient {
future.complete(this.body);
transport.offerConnection(false, conn);
if (handler != null) {
final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null;
final BsonReader reader = bsonConvert.pollBsonReader();
final Object handlerAttach = action.paramHandlerAttachIndex >= 0 ? params[action.paramHandlerAttachIndex] : null;
final BsonReader reader = bsonConvert.pollReader();
try {
reader.setBytes(this.body);
int i;
while ((i = (reader.readByte() & 0xff)) != 0) {
final Attribute attr = action.paramAttrs[i];
attr.set(params[i - 1], bsonConvert.convertFrom(attr.genericType(), reader));
}
Object rs = bsonConvert.convertFrom(action.handlerFuncParamIndex >= 0 ? Object.class : action.returnObjectType, reader);
reader.readByte(); //读掉0
Object rs = bsonConvert.convertFrom(action.paramHandlerIndex >= 0 ? Object.class : action.returnObjectType, reader);
handler.completed(rs, handlerAttach);
} catch (Exception e) {
handler.failed(e, handlerAttach);
} finally {
bsonConvert.offerBsonReader(reader);
bsonConvert.offerReader(reader);
}
}
}
@@ -393,7 +381,7 @@ public final class OldSncpClient {
conn.offerReadBuffer(attachment2);
transport.offerConnection(true, conn);
if (handler != null) {
final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null;
final Object handlerAttach = action.paramHandlerAttachIndex >= 0 ? params[action.paramHandlerAttachIndex] : null;
handler.failed(exc, handlerAttach);
}
logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") remote read exec failed, params=" + JsonConvert.root().convertTo(params), exc);
@@ -406,7 +394,7 @@ public final class OldSncpClient {
future.completeExceptionally(new RpcRemoteException(action.method + " sncp remote exec failed, params=" + JsonConvert.root().convertTo(params)));
transport.offerConnection(true, conn);
if (handler != null) {
final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null;
final Object handlerAttach = action.paramHandlerAttachIndex >= 0 ? params[action.paramHandlerAttachIndex] : null;
handler.failed(exc, handlerAttach);
}
logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") remote write exec failed, params=" + JsonConvert.root().convertTo(params), exc);

View File

@@ -19,6 +19,7 @@ import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES;
import org.redkale.asm.*;
import static org.redkale.asm.Opcodes.*;
import org.redkale.asm.Type;
import org.redkale.convert.Convert;
import org.redkale.mq.*;
import org.redkale.net.TransportFactory;
import org.redkale.net.http.WebSocketNode;
@@ -150,8 +151,8 @@ public abstract class Sncp {
}
public static <T extends Service> SncpServiceInfo createSncpServiceInfo(String resourceName,
Class<T> resourceServiceType, T service, MessageAgent messageAgent, SncpMessageClient messageClient) {
return new SncpServiceInfo(resourceName, resourceServiceType, service, messageAgent, messageClient);
Class<T> resourceServiceType, T service, Convert convert, MessageAgent messageAgent, SncpMessageClient messageClient) {
return new SncpServiceInfo(resourceName, resourceServiceType, service, convert, messageAgent, messageClient);
}
public static Uint128 actionid(final RpcAction action) {

View File

@@ -4,10 +4,15 @@
package org.redkale.net.sncp;
import java.net.InetSocketAddress;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture;
import org.redkale.annotation.Resource;
import org.redkale.convert.Convert;
import org.redkale.convert.bson.BsonConvert;
import org.redkale.net.*;
import org.redkale.net.client.*;
import org.redkale.net.sncp.SncpServiceInfo.SncpServiceAction;
import org.redkale.util.Traces;
/**
* SNCP版Client
@@ -40,12 +45,68 @@ public class SncpClient extends Client<SncpClientConnection, SncpClientRequest,
return clientSncpAddress;
}
protected SncpClientConnection connect(SncpServiceInfo info) {
return null;
protected CompletableFuture<SncpClientConnection> connect(SncpServiceInfo info) {
return super.connect();
}
//只给远程模式调用的
public <T> T remote(final SncpServiceInfo info, final int index, final Object... params) {
final String traceid = Traces.currTraceid();
final Convert convert = info.convert;
final SncpServiceAction action = info.actions[index];
CompletionHandler callbackHandler = null;
Object callbackHandlerAttach = null;
if (action.paramHandlerIndex >= 0) {
callbackHandler = (CompletionHandler) params[action.paramHandlerIndex];
params[action.paramHandlerIndex] = null;
if (action.paramHandlerAttachIndex >= 0) {
callbackHandlerAttach = params[action.paramHandlerAttachIndex];
params[action.paramHandlerAttachIndex] = null;
}
}
final CompletableFuture<byte[]> future = remote(info, action, traceid, params);
if (action.paramHandlerIndex >= 0) { //参数中存在CompletionHandler
final CompletionHandler handler = callbackHandler;
final Object attach = callbackHandlerAttach;
if (handler == null) { //传入的CompletionHandler参数为null
future.join();
} else {
future.whenComplete((v, t) -> {
if (t == null) {
handler.completed(v == null ? null : convert.convertFrom(action.paramHandlerResultType, v), attach);
} else {
handler.failed(t, attach);
}
});
}
} else if (action.returnFutureClass != null) { //返回类型为CompletableFuture
if (action.returnFutureClass == CompletableFuture.class) {
return (T) future.thenApply(v -> v == null ? null : convert.convertFrom(action.paramHandlerResultType, v));
} else {
final CompletableFuture stage = action.returnFutureCreator.create();
future.whenComplete((v, t) -> {
if (t == null) {
stage.complete(v == null ? null : convert.convertFrom(action.paramHandlerResultType, v));
} else {
stage.completeExceptionally(t);
}
});
return (T) stage;
}
} else if (action.returnObjectType != null) { //返回类型为JavaBean
return (T) future.thenApply(v -> v == null ? null : convert.convertFrom(action.paramHandlerResultType, v)).join();
} else { //返回类型为void
future.join();
}
return null;
}
protected CompletableFuture<byte[]> remote(
final SncpServiceInfo info,
final SncpServiceAction action,
final String traceid,
final Object... params) {
return null;
}
}

View File

@@ -9,7 +9,7 @@ import java.io.IOException;
import java.lang.reflect.*;
import java.nio.channels.CompletionHandler;
import java.util.*;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import org.redkale.annotation.NonBlocking;
@@ -138,11 +138,11 @@ public final class SncpDynServlet extends SncpServlet {
protected final java.lang.reflect.Type[] paramTypes; //第一个元素存放返回类型return type void的返回参数类型为null, 数组长度为:1+参数个数
protected final java.lang.reflect.Type returnObjectType; //返回结果的CompletableFuture的结果泛型类型
protected final java.lang.reflect.Type returnObjectType; //返回结果类型 void必须设为null
protected final int paramHandlerIndex; //>=0表示存在CompletionHandler参数
protected final Class<? extends CompletionHandler> paramHandlerType; //CompletionHandler参数的类型
protected final Class<? extends CompletionHandler> paramHandlerClass; //CompletionHandler参数的类型
protected final java.lang.reflect.Type paramHandlerResultType; //CompletionHandler.completed第一个参数的类型
@@ -155,21 +155,24 @@ public final class SncpDynServlet extends SncpServlet {
this.method = method;
int handlerFuncIndex = -1;
Class handlerFuncType = null;
Class handlerFuncClass = null;
java.lang.reflect.Type handlerResultType = null;
try {
final Class[] paramClasses = method.getParameterTypes();
for (int i = 0; i < paramClasses.length; i++) { //反序列化方法的每个参数
if (CompletionHandler.class.isAssignableFrom(paramClasses[i])) {
handlerFuncIndex = i;
handlerFuncType = paramClasses[i];
handlerFuncClass = paramClasses[i];
java.lang.reflect.Type handlerType = TypeToken.getGenericType(method.getTypeParameters()[i], service.getClass());
if (handlerType instanceof Class) {
handlerResultType = Object.class;
} else if (handlerType instanceof ParameterizedType) {
handlerResultType = TypeToken.getGenericType(((ParameterizedType) handlerType).getActualTypeArguments()[0], handlerType);
} else {
throw new SncpException(service.getClass() + " had unknown gGenericType in " + method);
throw new SncpException(service.getClass() + " had unknown genericType in " + method);
}
if (method.getReturnType() != void.class) {
throw new SncpException(method + " have CompletionHandler type parameter but return type is not void");
}
break;
}
@@ -183,10 +186,10 @@ public final class SncpDynServlet extends SncpServlet {
System.arraycopy(originalParamTypes, 0, types, 1, originalParamTypes.length);
this.paramTypes = types;
this.paramHandlerIndex = handlerFuncIndex;
this.paramHandlerType = handlerFuncType;
this.paramHandlerClass = handlerFuncClass;
this.paramHandlerResultType = handlerResultType;
this.returnObjectType = originalReturnType;
if (CompletionStage.class.isAssignableFrom(method.getReturnType())) {
this.returnObjectType = originalReturnType == void.class || originalReturnType == Void.class ? null : originalReturnType;
if (Future.class.isAssignableFrom(method.getReturnType())) {
java.lang.reflect.Type futureType = TypeToken.getGenericType(method.getGenericReturnType(), service.getClass());
java.lang.reflect.Type returnType = null;
if (futureType instanceof Class) {
@@ -204,13 +207,14 @@ public final class SncpDynServlet extends SncpServlet {
if (non == null) {
non = service.getClass().getAnnotation(NonBlocking.class);
}
//Future代替CompletionStage 不容易判断异步
this.nonBlocking = non == null ? (CompletionStage.class.isAssignableFrom(method.getReturnType()) || this.paramHandlerIndex >= 0) : false;
}
@Override
public final void execute(SncpRequest request, SncpResponse response) throws IOException {
if (paramHandlerIndex > 0) {
response.paramAsyncHandler(paramHandlerType, paramHandlerResultType);
response.paramAsyncHandler(paramHandlerClass, paramHandlerResultType);
}
try {
action(request, response);
@@ -242,87 +246,132 @@ public final class SncpDynServlet extends SncpServlet {
/**
* <blockquote><pre>
* public class TestService implements Service {
* public interface TestService extends Service {
*
* public boolean change(TestBean bean, String name, int id) {
* return false;
* }
* public boolean change(TestBean bean, String name, int id);
*
* public void insert(CompletionHandler&#60;Boolean, TestBean&#62; handler, TestBean bean, String name, int id) {
* }
* public void insert(BooleanHandler handler, TestBean bean, String name, int id);
*
* public void update(long show, short v2, CompletionHandler&#60;Boolean, TestBean&#62; handler, TestBean bean, String name, int id) {
* }
* public void update(long show, short v2, CompletionHandler&#60;Boolean, TestBean&#62; handler, TestBean bean, String name, int id);
*
* public CompletableFuture&#60;String&#62; changeName(TestBean bean, String name, int id);
*
* public CompletableFuture&#60;String&#62; changeName(TestBean bean, String name, int id) {
* return null;
* }
* }
*
* &#064;ResourceType(TestService.class)
* class TestServiceImpl implements TestService {
*
* class DynActionTestService_change extends SncpServletAction {
* &#064;Override
* public boolean change(TestBean bean, String name, int id) {
* return false;
* }
*
* public TestService service;
* &#064;Override
* public void insert(BooleanHandler handler, TestBean bean, String name, int id) {
* }
*
* &#064;Override
* public void action(BsonReader in, BsonWriter out, OldSncpHandler handler) throws Throwable {
* TestBean arg1 = convert.convertFrom(paramTypes[1], in);
* String arg2 = convert.convertFrom(paramTypes[2], in);
* int arg3 = convert.convertFrom(paramTypes[3], in);
* Object rs = service.change(arg1, arg2, arg3);
* _callParameter(out, arg1, arg2, arg3);
* convert.convertTo(out, paramTypes[0], rs);
* }
* &#064;Override
* public void update(long show, short v2, CompletionHandler&#60;Boolean, TestBean&#62; handler, TestBean bean, String name, int id) {
* }
*
* &#064;Override
* public CompletableFuture&#60;String&#62; changeName(TestBean bean, String name, int id) {
* return null;
* }
* }
*
* class DynActionTestService_insert extends SncpServletAction {
* class BooleanHandler implements CompletionHandler&#60;Boolean, TestBean&#62; {
*
* public TestService service;
* &#064;Override
* public void completed(Boolean result, TestBean attachment) {
* }
*
* &#064;Override
* public void failed(Throwable exc, TestBean attachment) {
* }
*
* &#064;Override
* public void action(BsonReader in, BsonWriter out, OldSncpHandler handler) throws Throwable {
* OldSncpHandler arg0 = handler;
* convert.convertFrom(CompletionHandler.class, in);
* TestBean arg1 = convert.convertFrom(paramTypes[2], in);
* String arg2 = convert.convertFrom(paramTypes[3], in);
* int arg3 = convert.convertFrom(paramTypes[4], in);
* handler.sncp_setParams(arg0, arg1, arg2, arg3);
* service.insert(arg0, arg1, arg2, arg3);
* }
* }
*
* class DynActionTestService_update extends SncpServletAction {
* class DynActionTestService_change extends SncpActionServlet {
*
* public TestService service;
* public DynActionTestService_change(String resourceName, Class resourceType, Service service, Uint128 serviceid, Uint128 actionid, final Method method) {
* super(resourceName, resourceType, service, serviceid, actionid, method);
* }
*
* &#064;Override
* public void action(BsonReader in, BsonWriter out, OldSncpHandler handler) throws Throwable {
* long a1 = convert.convertFrom(paramTypes[1], in);
* short a2 = convert.convertFrom(paramTypes[2], in);
* OldSncpHandler a3 = handler;
* convert.convertFrom(CompletionHandler.class, in);
* TestBean arg1 = convert.convertFrom(paramTypes[4], in);
* String arg2 = convert.convertFrom(paramTypes[5], in);
* int arg3 = convert.convertFrom(paramTypes[6], in);
* handler.sncp_setParams(a1, a2, a3, arg1, arg2, arg3);
* service.update(a1, a2, a3, arg1, arg2, arg3);
* }
* &#064;Override
* public void action(SncpRequest request, SncpResponse response) throws Throwable {
* Convert&#60;Reader, Writer&#62; convert = request.getConvert();
* Reader in = request.getReader();
* TestBean arg1 = convert.convertFrom(paramTypes[1], in);
* String arg2 = convert.convertFrom(paramTypes[2], in);
* int arg3 = convert.convertFrom(paramTypes[3], in);
* TestService serviceObj = (TestService) service();
* Object rs = serviceObj.change(arg1, arg2, arg3);
* response.finish(boolean.class, rs);
* }
* }
*
* class DynActionTestService_insert extends SncpActionServlet {
*
* class DynActionTestService_changeName extends SncpServletAction {
* public DynActionTestService_insert(String resourceName, Class resourceType, Service service, Uint128 serviceid, Uint128 actionid, final Method method) {
* super(resourceName, resourceType, service, serviceid, actionid, method);
* }
*
* public TestService service;
*
* &#064;Override
* public void action(final BsonReader in, final BsonWriter out, final OldSncpHandler handler) throws Throwable {
* TestBean arg1 = convert.convertFrom(paramTypes[1], in);
* String arg2 = convert.convertFrom(paramTypes[2], in);
* int arg3 = convert.convertFrom(paramTypes[3], in);
* handler.sncp_setParams(arg1, arg2, arg3);
* CompletableFuture future = service.changeName(arg1, arg2, arg3);
* handler.sncp_setFuture(future);
* &#064;Override
* public void action(SncpRequest request, SncpResponse response) throws Throwable {
* Convert&#60;Reader, Writer&#62; convert = request.getConvert();
* Reader in = request.getReader();
* BooleanHandler arg0 = response.getParamAsyncHandler();
* convert.convertFrom(CompletionHandler.class, in);
* TestBean arg1 = convert.convertFrom(paramTypes[2], in);
* String arg2 = convert.convertFrom(paramTypes[3], in);
* int arg3 = convert.convertFrom(paramTypes[4], in);
* TestService serviceObj = (TestService) service();
* serviceObj.insert(arg0, arg1, arg2, arg3);
* response.finishVoid();
* }
* }
*
* class DynActionTestService_update extends SncpActionServlet {
*
* public DynActionTestService_update(String resourceName, Class resourceType, Service service, Uint128 serviceid, Uint128 actionid, final Method method) {
* super(resourceName, resourceType, service, serviceid, actionid, method);
* }
*
* &#064;Override
* public void action(SncpRequest request, SncpResponse response) throws Throwable {
* Convert&#60;Reader, Writer&#62; convert = request.getConvert();
* Reader in = request.getReader();
* long a1 = convert.convertFrom(paramTypes[1], in);
* short a2 = convert.convertFrom(paramTypes[2], in);
* CompletionHandler a3 = response.getParamAsyncHandler();
* convert.convertFrom(CompletionHandler.class, in);
* TestBean arg1 = convert.convertFrom(paramTypes[4], in);
* String arg2 = convert.convertFrom(paramTypes[5], in);
* int arg3 = convert.convertFrom(paramTypes[6], in);
* TestService serviceObj = (TestService) service();
* serviceObj.update(a1, a2, a3, arg1, arg2, arg3);
* response.finishVoid();
* }
* }
*
* class DynActionTestService_changeName extends SncpActionServlet {
*
* public DynActionTestService_changeName(String resourceName, Class resourceType, Service service, Uint128 serviceid, Uint128 actionid, final Method method) {
* super(resourceName, resourceType, service, serviceid, actionid, method);
* }
*
* &#064;Override
* public void action(SncpRequest request, SncpResponse response) throws Throwable {
* Convert&#60;Reader, Writer&#62; convert = request.getConvert();
* Reader in = request.getReader();
* TestBean arg1 = convert.convertFrom(paramTypes[1], in);
* String arg2 = convert.convertFrom(paramTypes[2], in);
* int arg3 = convert.convertFrom(paramTypes[3], in);
* TestService serviceObj = (TestService) service();
* CompletableFuture future = serviceObj.changeName(arg1, arg2, arg3);
* response.finishFuture(paramHandlerResultType, future);
* }
* }
*
* </pre></blockquote>
@@ -334,7 +383,7 @@ public final class SncpDynServlet extends SncpServlet {
* @param actionid 操作ID
* @param method 方法
*
* @return SncpServletAction
* @return SncpActionServlet
*/
@SuppressWarnings("unchecked")
public static SncpActionServlet create(
@@ -356,7 +405,7 @@ public final class SncpDynServlet extends SncpServlet {
final String responseName = SncpResponse.class.getName().replace('.', '/');
final String requestDesc = Type.getDescriptor(SncpRequest.class);
final String responseDesc = Type.getDescriptor(SncpResponse.class);
final boolean boolReturnTypeFuture = CompletionStage.class.isAssignableFrom(method.getReturnType());
final boolean boolReturnTypeFuture = Future.class.isAssignableFrom(method.getReturnType());
final String newDynName = "org/redkaledyn/sncp/servlet/action/_DynSncpActionServlet__" + resourceType.getSimpleName() + "_" + method.getName() + "_" + actionid;
Class<?> newClazz = null;
@@ -520,12 +569,12 @@ public final class SncpDynServlet extends SncpServlet {
}
mv.visitVarInsn(ASTORE, store); //11
if (boolReturnTypeFuture) { //返回类型为CompletionStage
if (boolReturnTypeFuture) { //返回类型为Future
mv.visitVarInsn(ALOAD, 2);
mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynName, "returnFutureResultType", "Ljava/lang/reflect/Type;");
mv.visitVarInsn(ALOAD, store);
mv.visitMethodInsn(INVOKEVIRTUAL, responseName, "finishFuture", "(Ljava/lang/reflect/Type;Ljava/util/concurrent/CompletionStage;)V", false);
mv.visitMethodInsn(INVOKEVIRTUAL, responseName, "finishFuture", "(Ljava/lang/reflect/Type;Ljava/util/concurrent/Future;)V", false);
} else if (handlerFuncIndex >= 0) { //参数有CompletionHandler
mv.visitVarInsn(ALOAD, 2);
mv.visitMethodInsn(INVOKEVIRTUAL, responseName, "finishVoid", "()V", false);

View File

@@ -135,17 +135,23 @@ public class SncpResponse extends Response<SncpContext, SncpRequest> {
finish(0, out);
}
public final void finishFuture(final Type futureResultType, final CompletionStage future) {
public final void finishFuture(final Type futureResultType, final Future future) {
if (future == null) {
finishVoid();
} else {
future.whenComplete((v, t) -> {
} else if (future instanceof CompletionStage) {
((CompletionStage) future).whenComplete((v, t) -> {
if (t != null) {
finishError((Throwable) t);
} else {
finish(futureResultType, v);
}
});
} else {
try {
finish(futureResultType, future.get());
} catch (Exception e) {
finishError(e);
}
}
}

View File

@@ -8,7 +8,8 @@ import java.lang.reflect.*;
import java.net.*;
import java.nio.channels.CompletionHandler;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.*;
import org.redkale.convert.Convert;
import org.redkale.mq.*;
import static org.redkale.net.sncp.Sncp.loadMethodActions;
import org.redkale.service.*;
@@ -41,6 +42,8 @@ public final class SncpServiceInfo<T extends Service> {
protected final String topic;
protected final Convert convert;
//MQ模式下此字段才有值
protected final MessageAgent messageAgent;
@@ -53,11 +56,12 @@ public final class SncpServiceInfo<T extends Service> {
//远程模式, 可能为null
protected Set<InetSocketAddress> remoteAddresses;
SncpServiceInfo(String resourceName, Class<T> resourceServiceType, final T service, MessageAgent messageAgent, SncpMessageClient messageClient) {
SncpServiceInfo(String resourceName, Class<T> resourceServiceType, final T service, Convert convert, MessageAgent messageAgent, SncpMessageClient messageClient) {
this.name = resourceName;
this.serviceType = resourceServiceType;
this.serviceid = Sncp.serviceid(name, resourceServiceType);
this.serviceid = Sncp.serviceid(resourceName, resourceServiceType);
this.service = service;
this.convert = convert;
this.serviceVersion = 0;
this.messageAgent = messageAgent;
this.messageClient = messageAgent == null ? null : messageAgent.getSncpMessageClient();
@@ -118,27 +122,31 @@ public final class SncpServiceInfo<T extends Service> {
protected final Method method;
protected final Type returnObjectType; //void 必须设为 null
protected final Type returnObjectType; //void必须设为null
protected final Type[] paramTypes;
protected final Class[] paramClass;
protected final Class[] paramClasses;
protected final Attribute[] paramAttrs; // 为null表示无RpcCall处理index=0固定为null, 其他为参数标记的RpcCall回调方法
protected final int paramHandlerIndex;
protected final int handlerFuncParamIndex;
protected final int paramHandlerAttachIndex;
protected final int handlerAttachParamIndex;
protected final int paramAddressTargetIndex;
protected final int addressTargetParamIndex;
protected final int paramAddressSourceIndex;
protected final int addressSourceParamIndex;
protected final int paramTopicTargetIndex;
protected final int topicTargetParamIndex;
protected final Class<? extends CompletionHandler> paramHandlerClass; //CompletionHandler参数的类型
protected final boolean boolReturnTypeFuture; // 返回结果类型是否为 CompletableFuture
protected final java.lang.reflect.Type paramHandlerResultType; //CompletionHandler.completed第一个参数的类型
protected final Creator<? extends CompletableFuture> futureCreator;
protected final java.lang.reflect.Type returnFutureResultType; //返回结果的CompletableFuture的结果泛型类型
protected final Class<? extends Future> returnFutureClass; //返回结果的CompletableFuture类型
protected final Creator<? extends CompletableFuture> returnFutureCreator; //返回CompletableFuture类型的构建器
protected final SncpHeader header;
@@ -146,11 +154,9 @@ public final class SncpServiceInfo<T extends Service> {
SncpServiceAction(final Class serviceImplClass, Method method, Uint128 serviceid, Uint128 actionid) {
this.actionid = actionid == null ? Sncp.actionid(method) : actionid;
Type rt = TypeToken.getGenericType(method.getGenericReturnType(), serviceImplClass);
this.returnObjectType = rt == void.class ? null : rt;
this.boolReturnTypeFuture = CompletableFuture.class.isAssignableFrom(method.getReturnType());
this.futureCreator = boolReturnTypeFuture ? Creator.create((Class<? extends CompletableFuture>) method.getReturnType()) : null;
this.returnObjectType = rt == void.class || rt == Void.class ? null : rt;
this.paramTypes = TypeToken.getGenericType(method.getGenericParameterTypes(), serviceImplClass);
this.paramClass = method.getParameterTypes();
this.paramClasses = method.getParameterTypes();
this.method = method;
Annotation[][] anns = method.getParameterAnnotations();
int tpoicAddrIndex = -1;
@@ -158,23 +164,35 @@ public final class SncpServiceInfo<T extends Service> {
int sourceAddrIndex = -1;
int handlerAttachIndex = -1;
int handlerFuncIndex = -1;
boolean hasattr = false;
Attribute[] atts = new Attribute[paramTypes.length + 1];
if (anns.length > 0) {
Class<?>[] params = method.getParameterTypes();
for (int i = 0; i < params.length; i++) {
if (CompletionHandler.class.isAssignableFrom(params[i])) {
if (boolReturnTypeFuture) {
throw new SncpException(method + " have both CompletionHandler and CompletableFuture");
}
if (handlerFuncIndex >= 0) {
throw new SncpException(method + " have more than one CompletionHandler type parameter");
}
Sncp.checkAsyncModifier(params[i], method);
handlerFuncIndex = i;
break;
Class handlerFuncClass = null;
java.lang.reflect.Type handlerResultType = null;
Class<?>[] params = method.getParameterTypes();
for (int i = 0; i < params.length; i++) {
if (CompletionHandler.class.isAssignableFrom(params[i])) {
if (Future.class.isAssignableFrom(method.getReturnType())) {
throw new SncpException(method + " have both CompletionHandler and CompletableFuture");
}
if (handlerFuncIndex >= 0) {
throw new SncpException(method + " have more than one CompletionHandler type parameter");
}
Sncp.checkAsyncModifier(params[i], method);
handlerFuncIndex = i;
handlerFuncClass = paramClasses[i];
java.lang.reflect.Type handlerType = TypeToken.getGenericType(method.getTypeParameters()[i], serviceImplClass);
if (handlerType instanceof Class) {
handlerResultType = Object.class;
} else if (handlerType instanceof ParameterizedType) {
handlerResultType = TypeToken.getGenericType(((ParameterizedType) handlerType).getActualTypeArguments()[0], handlerType);
} else {
throw new SncpException(serviceImplClass + " had unknown genericType in " + method);
}
if (method.getReturnType() != void.class) {
throw new SncpException(method + " have CompletionHandler type parameter but return type is not void");
}
break;
}
}
if (anns.length > 0) {
for (int i = 0; i < anns.length; i++) {
if (anns[i].length > 0) {
for (Annotation ann : anns[i]) {
@@ -183,27 +201,74 @@ public final class SncpServiceInfo<T extends Service> {
throw new SncpException(method + " have more than one @RpcAttachment parameter");
}
handlerAttachIndex = i;
} else if (ann.annotationType() == RpcTargetAddress.class && SocketAddress.class.isAssignableFrom(params[i])) {
targetAddrIndex = i;
} else if (ann.annotationType() == RpcSourceAddress.class && SocketAddress.class.isAssignableFrom(params[i])) {
sourceAddrIndex = i;
} else if (ann.annotationType() == RpcTargetTopic.class && String.class.isAssignableFrom(params[i])) {
tpoicAddrIndex = i;
} else if (ann.annotationType() == RpcTargetAddress.class) {
if (SocketAddress.class.isAssignableFrom(params[i])) {
if (sourceAddrIndex >= 0) {
throw new SncpException(method + " have more than one @RpcTargetAddress parameter");
} else {
targetAddrIndex = i;
}
} else {
throw new SncpException(method + " must be SocketAddress Type on @RpcTargetAddress parameter");
}
} else if (ann.annotationType() == RpcSourceAddress.class) {
if (SocketAddress.class.isAssignableFrom(params[i])) {
if (sourceAddrIndex >= 0) {
throw new SncpException(method + " have more than one @RpcSourceAddress parameter");
} else {
sourceAddrIndex = i;
}
} else {
throw new SncpException(method + " must be SocketAddress Type on @RpcSourceAddress parameter");
}
} else if (ann.annotationType() == RpcTargetTopic.class) {
if (String.class.isAssignableFrom(params[i])) {
if (sourceAddrIndex >= 0) {
throw new SncpException(method + " have more than one @RpcTargetTopic parameter");
} else {
tpoicAddrIndex = i;
}
} else {
throw new SncpException(method + " must be String Type on @RpcTargetTopic parameter");
}
}
}
}
}
}
this.topicTargetParamIndex = tpoicAddrIndex;
this.addressTargetParamIndex = targetAddrIndex;
this.addressSourceParamIndex = sourceAddrIndex;
this.handlerFuncParamIndex = handlerFuncIndex;
this.handlerAttachParamIndex = handlerAttachIndex;
this.paramAttrs = hasattr ? atts : null;
this.paramTopicTargetIndex = tpoicAddrIndex;
this.paramAddressTargetIndex = targetAddrIndex;
this.paramAddressSourceIndex = sourceAddrIndex;
this.paramHandlerIndex = handlerFuncIndex;
this.paramHandlerClass = handlerFuncClass;
this.paramHandlerResultType = handlerResultType;
this.paramHandlerAttachIndex = handlerAttachIndex;
this.header = new SncpHeader(null, serviceid, actionid);
if (this.handlerFuncParamIndex >= 0 && method.getReturnType() != void.class) {
if (this.paramHandlerIndex >= 0 && method.getReturnType() != void.class) {
throw new SncpException(method + " have CompletionHandler type parameter but return type is not void");
}
if (Future.class.isAssignableFrom(method.getReturnType())) {
java.lang.reflect.Type futureType = TypeToken.getGenericType(method.getGenericReturnType(), serviceImplClass);
java.lang.reflect.Type returnType = null;
if (futureType instanceof Class) {
returnType = Object.class;
} else if (futureType instanceof ParameterizedType) {
returnType = TypeToken.getGenericType(((ParameterizedType) futureType).getActualTypeArguments()[0], futureType);
} else {
throw new SncpException(serviceImplClass + " had unknown return genericType in " + method);
}
this.returnFutureResultType = returnType;
this.returnFutureClass = method.getReturnType().isAssignableFrom(CompletableFuture.class) ? CompletableFuture.class : (Class) method.getReturnType();
if (method.getReturnType().isAssignableFrom(CompletableFuture.class) || CompletableFuture.class.isAssignableFrom(method.getReturnType())) {
this.returnFutureCreator = (Creator) Creator.create(this.returnFutureClass);
} else {
throw new SncpException(serviceImplClass + " return must be CompletableFuture or subclass");
}
} else {
this.returnFutureResultType = null;
this.returnFutureClass = null;
this.returnFutureCreator = null;
}
}
public String actionName() {

View File

@@ -199,6 +199,10 @@ public interface Creator<T> {
clazz = (Class<T>) AbstractMap.SimpleEntry.class;
} else if (Iterable.class == clazz) {
clazz = (Class<T>) ArrayList.class;
} else if (CompletionStage.class.isAssignableFrom(clazz) && clazz.isAssignableFrom(CompletableFuture.class)) {
clazz = (Class<T>) CompletableFuture.class;
} else if (Future.class.isAssignableFrom(clazz) && clazz.isAssignableFrom(CompletableFuture.class)) {
clazz = (Class<T>) CompletableFuture.class;
}
Creator creator = CreatorInner.creatorCacheMap.get(clazz);
if (creator != null) {
@@ -586,6 +590,7 @@ public interface Creator<T> {
creatorCacheMap.put(ConcurrentHashMap.class, p -> new ConcurrentHashMap<>());
creatorCacheMap.put(CompletableFuture.class, p -> new CompletableFuture<>());
creatorCacheMap.put(CompletionStage.class, p -> new CompletableFuture<>());
creatorCacheMap.put(Future.class, p -> new CompletableFuture<>());
creatorCacheMap.put(Map.Entry.class, new Creator<Map.Entry>() {
@Override
@ConstructorParameters({"key", "value"})
@@ -626,6 +631,7 @@ public interface Creator<T> {
arrayCacheMap.put(ByteBuffer.class, t -> new ByteBuffer[t]);
arrayCacheMap.put(SocketAddress.class, t -> new SocketAddress[t]);
arrayCacheMap.put(InetSocketAddress.class, t -> new InetSocketAddress[t]);
arrayCacheMap.put(CompletableFuture.class, t -> new CompletableFuture[t]);
}
static class SimpleClassVisitor extends ClassVisitor {