From 71244732f2217ff8175841ca461933c977167692 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Thu, 28 May 2020 11:22:30 +0800 Subject: [PATCH] --- src/org/redkale/boot/Application.java | 22 ++++++++++++++++++++++ src/org/redkale/mq/HttpMessageContent.java | 21 +++++++++++++++++++++ src/org/redkale/mq/MessageAgent.java | 10 ++++++++++ 3 files changed, 53 insertions(+) create mode 100644 src/org/redkale/mq/HttpMessageContent.java diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index ed4f5a374..9c5053f54 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -25,6 +25,7 @@ import org.redkale.boot.ClassFilter.FilterEntry; import org.redkale.convert.Convert; import org.redkale.convert.bson.BsonFactory; import org.redkale.convert.json.JsonFactory; +import org.redkale.mq.MessageAgent; import org.redkale.net.*; import org.redkale.net.http.MimeType; import org.redkale.net.sncp.*; @@ -128,6 +129,9 @@ public final class Application { //第三方服务发现管理接口 final ClusterAgent clusterAgent; + //MQ管理接口 + final MessageAgent messageAgent; + //全局根ResourceFactory final ResourceFactory resourceFactory = ResourceFactory.root(); @@ -269,6 +273,7 @@ public final class Application { final AnyValue resources = config.getAnyValue("resources"); TransportStrategy strategy = null; ClusterAgent cluster = null; + MessageAgent mq = null; int bufferCapacity = 32 * 1024; int bufferPoolSize = Runtime.getRuntime().availableProcessors() * 8; int readTimeoutSeconds = TransportFactory.DEFAULT_READTIMEOUTSECONDS; @@ -312,6 +317,7 @@ public final class Application { } logger.log(Level.INFO, Transport.class.getSimpleName() + " configure bufferCapacity = " + bufferCapacity / 1024 + "K; bufferPoolSize = " + bufferPoolSize + "; threads = " + threads + ";"); } + AnyValue clusterConf = resources.getAnyValue("cluster"); if (clusterConf != null) { try { @@ -326,6 +332,21 @@ public final class Application { logger.log(Level.SEVERE, "load application cluster resource error: " + clusterConf, e); } } + + AnyValue mqConf = resources.getAnyValue("mq"); + if (mqConf != null) { + try { + Class type = classLoader.loadClass(mqConf.getValue("value")); + if (!MessageAgent.class.isAssignableFrom(type)) { + logger.log(Level.SEVERE, "load application mq resource, but not " + MessageAgent.class.getSimpleName() + " error: " + mqConf); + } else { + mq = (MessageAgent) type.getDeclaredConstructor().newInstance(); + mq.setConfig(mqConf); + } + } catch (Exception e) { + logger.log(Level.SEVERE, "load application mq resource error: " + mq, e); + } + } } if (transportGroup == null) { final AtomicInteger counter = new AtomicInteger(); @@ -362,6 +383,7 @@ public final class Application { cluster.init(cluster.getConfig()); } this.clusterAgent = cluster; + this.messageAgent = mq; Thread.currentThread().setContextClassLoader(this.classLoader); this.serverClassLoader = new RedkaleClassLoader(this.classLoader); } diff --git a/src/org/redkale/mq/HttpMessageContent.java b/src/org/redkale/mq/HttpMessageContent.java new file mode 100644 index 000000000..70686e67a --- /dev/null +++ b/src/org/redkale/mq/HttpMessageContent.java @@ -0,0 +1,21 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.mq; + +import java.io.Serializable; +import org.redkale.convert.json.JsonConvert; + +/** + * + * @author zhangjx + */ +public class HttpMessageContent implements Serializable { + + @Override + public String toString() { + return JsonConvert.root().convertTo(this); + } +} diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index 9b880cdc6..07bc00763 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -22,6 +22,8 @@ public abstract class MessageAgent { protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); + protected AnyValue config; + //本地Service消息接收处理器, key:topic protected Map localConsumers; @@ -33,6 +35,14 @@ public abstract class MessageAgent { } + public AnyValue getConfig() { + return config; + } + + public void setConfig(AnyValue config) { + this.config = config; + } + protected String checkName(String name) { //不能含特殊字符 if (name.isEmpty()) throw new RuntimeException("name only 0-9 a-z A-Z _ cannot begin 0-9"); if (name.charAt(0) >= '0' && name.charAt(0) <= '9') throw new RuntimeException("name only 0-9 a-z A-Z _ cannot begin 0-9");