This commit is contained in:
Redkale
2020-06-04 09:29:04 +08:00
parent f79e49db5a
commit 45f2ce261e
3 changed files with 11 additions and 13 deletions

View File

@@ -62,8 +62,7 @@ public abstract class MessageAgent {
if (this.sncpRespStartms >= 0) return; if (this.sncpRespStartms >= 0) return;
long s = System.currentTimeMillis(); long s = System.currentTimeMillis();
if (this.sncpRespConsumer != null) { if (this.sncpRespConsumer != null) {
this.sncpRespConsumer.start(); this.sncpRespConsumer.startup().join();
this.sncpRespConsumer.waitFor();
} }
this.sncpRespStartms = System.currentTimeMillis() - s; this.sncpRespStartms = System.currentTimeMillis() - s;
} }
@@ -78,8 +77,7 @@ public abstract class MessageAgent {
} }
this.messageNodes.values().forEach(node -> { this.messageNodes.values().forEach(node -> {
long s = System.currentTimeMillis(); long s = System.currentTimeMillis();
node.consumer.start(); node.consumer.startup().join();
node.consumer.waitFor();
sb.append("MessageConsumer(topic=").append(fillString(node.consumer.topic, maxlen.get())).append(") init and start in ").append(System.currentTimeMillis() - s).append(" ms\r\n"); sb.append("MessageConsumer(topic=").append(fillString(node.consumer.topic, maxlen.get())).append(") init and start in ").append(System.currentTimeMillis() - s).append(" ms\r\n");
}); });
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
@@ -87,7 +85,7 @@ public abstract class MessageAgent {
public CompletableFuture<Void> stop() { public CompletableFuture<Void> stop() {
this.messageNodes.values().forEach(node -> { this.messageNodes.values().forEach(node -> {
node.consumer.close(); node.consumer.shutdown().join();
}); });
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }
@@ -122,8 +120,7 @@ public abstract class MessageAgent {
public synchronized MessageProducer getProducer() { public synchronized MessageProducer getProducer() {
if (this.producer == null) { if (this.producer == null) {
this.producer = createProducer(); this.producer = createProducer();
this.producer.start(); this.producer.startup().join();
this.producer.waitFor();
} }
return this.producer; return this.producer;
} }

View File

@@ -6,6 +6,7 @@
package org.redkale.mq; package org.redkale.mq;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Logger; import java.util.logging.Logger;
/** /**
@@ -18,7 +19,7 @@ import java.util.logging.Logger;
* *
* @since 2.1.0 * @since 2.1.0
*/ */
public abstract class MessageConsumer extends Thread { public abstract class MessageConsumer {
protected final String topic; protected final String topic;
@@ -47,11 +48,11 @@ public abstract class MessageConsumer extends Thread {
return topic; return topic;
} }
public abstract void waitFor(); public abstract CompletableFuture<Void> startup();
protected boolean isClosed() { protected boolean isClosed() {
return closed; return closed;
} }
protected abstract void close(); protected abstract CompletableFuture<Void> shutdown();
} }

View File

@@ -17,7 +17,7 @@ import java.util.logging.Logger;
* *
* @since 2.1.0 * @since 2.1.0
*/ */
public abstract class MessageProducer extends Thread { public abstract class MessageProducer {
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
@@ -25,11 +25,11 @@ public abstract class MessageProducer extends Thread {
public abstract CompletableFuture<Void> apply(MessageRecord message); public abstract CompletableFuture<Void> apply(MessageRecord message);
protected abstract void waitFor(); public abstract CompletableFuture<Void> startup();
protected boolean isClosed() { protected boolean isClosed() {
return closed; return closed;
} }
protected abstract void close(); protected abstract CompletableFuture<Void> shutdown();
} }