优化日志

This commit is contained in:
redkale
2023-10-16 16:42:13 +08:00
parent 6d63798ccf
commit c4b91ebbc6
3 changed files with 19 additions and 70 deletions

View File

@@ -5,7 +5,6 @@
*/
package org.redkale.mq;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
@@ -20,7 +19,6 @@ import org.redkale.convert.Convert;
import org.redkale.convert.json.JsonConvert;
import static org.redkale.mq.MessageRecord.CTYPE_HTTP_REQUEST;
import static org.redkale.mq.MessageRecord.CTYPE_HTTP_RESULT;
import static org.redkale.mq.MessageRecord.CTYPE_STRING;
import org.redkale.net.http.HttpResult;
import org.redkale.net.http.HttpSimpleRequest;
import org.redkale.util.RedkaleException;
@@ -123,77 +121,19 @@ public class MessageClient implements ClusterRpcClient<MessageRecord, MessageRec
return messageProcessors.size() < 1;
}
public MessageRecord createMessageRecord(String respTopic, String content) {
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0,
null, null, respTopic, Traces.currentTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8));
}
public MessageRecord createMessageRecord(String topic, String respTopic, String content) {
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0,
null, topic, respTopic, Traces.currentTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8));
}
public MessageRecord createMessageRecord(String topic, String respTopic, String traceid, String content) {
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0,
null, topic, respTopic, traceid, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
}
public MessageRecord createMessageRecord(int userid, String topic, String respTopic, String content) {
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid,
null, topic, respTopic, Traces.currentTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8));
}
public MessageRecord createMessageRecord(int userid, String topic, String respTopic, String traceid, String content) {
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid,
null, topic, respTopic, traceid, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
}
public MessageRecord createMessageRecord(String topic, String respTopic, Convert convert, Object bean) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0,
null, topic, respTopic, Traces.currentTraceid(), convert.convertToBytes(bean));
}
public MessageRecord createMessageRecord(String topic, String respTopic, String traceid, Convert convert, Object bean) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0,
null, topic, respTopic, traceid, convert.convertToBytes(bean));
}
public MessageRecord createMessageRecord(int userid, String topic, String respTopic, Convert convert, Object bean) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid,
null, topic, respTopic, Traces.currentTraceid(), convert.convertToBytes(bean));
}
public MessageRecord createMessageRecord(int userid, String groupid, String topic, String respTopic, Convert convert, Object bean) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid,
groupid, topic, respTopic, Traces.currentTraceid(), convert.convertToBytes(bean));
}
public MessageRecord createMessageRecord(int flag, int userid, String groupid, String topic, String respTopic, Convert convert, Object bean) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, flag, System.currentTimeMillis(), userid,
groupid, topic, respTopic, Traces.currentTraceid(), convert.convertToBytes(bean));
}
public MessageRecord createMessageRecord(String topic, String respTopic, byte[] content) {
return new MessageRecord(msgSeqno.incrementAndGet(), (byte) 0, topic, respTopic, Traces.currentTraceid(), content);
}
public MessageRecord createMessageRecord(long seqid, String topic, String respTopic, byte[] content) {
return new MessageRecord(seqid, (byte) 0, topic, respTopic, Traces.currentTraceid(), content);
}
protected MessageRecord createMessageRecord(byte ctype, String topic, String respTopic, byte[] content) {
public MessageRecord createMessageRecord(byte ctype, String topic, String respTopic, byte[] content) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype, topic, respTopic, Traces.currentTraceid(), content);
}
protected MessageRecord createMessageRecord(byte ctype, String topic, String respTopic, String traceid, byte[] content) {
public MessageRecord createMessageRecord(byte ctype, String topic, String respTopic, String traceid, byte[] content) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype, topic, respTopic, traceid, content);
}
protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String respTopic, byte[] content) {
public MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String respTopic, byte[] content) {
return new MessageRecord(seqid, ctype, topic, respTopic, Traces.currentTraceid(), content);
}
protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String respTopic, String traceid, byte[] content) {
public MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String respTopic, String traceid, byte[] content) {
return new MessageRecord(seqid, ctype, topic, respTopic, traceid, content);
}

View File

@@ -7,12 +7,15 @@ package org.redkale.mq;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.redkale.annotation.Comment;
import org.redkale.convert.*;
import org.redkale.convert.json.JsonConvert;
import org.redkale.net.http.HttpSimpleRequest;
import org.redkale.net.http.WebSocketPacket;
import org.redkale.net.sncp.SncpHeader;
import org.redkale.util.RedkaleException;
import org.redkale.util.StringWrapper;
/**
* 存在MQ里面的数据结构<p>
@@ -30,16 +33,16 @@ public class MessageRecord implements Serializable {
static final byte[] EMPTY_BYTES = new byte[0];
protected static final byte CTYPE_STRING = 1;
public static final byte CTYPE_STRING = 1;
//Bson bytes
protected static final byte CTYPE_BSON = 2;
public static final byte CTYPE_BSON = 2;
//HttpSimpleRequest
protected static final byte CTYPE_HTTP_REQUEST = 3;
public static final byte CTYPE_HTTP_REQUEST = 3;
//HttpResult<byte[]>
protected static final byte CTYPE_HTTP_RESULT = 4;
public static final byte CTYPE_HTTP_RESULT = 4;
@ConvertColumn(index = 1)
@Comment("消息序列号")
@@ -358,7 +361,13 @@ public class MessageRecord implements Serializable {
sb.append(",\"actionName\":").append(localActionName);
}
if (localParams != null) {
sb.append(",\"params\":").append(JsonConvert.root().convertTo(localParams));
Object[] ps = Arrays.copyOf(localParams, localParams.length);
for (int i = 0; i < ps.length; i++) {
if (ps[i] instanceof WebSocketPacket || ps[i] instanceof MessageRecord) {
ps[i] = new StringWrapper(ps[i].toString());
}
}
sb.append(",\"params\":").append(JsonConvert.root().convertTo(ps));
}
} else {
sb.append(",\"content\":\"").append(new String(this.content, StandardCharsets.UTF_8)).append("\"");

View File

@@ -163,7 +163,7 @@ public class SncpRemoteInfo<T extends Service> {
}
ByteArray array = new ByteArray();
request.writeTo(null, array);
MessageRecord message = messageAgent.getSncpMessageClient().createMessageRecord(targetTopic, null, array.getBytes());
MessageRecord message = messageAgent.getSncpMessageClient().createMessageRecord(MessageRecord.CTYPE_BSON, targetTopic, null, array.getBytes());
final String tt = targetTopic;
message.localActionName(action.actionName());
message.localParams(params);