From eab480f0be0018932ce43411605ad5f71e6438cd Mon Sep 17 00:00:00 2001 From: redkale Date: Wed, 20 Sep 2023 19:57:42 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9EMessageConext?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/mq/MessageAgent.java | 146 +++++++++--------- .../java/org/redkale/mq/MessageConext.java | 64 ++++++++ .../java/org/redkale/mq/MessageConsumer.java | 5 +- .../java/org/redkale/mq/ResourceConsumer.java | 4 +- .../org/redkale/net/client/ClientCodec.java | 2 +- src/main/java/org/redkale/service/Local.java | 11 -- .../java/org/redkale/service/RetResult.java | 4 +- src/main/java/org/redkale/util/LogLevel.java | 3 +- 8 files changed, 152 insertions(+), 87 deletions(-) create mode 100644 src/main/java/org/redkale/mq/MessageConext.java diff --git a/src/main/java/org/redkale/mq/MessageAgent.java b/src/main/java/org/redkale/mq/MessageAgent.java index 1575222c7..5de5bdab1 100644 --- a/src/main/java/org/redkale/mq/MessageAgent.java +++ b/src/main/java/org/redkale/mq/MessageAgent.java @@ -159,6 +159,76 @@ public abstract class MessageAgent implements Resourcable { } } + public MessageConext createMessageConext(String topic, Integer partition) { + return new MessageConext(topic, partition); + } + + public MessageProducer loadMessageProducer(ResourceProducer ann) { + MessageProducer baseProducer = this.baseMessageProducer; + if (this.baseMessageProducer == null) { + messageProducerLock.lock(); + try { + if (this.baseMessageProducer == null) { + this.baseMessageProducer = createMessageProducer(); + } + } finally { + messageProducerLock.unlock(); + } + baseProducer = this.baseMessageProducer; + } + MessageProducer producer = baseProducer; + Objects.requireNonNull(producer); + return messageProducers.computeIfAbsent(ann.convertType(), t -> new ConvertMessageProducer(producer, ConvertFactory.findConvert(t))); + } + + public void loadMessageConsumer(MessageConsumer consumer) { + + } + + @Override + public String resourceName() { + return name; + } + + public Logger getLogger() { + return logger; + } + + public String getName() { + return name; + } + + public AnyValue getConfig() { + return config; + } + + public void setConfig(AnyValue config) { + this.config = config; + } + + public HttpMessageClient getHttpMessageClient() { + return httpMessageClient; + } + + public SncpMessageClient getSncpMessageClient() { + return sncpMessageClient; + } + + protected String checkName(String name) { //不能含特殊字符 + if (name.isEmpty()) { + return name; + } + if (name.charAt(0) >= '0' && name.charAt(0) <= '9') { + throw new RedkaleException("name only 0-9 a-z A-Z _ cannot begin 0-9"); + } + for (char ch : name.toCharArray()) { + if (!((ch >= '0' && ch <= '9') || ch == '_' || (ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z'))) { //不能含特殊字符 + throw new RedkaleException("name only 0-9 a-z A-Z _ cannot begin 0-9"); + } + } + return name; + } + @Deprecated protected List getMessageClientConsumers() { List consumers = new ArrayList<>(); @@ -194,73 +264,11 @@ public abstract class MessageAgent implements Resourcable { return producers; } - @Override - public String resourceName() { - return name; - } - @Deprecated public MessageCoder getMessageCoder() { return this.messageCoder; } - public Logger getLogger() { - return logger; - } - - public String getName() { - return name; - } - - public AnyValue getConfig() { - return config; - } - - public void setConfig(AnyValue config) { - this.config = config; - } - - public MessageProducer loadMessageProducer(ResourceProducer ann) { - MessageProducer baseProducer = this.baseMessageProducer; - if (this.baseMessageProducer == null) { - messageProducerLock.lock(); - try { - if (this.baseMessageProducer == null) { - this.baseMessageProducer = createMessageProducer(); - } - } finally { - messageProducerLock.unlock(); - } - baseProducer = this.baseMessageProducer; - } - MessageProducer producer = baseProducer; - Objects.requireNonNull(producer); - return messageProducers.computeIfAbsent(ann.convertType(), t -> new ConvertMessageProducer(producer, ConvertFactory.findConvert(t))); - } - - public HttpMessageClient getHttpMessageClient() { - return httpMessageClient; - } - - public SncpMessageClient getSncpMessageClient() { - return sncpMessageClient; - } - - protected String checkName(String name) { //不能含特殊字符 - if (name.isEmpty()) { - return name; - } - if (name.charAt(0) >= '0' && name.charAt(0) <= '9') { - throw new RedkaleException("name only 0-9 a-z A-Z _ cannot begin 0-9"); - } - for (char ch : name.toCharArray()) { - if (!((ch >= '0' && ch <= '9') || ch == '_' || (ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z'))) { //不能含特殊字符 - throw new RedkaleException("name only 0-9 a-z A-Z _ cannot begin 0-9"); - } - } - return name; - } - @Deprecated //获取指定topic的生产处理器 public MessageClientProducer getSncpMessageClientProducer() { @@ -302,15 +310,14 @@ public abstract class MessageAgent implements Resourcable { return this.httpProducer; } - @Deprecated - //创建指定topic的生产处理器 - protected abstract MessageClientProducer createMessageClientProducer(String producerName); - // protected abstract MessageProducer createMessageProducer(); protected abstract void closeMessageProducer(MessageProducer messageProducer) throws Exception; + @ResourceListener + public abstract void onResourceChange(ResourceEvent[] events); + // public abstract boolean createTopic(String... topics); @@ -323,12 +330,13 @@ public abstract class MessageAgent implements Resourcable { //ServiceLoader时判断配置是否符合当前实现类 public abstract boolean acceptsConf(AnyValue config); + @Deprecated + //创建指定topic的生产处理器 + protected abstract MessageClientProducer createMessageClientProducer(String producerName); + //创建指定topic的消费处理器 public abstract MessageClientConsumer createMessageClientConsumer(String[] topics, String group, MessageClientProcessor processor); - @ResourceListener - public abstract void onResourceChange(ResourceEvent[] events); - public void addMessageConsumer(ResourceConsumer res, MessageConsumer consumer) { consumerLock.lock(); try { diff --git a/src/main/java/org/redkale/mq/MessageConext.java b/src/main/java/org/redkale/mq/MessageConext.java new file mode 100644 index 000000000..ce90bf3b8 --- /dev/null +++ b/src/main/java/org/redkale/mq/MessageConext.java @@ -0,0 +1,64 @@ +/* + * + */ +package org.redkale.mq; + +import java.util.Objects; +import org.redkale.convert.ConvertColumn; +import org.redkale.convert.json.JsonConvert; + +/** + * MessageConsumer回调的上下文 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.8.0 + */ +public class MessageConext { + + @ConvertColumn(index = 1) + protected String topic; + + @ConvertColumn(index = 2) + protected Integer partition; + + protected MessageConext(String topic, Integer partition) { + this.topic = topic; + this.partition = partition; + } + + public String getTopic() { + return topic; + } + + public Integer getPartition() { + return partition; + } + + @Override + public int hashCode() { + return Objects.hash(this.topic, this.partition); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + final MessageConext other = (MessageConext) obj; + return Objects.equals(this.topic, other.topic) + && Objects.equals(this.partition, other.partition); + } + + @Override + public String toString() { + return JsonConvert.root().convertTo(this); + } + +} diff --git a/src/main/java/org/redkale/mq/MessageConsumer.java b/src/main/java/org/redkale/mq/MessageConsumer.java index c006af927..c820a6f68 100644 --- a/src/main/java/org/redkale/mq/MessageConsumer.java +++ b/src/main/java/org/redkale/mq/MessageConsumer.java @@ -8,7 +8,7 @@ import org.redkale.service.Local; import org.redkale.util.AnyValue; /** - * MQ资源注解 + * MQ消费器, 实现类必须标记{@link org.redkale.mq.ResourceConsumer} * *

* 详情见: https://redkale.org @@ -25,8 +25,9 @@ public interface MessageConsumer { default void init(AnyValue config) { } - public void onMessage(String topic, Integer partition, T[] messages); + public void onMessage(MessageConext context, T[] messages); default void destroy(AnyValue config) { } + } diff --git a/src/main/java/org/redkale/mq/ResourceConsumer.java b/src/main/java/org/redkale/mq/ResourceConsumer.java index e702ca375..30030ce93 100644 --- a/src/main/java/org/redkale/mq/ResourceConsumer.java +++ b/src/main/java/org/redkale/mq/ResourceConsumer.java @@ -3,13 +3,13 @@ */ package org.redkale.mq; +import java.lang.annotation.*; import static java.lang.annotation.ElementType.TYPE; import static java.lang.annotation.RetentionPolicy.RUNTIME; -import java.lang.annotation.*; import org.redkale.convert.ConvertType; /** - * MQ资源注解, 只能标记在MessageConsumer子类上 + * MQ资源注解, 只能标记在{@link org.redkale.mq.MessageConsumer}子类上 * *

* 详情见: https://redkale.org diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java index 5676991ae..59efe05a3 100644 --- a/src/main/java/org/redkale/net/client/ClientCodec.java +++ b/src/main/java/org/redkale/net/client/ClientCodec.java @@ -143,7 +143,7 @@ public abstract class ClientCodec implements Complet respFuture.complete(rs); }); } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE - if (workThread.inIO() && false) { + if (workThread.inIO()) { Traces.computeIfAbsent(request.traceid); respFuture.complete(rs); } else { diff --git a/src/main/java/org/redkale/service/Local.java b/src/main/java/org/redkale/service/Local.java index 67933ca40..7af93d92e 100644 --- a/src/main/java/org/redkale/service/Local.java +++ b/src/main/java/org/redkale/service/Local.java @@ -25,15 +25,4 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME; @Retention(RUNTIME) public @interface Local { - /** - * 标记全局唯一性 - *

- * 有些Service可能只能启动一个实例, 比如凌晨定时清除一些数据的Service, 在整个系统部署中应该只被部署一次 - * - * @since 2.1.0 - * @return boolean - */ - //boolean unique() default false; - - String comment() default ""; //备注描述 } diff --git a/src/main/java/org/redkale/service/RetResult.java b/src/main/java/org/redkale/service/RetResult.java index 5b81df950..47c3c3f53 100644 --- a/src/main/java/org/redkale/service/RetResult.java +++ b/src/main/java/org/redkale/service/RetResult.java @@ -39,7 +39,9 @@ public class RetResult implements Serializable { public static final Type TYPE_RET_STRING = new TypeToken>() { }.getType(); - //success index = 1 + //@ConvertColumn(index = 1) + //success + // @ConvertColumn(index = 2) @Column(nullable = false) protected int retcode; diff --git a/src/main/java/org/redkale/util/LogLevel.java b/src/main/java/org/redkale/util/LogLevel.java index 4ef2d1a9d..25b3b30d1 100644 --- a/src/main/java/org/redkale/util/LogLevel.java +++ b/src/main/java/org/redkale/util/LogLevel.java @@ -5,9 +5,9 @@ */ package org.redkale.util; +import java.lang.annotation.*; import static java.lang.annotation.ElementType.TYPE; import static java.lang.annotation.RetentionPolicy.RUNTIME; -import java.lang.annotation.*; /** * 被标记的日志级别以上的才会被记录 @@ -15,6 +15,7 @@ import java.lang.annotation.*; *

* 详情见: https://redkale.org * @see org.redkale.annotation.LogLevel + * @deprecated * * @author zhangjx */