新增MessageConext

This commit is contained in:
redkale
2023-09-20 19:57:42 +08:00
parent 2939556800
commit eab480f0be
8 changed files with 152 additions and 87 deletions

View File

@@ -159,6 +159,76 @@ public abstract class MessageAgent implements Resourcable {
}
}
public MessageConext createMessageConext(String topic, Integer partition) {
return new MessageConext(topic, partition);
}
public MessageProducer loadMessageProducer(ResourceProducer ann) {
MessageProducer baseProducer = this.baseMessageProducer;
if (this.baseMessageProducer == null) {
messageProducerLock.lock();
try {
if (this.baseMessageProducer == null) {
this.baseMessageProducer = createMessageProducer();
}
} finally {
messageProducerLock.unlock();
}
baseProducer = this.baseMessageProducer;
}
MessageProducer producer = baseProducer;
Objects.requireNonNull(producer);
return messageProducers.computeIfAbsent(ann.convertType(), t -> new ConvertMessageProducer(producer, ConvertFactory.findConvert(t)));
}
public void loadMessageConsumer(MessageConsumer consumer) {
}
@Override
public String resourceName() {
return name;
}
public Logger getLogger() {
return logger;
}
public String getName() {
return name;
}
public AnyValue getConfig() {
return config;
}
public void setConfig(AnyValue config) {
this.config = config;
}
public HttpMessageClient getHttpMessageClient() {
return httpMessageClient;
}
public SncpMessageClient getSncpMessageClient() {
return sncpMessageClient;
}
protected String checkName(String name) { //不能含特殊字符
if (name.isEmpty()) {
return name;
}
if (name.charAt(0) >= '0' && name.charAt(0) <= '9') {
throw new RedkaleException("name only 0-9 a-z A-Z _ cannot begin 0-9");
}
for (char ch : name.toCharArray()) {
if (!((ch >= '0' && ch <= '9') || ch == '_' || (ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z'))) { //不能含特殊字符
throw new RedkaleException("name only 0-9 a-z A-Z _ cannot begin 0-9");
}
}
return name;
}
@Deprecated
protected List<MessageClientConsumer> getMessageClientConsumers() {
List<MessageClientConsumer> consumers = new ArrayList<>();
@@ -194,73 +264,11 @@ public abstract class MessageAgent implements Resourcable {
return producers;
}
@Override
public String resourceName() {
return name;
}
@Deprecated
public MessageCoder<MessageRecord> getMessageCoder() {
return this.messageCoder;
}
public Logger getLogger() {
return logger;
}
public String getName() {
return name;
}
public AnyValue getConfig() {
return config;
}
public void setConfig(AnyValue config) {
this.config = config;
}
public MessageProducer loadMessageProducer(ResourceProducer ann) {
MessageProducer baseProducer = this.baseMessageProducer;
if (this.baseMessageProducer == null) {
messageProducerLock.lock();
try {
if (this.baseMessageProducer == null) {
this.baseMessageProducer = createMessageProducer();
}
} finally {
messageProducerLock.unlock();
}
baseProducer = this.baseMessageProducer;
}
MessageProducer producer = baseProducer;
Objects.requireNonNull(producer);
return messageProducers.computeIfAbsent(ann.convertType(), t -> new ConvertMessageProducer(producer, ConvertFactory.findConvert(t)));
}
public HttpMessageClient getHttpMessageClient() {
return httpMessageClient;
}
public SncpMessageClient getSncpMessageClient() {
return sncpMessageClient;
}
protected String checkName(String name) { //不能含特殊字符
if (name.isEmpty()) {
return name;
}
if (name.charAt(0) >= '0' && name.charAt(0) <= '9') {
throw new RedkaleException("name only 0-9 a-z A-Z _ cannot begin 0-9");
}
for (char ch : name.toCharArray()) {
if (!((ch >= '0' && ch <= '9') || ch == '_' || (ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z'))) { //不能含特殊字符
throw new RedkaleException("name only 0-9 a-z A-Z _ cannot begin 0-9");
}
}
return name;
}
@Deprecated
//获取指定topic的生产处理器
public MessageClientProducer getSncpMessageClientProducer() {
@@ -302,15 +310,14 @@ public abstract class MessageAgent implements Resourcable {
return this.httpProducer;
}
@Deprecated
//创建指定topic的生产处理器
protected abstract MessageClientProducer createMessageClientProducer(String producerName);
//
protected abstract MessageProducer createMessageProducer();
protected abstract void closeMessageProducer(MessageProducer messageProducer) throws Exception;
@ResourceListener
public abstract void onResourceChange(ResourceEvent[] events);
//
public abstract boolean createTopic(String... topics);
@@ -323,12 +330,13 @@ public abstract class MessageAgent implements Resourcable {
//ServiceLoader时判断配置是否符合当前实现类
public abstract boolean acceptsConf(AnyValue config);
@Deprecated
//创建指定topic的生产处理器
protected abstract MessageClientProducer createMessageClientProducer(String producerName);
//创建指定topic的消费处理器
public abstract MessageClientConsumer createMessageClientConsumer(String[] topics, String group, MessageClientProcessor processor);
@ResourceListener
public abstract void onResourceChange(ResourceEvent[] events);
public void addMessageConsumer(ResourceConsumer res, MessageConsumer consumer) {
consumerLock.lock();
try {

View File

@@ -0,0 +1,64 @@
/*
*
*/
package org.redkale.mq;
import java.util.Objects;
import org.redkale.convert.ConvertColumn;
import org.redkale.convert.json.JsonConvert;
/**
* MessageConsumer回调的上下文
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.8.0
*/
public class MessageConext {
@ConvertColumn(index = 1)
protected String topic;
@ConvertColumn(index = 2)
protected Integer partition;
protected MessageConext(String topic, Integer partition) {
this.topic = topic;
this.partition = partition;
}
public String getTopic() {
return topic;
}
public Integer getPartition() {
return partition;
}
@Override
public int hashCode() {
return Objects.hash(this.topic, this.partition);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
final MessageConext other = (MessageConext) obj;
return Objects.equals(this.topic, other.topic)
&& Objects.equals(this.partition, other.partition);
}
@Override
public String toString() {
return JsonConvert.root().convertTo(this);
}
}

View File

@@ -8,7 +8,7 @@ import org.redkale.service.Local;
import org.redkale.util.AnyValue;
/**
* MQ资源注解
* MQ消费器, 实现类必须标记{@link org.redkale.mq.ResourceConsumer}
*
* <p>
* 详情见: https://redkale.org
@@ -25,8 +25,9 @@ public interface MessageConsumer<T> {
default void init(AnyValue config) {
}
public void onMessage(String topic, Integer partition, T[] messages);
public void onMessage(MessageConext context, T[] messages);
default void destroy(AnyValue config) {
}
}

View File

@@ -3,13 +3,13 @@
*/
package org.redkale.mq;
import java.lang.annotation.*;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.*;
import org.redkale.convert.ConvertType;
/**
* MQ资源注解, 只能标记在MessageConsumer子类上
* MQ资源注解, 只能标记在{@link org.redkale.mq.MessageConsumer}子类上
*
* <p>
* 详情见: https://redkale.org

View File

@@ -143,7 +143,7 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
respFuture.complete(rs);
});
} else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
if (workThread.inIO() && false) {
if (workThread.inIO()) {
Traces.computeIfAbsent(request.traceid);
respFuture.complete(rs);
} else {

View File

@@ -25,15 +25,4 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
@Retention(RUNTIME)
public @interface Local {
/**
* 标记全局唯一性
* <p>
* 有些Service可能只能启动一个实例 比如凌晨定时清除一些数据的Service, 在整个系统部署中应该只被部署一次
*
* @since 2.1.0
* @return boolean
*/
//boolean unique() default false;
String comment() default ""; //备注描述
}

View File

@@ -39,7 +39,9 @@ public class RetResult<T> implements Serializable {
public static final Type TYPE_RET_STRING = new TypeToken<RetResult<String>>() {
}.getType();
//success index = 1
//@ConvertColumn(index = 1)
//success
//
@ConvertColumn(index = 2)
@Column(nullable = false)
protected int retcode;

View File

@@ -5,9 +5,9 @@
*/
package org.redkale.util;
import java.lang.annotation.*;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.*;
/**
* 被标记的日志级别以上的才会被记录
@@ -15,6 +15,7 @@ import java.lang.annotation.*;
* <p>
* 详情见: https://redkale.org
* @see org.redkale.annotation.LogLevel
* @deprecated
*
* @author zhangjx
*/