This commit is contained in:
Redkale
2021-01-20 11:45:56 +08:00
parent 983c667725
commit 53b7f9b26f
2 changed files with 15 additions and 17 deletions

View File

@@ -225,22 +225,16 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
} }
@Override @Override
protected void register(NodeServer ns, String protocol, Service service) { protected ClusterEntry register(NodeServer ns, String protocol, Service service) {
deregister(ns, protocol, service, false); deregister(ns, protocol, service, false);
// //
String serviceid = generateServiceId(ns, protocol, service); ClusterEntry clusterEntry = new ClusterEntry(ns, protocol, service);
String servicename = generateServiceName(ns, protocol, service);
InetSocketAddress address = ns.isSNCP() ? ns.getSncpAddress() : ns.getServer().getSocketAddress();
String host = address.getHostString();
if ("0.0.0.0".equals(host)) {
host = this.appAddress.getHostString();
address = new InetSocketAddress(host, address.getPort());
}
AddressEntry entry = new AddressEntry(); AddressEntry entry = new AddressEntry();
entry.addr = address; entry.addr = clusterEntry.address;
entry.nodeid = this.nodeid; entry.nodeid = this.nodeid;
entry.time = System.currentTimeMillis(); entry.time = System.currentTimeMillis();
source.hset(servicename, serviceid, AddressEntry.class, entry); source.hset(clusterEntry.servicename, clusterEntry.serviceid, AddressEntry.class, entry);
return clusterEntry;
} }
@Override @Override

View File

@@ -107,14 +107,12 @@ public abstract class ClusterAgent {
//注册本地模式 //注册本地模式
for (Service service : localServices) { for (Service service : localServices) {
if (!canRegister(protocol, service)) continue; if (!canRegister(protocol, service)) continue;
register(ns, protocol, service); ClusterEntry htentry = register(ns, protocol, service);
ClusterEntry htentry = new ClusterEntry(ns, protocol, service);
localEntrys.put(htentry.serviceid, htentry); localEntrys.put(htentry.serviceid, htentry);
if (protocol.toLowerCase().startsWith("http")) { if (protocol.toLowerCase().startsWith("http")) {
MessageMultiConsumer mmc = service.getClass().getAnnotation(MessageMultiConsumer.class); MessageMultiConsumer mmc = service.getClass().getAnnotation(MessageMultiConsumer.class);
if (mmc != null) { if (mmc != null) {
register(ns, "mqtp", service); ClusterEntry mqentry = register(ns, "mqtp", service);
ClusterEntry mqentry = new ClusterEntry(ns, "mqtp", service);
localEntrys.put(mqentry.serviceid, mqentry); localEntrys.put(mqentry.serviceid, mqentry);
htentry.submqtp = true; htentry.submqtp = true;
} }
@@ -177,7 +175,7 @@ public abstract class ClusterAgent {
protected abstract CompletableFuture<Collection<InetSocketAddress>> queryAddress(ClusterEntry entry); protected abstract CompletableFuture<Collection<InetSocketAddress>> queryAddress(ClusterEntry entry);
//注册服务 //注册服务
protected abstract void register(NodeServer ns, String protocol, Service service); protected abstract ClusterEntry register(NodeServer ns, String protocol, Service service);
//注销服务 //注销服务
protected abstract void deregister(NodeServer ns, String protocol, Service service); protected abstract void deregister(NodeServer ns, String protocol, Service service);
@@ -322,7 +320,13 @@ public abstract class ClusterAgent {
this.checkid = generateCheckId(ns, protocol, service); this.checkid = generateCheckId(ns, protocol, service);
this.checkname = generateCheckName(ns, protocol, service); this.checkname = generateCheckName(ns, protocol, service);
this.protocol = protocol; this.protocol = protocol;
this.address = ns.getSocketAddress(); InetSocketAddress addr = ns.getSocketAddress();
String host = addr.getHostString();
if ("0.0.0.0".equals(host)) {
host = appAddress.getHostString();
addr = new InetSocketAddress(host, addr.getPort());
}
this.address = addr;
this.serviceref = new WeakReference(service); this.serviceref = new WeakReference(service);
Server server = ns.getServer(); Server server = ns.getServer();
this.netprotocol = server instanceof SncpServer ? ((SncpServer) server).getNetprotocol() : Transport.DEFAULT_PROTOCOL; this.netprotocol = server instanceof SncpServer ? ((SncpServer) server).getNetprotocol() : Transport.DEFAULT_PROTOCOL;