MessageAgent优化

This commit is contained in:
redkale
2024-07-03 16:12:27 +08:00
parent 20d400178b
commit 7bbf05af21
3 changed files with 8 additions and 7 deletions

View File

@@ -74,12 +74,12 @@ serviceid1_name1 serviceid1_name2 serviceid2_name1 serviceid2_name2
name: 服务的名称用于监控识别多个mq节点时只能有一个name为空的节点mq.name不能重复,命名规则: 字母、数字、下划线
type 实现类名必须是org.redkale.mq.MessageAgent的子类
threads线程数为0表示使用workExecutor。默认: CPU核数, 核数=1的情况下默认值为2JDK 21以上版本默认使用虚拟线程池
rpcfirstcluster和mq同名组件时HttpRpcClient优先使用MQ默认不优先走MQ。
rpccluster和mq同名组件时HttpRpcClient优先使用MQ默认不优先走MQ。
coder: MessageRecord的解析器类必须是org.redkale.mq.MessageCoder<MessageRecord>的实现类,
可对数据包进行加密解密默认值org.redkale.mq.MessageRecordCoder
MQ节点下的子节点配置没有固定格式, 根据MessageAgent实现方的定义来配置
-->
<mq name="" type="org.redkalex.mq.kafka.KafkaMessageAgent" rpcfirst="false" threads="4">
<mq name="" type="org.redkalex.mq.kafka.KafkaMessageAgent" rpc="false" threads="4">
<servers value="127.0.0.1:9101"/>
<!--
加载所有的MessageConsumer实例;

View File

@@ -663,7 +663,7 @@ public final class Application {
if (messageAgent != null) {
if (clusterAgent == null
|| !Objects.equals(clusterAgent.getName(), resourceName)
|| messageAgent.isRpcFirst()) {
|| messageAgent.isRpc()) {
HttpRpcClient rpcClient = messageAgent.getHttpRpcClient();
if (field != null) {
field.set(srcObj, rpcClient);

View File

@@ -85,7 +85,8 @@ public abstract class MessageAgent implements MessageManager {
protected final Map<String, Map<String, MessageConsumerWrapper>> messageConsumerMap = new HashMap<>();
// -------------------------- HttpRpcClient、SncpMessageClient --------------------------
private boolean rpcFirst;
// cluster和mq同名组件时HttpRpcClient优先使用MQ默认不优先走MQ。
private boolean rpc;
private HttpRpcMessageClient httpRpcClient;
@@ -109,7 +110,7 @@ public abstract class MessageAgent implements MessageManager {
public void init(AnyValue config) {
this.name = checkName(config.getValue("name", ""));
this.rpcFirst = config.getBoolValue("rpcfirst", false);
this.rpc = config.getBoolValue("rpc", false);
this.httpAppRespTopic = generateHttpAppRespTopic();
this.sncpAppRespTopic = generateSncpAppRespTopic();
int threads = config.getIntValue("threads", application.isVirtualWorkExecutor() ? 0 : -1);
@@ -363,8 +364,8 @@ public abstract class MessageAgent implements MessageManager {
return sncpMessageClient;
}
public boolean isRpcFirst() {
return rpcFirst;
public boolean isRpc() {
return rpc;
}
protected String checkName(String name) { // 不能含特殊字符