This commit is contained in:
@@ -51,8 +51,6 @@ public final class SncpClient {
|
||||
|
||||
protected final int addressSourceParamIndex;
|
||||
|
||||
protected final SncpAction syncAction; //异步方法对应的同步方法Action, 只有方法包含AsyncHandler参数时,本字段才有值
|
||||
|
||||
public SncpAction(final Class clazz, Method method, DLong actionid) {
|
||||
this.actionid = actionid == null ? Sncp.hash(method) : actionid;
|
||||
Type rt = method.getGenericReturnType();
|
||||
@@ -117,23 +115,6 @@ public final class SncpClient {
|
||||
if (this.handlerFuncParamIndex >= 0 && method.getReturnType() != void.class) {
|
||||
throw new RuntimeException(method + " have AsyncHandler type parameter but return type is not void");
|
||||
}
|
||||
if (this.handlerFuncParamIndex >= 0) {
|
||||
List<Class> syncparams = new ArrayList<>();
|
||||
for (Class p : method.getParameterTypes()) {
|
||||
if (!AsyncHandler.class.isAssignableFrom(p)) {
|
||||
syncparams.add(p);
|
||||
}
|
||||
}
|
||||
Method syncMethod = null;
|
||||
try {
|
||||
syncMethod = clazz.getMethod(method.getName(), syncparams.toArray(new Class[syncparams.size()]));
|
||||
} catch (NoSuchMethodException e) {
|
||||
throw new RuntimeException("Async menthod (" + method + ") have no sync menthod ");
|
||||
}
|
||||
this.syncAction = new SncpAction(clazz, syncMethod, Sncp.hash(syncMethod));
|
||||
} else {
|
||||
this.syncAction = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -315,7 +296,7 @@ public final class SncpClient {
|
||||
final Attribute attr = action.paramAttrs[i];
|
||||
attr.set(params[i - 1], bsonConvert.convertFrom(attr.type(), reader));
|
||||
}
|
||||
return bsonConvert.convertFrom(action.syncAction == null ? action.resultTypes : action.syncAction.resultTypes, reader);
|
||||
return bsonConvert.convertFrom(action.resultTypes, reader);
|
||||
} catch (InterruptedException | ExecutionException | TimeoutException e) {
|
||||
logger.log(Level.SEVERE, actions[index].method + " sncp (params: " + jsonConvert.convertTo(params) + ") remote error", e);
|
||||
throw new RuntimeException(actions[index].method + " sncp remote error", e);
|
||||
@@ -350,13 +331,11 @@ public final class SncpClient {
|
||||
final BsonWriter writer = bsonConvert.pollBsonWriter(transport.getBufferSupplier()); // 将head写入
|
||||
writer.writeTo(DEFAULT_HEADER);
|
||||
for (int i = 0; i < params.length; i++) {
|
||||
if (action.handlerFuncParamIndex != i) { //AsyncHandler参数不能传递
|
||||
bsonConvert.convertTo(writer, myparamtypes[i], params[i]);
|
||||
}
|
||||
bsonConvert.convertTo(writer, myparamtypes[i], params[i]);
|
||||
}
|
||||
final int reqBodyLength = writer.count() - HEADER_SIZE; //body总长度
|
||||
final long seqid = System.nanoTime();
|
||||
final DLong actionid = action.syncAction == null ? action.actionid : action.syncAction.actionid;
|
||||
final DLong actionid = action.actionid;
|
||||
final SocketAddress addr = addr0 == null ? (action.addressTargetParamIndex >= 0 ? (SocketAddress) params[action.addressTargetParamIndex] : null) : addr0;
|
||||
final AsyncConnection conn = transport.pollConnection(addr);
|
||||
if (conn == null || !conn.isOpen()) {
|
||||
@@ -459,7 +438,7 @@ public final class SncpClient {
|
||||
final Attribute attr = action.paramAttrs[i];
|
||||
attr.set(params[i - 1], bsonConvert.convertFrom(attr.type(), reader));
|
||||
}
|
||||
Object rs = bsonConvert.convertFrom(action.syncAction == null ? action.resultTypes : action.syncAction.resultTypes, reader);
|
||||
Object rs = bsonConvert.convertFrom(action.resultTypes, reader);
|
||||
handler.completed(rs, handlerAttach);
|
||||
} catch (Exception e) {
|
||||
handler.failed(e, handlerAttach);
|
||||
@@ -502,7 +481,7 @@ public final class SncpClient {
|
||||
int version = buffer.getInt();
|
||||
if (version != this.serviceversion) throw new RuntimeException("sncp(" + action.method + ") response.serviceversion = " + serviceversion + ", but request.serviceversion =" + version);
|
||||
DLong raction = DLong.read(buffer);
|
||||
DLong actid = action.syncAction == null ? action.actionid : action.syncAction.actionid;
|
||||
DLong actid = action.actionid;
|
||||
if (!actid.equals(raction)) throw new RuntimeException("sncp(" + action.method + ") response.actionid = " + action.actionid + ", but request.actionid =(" + raction + ")");
|
||||
buffer.getInt(); //地址
|
||||
buffer.getChar(); //端口
|
||||
|
||||
@@ -121,6 +121,22 @@ public final class SncpDynServlet extends SncpServlet {
|
||||
out.writeTo(DEFAULT_HEADER);
|
||||
BsonReader in = action.convert.pollBsonReader();
|
||||
try {
|
||||
AsyncHandler handler = action.handlerFuncParamIndex >= 0 ? AsyncHandler.create((v, a) -> {
|
||||
try {
|
||||
action.convert.convertTo(out, Object.class, v);
|
||||
response.finish(0, out);
|
||||
} catch (Exception e) {
|
||||
response.getContext().getLogger().log(Level.INFO, "sncp execute error(" + request + ")", e);
|
||||
response.finish(SncpResponse.RETCODE_THROWEXCEPTION, null);
|
||||
} finally {
|
||||
action.convert.offerBsonReader(in);
|
||||
action.convert.offerBsonWriter(out);
|
||||
}
|
||||
}, (t, a) -> {
|
||||
response.getContext().getLogger().log(Level.INFO, "sncp execute error(" + request + ")", t);
|
||||
response.finish(SncpResponse.RETCODE_THROWEXCEPTION, null);
|
||||
}) : null;
|
||||
|
||||
in.setBytes(request.getBody());
|
||||
action.action(in, out);
|
||||
response.finish(0, out);
|
||||
@@ -145,6 +161,8 @@ public final class SncpDynServlet extends SncpServlet {
|
||||
|
||||
protected java.lang.reflect.Type[] paramTypes; //index=0表示返回参数的type, void的返回参数类型为null
|
||||
|
||||
protected int handlerFuncParamIndex = -1; //handlerFuncParamIndex>=0表示存在AsyncHandler参数
|
||||
|
||||
public abstract void action(final BsonReader in, final BsonWriter out) throws Throwable;
|
||||
|
||||
public final void _callParameter(final BsonWriter out, final Object... params) {
|
||||
@@ -236,6 +254,7 @@ public final class SncpDynServlet extends SncpServlet {
|
||||
} catch (Exception ex) {
|
||||
throw new RuntimeException(ex); //不可能会发生
|
||||
}
|
||||
int handlerFuncIndex = -1;
|
||||
{ // action方法
|
||||
mv = new AsmMethodVisitor(cw.visitMethod(ACC_PUBLIC, "action", "(" + convertReaderDesc + convertWriterDesc + ")V", null, new String[]{"java/lang/Throwable"}));
|
||||
//mv.setDebug(true);
|
||||
@@ -245,6 +264,12 @@ public final class SncpDynServlet extends SncpServlet {
|
||||
final Class[] paramClasses = method.getParameterTypes();
|
||||
int[][] codes = new int[paramClasses.length][2];
|
||||
for (int i = 0; i < paramClasses.length; i++) { //参数
|
||||
if (AsyncHandler.class.isAssignableFrom(paramClasses[i])) {
|
||||
if (handlerFuncIndex >= 0) {
|
||||
throw new RuntimeException(method + " have more than one AsyncHandler type parameter");
|
||||
}
|
||||
handlerFuncIndex = i;
|
||||
}
|
||||
mv.visitVarInsn(ALOAD, 0);
|
||||
mv.visitFieldInsn(GETFIELD, newDynName, "convert", Type.getDescriptor(BsonConvert.class));
|
||||
mv.visitVarInsn(ALOAD, 0);
|
||||
@@ -318,44 +343,45 @@ public final class SncpDynServlet extends SncpServlet {
|
||||
}
|
||||
mv.visitVarInsn(ASTORE, store); //11
|
||||
}
|
||||
//------------------------- _callParameter 方法 --------------------------------
|
||||
mv.visitVarInsn(ALOAD, 0);
|
||||
mv.visitVarInsn(ALOAD, 2);
|
||||
if (paramClasses.length <= 5) { //参数总数量
|
||||
mv.visitInsn(ICONST_0 + paramClasses.length);
|
||||
} else {
|
||||
mv.visitIntInsn(BIPUSH, paramClasses.length);
|
||||
}
|
||||
mv.visitTypeInsn(ANEWARRAY, "java/lang/Object");
|
||||
int insn = 2;
|
||||
for (int j = 0; j < paramClasses.length; j++) {
|
||||
final Class pt = paramClasses[j];
|
||||
mv.visitInsn(DUP);
|
||||
insn++;
|
||||
if (j <= 5) {
|
||||
mv.visitInsn(ICONST_0 + j);
|
||||
if (true) {
|
||||
//------------------------- _callParameter 方法 --------------------------------
|
||||
mv.visitVarInsn(ALOAD, 0);
|
||||
mv.visitVarInsn(ALOAD, 2);
|
||||
if (paramClasses.length <= 5) { //参数总数量
|
||||
mv.visitInsn(ICONST_0 + paramClasses.length);
|
||||
} else {
|
||||
mv.visitIntInsn(BIPUSH, j);
|
||||
mv.visitIntInsn(BIPUSH, paramClasses.length);
|
||||
}
|
||||
if (pt.isPrimitive()) {
|
||||
if (pt == long.class) {
|
||||
mv.visitVarInsn(LLOAD, insn++);
|
||||
} else if (pt == float.class) {
|
||||
mv.visitVarInsn(FLOAD, insn++);
|
||||
} else if (pt == double.class) {
|
||||
mv.visitVarInsn(DLOAD, insn++);
|
||||
mv.visitTypeInsn(ANEWARRAY, "java/lang/Object");
|
||||
int insn = 2;
|
||||
for (int j = 0; j < paramClasses.length; j++) {
|
||||
final Class pt = paramClasses[j];
|
||||
mv.visitInsn(DUP);
|
||||
insn++;
|
||||
if (j <= 5) {
|
||||
mv.visitInsn(ICONST_0 + j);
|
||||
} else {
|
||||
mv.visitVarInsn(ILOAD, insn);
|
||||
mv.visitIntInsn(BIPUSH, j);
|
||||
}
|
||||
Class bigclaz = java.lang.reflect.Array.get(java.lang.reflect.Array.newInstance(pt, 1), 0).getClass();
|
||||
mv.visitMethodInsn(INVOKESTATIC, bigclaz.getName().replace('.', '/'), "valueOf", "(" + Type.getDescriptor(pt) + ")" + Type.getDescriptor(bigclaz), false);
|
||||
} else {
|
||||
mv.visitVarInsn(ALOAD, insn);
|
||||
if (pt.isPrimitive()) {
|
||||
if (pt == long.class) {
|
||||
mv.visitVarInsn(LLOAD, insn++);
|
||||
} else if (pt == float.class) {
|
||||
mv.visitVarInsn(FLOAD, insn++);
|
||||
} else if (pt == double.class) {
|
||||
mv.visitVarInsn(DLOAD, insn++);
|
||||
} else {
|
||||
mv.visitVarInsn(ILOAD, insn);
|
||||
}
|
||||
Class bigclaz = java.lang.reflect.Array.get(java.lang.reflect.Array.newInstance(pt, 1), 0).getClass();
|
||||
mv.visitMethodInsn(INVOKESTATIC, bigclaz.getName().replace('.', '/'), "valueOf", "(" + Type.getDescriptor(pt) + ")" + Type.getDescriptor(bigclaz), false);
|
||||
} else {
|
||||
mv.visitVarInsn(ALOAD, insn);
|
||||
}
|
||||
mv.visitInsn(AASTORE);
|
||||
}
|
||||
mv.visitInsn(AASTORE);
|
||||
mv.visitMethodInsn(INVOKEVIRTUAL, newDynName, "_callParameter", "(" + convertWriterDesc + "[Ljava/lang/Object;)V", false);
|
||||
}
|
||||
mv.visitMethodInsn(INVOKEVIRTUAL, newDynName, "_callParameter", "(" + convertWriterDesc + "[Ljava/lang/Object;)V", false);
|
||||
|
||||
//-------------------------直接返回 或者 调用convertTo方法 --------------------------------
|
||||
int maxStack = codes.length > 0 ? codes[codes.length - 1][1] : 1;
|
||||
if (returnClass == void.class) { //返回
|
||||
@@ -398,6 +424,7 @@ public final class SncpDynServlet extends SncpServlet {
|
||||
types[0] = rt;
|
||||
System.arraycopy(ptypes, 0, types, 1, ptypes.length);
|
||||
instance.paramTypes = types;
|
||||
instance.handlerFuncParamIndex = handlerFuncIndex;
|
||||
|
||||
org.redkale.util.Attribute[] atts = new org.redkale.util.Attribute[ptypes.length + 1];
|
||||
Annotation[][] anns = method.getParameterAnnotations();
|
||||
|
||||
@@ -147,14 +147,14 @@ public class ABMainService implements Service {
|
||||
|
||||
@RestMapping(name = "syncabtime")
|
||||
public String abCurrentTime(@RestParam(name = "#") final String name) {
|
||||
String rs = "同步abCurrentTime: " + bcService.showCurrentTime(name);
|
||||
String rs = "同步abCurrentTime: " + bcService.bcCurrentTime(name);
|
||||
System.out.println("执行了 ABMainService.abCurrentTime++++同步方法");
|
||||
return rs;
|
||||
}
|
||||
|
||||
@RestMapping(name = "asyncabtime")
|
||||
public void abCurrentTime(final AsyncHandler<String, Void> handler, @RestParam(name = "#") final String name) {
|
||||
bcService.showCurrentTime(AsyncHandler.create((v, a) -> {
|
||||
bcService.bcCurrentTime(AsyncHandler.create((v, a) -> {
|
||||
System.out.println("执行了 ABMainService.abCurrentTime----异步方法");
|
||||
String rs = "异步abCurrentTime: " + v;
|
||||
if (handler != null) handler.completed(rs, null);
|
||||
|
||||
@@ -18,16 +18,16 @@ public class BCService implements Service {
|
||||
@Resource
|
||||
private CService cService;
|
||||
|
||||
public String showCurrentTime(final String name) {
|
||||
String rs = "同步showCurrentTime: " + cService.getCurrentTime(name).getResult();
|
||||
System.out.println("执行了 BCService.showCurrentTime++++同步方法");
|
||||
public String bcCurrentTime(final String name) {
|
||||
String rs = "同步bcCurrentTime: " + cService.ccCurrentTime(name).getResult();
|
||||
System.out.println("执行了 BCService.bcCurrentTime++++同步方法");
|
||||
return rs;
|
||||
}
|
||||
|
||||
public void showCurrentTime(final AsyncHandler<String, Void> handler, final String name) {
|
||||
cService.getCurrentTime(AsyncHandler.create((v, a) -> {
|
||||
System.out.println("执行了 BCService.showCurrentTime----异步方法");
|
||||
String rs = "异步showCurrentTime: " + v.getResult();
|
||||
public void bcCurrentTime(final AsyncHandler<String, Void> handler, final String name) {
|
||||
cService.ccCurrentTime(AsyncHandler.create((v, a) -> {
|
||||
System.out.println("执行了 BCService.bcCurrentTime----异步方法");
|
||||
String rs = "异步bcCurrentTime: " + v.getResult();
|
||||
if (handler != null) handler.completed(rs, null);
|
||||
}, (t, a) -> {
|
||||
}), name);
|
||||
|
||||
@@ -14,15 +14,15 @@ import org.redkale.util.*;
|
||||
*/
|
||||
public class CService implements Service {
|
||||
|
||||
public RetResult<String> getCurrentTime(final String name) {
|
||||
String rs = "同步getCurrentTime: " + name + ": " + Utility.formatTime(System.currentTimeMillis());
|
||||
System.out.println("执行了 CService.getCurrentTime++++同步方法");
|
||||
public RetResult<String> ccCurrentTime(final String name) {
|
||||
String rs = "同步ccCurrentTime: " + name + ": " + Utility.formatTime(System.currentTimeMillis());
|
||||
System.out.println("执行了 CService.ccCurrentTime++++同步方法");
|
||||
return new RetResult(rs);
|
||||
}
|
||||
|
||||
public void getCurrentTime(final AsyncHandler<RetResult<String>, Void> handler, final String name) {
|
||||
String rs = "异步getCurrentTime: " + name + ": " + Utility.formatTime(System.currentTimeMillis());
|
||||
System.out.println("执行了 CService.getCurrentTime----异步方法");
|
||||
public void ccCurrentTime(final AsyncHandler<RetResult<String>, Void> handler, final String name) {
|
||||
String rs = "异步ccCurrentTime: " + name + ": " + Utility.formatTime(System.currentTimeMillis());
|
||||
System.out.println("执行了 CService.ccCurrentTime----异步方法");
|
||||
if (handler != null) handler.completed(new RetResult(rs), null);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user