DynForMessage

This commit is contained in:
redkale
2023-12-24 22:42:34 +08:00
parent 88392fd5dd
commit a5fc08a821
7 changed files with 127 additions and 7 deletions

View File

@@ -4,6 +4,7 @@
package org.redkale.mq;
import org.redkale.annotation.Component;
import org.redkale.asm.AsmDepends;
import org.redkale.service.Local;
import org.redkale.util.AnyValue;
@@ -20,6 +21,7 @@ import org.redkale.util.AnyValue;
*/
@Local
@Component
@AsmDepends
public interface MessageConsumer<T> {
default void init(AnyValue config) {

View File

@@ -6,6 +6,7 @@ package org.redkale.mq;
import java.lang.annotation.*;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import org.redkale.asm.AsmDepends;
import org.redkale.convert.ConvertType;
/**
@@ -18,6 +19,7 @@ import org.redkale.convert.ConvertType;
*
* @since 2.8.0
*/
@AsmDepends
@Documented
@Target({TYPE})
@Retention(RUNTIME)

View File

@@ -0,0 +1,34 @@
/*
*
*/
package org.redkale.mq.spi;
import java.lang.annotation.Documented;
import static java.lang.annotation.ElementType.TYPE;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.Target;
import org.redkale.mq.MessageConsumer;
/**
* 只标准在类上面,因动态方法不作变动,只增加内部类
*
* @author zhangjx
*/
@Documented
@Target({TYPE})
@Retention(RUNTIME)
@Repeatable(DynForMessage.DynForMessages.class)
public @interface DynForMessage {
Class<? extends MessageConsumer> value();
@Documented
@Target({TYPE})
@Retention(RUNTIME)
@interface DynForMessages {
DynForMessage[] value();
}
}

View File

@@ -3,9 +3,6 @@
*/
package org.redkale.mq.spi;
import org.redkale.mq.spi.HttpResultCoder;
import org.redkale.mq.spi.MessageCoder;
import org.redkale.mq.spi.HttpSimpleRequestCoder;
import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import org.redkale.cluster.HttpRpcClient;

View File

@@ -0,0 +1,82 @@
/*
*
*/
package org.redkale.mq.spi;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Type;
import java.util.List;
import org.redkale.asm.AsmMethodBoost;
import org.redkale.asm.ClassWriter;
import org.redkale.convert.ConvertFactory;
import org.redkale.inject.ResourceFactory;
import org.redkale.mq.MessageConext;
import org.redkale.mq.Messaged;
import org.redkale.util.RedkaleException;
/**
*
* @author zhangjx
*/
public class MessageAsmMethodBoost extends AsmMethodBoost {
private static final List<Class<? extends Annotation>> FILTER_ANN = List.of(Messaged.class);
public MessageAsmMethodBoost(Class serviceType) {
super(serviceType);
}
@Override
public List<Class<? extends Annotation>> filterMethodAnnotations(Method method) {
return FILTER_ANN;
}
@Override
public String doMethod(ClassWriter cw, String newDynName, String fieldPrefix, List filterAnns, Method method, String newMethodName) {
if (serviceType.getAnnotation(DynForMessage.class) != null) {
return newMethodName;
}
Messaged messaged = method.getAnnotation(Messaged.class);
if (messaged == null) {
return newMethodName;
}
if (Modifier.isFinal(method.getModifiers()) || Modifier.isStatic(method.getModifiers())) {
throw new RedkaleException("@" + Messaged.class.getSimpleName() + " cannot on final or static method, but on " + method);
}
if (!Modifier.isProtected(method.getModifiers()) && !Modifier.isPublic(method.getModifiers())) {
throw new RedkaleException("@" + Messaged.class.getSimpleName() + " must on protected or public method, but on " + method);
}
int paramCount = method.getParameterCount();
if (paramCount != 1 && paramCount != 2) {
throw new RedkaleException("@" + Messaged.class.getSimpleName() + " must on one or two parameter method, but on " + method);
}
int paramKind = 1; // 1:单个MessageType; 2: MessageConext & MessageType; 3: MessageType & MessageConext;
Type messageType;
Type[] paramTypes = method.getGenericParameterTypes();
if (paramCount == 1) {
messageType = paramTypes[0];
paramKind = 1;
} else {
if (paramTypes[0] == MessageConext.class) {
messageType = paramTypes[1];
paramKind = 2;
} else if (paramTypes[1] == MessageConext.class) {
messageType = paramTypes[0];
paramKind = 3;
} else {
throw new RedkaleException("@" + Messaged.class.getSimpleName() + " on two-parameter method must contains "
+ MessageConext.class.getSimpleName() + " parameter type, but on " + method);
}
}
ConvertFactory factory = ConvertFactory.findConvert(messaged.convertType()).getFactory();
factory.loadDecoder(messageType);
return newMethodName;
}
@Override
public void doInstance(ResourceFactory resourceFactory, Object service) {
}
}

View File

@@ -9,10 +9,13 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Level;
import org.redkale.boot.Application;
import org.redkale.boot.ClassFilter;
@@ -45,6 +48,9 @@ public class MessageModuleEngine extends ModuleEngine {
//@since 2.8.0
private Properties messageProperties = new Properties();
//
private final Map<String, List<MessageConsumer>> agentConsumers = new ConcurrentHashMap<>();
//MQ管理接口
//@since 2.1.0
private MessageAgent[] messageAgents;
@@ -352,7 +358,7 @@ public class MessageModuleEngine extends ModuleEngine {
}, Object.class);
for (MessageAgent agent : this.messageAgents) {
names.add(agent.getName());
List<MessageConsumer> consumers = new ArrayList<>();
List<MessageConsumer> consumers = agentConsumers.getOrDefault(agent.getName(), new CopyOnWriteArrayList<>());
AnyValue consumerConf = agent.getConfig().getAnyValue("consumer");
if (consumerConf != null) { //加载 MessageConsumer
ClassFilter filter = new ClassFilter(application.getServerClassLoader(), ResourceConsumer.class, MessageConsumer.class, null, null);

View File

@@ -5,9 +5,6 @@
*/
package org.redkale.mq.spi;
import org.redkale.mq.spi.HttpResultCoder;
import org.redkale.mq.spi.MessageCoder;
import org.redkale.mq.spi.HttpSimpleRequestCoder;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;