This commit is contained in:
Redkale
2017-03-21 11:45:40 +08:00
parent 63a9005e6b
commit f6b5882cd4
5 changed files with 124 additions and 30 deletions

View File

@@ -0,0 +1,33 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net.sncp;
import org.redkale.util.AsyncHandler;
/**
* 异步回调函数
*
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
* @param <V> 结果对象的泛型
* @param <A> 附件对象的泛型
*/
public abstract class SncpAsyncHandler<V, A> implements AsyncHandler<V, A> {
protected Object[] params;
public Object[] getParams() {
return params;
}
public void setParams(Object... params) {
this.params = params;
}
}

View File

@@ -296,7 +296,7 @@ public final class SncpClient {
final Attribute attr = action.paramAttrs[i];
attr.set(params[i - 1], bsonConvert.convertFrom(attr.type(), reader));
}
return bsonConvert.convertFrom(action.resultTypes, reader);
return bsonConvert.convertFrom(action.handlerFuncParamIndex >= 0 ? Object.class : action.resultTypes, reader);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
logger.log(Level.SEVERE, actions[index].method + " sncp (params: " + jsonConvert.convertTo(params) + ") remote error", e);
throw new RuntimeException(actions[index].method + " sncp remote error", e);
@@ -438,7 +438,7 @@ public final class SncpClient {
final Attribute attr = action.paramAttrs[i];
attr.set(params[i - 1], bsonConvert.convertFrom(attr.type(), reader));
}
Object rs = bsonConvert.convertFrom(action.resultTypes, reader);
Object rs = bsonConvert.convertFrom(action.handlerFuncParamIndex >= 0 ? Object.class : action.resultTypes, reader);
handler.completed(rs, handlerAttach);
} catch (Exception e) {
handler.failed(e, handlerAttach);

View File

@@ -120,32 +120,39 @@ public final class SncpDynServlet extends SncpServlet {
BsonWriter out = action.convert.pollBsonWriter(bufferSupplier);
out.writeTo(DEFAULT_HEADER);
BsonReader in = action.convert.pollBsonReader();
SncpAsyncHandler handler;
try {
AsyncHandler handler = action.handlerFuncParamIndex >= 0 ? AsyncHandler.create((v, a) -> {
try {
action.convert.convertTo(out, Object.class, v);
response.finish(0, out);
} catch (Exception e) {
response.getContext().getLogger().log(Level.INFO, "sncp execute error(" + request + ")", e);
response.finish(SncpResponse.RETCODE_THROWEXCEPTION, null);
} finally {
action.convert.offerBsonReader(in);
action.convert.offerBsonWriter(out);
handler = action.handlerFuncParamIndex >= 0 ? new SncpAsyncHandler() {
@Override
public void completed(Object result, Object attachment) {
try {
action._callParameter(out, params);
action.convert.convertTo(out, Object.class, result);
response.finish(0, out);
} catch (Exception e) {
failed(e, attachment);
} finally {
action.convert.offerBsonReader(in);
action.convert.offerBsonWriter(out);
}
}
}, (t, a) -> {
response.getContext().getLogger().log(Level.INFO, "sncp execute error(" + request + ")", t);
response.finish(SncpResponse.RETCODE_THROWEXCEPTION, null);
}) : null;
@Override
public void failed(Throwable exc, Object attachment) {
response.getContext().getLogger().log(Level.INFO, "sncp execute error(" + request + ")", exc);
response.finish(SncpResponse.RETCODE_THROWEXCEPTION, null);
}
} : null;
in.setBytes(request.getBody());
action.action(in, out);
response.finish(0, out);
action.action(in, out, handler);
if (handler == null) {
response.finish(0, out);
action.convert.offerBsonReader(in);
action.convert.offerBsonWriter(out);
}
} catch (Throwable t) {
response.getContext().getLogger().log(Level.INFO, "sncp execute error(" + request + ")", t);
response.finish(SncpResponse.RETCODE_THROWEXCEPTION, null);
} finally {
action.convert.offerBsonReader(in);
action.convert.offerBsonWriter(out);
}
}
}
@@ -163,7 +170,7 @@ public final class SncpDynServlet extends SncpServlet {
protected int handlerFuncParamIndex = -1; //handlerFuncParamIndex>=0表示存在AsyncHandler参数
public abstract void action(final BsonReader in, final BsonWriter out) throws Throwable;
public abstract void action(final BsonReader in, final BsonWriter out, final SncpAsyncHandler handler) throws Throwable;
public final void _callParameter(final BsonWriter out, final Object... params) {
if (paramAttrs != null) {
@@ -213,6 +220,8 @@ public final class SncpDynServlet extends SncpServlet {
final String supDynName = SncpServletAction.class.getName().replace('.', '/');
final String serviceName = serviceClass.getName().replace('.', '/');
final String convertName = BsonConvert.class.getName().replace('.', '/');
final String handlerName = SncpAsyncHandler.class.getName().replace('.', '/');
final String asyncHandlerDesc = Type.getDescriptor(SncpAsyncHandler.class);
final String convertReaderDesc = Type.getDescriptor(BsonReader.class);
final String convertWriterDesc = Type.getDescriptor(BsonWriter.class);
final String serviceDesc = Type.getDescriptor(serviceClass);
@@ -256,11 +265,11 @@ public final class SncpDynServlet extends SncpServlet {
}
int handlerFuncIndex = -1;
{ // action方法
mv = new AsmMethodVisitor(cw.visitMethod(ACC_PUBLIC, "action", "(" + convertReaderDesc + convertWriterDesc + ")V", null, new String[]{"java/lang/Throwable"}));
mv = new AsmMethodVisitor(cw.visitMethod(ACC_PUBLIC, "action", "(" + convertReaderDesc + convertWriterDesc + asyncHandlerDesc + ")V", null, new String[]{"java/lang/Throwable"}));
//mv.setDebug(true);
int iconst = ICONST_1;
int intconst = 1;
int store = 3; //action的参数个数+1
int store = 4; //action的参数个数+1
final Class[] paramClasses = method.getParameterTypes();
int[][] codes = new int[paramClasses.length][2];
for (int i = 0; i < paramClasses.length; i++) { //参数
@@ -269,6 +278,19 @@ public final class SncpDynServlet extends SncpServlet {
throw new RuntimeException(method + " have more than one AsyncHandler type parameter");
}
handlerFuncIndex = i;
mv.visitVarInsn(ALOAD, 3);
mv.visitVarInsn(ASTORE, store);
codes[i] = new int[]{ALOAD, store};
store++;
iconst++;
intconst++;
mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynName, "convert", Type.getDescriptor(BsonConvert.class));
mv.visitLdcInsn(Type.getType(Type.getDescriptor(AsyncHandler.class)));
mv.visitVarInsn(ALOAD, 1);
mv.visitMethodInsn(INVOKEVIRTUAL, convertName, "convertFrom", convertFromDesc, false);
mv.visitInsn(POP);
continue;
}
mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynName, "convert", Type.getDescriptor(BsonConvert.class));
@@ -321,6 +343,43 @@ public final class SncpDynServlet extends SncpServlet {
intconst++;
store++;
}
if (handlerFuncIndex >= 0) { //调用SncpAsyncHandler.setParams(Object... params)
mv.visitVarInsn(ALOAD, 3);
if (paramClasses.length > 5) {
mv.visitIntInsn(BIPUSH, paramClasses.length);
} else {
mv.visitInsn(paramClasses.length + ICONST_0);
}
mv.visitTypeInsn(ANEWARRAY, "java/lang/Object");
int insn = 3; //action的参数个数
for (int j = 0; j < paramClasses.length; j++) {
final Class pt = paramClasses[j];
mv.visitInsn(DUP);
insn++;
if (j <= 5) {
mv.visitInsn(ICONST_0 + j);
} else {
mv.visitIntInsn(BIPUSH, j);
}
if (pt.isPrimitive()) {
if (pt == long.class) {
mv.visitVarInsn(LLOAD, insn++);
} else if (pt == float.class) {
mv.visitVarInsn(FLOAD, insn++);
} else if (pt == double.class) {
mv.visitVarInsn(DLOAD, insn++);
} else {
mv.visitVarInsn(ILOAD, insn);
}
Class bigclaz = java.lang.reflect.Array.get(java.lang.reflect.Array.newInstance(pt, 1), 0).getClass();
mv.visitMethodInsn(INVOKESTATIC, bigclaz.getName().replace('.', '/'), "valueOf", "(" + Type.getDescriptor(pt) + ")" + Type.getDescriptor(bigclaz), false);
} else {
mv.visitVarInsn(ALOAD, insn);
}
mv.visitInsn(AASTORE);
}
mv.visitMethodInsn(INVOKEVIRTUAL, handlerName, "setParams", "([Ljava/lang/Object;)V", false);
}
{ //调用service
mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynName, "service", serviceDesc);
@@ -343,7 +402,7 @@ public final class SncpDynServlet extends SncpServlet {
}
mv.visitVarInsn(ASTORE, store); //11
}
if (true) {
if (handlerFuncIndex < 0) {
//------------------------- _callParameter 方法 --------------------------------
mv.visitVarInsn(ALOAD, 0);
mv.visitVarInsn(ALOAD, 2);
@@ -353,7 +412,7 @@ public final class SncpDynServlet extends SncpServlet {
mv.visitIntInsn(BIPUSH, paramClasses.length);
}
mv.visitTypeInsn(ANEWARRAY, "java/lang/Object");
int insn = 2;
int insn = 3;//action的参数个数
for (int j = 0; j < paramClasses.length; j++) {
final Class pt = paramClasses[j];
mv.visitInsn(DUP);

View File

@@ -50,7 +50,7 @@ public class ABMainService implements Service {
//------------------------ 初始化 BCService ------------------------------------
final Transport bctransport = new Transport("", WatchFactory.root(), "", newBufferPool(), newChannelGroup(), null, Utility.ofSet(new InetSocketAddress("127.0.0.1", 5577)));
BCService bcservice = Sncp.createLocalService("", null, ResourceFactory.root(), BCService.class, new InetSocketAddress("127.0.0.1", 5588), bctransport, null);
CService remoteCService = Sncp.createRemoteService("", null, CService.class, new InetSocketAddress("127.0.0.1", 5588), bctransport);
CService remoteCService = Sncp.createRemoteService("", null, CService.class, new InetSocketAddress("127.0.0.1", 5588), bctransport);
factory.inject(remoteCService);
factory.register("", remoteCService);
SncpServer bcserver = new SncpServer();
@@ -61,7 +61,7 @@ public class ABMainService implements Service {
//------------------------ 初始化 ABMainService ------------------------------------
final Transport abtransport = new Transport("", WatchFactory.root(), "", newBufferPool(), newChannelGroup(), null, Utility.ofSet(new InetSocketAddress("127.0.0.1", 5588)));
ABMainService service = Sncp.createLocalService("", null, ResourceFactory.root(), ABMainService.class, new InetSocketAddress("127.0.0.1", 5599), bctransport, null);
BCService remoteBCService = Sncp.createRemoteService("", null, BCService.class, new InetSocketAddress("127.0.0.1", 5599), abtransport);
BCService remoteBCService = Sncp.createRemoteService("", null, BCService.class, new InetSocketAddress("127.0.0.1", 5599), abtransport);
factory.inject(remoteBCService);
factory.register("", remoteBCService);
@@ -157,8 +157,9 @@ public class ABMainService implements Service {
bcService.bcCurrentTime(AsyncHandler.create((v, a) -> {
System.out.println("执行了 ABMainService.abCurrentTime----异步方法");
String rs = "异步abCurrentTime: " + v;
if (handler != null) handler.completed(rs, null);
if (handler != null) handler.completed(rs, a);
}, (t, a) -> {
if (handler != null) handler.failed(t, a);
}), name);
}
}

View File

@@ -27,9 +27,10 @@ public class BCService implements Service {
public void bcCurrentTime(final AsyncHandler<String, Void> handler, final String name) {
cService.ccCurrentTime(AsyncHandler.create((v, a) -> {
System.out.println("执行了 BCService.bcCurrentTime----异步方法");
String rs = "异步bcCurrentTime: " + v.getResult();
String rs = "异步bcCurrentTime: " + (v == null ? null : v.getResult());
if (handler != null) handler.completed(rs, null);
}, (t, a) -> {
if (handler != null) handler.failed(t, a);
}), name);
}
}