diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index 2b3567c98..6b5a24304 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -7,6 +7,7 @@ package org.redkale.mq; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; import java.util.stream.Collectors; import javax.annotation.Resource; @@ -47,6 +48,8 @@ public abstract class MessageAgent { protected final Object sncpProducerLock = new Object(); + protected final AtomicLong msgSeqno = new AtomicLong(System.nanoTime()); + protected HttpMessageClient httpMessageClient; protected SncpMessageClient sncpMessageClient; diff --git a/src/org/redkale/mq/MessageClient.java b/src/org/redkale/mq/MessageClient.java index d7c3076f8..020f9c3b8 100644 --- a/src/org/redkale/mq/MessageClient.java +++ b/src/org/redkale/mq/MessageClient.java @@ -29,6 +29,8 @@ public abstract class MessageClient { protected final MessageAgent messageAgent; + protected final AtomicLong msgSeqno; + protected MessageConsumer respConsumer; protected String respTopic; @@ -45,6 +47,7 @@ public abstract class MessageClient { protected MessageClient(MessageAgent messageAgent) { this.messageAgent = messageAgent; + this.msgSeqno = messageAgent.msgSeqno; this.finest = messageAgent == null ? false : messageAgent.logger.isLoggable(Level.FINEST); this.finer = messageAgent == null ? false : messageAgent.logger.isLoggable(Level.FINER); this.fine = messageAgent == null ? false : messageAgent.logger.isLoggable(Level.FINE); @@ -114,35 +117,35 @@ public abstract class MessageClient { protected abstract MessageProducers getProducer(); public MessageRecord createMessageRecord(String resptopic, String content) { - return new MessageRecord(System.nanoTime(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, null, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, null, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); } public MessageRecord createMessageRecord(String topic, String resptopic, String content) { - return new MessageRecord(System.nanoTime(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); } public MessageRecord createMessageRecord(int userid, String topic, String resptopic, String content) { - return new MessageRecord(System.nanoTime(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); } public MessageRecord createMessageRecord(String topic, String resptopic, Convert convert, Object bean) { - return new MessageRecord(System.nanoTime(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, convert.convertToBytes(bean)); + return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, convert.convertToBytes(bean)); } public MessageRecord createMessageRecord(int userid, String topic, String resptopic, Convert convert, Object bean) { - return new MessageRecord(System.nanoTime(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, convert.convertToBytes(bean)); + return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, convert.convertToBytes(bean)); } public MessageRecord createMessageRecord(int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) { - return new MessageRecord(System.nanoTime(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, groupid, topic, resptopic, convert.convertToBytes(bean)); + return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, groupid, topic, resptopic, convert.convertToBytes(bean)); } public MessageRecord createMessageRecord(int flag, int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) { - return new MessageRecord(System.nanoTime(), ctype(convert, bean), 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, convert.convertToBytes(bean)); + return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, convert.convertToBytes(bean)); } public MessageRecord createMessageRecord(String topic, String resptopic, byte[] content) { - return new MessageRecord(System.nanoTime(), (byte) 0, topic, resptopic, content); + return new MessageRecord(msgSeqno.incrementAndGet(), (byte) 0, topic, resptopic, content); } public MessageRecord createMessageRecord(long seqid, String topic, String resptopic, byte[] content) { @@ -150,7 +153,7 @@ public abstract class MessageClient { } protected MessageRecord createMessageRecord(byte ctype, String topic, String resptopic, byte[] content) { - return new MessageRecord(System.nanoTime(), ctype, topic, resptopic, content); + return new MessageRecord(msgSeqno.incrementAndGet(), ctype, topic, resptopic, content); } protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String resptopic, byte[] content) {