This commit is contained in:
Redkale
2020-06-06 23:37:59 +08:00
parent 8d5e61a9a2
commit 1e15eb31c7
7 changed files with 40 additions and 16 deletions

View File

@@ -65,6 +65,7 @@
MQ管理接口配置
不同MQ节点所配置的MQ集群不能重复。
name: 服务的名称用于监控识别多个mq节点时只能有一个name为空的节点mq.name不能重复,命名规则: 字母、数字、下划线
names: 服务的扩展名称用于监控识别一个mq可以使用多个资源名称多个名称用分号;隔开, 且name需要全局唯一
value 实现类名必须是org.redkale.mq.MessageAgent的子类
MQ节点下的子节点配置没有固定格式, 根据MessageAgent实现方的定义来配置
-->

View File

@@ -357,6 +357,15 @@ public final class Application {
AnyValue mqConf = mqConfs[0];
String mqname = mqConf.getValue("name", "");
if (mqnames.contains(mqname)) throw new RuntimeException("mq.name(" + mqname + ") is repeat");
mqnames.add(mqname);
String namex = mqConf.getValue("names");
if (namex != null && !namex.isEmpty()) {
for (String n : namex.split(";")) {
if (n.trim().isEmpty()) continue;
if (mqnames.contains(n.trim())) throw new RuntimeException("mq.name(" + n.trim() + ") is repeat");
mqnames.add(n.trim());
}
}
try {
String classval = mqConf.getValue("value");
if (classval == null || classval.isEmpty()) {
@@ -382,7 +391,6 @@ public final class Application {
} catch (Exception e) {
logger.log(Level.SEVERE, "load application mq resource error: " + mqs[i], e);
}
mqnames.add(mqname);
}
}
}
@@ -426,6 +434,11 @@ public final class Application {
this.resourceFactory.inject(agent);
agent.init(agent.getConfig());
this.resourceFactory.register(agent.getName(), MessageAgent.class, agent);
if (agent.getNames() != null) {
for (String n : agent.getNames()) {
this.resourceFactory.register(n, MessageAgent.class, agent);
}
}
}
}
this.messageAgents = mqs;
@@ -449,6 +462,11 @@ public final class Application {
if (messageAgents == null) return null;
for (MessageAgent agent : messageAgents) {
if (agent.getName().equals(name)) return agent;
if (agent.getNames() != null) {
for (String n : agent.getNames()) {
if (n.equals(name)) return agent;
}
}
}
return null;
}

View File

@@ -332,7 +332,7 @@ public class NodeHttpServer extends NodeServer {
return;
}
restedObjects.add(stype); //避免重复创建Rest对象
WebSocketServlet servlet = httpServer.addRestWebSocketServlet(serverClassLoader, stype, prefix, en.getProperty(), messageAgent);
WebSocketServlet servlet = httpServer.addRestWebSocketServlet(serverClassLoader, stype, prefix, en.getProperty());
if (servlet == null) return; //没有RestOnMessage方法的HttpServlet调用Rest.createRestWebSocketServlet就会返回null
String prefix2 = prefix;
WebServlet ws = servlet.getClass().getAnnotation(WebServlet.class);

View File

