增加SncpClientRequest

This commit is contained in:
redkale
2023-02-02 13:47:34 +08:00
parent ea9505c2da
commit cfc20c6248
23 changed files with 1282 additions and 766 deletions

View File

@@ -107,7 +107,7 @@ public final class Application {
/**
* 当前Service所属的SNCP Server的地址 类型: SocketAddress、InetSocketAddress、String <br>
*/
public static final String RESNAME_SNCP_ADDR = "SNCP_ADDR";
public static final String RESNAME_SNCP_ADDRESS = "SNCP_ADDRESS";
/**
* 当前Service所属的SNCP Server所属的组 类型: String<br>
@@ -942,7 +942,7 @@ public final class Application {
});
}
if (!compileMode) {
properties.put(SncpClient.class.getSimpleName() + ".handlers", LoggingFileHandler.LoggingSncpFileHandler.class.getName());
properties.put(SncpOldClient.class.getSimpleName() + ".handlers", LoggingFileHandler.LoggingSncpFileHandler.class.getName());
}
}
if (compileMode) {

View File

@@ -14,7 +14,6 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.stream.Stream;
import org.redkale.annotation.*;
import static org.redkale.boot.Application.RESNAME_SNCP_ADDR;
import org.redkale.boot.ClassFilter.FilterEntry;
import org.redkale.cluster.ClusterAgent;
import org.redkale.mq.MessageAgent;
@@ -25,6 +24,7 @@ import org.redkale.service.Service;
import org.redkale.util.AnyValue.DefaultAnyValue;
import org.redkale.util.*;
import org.redkale.watch.*;
import static org.redkale.boot.Application.RESNAME_SNCP_ADDRESS;
/**
* HTTP Server节点的配置Server
@@ -136,10 +136,10 @@ public class NodeHttpServer extends NodeServer {
if (nodeService == null) {
nodeService = (Service) rf.find(resourceName, WebSocketNode.class);
}
if (sncpResFactory != null && resourceFactory.find(RESNAME_SNCP_ADDR, String.class) == null) {
resourceFactory.register(RESNAME_SNCP_ADDR, InetSocketAddress.class, sncpResFactory.find(RESNAME_SNCP_ADDR, InetSocketAddress.class));
resourceFactory.register(RESNAME_SNCP_ADDR, SocketAddress.class, sncpResFactory.find(RESNAME_SNCP_ADDR, SocketAddress.class));
resourceFactory.register(RESNAME_SNCP_ADDR, String.class, sncpResFactory.find(RESNAME_SNCP_ADDR, String.class));
if (sncpResFactory != null && resourceFactory.find(RESNAME_SNCP_ADDRESS, String.class) == null) {
resourceFactory.register(RESNAME_SNCP_ADDRESS, InetSocketAddress.class, sncpResFactory.find(RESNAME_SNCP_ADDRESS, InetSocketAddress.class));
resourceFactory.register(RESNAME_SNCP_ADDRESS, SocketAddress.class, sncpResFactory.find(RESNAME_SNCP_ADDRESS, SocketAddress.class));
resourceFactory.register(RESNAME_SNCP_ADDRESS, String.class, sncpResFactory.find(RESNAME_SNCP_ADDRESS, String.class));
}
if (nodeService == null) {
MessageAgent messageAgent = null;

View File

@@ -135,9 +135,10 @@ public abstract class NodeServer {
}
//单点服务不会有 sncpAddress、sncpGroup
if (this.sncpAddress != null) {
this.resourceFactory.register(RESNAME_SNCP_ADDR, this.sncpAddress);
this.resourceFactory.register(RESNAME_SNCP_ADDR, SocketAddress.class, this.sncpAddress);
this.resourceFactory.register(RESNAME_SNCP_ADDR, String.class, this.sncpAddress.getHostString() + ":" + this.sncpAddress.getPort());
this.resourceFactory.register(RESNAME_SNCP_ADDRESS, this.sncpAddress);
this.resourceFactory.register(RESNAME_SNCP_ADDRESS, SocketAddress.class, this.sncpAddress);
this.resourceFactory.register(RESNAME_SNCP_ADDRESS, InetSocketAddress.class, this.sncpAddress);
this.resourceFactory.register(RESNAME_SNCP_ADDRESS, String.class, this.sncpAddress.getHostString() + ":" + this.sncpAddress.getPort());
}
if (this.sncpGroup != null) {
this.resourceFactory.register(RESNAME_SNCP_GROUP, this.sncpGroup);
@@ -290,7 +291,7 @@ public abstract class NodeServer {
}
//ResourceFactory resfactory = (isSNCP() ? appResFactory : resourceFactory);
SncpClient client = srcObj instanceof Service ? Sncp.getSncpClient((Service) srcObj) : null;
SncpOldClient client = srcObj instanceof Service ? Sncp.getSncpOldClient((Service) srcObj) : null;
final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress();
final Set<String> groups = new HashSet<>();
Service service = Modifier.isFinal(resServiceType.getModifiers()) ? (Service) resServiceType.getConstructor().newInstance() : Sncp.createLocalService(serverClassLoader, resourceName, resServiceType, null, appResFactory, appSncpTranFactory, sncpAddr, groups, null);
@@ -342,7 +343,7 @@ public abstract class NodeServer {
throw new RuntimeException("CacheSource must be inject in Service, cannot in " + srcObj);
}
final Service srcService = (Service) srcObj;
SncpClient client = Sncp.getSncpClient(srcService);
SncpOldClient client = Sncp.getSncpOldClient(srcService);
final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress();
//final boolean ws = (srcObj instanceof org.redkale.net.http.WebSocketNodeService) && sncpAddr != null; //不配置SNCP服务会导致ws=false时没有注入CacheMemorySource
final boolean ws = (srcObj instanceof org.redkale.net.http.WebSocketNodeService);
@@ -489,7 +490,7 @@ public abstract class NodeServer {
}
final ResourceTypeLoader resourceLoader = (ResourceFactory rf, String srcResourceName, final Object srcObj, final String resourceName, Field field, final Object attachment) -> {
try {
if (SncpClient.parseMethodActions(serviceImplClass).isEmpty()
if (SncpOldClient.parseMethodActions(serviceImplClass).isEmpty()
&& (serviceImplClass.getAnnotation(Priority.class) == null && serviceImplClass.getAnnotation(javax.annotation.Priority.class) == null)) { //class没有可用的方法且没有标记启动优先级的 通常为BaseService
if (!serviceImplClass.getName().startsWith("org.redkale.") && !serviceImplClass.getSimpleName().contains("Base")) {
logger.log(Level.FINE, serviceImplClass + " cannot load because not found less one public non-final method");

View File

@@ -75,7 +75,7 @@ public class TransportWatchService extends AbstractWatchService {
if (!Sncp.isSncpDyn(service)) {
continue;
}
SncpClient client = Sncp.getSncpClient(service);
SncpOldClient client = Sncp.getSncpOldClient(service);
if (Sncp.isRemote(service)) {
if (client.getRemoteGroups() != null && client.getRemoteGroups().contains(group)) {
client.getRemoteGroupTransport().addRemoteAddresses(address);
@@ -117,7 +117,7 @@ public class TransportWatchService extends AbstractWatchService {
if (!Sncp.isSncpDyn(service)) {
continue;
}
SncpClient client = Sncp.getSncpClient(service);
SncpOldClient client = Sncp.getSncpOldClient(service);
if (Sncp.isRemote(service)) {
if (client.getRemoteGroups() != null && client.getRemoteGroups().contains(group)) {
client.getRemoteGroupTransport().removeRemoteAddresses(address);

View File

@@ -12,7 +12,7 @@ import org.redkale.convert.*;
import org.redkale.convert.bson.BsonConvert;
import org.redkale.convert.json.JsonConvert;
import org.redkale.net.http.HttpSimpleRequest;
import org.redkale.net.sncp.Sncp;
import org.redkale.net.sncp.SncpHeader;
/**
* 存在MQ里面的数据结构<p>
@@ -318,8 +318,8 @@ public class MessageRecord implements Serializable {
sb.append(",\"respTopic\":\"").append(this.respTopic).append("\"");
}
if (this.content != null) {
if (this.ctype == CTYPE_BSON_RESULT && this.content.length > Sncp.HEADER_SIZE) {
int offset = Sncp.HEADER_SIZE + 1; //循环占位符
if (this.ctype == CTYPE_BSON_RESULT && this.content.length > SncpHeader.HEADER_SIZE) {
int offset = SncpHeader.HEADER_SIZE + 1; //循环占位符
Object rs = BsonConvert.root().convertFrom(Object.class, this.content, offset, this.content.length - offset);
sb.append(",\"content\":").append(rs);
} else if (this.ctype == CTYPE_HTTP_REQUEST) {

View File

@@ -50,14 +50,14 @@ public class SncpMessageResponse extends SncpResponse {
callback.run();
}
if (out == null) {
final ByteArray result = new ByteArray(Sncp.HEADER_SIZE);
final ByteArray result = new ByteArray(SncpHeader.HEADER_SIZE);
fillHeader(result, 0, retcode);
producer.apply(messageClient.createMessageRecord(message.getSeqid(), message.getRespTopic(), null, (byte[]) null));
return;
}
final int respBodyLength = out.count(); //body总长度
final ByteArray result = out.toByteArray();
fillHeader(result, respBodyLength - Sncp.HEADER_SIZE, retcode);
fillHeader(result, respBodyLength - SncpHeader.HEADER_SIZE, retcode);
producer.apply(messageClient.createMessageRecord(message.getSeqid(), message.getRespTopic(), null, result.getBytes()));
}
}

View File

@@ -41,7 +41,7 @@ public abstract class WebSocketNode {
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
//"SNCP_ADDR" 如果不是分布式(没有SNCP) 值为null
@Resource(name = Application.RESNAME_SNCP_ADDR, required = false)
@Resource(name = Application.RESNAME_SNCP_ADDRESS, required = false)
protected InetSocketAddress localSncpAddress; //为SncpServer的服务address
protected WebSocketAddress wsNodeAddress;

View File

@@ -22,7 +22,7 @@ import org.redkale.asm.Type;
import org.redkale.mq.MessageAgent;
import org.redkale.net.TransportFactory;
import org.redkale.net.http.WebSocketNode;
import org.redkale.net.sncp.SncpClient.SncpAction;
import org.redkale.net.sncp.SncpOldClient.SncpAction;
import org.redkale.service.*;
import org.redkale.util.*;
@@ -37,18 +37,7 @@ import org.redkale.util.*;
*/
public abstract class Sncp {
public static final int HEADER_SIZE = 60;
private static final byte[] PING_BYTES = new ByteArray(HEADER_SIZE)
.putLong(0L) //8 seqid
.putChar((char) HEADER_SIZE) //2 headerSize
.putUint128(Uint128.ZERO) //16 serviceid
.putInt(0) //4 serviceVersion
.putUint128(Uint128.ZERO) //16 actionid
.put(new byte[6]) //6 addr
.putInt(0) //4 bodyLength
.putInt(0) //4 retcode
.getBytes();
private static final byte[] PING_BYTES = new SncpHeader(null, Uint128.ZERO, Uint128.ZERO).write(new ByteArray(SncpHeader.HEADER_SIZE), null, 0, 0, 0).getBytes();
private static final byte[] PONG_BYTES = Arrays.copyOf(PING_BYTES, PING_BYTES.length);
@@ -178,14 +167,14 @@ public abstract class Sncp {
}
}
public static SncpClient getSncpClient(Service service) {
public static SncpOldClient getSncpOldClient(Service service) {
if (service == null || !isSncpDyn(service)) {
return null;
}
try {
Field ts = service.getClass().getDeclaredField(FIELDPREFIX + "_client");
ts.setAccessible(true);
return (SncpClient) ts.get(service);
return (SncpOldClient) ts.get(service);
} catch (Exception e) {
throw new SncpException(service + " not found " + FIELDPREFIX + "_client");
}
@@ -228,7 +217,7 @@ public abstract class Sncp {
if (!isSncpDyn(service)) {
return false;
}
SncpClient client = getSncpClient(service);
SncpOldClient client = getSncpOldClient(service);
client.setRemoteGroups(groups);
if (client.getRemoteGroupTransport() != null) {
client.getRemoteGroupTransport().updateRemoteAddresses(addresses);
@@ -330,11 +319,11 @@ public abstract class Sncp {
* &#64;ResourceType(TestService.class)
* public final class _DynLocalTestService extends TestService{
*
* private AnyValue _redkale_conf;
* private AnyValue _redkale_conf;
*
* private SncpClient _redkale_client;
* private SncpOldClient _redkale_client;
*
* &#64;Override
* &#64;Override
* public String toString() {
* return _redkale_selfstring == null ? super.toString() : _redkale_selfstring;
* }
@@ -365,9 +354,9 @@ public abstract class Sncp {
throw new SncpException(serviceImplClass + " is abstract");
}
final String supDynName = serviceImplClass.getName().replace('.', '/');
final String clientName = SncpClient.class.getName().replace('.', '/');
final String clientName = SncpOldClient.class.getName().replace('.', '/');
final String resDesc = Type.getDescriptor(Resource.class);
final String clientDesc = Type.getDescriptor(SncpClient.class);
final String clientDesc = Type.getDescriptor(SncpOldClient.class);
final String anyValueDesc = Type.getDescriptor(AnyValue.class);
final String sncpDynDesc = Type.getDescriptor(SncpDyn.class);
ClassLoader loader = classLoader == null ? Thread.currentThread().getContextClassLoader() : classLoader;
@@ -552,12 +541,12 @@ public abstract class Sncp {
}
} while ((loop = loop.getSuperclass()) != Object.class);
}
SncpClient client = null;
SncpOldClient client = null;
{
try {
Field c = newClazz.getDeclaredField(FIELDPREFIX + "_client");
c.setAccessible(true);
client = new SncpClient(name, serviceImplClass, service, messageAgent, transportFactory, false, newClazz, clientSncpAddress);
client = new SncpOldClient(name, serviceImplClass, service, messageAgent, transportFactory, false, newClazz, clientSncpAddress);
c.set(service, client);
if (transportFactory != null) {
transportFactory.addSncpService(service);
@@ -604,11 +593,11 @@ public abstract class Sncp {
* &#64;ResourceType(TestService.class)
* public final class _DynRemoteTestService extends TestService{
*
* private AnyValue _redkale_conf;
* private AnyValue _redkale_conf;
*
* private SncpClient _redkale_client;
* private SncpOldClient _redkale_client;
*
* &#64;Override
* &#64;Override
* public void createSomeThing(TestBean bean){
* _redkale_client.remote(0, bean);
* }
@@ -664,9 +653,9 @@ public abstract class Sncp {
return null;
}
final String supDynName = serviceTypeOrImplClass.getName().replace('.', '/');
final String clientName = SncpClient.class.getName().replace('.', '/');
final String clientName = SncpOldClient.class.getName().replace('.', '/');
final String resDesc = Type.getDescriptor(Resource.class);
final String clientDesc = Type.getDescriptor(SncpClient.class);
final String clientDesc = Type.getDescriptor(SncpOldClient.class);
final String sncpDynDesc = Type.getDescriptor(SncpDyn.class);
final String anyValueDesc = Type.getDescriptor(AnyValue.class);
final ClassLoader loader = classLoader == null ? Thread.currentThread().getContextClassLoader() : classLoader;
@@ -676,7 +665,7 @@ public abstract class Sncp {
Class clz = RedkaleClassLoader.findDynClass(newDynName.replace('/', '.'));
Class newClazz = clz == null ? loader.loadClass(newDynName.replace('/', '.')) : clz;
T service = (T) newClazz.getDeclaredConstructor().newInstance();
SncpClient client = new SncpClient(name, serviceTypeOrImplClass, service, messageAgent, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress);
SncpOldClient client = new SncpOldClient(name, serviceTypeOrImplClass, service, messageAgent, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress);
client.setRemoteGroups(groups);
if (transportFactory != null) {
client.setRemoteGroupTransport(transportFactory.loadTransport(clientAddress, groups));
@@ -800,7 +789,8 @@ public abstract class Sncp {
mv.visitEnd();
}
int i = -1;
for (final SncpAction entry : SncpClient.getSncpActions(realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass)) {
Uint128 serviceid = serviceid(name, serviceTypeOrImplClass);
for (final SncpAction entry : SncpOldClient.getSncpActions(realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, serviceid)) {
final int index = ++i;
final java.lang.reflect.Method method = entry.method;
{
@@ -895,7 +885,7 @@ public abstract class Sncp {
RedkaleClassLoader.putReflectionDeclaredConstructors(newClazz, newDynName.replace('/', '.'));
try {
T service = (T) newClazz.getDeclaredConstructor().newInstance();
SncpClient client = new SncpClient(name, serviceTypeOrImplClass, service, messageAgent, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress);
SncpOldClient client = new SncpOldClient(name, serviceTypeOrImplClass, service, messageAgent, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress);
client.setRemoteGroups(groups);
if (transportFactory != null) {
client.setRemoteGroupTransport(transportFactory.loadTransport(clientAddress, groups));

View File

@@ -1,630 +1,25 @@
/*
* To change this license header, choose License Headers reader Project Properties.
* To change this template file, choose Tools | Templates
* and open the template reader the editor.
*
*/
package org.redkale.net.sncp;
import java.lang.annotation.Annotation;
import java.lang.reflect.*;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.*;
import java.util.concurrent.*;
import java.util.logging.*;
import org.redkale.annotation.Resource;
import org.redkale.convert.bson.*;
import org.redkale.convert.json.*;
import org.redkale.mq.*;
import org.redkale.net.*;
import static org.redkale.net.sncp.Sncp.HEADER_SIZE;
import org.redkale.net.sncp.Sncp.SncpDyn;
import static org.redkale.net.sncp.SncpRequest.*;
import static org.redkale.net.sncp.SncpResponse.fillRespHeader;
import org.redkale.service.*;
import org.redkale.source.*;
import org.redkale.util.*;
import org.redkale.net.client.*;
/**
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public final class SncpClient {
public class SncpClient extends Client<SncpClientConnection, SncpClientRequest, SncpClientResult> {
protected static final Logger logger = Logger.getLogger(SncpClient.class.getSimpleName());
protected final JsonConvert convert = JsonFactory.root().getConvert();
protected final String name;
protected final boolean remote;
private final Class serviceClass;
protected final InetSocketAddress clientSncpAddress;
private final byte[] addrBytes;
private final int addrPort;
protected final Uint128 serviceid;
protected final int serviceVersion;
protected final SncpAction[] actions;
protected final MessageAgent messageAgent;
protected final SncpMessageClient messageClient;
protected final String topic;
@Resource
protected BsonConvert bsonConvert;
//远程模式, 可能为null
protected Set<String> remoteGroups;
//远程模式, 可能为null
protected Transport remoteGroupTransport;
public <T extends Service> SncpClient(final String serviceResourceName, final Class<T> serviceTypeOrImplClass, final T service, MessageAgent messageAgent, final TransportFactory factory,
final boolean remote, final Class serviceClass, final InetSocketAddress clientSncpAddress) {
this.remote = remote;
this.messageAgent = messageAgent;
this.messageClient = messageAgent == null ? null : messageAgent.getSncpMessageClient();
this.topic = messageAgent == null ? null : messageAgent.generateSncpReqTopic(service);
this.serviceClass = serviceClass;
this.serviceVersion = 0; //暂不实现Version
this.clientSncpAddress = clientSncpAddress;
this.name = serviceResourceName;
Class<?> serviceResourceType = ResourceFactory.getResourceType(serviceTypeOrImplClass); //serviceResourceType
this.serviceid = Sncp.serviceid(serviceResourceName, serviceResourceType);
final List<SncpAction> methodens = new ArrayList<>();
//------------------------------------------------------------------------------
for (Map.Entry<Uint128, Method> en : parseMethodActions(serviceClass).entrySet()) {
methodens.add(new SncpAction(serviceClass, en.getValue(), en.getKey()));
}
this.actions = methodens.toArray(new SncpAction[methodens.size()]);
this.addrBytes = clientSncpAddress == null ? new byte[4] : clientSncpAddress.getAddress().getAddress();
this.addrPort = clientSncpAddress == null ? 0 : clientSncpAddress.getPort();
if (this.addrBytes.length != 4) {
throw new SncpException("SNCP clientAddress only support IPv4");
}
}
static List<SncpAction> getSncpActions(final Class serviceClass) {
final List<SncpAction> actions = new ArrayList<>();
//------------------------------------------------------------------------------
for (Map.Entry<Uint128, Method> en : parseMethodActions(serviceClass).entrySet()) {
actions.add(new SncpAction(serviceClass, en.getValue(), en.getKey()));
}
return actions;
}
public MessageAgent getMessageAgent() {
return messageAgent;
}
public InetSocketAddress getClientAddress() {
return clientSncpAddress;
}
public Uint128 getServiceid() {
return serviceid;
}
public int getServiceVersion() {
return serviceVersion;
}
public int getActionCount() {
return actions.length;
}
public Set<String> getRemoteGroups() {
return remoteGroups;
}
public void setRemoteGroups(Set<String> remoteGroups) {
this.remoteGroups = remoteGroups;
}
public Transport getRemoteGroupTransport() {
return remoteGroupTransport;
}
public void setRemoteGroupTransport(Transport remoteGroupTransport) {
this.remoteGroupTransport = remoteGroupTransport;
@SuppressWarnings("OverridableMethodCallInConstructor")
public SncpClient(String name, AsyncGroup group, String key, ClientAddress address, int maxConns, int maxPipelines) {
super(name, group, true, address, maxConns, maxPipelines, null, null, null); //maxConns
}
@Override
public String toString() {
String service = serviceClass.getName();
if (remote) {
service = service.replace("DynLocalService", "DynRemoteService");
}
return this.getClass().getSimpleName() + "(service = " + service + ", serviceid = " + serviceid + ", serviceVersion = " + serviceVersion + ", name = '" + name
+ "', address = " + (clientSncpAddress == null ? "" : (clientSncpAddress.getHostString() + ":" + clientSncpAddress.getPort()))
+ ", actions.size = " + actions.length + ")";
protected SncpClientConnection createClientConnection(int index, AsyncConnection channel) {
throw new UnsupportedOperationException("Not supported yet.");
}
public String toSimpleString() { //给Sncp产生的Service用
if (DataSource.class.isAssignableFrom(serviceClass) || CacheSource.class.isAssignableFrom(serviceClass)) {
String service = serviceClass.getAnnotation(SncpDyn.class) == null ? serviceClass.getName() : serviceClass.getSuperclass().getSimpleName();
return service + "(serviceid=" + serviceid + ", name='" + name + "', actions.size=" + actions.length + ")";
}
String service = serviceClass.getAnnotation(SncpDyn.class) == null ? serviceClass.getName() : serviceClass.getSuperclass().getSimpleName();
if (remote) {
service = service.replace("DynLocalService", "DynRemoteService");
}
return service + "(name = '" + name + "', serviceid = " + serviceid + ", serviceVersion = " + serviceVersion
+ ", clientaddr = " + (clientSncpAddress == null ? "" : (clientSncpAddress.getHostString() + ":" + clientSncpAddress.getPort()))
+ ((remoteGroups == null || remoteGroups.isEmpty()) ? "" : ", remoteGroups = " + remoteGroups)
+ (remoteGroupTransport == null ? "" : ", remoteGroupTransport = " + Arrays.toString(remoteGroupTransport.getRemoteAddresses()))
+ ", actions.size = " + actions.length + ")";
}
public static LinkedHashMap<Uint128, Method> parseMethodActions(final Class serviceClass) {
final List<Method> list = new ArrayList<>();
final List<Method> multis = new ArrayList<>();
final Map<Uint128, Method> actionids = new LinkedHashMap<>();
for (final java.lang.reflect.Method method : serviceClass.getMethods()) {
if (method.isSynthetic()) {
continue;
}
if (Modifier.isStatic(method.getModifiers())) {
continue;
}
if (Modifier.isFinal(method.getModifiers())) {
continue;
}
if (method.getAnnotation(Local.class) != null) {
continue;
}
if (method.getName().equals("getClass") || method.getName().equals("toString")) {
continue;
}
if (method.getName().equals("equals") || method.getName().equals("hashCode")) {
continue;
}
if (method.getName().equals("notify") || method.getName().equals("notifyAll") || method.getName().equals("wait")) {
continue;
}
if (method.getParameterCount() == 1 && method.getParameterTypes()[0] == AnyValue.class) {
if (method.getName().equals("init") || method.getName().equals("stop") || method.getName().equals("destroy")) {
continue;
}
}
Uint128 actionid = Sncp.actionid(method);
Method old = actionids.get(actionid);
if (old != null) {
if (old.getDeclaringClass().equals(method.getDeclaringClass())) {
throw new SncpException(serviceClass.getName() + " have one more same action(Method=" + method + ", " + old + ", actionid=" + actionid + ")");
}
continue;
}
actionids.put(actionid, method);
if (method.getAnnotation(SncpDyn.class) != null) {
multis.add(method);
} else {
list.add(method);
}
}
multis.sort((m1, m2) -> m1.getAnnotation(SncpDyn.class).index() - m2.getAnnotation(SncpDyn.class).index());
list.sort((Method o1, Method o2) -> {
if (!o1.getName().equals(o2.getName())) {
return o1.getName().compareTo(o2.getName());
}
if (o1.getParameterCount() != o2.getParameterCount()) {
return o1.getParameterCount() - o2.getParameterCount();
}
return 0;
});
//带SncpDyn必须排在前面
multis.addAll(list);
final LinkedHashMap<Uint128, Method> rs = new LinkedHashMap<>();
for (Method method : multis) {
for (Map.Entry<Uint128, Method> en : actionids.entrySet()) {
if (en.getValue() == method) {
rs.put(en.getKey(), en.getValue());
break;
}
}
}
return rs;
}
//只给远程模式调用的
public <T> T remote(final int index, final Object... params) {
final SncpAction action = actions[index];
final CompletionHandler handlerFunc = action.handlerFuncParamIndex >= 0 ? (CompletionHandler) params[action.handlerFuncParamIndex] : null;
if (action.handlerFuncParamIndex >= 0) {
params[action.handlerFuncParamIndex] = null;
}
final BsonReader reader = bsonConvert.pollBsonReader();
CompletableFuture<byte[]> future = remote0(handlerFunc, remoteGroupTransport, null, action, params);
if (action.boolReturnTypeFuture) { //与handlerFuncIndex互斥
CompletableFuture result = action.futureCreator.create();
future.whenComplete((v, e) -> {
try {
if (e != null) {
result.completeExceptionally(e);
} else {
reader.setBytes(v);
byte i;
while ((i = reader.readByte()) != 0) {
final Attribute attr = action.paramAttrs[i];
attr.set(params[i - 1], bsonConvert.convertFrom(attr.genericType(), reader));
}
Object rs = bsonConvert.convertFrom(Object.class, reader);
result.complete(rs);
}
} catch (Exception exp) {
result.completeExceptionally(exp);
} finally {
bsonConvert.offerBsonReader(reader);
}
}); //需要获取 Executor
return (T) result;
}
if (handlerFunc != null) {
return null;
}
try {
reader.setBytes(future.get(5, TimeUnit.SECONDS));
byte i;
while ((i = reader.readByte()) != 0) {
final Attribute attr = action.paramAttrs[i];
attr.set(params[i - 1], bsonConvert.convertFrom(attr.genericType(), reader));
}
return bsonConvert.convertFrom(action.handlerFuncParamIndex >= 0 ? Object.class : action.resultTypes, reader);
} catch (RpcRemoteException re) {
throw re;
} catch (TimeoutException e) {
throw new RpcRemoteException(actions[index].method + " sncp remote timeout, params=" + JsonConvert.root().convertTo(params));
} catch (InterruptedException | ExecutionException e) {
throw new RpcRemoteException(actions[index].method + " sncp remote error, params=" + JsonConvert.root().convertTo(params), e);
} finally {
bsonConvert.offerBsonReader(reader);
}
}
private CompletableFuture<byte[]> remote0(final CompletionHandler handler, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) {
final String traceid = Traces.currTraceid();
final Type[] myparamtypes = action.paramTypes;
final Class[] myparamclass = action.paramClass;
if (action.addressSourceParamIndex >= 0) {
params[action.addressSourceParamIndex] = this.clientSncpAddress;
}
if (bsonConvert == null) {
bsonConvert = BsonConvert.root();
}
final BsonWriter writer = bsonConvert.pollBsonWriter(); // 将head写入
writer.writeTo(DEFAULT_HEADER);
for (int i = 0; i < params.length; i++) { //params 可能包含: 3 个 boolean
BsonConvert bcc = bsonConvert;
if (params[i] instanceof org.redkale.service.RetResult) {
org.redkale.convert.Convert cc = ((org.redkale.service.RetResult) params[i]).convert();
if (cc instanceof BsonConvert) {
bcc = (BsonConvert) cc;
}
}
bcc.convertTo(writer, CompletionHandler.class.isAssignableFrom(myparamclass[i]) ? CompletionHandler.class : myparamtypes[i], params[i]);
}
final int reqBodyLength = writer.count() - HEADER_SIZE; //body总长度
final long seqid = System.nanoTime();
final Uint128 actionid = action.actionid;
if (messageAgent != null) { //MQ模式
final ByteArray reqbytes = writer.toByteArray();
fillHeader(reqbytes, seqid, actionid, traceid, reqBodyLength);
String targetTopic = action.topicTargetParamIndex >= 0 ? (String) params[action.topicTargetParamIndex] : this.topic;
if (targetTopic == null) {
targetTopic = this.topic;
}
MessageRecord message = messageClient.createMessageRecord(targetTopic, null, reqbytes.getBytes());
final String tt = targetTopic;
if (logger.isLoggable(Level.FINER)) {
message.attach(Utility.append(new Object[]{action.actionName()}, params));
} else {
message.attach(params);
}
return messageClient.sendMessage(message).thenApply(msg -> {
if (msg == null || msg.getContent() == null) {
logger.log(Level.SEVERE, action.method + " sncp mq(params: " + convert.convertTo(params) + ", message: " + message + ") deal error, this.topic = " + this.topic + ", targetTopic = " + tt + ", result = " + msg);
return null;
}
ByteBuffer buffer = ByteBuffer.wrap(msg.getContent());
checkResult(seqid, action, buffer);
final int respBodyLength = buffer.getInt();
final int retcode = buffer.getInt();
if (retcode != 0) {
logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + "), params=" + JsonConvert.root().convertTo(params));
throw new SncpException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")");
}
byte[] body = new byte[respBodyLength];
buffer.get(body, 0, respBodyLength);
return body;
});
}
final SocketAddress addr = addr0 == null ? (action.addressTargetParamIndex >= 0 ? (SocketAddress) params[action.addressTargetParamIndex] : null) : addr0;
CompletableFuture<AsyncConnection> connFuture = transport.pollConnection(addr);
return connFuture.thenCompose(conn0 -> {
final CompletableFuture<byte[]> future = new CompletableFuture();
if (conn0 == null) {
future.completeExceptionally(new RpcRemoteException("sncp " + (conn0 == null ? addr : conn0.getRemoteAddress()) + " cannot connect, params=" + JsonConvert.root().convertTo(params)));
return future;
}
if (!conn0.isOpen()) {
conn0.dispose();
future.completeExceptionally(new RpcRemoteException("sncp " + conn0.getRemoteAddress() + " cannot connect, params=" + JsonConvert.root().convertTo(params)));
return future;
}
final AsyncConnection conn = conn0;
final ByteArray array = writer.toByteArray();
fillHeader(array, seqid, actionid, traceid, reqBodyLength);
conn.write(array, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachments) {
//----------------------- 读取返回结果 -------------------------------------
conn.read(new CompletionHandler<Integer, ByteBuffer>() {
private byte[] body;
private int received;
@Override
public void completed(Integer count, ByteBuffer buffer) {
try {
if (count < 1 && buffer.remaining() == buffer.limit()) { //没有数据可读
future.completeExceptionally(new RpcRemoteException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote no response data, params=" + JsonConvert.root().convertTo(params)));
conn.offerReadBuffer(buffer);
transport.offerConnection(true, conn);
return;
}
if (received < 1 && buffer.limit() < buffer.remaining() + HEADER_SIZE) { //header都没读全
conn.setReadBuffer(buffer);
conn.read(this);
return;
}
buffer.flip();
if (received > 0) {
int offset = this.received;
this.received += buffer.remaining();
buffer.get(body, offset, Math.min(buffer.remaining(), this.body.length - offset));
if (this.received < this.body.length) {// 数据仍然不全,需要继续读取
buffer.clear();
conn.setReadBuffer(buffer);
conn.read(this);
} else {
conn.offerReadBuffer(buffer);
success();
}
return;
}
checkResult(seqid, action, buffer);
final int respBodyLength = buffer.getInt();
final int retcode = buffer.getInt();
if (retcode != 0) {
logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + "), params=" + JsonConvert.root().convertTo(params));
throw new SncpException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")");
}
if (respBodyLength > buffer.remaining()) { // 数据不全,需要继续读取
this.body = new byte[respBodyLength];
this.received = buffer.remaining();
buffer.get(body, 0, this.received);
buffer.clear();
conn.setReadBuffer(buffer);
conn.read(this);
} else {
this.body = new byte[respBodyLength];
buffer.get(body, 0, respBodyLength);
conn.offerReadBuffer(buffer);
success();
}
} catch (Throwable e) {
future.completeExceptionally(new RpcRemoteException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote response error, params=" + JsonConvert.root().convertTo(params)));
transport.offerConnection(true, conn);
if (handler != null) {
final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null;
handler.failed(e, handlerAttach);
}
logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") deal error", e);
}
}
@SuppressWarnings("unchecked")
public void success() {
future.complete(this.body);
transport.offerConnection(false, conn);
if (handler != null) {
final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null;
final BsonReader reader = bsonConvert.pollBsonReader();
try {
reader.setBytes(this.body);
int i;
while ((i = (reader.readByte() & 0xff)) != 0) {
final Attribute attr = action.paramAttrs[i];
attr.set(params[i - 1], bsonConvert.convertFrom(attr.genericType(), reader));
}
Object rs = bsonConvert.convertFrom(action.handlerFuncParamIndex >= 0 ? Object.class : action.resultTypes, reader);
handler.completed(rs, handlerAttach);
} catch (Exception e) {
handler.failed(e, handlerAttach);
} finally {
bsonConvert.offerBsonReader(reader);
}
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment2) {
future.completeExceptionally(new RpcRemoteException(action.method + " sncp remote exec failed, params=" + JsonConvert.root().convertTo(params)));
conn.offerReadBuffer(attachment2);
transport.offerConnection(true, conn);
if (handler != null) {
final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null;
handler.failed(exc, handlerAttach);
}
logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") remote read exec failed, params=" + JsonConvert.root().convertTo(params), exc);
}
});
}
@Override
public void failed(Throwable exc, Void attachment) {
future.completeExceptionally(new RpcRemoteException(action.method + " sncp remote exec failed, params=" + JsonConvert.root().convertTo(params)));
transport.offerConnection(true, conn);
if (handler != null) {
final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null;
handler.failed(exc, handlerAttach);
}
logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") remote write exec failed, params=" + JsonConvert.root().convertTo(params), exc);
}
});
return future;
});
}
private void checkResult(long seqid, final SncpAction action, ByteBuffer buffer) {
long rseqid = buffer.getLong();
if (rseqid != seqid) {
throw new SncpException("sncp(" + action.method + ") response.seqid = " + seqid + ", but request.seqid =" + rseqid);
}
if (buffer.getChar() != HEADER_SIZE) {
throw new SncpException("sncp(" + action.method + ") buffer receive header.length not " + HEADER_SIZE);
}
Uint128 rserviceid = Uint128.read(buffer);
if (!rserviceid.equals(this.serviceid)) {
throw new SncpException("sncp(" + action.method + ") response.serviceid = " + serviceid + ", but request.serviceid =" + rserviceid);
}
int version = buffer.getInt();
if (version != this.serviceVersion) {
throw new SncpException("sncp(" + action.method + ") response.serviceVersion = " + serviceVersion + ", but request.serviceVersion =" + version);
}
Uint128 raction = Uint128.read(buffer);
Uint128 actid = action.actionid;
if (!actid.equals(raction)) {
throw new SncpException("sncp(" + action.method + ") response.actionid = " + action.actionid + ", but request.actionid =(" + raction + ")");
}
buffer.getInt(); //地址
buffer.getChar(); //端口
}
private void fillHeader(ByteArray buffer, long seqid, Uint128 actionid, String traceid, int bodyLength) {
fillRespHeader(buffer, seqid, this.serviceid, this.serviceVersion,
actionid, traceid, this.addrBytes, this.addrPort, bodyLength, 0); //结果码, 请求方固定传0
}
protected static final class SncpAction {
protected final Uint128 actionid;
protected final Method method;
protected final Type resultTypes; //void 必须设为 null
protected final Type[] paramTypes;
protected final Class[] paramClass;
protected final Attribute[] paramAttrs; // 为null表示无RpcCall处理index=0固定为null, 其他为参数标记的RpcCall回调方法
protected final int handlerFuncParamIndex;
protected final int handlerAttachParamIndex;
protected final int addressTargetParamIndex;
protected final int addressSourceParamIndex;
protected final int topicTargetParamIndex;
protected final boolean boolReturnTypeFuture; // 返回结果类型是否为 CompletableFuture
protected final Creator<? extends CompletableFuture> futureCreator;
@SuppressWarnings("unchecked")
public SncpAction(final Class clazz, Method method, Uint128 actionid) {
this.actionid = actionid == null ? Sncp.actionid(method) : actionid;
Type rt = TypeToken.getGenericType(method.getGenericReturnType(), clazz);
this.resultTypes = rt == void.class ? null : rt;
this.boolReturnTypeFuture = CompletableFuture.class.isAssignableFrom(method.getReturnType());
this.futureCreator = boolReturnTypeFuture ? Creator.create((Class<? extends CompletableFuture>) method.getReturnType()) : null;
this.paramTypes = TypeToken.getGenericType(method.getGenericParameterTypes(), clazz);
this.paramClass = method.getParameterTypes();
this.method = method;
Annotation[][] anns = method.getParameterAnnotations();
int tpoicAddrIndex = -1;
int targetAddrIndex = -1;
int sourceAddrIndex = -1;
int handlerAttachIndex = -1;
int handlerFuncIndex = -1;
boolean hasattr = false;
Attribute[] atts = new Attribute[paramTypes.length + 1];
if (anns.length > 0) {
Class<?>[] params = method.getParameterTypes();
for (int i = 0; i < params.length; i++) {
if (CompletionHandler.class.isAssignableFrom(params[i])) {
if (boolReturnTypeFuture) {
throw new SncpException(method + " have both CompletionHandler and CompletableFuture");
}
if (handlerFuncIndex >= 0) {
throw new SncpException(method + " have more than one CompletionHandler type parameter");
}
Sncp.checkAsyncModifier(params[i], method);
handlerFuncIndex = i;
break;
}
}
for (int i = 0; i < anns.length; i++) {
if (anns[i].length > 0) {
for (Annotation ann : anns[i]) {
if (ann.annotationType() == RpcAttachment.class) {
if (handlerAttachIndex >= 0) {
throw new SncpException(method + " have more than one @RpcAttachment parameter");
}
handlerAttachIndex = i;
} else if (ann.annotationType() == RpcTargetAddress.class && SocketAddress.class.isAssignableFrom(params[i])) {
targetAddrIndex = i;
} else if (ann.annotationType() == RpcSourceAddress.class && SocketAddress.class.isAssignableFrom(params[i])) {
sourceAddrIndex = i;
} else if (ann.annotationType() == RpcTargetTopic.class && String.class.isAssignableFrom(params[i])) {
tpoicAddrIndex = i;
}
}
}
}
}
this.topicTargetParamIndex = tpoicAddrIndex;
this.addressTargetParamIndex = targetAddrIndex;
this.addressSourceParamIndex = sourceAddrIndex;
this.handlerFuncParamIndex = handlerFuncIndex;
this.handlerAttachParamIndex = handlerAttachIndex;
this.paramAttrs = hasattr ? atts : null;
if (this.handlerFuncParamIndex >= 0 && method.getReturnType() != void.class) {
throw new SncpException(method + " have CompletionHandler type parameter but return type is not void");
}
}
public String actionName() {
return method.getDeclaringClass().getSimpleName() + "." + method.getName();
}
@Override
public String toString() {
return "{" + actionid + "," + (method == null ? "null" : method.getName()) + "}";
}
}
}

View File

@@ -0,0 +1,93 @@
/*
*
*/
package org.redkale.net.sncp;
import java.nio.ByteBuffer;
import java.util.logging.Logger;
import org.redkale.net.client.ClientCodec;
import org.redkale.util.ByteArray;
/**
*
* @author zhangjx
*/
public class SncpClientCodec extends ClientCodec<SncpClientRequest, SncpClientResult> {
protected static final Logger logger = Logger.getLogger(SncpClientCodec.class.getSimpleName());
private ByteArray recyclableArray;
protected ByteArray halfBodyBytes;
protected ByteArray halfHeaderBytes;
SncpClientResult lastResult = null;
public SncpClientCodec(SncpClientConnection connection) {
super(connection);
}
protected ByteArray pollArray() {
if (recyclableArray == null) {
recyclableArray = new ByteArray();
return recyclableArray;
}
recyclableArray.clear();
return recyclableArray;
}
@Override
public boolean decodeMessages(ByteBuffer realBuf, ByteArray array) {
SncpClientConnection conn = (SncpClientConnection) connection;
ByteBuffer buffer = realBuf;
boolean hadResult = false;
while (buffer.hasRemaining()) {
if (halfHeaderBytes != null) {
if (buffer.remaining() + halfHeaderBytes.length() < SncpHeader.HEADER_SIZE) { //buffer不足以读取完整header
halfHeaderBytes.put(buffer);
return hadResult;
}
halfHeaderBytes.put(buffer, SncpHeader.HEADER_SIZE - halfHeaderBytes.length());
//读取完整header
SncpClientResult result = new SncpClientResult();
result.readHeader(halfHeaderBytes);
halfHeaderBytes = null;
if (result.getBodyLength() < 1) {
addMessage(findRequest(result.getRequestid()), result);
lastResult = null;
continue;
}
//还需要读body
lastResult = result;
}
if (lastResult != null) { //buffer不够
if (halfBodyBytes != null) {
if (buffer.remaining() + halfBodyBytes.length() < lastResult.getBodyLength()) { //buffer不足以读取完整body
halfBodyBytes.put(buffer);
return hadResult;
}
halfBodyBytes.put(buffer, lastResult.getBodyLength() - halfHeaderBytes.length());
//读取完整body
lastResult.setBodyContent(halfBodyBytes.getBytes());
halfBodyBytes = null;
addMessage(findRequest(lastResult.getRequestid()), lastResult);
lastResult = null;
continue;
}
}
if (buffer.remaining() < SncpHeader.HEADER_SIZE) { //内容不足以读取完整header
halfHeaderBytes = pollArray();
halfHeaderBytes.put(buffer);
return hadResult;
}
SncpClientRequest request = null;
buffer = realBuf;
}
return hadResult;
}
}

View File

@@ -0,0 +1,24 @@
/*
*
*/
package org.redkale.net.sncp;
import org.redkale.net.AsyncConnection;
import org.redkale.net.client.*;
/**
*
* @author zhangjx
*/
public class SncpClientConnection extends ClientConnection<SncpClientRequest, SncpClientResult> {
public SncpClientConnection(SncpClient client, int index, AsyncConnection channel) {
super(client, index, channel);
}
@Override
protected ClientCodec createCodec() {
return new SncpClientCodec(this);
}
}

View File

@@ -0,0 +1,101 @@
/*
*
*/
package org.redkale.net.sncp;
import java.net.InetSocketAddress;
import java.util.Objects;
import org.redkale.net.client.*;
import org.redkale.util.*;
/**
*
* @author zhangjx
*/
public class SncpClientRequest extends ClientRequest {
private final InetSocketAddress clientSncpAddress;
private final byte[] addrBytes;
private final int addrPort;
private long seqid;
private Uint128 serviceid;
private int serviceVersion;
private Uint128 actionid;
private byte[] bodyContent;
public SncpClientRequest(InetSocketAddress clientSncpAddress) {
this.clientSncpAddress = clientSncpAddress;
this.addrBytes = clientSncpAddress == null ? new byte[4] : clientSncpAddress.getAddress().getAddress();
this.addrPort = clientSncpAddress == null ? 0 : clientSncpAddress.getPort();
}
public SncpClientRequest prepare(long seqid, Uint128 serviceid, int serviceVersion, Uint128 actionid, String traceid, byte[] bodyContent) {
super.prepare();
this.seqid = seqid;
this.serviceid = serviceid;
this.serviceVersion = serviceVersion;
this.actionid = actionid;
this.traceid = traceid;
this.bodyContent = bodyContent;
return this;
}
@Override
protected boolean recycle() {
boolean rs = super.recycle();
this.seqid = 0;
this.serviceVersion = 0;
this.serviceid = null;
this.actionid = null;
this.bodyContent = null;
return rs;
}
@Override
public void writeTo(ClientConnection conn, ByteArray array) {
}
@Override
public String toString() {
return getClass().getSimpleName() + "_" + Objects.hashCode(this) + "{"
+ "seqid = " + seqid
+ ", serviceVersion = " + serviceVersion
+ ", serviceid = " + serviceid
+ ", actionid = " + actionid
+ ", bodyLength = " + (bodyContent == null ? -1 : bodyContent.length)
+ "}";
}
public long getSeqid() {
return seqid;
}
public Uint128 getServiceid() {
return serviceid;
}
public int getServiceVersion() {
return serviceVersion;
}
public Uint128 getActionid() {
return actionid;
}
public InetSocketAddress getClientSncpAddress() {
return clientSncpAddress;
}
public byte[] getBodyContent() {
return bodyContent;
}
}

View File

@@ -0,0 +1,144 @@
/*
*
*/
package org.redkale.net.sncp;
import java.io.Serializable;
import java.nio.ByteBuffer;
import org.redkale.util.*;
/**
*
* @author zhangjx
*/
public class SncpClientResult {
private long seqid;
private Uint128 serviceid;
private int serviceVersion;
private Uint128 actionid;
private byte[] addrBytes;
private int addrPort;
private int bodyLength;
private byte[] bodyContent;
private int retcode;
protected void readHeader(ByteBuffer buffer) {
this.seqid = buffer.getLong(); //8
buffer.getChar(); //HEADER_SIZE 2
this.serviceid = Uint128.read(buffer); //16
this.serviceVersion = buffer.getInt(); //4
this.actionid = Uint128.read(buffer); //16
this.addrBytes = new byte[4];
buffer.get(this.addrBytes); //addr 4
this.addrPort = buffer.getChar(); //port 2
this.bodyLength = buffer.getInt(); //4
this.retcode = buffer.getInt(); //4
}
protected void readHeader(ByteArray array) {
int offset = 0;
this.seqid = array.getLong(offset); //8
offset += 8;
array.getChar(offset); //HEADER_SIZE 2
offset += 2;
this.serviceid = array.getUint128(offset); //16
offset += 16;
this.serviceVersion = array.getInt(offset); //4
offset += 4;
this.actionid = array.getUint128(offset); //16
offset += 16;
this.addrBytes = array.getBytes(offset, 4); //addr 4
offset += 4;
this.addrPort = array.getChar(offset); //port 2
offset += 2;
this.bodyLength = array.getInt(offset); //4
offset += 4;
this.retcode = array.getInt(offset); //4
}
public Serializable getRequestid() {
return seqid;
}
public long getSeqid() {
return seqid;
}
public void setSeqid(long seqid) {
this.seqid = seqid;
}
public Uint128 getServiceid() {
return serviceid;
}
public void setServiceid(Uint128 serviceid) {
this.serviceid = serviceid;
}
public int getServiceVersion() {
return serviceVersion;
}
public void setServiceVersion(int serviceVersion) {
this.serviceVersion = serviceVersion;
}
public Uint128 getActionid() {
return actionid;
}
public void setActionid(Uint128 actionid) {
this.actionid = actionid;
}
public byte[] getAddrBytes() {
return addrBytes;
}
public void setAddrBytes(byte[] addrBytes) {
this.addrBytes = addrBytes;
}
public int getAddrPort() {
return addrPort;
}
public void setAddrPort(int addrPort) {
this.addrPort = addrPort;
}
public int getBodyLength() {
return bodyLength;
}
public void setBodyLength(int bodyLength) {
this.bodyLength = bodyLength;
}
public byte[] getBodyContent() {
return bodyContent;
}
public void setBodyContent(byte[] bodyContent) {
this.bodyContent = bodyContent;
}
public int getRetcode() {
return retcode;
}
public void setRetcode(int retcode) {
this.retcode = retcode;
}
}

View File

@@ -82,7 +82,7 @@ public class SncpDispatcherServlet extends DispatcherServlet<Uint128, SncpContex
response.finish(pongBytes);
return;
}
SncpServlet servlet = (SncpServlet) mappingServlet(request.getServiceid());
SncpServlet servlet = (SncpServlet) mappingServlet(request.getHeader().getServiceid());
if (servlet == null) {
response.finish(SncpResponse.RETCODE_ILLSERVICEID, null); //无效serviceid
} else {

View File

@@ -49,7 +49,7 @@ public final class SncpDynServlet extends SncpServlet {
this.maxNameLength = maxNameLength;
this.serviceid = Sncp.serviceid(serviceResourceName, serviceResourceType);
RedkaleClassLoader.putReflectionPublicMethods(service.getClass().getName());
for (Map.Entry<Uint128, Method> en : SncpClient.parseMethodActions(service.getClass()).entrySet()) {
for (Map.Entry<Uint128, Method> en : SncpOldClient.parseMethodActions(service.getClass()).entrySet()) {
SncpServletAction action;
try {
action = SncpServletAction.create(service, en.getKey(), en.getValue());
@@ -100,7 +100,7 @@ public final class SncpDynServlet extends SncpServlet {
@Override
@SuppressWarnings("unchecked")
public void execute(SncpRequest request, SncpResponse response) throws IOException {
final SncpServletAction action = actions.get(request.getActionid());
final SncpServletAction action = actions.get(request.getHeader().getActionid());
//logger.log(Level.FINEST, "sncpdyn.execute: " + request + ", " + (action == null ? "null" : action.method));
if (action == null) {
response.finish(SncpResponse.RETCODE_ILLACTIONID, null); //无效actionid

View File

@@ -0,0 +1,204 @@
/*
*
*/
package org.redkale.net.sncp;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import org.redkale.util.*;
/**
*
* @author zhangjx
*/
public class SncpHeader {
public static final int HEADER_SIZE = 60;
private static final byte[] EMPTY_ADDR = new byte[4];
private long seqid;
private Uint128 serviceid;
private int serviceVersion;
private Uint128 actionid;
//SncpRequest的值是clientSncpAddressSncpResponse的值是serverSncpAddress
private byte[] addrBytes;
private int addrPort;
private int bodyLength;
private int retcode;
public SncpHeader() {
}
public SncpHeader(InetSocketAddress clientSncpAddress) {
this.addrBytes = clientSncpAddress == null ? new byte[4] : clientSncpAddress.getAddress().getAddress();
this.addrPort = clientSncpAddress == null ? 0 : clientSncpAddress.getPort();
}
public SncpHeader(InetSocketAddress clientSncpAddress, Uint128 serviceid, Uint128 actionid) {
this.addrBytes = clientSncpAddress == null ? new byte[4] : clientSncpAddress.getAddress().getAddress();
this.addrPort = clientSncpAddress == null ? 0 : clientSncpAddress.getPort();
this.serviceid = serviceid;
this.actionid = actionid;
}
public boolean read(ByteBuffer buffer) {
this.seqid = buffer.getLong(); //8
if (buffer.getChar() != HEADER_SIZE) { //HEADER_SIZE 2
return false;
}
this.serviceid = Uint128.read(buffer); //16
this.serviceVersion = buffer.getInt(); //4
this.actionid = Uint128.read(buffer); //16
this.addrBytes = new byte[4];
buffer.get(this.addrBytes); //addr 4
this.addrPort = buffer.getChar(); //port 2
this.bodyLength = buffer.getInt(); //4
this.retcode = buffer.getInt(); //4
return true;
}
public boolean readHeader(ByteArray array) {
int offset = 0;
this.seqid = array.getLong(offset); //8
offset += 8;
if (array.getChar(offset) != HEADER_SIZE) { //HEADER_SIZE 2
return false;
}
offset += 2;
this.serviceid = array.getUint128(offset); //16
offset += 16;
this.serviceVersion = array.getInt(offset); //4
offset += 4;
this.actionid = array.getUint128(offset); //16
offset += 16;
this.addrBytes = array.getBytes(offset, 4); //addr 4
offset += 4;
this.addrPort = array.getChar(offset); //port 2
offset += 2;
this.bodyLength = array.getInt(offset); //4
offset += 4;
this.retcode = array.getInt(offset); //4
return true;
}
public ByteArray write(ByteArray array, InetSocketAddress address, long newSeqid, int bodyLength, int retcode) {
byte[] newAddrBytes = address == null ? EMPTY_ADDR : address.getAddress().getAddress();
int newAddrPort = address == null ? 0 : address.getPort();
return write(array, newAddrBytes, newAddrPort, newSeqid, bodyLength, retcode);
}
public ByteArray write(ByteArray array, byte[] newAddrBytes, int newAddrPort, long newSeqid, int bodyLength, int retcode) {
int offset = 0;
array.putLong(offset, newSeqid);
offset += 8;
array.putChar(offset, (char) HEADER_SIZE);
offset += 2;
array.putUint128(offset, serviceid);
offset += 16;
array.putInt(offset, serviceVersion);
offset += 4;
array.putUint128(offset, actionid);
offset += 16;
array.put(offset, newAddrBytes);
offset += newAddrBytes.length; //4
array.putChar(offset, (char) newAddrPort);
offset += 2;
array.putInt(offset, bodyLength);
offset += 4;
array.putInt(offset, retcode); //4
return array;
}
@Override
public String toString() {
return getClass().getSimpleName()
+ "{seqid=" + this.seqid
+ ",serviceid=" + this.serviceid
+ ",serviceVersion=" + this.serviceVersion
+ ",actionid=" + this.actionid
+ ",address=" + getAddress()
+ ",bodyLength=" + this.bodyLength
+ ",retcode=" + this.retcode
+ "}";
}
public InetSocketAddress getAddress() {
if (addrBytes == null || addrBytes[0] == 0) {
return null;
}
return new InetSocketAddress((0xff & addrBytes[0]) + "." + (0xff & addrBytes[1]) + "." + (0xff & addrBytes[2]) + "." + (0xff & addrBytes[3]), addrPort);
}
public long getSeqid() {
return seqid;
}
public void setSeqid(long seqid) {
this.seqid = seqid;
}
public Uint128 getServiceid() {
return serviceid;
}
public void setServiceid(Uint128 serviceid) {
this.serviceid = serviceid;
}
public int getServiceVersion() {
return serviceVersion;
}
public void setServiceVersion(int serviceVersion) {
this.serviceVersion = serviceVersion;
}
public Uint128 getActionid() {
return actionid;
}
public void setActionid(Uint128 actionid) {
this.actionid = actionid;
}
public byte[] getAddrBytes() {
return addrBytes;
}
public void setAddrBytes(byte[] addrBytes) {
this.addrBytes = addrBytes;
}
public int getAddrPort() {
return addrPort;
}
public void setAddrPort(int addrPort) {
this.addrPort = addrPort;
}
public int getBodyLength() {
return bodyLength;
}
public void setBodyLength(int bodyLength) {
this.bodyLength = bodyLength;
}
public int getRetcode() {
return retcode;
}
public void setRetcode(int retcode) {
this.retcode = retcode;
}
}

View File

@@ -0,0 +1,631 @@
/*
* To change this license header, choose License Headers reader Project Properties.
* To change this template file, choose Tools | Templates
* and open the template reader the editor.
*/
package org.redkale.net.sncp;
import java.lang.annotation.Annotation;
import java.lang.reflect.*;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.*;
import java.util.concurrent.*;
import java.util.logging.*;
import org.redkale.annotation.Resource;
import org.redkale.convert.bson.*;
import org.redkale.convert.json.*;
import org.redkale.mq.*;
import org.redkale.net.*;
import org.redkale.net.sncp.Sncp.SncpDyn;
import static org.redkale.net.sncp.SncpHeader.HEADER_SIZE;
import static org.redkale.net.sncp.SncpRequest.*;
import org.redkale.service.*;
import org.redkale.source.*;
import org.redkale.util.*;
/**
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public final class SncpOldClient {
protected static final Logger logger = Logger.getLogger(SncpOldClient.class.getSimpleName());
protected final JsonConvert convert = JsonFactory.root().getConvert();
protected final String name;
protected final boolean remote;
private final Class serviceClass;
protected final InetSocketAddress clientSncpAddress;
private final byte[] addrBytes;
private final int addrPort;
protected final Uint128 serviceid;
protected final int serviceVersion;
protected final SncpAction[] actions;
protected final MessageAgent messageAgent;
protected final SncpMessageClient messageClient;
protected final String topic;
@Resource
protected BsonConvert bsonConvert;
//远程模式, 可能为null
protected Set<String> remoteGroups;
//远程模式, 可能为null
protected Transport remoteGroupTransport;
public <T extends Service> SncpOldClient(final String serviceResourceName, final Class<T> serviceTypeOrImplClass, final T service, MessageAgent messageAgent, final TransportFactory factory,
final boolean remote, final Class serviceClass, final InetSocketAddress clientSncpAddress) {
this.remote = remote;
this.messageAgent = messageAgent;
this.messageClient = messageAgent == null ? null : messageAgent.getSncpMessageClient();
this.topic = messageAgent == null ? null : messageAgent.generateSncpReqTopic(service);
this.serviceClass = serviceClass;
this.serviceVersion = 0; //暂不实现Version
this.clientSncpAddress = clientSncpAddress;
this.name = serviceResourceName;
Class<?> serviceResourceType = ResourceFactory.getResourceType(serviceTypeOrImplClass); //serviceResourceType
this.serviceid = Sncp.serviceid(serviceResourceName, serviceResourceType);
final List<SncpAction> methodens = new ArrayList<>();
//------------------------------------------------------------------------------
for (Map.Entry<Uint128, Method> en : parseMethodActions(serviceClass).entrySet()) {
methodens.add(new SncpAction(serviceClass, en.getValue(), serviceid, en.getKey()));
}
this.actions = methodens.toArray(new SncpAction[methodens.size()]);
this.addrBytes = clientSncpAddress == null ? new byte[4] : clientSncpAddress.getAddress().getAddress();
this.addrPort = clientSncpAddress == null ? 0 : clientSncpAddress.getPort();
if (this.addrBytes.length != 4) {
throw new SncpException("SNCP clientAddress only support IPv4");
}
}
static List<SncpAction> getSncpActions(final Class serviceClass, Uint128 serviceid) {
final List<SncpAction> actions = new ArrayList<>();
//------------------------------------------------------------------------------
for (Map.Entry<Uint128, Method> en : parseMethodActions(serviceClass).entrySet()) {
actions.add(new SncpAction(serviceClass, en.getValue(), serviceid, en.getKey()));
}
return actions;
}
public MessageAgent getMessageAgent() {
return messageAgent;
}
public InetSocketAddress getClientAddress() {
return clientSncpAddress;
}
public Uint128 getServiceid() {
return serviceid;
}
public int getServiceVersion() {
return serviceVersion;
}
public int getActionCount() {
return actions.length;
}
public Set<String> getRemoteGroups() {
return remoteGroups;
}
public void setRemoteGroups(Set<String> remoteGroups) {
this.remoteGroups = remoteGroups;
}
public Transport getRemoteGroupTransport() {
return remoteGroupTransport;
}
public void setRemoteGroupTransport(Transport remoteGroupTransport) {
this.remoteGroupTransport = remoteGroupTransport;
}
@Override
public String toString() {
String service = serviceClass.getName();
if (remote) {
service = service.replace("DynLocalService", "DynRemoteService");
}
return this.getClass().getSimpleName() + "(service = " + service + ", serviceid = " + serviceid + ", serviceVersion = " + serviceVersion + ", name = '" + name
+ "', address = " + (clientSncpAddress == null ? "" : (clientSncpAddress.getHostString() + ":" + clientSncpAddress.getPort()))
+ ", actions.size = " + actions.length + ")";
}
public String toSimpleString() { //给Sncp产生的Service用
if (DataSource.class.isAssignableFrom(serviceClass) || CacheSource.class.isAssignableFrom(serviceClass)) {
String service = serviceClass.getAnnotation(SncpDyn.class) == null ? serviceClass.getName() : serviceClass.getSuperclass().getSimpleName();
return service + "(serviceid=" + serviceid + ", name='" + name + "', actions.size=" + actions.length + ")";
}
String service = serviceClass.getAnnotation(SncpDyn.class) == null ? serviceClass.getName() : serviceClass.getSuperclass().getSimpleName();
if (remote) {
service = service.replace("DynLocalService", "DynRemoteService");
}
return service + "(name = '" + name + "', serviceid = " + serviceid + ", serviceVersion = " + serviceVersion
+ ", clientaddr = " + (clientSncpAddress == null ? "" : (clientSncpAddress.getHostString() + ":" + clientSncpAddress.getPort()))
+ ((remoteGroups == null || remoteGroups.isEmpty()) ? "" : ", remoteGroups = " + remoteGroups)
+ (remoteGroupTransport == null ? "" : ", remoteGroupTransport = " + Arrays.toString(remoteGroupTransport.getRemoteAddresses()))
+ ", actions.size = " + actions.length + ")";
}
public static LinkedHashMap<Uint128, Method> parseMethodActions(final Class serviceClass) {
final List<Method> list = new ArrayList<>();
final List<Method> multis = new ArrayList<>();
final Map<Uint128, Method> actionids = new LinkedHashMap<>();
for (final java.lang.reflect.Method method : serviceClass.getMethods()) {
if (method.isSynthetic()) {
continue;
}
if (Modifier.isStatic(method.getModifiers())) {
continue;
}
if (Modifier.isFinal(method.getModifiers())) {
continue;
}
if (method.getAnnotation(Local.class) != null) {
continue;
}
if (method.getName().equals("getClass") || method.getName().equals("toString")) {
continue;
}
if (method.getName().equals("equals") || method.getName().equals("hashCode")) {
continue;
}
if (method.getName().equals("notify") || method.getName().equals("notifyAll") || method.getName().equals("wait")) {
continue;
}
if (method.getParameterCount() == 1 && method.getParameterTypes()[0] == AnyValue.class) {
if (method.getName().equals("init") || method.getName().equals("stop") || method.getName().equals("destroy")) {
continue;
}
}
Uint128 actionid = Sncp.actionid(method);
Method old = actionids.get(actionid);
if (old != null) {
if (old.getDeclaringClass().equals(method.getDeclaringClass())) {
throw new SncpException(serviceClass.getName() + " have one more same action(Method=" + method + ", " + old + ", actionid=" + actionid + ")");
}
continue;
}
actionids.put(actionid, method);
if (method.getAnnotation(SncpDyn.class) != null) {
multis.add(method);
} else {
list.add(method);
}
}
multis.sort((m1, m2) -> m1.getAnnotation(SncpDyn.class).index() - m2.getAnnotation(SncpDyn.class).index());
list.sort((Method o1, Method o2) -> {
if (!o1.getName().equals(o2.getName())) {
return o1.getName().compareTo(o2.getName());
}
if (o1.getParameterCount() != o2.getParameterCount()) {
return o1.getParameterCount() - o2.getParameterCount();
}
return 0;
});
//带SncpDyn必须排在前面
multis.addAll(list);
final LinkedHashMap<Uint128, Method> rs = new LinkedHashMap<>();
for (Method method : multis) {
for (Map.Entry<Uint128, Method> en : actionids.entrySet()) {
if (en.getValue() == method) {
rs.put(en.getKey(), en.getValue());
break;
}
}
}
return rs;
}
//只给远程模式调用的
public <T> T remote(final int index, final Object... params) {
final SncpAction action = actions[index];
final CompletionHandler handlerFunc = action.handlerFuncParamIndex >= 0 ? (CompletionHandler) params[action.handlerFuncParamIndex] : null;
if (action.handlerFuncParamIndex >= 0) {
params[action.handlerFuncParamIndex] = null;
}
final BsonReader reader = bsonConvert.pollBsonReader();
CompletableFuture<byte[]> future = remote0(handlerFunc, remoteGroupTransport, null, action, params);
if (action.boolReturnTypeFuture) { //与handlerFuncIndex互斥
CompletableFuture result = action.futureCreator.create();
future.whenComplete((v, e) -> {
try {
if (e != null) {
result.completeExceptionally(e);
} else {
reader.setBytes(v);
byte i;
while ((i = reader.readByte()) != 0) {
final Attribute attr = action.paramAttrs[i];
attr.set(params[i - 1], bsonConvert.convertFrom(attr.genericType(), reader));
}
Object rs = bsonConvert.convertFrom(Object.class, reader);
result.complete(rs);
}
} catch (Exception exp) {
result.completeExceptionally(exp);
} finally {
bsonConvert.offerBsonReader(reader);
}
}); //需要获取 Executor
return (T) result;
}
if (handlerFunc != null) {
return null;
}
try {
reader.setBytes(future.get(5, TimeUnit.SECONDS));
byte i;
while ((i = reader.readByte()) != 0) {
final Attribute attr = action.paramAttrs[i];
attr.set(params[i - 1], bsonConvert.convertFrom(attr.genericType(), reader));
}
return bsonConvert.convertFrom(action.handlerFuncParamIndex >= 0 ? Object.class : action.resultTypes, reader);
} catch (RpcRemoteException re) {
throw re;
} catch (TimeoutException e) {
throw new RpcRemoteException(actions[index].method + " sncp remote timeout, params=" + JsonConvert.root().convertTo(params));
} catch (InterruptedException | ExecutionException e) {
throw new RpcRemoteException(actions[index].method + " sncp remote error, params=" + JsonConvert.root().convertTo(params), e);
} finally {
bsonConvert.offerBsonReader(reader);
}
}
private CompletableFuture<byte[]> remote0(final CompletionHandler handler, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) {
final String traceid = Traces.currTraceid();
final Type[] myparamtypes = action.paramTypes;
final Class[] myparamclass = action.paramClass;
if (action.addressSourceParamIndex >= 0) {
params[action.addressSourceParamIndex] = this.clientSncpAddress;
}
if (bsonConvert == null) {
bsonConvert = BsonConvert.root();
}
final BsonWriter writer = bsonConvert.pollBsonWriter(); // 将head写入
writer.writeTo(DEFAULT_HEADER);
for (int i = 0; i < params.length; i++) { //params 可能包含: 3 个 boolean
BsonConvert bcc = bsonConvert;
if (params[i] instanceof org.redkale.service.RetResult) {
org.redkale.convert.Convert cc = ((org.redkale.service.RetResult) params[i]).convert();
if (cc instanceof BsonConvert) {
bcc = (BsonConvert) cc;
}
}
bcc.convertTo(writer, CompletionHandler.class.isAssignableFrom(myparamclass[i]) ? CompletionHandler.class : myparamtypes[i], params[i]);
}
final int reqBodyLength = writer.count() - HEADER_SIZE; //body总长度
final long seqid = System.nanoTime();
final Uint128 actionid = action.actionid;
if (messageAgent != null) { //MQ模式
final ByteArray reqbytes = writer.toByteArray();
fillHeader(reqbytes, action, seqid, traceid, reqBodyLength);
String targetTopic = action.topicTargetParamIndex >= 0 ? (String) params[action.topicTargetParamIndex] : this.topic;
if (targetTopic == null) {
targetTopic = this.topic;
}
MessageRecord message = messageClient.createMessageRecord(targetTopic, null, reqbytes.getBytes());
final String tt = targetTopic;
if (logger.isLoggable(Level.FINER)) {
message.attach(Utility.append(new Object[]{action.actionName()}, params));
} else {
message.attach(params);
}
return messageClient.sendMessage(message).thenApply(msg -> {
if (msg == null || msg.getContent() == null) {
logger.log(Level.SEVERE, action.method + " sncp mq(params: " + convert.convertTo(params) + ", message: " + message + ") deal error, this.topic = " + this.topic + ", targetTopic = " + tt + ", result = " + msg);
return null;
}
ByteBuffer buffer = ByteBuffer.wrap(msg.getContent());
checkResult(seqid, action, buffer);
final int respBodyLength = buffer.getInt();
final int retcode = buffer.getInt();
if (retcode != 0) {
logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + "), params=" + JsonConvert.root().convertTo(params));
throw new SncpException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")");
}
byte[] body = new byte[respBodyLength];
buffer.get(body, 0, respBodyLength);
return body;
});
}
final SocketAddress addr = addr0 == null ? (action.addressTargetParamIndex >= 0 ? (SocketAddress) params[action.addressTargetParamIndex] : null) : addr0;
CompletableFuture<AsyncConnection> connFuture = transport.pollConnection(addr);
return connFuture.thenCompose(conn0 -> {
final CompletableFuture<byte[]> future = new CompletableFuture();
if (conn0 == null) {
future.completeExceptionally(new RpcRemoteException("sncp " + (conn0 == null ? addr : conn0.getRemoteAddress()) + " cannot connect, params=" + JsonConvert.root().convertTo(params)));
return future;
}
if (!conn0.isOpen()) {
conn0.dispose();
future.completeExceptionally(new RpcRemoteException("sncp " + conn0.getRemoteAddress() + " cannot connect, params=" + JsonConvert.root().convertTo(params)));
return future;
}
final AsyncConnection conn = conn0;
final ByteArray array = writer.toByteArray();
fillHeader(array, action, seqid, traceid, reqBodyLength);
conn.write(array, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachments) {
//----------------------- 读取返回结果 -------------------------------------
conn.read(new CompletionHandler<Integer, ByteBuffer>() {
private byte[] body;
private int received;
@Override
public void completed(Integer count, ByteBuffer buffer) {
try {
if (count < 1 && buffer.remaining() == buffer.limit()) { //没有数据可读
future.completeExceptionally(new RpcRemoteException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote no response data, params=" + JsonConvert.root().convertTo(params)));
conn.offerReadBuffer(buffer);
transport.offerConnection(true, conn);
return;
}
if (received < 1 && buffer.limit() < buffer.remaining() + HEADER_SIZE) { //header都没读全
conn.setReadBuffer(buffer);
conn.read(this);
return;
}
buffer.flip();
if (received > 0) {
int offset = this.received;
this.received += buffer.remaining();
buffer.get(body, offset, Math.min(buffer.remaining(), this.body.length - offset));
if (this.received < this.body.length) {// 数据仍然不全,需要继续读取
buffer.clear();
conn.setReadBuffer(buffer);
conn.read(this);
} else {
conn.offerReadBuffer(buffer);
success();
}
return;
}
checkResult(seqid, action, buffer);
final int respBodyLength = buffer.getInt();
final int retcode = buffer.getInt();
if (retcode != 0) {
logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + "), params=" + JsonConvert.root().convertTo(params));
throw new SncpException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")");
}
if (respBodyLength > buffer.remaining()) { // 数据不全,需要继续读取
this.body = new byte[respBodyLength];
this.received = buffer.remaining();
buffer.get(body, 0, this.received);
buffer.clear();
conn.setReadBuffer(buffer);
conn.read(this);
} else {
this.body = new byte[respBodyLength];
buffer.get(body, 0, respBodyLength);
conn.offerReadBuffer(buffer);
success();
}
} catch (Throwable e) {
future.completeExceptionally(new RpcRemoteException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote response error, params=" + JsonConvert.root().convertTo(params)));
transport.offerConnection(true, conn);
if (handler != null) {
final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null;
handler.failed(e, handlerAttach);
}
logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") deal error", e);
}
}
@SuppressWarnings("unchecked")
public void success() {
future.complete(this.body);
transport.offerConnection(false, conn);
if (handler != null) {
final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null;
final BsonReader reader = bsonConvert.pollBsonReader();
try {
reader.setBytes(this.body);
int i;
while ((i = (reader.readByte() & 0xff)) != 0) {
final Attribute attr = action.paramAttrs[i];
attr.set(params[i - 1], bsonConvert.convertFrom(attr.genericType(), reader));
}
Object rs = bsonConvert.convertFrom(action.handlerFuncParamIndex >= 0 ? Object.class : action.resultTypes, reader);
handler.completed(rs, handlerAttach);
} catch (Exception e) {
handler.failed(e, handlerAttach);
} finally {
bsonConvert.offerBsonReader(reader);
}
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment2) {
future.completeExceptionally(new RpcRemoteException(action.method + " sncp remote exec failed, params=" + JsonConvert.root().convertTo(params)));
conn.offerReadBuffer(attachment2);
transport.offerConnection(true, conn);
if (handler != null) {
final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null;
handler.failed(exc, handlerAttach);
}
logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") remote read exec failed, params=" + JsonConvert.root().convertTo(params), exc);
}
});
}
@Override
public void failed(Throwable exc, Void attachment) {
future.completeExceptionally(new RpcRemoteException(action.method + " sncp remote exec failed, params=" + JsonConvert.root().convertTo(params)));
transport.offerConnection(true, conn);
if (handler != null) {
final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null;
handler.failed(exc, handlerAttach);
}
logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") remote write exec failed, params=" + JsonConvert.root().convertTo(params), exc);
}
});
return future;
});
}
private void checkResult(long seqid, final SncpAction action, ByteBuffer buffer) {
long rseqid = buffer.getLong();
if (rseqid != seqid) {
throw new SncpException("sncp(" + action.method + ") response.seqid = " + seqid + ", but request.seqid =" + rseqid);
}
if (buffer.getChar() != HEADER_SIZE) {
throw new SncpException("sncp(" + action.method + ") buffer receive header.length not " + HEADER_SIZE);
}
Uint128 rserviceid = Uint128.read(buffer);
if (!rserviceid.equals(this.serviceid)) {
throw new SncpException("sncp(" + action.method + ") response.serviceid = " + serviceid + ", but request.serviceid =" + rserviceid);
}
int version = buffer.getInt();
if (version != this.serviceVersion) {
throw new SncpException("sncp(" + action.method + ") response.serviceVersion = " + serviceVersion + ", but request.serviceVersion =" + version);
}
Uint128 raction = Uint128.read(buffer);
Uint128 actid = action.actionid;
if (!actid.equals(raction)) {
throw new SncpException("sncp(" + action.method + ") response.actionid = " + action.actionid + ", but request.actionid =(" + raction + ")");
}
buffer.getInt(); //地址
buffer.getChar(); //端口
}
private void fillHeader(ByteArray buffer, SncpAction action, long seqid, String traceid, int bodyLength) {
action.header.write(buffer, addrBytes, addrPort, seqid, bodyLength, 0); //结果码, 请求方固定传0
}
protected static final class SncpAction {
protected final Uint128 actionid;
protected final Method method;
protected final Type resultTypes; //void 必须设为 null
protected final Type[] paramTypes;
protected final Class[] paramClass;
protected final Attribute[] paramAttrs; // 为null表示无RpcCall处理index=0固定为null, 其他为参数标记的RpcCall回调方法
protected final int handlerFuncParamIndex;
protected final int handlerAttachParamIndex;
protected final int addressTargetParamIndex;
protected final int addressSourceParamIndex;
protected final int topicTargetParamIndex;
protected final boolean boolReturnTypeFuture; // 返回结果类型是否为 CompletableFuture
protected final Creator<? extends CompletableFuture> futureCreator;
protected final SncpHeader header;
@SuppressWarnings("unchecked")
public SncpAction(final Class clazz, Method method, Uint128 serviceid, Uint128 actionid) {
this.actionid = actionid == null ? Sncp.actionid(method) : actionid;
Type rt = TypeToken.getGenericType(method.getGenericReturnType(), clazz);
this.resultTypes = rt == void.class ? null : rt;
this.boolReturnTypeFuture = CompletableFuture.class.isAssignableFrom(method.getReturnType());
this.futureCreator = boolReturnTypeFuture ? Creator.create((Class<? extends CompletableFuture>) method.getReturnType()) : null;
this.paramTypes = TypeToken.getGenericType(method.getGenericParameterTypes(), clazz);
this.paramClass = method.getParameterTypes();
this.method = method;
Annotation[][] anns = method.getParameterAnnotations();
int tpoicAddrIndex = -1;
int targetAddrIndex = -1;
int sourceAddrIndex = -1;
int handlerAttachIndex = -1;
int handlerFuncIndex = -1;
boolean hasattr = false;
Attribute[] atts = new Attribute[paramTypes.length + 1];
if (anns.length > 0) {
Class<?>[] params = method.getParameterTypes();
for (int i = 0; i < params.length; i++) {
if (CompletionHandler.class.isAssignableFrom(params[i])) {
if (boolReturnTypeFuture) {
throw new SncpException(method + " have both CompletionHandler and CompletableFuture");
}
if (handlerFuncIndex >= 0) {
throw new SncpException(method + " have more than one CompletionHandler type parameter");
}
Sncp.checkAsyncModifier(params[i], method);
handlerFuncIndex = i;
break;
}
}
for (int i = 0; i < anns.length; i++) {
if (anns[i].length > 0) {
for (Annotation ann : anns[i]) {
if (ann.annotationType() == RpcAttachment.class) {
if (handlerAttachIndex >= 0) {
throw new SncpException(method + " have more than one @RpcAttachment parameter");
}
handlerAttachIndex = i;
} else if (ann.annotationType() == RpcTargetAddress.class && SocketAddress.class.isAssignableFrom(params[i])) {
targetAddrIndex = i;
} else if (ann.annotationType() == RpcSourceAddress.class && SocketAddress.class.isAssignableFrom(params[i])) {
sourceAddrIndex = i;
} else if (ann.annotationType() == RpcTargetTopic.class && String.class.isAssignableFrom(params[i])) {
tpoicAddrIndex = i;
}
}
}
}
}
this.topicTargetParamIndex = tpoicAddrIndex;
this.addressTargetParamIndex = targetAddrIndex;
this.addressSourceParamIndex = sourceAddrIndex;
this.handlerFuncParamIndex = handlerFuncIndex;
this.handlerAttachParamIndex = handlerAttachIndex;
this.paramAttrs = hasattr ? atts : null;
this.header = new SncpHeader(null, serviceid, actionid);
if (this.handlerFuncParamIndex >= 0 && method.getReturnType() != void.class) {
throw new SncpException(method + " have CompletionHandler type parameter but return type is not void");
}
}
public String actionName() {
return method.getDeclaringClass().getSimpleName() + "." + method.getName();
}
@Override
public String toString() {
return "{" + actionid + "," + (method == null ? "null" : method.getName()) + "}";
}
}
}

View File

@@ -6,12 +6,11 @@
package org.redkale.net.sncp;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.logging.*;
import org.redkale.convert.bson.BsonConvert;
import org.redkale.net.Request;
import static org.redkale.net.sncp.Sncp.HEADER_SIZE;
import static org.redkale.net.sncp.SncpHeader.HEADER_SIZE;
import org.redkale.util.Uint128;
/**
@@ -35,17 +34,9 @@ public class SncpRequest extends Request<SncpContext> {
protected final BsonConvert convert;
private long seqid;
protected int readState = READ_STATE_ROUTE;
private int serviceVersion;
private Uint128 serviceid;
private Uint128 actionid;
private int bodyLength;
private SncpHeader header;
private int bodyOffset;
@@ -53,8 +44,6 @@ public class SncpRequest extends Request<SncpContext> {
private byte[] body;
private final byte[] addrBytes = new byte[6];
protected SncpRequest(SncpContext context) {
super(context);
this.convert = context.getBsonConvert();
@@ -67,38 +56,33 @@ public class SncpRequest extends Request<SncpContext> {
if (buffer.remaining() < HEADER_SIZE) {
return HEADER_SIZE - buffer.remaining(); //小于60
}
this.seqid = buffer.getLong(); //8
if (buffer.getChar() != HEADER_SIZE) { //2
this.header = new SncpHeader();
if (!this.header.read(buffer)) {
if (context.getLogger().isLoggable(Level.FINEST)) {
context.getLogger().finest("sncp buffer header.length not " + HEADER_SIZE);
}
return -1;
}
this.serviceid = Uint128.read(buffer); //16
this.serviceVersion = buffer.getInt(); //4
this.actionid = Uint128.read(buffer); //16
buffer.get(addrBytes); //ipaddr //6
this.bodyLength = buffer.getInt(); //4
if (buffer.getInt() != 0) { //4 retcode
if (this.header.getRetcode() != 0) { // retcode
if (context.getLogger().isLoggable(Level.FINEST)) {
context.getLogger().finest("sncp buffer header.retcode not 0");
}
return -1;
}
this.body = new byte[this.bodyLength];
this.body = new byte[this.header.getBodyLength()];
this.readState = READ_STATE_BODY;
}
//---------------------body----------------------------------
if (this.readState == READ_STATE_BODY) {
if (this.bodyLength == 0) {
int bodyLength = this.header.getBodyLength();
if (bodyLength == 0) {
this.readState = READ_STATE_END;
if (this.seqid == 0 && this.serviceid == Uint128.ZERO && this.actionid == Uint128.ZERO) {
if (this.header.getSeqid() == 0 && this.header.getServiceid() == Uint128.ZERO && this.header.getActionid() == Uint128.ZERO) {
this.ping = true;
}
return 0;
}
int len = Math.min(this.bodyLength, buffer.remaining());
int len = Math.min(bodyLength, buffer.remaining());
buffer.get(body, 0, len);
this.bodyOffset = len;
int rs = bodyLength - len;
@@ -112,7 +96,7 @@ public class SncpRequest extends Request<SncpContext> {
@Override
protected Serializable getRequestid() {
return seqid;
return header.getSeqid();
}
@Override
@@ -126,24 +110,16 @@ public class SncpRequest extends Request<SncpContext> {
@Override
public String toString() {
return SncpRequest.class.getSimpleName() + "{seqid=" + this.seqid
+ ",serviceVersion=" + this.serviceVersion + ",serviceid=" + this.serviceid
+ ",actionid=" + this.actionid + ",bodyLength=" + this.bodyLength
+ ",bodyOffset=" + this.bodyOffset + ",remoteAddress=" + getRemoteAddress() + "}";
return SncpRequest.class.getSimpleName() + "{header=" + this.header + ",bodyOffset=" + this.bodyOffset + "}";
}
@Override
protected void recycle() {
this.seqid = 0;
this.readState = READ_STATE_ROUTE;
this.serviceid = null;
this.serviceVersion = 0;
this.actionid = null;
this.bodyLength = 0;
this.header = null;
this.bodyOffset = 0;
this.body = null;
this.ping = false;
this.addrBytes[0] = 0;
super.recycle();
}
@@ -155,28 +131,8 @@ public class SncpRequest extends Request<SncpContext> {
return body;
}
public long getSeqid() {
return seqid;
}
public int getServiceVersion() {
return serviceVersion;
}
public Uint128 getServiceid() {
return serviceid;
}
public Uint128 getActionid() {
return actionid;
}
public InetSocketAddress getRemoteAddress() {
if (addrBytes[0] == 0) {
return null;
}
return new InetSocketAddress((0xff & addrBytes[0]) + "." + (0xff & addrBytes[1]) + "." + (0xff & addrBytes[2]) + "." + (0xff & addrBytes[3]),
((0xff00 & (addrBytes[4] << 8)) | (0xff & addrBytes[5])));
public SncpHeader getHeader() {
return header;
}
}

View File

@@ -7,8 +7,8 @@ package org.redkale.net.sncp;
import org.redkale.convert.bson.BsonWriter;
import org.redkale.net.Response;
import static org.redkale.net.sncp.Sncp.HEADER_SIZE;
import org.redkale.util.*;
import static org.redkale.net.sncp.SncpHeader.HEADER_SIZE;
import org.redkale.util.ByteArray;
/**
*
@@ -80,32 +80,8 @@ public class SncpResponse extends Response<SncpContext, SncpRequest> {
}
protected void fillHeader(ByteArray buffer, int bodyLength, int retcode) {
fillRespHeader(buffer, request.getSeqid(), request.getServiceid(), request.getServiceVersion(),
request.getActionid(), request.getTraceid(), this.addrBytes, this.addrPort, bodyLength, retcode);
}
protected static void fillRespHeader(ByteArray buffer, long seqid, Uint128 serviceid, int serviceVersion,
Uint128 actionid, String traceid, byte[] addrBytes, int addrPort, int bodyLength, int retcode) {
//---------------------head----------------------------------
int offset = 0;
buffer.putLong(offset, seqid);
offset += 8;
buffer.putChar(offset, (char) HEADER_SIZE);
offset += 2;
buffer.putUint128(offset, serviceid);
offset += 16;
buffer.putInt(offset, serviceVersion);
offset += 4;
buffer.putUint128(offset, actionid);
offset += 16;
buffer.put(offset, addrBytes);
offset += addrBytes.length; //4
buffer.putChar(offset, (char) addrPort);
offset += 2;
buffer.putInt(offset, bodyLength);
offset += 4;
buffer.putInt(offset, retcode);
offset += 4;
SncpHeader header = request.getHeader();
header.write(buffer, this.addrBytes, this.addrPort, header.getSeqid(), bodyLength, retcode);
}
}

View File

@@ -941,11 +941,12 @@ public final class ByteArray implements ByteTuple {
* 写入ByteBuffer指定长度的数据
*
* @param buffer 数据
* @param len 指定长度
* @param length 指定长度
*
* @return ByteArray
*/
public ByteArray put(ByteBuffer buffer, int len) {
public ByteArray put(ByteBuffer buffer, int length) {
int len = Math.min(buffer.remaining(), length);
if (len < 1) {
return this;
}

View File

@@ -23,11 +23,11 @@ public final class Uint128 extends Number implements Comparable<Uint128> {
protected final byte[] value;
protected Uint128(long v1, long v2) { //暂时不用
this.value = new byte[]{(byte) (v1 >> 56), (byte) (v1 >> 48), (byte) (v1 >> 40), (byte) (v1 >> 32),
(byte) (v1 >> 24), (byte) (v1 >> 16), (byte) (v1 >> 8), (byte) v1, (byte) (v2 >> 56), (byte) (v2 >> 48), (byte) (v2 >> 40), (byte) (v2 >> 32),
(byte) (v2 >> 24), (byte) (v2 >> 16), (byte) (v2 >> 8), (byte) v2};
}
// private Uint128(long v1, long v2) { //暂时不用
// this.value = new byte[]{(byte) (v1 >> 56), (byte) (v1 >> 48), (byte) (v1 >> 40), (byte) (v1 >> 32),
// (byte) (v1 >> 24), (byte) (v1 >> 16), (byte) (v1 >> 8), (byte) v1, (byte) (v2 >> 56), (byte) (v2 >> 48), (byte) (v2 >> 40), (byte) (v2 >> 32),
// (byte) (v2 >> 24), (byte) (v2 >> 16), (byte) (v2 >> 8), (byte) v2};
// }
private Uint128(byte[] bytes) {
if (bytes == null || bytes.length != 16) {

View File

@@ -120,7 +120,7 @@ public class SncpTestServiceImpl implements SncpTestIService {
System.out.println(method);
}
System.out.println("-----------------------------------");
for (Method method : SncpClient.parseMethodActions(service.getClass()).values()) {
for (Method method : SncpOldClient.parseMethodActions(service.getClass()).values()) {
System.out.println(method);
}
System.out.println("-----------------------------------");
@@ -129,7 +129,7 @@ public class SncpTestServiceImpl implements SncpTestIService {
System.out.println(method);
}
System.out.println("-----------------------------------");
for (Method method : SncpClient.parseMethodActions(service.getClass()).values()) {
for (Method method : SncpOldClient.parseMethodActions(service.getClass()).values()) {
System.out.println(method);
}
System.out.println("-----------------------------------");
@@ -138,7 +138,7 @@ public class SncpTestServiceImpl implements SncpTestIService {
System.out.println(method);
}
System.out.println("-----------------------------------");
for (Method method : SncpClient.parseMethodActions(service.getClass()).values()) {
for (Method method : SncpOldClient.parseMethodActions(service.getClass()).values()) {
System.out.println(method);
}
System.out.println("-----------------------------------");

View File

@@ -15,6 +15,6 @@ import org.redkale.annotation.ResourceType;
@ResourceType(SncpTestIService.class)
public class _DynLocalSncpTestService extends SncpTestServiceImpl {
private SncpClient _redkale_client;
private SncpOldClient _redkale_client;
}