diff --git a/src/org/redkale/mq/HttpMessageResponse.java b/src/org/redkale/mq/HttpMessageResponse.java index cb3901599..ce035553e 100644 --- a/src/org/redkale/mq/HttpMessageResponse.java +++ b/src/org/redkale/mq/HttpMessageResponse.java @@ -31,6 +31,10 @@ public class HttpMessageResponse extends HttpResponse { super(context, request, responsePool, config); } + public HttpMessageResponse(HttpContext context, HttpSimpleRequest req, HttpResponseConfig config) { + super(context, new HttpMessageRequest(context, req), null, config); + } + public HttpMessageResponse resultConsumer(MessageRecord message, BiConsumer resultConsumer) { this.message = message; this.resultConsumer = resultConsumer; diff --git a/src/org/redkale/mq/HttpSimpleRequestCoder.java b/src/org/redkale/mq/HttpSimpleRequestCoder.java new file mode 100644 index 000000000..d064fe303 --- /dev/null +++ b/src/org/redkale/mq/HttpSimpleRequestCoder.java @@ -0,0 +1,46 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.mq; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import org.redkale.net.http.HttpSimpleRequest; + +/** + * HttpSimpleRequest的MessageCoder实现 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + */ +public class HttpSimpleRequestCoder implements MessageCoder { + + private static final HttpSimpleRequestCoder instance = new HttpSimpleRequestCoder(); + + public static HttpSimpleRequestCoder getInstance() { + return instance; + } + + @Override + public byte[] encode(HttpSimpleRequest data) { + byte[] requestURI = data.getRequestURI() == null ? new byte[0] : data.getRequestURI().getBytes(StandardCharsets.UTF_8); + return null; + } + + @Override + public HttpSimpleRequest decode(byte[] data) { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + + protected static String getString(ByteBuffer buffer) { + int len = buffer.getInt(); + if (len == 0) return null; + byte[] bs = new byte[len]; + buffer.get(bs); + return new String(bs, StandardCharsets.UTF_8); + } +} diff --git a/src/org/redkale/mq/MessageCoder.java b/src/org/redkale/mq/MessageCoder.java new file mode 100644 index 000000000..c593e86d7 --- /dev/null +++ b/src/org/redkale/mq/MessageCoder.java @@ -0,0 +1,69 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.mq; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +/** + * 将MessageRecord.content内容加解密 + * + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * @param 泛型 + */ +public interface MessageCoder { + + //编码 + public byte[] encode(T data); + + //解码 + public T decode(byte[] data); + + public static byte[] getBytes(String value) { + if (value == null || value.isEmpty()) return MessageRecord.EMPTY_BYTES; + return value.getBytes(StandardCharsets.UTF_8); + } + + public static void putLongString(ByteBuffer buffer, String value) { + if (value == null || value.isEmpty()) { + buffer.putInt(0); + } else { + byte[] bs = value.getBytes(StandardCharsets.UTF_8); + buffer.putInt(bs.length); + buffer.put(bs); + } + } + + public static String getLongString(ByteBuffer buffer) { + int len = buffer.getInt(); + if (len == 0) return null; + byte[] bs = new byte[len]; + buffer.get(bs); + return new String(bs, StandardCharsets.UTF_8); + } + + public static void putShortString(ByteBuffer buffer, String value) { + if (value == null || value.isEmpty()) { + buffer.putChar((char) 0); + } else { + byte[] bs = value.getBytes(StandardCharsets.UTF_8); + buffer.putChar((char) bs.length); + buffer.put(bs); + } + } + + public static String getShortString(ByteBuffer buffer) { + int len = buffer.getChar(); + if (len == 0) return null; + byte[] bs = new byte[len]; + buffer.get(bs); + return new String(bs, StandardCharsets.UTF_8); + } +} diff --git a/src/org/redkale/mq/MessageRecord.java b/src/org/redkale/mq/MessageRecord.java index d70e1d09c..094eccea4 100644 --- a/src/org/redkale/mq/MessageRecord.java +++ b/src/org/redkale/mq/MessageRecord.java @@ -21,6 +21,8 @@ import org.redkale.util.Comment; */ public class MessageRecord implements Serializable { + static final byte[] EMPTY_BYTES = new byte[0]; + @ConvertColumn(index = 1) @Comment("消息序列号") protected long seqid; @@ -140,6 +142,16 @@ public class MessageRecord implements Serializable { return (T) convert.convertFrom(type, this.content); } + public T decodeContent(MessageCoder coder, java.lang.reflect.Type type) { + if (this.content == null || this.content.length == 0) return null; + return (T) coder.decode(this.content); + } + + public MessageRecord encodeContent(MessageCoder coder, T data) { + this.content = coder.encode(data); + return this; + } + public MessageRecord version(int version) { this.version = version; return this; diff --git a/src/org/redkale/mq/MessageRecordCoder.java b/src/org/redkale/mq/MessageRecordCoder.java new file mode 100644 index 000000000..11f391007 --- /dev/null +++ b/src/org/redkale/mq/MessageRecordCoder.java @@ -0,0 +1,82 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.mq; + +import java.nio.ByteBuffer; +import org.redkale.convert.ConvertType; + +/** + * MessageRecord的MessageCoder实现 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + */ +public class MessageRecordCoder implements MessageCoder { + + private static final MessageRecordCoder instance = new MessageRecordCoder(); + + public static MessageRecordCoder getInstance() { + return instance; + } + + @Override + public byte[] encode(MessageRecord data) { + if (data == null) return null; + byte[] stopics = MessageCoder.getBytes(data.getTopic()); + byte[] dtopics = MessageCoder.getBytes(data.getResptopic()); + byte[] groupid = MessageCoder.getBytes(data.getGroupid()); + int count = 8 + 4 + 4 + 4 + 8 + 4 + 2 + stopics.length + 2 + dtopics.length + 2 + groupid.length + 4 + (data.getContent() == null ? 0 : data.getContent().length); + final byte[] bs = new byte[count]; + ByteBuffer buffer = ByteBuffer.wrap(bs); + buffer.putLong(data.getSeqid()); + buffer.putInt(data.getVersion()); + buffer.putInt(data.getFormat() == null ? 0 : data.getFormat().getValue()); + buffer.putInt(data.getFlag()); + buffer.putLong(data.getCreatetime()); + buffer.putInt(data.getUserid()); + buffer.putChar((char) groupid.length); + if (groupid.length > 0) buffer.put(groupid); + buffer.putChar((char) stopics.length); + if (stopics.length > 0) buffer.put(stopics); + buffer.putChar((char) dtopics.length); + if (dtopics.length > 0) buffer.put(dtopics); + if (data.getContent() == null) { + buffer.putInt(0); + } else { + buffer.putInt(data.getContent().length); + buffer.put(data.getContent()); + } + return bs; + } + + @Override + public MessageRecord decode(byte[] data) { + if (data == null) return null; + ByteBuffer buffer = ByteBuffer.wrap(data); + long seqid = buffer.getLong(); + int version = buffer.getInt(); + ConvertType format = ConvertType.find(buffer.getInt()); + int flag = buffer.getInt(); + long createtime = buffer.getLong(); + int userid = buffer.getInt(); + + String groupid = MessageCoder.getShortString(buffer); + String topic = MessageCoder.getShortString(buffer); + String resptopic = MessageCoder.getShortString(buffer); + + byte[] content = null; + int contentlen = buffer.getInt(); + if (contentlen > 0) { + content = new byte[contentlen]; + buffer.get(content); + } + return new MessageRecord(seqid, version, format, flag, + createtime, userid, groupid, topic, resptopic, content); + } + +}