This commit is contained in:
Redkale
2020-05-30 09:24:40 +08:00
parent d4c2723759
commit 197c58ef98
3 changed files with 52 additions and 6 deletions

View File

@@ -41,6 +41,8 @@ public abstract class MessageConsumer extends Thread {
return topic; return topic;
} }
public abstract void waitFor();
public boolean isClosed() { public boolean isClosed() {
return closed; return closed;
} }

View File

@@ -23,6 +23,8 @@ public abstract class MessageProducer extends Thread {
public abstract CompletableFuture apply(MessageRecord message); public abstract CompletableFuture apply(MessageRecord message);
public abstract void waitFor();
public boolean isClosed() { public boolean isClosed() {
return closed; return closed;
} }

View File

@@ -26,30 +26,38 @@ public class MessageRecord implements Serializable {
protected long seqid; protected long seqid;
@ConvertColumn(index = 2) @ConvertColumn(index = 2)
@Comment("版本")
protected int version;
@ConvertColumn(index = 3)
@Comment("内容的格式, 只能是JSON、BSON、PROTOBUF、DIY和null, 普通文本也归于JSON") @Comment("内容的格式, 只能是JSON、BSON、PROTOBUF、DIY和null, 普通文本也归于JSON")
protected ConvertType format; protected ConvertType format;
@ConvertColumn(index = 3) @ConvertColumn(index = 4)
@Comment("标记位, 自定义时使用") @Comment("标记位, 自定义时使用")
protected int flag; protected int flag;
@ConvertColumn(index = 4) @ConvertColumn(index = 5)
@Comment("创建时间")
protected long createtime;
@ConvertColumn(index = 6)
@Comment("用户ID无用户信息视为0") @Comment("用户ID无用户信息视为0")
protected int userid; protected int userid;
@ConvertColumn(index = 5) @ConvertColumn(index = 7)
@Comment("组ID") @Comment("组ID")
protected String groupid; protected String groupid;
@ConvertColumn(index = 6) @ConvertColumn(index = 8)
@Comment("当前topic") @Comment("当前topic")
protected String topic; protected String topic;
@ConvertColumn(index = 7) @ConvertColumn(index = 9)
@Comment("目标topic, 为空表示无目标topic") @Comment("目标topic, 为空表示无目标topic")
protected String resptopic; protected String resptopic;
@ConvertColumn(index = 8) @ConvertColumn(index = 10)
@Comment("消息内容") @Comment("消息内容")
protected byte[] content; protected byte[] content;
@@ -97,9 +105,15 @@ public class MessageRecord implements Serializable {
} }
public MessageRecord(long seqid, ConvertType format, int flag, int userid, String groupid, String topic, String resptopic, byte[] content) { public MessageRecord(long seqid, ConvertType format, int flag, int userid, String groupid, String topic, String resptopic, byte[] content) {
this(seqid, 1, format, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, content);
}
public MessageRecord(long seqid, int version, ConvertType format, int flag, long createtime, int userid, String groupid, String topic, String resptopic, byte[] content) {
this.seqid = seqid; this.seqid = seqid;
this.version = version;
this.format = format; this.format = format;
this.flag = flag; this.flag = flag;
this.createtime = createtime;
this.userid = userid; this.userid = userid;
this.groupid = groupid; this.groupid = groupid;
this.topic = topic; this.topic = topic;
@@ -121,6 +135,11 @@ public class MessageRecord implements Serializable {
return this.resptopic == null || this.resptopic.isEmpty(); return this.resptopic == null || this.resptopic.isEmpty();
} }
public MessageRecord version(int version) {
this.version = version;
return this;
}
public MessageRecord format(ConvertType format) { public MessageRecord format(ConvertType format) {
this.format = format; this.format = format;
return this; return this;
@@ -131,6 +150,11 @@ public class MessageRecord implements Serializable {
return this; return this;
} }
public MessageRecord createtime(long createtime) {
this.createtime = createtime;
return this;
}
public MessageRecord userid(int userid) { public MessageRecord userid(int userid) {
this.userid = userid; this.userid = userid;
return this; return this;
@@ -169,6 +193,14 @@ public class MessageRecord implements Serializable {
this.seqid = seqid; this.seqid = seqid;
} }
public int getVersion() {
return version;
}
public void setVersion(int version) {
this.version = version;
}
public ConvertType getFormat() { public ConvertType getFormat() {
return format; return format;
} }
@@ -185,6 +217,14 @@ public class MessageRecord implements Serializable {
this.flag = flag; this.flag = flag;
} }
public long getCreatetime() {
return createtime;
}
public void setCreatetime(long createtime) {
this.createtime = createtime;
}
public int getUserid() { public int getUserid() {
return userid; return userid;
} }
@@ -230,8 +270,10 @@ public class MessageRecord implements Serializable {
//return JsonConvert.root().convertTo(this); //return JsonConvert.root().convertTo(this);
StringBuilder sb = new StringBuilder(128); StringBuilder sb = new StringBuilder(128);
sb.append("{\"seqid\":").append(this.seqid); sb.append("{\"seqid\":").append(this.seqid);
sb.append(",\"version\":").append(this.version);
if (this.format != null) sb.append(",\"format\":\"").append(this.format).append("\""); if (this.format != null) sb.append(",\"format\":\"").append(this.format).append("\"");
if (this.flag != 0) sb.append(",\"flag\":").append(this.flag); if (this.flag != 0) sb.append(",\"flag\":").append(this.flag);
if (this.createtime != 0) sb.append(",\"createtime\":").append(this.createtime);
if (this.userid != 0) sb.append(",\"userid\":").append(this.userid); if (this.userid != 0) sb.append(",\"userid\":").append(this.userid);
if (this.groupid != null) sb.append(",\"groupid\":\"").append(this.groupid).append("\""); if (this.groupid != null) sb.append(",\"groupid\":\"").append(this.groupid).append("\"");
if (this.topic != null) sb.append(",\"topic\":\"").append(this.topic).append("\""); if (this.topic != null) sb.append(",\"topic\":\"").append(this.topic).append("\"");