From 71d179b8c34cc7b7a92b1cada02d560b3b4bcec2 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Tue, 12 Jan 2021 17:53:13 +0800 Subject: [PATCH] --- src/org/redkale/mq/HttpMessageClient.java | 7 ++-- src/org/redkale/mq/HttpMessageResponse.java | 3 +- src/org/redkale/mq/MessageClient.java | 41 ++++++++++++++++----- src/org/redkale/mq/MessageRecord.java | 31 +++++++++++++--- src/org/redkale/mq/MessageRecordCoder.java | 6 ++- 5 files changed, 67 insertions(+), 21 deletions(-) diff --git a/src/org/redkale/mq/HttpMessageClient.java b/src/org/redkale/mq/HttpMessageClient.java index b3e58ad2e..3647a7c0d 100644 --- a/src/org/redkale/mq/HttpMessageClient.java +++ b/src/org/redkale/mq/HttpMessageClient.java @@ -12,6 +12,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.logging.*; import org.redkale.convert.json.JsonConvert; import org.redkale.net.http.*; +import static org.redkale.mq.MessageRecord.CTYPE_HTTP_REQUEST; /** * 不依赖MessageRecord则可兼容RPC方式 @@ -172,20 +173,20 @@ public class HttpMessageClient extends MessageClient { } public CompletableFuture> sendMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { - MessageRecord message = createMessageRecord(topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); + MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); message.userid(userid).groupid(groupid); //if (finest) logger.log(Level.FINEST, "HttpMessageClient.sendMessage: " + message); return sendMessage(message, true, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance())); } public void broadcastMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { - MessageRecord message = createMessageRecord(topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); + MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); message.userid(userid).groupid(groupid); sendMessage(message, false, counter); } public void produceMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { - MessageRecord message = createMessageRecord(topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); + MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); message.userid(userid).groupid(groupid); sendMessage(message, false, counter); } diff --git a/src/org/redkale/mq/HttpMessageResponse.java b/src/org/redkale/mq/HttpMessageResponse.java index 434e99b37..7566aa090 100644 --- a/src/org/redkale/mq/HttpMessageResponse.java +++ b/src/org/redkale/mq/HttpMessageResponse.java @@ -13,6 +13,7 @@ import org.redkale.net.Response; import org.redkale.net.http.*; import org.redkale.service.RetResult; import org.redkale.util.ObjectPool; +import static org.redkale.mq.MessageRecord.CTYPE_HTTP_RESULT; /** * @@ -73,7 +74,7 @@ public class HttpMessageResponse extends HttpResponse { producer.logger.log(Level.FINEST, "HttpMessageResponse.finishHttpResult seqid=" + msg.getSeqid() + ", content: " + innerrs + ", status: " + result.getStatus() + ", headers: " + result.getHeaders()); } byte[] content = HttpResultCoder.getInstance().encode(result); - producer.apply(messageClient.createMessageRecord(msg.getSeqid(), resptopic, null, content)); + producer.apply(messageClient.createMessageRecord(msg.getSeqid(), CTYPE_HTTP_RESULT, resptopic, null, content)); } @Override diff --git a/src/org/redkale/mq/MessageClient.java b/src/org/redkale/mq/MessageClient.java index 1328c07dd..d7c3076f8 100644 --- a/src/org/redkale/mq/MessageClient.java +++ b/src/org/redkale/mq/MessageClient.java @@ -10,6 +10,9 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import org.redkale.convert.Convert; +import org.redkale.convert.json.JsonConvert; +import static org.redkale.mq.MessageRecord.*; +import org.redkale.net.http.*; /** * @@ -111,38 +114,58 @@ public abstract class MessageClient { protected abstract MessageProducers getProducer(); public MessageRecord createMessageRecord(String resptopic, String content) { - return new MessageRecord(System.nanoTime(), 1, 0, System.currentTimeMillis(), 0, null, null, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + return new MessageRecord(System.nanoTime(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, null, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); } public MessageRecord createMessageRecord(String topic, String resptopic, String content) { - return new MessageRecord(System.nanoTime(), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + return new MessageRecord(System.nanoTime(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); } public MessageRecord createMessageRecord(int userid, String topic, String resptopic, String content) { - return new MessageRecord(System.nanoTime(), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + return new MessageRecord(System.nanoTime(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); } public MessageRecord createMessageRecord(String topic, String resptopic, Convert convert, Object bean) { - return new MessageRecord(System.nanoTime(), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, convert.convertToBytes(bean)); + return new MessageRecord(System.nanoTime(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, convert.convertToBytes(bean)); } public MessageRecord createMessageRecord(int userid, String topic, String resptopic, Convert convert, Object bean) { - return new MessageRecord(System.nanoTime(), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, convert.convertToBytes(bean)); + return new MessageRecord(System.nanoTime(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, convert.convertToBytes(bean)); } public MessageRecord createMessageRecord(int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) { - return new MessageRecord(System.nanoTime(), 1, 0, System.currentTimeMillis(), userid, groupid, topic, resptopic, convert.convertToBytes(bean)); + return new MessageRecord(System.nanoTime(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, groupid, topic, resptopic, convert.convertToBytes(bean)); } public MessageRecord createMessageRecord(int flag, int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) { - return new MessageRecord(System.nanoTime(), 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, convert.convertToBytes(bean)); + return new MessageRecord(System.nanoTime(), ctype(convert, bean), 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, convert.convertToBytes(bean)); } public MessageRecord createMessageRecord(String topic, String resptopic, byte[] content) { - return new MessageRecord(System.nanoTime(), topic, resptopic, content); + return new MessageRecord(System.nanoTime(), (byte) 0, topic, resptopic, content); } public MessageRecord createMessageRecord(long seqid, String topic, String resptopic, byte[] content) { - return new MessageRecord(seqid, topic, resptopic, content); + return new MessageRecord(seqid, (byte) 0, topic, resptopic, content); + } + + protected MessageRecord createMessageRecord(byte ctype, String topic, String resptopic, byte[] content) { + return new MessageRecord(System.nanoTime(), ctype, topic, resptopic, content); + } + + protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String resptopic, byte[] content) { + return new MessageRecord(seqid, ctype, topic, resptopic, content); + } + + private byte ctype(Convert convert, Object bean) { + byte ctype = 0; + if (convert instanceof JsonConvert) { + if (bean instanceof HttpSimpleRequest) { + ctype = CTYPE_HTTP_REQUEST; + } else if (bean instanceof HttpResult) { + ctype = CTYPE_HTTP_RESULT; + } + } + return ctype; } } diff --git a/src/org/redkale/mq/MessageRecord.java b/src/org/redkale/mq/MessageRecord.java index 69c4d97db..c82b8d681 100644 --- a/src/org/redkale/mq/MessageRecord.java +++ b/src/org/redkale/mq/MessageRecord.java @@ -26,6 +26,12 @@ public class MessageRecord implements Serializable { static final byte[] EMPTY_BYTES = new byte[0]; + protected static final byte CTYPE_STRING = 1; + + protected static final byte CTYPE_HTTP_REQUEST = 2; + + protected static final byte CTYPE_HTTP_RESULT = 3; + @ConvertColumn(index = 1) @Comment("消息序列号") protected long seqid; @@ -62,19 +68,24 @@ public class MessageRecord implements Serializable { @Comment("消息内容") protected byte[] content; + @ConvertColumn(index = 10) + @Comment("消息内容的类型") + protected byte ctype; + public MessageRecord() { } - protected MessageRecord(long seqid, String topic, String resptopic, byte[] content) { - this(seqid, 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, content); + protected MessageRecord(long seqid, byte ctype, String topic, String resptopic, byte[] content) { + this(seqid, ctype, 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, content); } - protected MessageRecord(long seqid, int flag, int userid, String groupid, String topic, String resptopic, byte[] content) { - this(seqid, 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, content); + protected MessageRecord(long seqid, byte ctype, int flag, int userid, String groupid, String topic, String resptopic, byte[] content) { + this(seqid, ctype, 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, content); } - protected MessageRecord(long seqid, int version, int flag, long createtime, int userid, String groupid, String topic, String resptopic, byte[] content) { + protected MessageRecord(long seqid, byte ctype, int version, int flag, long createtime, int userid, String groupid, String topic, String resptopic, byte[] content) { this.seqid = seqid; + this.ctype = ctype; this.version = version; this.flag = flag; this.createtime = createtime; @@ -253,7 +264,15 @@ public class MessageRecord implements Serializable { if (this.groupid != null) sb.append(",\"groupid\":\"").append(this.groupid).append("\""); if (this.topic != null) sb.append(",\"topic\":\"").append(this.topic).append("\""); if (this.resptopic != null) sb.append(",\"resptopic\":\"").append(this.resptopic).append("\""); - if (this.content != null) sb.append(",\"content\":").append(new String(this.content, StandardCharsets.UTF_8)).append("\""); + if (this.content != null) { + if (this.ctype == CTYPE_HTTP_REQUEST) { + sb.append(",\"content\":").append(HttpSimpleRequestCoder.getInstance().decode(this.content)).append("\""); + } else if (this.ctype == CTYPE_HTTP_RESULT) { + sb.append(",\"content\":").append(HttpResultCoder.getInstance().decode(this.content)).append("\""); + } else { + sb.append(",\"content\":").append(new String(this.content, StandardCharsets.UTF_8)).append("\""); + } + } sb.append("}"); return sb.toString(); } diff --git a/src/org/redkale/mq/MessageRecordCoder.java b/src/org/redkale/mq/MessageRecordCoder.java index a2a1b93ae..c3dcb9e0c 100644 --- a/src/org/redkale/mq/MessageRecordCoder.java +++ b/src/org/redkale/mq/MessageRecordCoder.java @@ -31,10 +31,11 @@ public class MessageRecordCoder implements MessageCoder { byte[] stopics = MessageCoder.getBytes(data.getTopic()); byte[] dtopics = MessageCoder.getBytes(data.getResptopic()); byte[] groupid = MessageCoder.getBytes(data.getGroupid()); - int count = 8 + 4 + 4 + 8 + 4 + 2 + stopics.length + 2 + dtopics.length + 2 + groupid.length + 4 + (data.getContent() == null ? 0 : data.getContent().length); + int count = 8 + 1 + 4 + 4 + 8 + 4 + 2 + stopics.length + 2 + dtopics.length + 2 + groupid.length + 4 + (data.getContent() == null ? 0 : data.getContent().length); final byte[] bs = new byte[count]; ByteBuffer buffer = ByteBuffer.wrap(bs); buffer.putLong(data.getSeqid()); + buffer.put(data.ctype); buffer.putInt(data.getVersion()); buffer.putInt(data.getFlag()); buffer.putLong(data.getCreatetime()); @@ -59,6 +60,7 @@ public class MessageRecordCoder implements MessageCoder { if (data == null) return null; ByteBuffer buffer = ByteBuffer.wrap(data); long seqid = buffer.getLong(); + byte ctype = buffer.get(); int version = buffer.getInt(); int flag = buffer.getInt(); long createtime = buffer.getLong(); @@ -74,7 +76,7 @@ public class MessageRecordCoder implements MessageCoder { content = new byte[contentlen]; buffer.get(content); } - return new MessageRecord(seqid, version, flag, createtime, userid, groupid, topic, resptopic, content); + return new MessageRecord(seqid, ctype, version, flag, createtime, userid, groupid, topic, resptopic, content); } }