From 197c58ef98afd9827909cb8d7252b35de7d732a4 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Sat, 30 May 2020 09:24:40 +0800 Subject: [PATCH] --- src/org/redkale/mq/MessageConsumer.java | 2 + src/org/redkale/mq/MessageProducer.java | 2 + src/org/redkale/mq/MessageRecord.java | 54 ++++++++++++++++++++++--- 3 files changed, 52 insertions(+), 6 deletions(-) diff --git a/src/org/redkale/mq/MessageConsumer.java b/src/org/redkale/mq/MessageConsumer.java index 565bd3cac..6b49d2cd2 100644 --- a/src/org/redkale/mq/MessageConsumer.java +++ b/src/org/redkale/mq/MessageConsumer.java @@ -41,6 +41,8 @@ public abstract class MessageConsumer extends Thread { return topic; } + public abstract void waitFor(); + public boolean isClosed() { return closed; } diff --git a/src/org/redkale/mq/MessageProducer.java b/src/org/redkale/mq/MessageProducer.java index cce361f09..7b66bdff8 100644 --- a/src/org/redkale/mq/MessageProducer.java +++ b/src/org/redkale/mq/MessageProducer.java @@ -23,6 +23,8 @@ public abstract class MessageProducer extends Thread { public abstract CompletableFuture apply(MessageRecord message); + public abstract void waitFor(); + public boolean isClosed() { return closed; } diff --git a/src/org/redkale/mq/MessageRecord.java b/src/org/redkale/mq/MessageRecord.java index 737742cd7..e6ed72ad0 100644 --- a/src/org/redkale/mq/MessageRecord.java +++ b/src/org/redkale/mq/MessageRecord.java @@ -26,30 +26,38 @@ public class MessageRecord implements Serializable { protected long seqid; @ConvertColumn(index = 2) + @Comment("版本") + protected int version; + + @ConvertColumn(index = 3) @Comment("内容的格式, 只能是JSON、BSON、PROTOBUF、DIY和null, 普通文本也归于JSON") protected ConvertType format; - @ConvertColumn(index = 3) + @ConvertColumn(index = 4) @Comment("标记位, 自定义时使用") protected int flag; - @ConvertColumn(index = 4) + @ConvertColumn(index = 5) + @Comment("创建时间") + protected long createtime; + + @ConvertColumn(index = 6) @Comment("用户ID,无用户信息视为0") protected int userid; - @ConvertColumn(index = 5) + @ConvertColumn(index = 7) @Comment("组ID") protected String groupid; - @ConvertColumn(index = 6) + @ConvertColumn(index = 8) @Comment("当前topic") protected String topic; - @ConvertColumn(index = 7) + @ConvertColumn(index = 9) @Comment("目标topic, 为空表示无目标topic") protected String resptopic; - @ConvertColumn(index = 8) + @ConvertColumn(index = 10) @Comment("消息内容") protected byte[] content; @@ -97,9 +105,15 @@ public class MessageRecord implements Serializable { } public MessageRecord(long seqid, ConvertType format, int flag, int userid, String groupid, String topic, String resptopic, byte[] content) { + this(seqid, 1, format, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, content); + } + + public MessageRecord(long seqid, int version, ConvertType format, int flag, long createtime, int userid, String groupid, String topic, String resptopic, byte[] content) { this.seqid = seqid; + this.version = version; this.format = format; this.flag = flag; + this.createtime = createtime; this.userid = userid; this.groupid = groupid; this.topic = topic; @@ -121,6 +135,11 @@ public class MessageRecord implements Serializable { return this.resptopic == null || this.resptopic.isEmpty(); } + public MessageRecord version(int version) { + this.version = version; + return this; + } + public MessageRecord format(ConvertType format) { this.format = format; return this; @@ -131,6 +150,11 @@ public class MessageRecord implements Serializable { return this; } + public MessageRecord createtime(long createtime) { + this.createtime = createtime; + return this; + } + public MessageRecord userid(int userid) { this.userid = userid; return this; @@ -169,6 +193,14 @@ public class MessageRecord implements Serializable { this.seqid = seqid; } + public int getVersion() { + return version; + } + + public void setVersion(int version) { + this.version = version; + } + public ConvertType getFormat() { return format; } @@ -185,6 +217,14 @@ public class MessageRecord implements Serializable { this.flag = flag; } + public long getCreatetime() { + return createtime; + } + + public void setCreatetime(long createtime) { + this.createtime = createtime; + } + public int getUserid() { return userid; } @@ -230,8 +270,10 @@ public class MessageRecord implements Serializable { //return JsonConvert.root().convertTo(this); StringBuilder sb = new StringBuilder(128); sb.append("{\"seqid\":").append(this.seqid); + sb.append(",\"version\":").append(this.version); if (this.format != null) sb.append(",\"format\":\"").append(this.format).append("\""); if (this.flag != 0) sb.append(",\"flag\":").append(this.flag); + if (this.createtime != 0) sb.append(",\"createtime\":").append(this.createtime); if (this.userid != 0) sb.append(",\"userid\":").append(this.userid); if (this.groupid != null) sb.append(",\"groupid\":\"").append(this.groupid).append("\""); if (this.topic != null) sb.append(",\"topic\":\"").append(this.topic).append("\"");