This commit is contained in:
@@ -271,7 +271,6 @@ public abstract class NodeServer {
|
||||
SncpClient client = Sncp.getSncpClient(srcService);
|
||||
final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress();
|
||||
final Set<String> groups = new HashSet<>();
|
||||
if (client != null && client.getLocalGroup() != null) groups.add(client.getLocalGroup());
|
||||
source = (DataSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appSncpTranFactory, sncpAddr, groups, Sncp.getConf(srcService));
|
||||
}
|
||||
}
|
||||
@@ -302,10 +301,7 @@ public abstract class NodeServer {
|
||||
if ((src instanceof Service) && Sncp.isRemote((Service) src)) return; //远程模式不需要注入 CacheSource
|
||||
final Service srcService = (Service) src;
|
||||
SncpClient client = Sncp.getSncpClient(srcService);
|
||||
final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress();
|
||||
final Set<String> groups = new HashSet<>();
|
||||
if (client != null && client.getLocalGroup() != null) groups.add(client.getLocalGroup());
|
||||
|
||||
final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress();
|
||||
SimpleEntry<Class, AnyValue> resEntry = cacheResource.get(resourceName);
|
||||
AnyValue sourceConf = resEntry == null ? null : resEntry.getValue();
|
||||
if (sourceConf == null) {
|
||||
@@ -315,7 +311,7 @@ public abstract class NodeServer {
|
||||
final Class sourceType = sourceConf == null ? CacheMemorySource.class : serverClassLoader.loadClass(sourceConf.getValue("value"));
|
||||
Object source = null;
|
||||
if (CacheSource.class.isAssignableFrom(sourceType)) { // CacheSource
|
||||
source = (CacheSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appSncpTranFactory, sncpAddr, groups, Sncp.getConf(srcService));
|
||||
source = (CacheSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appSncpTranFactory, sncpAddr, null, Sncp.getConf(srcService));
|
||||
Type genericType = field.getGenericType();
|
||||
ParameterizedType pt = (genericType instanceof ParameterizedType) ? (ParameterizedType) genericType : null;
|
||||
Type valType = pt == null ? null : pt.getActualTypeArguments()[0];
|
||||
|
||||
@@ -72,10 +72,6 @@ public class TransportWatchService extends AbstractWatchService {
|
||||
if (client.getRemoteGroups() != null && client.getRemoteGroups().contains(group)) {
|
||||
client.getRemoteGroupTransport().addRemoteAddresses(address);
|
||||
}
|
||||
} else {
|
||||
if (group.equals(client.getLocalGroup())) {
|
||||
client.getLocalGroupTransport().addRemoteAddresses(address);
|
||||
}
|
||||
}
|
||||
}
|
||||
DefaultAnyValue node = DefaultAnyValue.create("addr", addr).addValue("port", port);
|
||||
@@ -109,10 +105,6 @@ public class TransportWatchService extends AbstractWatchService {
|
||||
if (client.getRemoteGroups() != null && client.getRemoteGroups().contains(group)) {
|
||||
client.getRemoteGroupTransport().removeRemoteAddresses(address);
|
||||
}
|
||||
} else {
|
||||
if (group.equals(client.getLocalGroup())) {
|
||||
client.getLocalGroupTransport().removeRemoteAddresses(address);
|
||||
}
|
||||
}
|
||||
}
|
||||
for (AnyValue groupconf : application.getAppConfig().getAnyValue("resources").getAnyValues("group")) {
|
||||
|
||||
@@ -451,11 +451,6 @@ public abstract class Sncp {
|
||||
Field e = newClazz.getDeclaredField(FIELDPREFIX + "_client");
|
||||
e.setAccessible(true);
|
||||
client = new SncpClient(name, serviceImplClass, rs, transportFactory, false, newClazz, clientSncpAddress);
|
||||
Set<String> diffGroups = groups == null ? new HashSet<>() : new HashSet<>(groups);
|
||||
String sameGroup = transportFactory.findGroupName(clientSncpAddress);
|
||||
if (sameGroup != null) diffGroups.remove(sameGroup);
|
||||
client.setLocalGroup(sameGroup);
|
||||
client.setLocalGroupTransport(transportFactory.loadSameGroupTransport(clientSncpAddress));
|
||||
e.set(rs, client);
|
||||
transportFactory.addSncpService(rs);
|
||||
} catch (NoSuchFieldException ne) {
|
||||
|
||||
@@ -70,12 +70,6 @@ public final class SncpClient {
|
||||
//远程模式
|
||||
protected Transport remoteGroupTransport;
|
||||
|
||||
//本地模式
|
||||
protected String localGroup;
|
||||
|
||||
//本地模式
|
||||
protected Transport localGroupTransport;
|
||||
|
||||
public <T extends Service> SncpClient(final String serviceName, final Class<T> serviceTypeOrImplClass, final T service, final TransportFactory factory,
|
||||
final boolean remote, final Class serviceClass, final InetSocketAddress clientSncpAddress) {
|
||||
this.remote = remote;
|
||||
@@ -141,22 +135,6 @@ public final class SncpClient {
|
||||
this.remoteGroupTransport = remoteGroupTransport;
|
||||
}
|
||||
|
||||
public String getLocalGroup() {
|
||||
return localGroup;
|
||||
}
|
||||
|
||||
public void setLocalGroup(String localGroup) {
|
||||
this.localGroup = localGroup;
|
||||
}
|
||||
|
||||
public Transport getLocalGroupTransport() {
|
||||
return localGroupTransport;
|
||||
}
|
||||
|
||||
public void setLocalGroupTransport(Transport localGroupTransport) {
|
||||
this.localGroupTransport = localGroupTransport;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
String service = serviceClass.getName();
|
||||
@@ -171,8 +149,6 @@ public final class SncpClient {
|
||||
if (remote) service = service.replace(Sncp.LOCALPREFIX, Sncp.REMOTEPREFIX);
|
||||
return service + "(name = '" + name + "', serviceid = " + serviceid + ", serviceversion = " + serviceversion
|
||||
+ ", clientaddr = " + (clientSncpAddress == null ? "" : (clientSncpAddress.getHostString() + ":" + clientSncpAddress.getPort()))
|
||||
+ ((localGroup == null || localGroup.isEmpty()) ? "" : ", localGroup = " + localGroup)
|
||||
+ (localGroupTransport == null ? "" : ", localGroupTransport = " + Arrays.toString(localGroupTransport.getRemoteAddresses()))
|
||||
+ ((remoteGroups == null || remoteGroups.isEmpty()) ? "" : ", remoteGroups = " + remoteGroups)
|
||||
+ (remoteGroupTransport == null ? "" : ", remoteGroupTransport = " + Arrays.toString(remoteGroupTransport.getRemoteAddresses()))
|
||||
+ ", actions.size = " + actions.length + ")";
|
||||
|
||||
@@ -11,7 +11,7 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
|
||||
|
||||
/**
|
||||
* 修饰由SNCP协议动态生成的class、和method
|
||||
* 本地模式:动态生成的_DynLocalXXXXService类其带有@RpcMultiRun方法均会打上@SncpDyn(remote = false, index=N) 的注解
|
||||
* 本地模式:动态生成的_DynLocalXXXXService类会打上@SncpDyn(remote = false) 的注解
|
||||
* 远程模式:动态生成的_DynRemoteXXXService类会打上@SncpDyn(remote = true) 的注解
|
||||
*
|
||||
* <p> 详情见: https://redkale.org
|
||||
|
||||
61
src/org/redkale/service/RpcCallArrayAttribute.java
Normal file
61
src/org/redkale/service/RpcCallArrayAttribute.java
Normal file
@@ -0,0 +1,61 @@
|
||||
/*
|
||||
* To change this license header, choose License Headers in Project Properties.
|
||||
* To change this template file, choose Tools | Templates
|
||||
* and open the template in the editor.
|
||||
*/
|
||||
package org.redkale.service;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.lang.reflect.Array;
|
||||
import org.redkale.util.Attribute;
|
||||
|
||||
/**
|
||||
*
|
||||
* <p>
|
||||
* 详情见: https://redkale.org
|
||||
*
|
||||
* @author zhangjx
|
||||
* @param <T> 对象类型
|
||||
* @param <F> 字段类型
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public class RpcCallArrayAttribute<T, F> implements Attribute<T[], F> {
|
||||
|
||||
public static final RpcCallArrayAttribute instance = new RpcCallArrayAttribute();
|
||||
|
||||
@Override
|
||||
public Class<? extends F> type() {
|
||||
return (Class<F>) Object.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<T[]> declaringClass() {
|
||||
return (Class<T[]>) (Class) Object[].class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String field() {
|
||||
return "";
|
||||
}
|
||||
|
||||
@Override
|
||||
public F get(final T[] objs) {
|
||||
if (objs == null || objs.length == 0) return null;
|
||||
final Attribute<T, Serializable> attr = RpcCallAttribute.load(objs[0].getClass());
|
||||
final Object keys = Array.newInstance(attr.type(), objs.length);
|
||||
for (int i = 0; i < objs.length; i++) {
|
||||
Array.set(keys, i, attr.get(objs[i]));
|
||||
}
|
||||
return (F) keys;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void set(final T[] objs, final F keys) {
|
||||
if (objs == null || objs.length == 0) return;
|
||||
final Attribute<T, Serializable> attr = RpcCallAttribute.load(objs[0].getClass());
|
||||
for (int i = 0; i < objs.length; i++) {
|
||||
attr.set(objs[i], (Serializable) Array.get(keys, i));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -72,46 +72,4 @@ public class RpcCallAttribute implements Attribute<Object, Serializable> {
|
||||
load(obj.getClass()).set(obj, key);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static class RpcCallArrayAttribute<T, F> implements Attribute<T[], F> {
|
||||
|
||||
public static final RpcCallArrayAttribute instance = new RpcCallArrayAttribute();
|
||||
|
||||
@Override
|
||||
public Class<? extends F> type() {
|
||||
return (Class<F>) Object.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<T[]> declaringClass() {
|
||||
return (Class<T[]>) (Class) Object[].class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String field() {
|
||||
return "";
|
||||
}
|
||||
|
||||
@Override
|
||||
public F get(final T[] objs) {
|
||||
if (objs == null || objs.length == 0) return null;
|
||||
final Attribute<T, Serializable> attr = RpcCallAttribute.load(objs[0].getClass());
|
||||
final Object keys = Array.newInstance(attr.type(), objs.length);
|
||||
for (int i = 0; i < objs.length; i++) {
|
||||
Array.set(keys, i, attr.get(objs[i]));
|
||||
}
|
||||
return (F) keys;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void set(final T[] objs, final F keys) {
|
||||
if (objs == null || objs.length == 0) return;
|
||||
final Attribute<T, Serializable> attr = RpcCallAttribute.load(objs[0].getClass());
|
||||
for (int i = 0; i < objs.length; i++) {
|
||||
attr.set(objs[i], (Serializable) Array.get(keys, i));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -17,12 +17,12 @@ public interface SncpTestIService extends Service {
|
||||
public String queryResult(SncpTestBean bean);
|
||||
|
||||
public double queryDoubleResult(String a, int b, double value);
|
||||
|
||||
|
||||
public long queryLongResult(String a, int b, long value);
|
||||
|
||||
public CompletableFuture<String> queryResultAsync(SncpTestBean bean);
|
||||
|
||||
public void insert(@RpcCall(RpcCallAttribute.RpcCallArrayAttribute.class) SncpTestBean... beans);
|
||||
public void insert(@RpcCall(RpcCallArrayAttribute.class) SncpTestBean... beans);
|
||||
|
||||
public String updateBean(@RpcCall(SncpTestServiceImpl.CallAttribute.class) SncpTestBean bean);
|
||||
}
|
||||
|
||||
@@ -50,7 +50,7 @@ public class SncpTestServiceImpl implements SncpTestIService {
|
||||
public double queryDoubleResult(String a, int b, double value) {
|
||||
return value + 1;
|
||||
}
|
||||
|
||||
|
||||
public static class CallAttribute implements Attribute<SncpTestBean, Long> {
|
||||
|
||||
@Override
|
||||
@@ -83,7 +83,7 @@ public class SncpTestServiceImpl implements SncpTestIService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void insert(@RpcCall(RpcCallAttribute.RpcCallArrayAttribute.class) SncpTestBean... beans) {
|
||||
public void insert(@RpcCall(RpcCallArrayAttribute.class) SncpTestBean... beans) {
|
||||
for (SncpTestBean bean : beans) {
|
||||
bean.setId(System.currentTimeMillis());
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user