This commit is contained in:
Redkale
2020-05-16 09:03:18 +08:00
parent 292ff63699
commit 081773163b
4 changed files with 22 additions and 106 deletions

View File

@@ -271,8 +271,7 @@ public abstract class NodeServer {
SncpClient client = Sncp.getSncpClient(srcService);
final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress();
final Set<String> groups = new HashSet<>();
if (client != null && client.getSameGroup() != null) groups.add(client.getSameGroup());
if (client != null && client.getDiffGroups() != null) groups.addAll(client.getDiffGroups());
if (client != null && client.getLocalGroup() != null) groups.add(client.getLocalGroup());
source = (DataSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appSncpTranFactory, sncpAddr, groups, Sncp.getConf(srcService));
}
}
@@ -305,9 +304,8 @@ public abstract class NodeServer {
SncpClient client = Sncp.getSncpClient(srcService);
final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress();
final Set<String> groups = new HashSet<>();
if (client != null && client.getSameGroup() != null) groups.add(client.getSameGroup());
if (client != null && client.getDiffGroups() != null) groups.addAll(client.getDiffGroups());
if (client != null && client.getLocalGroup() != null) groups.add(client.getLocalGroup());
SimpleEntry<Class, AnyValue> resEntry = cacheResource.get(resourceName);
AnyValue sourceConf = resEntry == null ? null : resEntry.getValue();
if (sourceConf == null) {

View File

@@ -73,13 +73,8 @@ public class TransportWatchService extends AbstractWatchService {
client.getRemoteGroupTransport().addRemoteAddresses(address);
}
} else {
if (group.equals(client.getSameGroup())) {
client.getSameGroupTransport().addRemoteAddresses(address);
}
if (client.getDiffGroups() != null && client.getDiffGroups().contains(group)) {
for (Transport transport : client.getDiffGroupTransports()) {
transport.addRemoteAddresses(address);
}
if (group.equals(client.getLocalGroup())) {
client.getLocalGroupTransport().addRemoteAddresses(address);
}
}
}
@@ -115,13 +110,8 @@ public class TransportWatchService extends AbstractWatchService {
client.getRemoteGroupTransport().removeRemoteAddresses(address);
}
} else {
if (group.equals(client.getSameGroup())) {
client.getSameGroupTransport().removeRemoteAddresses(address);
}
if (client.getDiffGroups() != null && client.getDiffGroups().contains(group)) {
for (Transport transport : client.getDiffGroupTransports()) {
transport.removeRemoteAddresses(address);
}
if (group.equals(client.getLocalGroup())) {
client.getLocalGroupTransport().removeRemoteAddresses(address);
}
}
}

View File

