From cf5224a5f63a2fe7167d14e130b850510224bfce Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Thu, 28 May 2020 09:38:18 +0800 Subject: [PATCH] --- ...tpRequest.java => HttpMessageRequest.java} | 4 +-- ...Response.java => HttpMessageResponse.java} | 6 ++-- src/org/redkale/mq/MessageAgent.java | 23 ++++++++++-- src/org/redkale/mq/MessageRecord.java | 36 +++++++++---------- ...cpRequest.java => SncpMessageRequest.java} | 4 +-- ...Response.java => SncpMessageResponse.java} | 6 ++-- src/org/redkale/net/http/HttpRequest.java | 24 ++++++------- 7 files changed, 58 insertions(+), 45 deletions(-) rename src/org/redkale/mq/{MessageHttpRequest.java => HttpMessageRequest.java} (80%) rename src/org/redkale/mq/{MessageHttpResponse.java => HttpMessageResponse.java} (84%) rename src/org/redkale/mq/{MessageSncpRequest.java => SncpMessageRequest.java} (78%) rename src/org/redkale/mq/{MessageSncpResponse.java => SncpMessageResponse.java} (86%) diff --git a/src/org/redkale/mq/MessageHttpRequest.java b/src/org/redkale/mq/HttpMessageRequest.java similarity index 80% rename from src/org/redkale/mq/MessageHttpRequest.java rename to src/org/redkale/mq/HttpMessageRequest.java index 2fb799d5a..cbb1d81a3 100644 --- a/src/org/redkale/mq/MessageHttpRequest.java +++ b/src/org/redkale/mq/HttpMessageRequest.java @@ -12,11 +12,11 @@ import org.redkale.net.http.*; * * @author zhangjx */ -public class MessageHttpRequest extends HttpRequest { +public class HttpMessageRequest extends HttpRequest { protected String remoteAddr; - public MessageHttpRequest(HttpContext context) { + public HttpMessageRequest(HttpContext context) { super(context, null); } diff --git a/src/org/redkale/mq/MessageHttpResponse.java b/src/org/redkale/mq/HttpMessageResponse.java similarity index 84% rename from src/org/redkale/mq/MessageHttpResponse.java rename to src/org/redkale/mq/HttpMessageResponse.java index 6eb20ecab..a1eb2e181 100644 --- a/src/org/redkale/mq/MessageHttpResponse.java +++ b/src/org/redkale/mq/HttpMessageResponse.java @@ -16,18 +16,18 @@ import org.redkale.util.ObjectPool; * * @author zhangjx */ -public class MessageHttpResponse extends HttpResponse { +public class HttpMessageResponse extends HttpResponse { protected MessageRecord message; protected BiConsumer resultConsumer; - public MessageHttpResponse(HttpContext context, MessageHttpRequest request, + public HttpMessageResponse(HttpContext context, HttpMessageRequest request, ObjectPool responsePool, HttpResponseConfig config) { super(context, request, responsePool, config); } - public MessageHttpResponse resultConsumer(MessageRecord message, BiConsumer resultConsumer) { + public HttpMessageResponse resultConsumer(MessageRecord message, BiConsumer resultConsumer) { this.message = message; this.resultConsumer = resultConsumer; return this; diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index 5792289be..6904ffd83 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -5,12 +5,14 @@ */ package org.redkale.mq; -import java.util.List; +import java.util.*; import java.util.logging.Logger; +import org.redkale.boot.MessageAgentRoot; +import org.redkale.service.Service; import org.redkale.util.AnyValue; /** - * MQ管理 + * MQ管理器 * * * 详情见: https://redkale.org @@ -19,9 +21,16 @@ import org.redkale.util.AnyValue; */ public abstract class MessageAgent { + protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); + + //MQ管理器名称 protected String name; - protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); + //application根MQ管理器 + protected MessageAgentRoot root; + + //本地Service消息接收处理器, key:topic + protected Map localConsumers; public void init(AnyValue config) { @@ -35,6 +44,14 @@ public abstract class MessageAgent { return name; } + public MessageAgentRoot getRoot() { + return root; + } + + public void setRoot(MessageAgentRoot root) { + this.root = root; + } + protected String checkName(String name) { //不能含特殊字符 if (name.isEmpty()) throw new RuntimeException("name only 0-9 a-z A-Z _ cannot begin 0-9"); if (name.charAt(0) >= '0' && name.charAt(0) <= '9') throw new RuntimeException("name only 0-9 a-z A-Z _ cannot begin 0-9"); diff --git a/src/org/redkale/mq/MessageRecord.java b/src/org/redkale/mq/MessageRecord.java index d8cb714c8..0fae9319a 100644 --- a/src/org/redkale/mq/MessageRecord.java +++ b/src/org/redkale/mq/MessageRecord.java @@ -21,17 +21,13 @@ import org.redkale.util.Comment; */ public class MessageRecord implements Serializable { - public static final byte FORMAT_TEXT = 1; - - public static final byte FORMAT_BINARY = 2; - @ConvertColumn(index = 1) @Comment("消息序列号") protected long seqid; @ConvertColumn(index = 2) - @Comment("内容的格式") - protected byte format; + @Comment("内容的格式, 只能是JSON、BSON、PROTOBUF、DIY和null") + protected ConvertType format; @ConvertColumn(index = 3) @Comment("标记位, 自定义时使用") @@ -61,26 +57,26 @@ public class MessageRecord implements Serializable { } public MessageRecord(String resptopic, String content) { - this(System.nanoTime(), content == null ? 0 : FORMAT_TEXT, 0, 0, null, null, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + this(System.nanoTime(), content == null ? null : ConvertType.JSON, 0, 0, null, null, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); } public MessageRecord(String topic, String resptopic, String content) { - this(System.nanoTime(), content == null ? 0 : FORMAT_TEXT, 0, 0, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + this(System.nanoTime(), content == null ? null : ConvertType.JSON, 0, 0, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); } public MessageRecord(int userid, String topic, String resptopic, String content) { - this(System.nanoTime(), content == null ? 0 : FORMAT_TEXT, 0, userid, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + this(System.nanoTime(), content == null ? null : ConvertType.JSON, 0, userid, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); } - public MessageRecord(byte format, String topic, String resptopic, byte[] content) { + public MessageRecord(ConvertType format, String topic, String resptopic, byte[] content) { this(System.nanoTime(), format, 0, 0, null, topic, resptopic, content); } - public MessageRecord(long seqid, byte format, String topic, String resptopic, byte[] content) { + public MessageRecord(long seqid, ConvertType format, String topic, String resptopic, byte[] content) { this(seqid, format, 0, null, topic, resptopic, content); } - public MessageRecord(long seqid, byte format, int userid, String groupid, String topic, String resptopic, byte[] content) { + public MessageRecord(long seqid, ConvertType format, int userid, String groupid, String topic, String resptopic, byte[] content) { this(seqid, format, 0, userid, groupid, topic, resptopic, content); } @@ -97,10 +93,10 @@ public class MessageRecord implements Serializable { } public MessageRecord(int flag, int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) { - this(System.nanoTime(), convert instanceof TextConvert ? FORMAT_TEXT : FORMAT_BINARY, flag, userid, groupid, topic, resptopic, convert.convertToBytes(bean)); + this(System.nanoTime(), convert.getFactory().getConvertType(), flag, userid, groupid, topic, resptopic, convert.convertToBytes(bean)); } - public MessageRecord(long seqid, byte format, int flag, int userid, String groupid, String topic, String resptopic, byte[] content) { + public MessageRecord(long seqid, ConvertType format, int flag, int userid, String groupid, String topic, String resptopic, byte[] content) { this.seqid = seqid; this.format = format; this.flag = flag; @@ -125,7 +121,7 @@ public class MessageRecord implements Serializable { return this.resptopic == null || this.resptopic.isEmpty(); } - public MessageRecord format(byte format) { + public MessageRecord format(ConvertType format) { this.format = format; return this; } @@ -173,11 +169,11 @@ public class MessageRecord implements Serializable { this.seqid = seqid; } - public byte getFormat() { + public ConvertType getFormat() { return format; } - public void setFormat(byte format) { + public void setFormat(ConvertType format) { this.format = format; } @@ -234,18 +230,18 @@ public class MessageRecord implements Serializable { //return JsonConvert.root().convertTo(this); StringBuilder sb = new StringBuilder(128); sb.append("{\"seqid\":").append(this.seqid); - if (this.format != 0) sb.append(",\"format\":").append(this.format); + if (this.format != null) sb.append(",\"format\":\"").append(this.format).append("\""); if (this.flag != 0) sb.append(",\"flag\":").append(this.flag); if (this.userid != 0) sb.append(",\"userid\":").append(this.userid); if (this.groupid != null) sb.append(",\"groupid\":\"").append(this.groupid).append("\""); if (this.topic != null) sb.append(",\"topic\":\"").append(this.topic).append("\""); if (this.resptopic != null) sb.append(",\"resptopic\":\"").append(this.resptopic).append("\""); - if (this.content != null) sb.append(",\"content\":").append(this.format == FORMAT_TEXT ? ("\"" + new String(this.content, StandardCharsets.UTF_8) + "\"") : JsonConvert.root().convertTo(this.content)); + if (this.content != null) sb.append(",\"content\":").append(this.format == ConvertType.JSON ? ("\"" + new String(this.content, StandardCharsets.UTF_8) + "\"") : JsonConvert.root().convertTo(this.content)); sb.append("}"); return sb.toString(); } public static void main(String[] args) throws Throwable { - System.out.println(new MessageRecord(333, FORMAT_TEXT, 2, 3, null, "tt", null, "xxx".getBytes())); + System.out.println(new MessageRecord(333, ConvertType.JSON, 2, 3, null, "tt", null, "xxx".getBytes())); } } diff --git a/src/org/redkale/mq/MessageSncpRequest.java b/src/org/redkale/mq/SncpMessageRequest.java similarity index 78% rename from src/org/redkale/mq/MessageSncpRequest.java rename to src/org/redkale/mq/SncpMessageRequest.java index ec3e74bda..6ced5225b 100644 --- a/src/org/redkale/mq/MessageSncpRequest.java +++ b/src/org/redkale/mq/SncpMessageRequest.java @@ -15,9 +15,9 @@ import org.redkale.net.sncp.*; * * @author zhangjx */ -public class MessageSncpRequest extends SncpRequest { +public class SncpMessageRequest extends SncpRequest { - public MessageSncpRequest(SncpContext context) { + public SncpMessageRequest(SncpContext context) { super(context, null); } diff --git a/src/org/redkale/mq/MessageSncpResponse.java b/src/org/redkale/mq/SncpMessageResponse.java similarity index 86% rename from src/org/redkale/mq/MessageSncpResponse.java rename to src/org/redkale/mq/SncpMessageResponse.java index 3b5c500f4..163226a93 100644 --- a/src/org/redkale/mq/MessageSncpResponse.java +++ b/src/org/redkale/mq/SncpMessageResponse.java @@ -20,17 +20,17 @@ import org.redkale.util.ObjectPool; * * @author zhangjx */ -public class MessageSncpResponse extends SncpResponse { +public class SncpMessageResponse extends SncpResponse { protected MessageRecord message; protected BiConsumer resultConsumer; - public MessageSncpResponse(SncpContext context, MessageSncpRequest request, ObjectPool responsePool) { + public SncpMessageResponse(SncpContext context, SncpMessageRequest request, ObjectPool responsePool) { super(context, request, responsePool); } - public MessageSncpResponse resultConsumer(MessageRecord message, BiConsumer resultConsumer) { + public SncpMessageResponse resultConsumer(MessageRecord message, BiConsumer resultConsumer) { this.message = message; this.resultConsumer = resultConsumer; return this; diff --git a/src/org/redkale/net/http/HttpRequest.java b/src/org/redkale/net/http/HttpRequest.java index 36fe79053..691659d02 100644 --- a/src/org/redkale/net/http/HttpRequest.java +++ b/src/org/redkale/net/http/HttpRequest.java @@ -37,26 +37,26 @@ public class HttpRequest extends Request { public static final String SESSIONID_NAME = "JSESSIONID"; @Comment("Method GET/POST/...") - private String method; + protected String method; - private String protocol; + protected String protocol; protected String requestURI; - private byte[] queryBytes; + protected byte[] queryBytes; - private long contentLength = -1; + protected long contentLength = -1; - private String contentType; + protected String contentType; - private String host; + protected String host; - private String connection; + protected String connection; @Comment("原始的cookie字符串,解析后值赋给HttpCookie[] cookies") protected String cookie; - private HttpCookie[] cookies; + protected HttpCookie[] cookies; protected String newsessionid; @@ -64,10 +64,6 @@ public class HttpRequest extends Request { protected final DefaultAnyValue params = new DefaultAnyValue(); - private final ByteArray array = new ByteArray(); - - private boolean bodyparsed = false; - protected boolean boundary = false; protected int moduleid; @@ -78,6 +74,10 @@ public class HttpRequest extends Request { protected Object currentUser; + private final ByteArray array = new ByteArray(); + + private boolean bodyparsed = false; + private final String remoteAddrHeader; Object attachment; //仅供HttpServlet传递Entry使用