This commit is contained in:
@@ -7,6 +7,7 @@ package org.redkale.mq;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.logging.Logger;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.annotation.Resource;
|
||||
@@ -47,6 +48,8 @@ public abstract class MessageAgent {
|
||||
|
||||
protected final Object sncpProducerLock = new Object();
|
||||
|
||||
protected final AtomicLong msgSeqno = new AtomicLong(System.nanoTime());
|
||||
|
||||
protected HttpMessageClient httpMessageClient;
|
||||
|
||||
protected SncpMessageClient sncpMessageClient;
|
||||
|
||||
@@ -29,6 +29,8 @@ public abstract class MessageClient {
|
||||
|
||||
protected final MessageAgent messageAgent;
|
||||
|
||||
protected final AtomicLong msgSeqno;
|
||||
|
||||
protected MessageConsumer respConsumer;
|
||||
|
||||
protected String respTopic;
|
||||
@@ -45,6 +47,7 @@ public abstract class MessageClient {
|
||||
|
||||
protected MessageClient(MessageAgent messageAgent) {
|
||||
this.messageAgent = messageAgent;
|
||||
this.msgSeqno = messageAgent.msgSeqno;
|
||||
this.finest = messageAgent == null ? false : messageAgent.logger.isLoggable(Level.FINEST);
|
||||
this.finer = messageAgent == null ? false : messageAgent.logger.isLoggable(Level.FINER);
|
||||
this.fine = messageAgent == null ? false : messageAgent.logger.isLoggable(Level.FINE);
|
||||
@@ -114,35 +117,35 @@ public abstract class MessageClient {
|
||||
protected abstract MessageProducers getProducer();
|
||||
|
||||
public MessageRecord createMessageRecord(String resptopic, String content) {
|
||||
return new MessageRecord(System.nanoTime(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, null, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
|
||||
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, null, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
public MessageRecord createMessageRecord(String topic, String resptopic, String content) {
|
||||
return new MessageRecord(System.nanoTime(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
|
||||
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
public MessageRecord createMessageRecord(int userid, String topic, String resptopic, String content) {
|
||||
return new MessageRecord(System.nanoTime(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
|
||||
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
public MessageRecord createMessageRecord(String topic, String resptopic, Convert convert, Object bean) {
|
||||
return new MessageRecord(System.nanoTime(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, convert.convertToBytes(bean));
|
||||
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, convert.convertToBytes(bean));
|
||||
}
|
||||
|
||||
public MessageRecord createMessageRecord(int userid, String topic, String resptopic, Convert convert, Object bean) {
|
||||
return new MessageRecord(System.nanoTime(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, convert.convertToBytes(bean));
|
||||
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, convert.convertToBytes(bean));
|
||||
}
|
||||
|
||||
public MessageRecord createMessageRecord(int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) {
|
||||
return new MessageRecord(System.nanoTime(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, groupid, topic, resptopic, convert.convertToBytes(bean));
|
||||
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, groupid, topic, resptopic, convert.convertToBytes(bean));
|
||||
}
|
||||
|
||||
public MessageRecord createMessageRecord(int flag, int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) {
|
||||
return new MessageRecord(System.nanoTime(), ctype(convert, bean), 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, convert.convertToBytes(bean));
|
||||
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, convert.convertToBytes(bean));
|
||||
}
|
||||
|
||||
public MessageRecord createMessageRecord(String topic, String resptopic, byte[] content) {
|
||||
return new MessageRecord(System.nanoTime(), (byte) 0, topic, resptopic, content);
|
||||
return new MessageRecord(msgSeqno.incrementAndGet(), (byte) 0, topic, resptopic, content);
|
||||
}
|
||||
|
||||
public MessageRecord createMessageRecord(long seqid, String topic, String resptopic, byte[] content) {
|
||||
@@ -150,7 +153,7 @@ public abstract class MessageClient {
|
||||
}
|
||||
|
||||
protected MessageRecord createMessageRecord(byte ctype, String topic, String resptopic, byte[] content) {
|
||||
return new MessageRecord(System.nanoTime(), ctype, topic, resptopic, content);
|
||||
return new MessageRecord(msgSeqno.incrementAndGet(), ctype, topic, resptopic, content);
|
||||
}
|
||||
|
||||
protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String resptopic, byte[] content) {
|
||||
|
||||
Reference in New Issue
Block a user