From 9c5e23090c6ad08e97f20292794e53c7e5fdd783 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Thu, 1 Oct 2020 22:49:51 +0800 Subject: [PATCH] --- src/org/redkale/mq/HttpMessageClient.java | 115 +++++------------- .../redkale/mq/HttpMessageClusterClient.java | 13 +- src/org/redkale/mq/HttpMessageRequest.java | 32 ----- src/org/redkale/mq/HttpMessageResponse.java | 4 +- .../redkale/mq/HttpSimpleRequestCoder.java | 15 ++- src/org/redkale/mq/MessageClient.java | 4 - src/org/redkale/mq/MessageRecord.java | 76 ++++-------- src/org/redkale/mq/MessageRecordCoder.java | 8 +- src/org/redkale/mq/SncpMessageClient.java | 2 - src/org/redkale/mq/SncpMessageResponse.java | 5 +- src/org/redkale/net/http/HttpRequest.java | 16 +-- .../redkale/net/http/HttpSimpleRequest.java | 78 ++++++++++-- src/org/redkale/net/sncp/SncpClient.java | 3 +- 13 files changed, 162 insertions(+), 209 deletions(-) diff --git a/src/org/redkale/mq/HttpMessageClient.java b/src/org/redkale/mq/HttpMessageClient.java index 334b795d8..e21407ce4 100644 --- a/src/org/redkale/mq/HttpMessageClient.java +++ b/src/org/redkale/mq/HttpMessageClient.java @@ -10,7 +10,6 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; -import org.redkale.convert.ConvertType; import org.redkale.convert.json.JsonConvert; import org.redkale.net.http.*; @@ -56,177 +55,129 @@ public class HttpMessageClient extends MessageClient { } public final void produceMessage(HttpSimpleRequest request) { - produceMessage(generateHttpReqTopic(request, null), ConvertType.JSON, 0, null, request, null); + produceMessage(generateHttpReqTopic(request, null), 0, null, request, null); } public final void produceMessage(HttpSimpleRequest request, AtomicLong counter) { - produceMessage(generateHttpReqTopic(request, null), ConvertType.JSON, 0, null, request, counter); + produceMessage(generateHttpReqTopic(request, null), 0, null, request, counter); } public final void produceMessage(int userid, HttpSimpleRequest request) { - produceMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, null, request, null); + produceMessage(generateHttpReqTopic(request, null), userid, null, request, null); } public final void produceMessage(int userid, String groupid, HttpSimpleRequest request) { - produceMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, groupid, request, null); + produceMessage(generateHttpReqTopic(request, null), userid, groupid, request, null); } public final void produceMessage(int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { - produceMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, groupid, request, counter); + produceMessage(generateHttpReqTopic(request, null), userid, groupid, request, counter); } public final void produceMessage(String topic, HttpSimpleRequest request) { - produceMessage(topic, ConvertType.JSON, 0, null, request, null); + produceMessage(topic, 0, null, request, null); } public final void produceMessage(String topic, HttpSimpleRequest request, AtomicLong counter) { - produceMessage(topic, ConvertType.JSON, 0, null, request, counter); - } - - public final void produceMessage(String topic, ConvertType convertType, HttpSimpleRequest request) { - produceMessage(topic, convertType, 0, null, request, null); - } - - public final void produceMessage(String topic, ConvertType convertType, HttpSimpleRequest request, AtomicLong counter) { - produceMessage(topic, convertType, 0, null, request, counter); + produceMessage(topic, 0, null, request, counter); } public final void produceMessage(String topic, int userid, String groupid, HttpSimpleRequest request) { - produceMessage(topic, ConvertType.JSON, userid, groupid, request, null); - } - - public final void produceMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { - produceMessage(topic, ConvertType.JSON, userid, groupid, request, counter); - } - - public final void produceMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request) { - produceMessage(topic, convertType, userid, groupid, request, null); + produceMessage(topic, userid, groupid, request, null); } public final void broadcastMessage(HttpSimpleRequest request) { - broadcastMessage(generateHttpReqTopic(request, null), ConvertType.JSON, 0, null, request, null); + broadcastMessage(generateHttpReqTopic(request, null), 0, null, request, null); } public final void broadcastMessage(HttpSimpleRequest request, AtomicLong counter) { - broadcastMessage(generateHttpReqTopic(request, null), ConvertType.JSON, 0, null, request, counter); + broadcastMessage(generateHttpReqTopic(request, null), 0, null, request, counter); } public final void broadcastMessage(int userid, HttpSimpleRequest request) { - broadcastMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, null, request, null); + broadcastMessage(generateHttpReqTopic(request, null), userid, null, request, null); } public final void broadcastMessage(int userid, String groupid, HttpSimpleRequest request) { - broadcastMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, groupid, request, null); + broadcastMessage(generateHttpReqTopic(request, null), userid, groupid, request, null); } public final void broadcastMessage(int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { - broadcastMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, groupid, request, counter); + broadcastMessage(generateHttpReqTopic(request, null), userid, groupid, request, counter); } public final void broadcastMessage(String topic, HttpSimpleRequest request) { - broadcastMessage(topic, ConvertType.JSON, 0, null, request, null); + broadcastMessage(topic, 0, null, request, null); } public final void broadcastMessage(String topic, HttpSimpleRequest request, AtomicLong counter) { - broadcastMessage(topic, ConvertType.JSON, 0, null, request, counter); - } - - public final void broadcastMessage(String topic, ConvertType convertType, HttpSimpleRequest request) { - broadcastMessage(topic, convertType, 0, null, request, null); - } - - public final void broadcastMessage(String topic, ConvertType convertType, HttpSimpleRequest request, AtomicLong counter) { - broadcastMessage(topic, convertType, 0, null, request, counter); + broadcastMessage(topic, 0, null, request, counter); } public final void broadcastMessage(String topic, int userid, String groupid, HttpSimpleRequest request) { - broadcastMessage(topic, ConvertType.JSON, userid, groupid, request, null); - } - - public final void broadcastMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { - broadcastMessage(topic, ConvertType.JSON, userid, groupid, request, counter); - } - - public final void broadcastMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request) { - broadcastMessage(topic, convertType, userid, groupid, request, null); + broadcastMessage(topic, userid, groupid, request, null); } public final CompletableFuture sendMessage(HttpSimpleRequest request, Type type) { - return sendMessage(generateHttpReqTopic(request, null), ConvertType.JSON, 0, null, request, null).thenApply((HttpResult httbs) -> { + return sendMessage(generateHttpReqTopic(request, null), 0, null, request, null).thenApply((HttpResult httbs) -> { if (httbs == null || httbs.getResult() == null) return null; return JsonConvert.root().convertFrom(type, httbs.getResult()); }); } public final CompletableFuture sendMessage(int userid, HttpSimpleRequest request, Type type) { - return sendMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, null, request, null).thenApply((HttpResult httbs) -> { + return sendMessage(generateHttpReqTopic(request, null), userid, null, request, null).thenApply((HttpResult httbs) -> { if (httbs == null || httbs.getResult() == null) return null; return JsonConvert.root().convertFrom(type, httbs.getResult()); }); } public final CompletableFuture> sendMessage(HttpSimpleRequest request) { - return sendMessage(generateHttpReqTopic(request, null), ConvertType.JSON, 0, null, request, null); + return sendMessage(generateHttpReqTopic(request, null), 0, null, request, null); } public final CompletableFuture> sendMessage(HttpSimpleRequest request, AtomicLong counter) { - return sendMessage(generateHttpReqTopic(request, null), ConvertType.JSON, 0, null, request, counter); + return sendMessage(generateHttpReqTopic(request, null), 0, null, request, counter); } public final CompletableFuture> sendMessage(int userid, HttpSimpleRequest request) { - return sendMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, null, request, null); + return sendMessage(generateHttpReqTopic(request, null), userid, null, request, null); } public final CompletableFuture> sendMessage(int userid, String groupid, HttpSimpleRequest request) { - return sendMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, groupid, request, null); + return sendMessage(generateHttpReqTopic(request, null), userid, groupid, request, null); } public final CompletableFuture> sendMessage(int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { - return sendMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, groupid, request, counter); + return sendMessage(generateHttpReqTopic(request, null), userid, groupid, request, counter); } public final CompletableFuture> sendMessage(String topic, HttpSimpleRequest request) { - return sendMessage(topic, ConvertType.JSON, 0, null, request, null); + return sendMessage(topic, 0, null, request, null); } public final CompletableFuture> sendMessage(String topic, HttpSimpleRequest request, AtomicLong counter) { - return sendMessage(topic, ConvertType.JSON, 0, null, request, counter); - } - - public final CompletableFuture> sendMessage(String topic, ConvertType convertType, HttpSimpleRequest request) { - return sendMessage(topic, convertType, 0, null, request, null); - } - - public final CompletableFuture> sendMessage(String topic, ConvertType convertType, HttpSimpleRequest request, AtomicLong counter) { - return sendMessage(topic, convertType, 0, null, request, counter); + return sendMessage(topic, 0, null, request, counter); } public final CompletableFuture> sendMessage(String topic, int userid, String groupid, HttpSimpleRequest request) { - return sendMessage(topic, ConvertType.JSON, userid, groupid, request, null); + return sendMessage(topic, userid, null, request, (AtomicLong) null); } - public final CompletableFuture> sendMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { - return sendMessage(topic, ConvertType.JSON, userid, groupid, request, counter); - } - - public final CompletableFuture> sendMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request) { - return sendMessage(topic, convertType, userid, groupid, request, null); - } - - public CompletableFuture> sendMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { - MessageRecord message = new MessageRecord(convertType, topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); + public CompletableFuture> sendMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { + MessageRecord message = new MessageRecord(topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); message.userid(userid).groupid(groupid); return sendMessage(message, true, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance())); } - public void broadcastMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { - MessageRecord message = new MessageRecord(convertType, topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); + public void broadcastMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { + MessageRecord message = new MessageRecord(topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); message.userid(userid).groupid(groupid); sendMessage(message, false, counter); } - public void produceMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { - MessageRecord message = new MessageRecord(convertType, topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); + public void produceMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { + MessageRecord message = new MessageRecord(topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); message.userid(userid).groupid(groupid); sendMessage(message, false, counter); } @@ -234,5 +185,5 @@ public class HttpMessageClient extends MessageClient { @Override protected MessageProducers getProducer() { return messageAgent.getHttpProducer(); - } + } } diff --git a/src/org/redkale/mq/HttpMessageClusterClient.java b/src/org/redkale/mq/HttpMessageClusterClient.java index 589ff50a5..01f01628c 100644 --- a/src/org/redkale/mq/HttpMessageClusterClient.java +++ b/src/org/redkale/mq/HttpMessageClusterClient.java @@ -13,7 +13,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.*; import java.util.logging.Level; import org.redkale.cluster.ClusterAgent; -import org.redkale.convert.ConvertType; import org.redkale.net.http.*; /** @@ -45,17 +44,17 @@ public class HttpMessageClusterClient extends HttpMessageClient { } @Override - public CompletableFuture> sendMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { + public CompletableFuture> sendMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { return httpAsync(userid, request); } @Override - public void produceMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { + public void produceMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { httpAsync(userid, request); } @Override - public void broadcastMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { + public void broadcastMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { mqtpAsync(userid, request); } @@ -70,6 +69,9 @@ public class HttpMessageClusterClient extends HttpMessageClient { if (addrmap == null || addrmap.isEmpty()) return new HttpResult().status(404).toAnyFuture(); java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder().timeout(Duration.ofMillis(30000)); if (req.isRpc()) builder.header(Rest.REST_HEADER_RPC_NAME, "true"); + if (req.isFrombody()) builder.header(Rest.REST_HEADER_PARAM_FROM_BODY, "true"); + if (req.getReqConvertType() != null) builder.header(Rest.REST_HEADER_REQ_CONVERT_TYPE, req.getReqConvertType().toString()); + if (req.getRespConvertType() != null) builder.header(Rest.REST_HEADER_RESP_CONVERT_TYPE, req.getRespConvertType().toString()); if (userid != 0) builder.header(Rest.REST_HEADER_CURRUSERID_NAME, "" + userid); if (headers != null) headers.forEach((n, v) -> { if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase())) builder.header(n, v); @@ -115,6 +117,9 @@ public class HttpMessageClusterClient extends HttpMessageClient { if (addrs == null || addrs.isEmpty()) return new HttpResult().status(404).toAnyFuture(); java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder().timeout(Duration.ofMillis(30000)); if (req.isRpc()) builder.header(Rest.REST_HEADER_RPC_NAME, "true"); + if (req.isFrombody()) builder.header(Rest.REST_HEADER_PARAM_FROM_BODY, "true"); + if (req.getReqConvertType() != null) builder.header(Rest.REST_HEADER_REQ_CONVERT_TYPE, req.getReqConvertType().toString()); + if (req.getRespConvertType() != null) builder.header(Rest.REST_HEADER_RESP_CONVERT_TYPE, req.getRespConvertType().toString()); if (userid != 0) builder.header(Rest.REST_HEADER_CURRUSERID_NAME, "" + userid); if (headers != null) headers.forEach((n, v) -> { if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase())) builder.header(n, v); diff --git a/src/org/redkale/mq/HttpMessageRequest.java b/src/org/redkale/mq/HttpMessageRequest.java index 30e6deaa3..fbaea9a56 100644 --- a/src/org/redkale/mq/HttpMessageRequest.java +++ b/src/org/redkale/mq/HttpMessageRequest.java @@ -5,7 +5,6 @@ */ package org.redkale.mq; -import org.redkale.convert.*; import org.redkale.net.http.*; /** @@ -21,45 +20,14 @@ public class HttpMessageRequest extends HttpRequest { protected MessageRecord message; - protected Convert diyConvert; - public HttpMessageRequest(HttpContext context, MessageRecord message) { super(context, message.decodeContent(HttpSimpleRequestCoder.getInstance())); this.message = message; this.currentUserid = message.getUserid(); - if (message.getFormat() != ConvertType.JSON) { - this.diyConvert = ConvertFactory.findConvert(message.getFormat()); - } } public void setRequestURI(String uri) { this.requestURI = uri; } - @Override - public T getBodyJson(java.lang.reflect.Type type) { - if (diyConvert != null) return (T) diyConvert.convertFrom(type, getBody()); - return super.getBodyJson(type); - } - - @Override - public String getParameter(String name) { - if (diyConvert != null) return (String) diyConvert.convertFrom(String.class, getBody()); - return super.getParameter(name); - } - - @Override - public String getParameter(String name, String defaultValue) { - if (diyConvert != null) { - String val = (String) diyConvert.convertFrom(String.class, getBody()); - return val == null ? defaultValue : val; - } - return super.getParameter(name, defaultValue); - } - - @Override - public T getJsonParameter(java.lang.reflect.Type type, String name) { - if (diyConvert != null) return (T) diyConvert.convertFrom(type, getBody()); - return super.getJsonParameter(type, name); - } } diff --git a/src/org/redkale/mq/HttpMessageResponse.java b/src/org/redkale/mq/HttpMessageResponse.java index 999403c99..30ad9ed92 100644 --- a/src/org/redkale/mq/HttpMessageResponse.java +++ b/src/org/redkale/mq/HttpMessageResponse.java @@ -62,15 +62,13 @@ public class HttpMessageResponse extends HttpResponse { //必须要塞入retcode, 开发者可以无需反序列化ret便可确定操作是否返回成功 if (!ret.isSuccess()) result.header("retcode", String.valueOf(ret.getRetcode())); } - if (msg.format == ConvertType.PROTOBUF && result.convert() == null) result.convert(ConvertFactory.findConvert(msg.format)); - ConvertType format = result.convert() == null ? null : result.convert().getFactory().getConvertType(); if (finest) { Object innerrs = result.getResult(); if (innerrs instanceof byte[]) innerrs = new String((byte[]) innerrs, StandardCharsets.UTF_8); producer.logger.log(Level.FINEST, "HttpMessageProcessor.process seqid=" + msg.getSeqid() + ", content: " + innerrs + ", status: " + result.getStatus() + ", headers: " + result.getHeaders()); } byte[] content = HttpResultCoder.getInstance().encode(result); - producer.apply(new MessageRecord(msg.getSeqid(), format, resptopic, null, content)); + producer.apply(new MessageRecord(msg.getSeqid(), resptopic, null, content)); } @Override diff --git a/src/org/redkale/mq/HttpSimpleRequestCoder.java b/src/org/redkale/mq/HttpSimpleRequestCoder.java index 3c54c008b..ec636865a 100644 --- a/src/org/redkale/mq/HttpSimpleRequestCoder.java +++ b/src/org/redkale/mq/HttpSimpleRequestCoder.java @@ -7,6 +7,7 @@ package org.redkale.mq; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import org.redkale.convert.ConvertType; import org.redkale.net.http.HttpSimpleRequest; /** @@ -37,11 +38,18 @@ public class HttpSimpleRequestCoder implements MessageCoder { byte[] headers = MessageCoder.getBytes(data.getHeaders()); byte[] params = MessageCoder.getBytes(data.getParams()); byte[] body = MessageCoder.getBytes(data.getBody()); - int count = 1 + 4 + requestURI.length + 2 + path.length + 2 + remoteAddr.length + 2 + sessionid.length + int count = 1 //rpc + + 1 //frombody + + 4 //reqConvertType + + 4 //respConvertType + + 4 + requestURI.length + 2 + path.length + 2 + remoteAddr.length + 2 + sessionid.length + 2 + contentType.length + 4 + headers.length + params.length + 4 + body.length; byte[] bs = new byte[count]; ByteBuffer buffer = ByteBuffer.wrap(bs); buffer.put((byte) (data.isRpc() ? 'T' : 'F')); + buffer.put((byte) (data.isFrombody() ? 'T' : 'F')); + buffer.putInt(data.getReqConvertType() == null ? 0 : data.getReqConvertType().getValue()); + buffer.putInt(data.getRespConvertType() == null ? 0 : data.getRespConvertType().getValue()); buffer.putInt(requestURI.length); if (requestURI.length > 0) buffer.put(requestURI); buffer.putChar((char) path.length); @@ -66,6 +74,11 @@ public class HttpSimpleRequestCoder implements MessageCoder { ByteBuffer buffer = ByteBuffer.wrap(data); HttpSimpleRequest req = new HttpSimpleRequest(); req.setRpc(buffer.get() == 'T'); + req.setFrombody(buffer.get() == 'T'); + int reqformat = buffer.getInt(); + int respformat = buffer.getInt(); + if (reqformat != 0) req.setReqConvertType(ConvertType.find(reqformat)); + if (respformat != 0) req.setRespConvertType(ConvertType.find(respformat)); req.setRequestURI(MessageCoder.getLongString(buffer)); req.setPath(MessageCoder.getShortString(buffer)); req.setRemoteAddr(MessageCoder.getShortString(buffer)); diff --git a/src/org/redkale/mq/MessageClient.java b/src/org/redkale/mq/MessageClient.java index 7b592daaf..34c1253d9 100644 --- a/src/org/redkale/mq/MessageClient.java +++ b/src/org/redkale/mq/MessageClient.java @@ -8,7 +8,6 @@ package org.redkale.mq; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; -import org.redkale.convert.ConvertType; /** * @@ -31,8 +30,6 @@ public abstract class MessageClient { protected String respConsumerid; - protected ConvertType convertType; - protected MessageClient(MessageAgent messageAgent) { this.messageAgent = messageAgent; } @@ -65,7 +62,6 @@ public abstract class MessageClient { } } } - if (convertType != null) message.setFormat(convertType); if (needresp && (message.getResptopic() == null || message.getResptopic().isEmpty())) { message.setResptopic(respTopic); } diff --git a/src/org/redkale/mq/MessageRecord.java b/src/org/redkale/mq/MessageRecord.java index e2e366f9b..ff09226a3 100644 --- a/src/org/redkale/mq/MessageRecord.java +++ b/src/org/redkale/mq/MessageRecord.java @@ -8,7 +8,6 @@ package org.redkale.mq; import java.io.Serializable; import java.nio.charset.StandardCharsets; import org.redkale.convert.*; -import org.redkale.convert.json.JsonConvert; import org.redkale.util.Comment; /** @@ -36,34 +35,30 @@ public class MessageRecord implements Serializable { protected int version; @ConvertColumn(index = 3) - @Comment("内容的格式, 只能是JSON、BSON、PROTOBUF、DIY和null, 普通文本也归于JSON") - protected ConvertType format; - - @ConvertColumn(index = 4) @Comment("标记位, 自定义时使用") protected int flag; - @ConvertColumn(index = 5) + @ConvertColumn(index = 4) @Comment("创建时间") protected long createtime; - @ConvertColumn(index = 6) + @ConvertColumn(index = 5) @Comment("用户ID,无用户信息视为0") protected int userid; - @ConvertColumn(index = 7) + @ConvertColumn(index = 6) @Comment("组ID") protected String groupid; - @ConvertColumn(index = 8) + @ConvertColumn(index = 7) @Comment("当前topic") protected String topic; - @ConvertColumn(index = 9) + @ConvertColumn(index = 8) @Comment("目标topic, 为空表示无目标topic") protected String resptopic; - @ConvertColumn(index = 10) + @ConvertColumn(index = 9) @Comment("消息内容") protected byte[] content; @@ -71,53 +66,48 @@ public class MessageRecord implements Serializable { } public MessageRecord(String resptopic, String content) { - this(System.nanoTime(), content == null ? null : ConvertType.JSON, 0, 0, null, null, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + this(System.nanoTime(), 1, 0, System.currentTimeMillis(), 0, null, null, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); } public MessageRecord(String topic, String resptopic, String content) { - this(System.nanoTime(), content == null ? null : ConvertType.JSON, 0, 0, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + this(System.nanoTime(), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); } public MessageRecord(int userid, String topic, String resptopic, String content) { - this(System.nanoTime(), content == null ? null : ConvertType.JSON, 0, userid, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); - } - - public MessageRecord(ConvertType format, String topic, String resptopic, byte[] content) { - this(System.nanoTime(), format, 0, 0, null, topic, resptopic, content); - } - - public MessageRecord(long seqid, ConvertType format, String topic, String resptopic, byte[] content) { - this(seqid, format, 0, null, topic, resptopic, content); - } - - public MessageRecord(long seqid, ConvertType format, int userid, String groupid, String topic, String resptopic, byte[] content) { - this(seqid, format, 0, userid, groupid, topic, resptopic, content); + this(System.nanoTime(), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); } public MessageRecord(String topic, String resptopic, Convert convert, Object bean) { - this(0, null, topic, resptopic, convert, bean); + this(System.nanoTime(), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, convert.convertToBytes(bean)); } public MessageRecord(int userid, String topic, String resptopic, Convert convert, Object bean) { - this(userid, null, topic, resptopic, convert, bean); + this(System.nanoTime(), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, convert.convertToBytes(bean)); } public MessageRecord(int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) { - this(0, userid, groupid, topic, resptopic, convert, bean); + this(System.nanoTime(), 1, 0, System.currentTimeMillis(), userid, groupid, topic, resptopic, convert.convertToBytes(bean)); } public MessageRecord(int flag, int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) { - this(System.nanoTime(), convert.getFactory().getConvertType(), flag, userid, groupid, topic, resptopic, convert.convertToBytes(bean)); + this(System.nanoTime(), 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, convert.convertToBytes(bean)); } - public MessageRecord(long seqid, ConvertType format, int flag, int userid, String groupid, String topic, String resptopic, byte[] content) { - this(seqid, 1, format, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, content); + public MessageRecord(String topic, String resptopic, byte[] content) { + this(System.nanoTime(), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, content); } - public MessageRecord(long seqid, int version, ConvertType format, int flag, long createtime, int userid, String groupid, String topic, String resptopic, byte[] content) { + public MessageRecord(long seqid, String topic, String resptopic, byte[] content) { + this(seqid, 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, content); + } + + public MessageRecord(long seqid, int flag, int userid, String groupid, String topic, String resptopic, byte[] content) { + this(seqid, 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, content); + } + + public MessageRecord(long seqid, int version, int flag, long createtime, int userid, String groupid, String topic, String resptopic, byte[] content) { this.seqid = seqid; this.version = version; - this.format = format; this.flag = flag; this.createtime = createtime; this.userid = userid; @@ -171,11 +161,6 @@ public class MessageRecord implements Serializable { return this; } - public MessageRecord format(ConvertType format) { - this.format = format; - return this; - } - public MessageRecord flag(int flag) { this.flag = flag; return this; @@ -232,14 +217,6 @@ public class MessageRecord implements Serializable { this.version = version; } - public ConvertType getFormat() { - return format; - } - - public void setFormat(ConvertType format) { - this.format = format; - } - public int getFlag() { return flag; } @@ -302,19 +279,18 @@ public class MessageRecord implements Serializable { StringBuilder sb = new StringBuilder(128); sb.append("{\"seqid\":").append(this.seqid); sb.append(",\"version\":").append(this.version); - if (this.format != null) sb.append(",\"format\":\"").append(this.format).append("\""); if (this.flag != 0) sb.append(",\"flag\":").append(this.flag); if (this.createtime != 0) sb.append(",\"createtime\":").append(this.createtime); if (this.userid != 0) sb.append(",\"userid\":").append(this.userid); if (this.groupid != null) sb.append(",\"groupid\":\"").append(this.groupid).append("\""); if (this.topic != null) sb.append(",\"topic\":\"").append(this.topic).append("\""); if (this.resptopic != null) sb.append(",\"resptopic\":\"").append(this.resptopic).append("\""); - if (this.content != null) sb.append(",\"content\":").append(this.format == ConvertType.JSON ? ("\"" + new String(this.content, StandardCharsets.UTF_8) + "\"") : JsonConvert.root().convertTo(this.content)); + if (this.content != null) sb.append(",\"content\":").append(new String(this.content, StandardCharsets.UTF_8)).append("\""); sb.append("}"); return sb.toString(); } // public static void main(String[] args) throws Throwable { -// System.out.println(new MessageRecord(333, ConvertType.JSON, 2, 3, null, "tt", null, "xxx".getBytes())); +// System.out.println(new MessageRecord(333, 2, 3, null, "tt", null, "xxx".getBytes())); // } } diff --git a/src/org/redkale/mq/MessageRecordCoder.java b/src/org/redkale/mq/MessageRecordCoder.java index 45054eff4..a2a1b93ae 100644 --- a/src/org/redkale/mq/MessageRecordCoder.java +++ b/src/org/redkale/mq/MessageRecordCoder.java @@ -6,7 +6,6 @@ package org.redkale.mq; import java.nio.ByteBuffer; -import org.redkale.convert.ConvertType; /** * MessageRecord的MessageCoder实现 @@ -32,12 +31,11 @@ public class MessageRecordCoder implements MessageCoder { byte[] stopics = MessageCoder.getBytes(data.getTopic()); byte[] dtopics = MessageCoder.getBytes(data.getResptopic()); byte[] groupid = MessageCoder.getBytes(data.getGroupid()); - int count = 8 + 4 + 4 + 4 + 8 + 4 + 2 + stopics.length + 2 + dtopics.length + 2 + groupid.length + 4 + (data.getContent() == null ? 0 : data.getContent().length); + int count = 8 + 4 + 4 + 8 + 4 + 2 + stopics.length + 2 + dtopics.length + 2 + groupid.length + 4 + (data.getContent() == null ? 0 : data.getContent().length); final byte[] bs = new byte[count]; ByteBuffer buffer = ByteBuffer.wrap(bs); buffer.putLong(data.getSeqid()); buffer.putInt(data.getVersion()); - buffer.putInt(data.getFormat() == null ? 0 : data.getFormat().getValue()); buffer.putInt(data.getFlag()); buffer.putLong(data.getCreatetime()); buffer.putInt(data.getUserid()); @@ -62,7 +60,6 @@ public class MessageRecordCoder implements MessageCoder { ByteBuffer buffer = ByteBuffer.wrap(data); long seqid = buffer.getLong(); int version = buffer.getInt(); - ConvertType format = ConvertType.find(buffer.getInt()); int flag = buffer.getInt(); long createtime = buffer.getLong(); int userid = buffer.getInt(); @@ -77,8 +74,7 @@ public class MessageRecordCoder implements MessageCoder { content = new byte[contentlen]; buffer.get(content); } - return new MessageRecord(seqid, version, format, flag, - createtime, userid, groupid, topic, resptopic, content); + return new MessageRecord(seqid, version, flag, createtime, userid, groupid, topic, resptopic, content); } } diff --git a/src/org/redkale/mq/SncpMessageClient.java b/src/org/redkale/mq/SncpMessageClient.java index 6b97ea703..43de5e0e5 100644 --- a/src/org/redkale/mq/SncpMessageClient.java +++ b/src/org/redkale/mq/SncpMessageClient.java @@ -7,7 +7,6 @@ package org.redkale.mq; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; -import org.redkale.convert.ConvertType; /** * @@ -23,7 +22,6 @@ public class SncpMessageClient extends MessageClient { protected SncpMessageClient(MessageAgent messageAgent) { super(messageAgent); this.respTopic = messageAgent.generateSncpRespTopic(); - this.convertType = ConvertType.BSON; } @Override diff --git a/src/org/redkale/mq/SncpMessageResponse.java b/src/org/redkale/mq/SncpMessageResponse.java index 39671f944..067535faf 100644 --- a/src/org/redkale/mq/SncpMessageResponse.java +++ b/src/org/redkale/mq/SncpMessageResponse.java @@ -6,7 +6,6 @@ package org.redkale.mq; import java.nio.ByteBuffer; -import org.redkale.convert.ConvertType; import org.redkale.convert.bson.BsonWriter; import org.redkale.net.Response; import org.redkale.net.sncp.*; @@ -50,12 +49,12 @@ public class SncpMessageResponse extends SncpResponse { if (out == null) { final byte[] result = new byte[SncpRequest.HEADER_SIZE]; fillHeader(ByteBuffer.wrap(result), 0, retcode); - producer.apply(new MessageRecord(message.getSeqid(), ConvertType.BSON, message.getResptopic(), null, (byte[]) null)); + producer.apply(new MessageRecord(message.getSeqid(), message.getResptopic(), null, (byte[]) null)); return; } final int respBodyLength = out.count(); //body总长度 final byte[] result = out.toArray(); fillHeader(ByteBuffer.wrap(result), respBodyLength - HEADER_SIZE, retcode); - producer.apply(new MessageRecord(message.getSeqid(), ConvertType.BSON, message.getResptopic(), null, result)); + producer.apply(new MessageRecord(message.getSeqid(), message.getResptopic(), null, result)); } } diff --git a/src/org/redkale/net/http/HttpRequest.java b/src/org/redkale/net/http/HttpRequest.java index 8321f3f78..df8680652 100644 --- a/src/org/redkale/net/http/HttpRequest.java +++ b/src/org/redkale/net/http/HttpRequest.java @@ -110,18 +110,10 @@ public class HttpRequest extends Request { if (req != null) { this.rpc = req.rpc; if (req.getBody() != null) this.array.write(req.getBody()); - if (req.getHeaders() != null) { - this.headers.putAll(req.getHeaders()); - if (this.headers.containsKey(Rest.REST_HEADER_PARAM_FROM_BODY)) { - this.frombody = "true".equals(this.headers.get(Rest.REST_HEADER_RESP_CONVERT_TYPE)); - } - if (this.headers.containsKey(Rest.REST_HEADER_RESP_CONVERT_TYPE)) { - this.respConvert = ConvertFactory.findConvert(ConvertType.valueOf(this.headers.get(Rest.REST_HEADER_RESP_CONVERT_TYPE))); - } - if (this.headers.containsKey(Rest.REST_HEADER_REQ_CONVERT_TYPE)) { - this.reqConvert = ConvertFactory.findConvert(ConvertType.valueOf(this.headers.get(Rest.REST_HEADER_REQ_CONVERT_TYPE))); - } - } + if (req.getHeaders() != null) this.headers.putAll(req.getHeaders()); + this.frombody = req.isFrombody(); + this.reqConvert = req.getReqConvertType() == null ? null : ConvertFactory.findConvert(req.getReqConvertType()); + this.respConvert = req.getRespConvertType() == null ? null : ConvertFactory.findConvert(req.getRespConvertType()); if (req.getParams() != null) this.params.putAll(req.getParams()); if (req.getCurrentUserid() != 0) this.currentUserid = req.getCurrentUserid(); this.contentType = req.getContentType(); diff --git a/src/org/redkale/net/http/HttpSimpleRequest.java b/src/org/redkale/net/http/HttpSimpleRequest.java index 0551cdba6..d7e2b4752 100644 --- a/src/org/redkale/net/http/HttpSimpleRequest.java +++ b/src/org/redkale/net/http/HttpSimpleRequest.java @@ -30,37 +30,49 @@ public class HttpSimpleRequest implements java.io.Serializable { protected boolean rpc = true; @ConvertColumn(index = 2) + @Comment("是否从body中获取参数,比如protobuf数据格式") + protected boolean frombody; + + @ConvertColumn(index = 3) + @Comment("请求参数的ConvertType") + protected ConvertType reqConvertType; + + @ConvertColumn(index = 4) + @Comment("输出结果的ConvertType") + protected ConvertType respConvertType; + + @ConvertColumn(index = 5) @Comment("请求的URI") protected String requestURI; - @ConvertColumn(index = 3) + @ConvertColumn(index = 6) @Comment("请求的前缀") protected String path; - @ConvertColumn(index = 4) + @ConvertColumn(index = 7) @Comment("客户端IP") protected String remoteAddr; - @ConvertColumn(index = 5) + @ConvertColumn(index = 8) @Comment("会话ID") protected String sessionid; - @ConvertColumn(index = 6) + @ConvertColumn(index = 9) @Comment("Content-Type") protected String contentType; - @ConvertColumn(index = 7) + @ConvertColumn(index = 10) protected int currentUserid; - @ConvertColumn(index = 8) + @ConvertColumn(index = 11) @Comment("http header信息") protected Map headers; - @ConvertColumn(index = 9) + @ConvertColumn(index = 12) @Comment("参数信息") protected Map params; - @ConvertColumn(index = 10) + @ConvertColumn(index = 13) @Comment("http body信息") protected byte[] body; //对应HttpRequest.array @@ -105,6 +117,32 @@ public class HttpSimpleRequest implements java.io.Serializable { return this; } + public HttpSimpleRequest requestURI(boolean frombody) { + this.frombody = frombody; + return this; + } + + public HttpSimpleRequest frombody(boolean frombody) { + this.frombody = frombody; + return this; + } + + public HttpSimpleRequest bothConvertType(ConvertType convertType) { + this.reqConvertType = convertType; + this.respConvertType = convertType; + return this; + } + + public HttpSimpleRequest reqConvertType(ConvertType reqConvertType) { + this.reqConvertType = reqConvertType; + return this; + } + + public HttpSimpleRequest respConvertType(ConvertType respConvertType) { + this.respConvertType = respConvertType; + return this; + } + public HttpSimpleRequest remoteAddr(String remoteAddr) { this.remoteAddr = remoteAddr; return this; @@ -309,6 +347,30 @@ public class HttpSimpleRequest implements java.io.Serializable { this.body = body; } + public boolean isFrombody() { + return frombody; + } + + public void setFrombody(boolean frombody) { + this.frombody = frombody; + } + + public ConvertType getReqConvertType() { + return reqConvertType; + } + + public void setReqConvertType(ConvertType reqConvertType) { + this.reqConvertType = reqConvertType; + } + + public ConvertType getRespConvertType() { + return respConvertType; + } + + public void setRespConvertType(ConvertType respConvertType) { + this.respConvertType = respConvertType; + } + @Override public String toString() { return JsonConvert.root().convertTo(this); diff --git a/src/org/redkale/net/sncp/SncpClient.java b/src/org/redkale/net/sncp/SncpClient.java index 4aab90060..46157b13d 100644 --- a/src/org/redkale/net/sncp/SncpClient.java +++ b/src/org/redkale/net/sncp/SncpClient.java @@ -15,7 +15,6 @@ import java.util.concurrent.*; import java.util.function.Supplier; import java.util.logging.*; import javax.annotation.Resource; -import org.redkale.convert.ConvertType; import org.redkale.convert.bson.*; import org.redkale.convert.json.*; import org.redkale.mq.*; @@ -288,7 +287,7 @@ public final class SncpClient { fillHeader(ByteBuffer.wrap(reqbytes), seqid, actionid, reqBodyLength); String targetTopic = action.topicTargetParamIndex >= 0 ? (String) params[action.topicTargetParamIndex] : this.topic; if (targetTopic == null) targetTopic = this.topic; - MessageRecord message = new MessageRecord(ConvertType.BSON, targetTopic, null, reqbytes); + MessageRecord message = new MessageRecord(targetTopic, null, reqbytes); return messageClient.sendMessage(message).thenApply(msg -> { ByteBuffer buffer = ByteBuffer.wrap(msg.getContent()); checkResult(seqid, action, buffer);