From 557c2c7858283334c08be23d6f7dc4fb6b9b171e Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Sat, 30 May 2020 16:40:33 +0800 Subject: [PATCH] --- .../redkale/mq/HttpSimpleRequestCoder.java | 39 +++++++++++++++++-- src/org/redkale/mq/MessageAgent.java | 5 ++- src/org/redkale/mq/MessageCoder.java | 35 +++++++++++++++++ .../redkale/net/http/HttpSimpleRequest.java | 21 ++++++---- 4 files changed, 88 insertions(+), 12 deletions(-) diff --git a/src/org/redkale/mq/HttpSimpleRequestCoder.java b/src/org/redkale/mq/HttpSimpleRequestCoder.java index d064fe303..2b0d94245 100644 --- a/src/org/redkale/mq/HttpSimpleRequestCoder.java +++ b/src/org/redkale/mq/HttpSimpleRequestCoder.java @@ -27,13 +27,46 @@ public class HttpSimpleRequestCoder implements MessageCoder { @Override public byte[] encode(HttpSimpleRequest data) { - byte[] requestURI = data.getRequestURI() == null ? new byte[0] : data.getRequestURI().getBytes(StandardCharsets.UTF_8); - return null; + byte[] requestURI = MessageCoder.getBytes(data.getRequestURI()); //long-string + byte[] remoteAddr = MessageCoder.getBytes(data.getRemoteAddr());//short-string + byte[] sessionid = MessageCoder.getBytes(data.getSessionid());//short-string + byte[] headers = MessageCoder.getBytes(data.getHeaders()); + byte[] params = MessageCoder.getBytes(data.getParams()); + byte[] body = MessageCoder.getBytes(data.getBody()); + int count = 4 + requestURI.length + 2 + remoteAddr.length + 2 + sessionid.length + + headers.length + params.length + 4 + body.length; + byte[] bs = new byte[count]; + ByteBuffer buffer = ByteBuffer.wrap(bs); + buffer.putInt(requestURI.length); + if (requestURI.length > 0) buffer.put(requestURI); + buffer.putChar((char) remoteAddr.length); + if (remoteAddr.length > 0) buffer.put(remoteAddr); + buffer.putChar((char) sessionid.length); + if (sessionid.length > 0) buffer.put(sessionid); + buffer.put(headers); + buffer.put(params); + buffer.putInt(body.length); + if (body.length > 0) buffer.put(body); + return bs; } @Override public HttpSimpleRequest decode(byte[] data) { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + if (data == null) return null; + ByteBuffer buffer = ByteBuffer.wrap(data); + HttpSimpleRequest req = new HttpSimpleRequest(); + req.setRequestURI(MessageCoder.getLongString(buffer)); + req.setRemoteAddr(MessageCoder.getShortString(buffer)); + req.setSessionid(MessageCoder.getShortString(buffer)); + req.setHeaders(MessageCoder.getMap(buffer)); + req.setParams(MessageCoder.getMap(buffer)); + int len = buffer.getInt(); + if (len > 0) { + byte[] bs = new byte[len]; + buffer.get(bs); + req.setBody(bs); + } + return req; } protected static String getString(ByteBuffer buffer) { diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index 72d900abc..98af61cd6 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -9,10 +9,10 @@ import java.util.*; import java.util.function.*; import java.util.logging.Logger; import org.redkale.boot.*; -import org.redkale.net.http.Rest; +import org.redkale.net.http.*; import org.redkale.net.sncp.Sncp; import org.redkale.service.Service; -import org.redkale.util.AnyValue; +import org.redkale.util.*; /** * MQ管理器 @@ -117,4 +117,5 @@ public abstract class MessageAgent { protected static String generateWebSocketRespTopic(Application application) { return "ws:resp:node" + application.getNodeid(); } + } diff --git a/src/org/redkale/mq/MessageCoder.java b/src/org/redkale/mq/MessageCoder.java index c593e86d7..58584a172 100644 --- a/src/org/redkale/mq/MessageCoder.java +++ b/src/org/redkale/mq/MessageCoder.java @@ -7,6 +7,9 @@ package org.redkale.mq; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; +import org.redkale.util.Utility; /** * 将MessageRecord.content内容加解密 @@ -26,11 +29,33 @@ public interface MessageCoder { //解码 public T decode(byte[] data); + public static byte[] getBytes(byte[] value) { + if (value == null) return MessageRecord.EMPTY_BYTES; + return value; + } + public static byte[] getBytes(String value) { if (value == null || value.isEmpty()) return MessageRecord.EMPTY_BYTES; return value.getBytes(StandardCharsets.UTF_8); } + public static byte[] getBytes(final Map map) { + if (map == null || map.isEmpty()) return new byte[2]; + final AtomicInteger len = new AtomicInteger(2); + map.forEach((key, value) -> { + len.addAndGet(2 + (key == null ? 0 : Utility.encodeUTF8Length(key))); + len.addAndGet(4 + (value == null ? 0 : Utility.encodeUTF8Length(value))); + }); + final byte[] bs = new byte[len.get()]; + final ByteBuffer buffer = ByteBuffer.wrap(bs); + buffer.putChar((char) map.size()); + map.forEach((key, value) -> { + putShortString(buffer, key); + putLongString(buffer, value); + }); + return bs; + } + public static void putLongString(ByteBuffer buffer, String value) { if (value == null || value.isEmpty()) { buffer.putInt(0); @@ -66,4 +91,14 @@ public interface MessageCoder { buffer.get(bs); return new String(bs, StandardCharsets.UTF_8); } + + public static Map getMap(ByteBuffer buffer) { + int len = buffer.getChar(); + if (len == 0) return null; + Map map = new HashMap<>(len); + for (int i = 0; i < len; i++) { + map.put(getShortString(buffer), getLongString(buffer)); + } + return map; + } } diff --git a/src/org/redkale/net/http/HttpSimpleRequest.java b/src/org/redkale/net/http/HttpSimpleRequest.java index 9c43f92dc..d29888355 100644 --- a/src/org/redkale/net/http/HttpSimpleRequest.java +++ b/src/org/redkale/net/http/HttpSimpleRequest.java @@ -6,6 +6,7 @@ package org.redkale.net.http; import java.util.*; +import org.redkale.convert.ConvertColumn; import org.redkale.convert.json.JsonConvert; /** @@ -18,16 +19,22 @@ import org.redkale.convert.json.JsonConvert; */ public class HttpSimpleRequest implements java.io.Serializable { + @ConvertColumn(index = 1) protected String requestURI; - protected String sessionid; - + @ConvertColumn(index = 2) protected String remoteAddr; + @ConvertColumn(index = 3) + protected String sessionid; + + @ConvertColumn(index = 4) protected Map headers; + @ConvertColumn(index = 5) protected Map params; + @ConvertColumn(index = 6) protected byte[] body; //对应HttpRequest.array public HttpSimpleRequest clearParams() { @@ -50,11 +57,6 @@ public class HttpSimpleRequest implements java.io.Serializable { return this; } - @Override - public String toString() { - return JsonConvert.root().convertTo(this); - } - public String getRequestURI() { return requestURI; } @@ -103,4 +105,9 @@ public class HttpSimpleRequest implements java.io.Serializable { this.body = body; } + @Override + public String toString() { + return JsonConvert.root().convertTo(this); + } + }