From 45f2ce261eb525cac3fa6100efc7810c82e8dc61 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Thu, 4 Jun 2020 09:29:04 +0800 Subject: [PATCH] --- src/org/redkale/mq/MessageAgent.java | 11 ++++------- src/org/redkale/mq/MessageConsumer.java | 7 ++++--- src/org/redkale/mq/MessageProducer.java | 6 +++--- 3 files changed, 11 insertions(+), 13 deletions(-) diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index dfa3c25e5..0edd63172 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -62,8 +62,7 @@ public abstract class MessageAgent { if (this.sncpRespStartms >= 0) return; long s = System.currentTimeMillis(); if (this.sncpRespConsumer != null) { - this.sncpRespConsumer.start(); - this.sncpRespConsumer.waitFor(); + this.sncpRespConsumer.startup().join(); } this.sncpRespStartms = System.currentTimeMillis() - s; } @@ -78,8 +77,7 @@ public abstract class MessageAgent { } this.messageNodes.values().forEach(node -> { long s = System.currentTimeMillis(); - node.consumer.start(); - node.consumer.waitFor(); + node.consumer.startup().join(); sb.append("MessageConsumer(topic=").append(fillString(node.consumer.topic, maxlen.get())).append(") init and start in ").append(System.currentTimeMillis() - s).append(" ms\r\n"); }); return CompletableFuture.completedFuture(null); @@ -87,7 +85,7 @@ public abstract class MessageAgent { public CompletableFuture stop() { this.messageNodes.values().forEach(node -> { - node.consumer.close(); + node.consumer.shutdown().join(); }); return CompletableFuture.completedFuture(null); } @@ -122,8 +120,7 @@ public abstract class MessageAgent { public synchronized MessageProducer getProducer() { if (this.producer == null) { this.producer = createProducer(); - this.producer.start(); - this.producer.waitFor(); + this.producer.startup().join(); } return this.producer; } diff --git a/src/org/redkale/mq/MessageConsumer.java b/src/org/redkale/mq/MessageConsumer.java index 6b6adfa54..de2bc6da0 100644 --- a/src/org/redkale/mq/MessageConsumer.java +++ b/src/org/redkale/mq/MessageConsumer.java @@ -6,6 +6,7 @@ package org.redkale.mq; import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.logging.Logger; /** @@ -18,7 +19,7 @@ import java.util.logging.Logger; * * @since 2.1.0 */ -public abstract class MessageConsumer extends Thread { +public abstract class MessageConsumer { protected final String topic; @@ -47,11 +48,11 @@ public abstract class MessageConsumer extends Thread { return topic; } - public abstract void waitFor(); + public abstract CompletableFuture startup(); protected boolean isClosed() { return closed; } - protected abstract void close(); + protected abstract CompletableFuture shutdown(); } diff --git a/src/org/redkale/mq/MessageProducer.java b/src/org/redkale/mq/MessageProducer.java index c1024752b..95e7ec00f 100644 --- a/src/org/redkale/mq/MessageProducer.java +++ b/src/org/redkale/mq/MessageProducer.java @@ -17,7 +17,7 @@ import java.util.logging.Logger; * * @since 2.1.0 */ -public abstract class MessageProducer extends Thread { +public abstract class MessageProducer { protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); @@ -25,11 +25,11 @@ public abstract class MessageProducer extends Thread { public abstract CompletableFuture apply(MessageRecord message); - protected abstract void waitFor(); + public abstract CompletableFuture startup(); protected boolean isClosed() { return closed; } - protected abstract void close(); + protected abstract CompletableFuture shutdown(); }