diff --git a/src/org/redkale/net/sncp/SncpClient.java b/src/org/redkale/net/sncp/SncpClient.java index a6579c66a..84a921eae 100644 --- a/src/org/redkale/net/sncp/SncpClient.java +++ b/src/org/redkale/net/sncp/SncpClient.java @@ -43,6 +43,10 @@ public final class SncpClient { protected final Attribute[] paramAttrs; // 为null表示无RpcCall处理,index=0固定为null, 其他为参数标记的RpcCall回调方法 + protected final int handlerFuncParamIndex; + + protected final int handlerAttachParamIndex; + protected final int addressTargetParamIndex; protected final int addressSourceParamIndex; @@ -60,16 +64,24 @@ public final class SncpClient { Annotation[][] anns = method.getParameterAnnotations(); int targetAddrIndex = -1; int sourceAddrIndex = -1; - + int handlerAttachIndex = -1; + int handlerFuncIndex = -1; boolean hasattr = false; Attribute[] atts = new Attribute[paramTypes.length + 1]; if (anns.length > 0) { Class[] params = method.getParameterTypes(); - + for (int i = 0; i < params.length; i++) { + if (AsyncHandler.class.isAssignableFrom(params[i])) { + handlerFuncIndex = i; + break; + } + } for (int i = 0; i < anns.length; i++) { if (anns[i].length > 0) { for (Annotation ann : anns[i]) { - if (ann.annotationType() == RpcTargetAddress.class && SocketAddress.class.isAssignableFrom(params[i])) { + if (ann.annotationType() == RpcAttachment.class) { + handlerAttachIndex = i; + } else if (ann.annotationType() == RpcTargetAddress.class && SocketAddress.class.isAssignableFrom(params[i])) { targetAddrIndex = i; } else if (ann.annotationType() == RpcSourceAddress.class && SocketAddress.class.isAssignableFrom(params[i])) { sourceAddrIndex = i; @@ -91,7 +103,10 @@ public final class SncpClient { } this.addressTargetParamIndex = targetAddrIndex; this.addressSourceParamIndex = sourceAddrIndex; + this.handlerFuncParamIndex = handlerFuncIndex; + this.handlerAttachParamIndex = handlerAttachIndex; this.paramAttrs = hasattr ? atts : null; + if (this.handlerFuncParamIndex >= 0 && method.getReturnType() != void.class) throw new RuntimeException(method + " has AsyncHandler type parameter but return type is not void"); } @Override @@ -222,8 +237,9 @@ public final class SncpClient { public void remoteSameGroup(final BsonConvert bsonConvert, final JsonConvert jsonConvert, Transport transport, final int index, final Object... params) { final SncpAction action = actions[index]; + if (action.handlerFuncParamIndex >= 0) params[action.handlerFuncParamIndex] = null; //不能让远程调用handler,因为之前本地方法已经调用过了 for (InetSocketAddress addr : transport.getRemoteAddresses()) { - remote0(bsonConvert, jsonConvert, transport, addr, action, params); + remote0(null, bsonConvert, jsonConvert, transport, addr, action, params); } } @@ -240,8 +256,9 @@ public final class SncpClient { public void remoteDiffGroup(final BsonConvert bsonConvert, final JsonConvert jsonConvert, Transport[] transports, final int index, final Object... params) { if (transports == null || transports.length < 1) return; final SncpAction action = actions[index]; + if (action.handlerFuncParamIndex >= 0) params[action.handlerFuncParamIndex] = null; //不能让远程调用handler,因为之前本地方法已经调用过了 for (Transport transport : transports) { - remote0(bsonConvert, jsonConvert, transport, null, action, params); + remote0(null, bsonConvert, jsonConvert, transport, null, action, params); } } @@ -259,8 +276,10 @@ public final class SncpClient { //只给远程模式调用的 public T remote(final BsonConvert bsonConvert, final JsonConvert jsonConvert, Transport transport, final int index, final Object... params) { final SncpAction action = actions[index]; - SncpFuture future = remote0(bsonConvert, jsonConvert, transport, null, action, params); - + final AsyncHandler handlerFunc = action.handlerFuncParamIndex >= 0 ? (AsyncHandler) params[action.handlerFuncParamIndex] : null; + if (action.handlerFuncParamIndex >= 0) params[action.handlerFuncParamIndex] = null; + SncpFuture future = remote0(handlerFunc, bsonConvert, jsonConvert, transport, null, action, params); + if (handlerFunc != null) return null; final BsonReader reader = bsonConvert.pollBsonReader(); try { reader.setBytes(future.get(5, TimeUnit.SECONDS)); @@ -282,22 +301,23 @@ public final class SncpClient { if (transports == null || transports.length < 1) return; remote(bsonConvert, jsonConvert, transports[0], index, params); for (int i = 1; i < transports.length; i++) { - remote0(bsonConvert, jsonConvert, transports[i], null, actions[index], params); + remote0(null, bsonConvert, jsonConvert, transports[i], null, actions[index], params); } } - private SncpFuture remote0(final BsonConvert bsonConvert, final JsonConvert jsonConvert, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) { + private SncpFuture remote0(final AsyncHandler handler, final BsonConvert bsonConvert, final JsonConvert jsonConvert, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) { if ("rest".equalsIgnoreCase(transport.getSubprotocol())) { - return remoteRest0(jsonConvert, transport, addr0, action, params); + return remoteRest0(handler, jsonConvert, transport, addr0, action, params); } - return remoteSncp0(bsonConvert, transport, addr0, action, params); + return remoteSncp0(handler, bsonConvert, transport, addr0, action, params); } - private SncpFuture remoteRest0(final JsonConvert jsonConvert, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) { + //尚未实现 + private SncpFuture remoteRest0(final AsyncHandler handler, final JsonConvert jsonConvert, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) { return null; } - private SncpFuture remoteSncp0(final BsonConvert bsonConvert, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) { + private SncpFuture remoteSncp0(final AsyncHandler handler, final BsonConvert bsonConvert, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) { Type[] myparamtypes = action.paramTypes; if (action.addressSourceParamIndex >= 0) params[action.addressSourceParamIndex] = this.clientAddress; final BsonWriter writer = bsonConvert.pollBsonWriter(transport.getBufferSupplier()); // 将head写入 @@ -400,6 +420,24 @@ public final class SncpClient { future.set(this.body); transport.offerBuffer(buffer); transport.offerConnection(false, conn); + if (handler != null) { + final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null; + final BsonReader reader = bsonConvert.pollBsonReader(); + try { + reader.setBytes(this.body); + int i; + while ((i = (reader.readByte() & 0xff)) != 0) { + final Attribute attr = action.paramAttrs[i]; + attr.set(params[i - 1], bsonConvert.convertFrom(attr.type(), reader)); + } + Object rs = bsonConvert.convertFrom(action.resultTypes, reader); + handler.completed(rs, handlerAttach); + } catch (Exception e) { + handler.failed(e, handlerAttach); + } finally { + bsonConvert.offerBsonReader(reader); + } + } } @Override @@ -408,6 +446,10 @@ public final class SncpClient { future.set(new RuntimeException(action.method + " sncp remote exec failed")); transport.offerBuffer(buffer); transport.offerConnection(true, conn); + if (handler != null) { + final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null; + handler.failed(exc, handlerAttach); + } } }); } diff --git a/src/org/redkale/service/CacheSourceService.java b/src/org/redkale/service/CacheSourceService.java index 8e49e92e7..8869625b6 100644 --- a/src/org/redkale/service/CacheSourceService.java +++ b/src/org/redkale/service/CacheSourceService.java @@ -191,6 +191,12 @@ public class CacheSourceService implem return !entry.isExpired(); } + @Override + public void exists(final AsyncHandler handler, @RpcAttachment final K key) { + boolean rs = exists(key); + if (handler != null) handler.completed(rs, key); + } + @Override public V get(K key) { if (key == null) return null; @@ -201,6 +207,12 @@ public class CacheSourceService implem return (V) entry.getValue(); } + @Override + public void get(final AsyncHandler handler, @RpcAttachment final K key) { + V rs = get(key); + if (handler != null) handler.completed(rs, key); + } + @Override @RpcMultiRun public V getAndRefresh(K key, final int expireSeconds) { @@ -214,6 +226,12 @@ public class CacheSourceService implem return (V) entry.getValue(); } + @Override + public void getAndRefresh(final AsyncHandler handler, @RpcAttachment final K key, final int expireSeconds) { + V rs = getAndRefresh(key, expireSeconds); + if (handler != null) handler.completed(rs, key); + } + @Override @RpcMultiRun public void refresh(K key, final int expireSeconds) { @@ -224,6 +242,12 @@ public class CacheSourceService implem entry.expireSeconds = expireSeconds; } + @Override + public void refresh(final AsyncVoidHandler handler, @RpcAttachment final K key, final int expireSeconds) { + refresh(key, expireSeconds); + if (handler != null) handler.completed(null, key); + } + @Override @RpcMultiRun public void set(K key, V value) { @@ -239,6 +263,12 @@ public class CacheSourceService implem } } + @Override + public void set(final AsyncVoidHandler handler, @RpcAttachment final K key, final V value) { + set(key, value); + if (handler != null) handler.completed(null, key); + } + @Override @RpcMultiRun public void set(int expireSeconds, K key, V value) { @@ -254,6 +284,12 @@ public class CacheSourceService implem } } + @Override + public void set(final AsyncVoidHandler handler, final int expireSeconds, @RpcAttachment final K key, final V value) { + set(expireSeconds, key, value); + if (handler != null) handler.completed(null, key); + } + @Override @RpcMultiRun public void setExpireSeconds(K key, int expireSeconds) { @@ -263,6 +299,12 @@ public class CacheSourceService implem entry.expireSeconds = expireSeconds; } + @Override + public void setExpireSeconds(final AsyncVoidHandler handler, @RpcAttachment final K key, final int expireSeconds) { + setExpireSeconds(key, expireSeconds); + if (handler != null) handler.completed(null, key); + } + @Override @RpcMultiRun public void remove(K key) { @@ -270,16 +312,34 @@ public class CacheSourceService implem container.remove(key); } + @Override + public void remove(final AsyncVoidHandler handler, @RpcAttachment final K key) { + remove(key); + if (handler != null) handler.completed(null, key); + } + @Override public Collection getCollection(final K key) { return (Collection) get(key); } + @Override + public void getCollection(final AsyncHandler, K> handler, @RpcAttachment final K key) { + Collection rs = getCollection(key); + if (handler != null) handler.completed(rs, key); + } + @Override public Collection getCollectionAndRefresh(final K key, final int expireSeconds) { return (Collection) getAndRefresh(key, expireSeconds); } + @Override + public void getCollectionAndRefresh(final AsyncHandler, K> handler, @RpcAttachment final K key, final int expireSeconds) { + Collection rs = getCollectionAndRefresh(key, expireSeconds); + if (handler != null) handler.completed(rs, key); + } + @Override @RpcMultiRun public void appendListItem(K key, V value) { @@ -296,6 +356,12 @@ public class CacheSourceService implem } } + @Override + public void appendListItem(final AsyncVoidHandler handler, @RpcAttachment final K key, final V value) { + appendListItem(key, value); + if (handler != null) handler.completed(null, key); + } + @Override @RpcMultiRun public void removeListItem(K key, V value) { @@ -305,6 +371,12 @@ public class CacheSourceService implem ((Collection) entry.getValue()).remove(value); } + @Override + public void removeListItem(final AsyncVoidHandler handler, @RpcAttachment final K key, final V value) { + removeListItem(key, value); + if (handler != null) handler.completed(null, key); + } + @Override @RpcMultiRun public void appendSetItem(K key, V value) { @@ -321,6 +393,12 @@ public class CacheSourceService implem } } + @Override + public void appendSetItem(final AsyncVoidHandler handler, @RpcAttachment final K key, final V value) { + appendSetItem(key, value); + if (handler != null) handler.completed(null, key); + } + @Override @RpcMultiRun public void removeSetItem(K key, V value) { @@ -330,6 +408,12 @@ public class CacheSourceService implem ((Set) entry.getValue()).remove(value); } + @Override + public void removeSetItem(final AsyncVoidHandler handler, @RpcAttachment final K key, final V value) { + removeSetItem(key, value); + if (handler != null) handler.completed(null, key); + } + public static enum CacheEntryType { OBJECT, SET, LIST; } diff --git a/src/org/redkale/service/RpcAttachment.java b/src/org/redkale/service/RpcAttachment.java new file mode 100644 index 000000000..d807f1163 --- /dev/null +++ b/src/org/redkale/service/RpcAttachment.java @@ -0,0 +1,26 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.service; + +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; +import java.lang.annotation.*; + +/** + * SNCP协议中用于CompletionHandler回调函数中的attach字段。 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + */ +@Inherited +@Documented +@Target({PARAMETER}) +@Retention(RUNTIME) +public @interface RpcAttachment { + +} diff --git a/src/org/redkale/source/CacheSource.java b/src/org/redkale/source/CacheSource.java index 8351ecff2..6b674da8c 100644 --- a/src/org/redkale/source/CacheSource.java +++ b/src/org/redkale/source/CacheSource.java @@ -7,6 +7,7 @@ package org.redkale.source; import java.io.*; import java.util.*; +import org.redkale.util.*; /** * @@ -51,4 +52,36 @@ public interface CacheSource { public void removeSetItem(final K key, final V value); + //----------------------异步版--------------------------------- + public void exists(final AsyncHandler handler, final K key); + + public void get(final AsyncHandler handler, final K key); + + public void getAndRefresh(final AsyncHandler handler, final K key, final int expireSeconds); + + public void refresh(final AsyncVoidHandler handler, final K key, final int expireSeconds); + + public void set(final AsyncVoidHandler handler, final K key, final V value); + + public void set(final AsyncVoidHandler handler, final int expireSeconds, final K key, final V value); + + public void setExpireSeconds(final AsyncVoidHandler handler, final K key, final int expireSeconds); + + public void remove(final AsyncVoidHandler handler, final K key); + + public void getCollection(final AsyncHandler, K> handler, final K key); + + public void getCollectionAndRefresh(final AsyncHandler, K> handler, final K key, final int expireSeconds); + + public void appendListItem(final AsyncVoidHandler handler, final K key, final V value); + + public void removeListItem(final AsyncVoidHandler handler, final K key, final V value); + + public void appendSetItem(final AsyncVoidHandler handler, final K key, final V value); + + public void removeSetItem(final AsyncVoidHandler handler, final K key, final V value); + + default void isOpen(final AsyncHandler handler) { + if (handler != null) handler.completed(Boolean.TRUE, null); + } } diff --git a/src/org/redkale/util/AsyncHandler.java b/src/org/redkale/util/AsyncHandler.java new file mode 100644 index 000000000..513ed7bd8 --- /dev/null +++ b/src/org/redkale/util/AsyncHandler.java @@ -0,0 +1,47 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.util; + +import java.util.function.BiConsumer; +import java.nio.channels.CompletionHandler; + +/** + * 异步接口 + * + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * @param 结果对象的泛型 + * @param 附件对象的泛型 + */ +public interface AsyncHandler extends CompletionHandler { + + /** + * 创建 AsyncHandler 对象 + * + * @param 结果对象的泛型 + * @param 附件对象的泛型 + * @param success 成功的回调函数 + * @param fail 失败的回调函数 + * + * @return AsyncHandler + */ + public static AsyncHandler create(final BiConsumer success, final BiConsumer fail) { + return new AsyncHandler() { + @Override + public void completed(V result, A attachment) { + if (success != null) success.accept(result, attachment); + } + + @Override + public void failed(Throwable exc, A attachment) { + if (fail != null) fail.accept(exc, attachment); + } + }; + } +} diff --git a/src/org/redkale/util/AsyncVoidHandler.java b/src/org/redkale/util/AsyncVoidHandler.java new file mode 100644 index 000000000..b385483f2 --- /dev/null +++ b/src/org/redkale/util/AsyncVoidHandler.java @@ -0,0 +1,44 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.util; + +import java.util.function.*; + +/** + * 没有返回值的异步接口 + * + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * @param 附件对象的泛型 + */ +public interface AsyncVoidHandler< A> extends AsyncHandler { + + /** + * 创建 AsyncVoidHandler 对象 + * + * @param 附件对象的泛型 + * @param success 成功的回调函数 + * @param fail 失败的回调函数 + * + * @return AsyncHandler + */ + public static < A> AsyncVoidHandler< A> create(final Consumer< A> success, final BiConsumer fail) { + return new AsyncVoidHandler< A>() { + @Override + public void completed(Void result, A attachment) { + if (success != null) success.accept(attachment); + } + + @Override + public void failed(Throwable exc, A attachment) { + if (fail != null) fail.accept(exc, attachment); + } + }; + } +}