@@ -454,10 +454,8 @@ public abstract class Sncp {
Set<String> diffGroups = groups == null ? new HashSet<>() : new HashSet<>(groups);
String sameGroup = transportFactory.findGroupName(clientSncpAddress);
if (sameGroup != null) diffGroups.remove(sameGroup);
client.setSameGroup(sameGroup);
client.setDiffGroups(diffGroups);
client.setSameGroupTransport(transportFactory.loadSameGroupTransport(clientSncpAddress));
client.setDiffGroupTransports(transportFactory.loadDiffGroupTransports(clientSncpAddress, diffGroups));
client.setLocalGroup(sameGroup);
client.setLocalGroupTransport(transportFactory.loadSameGroupTransport(clientSncpAddress));
e.set(rs, client);
transportFactory.addSncpService(rs);
} catch (NoSuchFieldException ne) {
@@ -519,7 +517,7 @@ public abstract class Sncp {
* @param serviceTypeOrImplClass Service类
* @param transportFactory TransportFactory
* @param clientAddress 本地IP地址
* @param groups0 所有的组节点,包含自身
* @param groups0 所有的组节点,包含自身
* @param conf 启动配置项
*
* @return Service的远程模式实例

View File

@@ -71,16 +71,10 @@ public final class SncpClient {
protected Transport remoteGroupTransport;
//本地模式
protected String sameGroup;
protected String localGroup;
//本地模式
protected Transport sameGroupTransport;
//本地模式
protected Set<String> diffGroups;
//本地模式
protected Transport[] diffGroupTransports;
protected Transport localGroupTransport;
public <T extends Service> SncpClient(final String serviceName, final Class<T> serviceTypeOrImplClass, final T service, final TransportFactory factory,
final boolean remote, final Class serviceClass, final InetSocketAddress clientSncpAddress) {
@@ -147,36 +141,20 @@ public final class SncpClient {
this.remoteGroupTransport = remoteGroupTransport;
}
public String getSameGroup() {
return sameGroup;
public String getLocalGroup() {
return localGroup;
}
public void setSameGroup(String sameGroup) {
this.sameGroup = sameGroup;
public void setLocalGroup(String localGroup) {
this.localGroup = localGroup;
}
public Transport getSameGroupTransport() {
return sameGroupTransport;
public Transport getLocalGroupTransport() {
return localGroupTransport;
}
public void setSameGroupTransport(Transport sameGroupTransport) {
this.sameGroupTransport = sameGroupTransport;
}
public Set<String> getDiffGroups() {
return diffGroups;
}
public void setDiffGroups(Set<String> diffGroups) {
this.diffGroups = diffGroups;
}
public Transport[] getDiffGroupTransports() {
return diffGroupTransports;
}
public void setDiffGroupTransports(Transport[] diffGroupTransports) {
this.diffGroupTransports = diffGroupTransports;
public void setLocalGroupTransport(Transport localGroupTransport) {
this.localGroupTransport = localGroupTransport;
}
@Override
@@ -191,18 +169,10 @@ public final class SncpClient {
public String toSimpleString() { //给Sncp产生的Service用
String service = serviceClass.getName();
if (remote) service = service.replace(Sncp.LOCALPREFIX, Sncp.REMOTEPREFIX);
List<InetSocketAddress> diffaddrs = new ArrayList<>();
if (diffGroupTransports != null && diffGroupTransports.length > 1) {
for (Transport t : diffGroupTransports) {
diffaddrs.addAll(Utility.ofList(t.getRemoteAddresses()));
}
}
return service + "(name = '" + name + "', serviceid = " + serviceid + ", serviceversion = " + serviceversion
+ ", clientaddr = " + (clientSncpAddress == null ? "" : (clientSncpAddress.getHostString() + ":" + clientSncpAddress.getPort()))
+ ((sameGroup == null || sameGroup.isEmpty()) ? "" : ", sameGroup = " + sameGroup)
+ (sameGroupTransport == null ? "" : ", sameGroupTransport = " + Arrays.toString(sameGroupTransport.getRemoteAddresses()))
+ ((diffGroups == null || diffGroups.isEmpty()) ? "" : ", diffGroups = " + diffGroups)
+ ((diffGroupTransports == null || diffGroupTransports.length < 1) ? "" : ", diffGroupTransports = " + diffaddrs)
+ ((localGroup == null || localGroup.isEmpty()) ? "" : ", localGroup = " + localGroup)
+ (localGroupTransport == null ? "" : ", localGroupTransport = " + Arrays.toString(localGroupTransport.getRemoteAddresses()))
+ ((remoteGroups == null || remoteGroups.isEmpty()) ? "" : ", remoteGroups = " + remoteGroups)
+ (remoteGroupTransport == null ? "" : ", remoteGroupTransport = " + Arrays.toString(remoteGroupTransport.getRemoteAddresses()))
+ ", actions.size = " + actions.length + ")";
@@ -249,46 +219,6 @@ public final class SncpClient {
return multis;
}
public void remoteSameGroup(final int index, final Object... params) {
if (sameGroupTransport == null) return;
final SncpAction action = actions[index];
if (action.handlerFuncParamIndex >= 0) params[action.handlerFuncParamIndex] = null; //不能让远程调用handler因为之前本地方法已经调用过了
for (InetSocketAddress addr : sameGroupTransport.getRemoteAddresses()) {
remote0(null, sameGroupTransport, addr, action, params);
}
}
public void asyncRemoteSameGroup(final int index, final Object... params) {
if (sameGroupTransport == null) return;
if (executor != null) {
executor.execute(() -> {
remoteSameGroup(index, params);
});
} else {
remoteSameGroup(index, params);
}
}
public void remoteDiffGroup(final int index, final Object... params) {
if (diffGroupTransports == null || diffGroupTransports.length < 1) return;
final SncpAction action = actions[index];
if (action.handlerFuncParamIndex >= 0) params[action.handlerFuncParamIndex] = null; //不能让远程调用handler因为之前本地方法已经调用过了
for (Transport transport : diffGroupTransports) {
remote0(null, transport, null, action, params);
}
}
public void asyncRemoteDiffGroup(final int index, final Object... params) {
if (diffGroupTransports == null || diffGroupTransports.length < 1) return;
if (executor != null) {
executor.execute(() -> {
remoteDiffGroup(index, params);
});
} else {
remoteDiffGroup(index, params);
}
}
//只给远程模式调用的
public <T> T remote(final int index, final Object... params) {
final SncpAction action = actions[index];