From 6401c74f10f7dd1deaf0a822efbfb259e16714e3 Mon Sep 17 00:00:00 2001 From: redkale Date: Sat, 9 Nov 2024 18:58:06 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0MessageProducer.sendDelayMess?= =?UTF-8?q?age=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/mq/MessageProducer.java | 30 +++++++++++++++++-- .../org/redkale/mq/spi/HttpResultCoder.java | 3 +- .../java/org/redkale/mq/spi/MessageAgent.java | 10 ++++++- 3 files changed, 38 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/redkale/mq/MessageProducer.java b/src/main/java/org/redkale/mq/MessageProducer.java index fd6b13b31..8a7148eac 100644 --- a/src/main/java/org/redkale/mq/MessageProducer.java +++ b/src/main/java/org/redkale/mq/MessageProducer.java @@ -4,6 +4,7 @@ package org.redkale.mq; import java.lang.reflect.Type; +import java.time.Duration; import java.util.concurrent.CompletableFuture; import org.redkale.convert.Convert; @@ -55,10 +56,35 @@ public interface MessageProducer { } default CompletableFuture sendMessage(String topic, Convert convert, Object value) { - return sendMessage(topic, (Integer) null, convert, value); + return sendMessage(topic, (Integer) null, convert, (Type) null, value); } default CompletableFuture sendMessage(String topic, Object value) { - return sendMessage(topic, (Integer) null, value); + return sendMessage(topic, (Integer) null, (Convert) null, (Type) null, value); + } + + public CompletableFuture sendDelayMessage( + String topic, Integer partition, Duration delay, Convert convert, Type type, Object value); + + default CompletableFuture sendDelayMessage( + String topic, Integer partition, Duration delay, Convert convert, Object value) { + return sendDelayMessage(topic, partition, delay, convert, (Type) null, value); + } + + default CompletableFuture sendDelayMessage(String topic, Integer partition, Duration delay, Object value) { + return sendDelayMessage(topic, partition, delay, (Convert) null, (Type) null, value); + } + + default CompletableFuture sendDelayMessage( + String topic, Duration delay, Convert convert, Type type, Object value) { + return sendDelayMessage(topic, (Integer) null, delay, convert, type, value); + } + + default CompletableFuture sendDelayMessage(String topic, Duration delay, Convert convert, Object value) { + return sendDelayMessage(topic, (Integer) null, delay, convert, (Type) null, value); + } + + default CompletableFuture sendDelayMessage(String topic, Duration delay, Object value) { + return sendDelayMessage(topic, (Integer) null, delay, (Convert) null, (Type) null, value); } } diff --git a/src/main/java/org/redkale/mq/spi/HttpResultCoder.java b/src/main/java/org/redkale/mq/spi/HttpResultCoder.java index febb4825a..ac2dcb2f9 100644 --- a/src/main/java/org/redkale/mq/spi/HttpResultCoder.java +++ b/src/main/java/org/redkale/mq/spi/HttpResultCoder.java @@ -5,14 +5,13 @@ */ package org.redkale.mq.spi; -import static org.redkale.mq.spi.MessageCoder.*; - import java.net.HttpCookie; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import org.redkale.convert.Convert; import org.redkale.convert.json.JsonConvert; +import static org.redkale.mq.spi.MessageCoder.*; import org.redkale.net.http.HttpResult; import org.redkale.util.Utility; diff --git a/src/main/java/org/redkale/mq/spi/MessageAgent.java b/src/main/java/org/redkale/mq/spi/MessageAgent.java index 1f13be361..a09eae165 100644 --- a/src/main/java/org/redkale/mq/spi/MessageAgent.java +++ b/src/main/java/org/redkale/mq/spi/MessageAgent.java @@ -7,6 +7,7 @@ package org.redkale.mq.spi; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; +import java.time.Duration; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @@ -307,7 +308,7 @@ public abstract class MessageAgent implements MessageManager { if (topicstr.length() > topicMax.get()) { topicMax.set(topicstr.length()); } - views.put(typestr, new String[]{group, topicstr}); + views.put(typestr, new String[] {group, topicstr}); } views.forEach((typestr, strs) -> { String groupstr = strs[0]; @@ -636,5 +637,12 @@ public abstract class MessageAgent implements MessageManager { String topic, Integer partition, Convert convert0, Type type, Object value) { return producer.sendMessage(topic, partition, convert0 == null ? this.convert : convert0, type, value); } + + @Override + public CompletableFuture sendDelayMessage( + String topic, Integer partition, Duration delay, Convert convert0, Type type, Object value) { + return producer.sendDelayMessage( + topic, partition, delay, convert0 == null ? this.convert : convert0, type, value); + } } }