This commit is contained in:
Redkale
2021-01-12 17:53:13 +08:00
parent af35e5100e
commit 71d179b8c3
5 changed files with 67 additions and 21 deletions

View File

@@ -12,6 +12,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.*;
import org.redkale.convert.json.JsonConvert;
import org.redkale.net.http.*;
import static org.redkale.mq.MessageRecord.CTYPE_HTTP_REQUEST;
/**
* 不依赖MessageRecord则可兼容RPC方式
@@ -172,20 +173,20 @@ public class HttpMessageClient extends MessageClient {
}
public CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
MessageRecord message = createMessageRecord(topic, null, HttpSimpleRequestCoder.getInstance().encode(request));
MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, HttpSimpleRequestCoder.getInstance().encode(request));
message.userid(userid).groupid(groupid);
//if (finest) logger.log(Level.FINEST, "HttpMessageClient.sendMessage: " + message);
return sendMessage(message, true, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance()));
}
public void broadcastMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
MessageRecord message = createMessageRecord(topic, null, HttpSimpleRequestCoder.getInstance().encode(request));
MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, HttpSimpleRequestCoder.getInstance().encode(request));
message.userid(userid).groupid(groupid);
sendMessage(message, false, counter);
}
public void produceMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
MessageRecord message = createMessageRecord(topic, null, HttpSimpleRequestCoder.getInstance().encode(request));
MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, HttpSimpleRequestCoder.getInstance().encode(request));
message.userid(userid).groupid(groupid);
sendMessage(message, false, counter);
}

View File

@@ -13,6 +13,7 @@ import org.redkale.net.Response;
import org.redkale.net.http.*;
import org.redkale.service.RetResult;
import org.redkale.util.ObjectPool;
import static org.redkale.mq.MessageRecord.CTYPE_HTTP_RESULT;
/**
*
@@ -73,7 +74,7 @@ public class HttpMessageResponse extends HttpResponse {
producer.logger.log(Level.FINEST, "HttpMessageResponse.finishHttpResult seqid=" + msg.getSeqid() + ", content: " + innerrs + ", status: " + result.getStatus() + ", headers: " + result.getHeaders());
}
byte[] content = HttpResultCoder.getInstance().encode(result);
producer.apply(messageClient.createMessageRecord(msg.getSeqid(), resptopic, null, content));
producer.apply(messageClient.createMessageRecord(msg.getSeqid(), CTYPE_HTTP_RESULT, resptopic, null, content));
}
@Override

View File

@@ -10,6 +10,9 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import org.redkale.convert.Convert;
import org.redkale.convert.json.JsonConvert;
import static org.redkale.mq.MessageRecord.*;
import org.redkale.net.http.*;
/**
*
@@ -111,38 +114,58 @@ public abstract class MessageClient {
protected abstract MessageProducers getProducer();
public MessageRecord createMessageRecord(String resptopic, String content) {
return new MessageRecord(System.nanoTime(), 1, 0, System.currentTimeMillis(), 0, null, null, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
return new MessageRecord(System.nanoTime(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, null, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
}
public MessageRecord createMessageRecord(String topic, String resptopic, String content) {
return new MessageRecord(System.nanoTime(), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
return new MessageRecord(System.nanoTime(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
}
public MessageRecord createMessageRecord(int userid, String topic, String resptopic, String content) {
return new MessageRecord(System.nanoTime(), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
return new MessageRecord(System.nanoTime(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
}
public MessageRecord createMessageRecord(String topic, String resptopic, Convert convert, Object bean) {
return new MessageRecord(System.nanoTime(), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, convert.convertToBytes(bean));
return new MessageRecord(System.nanoTime(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, convert.convertToBytes(bean));
}
public MessageRecord createMessageRecord(int userid, String topic, String resptopic, Convert convert, Object bean) {
return new MessageRecord(System.nanoTime(), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, convert.convertToBytes(bean));
return new MessageRecord(System.nanoTime(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, convert.convertToBytes(bean));
}
public MessageRecord createMessageRecord(int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) {
return new MessageRecord(System.nanoTime(), 1, 0, System.currentTimeMillis(), userid, groupid, topic, resptopic, convert.convertToBytes(bean));
return new MessageRecord(System.nanoTime(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, groupid, topic, resptopic, convert.convertToBytes(bean));
}
public MessageRecord createMessageRecord(int flag, int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) {
return new MessageRecord(System.nanoTime(), 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, convert.convertToBytes(bean));
return new MessageRecord(System.nanoTime(), ctype(convert, bean), 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, convert.convertToBytes(bean));
}
public MessageRecord createMessageRecord(String topic, String resptopic, byte[] content) {
return new MessageRecord(System.nanoTime(), topic, resptopic, content);
return new MessageRecord(System.nanoTime(), (byte) 0, topic, resptopic, content);
}
public MessageRecord createMessageRecord(long seqid, String topic, String resptopic, byte[] content) {
return new MessageRecord(seqid, topic, resptopic, content);
return new MessageRecord(seqid, (byte) 0, topic, resptopic, content);
}
protected MessageRecord createMessageRecord(byte ctype, String topic, String resptopic, byte[] content) {
return new MessageRecord(System.nanoTime(), ctype, topic, resptopic, content);
}
protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String resptopic, byte[] content) {
return new MessageRecord(seqid, ctype, topic, resptopic, content);
}
private byte ctype(Convert convert, Object bean) {
byte ctype = 0;
if (convert instanceof JsonConvert) {
if (bean instanceof HttpSimpleRequest) {
ctype = CTYPE_HTTP_REQUEST;
} else if (bean instanceof HttpResult) {
ctype = CTYPE_HTTP_RESULT;
}
}
return ctype;
}
}

View File

@@ -26,6 +26,12 @@ public class MessageRecord implements Serializable {
static final byte[] EMPTY_BYTES = new byte[0];
protected static final byte CTYPE_STRING = 1;
protected static final byte CTYPE_HTTP_REQUEST = 2;
protected static final byte CTYPE_HTTP_RESULT = 3;
@ConvertColumn(index = 1)
@Comment("消息序列号")
protected long seqid;
@@ -62,19 +68,24 @@ public class MessageRecord implements Serializable {
@Comment("消息内容")
protected byte[] content;
@ConvertColumn(index = 10)
@Comment("消息内容的类型")
protected byte ctype;
public MessageRecord() {
}
protected MessageRecord(long seqid, String topic, String resptopic, byte[] content) {
this(seqid, 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, content);
protected MessageRecord(long seqid, byte ctype, String topic, String resptopic, byte[] content) {
this(seqid, ctype, 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, content);
}
protected MessageRecord(long seqid, int flag, int userid, String groupid, String topic, String resptopic, byte[] content) {
this(seqid, 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, content);
protected MessageRecord(long seqid, byte ctype, int flag, int userid, String groupid, String topic, String resptopic, byte[] content) {
this(seqid, ctype, 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, content);
}
protected MessageRecord(long seqid, int version, int flag, long createtime, int userid, String groupid, String topic, String resptopic, byte[] content) {
protected MessageRecord(long seqid, byte ctype, int version, int flag, long createtime, int userid, String groupid, String topic, String resptopic, byte[] content) {
this.seqid = seqid;
this.ctype = ctype;
this.version = version;
this.flag = flag;
this.createtime = createtime;
@@ -253,7 +264,15 @@ public class MessageRecord implements Serializable {
if (this.groupid != null) sb.append(",\"groupid\":\"").append(this.groupid).append("\"");
if (this.topic != null) sb.append(",\"topic\":\"").append(this.topic).append("\"");
if (this.resptopic != null) sb.append(",\"resptopic\":\"").append(this.resptopic).append("\"");
if (this.content != null) sb.append(",\"content\":").append(new String(this.content, StandardCharsets.UTF_8)).append("\"");
if (this.content != null) {
if (this.ctype == CTYPE_HTTP_REQUEST) {
sb.append(",\"content\":").append(HttpSimpleRequestCoder.getInstance().decode(this.content)).append("\"");
} else if (this.ctype == CTYPE_HTTP_RESULT) {
sb.append(",\"content\":").append(HttpResultCoder.getInstance().decode(this.content)).append("\"");
} else {
sb.append(",\"content\":").append(new String(this.content, StandardCharsets.UTF_8)).append("\"");
}
}
sb.append("}");
return sb.toString();
}

View File

@@ -31,10 +31,11 @@ public class MessageRecordCoder implements MessageCoder<MessageRecord> {
byte[] stopics = MessageCoder.getBytes(data.getTopic());
byte[] dtopics = MessageCoder.getBytes(data.getResptopic());
byte[] groupid = MessageCoder.getBytes(data.getGroupid());
int count = 8 + 4 + 4 + 8 + 4 + 2 + stopics.length + 2 + dtopics.length + 2 + groupid.length + 4 + (data.getContent() == null ? 0 : data.getContent().length);
int count = 8 + 1 + 4 + 4 + 8 + 4 + 2 + stopics.length + 2 + dtopics.length + 2 + groupid.length + 4 + (data.getContent() == null ? 0 : data.getContent().length);
final byte[] bs = new byte[count];
ByteBuffer buffer = ByteBuffer.wrap(bs);
buffer.putLong(data.getSeqid());
buffer.put(data.ctype);
buffer.putInt(data.getVersion());
buffer.putInt(data.getFlag());
buffer.putLong(data.getCreatetime());
@@ -59,6 +60,7 @@ public class MessageRecordCoder implements MessageCoder<MessageRecord> {
if (data == null) return null;
ByteBuffer buffer = ByteBuffer.wrap(data);
long seqid = buffer.getLong();
byte ctype = buffer.get();
int version = buffer.getInt();
int flag = buffer.getInt();
long createtime = buffer.getLong();
@@ -74,7 +76,7 @@ public class MessageRecordCoder implements MessageCoder<MessageRecord> {
content = new byte[contentlen];
buffer.get(content);
}
return new MessageRecord(seqid, version, flag, createtime, userid, groupid, topic, resptopic, content);
return new MessageRecord(seqid, ctype, version, flag, createtime, userid, groupid, topic, resptopic, content);
}
}