This commit is contained in:
@@ -25,6 +25,7 @@ import org.redkale.boot.ClassFilter.FilterEntry;
|
|||||||
import org.redkale.convert.Convert;
|
import org.redkale.convert.Convert;
|
||||||
import org.redkale.convert.bson.BsonFactory;
|
import org.redkale.convert.bson.BsonFactory;
|
||||||
import org.redkale.convert.json.JsonFactory;
|
import org.redkale.convert.json.JsonFactory;
|
||||||
|
import org.redkale.mq.MessageAgent;
|
||||||
import org.redkale.net.*;
|
import org.redkale.net.*;
|
||||||
import org.redkale.net.http.MimeType;
|
import org.redkale.net.http.MimeType;
|
||||||
import org.redkale.net.sncp.*;
|
import org.redkale.net.sncp.*;
|
||||||
@@ -128,6 +129,9 @@ public final class Application {
|
|||||||
//第三方服务发现管理接口
|
//第三方服务发现管理接口
|
||||||
final ClusterAgent clusterAgent;
|
final ClusterAgent clusterAgent;
|
||||||
|
|
||||||
|
//MQ管理接口
|
||||||
|
final MessageAgent messageAgent;
|
||||||
|
|
||||||
//全局根ResourceFactory
|
//全局根ResourceFactory
|
||||||
final ResourceFactory resourceFactory = ResourceFactory.root();
|
final ResourceFactory resourceFactory = ResourceFactory.root();
|
||||||
|
|
||||||
@@ -269,6 +273,7 @@ public final class Application {
|
|||||||
final AnyValue resources = config.getAnyValue("resources");
|
final AnyValue resources = config.getAnyValue("resources");
|
||||||
TransportStrategy strategy = null;
|
TransportStrategy strategy = null;
|
||||||
ClusterAgent cluster = null;
|
ClusterAgent cluster = null;
|
||||||
|
MessageAgent mq = null;
|
||||||
int bufferCapacity = 32 * 1024;
|
int bufferCapacity = 32 * 1024;
|
||||||
int bufferPoolSize = Runtime.getRuntime().availableProcessors() * 8;
|
int bufferPoolSize = Runtime.getRuntime().availableProcessors() * 8;
|
||||||
int readTimeoutSeconds = TransportFactory.DEFAULT_READTIMEOUTSECONDS;
|
int readTimeoutSeconds = TransportFactory.DEFAULT_READTIMEOUTSECONDS;
|
||||||
@@ -312,6 +317,7 @@ public final class Application {
|
|||||||
}
|
}
|
||||||
logger.log(Level.INFO, Transport.class.getSimpleName() + " configure bufferCapacity = " + bufferCapacity / 1024 + "K; bufferPoolSize = " + bufferPoolSize + "; threads = " + threads + ";");
|
logger.log(Level.INFO, Transport.class.getSimpleName() + " configure bufferCapacity = " + bufferCapacity / 1024 + "K; bufferPoolSize = " + bufferPoolSize + "; threads = " + threads + ";");
|
||||||
}
|
}
|
||||||
|
|
||||||
AnyValue clusterConf = resources.getAnyValue("cluster");
|
AnyValue clusterConf = resources.getAnyValue("cluster");
|
||||||
if (clusterConf != null) {
|
if (clusterConf != null) {
|
||||||
try {
|
try {
|
||||||
@@ -326,6 +332,21 @@ public final class Application {
|
|||||||
logger.log(Level.SEVERE, "load application cluster resource error: " + clusterConf, e);
|
logger.log(Level.SEVERE, "load application cluster resource error: " + clusterConf, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
AnyValue mqConf = resources.getAnyValue("mq");
|
||||||
|
if (mqConf != null) {
|
||||||
|
try {
|
||||||
|
Class type = classLoader.loadClass(mqConf.getValue("value"));
|
||||||
|
if (!MessageAgent.class.isAssignableFrom(type)) {
|
||||||
|
logger.log(Level.SEVERE, "load application mq resource, but not " + MessageAgent.class.getSimpleName() + " error: " + mqConf);
|
||||||
|
} else {
|
||||||
|
mq = (MessageAgent) type.getDeclaredConstructor().newInstance();
|
||||||
|
mq.setConfig(mqConf);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.log(Level.SEVERE, "load application mq resource error: " + mq, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (transportGroup == null) {
|
if (transportGroup == null) {
|
||||||
final AtomicInteger counter = new AtomicInteger();
|
final AtomicInteger counter = new AtomicInteger();
|
||||||
@@ -362,6 +383,7 @@ public final class Application {
|
|||||||
cluster.init(cluster.getConfig());
|
cluster.init(cluster.getConfig());
|
||||||
}
|
}
|
||||||
this.clusterAgent = cluster;
|
this.clusterAgent = cluster;
|
||||||
|
this.messageAgent = mq;
|
||||||
Thread.currentThread().setContextClassLoader(this.classLoader);
|
Thread.currentThread().setContextClassLoader(this.classLoader);
|
||||||
this.serverClassLoader = new RedkaleClassLoader(this.classLoader);
|
this.serverClassLoader = new RedkaleClassLoader(this.classLoader);
|
||||||
}
|
}
|
||||||
|
|||||||
21
src/org/redkale/mq/HttpMessageContent.java
Normal file
21
src/org/redkale/mq/HttpMessageContent.java
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
/*
|
||||||
|
* To change this license header, choose License Headers in Project Properties.
|
||||||
|
* To change this template file, choose Tools | Templates
|
||||||
|
* and open the template in the editor.
|
||||||
|
*/
|
||||||
|
package org.redkale.mq;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import org.redkale.convert.json.JsonConvert;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @author zhangjx
|
||||||
|
*/
|
||||||
|
public class HttpMessageContent implements Serializable {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return JsonConvert.root().convertTo(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -22,6 +22,8 @@ public abstract class MessageAgent {
|
|||||||
|
|
||||||
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
|
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
|
||||||
|
|
||||||
|
protected AnyValue config;
|
||||||
|
|
||||||
//本地Service消息接收处理器, key:topic
|
//本地Service消息接收处理器, key:topic
|
||||||
protected Map<String, Service> localConsumers;
|
protected Map<String, Service> localConsumers;
|
||||||
|
|
||||||
@@ -33,6 +35,14 @@ public abstract class MessageAgent {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public AnyValue getConfig() {
|
||||||
|
return config;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setConfig(AnyValue config) {
|
||||||
|
this.config = config;
|
||||||
|
}
|
||||||
|
|
||||||
protected String checkName(String name) { //不能含特殊字符
|
protected String checkName(String name) { //不能含特殊字符
|
||||||
if (name.isEmpty()) throw new RuntimeException("name only 0-9 a-z A-Z _ cannot begin 0-9");
|
if (name.isEmpty()) throw new RuntimeException("name only 0-9 a-z A-Z _ cannot begin 0-9");
|
||||||
if (name.charAt(0) >= '0' && name.charAt(0) <= '9') throw new RuntimeException("name only 0-9 a-z A-Z _ cannot begin 0-9");
|
if (name.charAt(0) >= '0' && name.charAt(0) <= '9') throw new RuntimeException("name only 0-9 a-z A-Z _ cannot begin 0-9");
|
||||||
|
|||||||
Reference in New Issue
Block a user