@@ -38,6 +38,8 @@ public abstract class MessageAgent {
protected String name;
protected String[] names;
protected AnyValue config;
protected MessageProducer producer;
@@ -55,6 +57,16 @@ public abstract class MessageAgent {
protected HashMap<String, MessageNode> messageNodes = new LinkedHashMap<>();
public void init(AnyValue config) {
this.name = checkName(config.getValue("name", ""));
String namex = config.getValue("names");
if (namex != null && !namex.isEmpty()) {
List<String> list = new ArrayList<>();
for (String n : namex.split(";")) {
if (n.trim().isEmpty()) continue;
list.add(n.trim());
}
if (!list.isEmpty()) this.names = list.toArray(new String[list.size()]);
}
}
//ServiceLoader时判断配置是否符合当前实现类
@@ -103,6 +115,10 @@ public abstract class MessageAgent {
return name;
}
public String[] getNames() {
return names;
}
public AnyValue getConfig() {
return config;
}

View File

@@ -16,7 +16,6 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.logging.Level;
import org.redkale.mq.MessageAgent;
import org.redkale.net.*;
import org.redkale.net.http.HttpContext.HttpContextConfig;
import org.redkale.net.http.HttpResponse.HttpResponseConfig;
@@ -218,12 +217,11 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
* @param webSocketType WebSocket的类型
* @param prefix url前缀
* @param conf 配置信息
* @param messageAgent MessageAgent
*
* @return RestServlet
*/
public <S extends WebSocket, T extends WebSocketServlet> T addRestWebSocketServlet(final ClassLoader classLoader, final Class<S> webSocketType, final String prefix, final AnyValue conf, final MessageAgent messageAgent) {
T servlet = Rest.createRestWebSocketServlet(classLoader, webSocketType, messageAgent);
public <S extends WebSocket, T extends WebSocketServlet> T addRestWebSocketServlet(final ClassLoader classLoader, final Class<S> webSocketType, final String prefix, final AnyValue conf) {
T servlet = Rest.createRestWebSocketServlet(classLoader, webSocketType);
if (servlet != null) this.prepare.addServlet(servlet, prefix, conf);
return servlet;
}

View File

@@ -22,7 +22,6 @@ import static org.redkale.asm.Opcodes.*;
import org.redkale.asm.Type;
import org.redkale.convert.*;
import org.redkale.convert.json.*;
import org.redkale.mq.MessageAgent;
import org.redkale.net.Cryptor;
import org.redkale.net.sncp.Sncp;
import org.redkale.service.*;
@@ -219,7 +218,7 @@ public final class Rest {
return t == null ? defValue : t;
}
public static <T extends WebSocketServlet> T createRestWebSocketServlet(final ClassLoader classLoader, final Class<? extends WebSocket> webSocketType, final MessageAgent messageAgent) {
public static <T extends WebSocketServlet> T createRestWebSocketServlet(final ClassLoader classLoader, final Class<? extends WebSocket> webSocketType) {
if (webSocketType == null) throw new RuntimeException("Rest WebSocket Class is null on createRestWebSocketServlet");
if (Modifier.isAbstract(webSocketType.getModifiers())) throw new RuntimeException("Rest WebSocket Class(" + webSocketType + ") cannot abstract on createRestWebSocketServlet");
if (Modifier.isFinal(webSocketType.getModifiers())) throw new RuntimeException("Rest WebSocket Class(" + webSocketType + ") cannot final on createRestWebSocketServlet");
@@ -720,11 +719,6 @@ public final class Rest {
Class<?> newClazz = newLoader.loadClass(newDynName.replace('/', '.'), cw.toByteArray());
try {
T servlet = (T) newClazz.getDeclaredConstructor().newInstance();
if (messageAgent != null) {
Field agentField = servlet.getClass().getDeclaredField("messageAgent");
agentField.setAccessible(true);
agentField.set(servlet, messageAgent);
}
newClazz.getField("_redkale_annotations").set(null, msgclassToAnnotations);
if (rws.cryptor() != Cryptor.class) {
Cryptor cryptor = rws.cryptor().getDeclaredConstructor().newInstance();

View File

@@ -18,7 +18,6 @@ import java.util.logging.*;
import java.util.zip.*;
import javax.annotation.*;
import org.redkale.convert.Convert;
import org.redkale.mq.MessageAgent;
import org.redkale.net.Cryptor;
import org.redkale.service.*;
import org.redkale.util.*;
@@ -116,8 +115,6 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
@Resource(name = "$")
protected WebSocketNode node;
protected MessageAgent messageAgent;
@Resource(name = "SERVER_RESFACTORY")
protected ResourceFactory resourceFactory;