This commit is contained in:
@@ -43,6 +43,10 @@ public final class SncpClient {
|
||||
|
||||
protected final Attribute[] paramAttrs; // 为null表示无RpcCall处理,index=0固定为null, 其他为参数标记的RpcCall回调方法
|
||||
|
||||
protected final int handlerFuncParamIndex;
|
||||
|
||||
protected final int handlerAttachParamIndex;
|
||||
|
||||
protected final int addressTargetParamIndex;
|
||||
|
||||
protected final int addressSourceParamIndex;
|
||||
@@ -60,16 +64,24 @@ public final class SncpClient {
|
||||
Annotation[][] anns = method.getParameterAnnotations();
|
||||
int targetAddrIndex = -1;
|
||||
int sourceAddrIndex = -1;
|
||||
|
||||
int handlerAttachIndex = -1;
|
||||
int handlerFuncIndex = -1;
|
||||
boolean hasattr = false;
|
||||
Attribute[] atts = new Attribute[paramTypes.length + 1];
|
||||
if (anns.length > 0) {
|
||||
Class<?>[] params = method.getParameterTypes();
|
||||
|
||||
for (int i = 0; i < params.length; i++) {
|
||||
if (AsyncHandler.class.isAssignableFrom(params[i])) {
|
||||
handlerFuncIndex = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
for (int i = 0; i < anns.length; i++) {
|
||||
if (anns[i].length > 0) {
|
||||
for (Annotation ann : anns[i]) {
|
||||
if (ann.annotationType() == RpcTargetAddress.class && SocketAddress.class.isAssignableFrom(params[i])) {
|
||||
if (ann.annotationType() == RpcAttachment.class) {
|
||||
handlerAttachIndex = i;
|
||||
} else if (ann.annotationType() == RpcTargetAddress.class && SocketAddress.class.isAssignableFrom(params[i])) {
|
||||
targetAddrIndex = i;
|
||||
} else if (ann.annotationType() == RpcSourceAddress.class && SocketAddress.class.isAssignableFrom(params[i])) {
|
||||
sourceAddrIndex = i;
|
||||
@@ -91,7 +103,10 @@ public final class SncpClient {
|
||||
}
|
||||
this.addressTargetParamIndex = targetAddrIndex;
|
||||
this.addressSourceParamIndex = sourceAddrIndex;
|
||||
this.handlerFuncParamIndex = handlerFuncIndex;
|
||||
this.handlerAttachParamIndex = handlerAttachIndex;
|
||||
this.paramAttrs = hasattr ? atts : null;
|
||||
if (this.handlerFuncParamIndex >= 0 && method.getReturnType() != void.class) throw new RuntimeException(method + " has AsyncHandler type parameter but return type is not void");
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -222,8 +237,9 @@ public final class SncpClient {
|
||||
|
||||
public void remoteSameGroup(final BsonConvert bsonConvert, final JsonConvert jsonConvert, Transport transport, final int index, final Object... params) {
|
||||
final SncpAction action = actions[index];
|
||||
if (action.handlerFuncParamIndex >= 0) params[action.handlerFuncParamIndex] = null; //不能让远程调用handler,因为之前本地方法已经调用过了
|
||||
for (InetSocketAddress addr : transport.getRemoteAddresses()) {
|
||||
remote0(bsonConvert, jsonConvert, transport, addr, action, params);
|
||||
remote0(null, bsonConvert, jsonConvert, transport, addr, action, params);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -240,8 +256,9 @@ public final class SncpClient {
|
||||
public void remoteDiffGroup(final BsonConvert bsonConvert, final JsonConvert jsonConvert, Transport[] transports, final int index, final Object... params) {
|
||||
if (transports == null || transports.length < 1) return;
|
||||
final SncpAction action = actions[index];
|
||||
if (action.handlerFuncParamIndex >= 0) params[action.handlerFuncParamIndex] = null; //不能让远程调用handler,因为之前本地方法已经调用过了
|
||||
for (Transport transport : transports) {
|
||||
remote0(bsonConvert, jsonConvert, transport, null, action, params);
|
||||
remote0(null, bsonConvert, jsonConvert, transport, null, action, params);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -259,8 +276,10 @@ public final class SncpClient {
|
||||
//只给远程模式调用的
|
||||
public <T> T remote(final BsonConvert bsonConvert, final JsonConvert jsonConvert, Transport transport, final int index, final Object... params) {
|
||||
final SncpAction action = actions[index];
|
||||
SncpFuture<byte[]> future = remote0(bsonConvert, jsonConvert, transport, null, action, params);
|
||||
|
||||
final AsyncHandler handlerFunc = action.handlerFuncParamIndex >= 0 ? (AsyncHandler) params[action.handlerFuncParamIndex] : null;
|
||||
if (action.handlerFuncParamIndex >= 0) params[action.handlerFuncParamIndex] = null;
|
||||
SncpFuture<byte[]> future = remote0(handlerFunc, bsonConvert, jsonConvert, transport, null, action, params);
|
||||
if (handlerFunc != null) return null;
|
||||
final BsonReader reader = bsonConvert.pollBsonReader();
|
||||
try {
|
||||
reader.setBytes(future.get(5, TimeUnit.SECONDS));
|
||||
@@ -282,22 +301,23 @@ public final class SncpClient {
|
||||
if (transports == null || transports.length < 1) return;
|
||||
remote(bsonConvert, jsonConvert, transports[0], index, params);
|
||||
for (int i = 1; i < transports.length; i++) {
|
||||
remote0(bsonConvert, jsonConvert, transports[i], null, actions[index], params);
|
||||
remote0(null, bsonConvert, jsonConvert, transports[i], null, actions[index], params);
|
||||
}
|
||||
}
|
||||
|
||||
private SncpFuture<byte[]> remote0(final BsonConvert bsonConvert, final JsonConvert jsonConvert, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) {
|
||||
private SncpFuture<byte[]> remote0(final AsyncHandler handler, final BsonConvert bsonConvert, final JsonConvert jsonConvert, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) {
|
||||
if ("rest".equalsIgnoreCase(transport.getSubprotocol())) {
|
||||
return remoteRest0(jsonConvert, transport, addr0, action, params);
|
||||
return remoteRest0(handler, jsonConvert, transport, addr0, action, params);
|
||||
}
|
||||
return remoteSncp0(bsonConvert, transport, addr0, action, params);
|
||||
return remoteSncp0(handler, bsonConvert, transport, addr0, action, params);
|
||||
}
|
||||
|
||||
private SncpFuture<byte[]> remoteRest0(final JsonConvert jsonConvert, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) {
|
||||
//尚未实现
|
||||
private SncpFuture<byte[]> remoteRest0(final AsyncHandler handler, final JsonConvert jsonConvert, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) {
|
||||
return null;
|
||||
}
|
||||
|
||||
private SncpFuture<byte[]> remoteSncp0(final BsonConvert bsonConvert, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) {
|
||||
private SncpFuture<byte[]> remoteSncp0(final AsyncHandler handler, final BsonConvert bsonConvert, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) {
|
||||
Type[] myparamtypes = action.paramTypes;
|
||||
if (action.addressSourceParamIndex >= 0) params[action.addressSourceParamIndex] = this.clientAddress;
|
||||
final BsonWriter writer = bsonConvert.pollBsonWriter(transport.getBufferSupplier()); // 将head写入
|
||||
@@ -400,6 +420,24 @@ public final class SncpClient {
|
||||
future.set(this.body);
|
||||
transport.offerBuffer(buffer);
|
||||
transport.offerConnection(false, conn);
|
||||
if (handler != null) {
|
||||
final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null;
|
||||
final BsonReader reader = bsonConvert.pollBsonReader();
|
||||
try {
|
||||
reader.setBytes(this.body);
|
||||
int i;
|
||||
while ((i = (reader.readByte() & 0xff)) != 0) {
|
||||
final Attribute attr = action.paramAttrs[i];
|
||||
attr.set(params[i - 1], bsonConvert.convertFrom(attr.type(), reader));
|
||||
}
|
||||
Object rs = bsonConvert.convertFrom(action.resultTypes, reader);
|
||||
handler.completed(rs, handlerAttach);
|
||||
} catch (Exception e) {
|
||||
handler.failed(e, handlerAttach);
|
||||
} finally {
|
||||
bsonConvert.offerBsonReader(reader);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -408,6 +446,10 @@ public final class SncpClient {
|
||||
future.set(new RuntimeException(action.method + " sncp remote exec failed"));
|
||||
transport.offerBuffer(buffer);
|
||||
transport.offerConnection(true, conn);
|
||||
if (handler != null) {
|
||||
final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null;
|
||||
handler.failed(exc, handlerAttach);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -191,6 +191,12 @@ public class CacheSourceService<K extends Serializable, V extends Object> implem
|
||||
return !entry.isExpired();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exists(final AsyncHandler<Boolean, K> handler, @RpcAttachment final K key) {
|
||||
boolean rs = exists(key);
|
||||
if (handler != null) handler.completed(rs, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public V get(K key) {
|
||||
if (key == null) return null;
|
||||
@@ -201,6 +207,12 @@ public class CacheSourceService<K extends Serializable, V extends Object> implem
|
||||
return (V) entry.getValue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void get(final AsyncHandler<V, K> handler, @RpcAttachment final K key) {
|
||||
V rs = get(key);
|
||||
if (handler != null) handler.completed(rs, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public V getAndRefresh(K key, final int expireSeconds) {
|
||||
@@ -214,6 +226,12 @@ public class CacheSourceService<K extends Serializable, V extends Object> implem
|
||||
return (V) entry.getValue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void getAndRefresh(final AsyncHandler<V, K> handler, @RpcAttachment final K key, final int expireSeconds) {
|
||||
V rs = getAndRefresh(key, expireSeconds);
|
||||
if (handler != null) handler.completed(rs, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void refresh(K key, final int expireSeconds) {
|
||||
@@ -224,6 +242,12 @@ public class CacheSourceService<K extends Serializable, V extends Object> implem
|
||||
entry.expireSeconds = expireSeconds;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refresh(final AsyncVoidHandler<K> handler, @RpcAttachment final K key, final int expireSeconds) {
|
||||
refresh(key, expireSeconds);
|
||||
if (handler != null) handler.completed(null, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void set(K key, V value) {
|
||||
@@ -239,6 +263,12 @@ public class CacheSourceService<K extends Serializable, V extends Object> implem
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void set(final AsyncVoidHandler<K> handler, @RpcAttachment final K key, final V value) {
|
||||
set(key, value);
|
||||
if (handler != null) handler.completed(null, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void set(int expireSeconds, K key, V value) {
|
||||
@@ -254,6 +284,12 @@ public class CacheSourceService<K extends Serializable, V extends Object> implem
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void set(final AsyncVoidHandler<K> handler, final int expireSeconds, @RpcAttachment final K key, final V value) {
|
||||
set(expireSeconds, key, value);
|
||||
if (handler != null) handler.completed(null, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void setExpireSeconds(K key, int expireSeconds) {
|
||||
@@ -263,6 +299,12 @@ public class CacheSourceService<K extends Serializable, V extends Object> implem
|
||||
entry.expireSeconds = expireSeconds;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setExpireSeconds(final AsyncVoidHandler<K> handler, @RpcAttachment final K key, final int expireSeconds) {
|
||||
setExpireSeconds(key, expireSeconds);
|
||||
if (handler != null) handler.completed(null, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void remove(K key) {
|
||||
@@ -270,16 +312,34 @@ public class CacheSourceService<K extends Serializable, V extends Object> implem
|
||||
container.remove(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove(final AsyncVoidHandler<K> handler, @RpcAttachment final K key) {
|
||||
remove(key);
|
||||
if (handler != null) handler.completed(null, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<V> getCollection(final K key) {
|
||||
return (Collection<V>) get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void getCollection(final AsyncHandler<Collection<V>, K> handler, @RpcAttachment final K key) {
|
||||
Collection<V> rs = getCollection(key);
|
||||
if (handler != null) handler.completed(rs, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<V> getCollectionAndRefresh(final K key, final int expireSeconds) {
|
||||
return (Collection<V>) getAndRefresh(key, expireSeconds);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void getCollectionAndRefresh(final AsyncHandler<Collection<V>, K> handler, @RpcAttachment final K key, final int expireSeconds) {
|
||||
Collection<V> rs = getCollectionAndRefresh(key, expireSeconds);
|
||||
if (handler != null) handler.completed(rs, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void appendListItem(K key, V value) {
|
||||
@@ -296,6 +356,12 @@ public class CacheSourceService<K extends Serializable, V extends Object> implem
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void appendListItem(final AsyncVoidHandler<K> handler, @RpcAttachment final K key, final V value) {
|
||||
appendListItem(key, value);
|
||||
if (handler != null) handler.completed(null, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void removeListItem(K key, V value) {
|
||||
@@ -305,6 +371,12 @@ public class CacheSourceService<K extends Serializable, V extends Object> implem
|
||||
((Collection) entry.getValue()).remove(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeListItem(final AsyncVoidHandler<K> handler, @RpcAttachment final K key, final V value) {
|
||||
removeListItem(key, value);
|
||||
if (handler != null) handler.completed(null, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void appendSetItem(K key, V value) {
|
||||
@@ -321,6 +393,12 @@ public class CacheSourceService<K extends Serializable, V extends Object> implem
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void appendSetItem(final AsyncVoidHandler<K> handler, @RpcAttachment final K key, final V value) {
|
||||
appendSetItem(key, value);
|
||||
if (handler != null) handler.completed(null, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void removeSetItem(K key, V value) {
|
||||
@@ -330,6 +408,12 @@ public class CacheSourceService<K extends Serializable, V extends Object> implem
|
||||
((Set) entry.getValue()).remove(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeSetItem(final AsyncVoidHandler<K> handler, @RpcAttachment final K key, final V value) {
|
||||
removeSetItem(key, value);
|
||||
if (handler != null) handler.completed(null, key);
|
||||
}
|
||||
|
||||
public static enum CacheEntryType {
|
||||
OBJECT, SET, LIST;
|
||||
}
|
||||
|
||||
26
src/org/redkale/service/RpcAttachment.java
Normal file
26
src/org/redkale/service/RpcAttachment.java
Normal file
@@ -0,0 +1,26 @@
|
||||
/*
|
||||
* To change this license header, choose License Headers in Project Properties.
|
||||
* To change this template file, choose Tools | Templates
|
||||
* and open the template in the editor.
|
||||
*/
|
||||
package org.redkale.service;
|
||||
|
||||
import static java.lang.annotation.ElementType.PARAMETER;
|
||||
import static java.lang.annotation.RetentionPolicy.RUNTIME;
|
||||
import java.lang.annotation.*;
|
||||
|
||||
/**
|
||||
* SNCP协议中用于CompletionHandler回调函数中的attach字段。
|
||||
*
|
||||
* <p>
|
||||
* 详情见: https://redkale.org
|
||||
*
|
||||
* @author zhangjx
|
||||
*/
|
||||
@Inherited
|
||||
@Documented
|
||||
@Target({PARAMETER})
|
||||
@Retention(RUNTIME)
|
||||
public @interface RpcAttachment {
|
||||
|
||||
}
|
||||
@@ -7,6 +7,7 @@ package org.redkale.source;
|
||||
|
||||
import java.io.*;
|
||||
import java.util.*;
|
||||
import org.redkale.util.*;
|
||||
|
||||
/**
|
||||
*
|
||||
@@ -51,4 +52,36 @@ public interface CacheSource<K extends Serializable, V extends Object> {
|
||||
|
||||
public void removeSetItem(final K key, final V value);
|
||||
|
||||
//----------------------异步版---------------------------------
|
||||
public void exists(final AsyncHandler<Boolean, K> handler, final K key);
|
||||
|
||||
public void get(final AsyncHandler<V, K> handler, final K key);
|
||||
|
||||
public void getAndRefresh(final AsyncHandler<V, K> handler, final K key, final int expireSeconds);
|
||||
|
||||
public void refresh(final AsyncVoidHandler<K> handler, final K key, final int expireSeconds);
|
||||
|
||||
public void set(final AsyncVoidHandler<K> handler, final K key, final V value);
|
||||
|
||||
public void set(final AsyncVoidHandler<K> handler, final int expireSeconds, final K key, final V value);
|
||||
|
||||
public void setExpireSeconds(final AsyncVoidHandler<K> handler, final K key, final int expireSeconds);
|
||||
|
||||
public void remove(final AsyncVoidHandler<K> handler, final K key);
|
||||
|
||||
public void getCollection(final AsyncHandler<Collection<V>, K> handler, final K key);
|
||||
|
||||
public void getCollectionAndRefresh(final AsyncHandler<Collection<V>, K> handler, final K key, final int expireSeconds);
|
||||
|
||||
public void appendListItem(final AsyncVoidHandler<K> handler, final K key, final V value);
|
||||
|
||||
public void removeListItem(final AsyncVoidHandler<K> handler, final K key, final V value);
|
||||
|
||||
public void appendSetItem(final AsyncVoidHandler<K> handler, final K key, final V value);
|
||||
|
||||
public void removeSetItem(final AsyncVoidHandler<K> handler, final K key, final V value);
|
||||
|
||||
default void isOpen(final AsyncHandler<Boolean, Void> handler) {
|
||||
if (handler != null) handler.completed(Boolean.TRUE, null);
|
||||
}
|
||||
}
|
||||
|
||||
47
src/org/redkale/util/AsyncHandler.java
Normal file
47
src/org/redkale/util/AsyncHandler.java
Normal file
@@ -0,0 +1,47 @@
|
||||
/*
|
||||
* To change this license header, choose License Headers in Project Properties.
|
||||
* To change this template file, choose Tools | Templates
|
||||
* and open the template in the editor.
|
||||
*/
|
||||
package org.redkale.util;
|
||||
|
||||
import java.util.function.BiConsumer;
|
||||
import java.nio.channels.CompletionHandler;
|
||||
|
||||
/**
|
||||
* 异步接口
|
||||
*
|
||||
*
|
||||
* <p>
|
||||
* 详情见: https://redkale.org
|
||||
*
|
||||
* @author zhangjx
|
||||
* @param <V> 结果对象的泛型
|
||||
* @param <A> 附件对象的泛型
|
||||
*/
|
||||
public interface AsyncHandler<V, A> extends CompletionHandler<V, A> {
|
||||
|
||||
/**
|
||||
* 创建 AsyncHandler 对象
|
||||
*
|
||||
* @param <V> 结果对象的泛型
|
||||
* @param <A> 附件对象的泛型
|
||||
* @param success 成功的回调函数
|
||||
* @param fail 失败的回调函数
|
||||
*
|
||||
* @return AsyncHandler
|
||||
*/
|
||||
public static <V, A> AsyncHandler<V, A> create(final BiConsumer<V, A> success, final BiConsumer<Throwable, A> fail) {
|
||||
return new AsyncHandler<V, A>() {
|
||||
@Override
|
||||
public void completed(V result, A attachment) {
|
||||
if (success != null) success.accept(result, attachment);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable exc, A attachment) {
|
||||
if (fail != null) fail.accept(exc, attachment);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
44
src/org/redkale/util/AsyncVoidHandler.java
Normal file
44
src/org/redkale/util/AsyncVoidHandler.java
Normal file
@@ -0,0 +1,44 @@
|
||||
/*
|
||||
* To change this license header, choose License Headers in Project Properties.
|
||||
* To change this template file, choose Tools | Templates
|
||||
* and open the template in the editor.
|
||||
*/
|
||||
package org.redkale.util;
|
||||
|
||||
import java.util.function.*;
|
||||
|
||||
/**
|
||||
* 没有返回值的异步接口
|
||||
*
|
||||
*
|
||||
* <p>
|
||||
* 详情见: https://redkale.org
|
||||
*
|
||||
* @author zhangjx
|
||||
* @param <A> 附件对象的泛型
|
||||
*/
|
||||
public interface AsyncVoidHandler< A> extends AsyncHandler<Void, A> {
|
||||
|
||||
/**
|
||||
* 创建 AsyncVoidHandler 对象
|
||||
*
|
||||
* @param <A> 附件对象的泛型
|
||||
* @param success 成功的回调函数
|
||||
* @param fail 失败的回调函数
|
||||
*
|
||||
* @return AsyncHandler
|
||||
*/
|
||||
public static < A> AsyncVoidHandler< A> create(final Consumer< A> success, final BiConsumer<Throwable, A> fail) {
|
||||
return new AsyncVoidHandler< A>() {
|
||||
@Override
|
||||
public void completed(Void result, A attachment) {
|
||||
if (success != null) success.accept(attachment);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable exc, A attachment) {
|
||||
if (fail != null) fail.accept(exc, attachment);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user