增加MessageProducer.sendDelayMessage接口

This commit is contained in:
redkale
2024-11-09 18:58:06 +08:00
parent 65b4e66677
commit 6401c74f10
3 changed files with 38 additions and 5 deletions

View File

@@ -4,6 +4,7 @@
package org.redkale.mq;
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import org.redkale.convert.Convert;
@@ -55,10 +56,35 @@ public interface MessageProducer {
}
default CompletableFuture<Void> sendMessage(String topic, Convert convert, Object value) {
return sendMessage(topic, (Integer) null, convert, value);
return sendMessage(topic, (Integer) null, convert, (Type) null, value);
}
default CompletableFuture<Void> sendMessage(String topic, Object value) {
return sendMessage(topic, (Integer) null, value);
return sendMessage(topic, (Integer) null, (Convert) null, (Type) null, value);
}
public CompletableFuture<Void> sendDelayMessage(
String topic, Integer partition, Duration delay, Convert convert, Type type, Object value);
default CompletableFuture<Void> sendDelayMessage(
String topic, Integer partition, Duration delay, Convert convert, Object value) {
return sendDelayMessage(topic, partition, delay, convert, (Type) null, value);
}
default CompletableFuture<Void> sendDelayMessage(String topic, Integer partition, Duration delay, Object value) {
return sendDelayMessage(topic, partition, delay, (Convert) null, (Type) null, value);
}
default CompletableFuture<Void> sendDelayMessage(
String topic, Duration delay, Convert convert, Type type, Object value) {
return sendDelayMessage(topic, (Integer) null, delay, convert, type, value);
}
default CompletableFuture<Void> sendDelayMessage(String topic, Duration delay, Convert convert, Object value) {
return sendDelayMessage(topic, (Integer) null, delay, convert, (Type) null, value);
}
default CompletableFuture<Void> sendDelayMessage(String topic, Duration delay, Object value) {
return sendDelayMessage(topic, (Integer) null, delay, (Convert) null, (Type) null, value);
}
}

View File

@@ -5,14 +5,13 @@
*/
package org.redkale.mq.spi;
import static org.redkale.mq.spi.MessageCoder.*;
import java.net.HttpCookie;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import org.redkale.convert.Convert;
import org.redkale.convert.json.JsonConvert;
import static org.redkale.mq.spi.MessageCoder.*;
import org.redkale.net.http.HttpResult;
import org.redkale.util.Utility;

View File

@@ -7,6 +7,7 @@ package org.redkale.mq.spi;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@@ -307,7 +308,7 @@ public abstract class MessageAgent implements MessageManager {
if (topicstr.length() > topicMax.get()) {
topicMax.set(topicstr.length());
}
views.put(typestr, new String[]{group, topicstr});
views.put(typestr, new String[] {group, topicstr});
}
views.forEach((typestr, strs) -> {
String groupstr = strs[0];
@@ -636,5 +637,12 @@ public abstract class MessageAgent implements MessageManager {
String topic, Integer partition, Convert convert0, Type type, Object value) {
return producer.sendMessage(topic, partition, convert0 == null ? this.convert : convert0, type, value);
}
@Override
public CompletableFuture<Void> sendDelayMessage(
String topic, Integer partition, Duration delay, Convert convert0, Type type, Object value) {
return producer.sendDelayMessage(
topic, partition, delay, convert0 == null ? this.convert : convert0, type, value);
}
}
}