Application.nodeid改为string类型

This commit is contained in:
redkale
2023-12-26 18:36:03 +08:00
parent fb9bdd22c4
commit 1d80041c16
21 changed files with 44 additions and 41 deletions

View File

@@ -54,7 +54,7 @@ class AppConfig {
boolean configFromCache;
//本进程节点ID
int nodeid;
String nodeid;
//本进程节点ID
String name;
@@ -97,7 +97,7 @@ class AppConfig {
private void init(AnyValue conf) {
this.config = conf;
this.name = checkName(config.getValue("name", ""));
this.nodeid = config.getIntValue("nodeid", 0);
this.nodeid = config.getValue("nodeid", String.valueOf(Math.abs(System.nanoTime())));
this.configFromCache = "true".equals(config.getValue("[config-from-cache]"));
//初始化classLoader、serverClassLoader
this.initClassLoader();

View File

@@ -88,7 +88,7 @@ public final class Application {
public static final String RESNAME_APP_CONF_DIR = "APP_CONF_DIR";
/**
* 当前进程节点的nodeid 类型int
* 当前进程节点的nodeid 类型:String
*/
public static final String RESNAME_APP_NODEID = "APP_NODEID";
@@ -156,7 +156,7 @@ public final class Application {
private final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
//本进程节点ID
final int nodeid;
final String nodeid;
//本进程节点ID
final String name;
@@ -271,8 +271,10 @@ public final class Application {
//设置基础信息资源
this.resourceFactory.register(RESNAME_APP_NAME, String.class, this.name);
this.resourceFactory.register(RESNAME_APP_NODEID, int.class, this.nodeid);
this.resourceFactory.register(RESNAME_APP_NODEID, Integer.class, this.nodeid);
this.resourceFactory.register(RESNAME_APP_NODEID, String.class, this.nodeid);
if (Utility.isNumeric(this.nodeid)) {
this.resourceFactory.register(RESNAME_APP_NODEID, int.class, Integer.parseInt(this.nodeid));
}
this.resourceFactory.register(RESNAME_APP_TIME, long.class, this.startTime);
this.resourceFactory.register(RESNAME_APP_TIME, Long.class, this.startTime);
@@ -1437,7 +1439,7 @@ public final class Application {
return sncpRpcGroups;
}
public int getNodeid() {
public String getNodeid() {
return nodeid;
}

View File

@@ -119,6 +119,6 @@ public abstract class HttpRpcClient implements ClusterRpcClient<HttpSimpleReques
public abstract CompletableFuture<Void> produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request);
protected abstract int getNodeid();
protected abstract String getNodeid();
}

View File

@@ -354,7 +354,7 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
public InetSocketAddress addr;
public int nodeid;
public String nodeid;
public long time;

View File

@@ -39,7 +39,7 @@ public abstract class ClusterAgent {
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
@Resource(name = RESNAME_APP_NODEID)
protected int nodeid;
protected String nodeid;
@Resource(name = RESNAME_APP_NAME)
protected String appName = "";

View File

@@ -54,7 +54,7 @@ public class HttpClusterRpcClient extends HttpRpcClient {
}
@Override
protected int getNodeid() {
protected String getNodeid() {
return localClient.getNodeid();
}

View File

@@ -73,7 +73,7 @@ public class HttpLocalRpcClient extends HttpRpcClient {
}
@Override
protected int getNodeid() {
protected String getNodeid() {
return application.getNodeid();
}

View File

@@ -18,11 +18,11 @@ final class HttpRpcMessageClient extends HttpRpcClient {
private final MessageCoder<HttpSimpleRequest> requestCoder = HttpSimpleRequestCoder.getInstance();
private final int nodeid;
private final String nodeid;
private final MessageClient messageClient;
public HttpRpcMessageClient(MessageClient messageClient, final int nodeid) {
public HttpRpcMessageClient(MessageClient messageClient, final String nodeid) {
this.messageClient = messageClient;
this.nodeid = nodeid;
}
@@ -42,7 +42,7 @@ final class HttpRpcMessageClient extends HttpRpcClient {
}
@Override
protected int getNodeid() {
protected String getNodeid() {
return nodeid;
}

View File

@@ -58,7 +58,7 @@ public abstract class MessageAgent implements MessageManager {
protected Environment environment;
@Resource(name = RESNAME_APP_NODEID)
protected int nodeid;
protected String nodeid;
@Resource(name = RESNAME_APP_NAME)
protected String nodeName;
@@ -432,7 +432,7 @@ public abstract class MessageAgent implements MessageManager {
return;
}
}
if (WebSocketNode.class.isAssignableFrom(Sncp.getResourceType(service)) && nodeid == 0) {
if (WebSocketNode.class.isAssignableFrom(Sncp.getResourceType(service)) && Utility.isEmpty(nodeid)) {
throw new RedkaleException("Application.node not config in WebSocket Cluster");
}
String topic = Rest.generateHttpReqTopic(service, this.nodeid);
@@ -449,7 +449,7 @@ public abstract class MessageAgent implements MessageManager {
if (al2 != null && !al2.value() && service.getClass().getAnnotation(Local.class) != null) {
return;
}
if (WebSocketNode.class.isAssignableFrom(Sncp.getResourceType(service)) && nodeid == 0) {
if (WebSocketNode.class.isAssignableFrom(Sncp.getResourceType(service)) && Utility.isEmpty(nodeid)) {
throw new RedkaleException("Application.node not config in WebSocket Cluster");
}
String topic = Sncp.generateSncpReqTopic(service, this.nodeid);
@@ -486,7 +486,7 @@ public abstract class MessageAgent implements MessageManager {
return this.sncpAppRespTopic;
}
public final int getNodeid() {
public final String getNodeid() {
return this.nodeid;
}

View File

@@ -29,7 +29,7 @@ public class Context {
protected final long serverStartTime;
//Application节点id
protected final int nodeid;
protected final String nodeid;
//Server的线程池
protected final ExecutorService workExecutor;
@@ -85,7 +85,7 @@ public class Context {
config.dispatcher, config.aliveTimeoutSeconds, config.readTimeoutSeconds, config.writeTimeoutSeconds);
}
public Context(long serverStartTime, int nodeid, Logger logger, ExecutorService workExecutor, SSLBuilder sslBuilder, SSLContext sslContext,
public Context(long serverStartTime, String nodeid, Logger logger, ExecutorService workExecutor, SSLBuilder sslBuilder, SSLContext sslContext,
int bufferCapacity, final int maxConns, final int maxBody, Charset charset, InetSocketAddress address,
ResourceFactory resourceFactory, DispatcherServlet dispatcher, int aliveTimeoutSeconds, int readTimeoutSeconds, int writeTimeoutSeconds) {
this.serverStartTime = serverStartTime;
@@ -178,7 +178,7 @@ public class Context {
return serverStartTime;
}
public int getNodeid() {
public String getNodeid() {
return nodeid;
}
@@ -220,7 +220,7 @@ public class Context {
public long serverStartTime;
//Application节点id
public int nodeid;
public String nodeid;
//Server的线程池
public ExecutorService workExecutor;

View File

@@ -269,16 +269,16 @@ public final class Rest {
}
//格式: http.req.module.user
public static String generateHttpReqTopic(String module, int nodeid) {
public static String generateHttpReqTopic(String module, String nodeid) {
return getHttpReqTopicPrefix() + "module." + module.toLowerCase();
}
//格式: http.req.module.user
public static String generateHttpReqTopic(String module, String resname, int nodeid) {
public static String generateHttpReqTopic(String module, String resname, String nodeid) {
return getHttpReqTopicPrefix() + "module." + module.toLowerCase() + (resname == null || resname.isEmpty() ? "" : ("-" + resname));
}
public static String generateHttpReqTopic(Service service, int nodeid) {
public static String generateHttpReqTopic(Service service, String nodeid) {
String resname = Sncp.getResourceName(service);
String module = getRestModule(service).toLowerCase();
return getHttpReqTopicPrefix() + "module." + module + (resname.isEmpty() ? "" : ("-" + resname));

View File

@@ -43,7 +43,7 @@ public abstract class WebSocketNode implements Service {
protected final Logger logger = Logger.getLogger(WebSocketNode.class.getSimpleName());
@Resource(name = RESNAME_APP_NODEID)
protected int nodeid;
protected String nodeid;
//"SNCP_ADDR" 如果不是分布式(没有SNCP) 值为null
@Resource(name = Application.RESNAME_SNCP_ADDRESS, required = false)

View File

@@ -261,12 +261,12 @@ public abstract class Sncp {
}
//格式: sncp.req.module.user
public static String generateSncpReqTopic(Service service, int nodeid) {
public static String generateSncpReqTopic(Service service, String nodeid) {
return generateSncpReqTopic(getResourceName(service), getResourceType(service), nodeid);
}
//格式: sncp.req.module.user
public static String generateSncpReqTopic(String resourceName, Class resourceType, int nodeid) {
public static String generateSncpReqTopic(String resourceName, Class resourceType, String nodeid) {
if (WebSocketNode.class.isAssignableFrom(resourceType)) {
return getSncpReqTopicPrefix() + "module.wsnode" + nodeid + (isEmpty(resourceName) ? "" : ("-" + resourceName));
}

View File

@@ -23,11 +23,11 @@ public class SncpClient extends Client<SncpClientConnection, SncpClientRequest,
private final AtomicLong seqno = new AtomicLong();
final int nodeid;
final String nodeid;
final InetSocketAddress clientSncpAddress;
public SncpClient(String name, AsyncGroup group, int nodeid,
public SncpClient(String name, AsyncGroup group, String nodeid,
InetSocketAddress clientSncpAddress, ClientAddress address, String netprotocol, int maxConns, int maxPipelines) {
super(name, group, "TCP".equalsIgnoreCase(netprotocol), address, maxConns, maxPipelines, null, null, null); //maxConns
this.clientSncpAddress = clientSncpAddress;
@@ -46,7 +46,7 @@ public class SncpClient extends Client<SncpClientConnection, SncpClientRequest,
return clientSncpAddress;
}
public int getNodeid() {
public String getNodeid() {
return nodeid;
}

View File

@@ -53,7 +53,7 @@ public class CacheInstanceTest {
CacheAsmMethodBoost boost = new CacheAsmMethodBoost(false, serviceClass);
SncpRpcGroups grous = new SncpRpcGroups();
AsyncGroup iGroup = AsyncGroup.create("", Utility.newScheduledExecutor(1), 0, 0);
SncpClient client = new SncpClient("", iGroup, 0, new InetSocketAddress("127.0.0.1", 8080), new ClientAddress(), "TCP", 1, 16);
SncpClient client = new SncpClient("", iGroup, "0", new InetSocketAddress("127.0.0.1", 8080), new ClientAddress(), "TCP", 1, 16);
CacheInstance instance = Sncp.createLocalService(null, "", serviceClass, boost, resourceFactory,
grous, client, null, null, null);
//System.out.println(instance.getName());

View File

@@ -43,7 +43,7 @@ public class ABMainService implements Service {
final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16);
asyncGroup.start();
InetSocketAddress sncpAddress = new InetSocketAddress("127.0.0.1", abport);
final SncpClient client = new SncpClient("", asyncGroup, 0, sncpAddress, new ClientAddress(sncpAddress), "TCP", 16, 100);
final SncpClient client = new SncpClient("", asyncGroup, "0", sncpAddress, new ClientAddress(sncpAddress), "TCP", 16, 100);
final ResourceFactory resFactory = ResourceFactory.create();
resFactory.register(JsonConvert.root());
resFactory.register(BsonConvert.root());
@@ -94,7 +94,7 @@ public class ABMainService implements Service {
server.init(AnyValueWriter.create("port", abport));
server.addRestServlet(null, service, null, HttpServlet.class, "/pipes");
server.start();
Thread.sleep(100);
Utility.sleep(100);
System.out.println("开始请求");
//不声明一个新的HttpClient会导致Utility.postHttpContent操作

View File

@@ -32,7 +32,7 @@ public class SncpClientCodecTest {
InetSocketAddress sncpAddress = new InetSocketAddress("127.0.0.1", 3389);
InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 3344);
final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16);
SncpClient client = new SncpClient("test", asyncGroup, 0, sncpAddress, new ClientAddress(remoteAddress), "TCP", Utility.cpus(), 16);
SncpClient client = new SncpClient("test", asyncGroup, "0", sncpAddress, new ClientAddress(remoteAddress), "TCP", Utility.cpus(), 16);
SncpClientConnection conn = client.createClientConnection(asyncGroup.newTCPClientConnection());
SncpClientCodec codec = new SncpClientCodec(conn);
List respResults = new ArrayList();

View File

@@ -32,7 +32,7 @@ public class SncpRequestParseTest {
InetSocketAddress sncpAddress = new InetSocketAddress("127.0.0.1", 3389);
InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 3344);
final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16);
SncpClient client = new SncpClient("test", asyncGroup, 0, sncpAddress, new ClientAddress(remoteAddress), "TCP", Utility.cpus(), 16);
SncpClient client = new SncpClient("test", asyncGroup, "0", sncpAddress, new ClientAddress(remoteAddress), "TCP", Utility.cpus(), 16);
SncpClientConnection conn = client.createClientConnection(asyncGroup.newTCPClientConnection());
SncpContext.SncpContextConfig config = new SncpContext.SncpContextConfig();

View File

@@ -52,7 +52,7 @@ public class SncpSleepTest {
int port = server.getSocketAddress().getPort();
System.out.println("SNCP服务器启动端口: " + port);
InetSocketAddress sncpAddress = new InetSocketAddress("127.0.0.1", port);
final SncpClient client = new SncpClient("", asyncGroup, 0, sncpAddress, new ClientAddress(sncpAddress), "TCP", 16, 100);
final SncpClient client = new SncpClient("", asyncGroup, "0", sncpAddress, new ClientAddress(sncpAddress), "TCP", 16, 100);
final SncpRpcGroups rpcGroups = application.getSncpRpcGroups();
rpcGroups.computeIfAbsent("cs", "TCP").putAddress(sncpAddress);
SncpSleepService remoteCService = Sncp.createSimpleRemoteService(SncpSleepService.class, resFactory, rpcGroups, client, "cs");

View File

@@ -81,7 +81,7 @@ public class SncpTest {
asyncGroup.start();
InetSocketAddress sncpAddress = addr;
final SncpClient client = new SncpClient("", asyncGroup, 0, sncpAddress, new ClientAddress(sncpAddress), protocol.endsWith(".UDP") ? "UDP" : "TCP", 16, 100);
final SncpClient client = new SncpClient("", asyncGroup, "0", sncpAddress, new ClientAddress(sncpAddress), protocol.endsWith(".UDP") ? "UDP" : "TCP", 16, 100);
final SncpTestIService service = Sncp.createSimpleRemoteService(SncpTestIService.class, factory, rpcGroups, client, "client");//Sncp.createSimpleRemoteService(SncpTestIService.class, null, transFactory, addr, "client");
factory.inject(service);

View File

@@ -16,6 +16,7 @@ import org.redkale.net.AsyncIOGroup;
import org.redkale.net.client.ClientAddress;
import org.redkale.net.sncp.*;
import org.redkale.service.*;
import org.redkale.util.Utility;
/**
*
@@ -31,7 +32,7 @@ public class SncpTestServiceImpl implements SncpTestIService {
@Override
public void run() {
try {
Thread.sleep(200);
Utility.sleep(200);
System.out.println(Thread.currentThread().getName() + " sleep 200ms后运行了异步方法-----------queryResultAsync方法");
future.complete("异步 result: " + bean);
} catch (Exception e) {
@@ -93,7 +94,7 @@ public class SncpTestServiceImpl implements SncpTestIService {
final SncpRpcGroups rpcGroups = application.getSncpRpcGroups();
InetSocketAddress sncpAddress = new InetSocketAddress("127.0.0.1", 7070);
rpcGroups.computeIfAbsent("g70", "TCP").putAddress(sncpAddress);
final SncpClient client = new SncpClient("", asyncGroup, 0, sncpAddress, new ClientAddress(sncpAddress), "TCP", 16, 100);
final SncpClient client = new SncpClient("", asyncGroup, "0", sncpAddress, new ClientAddress(sncpAddress), "TCP", 16, 100);
Service service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, factory);
for (Method method : service.getClass().getDeclaredMethods()) {