MessageAgent增加rpcFirst配置
This commit is contained in:
@@ -70,11 +70,12 @@
|
|||||||
name: 服务的名称,用于监控识别,多个mq节点时只能有一个name为空的节点,mq.name不能重复,命名规则: 字母、数字、下划线
|
name: 服务的名称,用于监控识别,多个mq节点时只能有一个name为空的节点,mq.name不能重复,命名规则: 字母、数字、下划线
|
||||||
type: 实现类名,必须是org.redkale.mq.MessageAgent的子类
|
type: 实现类名,必须是org.redkale.mq.MessageAgent的子类
|
||||||
threads:线程数,为0表示使用workExecutor。默认: CPU核数, 核数=1的情况下默认值为2,JDK 21以上版本默认使用虚拟线程池
|
threads:线程数,为0表示使用workExecutor。默认: CPU核数, 核数=1的情况下默认值为2,JDK 21以上版本默认使用虚拟线程池
|
||||||
|
rpcfirst:cluster和mq同名组件时,HttpRpcClient优先使用MQ,默认不优先走MQ。
|
||||||
coder: MessageRecord的解析器类,必须是org.redkale.mq.MessageCoder<MessageRecord>的实现类,
|
coder: MessageRecord的解析器类,必须是org.redkale.mq.MessageCoder<MessageRecord>的实现类,
|
||||||
可对数据包进行加密解密,默认值:org.redkale.mq.MessageRecordCoder
|
可对数据包进行加密解密,默认值:org.redkale.mq.MessageRecordCoder
|
||||||
MQ节点下的子节点配置没有固定格式, 根据MessageAgent实现方的定义来配置
|
MQ节点下的子节点配置没有固定格式, 根据MessageAgent实现方的定义来配置
|
||||||
-->
|
-->
|
||||||
<mq name="" type="org.redkalex.mq.kafka.KafkaMessageAgent" threads="4">
|
<mq name="" type="org.redkalex.mq.kafka.KafkaMessageAgent" rpcfirst="false" threads="4">
|
||||||
<servers value="127.0.0.1:9101"/>
|
<servers value="127.0.0.1:9101"/>
|
||||||
<!--
|
<!--
|
||||||
加载所有的MessageConsumer实例;
|
加载所有的MessageConsumer实例;
|
||||||
|
|||||||
@@ -1212,11 +1212,14 @@ public final class Application {
|
|||||||
if (this.messageAgents != null) {
|
if (this.messageAgents != null) {
|
||||||
MessageAgent messageAgent = this.resourceFactory.find(resourceName, MessageAgent.class);
|
MessageAgent messageAgent = this.resourceFactory.find(resourceName, MessageAgent.class);
|
||||||
if (messageAgent != null) {
|
if (messageAgent != null) {
|
||||||
HttpRpcClient rpcClient = messageAgent.getHttpRpcClient();
|
if (clusterAgent == null || !Objects.equals(clusterAgent.getName(), resourceName)
|
||||||
field.set(srcObj, rpcClient);
|
|| messageAgent.isRpcFirst()) {
|
||||||
rf.inject(resourceName, rpcClient, null); // 给其可能包含@Resource的字段赋值;
|
HttpRpcClient rpcClient = messageAgent.getHttpRpcClient();
|
||||||
rf.register(resourceName, HttpRpcClient.class, rpcClient);
|
field.set(srcObj, rpcClient);
|
||||||
return rpcClient;
|
rf.inject(resourceName, rpcClient, null); // 给其可能包含@Resource的字段赋值;
|
||||||
|
rf.register(resourceName, HttpRpcClient.class, rpcClient);
|
||||||
|
return rpcClient;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (clusterAgent == null) {
|
if (clusterAgent == null) {
|
||||||
|
|||||||
@@ -78,6 +78,8 @@ public abstract class MessageAgent implements Resourcable {
|
|||||||
protected final Map<String, Map<String, MessageConsumerWrapper>> messageConsumerMap = new HashMap<>();
|
protected final Map<String, Map<String, MessageConsumerWrapper>> messageConsumerMap = new HashMap<>();
|
||||||
|
|
||||||
//-------------------------- HttpRpcClient、SncpMessageClient --------------------------
|
//-------------------------- HttpRpcClient、SncpMessageClient --------------------------
|
||||||
|
private boolean rpcFirst;
|
||||||
|
|
||||||
private HttpRpcMessageClient httpRpcClient;
|
private HttpRpcMessageClient httpRpcClient;
|
||||||
|
|
||||||
private String httpAppRespTopic;
|
private String httpAppRespTopic;
|
||||||
@@ -100,6 +102,7 @@ public abstract class MessageAgent implements Resourcable {
|
|||||||
|
|
||||||
public void init(AnyValue config) {
|
public void init(AnyValue config) {
|
||||||
this.name = checkName(config.getValue("name", ""));
|
this.name = checkName(config.getValue("name", ""));
|
||||||
|
this.rpcFirst = config.getBoolValue("rpcfirst", false);
|
||||||
this.httpAppRespTopic = generateHttpAppRespTopic();
|
this.httpAppRespTopic = generateHttpAppRespTopic();
|
||||||
this.sncpAppRespTopic = generateSncpAppRespTopic();
|
this.sncpAppRespTopic = generateSncpAppRespTopic();
|
||||||
int threads = config.getIntValue("threads", application.isVirtualWorkExecutor() ? 0 : -1);
|
int threads = config.getIntValue("threads", application.isVirtualWorkExecutor() ? 0 : -1);
|
||||||
@@ -344,6 +347,10 @@ public abstract class MessageAgent implements Resourcable {
|
|||||||
return sncpMessageClient;
|
return sncpMessageClient;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isRpcFirst() {
|
||||||
|
return rpcFirst;
|
||||||
|
}
|
||||||
|
|
||||||
protected String checkName(String name) { //不能含特殊字符
|
protected String checkName(String name) { //不能含特殊字符
|
||||||
if (name.isEmpty()) {
|
if (name.isEmpty()) {
|
||||||
return name;
|
return name;
|
||||||
|
|||||||
Reference in New Issue
Block a user