This commit is contained in:
@@ -12,11 +12,11 @@ import org.redkale.net.http.*;
|
||||
*
|
||||
* @author zhangjx
|
||||
*/
|
||||
public class MessageHttpRequest extends HttpRequest {
|
||||
public class HttpMessageRequest extends HttpRequest {
|
||||
|
||||
protected String remoteAddr;
|
||||
|
||||
public MessageHttpRequest(HttpContext context) {
|
||||
public HttpMessageRequest(HttpContext context) {
|
||||
super(context, null);
|
||||
}
|
||||
|
||||
@@ -16,18 +16,18 @@ import org.redkale.util.ObjectPool;
|
||||
*
|
||||
* @author zhangjx
|
||||
*/
|
||||
public class MessageHttpResponse extends HttpResponse {
|
||||
public class HttpMessageResponse extends HttpResponse {
|
||||
|
||||
protected MessageRecord message;
|
||||
|
||||
protected BiConsumer<MessageRecord, byte[]> resultConsumer;
|
||||
|
||||
public MessageHttpResponse(HttpContext context, MessageHttpRequest request,
|
||||
public HttpMessageResponse(HttpContext context, HttpMessageRequest request,
|
||||
ObjectPool<Response> responsePool, HttpResponseConfig config) {
|
||||
super(context, request, responsePool, config);
|
||||
}
|
||||
|
||||
public MessageHttpResponse resultConsumer(MessageRecord message, BiConsumer<MessageRecord, byte[]> resultConsumer) {
|
||||
public HttpMessageResponse resultConsumer(MessageRecord message, BiConsumer<MessageRecord, byte[]> resultConsumer) {
|
||||
this.message = message;
|
||||
this.resultConsumer = resultConsumer;
|
||||
return this;
|
||||
@@ -5,12 +5,14 @@
|
||||
*/
|
||||
package org.redkale.mq;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.*;
|
||||
import java.util.logging.Logger;
|
||||
import org.redkale.boot.MessageAgentRoot;
|
||||
import org.redkale.service.Service;
|
||||
import org.redkale.util.AnyValue;
|
||||
|
||||
/**
|
||||
* MQ管理
|
||||
* MQ管理器
|
||||
*
|
||||
*
|
||||
* 详情见: https://redkale.org
|
||||
@@ -19,9 +21,16 @@ import org.redkale.util.AnyValue;
|
||||
*/
|
||||
public abstract class MessageAgent {
|
||||
|
||||
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
|
||||
|
||||
//MQ管理器名称
|
||||
protected String name;
|
||||
|
||||
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
|
||||
//application根MQ管理器
|
||||
protected MessageAgentRoot root;
|
||||
|
||||
//本地Service消息接收处理器, key:topic
|
||||
protected Map<String, Service> localConsumers;
|
||||
|
||||
public void init(AnyValue config) {
|
||||
|
||||
@@ -35,6 +44,14 @@ public abstract class MessageAgent {
|
||||
return name;
|
||||
}
|
||||
|
||||
public MessageAgentRoot getRoot() {
|
||||
return root;
|
||||
}
|
||||
|
||||
public void setRoot(MessageAgentRoot root) {
|
||||
this.root = root;
|
||||
}
|
||||
|
||||
protected String checkName(String name) { //不能含特殊字符
|
||||
if (name.isEmpty()) throw new RuntimeException("name only 0-9 a-z A-Z _ cannot begin 0-9");
|
||||
if (name.charAt(0) >= '0' && name.charAt(0) <= '9') throw new RuntimeException("name only 0-9 a-z A-Z _ cannot begin 0-9");
|
||||
|
||||
@@ -21,17 +21,13 @@ import org.redkale.util.Comment;
|
||||
*/
|
||||
public class MessageRecord implements Serializable {
|
||||
|
||||
public static final byte FORMAT_TEXT = 1;
|
||||
|
||||
public static final byte FORMAT_BINARY = 2;
|
||||
|
||||
@ConvertColumn(index = 1)
|
||||
@Comment("消息序列号")
|
||||
protected long seqid;
|
||||
|
||||
@ConvertColumn(index = 2)
|
||||
@Comment("内容的格式")
|
||||
protected byte format;
|
||||
@Comment("内容的格式, 只能是JSON、BSON、PROTOBUF、DIY和null")
|
||||
protected ConvertType format;
|
||||
|
||||
@ConvertColumn(index = 3)
|
||||
@Comment("标记位, 自定义时使用")
|
||||
@@ -61,26 +57,26 @@ public class MessageRecord implements Serializable {
|
||||
}
|
||||
|
||||
public MessageRecord(String resptopic, String content) {
|
||||
this(System.nanoTime(), content == null ? 0 : FORMAT_TEXT, 0, 0, null, null, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
|
||||
this(System.nanoTime(), content == null ? null : ConvertType.JSON, 0, 0, null, null, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
public MessageRecord(String topic, String resptopic, String content) {
|
||||
this(System.nanoTime(), content == null ? 0 : FORMAT_TEXT, 0, 0, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
|
||||
this(System.nanoTime(), content == null ? null : ConvertType.JSON, 0, 0, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
public MessageRecord(int userid, String topic, String resptopic, String content) {
|
||||
this(System.nanoTime(), content == null ? 0 : FORMAT_TEXT, 0, userid, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
|
||||
this(System.nanoTime(), content == null ? null : ConvertType.JSON, 0, userid, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
public MessageRecord(byte format, String topic, String resptopic, byte[] content) {
|
||||
public MessageRecord(ConvertType format, String topic, String resptopic, byte[] content) {
|
||||
this(System.nanoTime(), format, 0, 0, null, topic, resptopic, content);
|
||||
}
|
||||
|
||||
public MessageRecord(long seqid, byte format, String topic, String resptopic, byte[] content) {
|
||||
public MessageRecord(long seqid, ConvertType format, String topic, String resptopic, byte[] content) {
|
||||
this(seqid, format, 0, null, topic, resptopic, content);
|
||||
}
|
||||
|
||||
public MessageRecord(long seqid, byte format, int userid, String groupid, String topic, String resptopic, byte[] content) {
|
||||
public MessageRecord(long seqid, ConvertType format, int userid, String groupid, String topic, String resptopic, byte[] content) {
|
||||
this(seqid, format, 0, userid, groupid, topic, resptopic, content);
|
||||
}
|
||||
|
||||
@@ -97,10 +93,10 @@ public class MessageRecord implements Serializable {
|
||||
}
|
||||
|
||||
public MessageRecord(int flag, int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) {
|
||||
this(System.nanoTime(), convert instanceof TextConvert ? FORMAT_TEXT : FORMAT_BINARY, flag, userid, groupid, topic, resptopic, convert.convertToBytes(bean));
|
||||
this(System.nanoTime(), convert.getFactory().getConvertType(), flag, userid, groupid, topic, resptopic, convert.convertToBytes(bean));
|
||||
}
|
||||
|
||||
public MessageRecord(long seqid, byte format, int flag, int userid, String groupid, String topic, String resptopic, byte[] content) {
|
||||
public MessageRecord(long seqid, ConvertType format, int flag, int userid, String groupid, String topic, String resptopic, byte[] content) {
|
||||
this.seqid = seqid;
|
||||
this.format = format;
|
||||
this.flag = flag;
|
||||
@@ -125,7 +121,7 @@ public class MessageRecord implements Serializable {
|
||||
return this.resptopic == null || this.resptopic.isEmpty();
|
||||
}
|
||||
|
||||
public MessageRecord format(byte format) {
|
||||
public MessageRecord format(ConvertType format) {
|
||||
this.format = format;
|
||||
return this;
|
||||
}
|
||||
@@ -173,11 +169,11 @@ public class MessageRecord implements Serializable {
|
||||
this.seqid = seqid;
|
||||
}
|
||||
|
||||
public byte getFormat() {
|
||||
public ConvertType getFormat() {
|
||||
return format;
|
||||
}
|
||||
|
||||
public void setFormat(byte format) {
|
||||
public void setFormat(ConvertType format) {
|
||||
this.format = format;
|
||||
}
|
||||
|
||||
@@ -234,18 +230,18 @@ public class MessageRecord implements Serializable {
|
||||
//return JsonConvert.root().convertTo(this);
|
||||
StringBuilder sb = new StringBuilder(128);
|
||||
sb.append("{\"seqid\":").append(this.seqid);
|
||||
if (this.format != 0) sb.append(",\"format\":").append(this.format);
|
||||
if (this.format != null) sb.append(",\"format\":\"").append(this.format).append("\"");
|
||||
if (this.flag != 0) sb.append(",\"flag\":").append(this.flag);
|
||||
if (this.userid != 0) sb.append(",\"userid\":").append(this.userid);
|
||||
if (this.groupid != null) sb.append(",\"groupid\":\"").append(this.groupid).append("\"");
|
||||
if (this.topic != null) sb.append(",\"topic\":\"").append(this.topic).append("\"");
|
||||
if (this.resptopic != null) sb.append(",\"resptopic\":\"").append(this.resptopic).append("\"");
|
||||
if (this.content != null) sb.append(",\"content\":").append(this.format == FORMAT_TEXT ? ("\"" + new String(this.content, StandardCharsets.UTF_8) + "\"") : JsonConvert.root().convertTo(this.content));
|
||||
if (this.content != null) sb.append(",\"content\":").append(this.format == ConvertType.JSON ? ("\"" + new String(this.content, StandardCharsets.UTF_8) + "\"") : JsonConvert.root().convertTo(this.content));
|
||||
sb.append("}");
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Throwable {
|
||||
System.out.println(new MessageRecord(333, FORMAT_TEXT, 2, 3, null, "tt", null, "xxx".getBytes()));
|
||||
System.out.println(new MessageRecord(333, ConvertType.JSON, 2, 3, null, "tt", null, "xxx".getBytes()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,9 +15,9 @@ import org.redkale.net.sncp.*;
|
||||
*
|
||||
* @author zhangjx
|
||||
*/
|
||||
public class MessageSncpRequest extends SncpRequest {
|
||||
public class SncpMessageRequest extends SncpRequest {
|
||||
|
||||
public MessageSncpRequest(SncpContext context) {
|
||||
public SncpMessageRequest(SncpContext context) {
|
||||
super(context, null);
|
||||
}
|
||||
|
||||
@@ -20,17 +20,17 @@ import org.redkale.util.ObjectPool;
|
||||
*
|
||||
* @author zhangjx
|
||||
*/
|
||||
public class MessageSncpResponse extends SncpResponse {
|
||||
public class SncpMessageResponse extends SncpResponse {
|
||||
|
||||
protected MessageRecord message;
|
||||
|
||||
protected BiConsumer<MessageRecord, byte[]> resultConsumer;
|
||||
|
||||
public MessageSncpResponse(SncpContext context, MessageSncpRequest request, ObjectPool<Response> responsePool) {
|
||||
public SncpMessageResponse(SncpContext context, SncpMessageRequest request, ObjectPool<Response> responsePool) {
|
||||
super(context, request, responsePool);
|
||||
}
|
||||
|
||||
public MessageSncpResponse resultConsumer(MessageRecord message, BiConsumer<MessageRecord, byte[]> resultConsumer) {
|
||||
public SncpMessageResponse resultConsumer(MessageRecord message, BiConsumer<MessageRecord, byte[]> resultConsumer) {
|
||||
this.message = message;
|
||||
this.resultConsumer = resultConsumer;
|
||||
return this;
|
||||
@@ -37,26 +37,26 @@ public class HttpRequest extends Request<HttpContext> {
|
||||
public static final String SESSIONID_NAME = "JSESSIONID";
|
||||
|
||||
@Comment("Method GET/POST/...")
|
||||
private String method;
|
||||
protected String method;
|
||||
|
||||
private String protocol;
|
||||
protected String protocol;
|
||||
|
||||
protected String requestURI;
|
||||
|
||||
private byte[] queryBytes;
|
||||
protected byte[] queryBytes;
|
||||
|
||||
private long contentLength = -1;
|
||||
protected long contentLength = -1;
|
||||
|
||||
private String contentType;
|
||||
protected String contentType;
|
||||
|
||||
private String host;
|
||||
protected String host;
|
||||
|
||||
private String connection;
|
||||
protected String connection;
|
||||
|
||||
@Comment("原始的cookie字符串,解析后值赋给HttpCookie[] cookies")
|
||||
protected String cookie;
|
||||
|
||||
private HttpCookie[] cookies;
|
||||
protected HttpCookie[] cookies;
|
||||
|
||||
protected String newsessionid;
|
||||
|
||||
@@ -64,10 +64,6 @@ public class HttpRequest extends Request<HttpContext> {
|
||||
|
||||
protected final DefaultAnyValue params = new DefaultAnyValue();
|
||||
|
||||
private final ByteArray array = new ByteArray();
|
||||
|
||||
private boolean bodyparsed = false;
|
||||
|
||||
protected boolean boundary = false;
|
||||
|
||||
protected int moduleid;
|
||||
@@ -78,6 +74,10 @@ public class HttpRequest extends Request<HttpContext> {
|
||||
|
||||
protected Object currentUser;
|
||||
|
||||
private final ByteArray array = new ByteArray();
|
||||
|
||||
private boolean bodyparsed = false;
|
||||
|
||||
private final String remoteAddrHeader;
|
||||
|
||||
Object attachment; //仅供HttpServlet传递Entry使用
|
||||
|
||||
Reference in New Issue
Block a user