This commit is contained in:
Redkale
2020-05-30 10:18:00 +08:00
parent cac8662c01
commit c108ab196c
4 changed files with 39 additions and 21 deletions

View File

@@ -58,13 +58,13 @@
</cluster>
-->
<!--
【节点全局唯一】
MQ管理接口配置
value 实现类名必须是org.redkale.mq.MessageAgent的子类
name: 服务的名称用于监控识别多个mq节点时只能有一个name为空的节点mq.name不能重复,命名规则: 字母、数字、下划线
value 实现类名必须是org.redkale.mq.MessageAgent的子类
MQ节点下的子节点配置没有固定格式, 根据MessageAgent实现方的定义来配置
-->
<!--
<mq value="org.redkalex.mq.kafka.KafkaMessageAgent">
<mq name="" value="org.redkalex.mq.kafka.KafkaMessageAgent">
<servers value="127.0.0.1:9101"/>
<consumer>
<property name="xxxxxx" value="XXXXXXXX"/>

View File

@@ -130,7 +130,7 @@ public final class Application {
final ClusterAgent clusterAgent;
//MQ管理接口
final MessageAgent messageAgent;
final MessageAgent[] messageAgents;
//全局根ResourceFactory
final ResourceFactory resourceFactory = ResourceFactory.root();
@@ -273,7 +273,7 @@ public final class Application {
final AnyValue resources = config.getAnyValue("resources");
TransportStrategy strategy = null;
ClusterAgent cluster = null;
MessageAgent mq = null;
MessageAgent[] mqs = null;
int bufferCapacity = 32 * 1024;
int bufferPoolSize = Runtime.getRuntime().availableProcessors() * 8;
int readTimeoutSeconds = TransportFactory.DEFAULT_READTIMEOUTSECONDS;
@@ -333,18 +333,26 @@ public final class Application {
}
}
AnyValue mqConf = resources.getAnyValue("mq");
if (mqConf != null) {
try {
Class type = classLoader.loadClass(mqConf.getValue("value"));
if (!MessageAgent.class.isAssignableFrom(type)) {
logger.log(Level.SEVERE, "load application mq resource, but not " + MessageAgent.class.getSimpleName() + " error: " + mqConf);
} else {
mq = (MessageAgent) type.getDeclaredConstructor().newInstance();
mq.setConfig(mqConf);
AnyValue[] mqConfs = resources.getAnyValues("mq");
if (mqConfs != null && mqConfs.length > 0) {
mqs = new MessageAgent[mqConfs.length];
Set<String> mqnames = new HashSet<>();
for (int i = 0; i < mqConfs.length; i++) {
AnyValue mqConf = mqConfs[0];
String mqname = mqConf.getValue("name", "");
if (mqnames.contains(mqname)) throw new RuntimeException("mq.name(" + mqname + ") is repeat");
try {
Class type = classLoader.loadClass(mqConf.getValue("value"));
if (!MessageAgent.class.isAssignableFrom(type)) {
logger.log(Level.SEVERE, "load application mq resource, but not " + MessageAgent.class.getSimpleName() + " error: " + mqConf);
} else {
mqs[i] = (MessageAgent) type.getDeclaredConstructor().newInstance();
mqs[i].setConfig(mqConf);
}
} catch (Exception e) {
logger.log(Level.SEVERE, "load application mq resource error: " + mqs[i], e);
}
} catch (Exception e) {
logger.log(Level.SEVERE, "load application mq resource error: " + mq, e);
mqnames.add(mqname);
}
}
}
@@ -383,7 +391,7 @@ public final class Application {
cluster.init(cluster.getConfig());
}
this.clusterAgent = cluster;
this.messageAgent = mq;
this.messageAgents = mqs;
Thread.currentThread().setContextClassLoader(this.classLoader);
this.serverClassLoader = new RedkaleClassLoader(this.classLoader);
}
@@ -400,6 +408,10 @@ public final class Application {
return clusterAgent;
}
public MessageAgent[] getMessageAgents() {
return messageAgents;
}
public RedkaleClassLoader getClassLoader() {
return classLoader;
}

View File

@@ -542,15 +542,15 @@ public abstract class NodeServer {
agent.deregister(this, protocol, localServices, remoteServices);
}
}
if (application.messageAgent != null) { //MQ
if (application.messageAgents != null) { //MQ
}
}
//Server.start执行之后调用
protected void postStartServer(Set<Service> localServices, Set<Service> remoteServices) {
if (application.messageAgent != null) { //MQ
final MessageAgent agent = application.messageAgent;
if (application.messageAgents != null) { //MQ
final MessageAgent agent = application.messageAgents[0];
}
}

View File

@@ -26,6 +26,8 @@ public abstract class MessageAgent {
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
protected String name;
protected AnyValue config;
protected MessageProducer producer;
@@ -41,6 +43,10 @@ public abstract class MessageAgent {
}
public String getName() {
return name;
}
public AnyValue getConfig() {
return config;
}
@@ -50,7 +56,7 @@ public abstract class MessageAgent {
}
protected String checkName(String name) { //不能含特殊字符
if (name.isEmpty()) throw new RuntimeException("name only 0-9 a-z A-Z _ cannot begin 0-9");
if (name.isEmpty()) return name;
if (name.charAt(0) >= '0' && name.charAt(0) <= '9') throw new RuntimeException("name only 0-9 a-z A-Z _ cannot begin 0-9");
for (char ch : name.toCharArray()) {
if (!((ch >= '0' && ch <= '9') || ch == '_' || (ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z'))) { //不能含特殊字符