diff --git a/src/main/java/org/redkale/mq/HttpResultCoder.java b/src/main/java/org/redkale/mq/HttpResultCoder.java index b6a4f95f2..66df18a82 100644 --- a/src/main/java/org/redkale/mq/HttpResultCoder.java +++ b/src/main/java/org/redkale/mq/HttpResultCoder.java @@ -88,7 +88,7 @@ public class HttpResultCoder implements MessageCoder { ByteBuffer buffer = ByteBuffer.wrap(data); HttpResult result = new HttpResult(); result.setStatus(buffer.getInt()); - result.setContentType(MessageCoder.getShortString(buffer)); + result.setContentType(MessageCoder.getSmallString(buffer)); result.setHeaders(MessageCoder.getMap(buffer)); result.setCookies(getCookieList(buffer)); int len = buffer.getInt(); @@ -117,11 +117,11 @@ public class HttpResultCoder implements MessageCoder { final ByteBuffer buffer = ByteBuffer.wrap(bs); buffer.putChar((char) list.size()); list.forEach(cookie -> { - putShortString(buffer, cookie.getName()); - putShortString(buffer, cookie.getValue()); - putShortString(buffer, cookie.getDomain()); - putShortString(buffer, cookie.getPath()); - putShortString(buffer, cookie.getPortlist()); + putSmallString(buffer, cookie.getName()); + putSmallString(buffer, cookie.getValue()); + putSmallString(buffer, cookie.getDomain()); + putSmallString(buffer, cookie.getPath()); + putSmallString(buffer, cookie.getPortlist()); buffer.putLong(cookie.getMaxAge()); buffer.put(cookie.getSecure() ? (byte) 1 : (byte) 0); buffer.put(cookie.isHttpOnly() ? (byte) 1 : (byte) 0); @@ -136,10 +136,10 @@ public class HttpResultCoder implements MessageCoder { } final List list = new ArrayList<>(len); for (int i = 0; i < len; i++) { - HttpCookie cookie = new HttpCookie(getShortString(buffer), getShortString(buffer)); - cookie.setDomain(getShortString(buffer)); - cookie.setPath(getShortString(buffer)); - cookie.setPortlist(getShortString(buffer)); + HttpCookie cookie = new HttpCookie(getSmallString(buffer), getSmallString(buffer)); + cookie.setDomain(getSmallString(buffer)); + cookie.setPath(getSmallString(buffer)); + cookie.setPortlist(getSmallString(buffer)); cookie.setMaxAge(buffer.getLong()); cookie.setSecure(buffer.get() == 1); cookie.setHttpOnly(buffer.get() == 1); diff --git a/src/main/java/org/redkale/mq/HttpSimpleRequestCoder.java b/src/main/java/org/redkale/mq/HttpSimpleRequestCoder.java index 0ab50854d..c0a95495a 100644 --- a/src/main/java/org/redkale/mq/HttpSimpleRequestCoder.java +++ b/src/main/java/org/redkale/mq/HttpSimpleRequestCoder.java @@ -117,12 +117,12 @@ public class HttpSimpleRequestCoder implements MessageCoder { if (respformat != 0) { req.setRespConvertType(ConvertType.find(respformat)); } - req.setTraceid(MessageCoder.getShortString(buffer)); - req.setRequestURI(MessageCoder.getLongString(buffer)); - req.setPath(MessageCoder.getShortString(buffer)); - req.setRemoteAddr(MessageCoder.getShortString(buffer)); - req.setSessionid(MessageCoder.getShortString(buffer)); - req.setContentType(MessageCoder.getShortString(buffer)); + req.setTraceid(MessageCoder.getSmallString(buffer)); + req.setRequestURI(MessageCoder.getBigString(buffer)); + req.setPath(MessageCoder.getSmallString(buffer)); + req.setRemoteAddr(MessageCoder.getSmallString(buffer)); + req.setSessionid(MessageCoder.getSmallString(buffer)); + req.setContentType(MessageCoder.getSmallString(buffer)); req.setCurrentUserid(MessageCoder.decodeUserid(buffer)); req.setHeaders(MessageCoder.getMap(buffer)); req.setParams(MessageCoder.getMap(buffer)); diff --git a/src/main/java/org/redkale/mq/MessageCoder.java b/src/main/java/org/redkale/mq/MessageCoder.java index c798c59f8..9371bc17c 100644 --- a/src/main/java/org/redkale/mq/MessageCoder.java +++ b/src/main/java/org/redkale/mq/MessageCoder.java @@ -6,6 +6,7 @@ package org.redkale.mq; import java.io.Serializable; +import java.math.BigInteger; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.*; @@ -36,7 +37,7 @@ public interface MessageCoder { //消息内容的类型 public byte ctype(); - //type: 1:string, 2:int, 3:long + //type: 1:string, 2:int, 3:long, 4:BigInteger public static byte[] encodeUserid(Serializable value) { if (value == null) { return MessageRecord.EMPTY_BYTES; @@ -48,6 +49,9 @@ public interface MessageCoder { long val = (Long) value; return new byte[]{(byte) 3, (byte) (val >> 56 & 0xFF), (byte) (val >> 48 & 0xFF), (byte) (val >> 40 & 0xFF), (byte) (val >> 32 & 0xFF), (byte) (val >> 24 & 0xFF), (byte) (val >> 16 & 0xFF), (byte) (val >> 8 & 0xFF), (byte) (val & 0xFF)}; + } else if (value instanceof BigInteger) { + BigInteger val = (BigInteger) value; + return Utility.append(new byte[]{4}, val.toByteArray()); } String str = value.toString(); if (str.isEmpty()) { @@ -71,6 +75,9 @@ public interface MessageCoder { } byte[] bs = new byte[len - 1]; buffer.get(bs); + if (type == 4) { + return new BigInteger(bs); + } return new String(bs, StandardCharsets.UTF_8); } @@ -101,13 +108,13 @@ public interface MessageCoder { final ByteBuffer buffer = ByteBuffer.wrap(bs); buffer.putChar((char) map.size()); map.forEach((key, value) -> { - putShortString(buffer, key); - putLongString(buffer, value); + putSmallString(buffer, key); + putBigString(buffer, value); }); return bs; } - public static void putLongString(ByteBuffer buffer, String value) { + public static void putBigString(ByteBuffer buffer, String value) { if (value == null || value.isEmpty()) { buffer.putInt(0); } else { @@ -117,7 +124,7 @@ public interface MessageCoder { } } - public static String getLongString(ByteBuffer buffer) { + public static String getBigString(ByteBuffer buffer) { int len = buffer.getInt(); if (len == 0) { return null; @@ -127,7 +134,8 @@ public interface MessageCoder { return new String(bs, StandardCharsets.UTF_8); } - public static void putShortString(ByteBuffer buffer, String value) { + //一般用于存放类名、字段名、map中的key + public static void putSmallString(ByteBuffer buffer, String value) { if (value == null || value.isEmpty()) { buffer.putChar((char) 0); } else { @@ -137,7 +145,7 @@ public interface MessageCoder { } } - public static String getShortString(ByteBuffer buffer) { + public static String getSmallString(ByteBuffer buffer) { int len = buffer.getChar(); if (len == 0) { return null; @@ -154,7 +162,7 @@ public interface MessageCoder { } Map map = new HashMap<>(len); for (int i = 0; i < len; i++) { - map.put(getShortString(buffer), getLongString(buffer)); + map.put(getSmallString(buffer), getBigString(buffer)); } return map; } diff --git a/src/main/java/org/redkale/mq/MessageRecordSerializer.java b/src/main/java/org/redkale/mq/MessageRecordSerializer.java index c218174b9..1c37f2cff 100644 --- a/src/main/java/org/redkale/mq/MessageRecordSerializer.java +++ b/src/main/java/org/redkale/mq/MessageRecordSerializer.java @@ -108,10 +108,10 @@ public class MessageRecordSerializer implements MessageCoder { long createTime = buffer.getLong(); Serializable userid = MessageCoder.decodeUserid(buffer); - String groupid = MessageCoder.getShortString(buffer); - String topic = MessageCoder.getShortString(buffer); - String respTopic = MessageCoder.getShortString(buffer); - String traceid = MessageCoder.getShortString(buffer); + String groupid = MessageCoder.getSmallString(buffer); + String topic = MessageCoder.getSmallString(buffer); + String respTopic = MessageCoder.getSmallString(buffer); + String traceid = MessageCoder.getSmallString(buffer); byte[] content = null; int contentlen = buffer.getInt();