This commit is contained in:
Redkale
2020-05-30 15:57:56 +08:00
parent ed1b642d5b
commit cb2d355bc9
5 changed files with 213 additions and 0 deletions

View File

@@ -31,6 +31,10 @@ public class HttpMessageResponse extends HttpResponse {
super(context, request, responsePool, config);
}
public HttpMessageResponse(HttpContext context, HttpSimpleRequest req, HttpResponseConfig config) {
super(context, new HttpMessageRequest(context, req), null, config);
}
public HttpMessageResponse resultConsumer(MessageRecord message, BiConsumer<MessageRecord, byte[]> resultConsumer) {
this.message = message;
this.resultConsumer = resultConsumer;

View File

@@ -0,0 +1,46 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.redkale.net.http.HttpSimpleRequest;
/**
* HttpSimpleRequest的MessageCoder实现
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public class HttpSimpleRequestCoder implements MessageCoder<HttpSimpleRequest> {
private static final HttpSimpleRequestCoder instance = new HttpSimpleRequestCoder();
public static HttpSimpleRequestCoder getInstance() {
return instance;
}
@Override
public byte[] encode(HttpSimpleRequest data) {
byte[] requestURI = data.getRequestURI() == null ? new byte[0] : data.getRequestURI().getBytes(StandardCharsets.UTF_8);
return null;
}
@Override
public HttpSimpleRequest decode(byte[] data) {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
}
protected static String getString(ByteBuffer buffer) {
int len = buffer.getInt();
if (len == 0) return null;
byte[] bs = new byte[len];
buffer.get(bs);
return new String(bs, StandardCharsets.UTF_8);
}
}

View File

@@ -0,0 +1,69 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
/**
* 将MessageRecord.content内容加解密
*
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
* @param <T> 泛型
*/
public interface MessageCoder<T> {
//编码
public byte[] encode(T data);
//解码
public T decode(byte[] data);
public static byte[] getBytes(String value) {
if (value == null || value.isEmpty()) return MessageRecord.EMPTY_BYTES;
return value.getBytes(StandardCharsets.UTF_8);
}
public static void putLongString(ByteBuffer buffer, String value) {
if (value == null || value.isEmpty()) {
buffer.putInt(0);
} else {
byte[] bs = value.getBytes(StandardCharsets.UTF_8);
buffer.putInt(bs.length);
buffer.put(bs);
}
}
public static String getLongString(ByteBuffer buffer) {
int len = buffer.getInt();
if (len == 0) return null;
byte[] bs = new byte[len];
buffer.get(bs);
return new String(bs, StandardCharsets.UTF_8);
}
public static void putShortString(ByteBuffer buffer, String value) {
if (value == null || value.isEmpty()) {
buffer.putChar((char) 0);
} else {
byte[] bs = value.getBytes(StandardCharsets.UTF_8);
buffer.putChar((char) bs.length);
buffer.put(bs);
}
}
public static String getShortString(ByteBuffer buffer) {
int len = buffer.getChar();
if (len == 0) return null;
byte[] bs = new byte[len];
buffer.get(bs);
return new String(bs, StandardCharsets.UTF_8);
}
}

View File

@@ -21,6 +21,8 @@ import org.redkale.util.Comment;
*/
public class MessageRecord implements Serializable {
static final byte[] EMPTY_BYTES = new byte[0];
@ConvertColumn(index = 1)
@Comment("消息序列号")
protected long seqid;
@@ -140,6 +142,16 @@ public class MessageRecord implements Serializable {
return (T) convert.convertFrom(type, this.content);
}
public <T> T decodeContent(MessageCoder<T> coder, java.lang.reflect.Type type) {
if (this.content == null || this.content.length == 0) return null;
return (T) coder.decode(this.content);
}
public <T> MessageRecord encodeContent(MessageCoder<T> coder, T data) {
this.content = coder.encode(data);
return this;
}
public MessageRecord version(int version) {
this.version = version;
return this;

View File

@@ -0,0 +1,82 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
import java.nio.ByteBuffer;
import org.redkale.convert.ConvertType;
/**
* MessageRecord的MessageCoder实现
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public class MessageRecordCoder implements MessageCoder<MessageRecord> {
private static final MessageRecordCoder instance = new MessageRecordCoder();
public static MessageRecordCoder getInstance() {
return instance;
}
@Override
public byte[] encode(MessageRecord data) {
if (data == null) return null;
byte[] stopics = MessageCoder.getBytes(data.getTopic());
byte[] dtopics = MessageCoder.getBytes(data.getResptopic());
byte[] groupid = MessageCoder.getBytes(data.getGroupid());
int count = 8 + 4 + 4 + 4 + 8 + 4 + 2 + stopics.length + 2 + dtopics.length + 2 + groupid.length + 4 + (data.getContent() == null ? 0 : data.getContent().length);
final byte[] bs = new byte[count];
ByteBuffer buffer = ByteBuffer.wrap(bs);
buffer.putLong(data.getSeqid());
buffer.putInt(data.getVersion());
buffer.putInt(data.getFormat() == null ? 0 : data.getFormat().getValue());
buffer.putInt(data.getFlag());
buffer.putLong(data.getCreatetime());
buffer.putInt(data.getUserid());
buffer.putChar((char) groupid.length);
if (groupid.length > 0) buffer.put(groupid);
buffer.putChar((char) stopics.length);
if (stopics.length > 0) buffer.put(stopics);
buffer.putChar((char) dtopics.length);
if (dtopics.length > 0) buffer.put(dtopics);
if (data.getContent() == null) {
buffer.putInt(0);
} else {
buffer.putInt(data.getContent().length);
buffer.put(data.getContent());
}
return bs;
}
@Override
public MessageRecord decode(byte[] data) {
if (data == null) return null;
ByteBuffer buffer = ByteBuffer.wrap(data);
long seqid = buffer.getLong();
int version = buffer.getInt();
ConvertType format = ConvertType.find(buffer.getInt());
int flag = buffer.getInt();
long createtime = buffer.getLong();
int userid = buffer.getInt();
String groupid = MessageCoder.getShortString(buffer);
String topic = MessageCoder.getShortString(buffer);
String resptopic = MessageCoder.getShortString(buffer);
byte[] content = null;
int contentlen = buffer.getInt();
if (contentlen > 0) {
content = new byte[contentlen];
buffer.get(content);
}
return new MessageRecord(seqid, version, format, flag,
createtime, userid, groupid, topic, resptopic, content);
}
}