diff --git a/src/main/java/org/redkale/convert/Convert.java b/src/main/java/org/redkale/convert/Convert.java index 7c2db9c00..2fb24a8ec 100644 --- a/src/main/java/org/redkale/convert/Convert.java +++ b/src/main/java/org/redkale/convert/Convert.java @@ -55,6 +55,14 @@ public abstract class Convert { public abstract boolean isBinary(); + public abstract R pollReader(); + + public abstract void offerReader(final R reader); + + public abstract W pollWriter(); + + public abstract void offerWriter(final W write); + public abstract T convertFrom(final Type type, final byte[] bytes); public abstract T convertFrom(final Type type, final R reader); diff --git a/src/main/java/org/redkale/convert/Reader.java b/src/main/java/org/redkale/convert/Reader.java index b6ca6fc06..553436a20 100644 --- a/src/main/java/org/redkale/convert/Reader.java +++ b/src/main/java/org/redkale/convert/Reader.java @@ -30,6 +30,13 @@ public abstract class Reader { public static final short SIGN_NOLENBUTBYTES = -3; //目前只适合于protobuf的boolean[]...double[]类型 + /** + * 设置Reader的内容,通常结合对象池使用 + * + * @param content 内容 + */ + public abstract void prepare(byte[] content); + /** * 是否还存在下个元素或字段
* 注意: 主要用于Array、Collection、Stream或Map等集合对象 diff --git a/src/main/java/org/redkale/convert/bson/BsonConvert.java b/src/main/java/org/redkale/convert/bson/BsonConvert.java index 1fca652c3..a00a3b273 100644 --- a/src/main/java/org/redkale/convert/bson/BsonConvert.java +++ b/src/main/java/org/redkale/convert/bson/BsonConvert.java @@ -41,7 +41,9 @@ public class BsonConvert extends BinaryConvert { private final ThreadLocal writerPool = ThreadLocal.withInitial(BsonWriter::new); - private final Consumer offerConsumer = w -> offerBsonWriter(w); + private final Consumer writerConsumer = w -> offerWriter(w); + + private final ThreadLocal readerPool = ThreadLocal.withInitial(BsonReader::new); private final boolean tiny; @@ -75,32 +77,44 @@ public class BsonConvert extends BinaryConvert { } //------------------------------ reader ----------------------------------------------------------- - public BsonReader pollBsonReader(final ByteBuffer... buffers) { + public BsonReader pollReader(final ByteBuffer... buffers) { return new BsonByteBufferReader((ConvertMask) null, buffers); } - public BsonReader pollBsonReader(final InputStream in) { + public BsonReader pollReader(final InputStream in) { return new BsonStreamReader(in); } - public BsonReader pollBsonReader() { - return new BsonReader(); + @Override + public BsonReader pollReader() { + BsonReader reader = readerPool.get(); + if (reader == null) { + reader = new BsonReader(); + } else { + readerPool.set(null); + } + return reader; } - public void offerBsonReader(final BsonReader in) { - //无需回收 + @Override + public void offerReader(final BsonReader in) { + if (in != null) { + in.recycle(); + readerPool.set(in); + } } //------------------------------ writer ----------------------------------------------------------- - public BsonByteBufferWriter pollBsonWriter(final Supplier supplier) { + public BsonByteBufferWriter pollWriter(final Supplier supplier) { return configWrite(new BsonByteBufferWriter(tiny, supplier)); } - protected BsonWriter pollBsonWriter(final OutputStream out) { + protected BsonWriter pollWriter(final OutputStream out) { return configWrite(new BsonStreamWriter(tiny, out)); } - public BsonWriter pollBsonWriter() { + @Override + public BsonWriter pollWriter() { BsonWriter writer = writerPool.get(); if (writer == null) { writer = new BsonWriter(); @@ -110,7 +124,8 @@ public class BsonConvert extends BinaryConvert { return configWrite(writer.tiny(tiny)); } - public void offerBsonWriter(final BsonWriter out) { + @Override + public void offerWriter(final BsonWriter out) { if (out != null) { out.recycle(); writerPool.set(out); @@ -179,10 +194,10 @@ public class BsonConvert extends BinaryConvert { @Override public byte[] convertTo(final Object value) { if (value == null) { - final BsonWriter out = pollBsonWriter(); + final BsonWriter out = pollWriter(); out.writeNull(); byte[] result = out.toArray(); - offerBsonWriter(out); + offerWriter(out); return result; } return convertTo(value.getClass(), value); @@ -193,10 +208,10 @@ public class BsonConvert extends BinaryConvert { if (type == null) { return null; } - final BsonWriter writer = pollBsonWriter(); + final BsonWriter writer = pollWriter(); factory.loadEncoder(type).convertTo(writer, value); byte[] result = writer.toArray(); - offerBsonWriter(writer); + offerWriter(writer); return result; } @@ -217,13 +232,13 @@ public class BsonConvert extends BinaryConvert { @Override public void convertToBytes(final Type type, final Object value, final ConvertBytesHandler handler) { - final BsonWriter writer = pollBsonWriter(); + final BsonWriter writer = pollWriter(); if (type == null) { writer.writeNull(); } else { factory.loadEncoder(type).convertTo(writer, value); } - writer.completed(handler, offerConsumer); + writer.completed(handler, writerConsumer); } @Override @@ -244,9 +259,9 @@ public class BsonConvert extends BinaryConvert { public void convertTo(final OutputStream out, final Object value) { if (value == null) { - pollBsonWriter(out).writeNull(); + pollWriter(out).writeNull(); } else { - factory.loadEncoder(value.getClass()).convertTo(pollBsonWriter(out), value); + factory.loadEncoder(value.getClass()).convertTo(pollWriter(out), value); } } @@ -255,9 +270,9 @@ public class BsonConvert extends BinaryConvert { return; } if (value == null) { - pollBsonWriter(out).writeNull(); + pollWriter(out).writeNull(); } else { - factory.loadEncoder(type).convertTo(pollBsonWriter(out), value); + factory.loadEncoder(type).convertTo(pollWriter(out), value); } } @@ -266,7 +281,7 @@ public class BsonConvert extends BinaryConvert { if (supplier == null) { return null; } - BsonByteBufferWriter out = pollBsonWriter(supplier); + BsonByteBufferWriter out = pollWriter(supplier); if (value == null) { out.writeNull(); } else { @@ -280,7 +295,7 @@ public class BsonConvert extends BinaryConvert { if (supplier == null || type == null) { return null; } - BsonByteBufferWriter writer = pollBsonWriter(supplier); + BsonByteBufferWriter writer = pollWriter(supplier); if (value == null) { writer.writeNull(); } else { diff --git a/src/main/java/org/redkale/convert/bson/BsonReader.java b/src/main/java/org/redkale/convert/bson/BsonReader.java index 136a8f5a7..8cbc4983c 100644 --- a/src/main/java/org/redkale/convert/bson/BsonReader.java +++ b/src/main/java/org/redkale/convert/bson/BsonReader.java @@ -55,6 +55,11 @@ public class BsonReader extends Reader { setBytes(bytes, start, len); } + @Override + public void prepare(byte[] bytes) { + setBytes(bytes); + } + public final BsonReader setBytes(byte[] bytes) { if (bytes == null) { this.position = 0; diff --git a/src/main/java/org/redkale/convert/json/JsonConvert.java b/src/main/java/org/redkale/convert/json/JsonConvert.java index bd0b3bf3f..658912faa 100644 --- a/src/main/java/org/redkale/convert/json/JsonConvert.java +++ b/src/main/java/org/redkale/convert/json/JsonConvert.java @@ -37,6 +37,8 @@ public class JsonConvert extends TextConvert { private final Consumer offerBytesConsumer = w -> offerJsonBytesWriter(w); + private final ThreadLocal readerPool = ThreadLocal.withInitial(JsonReader::new); + private final boolean tiny; private Encodeable lastConvertEncodeable; @@ -81,6 +83,45 @@ public class JsonConvert extends TextConvert { }; } + @Override + public JsonReader pollReader() { + JsonReader reader = readerPool.get(); + if (reader == null) { + reader = new JsonReader(); + } else { + readerPool.set(null); + } + return reader; + } + + @Override + public void offerReader(final JsonReader in) { + if (in != null) { + in.recycle(); + readerPool.set(in); + } + } + + @Override + public JsonWriter pollWriter() { + JsonBytesWriter writer = bytesWriterPool.get(); + if (writer == null) { + writer = new JsonBytesWriter(); + } else { + bytesWriterPool.set(null); + } + return configWrite((JsonBytesWriter) writer.tiny(tiny)); + } + + @Override + public void offerWriter(final JsonWriter writer) { + if (writer instanceof JsonBytesWriter) { + JsonBytesWriter bw = (JsonBytesWriter) writer; + bw.recycle(); + bytesWriterPool.set(bw); + } + } + //------------------------------ writer ----------------------------------------------------------- private JsonBytesWriter pollJsonBytesWriter() { JsonBytesWriter writer = bytesWriterPool.get(); diff --git a/src/main/java/org/redkale/convert/json/JsonReader.java b/src/main/java/org/redkale/convert/json/JsonReader.java index 1bf1228a1..ad5a252c4 100644 --- a/src/main/java/org/redkale/convert/json/JsonReader.java +++ b/src/main/java/org/redkale/convert/json/JsonReader.java @@ -5,6 +5,7 @@ */ package org.redkale.convert.json; +import java.nio.charset.StandardCharsets; import java.util.Map; import org.redkale.convert.*; import static org.redkale.convert.Reader.*; @@ -59,6 +60,13 @@ public class JsonReader extends Reader { this.limit = start + len - 1; } + @Override + public void prepare(byte[] bytes) { + if (bytes != null) { + setText(new String(bytes, StandardCharsets.UTF_8)); + } + } + protected boolean recycle() { this.position = -1; this.limit = -1; diff --git a/src/main/java/org/redkale/net/sncp/OldSncpClient.java b/src/main/java/org/redkale/net/sncp/OldSncpClient.java index 0aca53579..4362dc039 100644 --- a/src/main/java/org/redkale/net/sncp/OldSncpClient.java +++ b/src/main/java/org/redkale/net/sncp/OldSncpClient.java @@ -161,25 +161,21 @@ public final class OldSncpClient { //只给远程模式调用的 public T remote(final int index, final Object... params) { final SncpServiceAction action = actions[index]; - final CompletionHandler handlerFunc = action.handlerFuncParamIndex >= 0 ? (CompletionHandler) params[action.handlerFuncParamIndex] : null; - if (action.handlerFuncParamIndex >= 0) { - params[action.handlerFuncParamIndex] = null; + final CompletionHandler handlerFunc = action.paramHandlerIndex >= 0 ? (CompletionHandler) params[action.paramHandlerIndex] : null; + if (action.paramHandlerIndex >= 0) { + params[action.paramHandlerIndex] = null; } - final BsonReader reader = bsonConvert.pollBsonReader(); + final BsonReader reader = bsonConvert.pollReader(); CompletableFuture future = remote0(handlerFunc, remoteGroupTransport, null, action, params); - if (action.boolReturnTypeFuture) { //与handlerFuncIndex互斥 - CompletableFuture result = action.futureCreator.create(); + if (action.returnFutureResultType != null) { //与handlerFuncIndex互斥 + CompletableFuture result = (CompletableFuture) action.returnFutureCreator.create(); future.whenComplete((v, e) -> { try { if (e != null) { result.completeExceptionally(e); } else { reader.setBytes(v); - byte i; - while ((i = reader.readByte()) != 0) { - final Attribute attr = action.paramAttrs[i]; - attr.set(params[i - 1], bsonConvert.convertFrom(attr.genericType(), reader)); - } + reader.readByte(); //读掉0 Object rs = bsonConvert.convertFrom(Object.class, reader); result.complete(rs); @@ -187,7 +183,7 @@ public final class OldSncpClient { } catch (Exception exp) { result.completeExceptionally(exp); } finally { - bsonConvert.offerBsonReader(reader); + bsonConvert.offerReader(reader); } }); //需要获取 Executor return (T) result; @@ -197,12 +193,8 @@ public final class OldSncpClient { } try { reader.setBytes(future.get(5, TimeUnit.SECONDS)); - byte i; - while ((i = reader.readByte()) != 0) { - final Attribute attr = action.paramAttrs[i]; - attr.set(params[i - 1], bsonConvert.convertFrom(attr.genericType(), reader)); - } - return bsonConvert.convertFrom(action.handlerFuncParamIndex >= 0 ? Object.class : action.returnObjectType, reader); + reader.readByte(); //读掉0 + return bsonConvert.convertFrom(action.paramHandlerIndex >= 0 ? Object.class : action.returnObjectType, reader); } catch (RpcRemoteException re) { throw re; } catch (TimeoutException e) { @@ -210,21 +202,21 @@ public final class OldSncpClient { } catch (InterruptedException | ExecutionException e) { throw new RpcRemoteException(actions[index].method + " sncp remote error, params=" + JsonConvert.root().convertTo(params), e); } finally { - bsonConvert.offerBsonReader(reader); + bsonConvert.offerReader(reader); } } private CompletableFuture remote0(final CompletionHandler handler, final Transport transport, final SocketAddress addr0, final SncpServiceAction action, final Object... params) { final String traceid = Traces.currTraceid(); final Type[] myparamtypes = action.paramTypes; - final Class[] myparamclass = action.paramClass; - if (action.addressSourceParamIndex >= 0) { - params[action.addressSourceParamIndex] = this.clientSncpAddress; + final Class[] myparamclass = action.paramClasses; + if (action.paramAddressSourceIndex >= 0) { + params[action.paramAddressSourceIndex] = this.clientSncpAddress; } if (bsonConvert == null) { bsonConvert = BsonConvert.root(); } - final BsonWriter writer = bsonConvert.pollBsonWriter(); // 将head写入 + final BsonWriter writer = bsonConvert.pollWriter(); // 将head写入 writer.writePlaceholderTo(HEADER_SIZE); for (int i = 0; i < params.length; i++) { //params 可能包含: 3 个 boolean BsonConvert bcc = bsonConvert; @@ -242,7 +234,7 @@ public final class OldSncpClient { if (messageAgent != null) { //MQ模式 final ByteArray reqbytes = writer.toByteArray(); fillHeader(reqbytes, action, seqid, traceid, reqBodyLength); - String targetTopic = action.topicTargetParamIndex >= 0 ? (String) params[action.topicTargetParamIndex] : this.topic; + String targetTopic = action.paramTopicTargetIndex >= 0 ? (String) params[action.paramTopicTargetIndex] : this.topic; if (targetTopic == null) { targetTopic = this.topic; } @@ -272,7 +264,7 @@ public final class OldSncpClient { return body; }); } - final SocketAddress addr = addr0 == null ? (action.addressTargetParamIndex >= 0 ? (SocketAddress) params[action.addressTargetParamIndex] : null) : addr0; + final SocketAddress addr = addr0 == null ? (action.paramAddressTargetIndex >= 0 ? (SocketAddress) params[action.paramAddressTargetIndex] : null) : addr0; CompletableFuture connFuture = transport.pollConnection(addr); return connFuture.thenCompose(conn0 -> { final CompletableFuture future = new CompletableFuture(); @@ -356,7 +348,7 @@ public final class OldSncpClient { future.completeExceptionally(new RpcRemoteException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote response error, params=" + JsonConvert.root().convertTo(params))); transport.offerConnection(true, conn); if (handler != null) { - final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null; + final Object handlerAttach = action.paramHandlerAttachIndex >= 0 ? params[action.paramHandlerAttachIndex] : null; handler.failed(e, handlerAttach); } logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") deal error", e); @@ -368,21 +360,17 @@ public final class OldSncpClient { future.complete(this.body); transport.offerConnection(false, conn); if (handler != null) { - final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null; - final BsonReader reader = bsonConvert.pollBsonReader(); + final Object handlerAttach = action.paramHandlerAttachIndex >= 0 ? params[action.paramHandlerAttachIndex] : null; + final BsonReader reader = bsonConvert.pollReader(); try { reader.setBytes(this.body); - int i; - while ((i = (reader.readByte() & 0xff)) != 0) { - final Attribute attr = action.paramAttrs[i]; - attr.set(params[i - 1], bsonConvert.convertFrom(attr.genericType(), reader)); - } - Object rs = bsonConvert.convertFrom(action.handlerFuncParamIndex >= 0 ? Object.class : action.returnObjectType, reader); + reader.readByte(); //读掉0 + Object rs = bsonConvert.convertFrom(action.paramHandlerIndex >= 0 ? Object.class : action.returnObjectType, reader); handler.completed(rs, handlerAttach); } catch (Exception e) { handler.failed(e, handlerAttach); } finally { - bsonConvert.offerBsonReader(reader); + bsonConvert.offerReader(reader); } } } @@ -393,7 +381,7 @@ public final class OldSncpClient { conn.offerReadBuffer(attachment2); transport.offerConnection(true, conn); if (handler != null) { - final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null; + final Object handlerAttach = action.paramHandlerAttachIndex >= 0 ? params[action.paramHandlerAttachIndex] : null; handler.failed(exc, handlerAttach); } logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") remote read exec failed, params=" + JsonConvert.root().convertTo(params), exc); @@ -406,7 +394,7 @@ public final class OldSncpClient { future.completeExceptionally(new RpcRemoteException(action.method + " sncp remote exec failed, params=" + JsonConvert.root().convertTo(params))); transport.offerConnection(true, conn); if (handler != null) { - final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null; + final Object handlerAttach = action.paramHandlerAttachIndex >= 0 ? params[action.paramHandlerAttachIndex] : null; handler.failed(exc, handlerAttach); } logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") remote write exec failed, params=" + JsonConvert.root().convertTo(params), exc); diff --git a/src/main/java/org/redkale/net/sncp/Sncp.java b/src/main/java/org/redkale/net/sncp/Sncp.java index 353dd1c29..2fc08e7b3 100644 --- a/src/main/java/org/redkale/net/sncp/Sncp.java +++ b/src/main/java/org/redkale/net/sncp/Sncp.java @@ -19,6 +19,7 @@ import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES; import org.redkale.asm.*; import static org.redkale.asm.Opcodes.*; import org.redkale.asm.Type; +import org.redkale.convert.Convert; import org.redkale.mq.*; import org.redkale.net.TransportFactory; import org.redkale.net.http.WebSocketNode; @@ -150,8 +151,8 @@ public abstract class Sncp { } public static SncpServiceInfo createSncpServiceInfo(String resourceName, - Class resourceServiceType, T service, MessageAgent messageAgent, SncpMessageClient messageClient) { - return new SncpServiceInfo(resourceName, resourceServiceType, service, messageAgent, messageClient); + Class resourceServiceType, T service, Convert convert, MessageAgent messageAgent, SncpMessageClient messageClient) { + return new SncpServiceInfo(resourceName, resourceServiceType, service, convert, messageAgent, messageClient); } public static Uint128 actionid(final RpcAction action) { diff --git a/src/main/java/org/redkale/net/sncp/SncpClient.java b/src/main/java/org/redkale/net/sncp/SncpClient.java index 367a98090..a7c857562 100644 --- a/src/main/java/org/redkale/net/sncp/SncpClient.java +++ b/src/main/java/org/redkale/net/sncp/SncpClient.java @@ -4,10 +4,15 @@ package org.redkale.net.sncp; import java.net.InetSocketAddress; +import java.nio.channels.CompletionHandler; +import java.util.concurrent.CompletableFuture; import org.redkale.annotation.Resource; +import org.redkale.convert.Convert; import org.redkale.convert.bson.BsonConvert; import org.redkale.net.*; import org.redkale.net.client.*; +import org.redkale.net.sncp.SncpServiceInfo.SncpServiceAction; +import org.redkale.util.Traces; /** * SNCP版Client @@ -40,12 +45,68 @@ public class SncpClient extends Client connect(SncpServiceInfo info) { + return super.connect(); } //只给远程模式调用的 public T remote(final SncpServiceInfo info, final int index, final Object... params) { + final String traceid = Traces.currTraceid(); + final Convert convert = info.convert; + final SncpServiceAction action = info.actions[index]; + CompletionHandler callbackHandler = null; + Object callbackHandlerAttach = null; + if (action.paramHandlerIndex >= 0) { + callbackHandler = (CompletionHandler) params[action.paramHandlerIndex]; + params[action.paramHandlerIndex] = null; + if (action.paramHandlerAttachIndex >= 0) { + callbackHandlerAttach = params[action.paramHandlerAttachIndex]; + params[action.paramHandlerAttachIndex] = null; + } + } + final CompletableFuture future = remote(info, action, traceid, params); + if (action.paramHandlerIndex >= 0) { //参数中存在CompletionHandler + final CompletionHandler handler = callbackHandler; + final Object attach = callbackHandlerAttach; + if (handler == null) { //传入的CompletionHandler参数为null + future.join(); + } else { + future.whenComplete((v, t) -> { + if (t == null) { + handler.completed(v == null ? null : convert.convertFrom(action.paramHandlerResultType, v), attach); + } else { + handler.failed(t, attach); + } + }); + } + } else if (action.returnFutureClass != null) { //返回类型为CompletableFuture + if (action.returnFutureClass == CompletableFuture.class) { + return (T) future.thenApply(v -> v == null ? null : convert.convertFrom(action.paramHandlerResultType, v)); + } else { + final CompletableFuture stage = action.returnFutureCreator.create(); + future.whenComplete((v, t) -> { + if (t == null) { + stage.complete(v == null ? null : convert.convertFrom(action.paramHandlerResultType, v)); + } else { + stage.completeExceptionally(t); + } + }); + return (T) stage; + } + } else if (action.returnObjectType != null) { //返回类型为JavaBean + return (T) future.thenApply(v -> v == null ? null : convert.convertFrom(action.paramHandlerResultType, v)).join(); + } else { //返回类型为void + future.join(); + } + return null; + } + + protected CompletableFuture remote( + final SncpServiceInfo info, + final SncpServiceAction action, + final String traceid, + final Object... params) { + return null; } } diff --git a/src/main/java/org/redkale/net/sncp/SncpDynServlet.java b/src/main/java/org/redkale/net/sncp/SncpDynServlet.java index 692e23606..691840ddc 100644 --- a/src/main/java/org/redkale/net/sncp/SncpDynServlet.java +++ b/src/main/java/org/redkale/net/sncp/SncpDynServlet.java @@ -9,7 +9,7 @@ import java.io.IOException; import java.lang.reflect.*; import java.nio.channels.CompletionHandler; import java.util.*; -import java.util.concurrent.CompletionStage; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import org.redkale.annotation.NonBlocking; @@ -138,11 +138,11 @@ public final class SncpDynServlet extends SncpServlet { protected final java.lang.reflect.Type[] paramTypes; //第一个元素存放返回类型return type, void的返回参数类型为null, 数组长度为:1+参数个数 - protected final java.lang.reflect.Type returnObjectType; //返回结果的CompletableFuture的结果泛型类型 + protected final java.lang.reflect.Type returnObjectType; //返回结果类型 void必须设为null protected final int paramHandlerIndex; //>=0表示存在CompletionHandler参数 - protected final Class paramHandlerType; //CompletionHandler参数的类型 + protected final Class paramHandlerClass; //CompletionHandler参数的类型 protected final java.lang.reflect.Type paramHandlerResultType; //CompletionHandler.completed第一个参数的类型 @@ -155,21 +155,24 @@ public final class SncpDynServlet extends SncpServlet { this.method = method; int handlerFuncIndex = -1; - Class handlerFuncType = null; + Class handlerFuncClass = null; java.lang.reflect.Type handlerResultType = null; try { final Class[] paramClasses = method.getParameterTypes(); for (int i = 0; i < paramClasses.length; i++) { //反序列化方法的每个参数 if (CompletionHandler.class.isAssignableFrom(paramClasses[i])) { handlerFuncIndex = i; - handlerFuncType = paramClasses[i]; + handlerFuncClass = paramClasses[i]; java.lang.reflect.Type handlerType = TypeToken.getGenericType(method.getTypeParameters()[i], service.getClass()); if (handlerType instanceof Class) { handlerResultType = Object.class; } else if (handlerType instanceof ParameterizedType) { handlerResultType = TypeToken.getGenericType(((ParameterizedType) handlerType).getActualTypeArguments()[0], handlerType); } else { - throw new SncpException(service.getClass() + " had unknown gGenericType in " + method); + throw new SncpException(service.getClass() + " had unknown genericType in " + method); + } + if (method.getReturnType() != void.class) { + throw new SncpException(method + " have CompletionHandler type parameter but return type is not void"); } break; } @@ -183,10 +186,10 @@ public final class SncpDynServlet extends SncpServlet { System.arraycopy(originalParamTypes, 0, types, 1, originalParamTypes.length); this.paramTypes = types; this.paramHandlerIndex = handlerFuncIndex; - this.paramHandlerType = handlerFuncType; + this.paramHandlerClass = handlerFuncClass; this.paramHandlerResultType = handlerResultType; - this.returnObjectType = originalReturnType; - if (CompletionStage.class.isAssignableFrom(method.getReturnType())) { + this.returnObjectType = originalReturnType == void.class || originalReturnType == Void.class ? null : originalReturnType; + if (Future.class.isAssignableFrom(method.getReturnType())) { java.lang.reflect.Type futureType = TypeToken.getGenericType(method.getGenericReturnType(), service.getClass()); java.lang.reflect.Type returnType = null; if (futureType instanceof Class) { @@ -204,13 +207,14 @@ public final class SncpDynServlet extends SncpServlet { if (non == null) { non = service.getClass().getAnnotation(NonBlocking.class); } + //Future代替CompletionStage 不容易判断异步 this.nonBlocking = non == null ? (CompletionStage.class.isAssignableFrom(method.getReturnType()) || this.paramHandlerIndex >= 0) : false; } @Override public final void execute(SncpRequest request, SncpResponse response) throws IOException { if (paramHandlerIndex > 0) { - response.paramAsyncHandler(paramHandlerType, paramHandlerResultType); + response.paramAsyncHandler(paramHandlerClass, paramHandlerResultType); } try { action(request, response); @@ -242,87 +246,132 @@ public final class SncpDynServlet extends SncpServlet { /** *
-         *  public class TestService implements Service {
+         *      public interface TestService extends Service {
          *
-         *      public boolean change(TestBean bean, String name, int id) {
-         *          return false;
-         *      }
+         *     public boolean change(TestBean bean, String name, int id);
          *
-         *      public void insert(CompletionHandler<Boolean, TestBean> handler, TestBean bean, String name, int id) {
-         *      }
+         *     public void insert(BooleanHandler handler, TestBean bean, String name, int id);
          *
-         *      public void update(long show, short v2, CompletionHandler<Boolean, TestBean> handler, TestBean bean, String name, int id) {
-         *      }
+         *     public void update(long show, short v2, CompletionHandler<Boolean, TestBean> handler, TestBean bean, String name, int id);
+         *
+         *    public CompletableFuture<String> changeName(TestBean bean, String name, int id);
          *
-         *      public CompletableFuture<String> changeName(TestBean bean, String name, int id) {
-         *          return null;
-         *      }
          * }
          *
+         * @ResourceType(TestService.class)
+         * class TestServiceImpl implements TestService {
          *
-         * class DynActionTestService_change extends SncpServletAction {
+         *     @Override
+         *     public boolean change(TestBean bean, String name, int id) {
+         *         return false;
+         *     }
          *
-         *      public TestService service;
+         *     @Override
+         *     public void insert(BooleanHandler handler, TestBean bean, String name, int id) {
+         *     }
          *
-         *      @Override
-         * public void action(BsonReader in, BsonWriter out, OldSncpHandler handler) throws Throwable {
-         * TestBean arg1 = convert.convertFrom(paramTypes[1], in);
-         * String arg2 = convert.convertFrom(paramTypes[2], in);
-         * int arg3 = convert.convertFrom(paramTypes[3], in);
-         * Object rs = service.change(arg1, arg2, arg3);
-         * _callParameter(out, arg1, arg2, arg3);
-         * convert.convertTo(out, paramTypes[0], rs);
-         * }
+         *     @Override
+         *     public void update(long show, short v2, CompletionHandler<Boolean, TestBean> handler, TestBean bean, String name, int id) {
+         *     }
+         *
+         *     @Override
+         *     public CompletableFuture<String> changeName(TestBean bean, String name, int id) {
+         *         return null;
+         *     }
          * }
          *
-         * class DynActionTestService_insert extends SncpServletAction {
+         * class BooleanHandler implements CompletionHandler<Boolean, TestBean> {
          *
-         * public TestService service;
+         *     @Override
+         *     public void completed(Boolean result, TestBean attachment) {
+         *     }
+         *
+         *     @Override
+         *     public void failed(Throwable exc, TestBean attachment) {
+         *     }
          *
-         * @Override
-         * public void action(BsonReader in, BsonWriter out, OldSncpHandler handler) throws Throwable {
-         * OldSncpHandler arg0 = handler;
-         * convert.convertFrom(CompletionHandler.class, in);
-         * TestBean arg1 = convert.convertFrom(paramTypes[2], in);
-         * String arg2 = convert.convertFrom(paramTypes[3], in);
-         * int arg3 = convert.convertFrom(paramTypes[4], in);
-         * handler.sncp_setParams(arg0, arg1, arg2, arg3);
-         * service.insert(arg0, arg1, arg2, arg3);
-         * }
          * }
          *
-         * class DynActionTestService_update extends SncpServletAction {
+         * class DynActionTestService_change extends SncpActionServlet {
          *
-         * public TestService service;
+         *     public DynActionTestService_change(String resourceName, Class resourceType, Service service, Uint128 serviceid, Uint128 actionid, final Method method) {
+         *         super(resourceName, resourceType, service, serviceid, actionid, method);
+         *     }
          *
-         * @Override
-         * public void action(BsonReader in, BsonWriter out, OldSncpHandler handler) throws Throwable {
-         * long a1 = convert.convertFrom(paramTypes[1], in);
-         * short a2 = convert.convertFrom(paramTypes[2], in);
-         * OldSncpHandler a3 = handler;
-         * convert.convertFrom(CompletionHandler.class, in);
-         * TestBean arg1 = convert.convertFrom(paramTypes[4], in);
-         * String arg2 = convert.convertFrom(paramTypes[5], in);
-         * int arg3 = convert.convertFrom(paramTypes[6], in);
-         * handler.sncp_setParams(a1, a2, a3, arg1, arg2, arg3);
-         * service.update(a1, a2, a3, arg1, arg2, arg3);
-         * }
+         *     @Override
+         *     public void action(SncpRequest request, SncpResponse response) throws Throwable {
+         *         Convert<Reader, Writer> convert = request.getConvert();
+         *         Reader in = request.getReader();
+         *         TestBean arg1 = convert.convertFrom(paramTypes[1], in);
+         *         String arg2 = convert.convertFrom(paramTypes[2], in);
+         *         int arg3 = convert.convertFrom(paramTypes[3], in);
+         *         TestService serviceObj = (TestService) service();
+         *         Object rs = serviceObj.change(arg1, arg2, arg3);
+         *         response.finish(boolean.class, rs);
+         *     }
          * }
          *
+         * class DynActionTestService_insert extends SncpActionServlet {
          *
-         * class DynActionTestService_changeName extends SncpServletAction {
+         *     public DynActionTestService_insert(String resourceName, Class resourceType, Service service, Uint128 serviceid, Uint128 actionid, final Method method) {
+         *         super(resourceName, resourceType, service, serviceid, actionid, method);
+         *     }
          *
-         * public TestService service;
-         *
-         * @Override
-         * public void action(final BsonReader in, final BsonWriter out, final OldSncpHandler handler) throws Throwable {
-         * TestBean arg1 = convert.convertFrom(paramTypes[1], in);
-         * String arg2 = convert.convertFrom(paramTypes[2], in);
-         * int arg3 = convert.convertFrom(paramTypes[3], in);
-         * handler.sncp_setParams(arg1, arg2, arg3);
-         * CompletableFuture future = service.changeName(arg1, arg2, arg3);
-         * handler.sncp_setFuture(future);
+         *     @Override
+         *     public void action(SncpRequest request, SncpResponse response) throws Throwable {
+         *         Convert<Reader, Writer> convert = request.getConvert();
+         *         Reader in = request.getReader();
+         *         BooleanHandler arg0 = response.getParamAsyncHandler();
+         *         convert.convertFrom(CompletionHandler.class, in);
+         *         TestBean arg1 = convert.convertFrom(paramTypes[2], in);
+         *         String arg2 = convert.convertFrom(paramTypes[3], in);
+         *         int arg3 = convert.convertFrom(paramTypes[4], in);
+         *         TestService serviceObj = (TestService) service();
+         *         serviceObj.insert(arg0, arg1, arg2, arg3);
+         *         response.finishVoid();
+         *     }
          * }
+         *
+         * class DynActionTestService_update extends SncpActionServlet {
+         *
+         *     public DynActionTestService_update(String resourceName, Class resourceType, Service service, Uint128 serviceid, Uint128 actionid, final Method method) {
+         *         super(resourceName, resourceType, service, serviceid, actionid, method);
+         *     }
+         *
+         *     @Override
+         *     public void action(SncpRequest request, SncpResponse response) throws Throwable {
+         *         Convert<Reader, Writer> convert = request.getConvert();
+         *         Reader in = request.getReader();
+         *         long a1 = convert.convertFrom(paramTypes[1], in);
+         *         short a2 = convert.convertFrom(paramTypes[2], in);
+         *         CompletionHandler a3 = response.getParamAsyncHandler();
+         *         convert.convertFrom(CompletionHandler.class, in);
+         *         TestBean arg1 = convert.convertFrom(paramTypes[4], in);
+         *         String arg2 = convert.convertFrom(paramTypes[5], in);
+         *         int arg3 = convert.convertFrom(paramTypes[6], in);
+         *         TestService serviceObj = (TestService) service();
+         *         serviceObj.update(a1, a2, a3, arg1, arg2, arg3);
+         *         response.finishVoid();
+         *     }
+         * }
+         *
+         * class DynActionTestService_changeName extends SncpActionServlet {
+         *
+         *     public DynActionTestService_changeName(String resourceName, Class resourceType, Service service, Uint128 serviceid, Uint128 actionid, final Method method) {
+         *         super(resourceName, resourceType, service, serviceid, actionid, method);
+         *     }
+         *
+         *     @Override
+         *     public void action(SncpRequest request, SncpResponse response) throws Throwable {
+         *         Convert<Reader, Writer> convert = request.getConvert();
+         *         Reader in = request.getReader();
+         *         TestBean arg1 = convert.convertFrom(paramTypes[1], in);
+         *         String arg2 = convert.convertFrom(paramTypes[2], in);
+         *         int arg3 = convert.convertFrom(paramTypes[3], in);
+         *         TestService serviceObj = (TestService) service();
+         *         CompletableFuture future = serviceObj.changeName(arg1, arg2, arg3);
+         *         response.finishFuture(paramHandlerResultType, future);
+         *     }
          * }
          *
          * 
@@ -334,7 +383,7 @@ public final class SncpDynServlet extends SncpServlet { * @param actionid 操作ID * @param method 方法 * - * @return SncpServletAction + * @return SncpActionServlet */ @SuppressWarnings("unchecked") public static SncpActionServlet create( @@ -356,7 +405,7 @@ public final class SncpDynServlet extends SncpServlet { final String responseName = SncpResponse.class.getName().replace('.', '/'); final String requestDesc = Type.getDescriptor(SncpRequest.class); final String responseDesc = Type.getDescriptor(SncpResponse.class); - final boolean boolReturnTypeFuture = CompletionStage.class.isAssignableFrom(method.getReturnType()); + final boolean boolReturnTypeFuture = Future.class.isAssignableFrom(method.getReturnType()); final String newDynName = "org/redkaledyn/sncp/servlet/action/_DynSncpActionServlet__" + resourceType.getSimpleName() + "_" + method.getName() + "_" + actionid; Class newClazz = null; @@ -520,12 +569,12 @@ public final class SncpDynServlet extends SncpServlet { } mv.visitVarInsn(ASTORE, store); //11 - if (boolReturnTypeFuture) { //返回类型为CompletionStage + if (boolReturnTypeFuture) { //返回类型为Future mv.visitVarInsn(ALOAD, 2); mv.visitVarInsn(ALOAD, 0); mv.visitFieldInsn(GETFIELD, newDynName, "returnFutureResultType", "Ljava/lang/reflect/Type;"); mv.visitVarInsn(ALOAD, store); - mv.visitMethodInsn(INVOKEVIRTUAL, responseName, "finishFuture", "(Ljava/lang/reflect/Type;Ljava/util/concurrent/CompletionStage;)V", false); + mv.visitMethodInsn(INVOKEVIRTUAL, responseName, "finishFuture", "(Ljava/lang/reflect/Type;Ljava/util/concurrent/Future;)V", false); } else if (handlerFuncIndex >= 0) { //参数有CompletionHandler mv.visitVarInsn(ALOAD, 2); mv.visitMethodInsn(INVOKEVIRTUAL, responseName, "finishVoid", "()V", false); diff --git a/src/main/java/org/redkale/net/sncp/SncpResponse.java b/src/main/java/org/redkale/net/sncp/SncpResponse.java index 5df3af620..774770796 100644 --- a/src/main/java/org/redkale/net/sncp/SncpResponse.java +++ b/src/main/java/org/redkale/net/sncp/SncpResponse.java @@ -135,17 +135,23 @@ public class SncpResponse extends Response { finish(0, out); } - public final void finishFuture(final Type futureResultType, final CompletionStage future) { + public final void finishFuture(final Type futureResultType, final Future future) { if (future == null) { finishVoid(); - } else { - future.whenComplete((v, t) -> { + } else if (future instanceof CompletionStage) { + ((CompletionStage) future).whenComplete((v, t) -> { if (t != null) { finishError((Throwable) t); } else { finish(futureResultType, v); } }); + } else { + try { + finish(futureResultType, future.get()); + } catch (Exception e) { + finishError(e); + } } } diff --git a/src/main/java/org/redkale/net/sncp/SncpServiceInfo.java b/src/main/java/org/redkale/net/sncp/SncpServiceInfo.java index 79cf1fa80..fe2c7da87 100644 --- a/src/main/java/org/redkale/net/sncp/SncpServiceInfo.java +++ b/src/main/java/org/redkale/net/sncp/SncpServiceInfo.java @@ -8,7 +8,8 @@ import java.lang.reflect.*; import java.net.*; import java.nio.channels.CompletionHandler; import java.util.*; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.*; +import org.redkale.convert.Convert; import org.redkale.mq.*; import static org.redkale.net.sncp.Sncp.loadMethodActions; import org.redkale.service.*; @@ -41,6 +42,8 @@ public final class SncpServiceInfo { protected final String topic; + protected final Convert convert; + //MQ模式下此字段才有值 protected final MessageAgent messageAgent; @@ -53,11 +56,12 @@ public final class SncpServiceInfo { //远程模式, 可能为null protected Set remoteAddresses; - SncpServiceInfo(String resourceName, Class resourceServiceType, final T service, MessageAgent messageAgent, SncpMessageClient messageClient) { + SncpServiceInfo(String resourceName, Class resourceServiceType, final T service, Convert convert, MessageAgent messageAgent, SncpMessageClient messageClient) { this.name = resourceName; this.serviceType = resourceServiceType; - this.serviceid = Sncp.serviceid(name, resourceServiceType); + this.serviceid = Sncp.serviceid(resourceName, resourceServiceType); this.service = service; + this.convert = convert; this.serviceVersion = 0; this.messageAgent = messageAgent; this.messageClient = messageAgent == null ? null : messageAgent.getSncpMessageClient(); @@ -118,27 +122,31 @@ public final class SncpServiceInfo { protected final Method method; - protected final Type returnObjectType; //void 必须设为 null + protected final Type returnObjectType; //void必须设为null protected final Type[] paramTypes; - protected final Class[] paramClass; + protected final Class[] paramClasses; - protected final Attribute[] paramAttrs; // 为null表示无RpcCall处理,index=0固定为null, 其他为参数标记的RpcCall回调方法 + protected final int paramHandlerIndex; - protected final int handlerFuncParamIndex; + protected final int paramHandlerAttachIndex; - protected final int handlerAttachParamIndex; + protected final int paramAddressTargetIndex; - protected final int addressTargetParamIndex; + protected final int paramAddressSourceIndex; - protected final int addressSourceParamIndex; + protected final int paramTopicTargetIndex; - protected final int topicTargetParamIndex; + protected final Class paramHandlerClass; //CompletionHandler参数的类型 - protected final boolean boolReturnTypeFuture; // 返回结果类型是否为 CompletableFuture + protected final java.lang.reflect.Type paramHandlerResultType; //CompletionHandler.completed第一个参数的类型 - protected final Creator futureCreator; + protected final java.lang.reflect.Type returnFutureResultType; //返回结果的CompletableFuture的结果泛型类型 + + protected final Class returnFutureClass; //返回结果的CompletableFuture类型 + + protected final Creator returnFutureCreator; //返回CompletableFuture类型的构建器 protected final SncpHeader header; @@ -146,11 +154,9 @@ public final class SncpServiceInfo { SncpServiceAction(final Class serviceImplClass, Method method, Uint128 serviceid, Uint128 actionid) { this.actionid = actionid == null ? Sncp.actionid(method) : actionid; Type rt = TypeToken.getGenericType(method.getGenericReturnType(), serviceImplClass); - this.returnObjectType = rt == void.class ? null : rt; - this.boolReturnTypeFuture = CompletableFuture.class.isAssignableFrom(method.getReturnType()); - this.futureCreator = boolReturnTypeFuture ? Creator.create((Class) method.getReturnType()) : null; + this.returnObjectType = rt == void.class || rt == Void.class ? null : rt; this.paramTypes = TypeToken.getGenericType(method.getGenericParameterTypes(), serviceImplClass); - this.paramClass = method.getParameterTypes(); + this.paramClasses = method.getParameterTypes(); this.method = method; Annotation[][] anns = method.getParameterAnnotations(); int tpoicAddrIndex = -1; @@ -158,23 +164,35 @@ public final class SncpServiceInfo { int sourceAddrIndex = -1; int handlerAttachIndex = -1; int handlerFuncIndex = -1; - boolean hasattr = false; - Attribute[] atts = new Attribute[paramTypes.length + 1]; - if (anns.length > 0) { - Class[] params = method.getParameterTypes(); - for (int i = 0; i < params.length; i++) { - if (CompletionHandler.class.isAssignableFrom(params[i])) { - if (boolReturnTypeFuture) { - throw new SncpException(method + " have both CompletionHandler and CompletableFuture"); - } - if (handlerFuncIndex >= 0) { - throw new SncpException(method + " have more than one CompletionHandler type parameter"); - } - Sncp.checkAsyncModifier(params[i], method); - handlerFuncIndex = i; - break; + Class handlerFuncClass = null; + java.lang.reflect.Type handlerResultType = null; + Class[] params = method.getParameterTypes(); + for (int i = 0; i < params.length; i++) { + if (CompletionHandler.class.isAssignableFrom(params[i])) { + if (Future.class.isAssignableFrom(method.getReturnType())) { + throw new SncpException(method + " have both CompletionHandler and CompletableFuture"); } + if (handlerFuncIndex >= 0) { + throw new SncpException(method + " have more than one CompletionHandler type parameter"); + } + Sncp.checkAsyncModifier(params[i], method); + handlerFuncIndex = i; + handlerFuncClass = paramClasses[i]; + java.lang.reflect.Type handlerType = TypeToken.getGenericType(method.getTypeParameters()[i], serviceImplClass); + if (handlerType instanceof Class) { + handlerResultType = Object.class; + } else if (handlerType instanceof ParameterizedType) { + handlerResultType = TypeToken.getGenericType(((ParameterizedType) handlerType).getActualTypeArguments()[0], handlerType); + } else { + throw new SncpException(serviceImplClass + " had unknown genericType in " + method); + } + if (method.getReturnType() != void.class) { + throw new SncpException(method + " have CompletionHandler type parameter but return type is not void"); + } + break; } + } + if (anns.length > 0) { for (int i = 0; i < anns.length; i++) { if (anns[i].length > 0) { for (Annotation ann : anns[i]) { @@ -183,27 +201,74 @@ public final class SncpServiceInfo { throw new SncpException(method + " have more than one @RpcAttachment parameter"); } handlerAttachIndex = i; - } else if (ann.annotationType() == RpcTargetAddress.class && SocketAddress.class.isAssignableFrom(params[i])) { - targetAddrIndex = i; - } else if (ann.annotationType() == RpcSourceAddress.class && SocketAddress.class.isAssignableFrom(params[i])) { - sourceAddrIndex = i; - } else if (ann.annotationType() == RpcTargetTopic.class && String.class.isAssignableFrom(params[i])) { - tpoicAddrIndex = i; + } else if (ann.annotationType() == RpcTargetAddress.class) { + if (SocketAddress.class.isAssignableFrom(params[i])) { + if (sourceAddrIndex >= 0) { + throw new SncpException(method + " have more than one @RpcTargetAddress parameter"); + } else { + targetAddrIndex = i; + } + } else { + throw new SncpException(method + " must be SocketAddress Type on @RpcTargetAddress parameter"); + } + } else if (ann.annotationType() == RpcSourceAddress.class) { + if (SocketAddress.class.isAssignableFrom(params[i])) { + if (sourceAddrIndex >= 0) { + throw new SncpException(method + " have more than one @RpcSourceAddress parameter"); + } else { + sourceAddrIndex = i; + } + } else { + throw new SncpException(method + " must be SocketAddress Type on @RpcSourceAddress parameter"); + } + } else if (ann.annotationType() == RpcTargetTopic.class) { + if (String.class.isAssignableFrom(params[i])) { + if (sourceAddrIndex >= 0) { + throw new SncpException(method + " have more than one @RpcTargetTopic parameter"); + } else { + tpoicAddrIndex = i; + } + } else { + throw new SncpException(method + " must be String Type on @RpcTargetTopic parameter"); + } } } } } } - this.topicTargetParamIndex = tpoicAddrIndex; - this.addressTargetParamIndex = targetAddrIndex; - this.addressSourceParamIndex = sourceAddrIndex; - this.handlerFuncParamIndex = handlerFuncIndex; - this.handlerAttachParamIndex = handlerAttachIndex; - this.paramAttrs = hasattr ? atts : null; + this.paramTopicTargetIndex = tpoicAddrIndex; + this.paramAddressTargetIndex = targetAddrIndex; + this.paramAddressSourceIndex = sourceAddrIndex; + this.paramHandlerIndex = handlerFuncIndex; + this.paramHandlerClass = handlerFuncClass; + this.paramHandlerResultType = handlerResultType; + this.paramHandlerAttachIndex = handlerAttachIndex; this.header = new SncpHeader(null, serviceid, actionid); - if (this.handlerFuncParamIndex >= 0 && method.getReturnType() != void.class) { + if (this.paramHandlerIndex >= 0 && method.getReturnType() != void.class) { throw new SncpException(method + " have CompletionHandler type parameter but return type is not void"); } + if (Future.class.isAssignableFrom(method.getReturnType())) { + java.lang.reflect.Type futureType = TypeToken.getGenericType(method.getGenericReturnType(), serviceImplClass); + java.lang.reflect.Type returnType = null; + if (futureType instanceof Class) { + returnType = Object.class; + } else if (futureType instanceof ParameterizedType) { + returnType = TypeToken.getGenericType(((ParameterizedType) futureType).getActualTypeArguments()[0], futureType); + } else { + throw new SncpException(serviceImplClass + " had unknown return genericType in " + method); + } + this.returnFutureResultType = returnType; + this.returnFutureClass = method.getReturnType().isAssignableFrom(CompletableFuture.class) ? CompletableFuture.class : (Class) method.getReturnType(); + if (method.getReturnType().isAssignableFrom(CompletableFuture.class) || CompletableFuture.class.isAssignableFrom(method.getReturnType())) { + this.returnFutureCreator = (Creator) Creator.create(this.returnFutureClass); + } else { + throw new SncpException(serviceImplClass + " return must be CompletableFuture or subclass"); + } + } else { + this.returnFutureResultType = null; + this.returnFutureClass = null; + this.returnFutureCreator = null; + } } public String actionName() { diff --git a/src/main/java/org/redkale/util/Creator.java b/src/main/java/org/redkale/util/Creator.java index b2ac3d8bc..422db2dfc 100644 --- a/src/main/java/org/redkale/util/Creator.java +++ b/src/main/java/org/redkale/util/Creator.java @@ -199,6 +199,10 @@ public interface Creator { clazz = (Class) AbstractMap.SimpleEntry.class; } else if (Iterable.class == clazz) { clazz = (Class) ArrayList.class; + } else if (CompletionStage.class.isAssignableFrom(clazz) && clazz.isAssignableFrom(CompletableFuture.class)) { + clazz = (Class) CompletableFuture.class; + } else if (Future.class.isAssignableFrom(clazz) && clazz.isAssignableFrom(CompletableFuture.class)) { + clazz = (Class) CompletableFuture.class; } Creator creator = CreatorInner.creatorCacheMap.get(clazz); if (creator != null) { @@ -586,6 +590,7 @@ public interface Creator { creatorCacheMap.put(ConcurrentHashMap.class, p -> new ConcurrentHashMap<>()); creatorCacheMap.put(CompletableFuture.class, p -> new CompletableFuture<>()); creatorCacheMap.put(CompletionStage.class, p -> new CompletableFuture<>()); + creatorCacheMap.put(Future.class, p -> new CompletableFuture<>()); creatorCacheMap.put(Map.Entry.class, new Creator() { @Override @ConstructorParameters({"key", "value"}) @@ -626,6 +631,7 @@ public interface Creator { arrayCacheMap.put(ByteBuffer.class, t -> new ByteBuffer[t]); arrayCacheMap.put(SocketAddress.class, t -> new SocketAddress[t]); arrayCacheMap.put(InetSocketAddress.class, t -> new InetSocketAddress[t]); + arrayCacheMap.put(CompletableFuture.class, t -> new CompletableFuture[t]); } static class SimpleClassVisitor extends ClassVisitor { diff --git a/src/test/java/org/redkale/test/convert/BsonMainTest.java b/src/test/java/org/redkale/test/convert/BsonMainTest.java index 089d77d67..0b067e51e 100644 --- a/src/test/java/org/redkale/test/convert/BsonMainTest.java +++ b/src/test/java/org/redkale/test/convert/BsonMainTest.java @@ -39,7 +39,7 @@ public class BsonMainTest { byte[] bytes = convert.convertTo(SimpleEntity.class, entry); System.out.println("长度: " + bytes.length); Assertions.assertEquals(271, bytes.length); - BsonByteBufferWriter writer = convert.pollBsonWriter(() -> ByteBuffer.allocate(1)); + BsonByteBufferWriter writer = convert.pollWriter(() -> ByteBuffer.allocate(1)); convert.convertTo(writer, SimpleEntity.class, entry); ByteBuffer[] buffers = writer.toBuffers(); int len = 0; @@ -71,7 +71,7 @@ public class BsonMainTest { ComplextEntity bean = new ComplextEntity(); byte[] bytes2 = convert.convertTo(Object.class, bean); final int len = bytes2.length; - BsonByteBufferWriter writer = convert.pollBsonWriter(() -> ByteBuffer.allocate(len / 2)); + BsonByteBufferWriter writer = convert.pollWriter(() -> ByteBuffer.allocate(len / 2)); convert.convertTo(writer, bean); bytes2 = writer.toArray(); System.out.println(convert.convertFrom(ComplextEntity.class, bytes2).toString()); diff --git a/src/test/java/org/redkale/test/sncp/TestService.java b/src/test/java/org/redkale/test/sncp/TestService.java index 49113a87b..b1f9c1009 100644 --- a/src/test/java/org/redkale/test/sncp/TestService.java +++ b/src/test/java/org/redkale/test/sncp/TestService.java @@ -28,119 +28,123 @@ public interface TestService extends Service { public CompletableFuture changeName(TestBean bean, String name, int id); - @ResourceType(TestService.class) - public static class TestServiceImpl implements TestService { +} - @Override - public boolean change(TestBean bean, String name, int id) { - return false; - } +@ResourceType(TestService.class) +class TestServiceImpl implements TestService { - @Override - public void insert(BooleanHandler handler, TestBean bean, String name, int id) { - } - - @Override - public void update(long show, short v2, CompletionHandler handler, TestBean bean, String name, int id) { - } - - @Override - public CompletableFuture changeName(TestBean bean, String name, int id) { - return null; - } + @Override + public boolean change(TestBean bean, String name, int id) { + return false; } - public static class BooleanHandler implements CompletionHandler { - - @Override - public void completed(Boolean result, TestBean attachment) { - } - - @Override - public void failed(Throwable exc, TestBean attachment) { - } - + public void delete(TestBean bean) { } - public static class DynActionTestService_change extends SncpActionServlet { - - public DynActionTestService_change(String resourceName, Class resourceType, Service service, Uint128 serviceid, Uint128 actionid, final Method method) { - super(resourceName, resourceType, service, serviceid, actionid, method); - } - - @Override - public void action(SncpRequest request, SncpResponse response) throws Throwable { - Convert convert = request.getConvert(); - Reader in = request.getReader(); - TestBean arg1 = convert.convertFrom(paramTypes[1], in); - String arg2 = convert.convertFrom(paramTypes[2], in); - int arg3 = convert.convertFrom(paramTypes[3], in); - TestService serviceObj = (TestService) service(); - Object rs = serviceObj.change(arg1, arg2, arg3); - response.finish(boolean.class, rs); - } + @Override + public void insert(BooleanHandler handler, TestBean bean, String name, int id) { } - public static class DynActionTestService_insert extends SncpActionServlet { - - public DynActionTestService_insert(String resourceName, Class resourceType, Service service, Uint128 serviceid, Uint128 actionid, final Method method) { - super(resourceName, resourceType, service, serviceid, actionid, method); - } - - @Override - public void action(SncpRequest request, SncpResponse response) throws Throwable { - Convert convert = request.getConvert(); - Reader in = request.getReader(); - BooleanHandler arg0 = response.getParamAsyncHandler(); - convert.convertFrom(CompletionHandler.class, in); - TestBean arg1 = convert.convertFrom(paramTypes[2], in); - String arg2 = convert.convertFrom(paramTypes[3], in); - int arg3 = convert.convertFrom(paramTypes[4], in); - TestService serviceObj = (TestService) service(); - serviceObj.insert(arg0, arg1, arg2, arg3); - response.finishVoid(); - } + @Override + public void update(long show, short v2, CompletionHandler handler, TestBean bean, String name, int id) { } - public static class DynActionTestService_update extends SncpActionServlet { - - public DynActionTestService_update(String resourceName, Class resourceType, Service service, Uint128 serviceid, Uint128 actionid, final Method method) { - super(resourceName, resourceType, service, serviceid, actionid, method); - } - - @Override - public void action(SncpRequest request, SncpResponse response) throws Throwable { - Convert convert = request.getConvert(); - Reader in = request.getReader(); - long a1 = convert.convertFrom(paramTypes[1], in); - short a2 = convert.convertFrom(paramTypes[2], in); - CompletionHandler a3 = response.getParamAsyncHandler(); - convert.convertFrom(CompletionHandler.class, in); - TestBean arg1 = convert.convertFrom(paramTypes[4], in); - String arg2 = convert.convertFrom(paramTypes[5], in); - int arg3 = convert.convertFrom(paramTypes[6], in); - TestService serviceObj = (TestService) service(); - serviceObj.update(a1, a2, a3, arg1, arg2, arg3); - response.finishVoid(); - } - } - - public static class DynActionTestService_changeName extends SncpActionServlet { - - public DynActionTestService_changeName(String resourceName, Class resourceType, Service service, Uint128 serviceid, Uint128 actionid, final Method method) { - super(resourceName, resourceType, service, serviceid, actionid, method); - } - - @Override - public void action(SncpRequest request, SncpResponse response) throws Throwable { - Convert convert = request.getConvert(); - Reader in = request.getReader(); - TestBean arg1 = convert.convertFrom(paramTypes[1], in); - String arg2 = convert.convertFrom(paramTypes[2], in); - int arg3 = convert.convertFrom(paramTypes[3], in); - TestService serviceObj = (TestService) service(); - CompletableFuture future = serviceObj.changeName(arg1, arg2, arg3); - response.finishFuture(paramHandlerResultType, future); - } + @Override + public CompletableFuture changeName(TestBean bean, String name, int id) { + return null; + } +} + +class BooleanHandler implements CompletionHandler { + + @Override + public void completed(Boolean result, TestBean attachment) { + } + + @Override + public void failed(Throwable exc, TestBean attachment) { + } + +} + +class DynActionTestService_change extends SncpActionServlet { + + public DynActionTestService_change(String resourceName, Class resourceType, Service service, Uint128 serviceid, Uint128 actionid, final Method method) { + super(resourceName, resourceType, service, serviceid, actionid, method); + } + + @Override + public void action(SncpRequest request, SncpResponse response) throws Throwable { + Convert convert = request.getConvert(); + Reader in = request.getReader(); + TestBean arg1 = convert.convertFrom(paramTypes[1], in); + String arg2 = convert.convertFrom(paramTypes[2], in); + int arg3 = convert.convertFrom(paramTypes[3], in); + TestService serviceObj = (TestService) service(); + Object rs = serviceObj.change(arg1, arg2, arg3); + response.finish(boolean.class, rs); + } +} + +class DynActionTestService_insert extends SncpActionServlet { + + public DynActionTestService_insert(String resourceName, Class resourceType, Service service, Uint128 serviceid, Uint128 actionid, final Method method) { + super(resourceName, resourceType, service, serviceid, actionid, method); + } + + @Override + public void action(SncpRequest request, SncpResponse response) throws Throwable { + Convert convert = request.getConvert(); + Reader in = request.getReader(); + BooleanHandler arg0 = response.getParamAsyncHandler(); + convert.convertFrom(CompletionHandler.class, in); + TestBean arg1 = convert.convertFrom(paramTypes[2], in); + String arg2 = convert.convertFrom(paramTypes[3], in); + int arg3 = convert.convertFrom(paramTypes[4], in); + TestService serviceObj = (TestService) service(); + serviceObj.insert(arg0, arg1, arg2, arg3); + response.finishVoid(); + } +} + +class DynActionTestService_update extends SncpActionServlet { + + public DynActionTestService_update(String resourceName, Class resourceType, Service service, Uint128 serviceid, Uint128 actionid, final Method method) { + super(resourceName, resourceType, service, serviceid, actionid, method); + } + + @Override + public void action(SncpRequest request, SncpResponse response) throws Throwable { + Convert convert = request.getConvert(); + Reader in = request.getReader(); + long a1 = convert.convertFrom(paramTypes[1], in); + short a2 = convert.convertFrom(paramTypes[2], in); + CompletionHandler a3 = response.getParamAsyncHandler(); + convert.convertFrom(CompletionHandler.class, in); + TestBean arg1 = convert.convertFrom(paramTypes[4], in); + String arg2 = convert.convertFrom(paramTypes[5], in); + int arg3 = convert.convertFrom(paramTypes[6], in); + TestService serviceObj = (TestService) service(); + serviceObj.update(a1, a2, a3, arg1, arg2, arg3); + response.finishVoid(); + } +} + +class DynActionTestService_changeName extends SncpActionServlet { + + public DynActionTestService_changeName(String resourceName, Class resourceType, Service service, Uint128 serviceid, Uint128 actionid, final Method method) { + super(resourceName, resourceType, service, serviceid, actionid, method); + } + + @Override + public void action(SncpRequest request, SncpResponse response) throws Throwable { + Convert convert = request.getConvert(); + Reader in = request.getReader(); + TestBean arg1 = convert.convertFrom(paramTypes[1], in); + String arg2 = convert.convertFrom(paramTypes[2], in); + int arg3 = convert.convertFrom(paramTypes[3], in); + TestService serviceObj = (TestService) service(); + CompletableFuture future = serviceObj.changeName(arg1, arg2, arg3); + response.finishFuture(paramHandlerResultType, future); } }