优化MessageProducer

This commit is contained in:
redkale
2023-09-17 23:42:56 +08:00
parent d748ca1bde
commit bca85e7e5f
3 changed files with 106 additions and 5 deletions

View File

@@ -31,8 +31,8 @@ import org.redkale.net.http.*;
import org.redkale.net.sncp.*;
import org.redkale.service.Service;
import org.redkale.source.*;
import org.redkale.util.AnyValue.DefaultAnyValue;
import org.redkale.util.*;
import org.redkale.util.AnyValue.DefaultAnyValue;
import org.redkale.watch.WatchServlet;
/**
@@ -1156,6 +1156,32 @@ public final class Application {
}
logger.info("MessageAgent init in " + (System.currentTimeMillis() - s) + " ms");
}
//------------------------------------ 注册 ResourceProducer MessageProducer ------------------------------------
resourceFactory.register(new ResourceAnnotationProvider<ResourceProducer>() {
@Override
public void load(ResourceFactory rf, String srcResourceName, Object srcObj, ResourceProducer annotation, Field field, Object attachment) {
if (field.getType() != MessageProducer.class) {
throw new RestException("@" + ResourceProducer.class.getSimpleName() + " must on " + MessageProducer.class.getName() + " type field, but on " + field);
}
MessageAgent agent = findMessageAgent(annotation.mq());
if (agent == null) {
throw new RedkaleException("Not found " + MessageAgent.class.getSimpleName() + "(name = " + annotation.mq() + ") on " + field);
}
try {
MessageProducer producer = agent.loadMessageProducer(annotation);
field.set(srcObj, producer);
} catch (RuntimeException ex) {
throw ex;
} catch (Exception e) {
throw new RedkaleException(field + "inject error", e);
}
}
@Override
public Class<ResourceProducer> annotationType() {
return ResourceProducer.class;
}
});
//------------------------------------ 注册 HttpMessageClient ------------------------------------
resourceFactory.register((ResourceFactory rf, String srcResourceName, final Object srcObj, String resourceName, Field field, final Object attachment) -> {
try {
@@ -1206,6 +1232,18 @@ public final class Application {
return null;
}
private MessageAgent findMessageAgent(String mqName) {
if (this.messageAgents == null) {
return null;
}
for (MessageAgent agent : this.messageAgents) {
if (Objects.equals(mqName, agent.getName())) {
return agent;
}
}
return null;
}
CacheSource loadCacheSource(final String sourceName, boolean autoMemory) {
cacheSourceLock.lock();
try {

View File

@@ -5,18 +5,22 @@
*/
package org.redkale.mq;
import java.lang.reflect.Type;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.*;
import java.util.stream.Collectors;
import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.*;
import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.ResourceListener;
import org.redkale.boot.*;
import static org.redkale.boot.Application.RESNAME_APP_NAME;
import static org.redkale.boot.Application.RESNAME_APP_NODEID;
import org.redkale.boot.*;
import org.redkale.convert.Convert;
import org.redkale.convert.ConvertFactory;
import org.redkale.convert.ConvertType;
import org.redkale.net.Servlet;
import org.redkale.net.http.*;
import org.redkale.net.sncp.*;
@@ -50,7 +54,11 @@ public abstract class MessageAgent implements Resourcable {
protected AnyValue config;
protected MessageProducer messageProducer;
protected final ReentrantLock messageProducerLock = new ReentrantLock();
protected MessageProducer baseMessageProducer;
protected Map<ConvertType, ConvertMessageProducer> messageProducers = new ConcurrentHashMap<>();
//key: group, sub-key: topic
protected final ConcurrentHashMap<String, ConcurrentHashMap<String, MessageConsumer>> consumerMap = new ConcurrentHashMap<>();
@@ -151,6 +159,7 @@ public abstract class MessageAgent implements Resourcable {
}
}
@Deprecated
protected List<MessageClientConsumer> getMessageClientConsumers() {
List<MessageClientConsumer> consumers = new ArrayList<>();
MessageClientConsumer one = this.httpMessageClient == null ? null : this.httpMessageClient.respConsumer;
@@ -165,6 +174,7 @@ public abstract class MessageAgent implements Resourcable {
return consumers;
}
@Deprecated
protected List<MessageClientProducer> getMessageClientProducers() {
List<MessageClientProducer> producers = new ArrayList<>();
if (this.httpProducer != null) {
@@ -189,6 +199,7 @@ public abstract class MessageAgent implements Resourcable {
return name;
}
@Deprecated
public MessageCoder<MessageRecord> getMessageCoder() {
return this.messageCoder;
}
@@ -209,6 +220,24 @@ public abstract class MessageAgent implements Resourcable {
this.config = config;
}
public MessageProducer loadMessageProducer(ResourceProducer ann) {
MessageProducer baseProducer = this.baseMessageProducer;
if (this.baseMessageProducer == null) {
messageProducerLock.lock();
try {
if (this.baseMessageProducer == null) {
this.baseMessageProducer = createMessageProducer();
}
} finally {
messageProducerLock.unlock();
}
baseProducer = this.baseMessageProducer;
}
MessageProducer producer = baseProducer;
Objects.requireNonNull(producer);
return messageProducers.computeIfAbsent(ann.convertType(), t -> new ConvertMessageProducer(producer, ConvertFactory.findConvert(t)));
}
public HttpMessageClient getHttpMessageClient() {
return httpMessageClient;
}
@@ -232,6 +261,7 @@ public abstract class MessageAgent implements Resourcable {
return name;
}
@Deprecated
//获取指定topic的生产处理器
public MessageClientProducer getSncpMessageClientProducer() {
if (this.sncpProducer == null) {
@@ -252,6 +282,7 @@ public abstract class MessageAgent implements Resourcable {
return this.sncpProducer;
}
@Deprecated
public MessageClientProducer getHttpMessageClientProducer() {
if (this.httpProducer == null) {
producerLock.lock();
@@ -271,10 +302,16 @@ public abstract class MessageAgent implements Resourcable {
return this.httpProducer;
}
@Deprecated
//创建指定topic的生产处理器
protected abstract MessageClientProducer createMessageClientProducer(String producerName);
//创建topic如果已存在则跳过
//
protected abstract MessageProducer createMessageProducer();
protected abstract void closeMessageProducer(MessageProducer messageProducer) throws Exception;
//
public abstract boolean createTopic(String... topics);
//删除topic如果不存在则跳过
@@ -421,6 +458,24 @@ public abstract class MessageAgent implements Resourcable {
}
protected static class ConvertMessageProducer implements MessageProducer {
private final MessageProducer producer;
private final Convert convert;
public ConvertMessageProducer(MessageProducer producer, Convert convert) {
this.producer = producer;
this.convert = convert;
}
@Override
public CompletableFuture<Void> sendMessage(String topic, Integer partition, Convert convert0, Type type, Object value) {
return producer.sendMessage(topic, partition, convert0 == null ? this.convert : convert0, type, value);
}
}
protected static class MessageClientConsumerNode {
public final NodeServer server;
@@ -442,4 +497,5 @@ public abstract class MessageAgent implements Resourcable {
}
}
}

View File

@@ -9,6 +9,7 @@ import java.io.Serializable;
import java.lang.reflect.Array;
import java.util.*;
import java.util.function.*;
import org.redkale.convert.ConvertColumn;
import static org.redkale.source.FilterExpress.*;
import org.redkale.util.*;
@@ -25,17 +26,23 @@ import org.redkale.util.*;
*/
public class FilterNode { //FilterNode 不能实现Serializable接口 否则DataSource很多重载接口会出现冲突
@ConvertColumn(index = 1)
protected boolean readOnly;
@ConvertColumn(index = 2)
protected String column;
@ConvertColumn(index = 3)
protected FilterExpress express;
@ConvertColumn(index = 4)
protected Serializable value;
//----------------------------------------------
@ConvertColumn(index = 5)
protected boolean or;
@ConvertColumn(index = 6)
protected FilterNode[] nodes;
public FilterNode() {