diff --git a/src/main/java/org/redkale/boot/Application.java b/src/main/java/org/redkale/boot/Application.java index e09511473..21dd77435 100644 --- a/src/main/java/org/redkale/boot/Application.java +++ b/src/main/java/org/redkale/boot/Application.java @@ -31,8 +31,8 @@ import org.redkale.net.http.*; import org.redkale.net.sncp.*; import org.redkale.service.Service; import org.redkale.source.*; -import org.redkale.util.AnyValue.DefaultAnyValue; import org.redkale.util.*; +import org.redkale.util.AnyValue.DefaultAnyValue; import org.redkale.watch.WatchServlet; /** @@ -1156,6 +1156,32 @@ public final class Application { } logger.info("MessageAgent init in " + (System.currentTimeMillis() - s) + " ms"); } + //------------------------------------ 注册 ResourceProducer MessageProducer ------------------------------------ + resourceFactory.register(new ResourceAnnotationProvider() { + @Override + public void load(ResourceFactory rf, String srcResourceName, Object srcObj, ResourceProducer annotation, Field field, Object attachment) { + if (field.getType() != MessageProducer.class) { + throw new RestException("@" + ResourceProducer.class.getSimpleName() + " must on " + MessageProducer.class.getName() + " type field, but on " + field); + } + MessageAgent agent = findMessageAgent(annotation.mq()); + if (agent == null) { + throw new RedkaleException("Not found " + MessageAgent.class.getSimpleName() + "(name = " + annotation.mq() + ") on " + field); + } + try { + MessageProducer producer = agent.loadMessageProducer(annotation); + field.set(srcObj, producer); + } catch (RuntimeException ex) { + throw ex; + } catch (Exception e) { + throw new RedkaleException(field + "inject error", e); + } + } + + @Override + public Class annotationType() { + return ResourceProducer.class; + } + }); //------------------------------------ 注册 HttpMessageClient ------------------------------------ resourceFactory.register((ResourceFactory rf, String srcResourceName, final Object srcObj, String resourceName, Field field, final Object attachment) -> { try { @@ -1206,6 +1232,18 @@ public final class Application { return null; } + private MessageAgent findMessageAgent(String mqName) { + if (this.messageAgents == null) { + return null; + } + for (MessageAgent agent : this.messageAgents) { + if (Objects.equals(mqName, agent.getName())) { + return agent; + } + } + return null; + } + CacheSource loadCacheSource(final String sourceName, boolean autoMemory) { cacheSourceLock.lock(); try { diff --git a/src/main/java/org/redkale/mq/MessageAgent.java b/src/main/java/org/redkale/mq/MessageAgent.java index fb04fb38c..1575222c7 100644 --- a/src/main/java/org/redkale/mq/MessageAgent.java +++ b/src/main/java/org/redkale/mq/MessageAgent.java @@ -5,18 +5,22 @@ */ package org.redkale.mq; +import java.lang.reflect.Type; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.*; import java.util.stream.Collectors; -import org.redkale.annotation.AutoLoad; import org.redkale.annotation.*; +import org.redkale.annotation.AutoLoad; import org.redkale.annotation.ResourceListener; +import org.redkale.boot.*; import static org.redkale.boot.Application.RESNAME_APP_NAME; import static org.redkale.boot.Application.RESNAME_APP_NODEID; -import org.redkale.boot.*; +import org.redkale.convert.Convert; +import org.redkale.convert.ConvertFactory; +import org.redkale.convert.ConvertType; import org.redkale.net.Servlet; import org.redkale.net.http.*; import org.redkale.net.sncp.*; @@ -50,7 +54,11 @@ public abstract class MessageAgent implements Resourcable { protected AnyValue config; - protected MessageProducer messageProducer; + protected final ReentrantLock messageProducerLock = new ReentrantLock(); + + protected MessageProducer baseMessageProducer; + + protected Map messageProducers = new ConcurrentHashMap<>(); //key: group, sub-key: topic protected final ConcurrentHashMap> consumerMap = new ConcurrentHashMap<>(); @@ -151,6 +159,7 @@ public abstract class MessageAgent implements Resourcable { } } + @Deprecated protected List getMessageClientConsumers() { List consumers = new ArrayList<>(); MessageClientConsumer one = this.httpMessageClient == null ? null : this.httpMessageClient.respConsumer; @@ -165,6 +174,7 @@ public abstract class MessageAgent implements Resourcable { return consumers; } + @Deprecated protected List getMessageClientProducers() { List producers = new ArrayList<>(); if (this.httpProducer != null) { @@ -189,6 +199,7 @@ public abstract class MessageAgent implements Resourcable { return name; } + @Deprecated public MessageCoder getMessageCoder() { return this.messageCoder; } @@ -209,6 +220,24 @@ public abstract class MessageAgent implements Resourcable { this.config = config; } + public MessageProducer loadMessageProducer(ResourceProducer ann) { + MessageProducer baseProducer = this.baseMessageProducer; + if (this.baseMessageProducer == null) { + messageProducerLock.lock(); + try { + if (this.baseMessageProducer == null) { + this.baseMessageProducer = createMessageProducer(); + } + } finally { + messageProducerLock.unlock(); + } + baseProducer = this.baseMessageProducer; + } + MessageProducer producer = baseProducer; + Objects.requireNonNull(producer); + return messageProducers.computeIfAbsent(ann.convertType(), t -> new ConvertMessageProducer(producer, ConvertFactory.findConvert(t))); + } + public HttpMessageClient getHttpMessageClient() { return httpMessageClient; } @@ -232,6 +261,7 @@ public abstract class MessageAgent implements Resourcable { return name; } + @Deprecated //获取指定topic的生产处理器 public MessageClientProducer getSncpMessageClientProducer() { if (this.sncpProducer == null) { @@ -252,6 +282,7 @@ public abstract class MessageAgent implements Resourcable { return this.sncpProducer; } + @Deprecated public MessageClientProducer getHttpMessageClientProducer() { if (this.httpProducer == null) { producerLock.lock(); @@ -271,10 +302,16 @@ public abstract class MessageAgent implements Resourcable { return this.httpProducer; } + @Deprecated //创建指定topic的生产处理器 protected abstract MessageClientProducer createMessageClientProducer(String producerName); - //创建topic,如果已存在则跳过 + // + protected abstract MessageProducer createMessageProducer(); + + protected abstract void closeMessageProducer(MessageProducer messageProducer) throws Exception; + + // public abstract boolean createTopic(String... topics); //删除topic,如果不存在则跳过 @@ -421,6 +458,24 @@ public abstract class MessageAgent implements Resourcable { } + protected static class ConvertMessageProducer implements MessageProducer { + + private final MessageProducer producer; + + private final Convert convert; + + public ConvertMessageProducer(MessageProducer producer, Convert convert) { + this.producer = producer; + this.convert = convert; + } + + @Override + public CompletableFuture sendMessage(String topic, Integer partition, Convert convert0, Type type, Object value) { + return producer.sendMessage(topic, partition, convert0 == null ? this.convert : convert0, type, value); + } + + } + protected static class MessageClientConsumerNode { public final NodeServer server; @@ -442,4 +497,5 @@ public abstract class MessageAgent implements Resourcable { } } + } diff --git a/src/main/java/org/redkale/source/FilterNode.java b/src/main/java/org/redkale/source/FilterNode.java index d47c7b6a7..c92f65951 100644 --- a/src/main/java/org/redkale/source/FilterNode.java +++ b/src/main/java/org/redkale/source/FilterNode.java @@ -9,6 +9,7 @@ import java.io.Serializable; import java.lang.reflect.Array; import java.util.*; import java.util.function.*; +import org.redkale.convert.ConvertColumn; import static org.redkale.source.FilterExpress.*; import org.redkale.util.*; @@ -25,17 +26,23 @@ import org.redkale.util.*; */ public class FilterNode { //FilterNode 不能实现Serializable接口, 否则DataSource很多重载接口会出现冲突 + @ConvertColumn(index = 1) protected boolean readOnly; + @ConvertColumn(index = 2) protected String column; + @ConvertColumn(index = 3) protected FilterExpress express; + @ConvertColumn(index = 4) protected Serializable value; //---------------------------------------------- + @ConvertColumn(index = 5) protected boolean or; + @ConvertColumn(index = 6) protected FilterNode[] nodes; public FilterNode() {