HttpSimpleRequest和client模块增加链路ID

This commit is contained in:
Redkale
2022-11-24 15:31:24 +08:00
parent cc397ed07b
commit 050ebc673a
10 changed files with 87 additions and 29 deletions

View File

@@ -133,6 +133,7 @@
System.setProperty("redkale.convert.tiny", "true");
System.setProperty("redkale.convert.pool.size", "128");
System.setProperty("redkale.convert.writer.buffer.defsize", "4096");
System.setProperty("redkale.trace.enable", "false");
<properties>节点下也可包含非<property>节点.
非<property>其节点可以通过@Resource(name="properties.xxxxxx")进行注入, 被注解的字段类型只能是AnyValue、AnyValue[]

View File

@@ -677,6 +677,7 @@ public final class Application {
System.setProperty("redkale.convert.tiny", "true");
System.setProperty("redkale.convert.pool.size", "128");
System.setProperty("redkale.convert.writer.buffer.defsize", "4096");
System.setProperty("redkale.trace.enable", "false");
final String confDir = this.confPath.toString();
// String pidstr = "";

View File

@@ -174,20 +174,20 @@ public class HttpMessageClient extends MessageClient {
}
public CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, HttpSimpleRequestCoder.getInstance().encode(request));
MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request));
message.userid(userid).groupid(groupid);
//if (finest) logger.log(Level.FINEST, "HttpMessageClient.sendMessage: " + message);
return sendMessage(message, true, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance()));
}
public void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, HttpSimpleRequestCoder.getInstance().encode(request));
MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request));
message.userid(userid).groupid(groupid);
sendMessage(message, false, counter);
}
public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, HttpSimpleRequestCoder.getInstance().encode(request));
MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request));
message.userid(userid).groupid(groupid);
sendMessage(message, false, counter);
}

View File

@@ -30,6 +30,7 @@ public class HttpSimpleRequestCoder implements MessageCoder<HttpSimpleRequest> {
@Override
public byte[] encode(HttpSimpleRequest data) {
byte[] traceid = MessageCoder.getBytes(data.getTraceid());//short-string
byte[] requestURI = MessageCoder.getBytes(data.getRequestURI()); //long-string
byte[] path = MessageCoder.getBytes(data.getPath()); //short-string
byte[] remoteAddr = MessageCoder.getBytes(data.getRemoteAddr());//short-string
@@ -39,11 +40,11 @@ public class HttpSimpleRequestCoder implements MessageCoder<HttpSimpleRequest> {
byte[] params = MessageCoder.getBytes(data.getParams());
byte[] body = MessageCoder.getBytes(data.getBody());
byte[] userid = MessageCoder.encodeUserid(data.getCurrentUserid());
int count = 1 //rpc
+ 1 //frombody
int count = 1 //rpc + frombody
+ 4 //hashid
+ 4 //reqConvertType
+ 4 //respConvertType
+ 2 + traceid.length
+ 4 + requestURI.length
+ 2 + path.length
+ 2 + remoteAddr.length
@@ -54,11 +55,12 @@ public class HttpSimpleRequestCoder implements MessageCoder<HttpSimpleRequest> {
+ 4 + body.length;
byte[] bs = new byte[count];
ByteBuffer buffer = ByteBuffer.wrap(bs);
buffer.put((byte) (data.isRpc() ? 'T' : 'F'));
buffer.put((byte) (data.isFrombody() ? 'T' : 'F'));
buffer.put((byte) ((data.isRpc() ? 0b01 : 0) | (data.isFrombody() ? 0b10 : 0)));
buffer.putInt(data.getHashid());
buffer.putInt(data.getReqConvertType() == null ? 0 : data.getReqConvertType().getValue());
buffer.putInt(data.getRespConvertType() == null ? 0 : data.getRespConvertType().getValue());
buffer.putChar((char) traceid.length);
if (traceid.length > 0) buffer.put(traceid);
buffer.putInt(requestURI.length);
if (requestURI.length > 0) buffer.put(requestURI);
buffer.putChar((char) path.length);
@@ -83,13 +85,15 @@ public class HttpSimpleRequestCoder implements MessageCoder<HttpSimpleRequest> {
if (data == null) return null;
ByteBuffer buffer = ByteBuffer.wrap(data);
HttpSimpleRequest req = new HttpSimpleRequest();
req.setRpc(buffer.get() == 'T');
req.setFrombody(buffer.get() == 'T');
byte opt = buffer.get();
req.setRpc((opt & 0b01) > 0);
req.setFrombody((opt & 0b10) > 0);
req.setHashid(buffer.getInt());
int reqformat = buffer.getInt();
int respformat = buffer.getInt();
if (reqformat != 0) req.setReqConvertType(ConvertType.find(reqformat));
if (respformat != 0) req.setRespConvertType(ConvertType.find(respformat));
req.setTraceid(MessageCoder.getShortString(buffer));
req.setRequestURI(MessageCoder.getLongString(buffer));
req.setPath(MessageCoder.getShortString(buffer));
req.setRemoteAddr(MessageCoder.getShortString(buffer));

View File

@@ -178,10 +178,18 @@ public abstract class MessageClient {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype, topic, resptopic, Traces.createTraceid(), content);
}
protected MessageRecord createMessageRecord(byte ctype, String topic, String resptopic, String traceid, byte[] content) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype, topic, resptopic, traceid, content);
}
protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String resptopic, byte[] content) {
return new MessageRecord(seqid, ctype, topic, resptopic, Traces.createTraceid(), content);
}
protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String resptopic, String traceid, byte[] content) {
return new MessageRecord(seqid, ctype, topic, resptopic, traceid, content);
}
private byte ctype(Convert convert, Object bean) {
byte ctype = 0;
if (convert instanceof JsonConvert) {

View File

@@ -188,16 +188,24 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
if (rs.exc != null) {
if (workThread == null || workThread == Thread.currentThread() || workThread.inIO()
|| workThread.getState() != Thread.State.RUNNABLE) {
Traces.currTraceid(request.traceid);
respFuture.completeExceptionally(rs.exc);
} else {
workThread.execute(() -> respFuture.completeExceptionally(rs.exc));
workThread.execute(() -> {
Traces.currTraceid(request.traceid);
respFuture.completeExceptionally(rs.exc);
});
}
} else {
if (workThread == null || workThread == Thread.currentThread() || workThread.inIO()
|| workThread.getState() != Thread.State.RUNNABLE) {
Traces.currTraceid(request.traceid);
respFuture.complete(rs.result);
} else {
workThread.execute(() -> respFuture.complete(rs.result));
workThread.execute(() -> {
Traces.currTraceid(request.traceid);
respFuture.complete(rs.result);
});
}
}
} catch (Throwable t) {

View File

@@ -7,7 +7,7 @@ package org.redkale.net.client;
import java.util.function.*;
import org.redkale.net.WorkThread;
import org.redkale.util.ByteArray;
import org.redkale.util.*;
/**
*
@@ -24,12 +24,18 @@ public abstract class ClientRequest implements BiConsumer<ClientConnection, Byte
protected WorkThread workThread;
protected String traceid;
ClientFuture respFuture;
public long getCreateTime() {
return createTime;
}
public String getTraceid() {
return traceid;
}
public <T extends ClientRequest> T currThread(WorkThread thread) {
this.workThread = thread;
return (T) this;
@@ -56,10 +62,12 @@ public abstract class ClientRequest implements BiConsumer<ClientConnection, Byte
protected void prepare() {
this.createTime = System.currentTimeMillis();
this.traceid = Traces.currTraceid();
}
protected boolean recycle() {
this.createTime = 0;
this.traceid = null;
return true;
}
}

View File

@@ -87,6 +87,8 @@ public class HttpRequest extends Request<HttpContext> {
protected boolean rpc;
protected String traceid;
protected int readState = READ_STATE_ROUTE;
// @since 2.1.0
@@ -183,6 +185,7 @@ public class HttpRequest extends Request<HttpContext> {
protected HttpRequest initSimpleRequest(HttpSimpleRequest req, boolean needPath) {
if (req != null) {
this.rpc = req.rpc;
this.traceid = req.traceid;
if (req.getBody() != null) this.array.put(req.getBody());
if (req.getHeaders() != null) this.headers.putAll(req.getHeaders());
this.frombody = req.isFrombody();
@@ -236,6 +239,7 @@ public class HttpRequest extends Request<HttpContext> {
req.setRequestURI(uri);
req.setSessionid(getSessionid(false));
req.setRpc(this.rpc);
req.setTraceid(this.traceid);
return req;
}
@@ -303,6 +307,7 @@ public class HttpRequest extends Request<HttpContext> {
this.keepAlive = httplast.keepAlive;
this.maybews = httplast.maybews;
this.rpc = httplast.rpc;
this.traceid = httplast.traceid;
this.hashid = httplast.hashid;
this.currentUserid = httplast.currentUserid;
this.frombody = httplast.frombody;
@@ -760,6 +765,7 @@ public class HttpRequest extends Request<HttpContext> {
req.keepAlive = this.keepAlive;
req.maybews = this.maybews;
req.rpc = this.rpc;
req.traceid = this.traceid;
req.hashid = this.hashid;
req.currentUserid = this.currentUserid;
req.currentUserSupplier = this.currentUserSupplier;
@@ -791,6 +797,7 @@ public class HttpRequest extends Request<HttpContext> {
this.cookies = null;
this.maybews = false;
this.rpc = false;
this.traceid = null;
this.readState = READ_STATE_ROUTE;
this.currentUserid = CURRUSERID_NIL;
this.currentUserSupplier = null;

View File

@@ -35,61 +35,65 @@ public class HttpSimpleRequest implements java.io.Serializable {
protected boolean frombody;
@ConvertColumn(index = 3)
@Comment("链路ID")
protected String traceid;
@ConvertColumn(index = 4)
@Comment("请求参数的ConvertType")
protected ConvertType reqConvertType;
@ConvertColumn(index = 4)
@ConvertColumn(index = 5)
@Comment("输出结果的ConvertType")
protected ConvertType respConvertType;
@ConvertColumn(index = 5)
@ConvertColumn(index = 6)
@Comment("请求的URI")
protected String requestURI;
@ConvertColumn(index = 6)
@ConvertColumn(index = 7)
@Comment("请求的前缀")
protected String path;
@ConvertColumn(index = 7)
@ConvertColumn(index = 8)
@Comment("客户端IP")
protected String remoteAddr;
@ConvertColumn(index = 8)
@ConvertColumn(index = 9)
@Comment("Locale国际化")
protected String locale;
@ConvertColumn(index = 9)
@ConvertColumn(index = 10)
@Comment("会话ID")
protected String sessionid;
@ConvertColumn(index = 10)
@ConvertColumn(index = 11)
@Comment("Content-Type")
protected String contentType;
@ConvertColumn(index = 11)
@ConvertColumn(index = 12)
protected int hashid;
@ConvertColumn(index = 12) //@since 2.5.0 由int改成Serializable, 具体数据类型只能是int、long、String
@ConvertColumn(index = 13) //@since 2.5.0 由int改成Serializable, 具体数据类型只能是int、long、String
protected Serializable currentUserid;
@ConvertColumn(index = 13)
@ConvertColumn(index = 14)
@Comment("http header信息")
protected Map<String, String> headers;
@ConvertColumn(index = 14)
@ConvertColumn(index = 15)
@Comment("参数信息")
protected Map<String, String> params;
@ConvertColumn(index = 15)
@ConvertColumn(index = 16)
@Comment("http body信息")
protected byte[] body; //对应HttpRequest.array
public static HttpSimpleRequest create(String requestURI) {
return new HttpSimpleRequest().requestURI(requestURI);
return new HttpSimpleRequest().requestURI(requestURI).traceid(Traces.currTraceid());
}
public static HttpSimpleRequest create(String requestURI, Object... params) {
HttpSimpleRequest req = new HttpSimpleRequest().requestURI(requestURI);
HttpSimpleRequest req = new HttpSimpleRequest().requestURI(requestURI).traceid(Traces.currTraceid());
int len = params.length / 2;
for (int i = 0; i < len; i++) {
req.param(params[i * 2].toString(), params[i * 2 + 1]);
@@ -115,6 +119,11 @@ public class HttpSimpleRequest implements java.io.Serializable {
return this;
}
public HttpSimpleRequest traceid(String traceid) {
this.traceid = traceid;
return this;
}
public HttpSimpleRequest requestURI(String requestURI) {
this.requestURI = requestURI;
return this;
@@ -298,6 +307,14 @@ public class HttpSimpleRequest implements java.io.Serializable {
this.rpc = rpc;
}
public String getTraceid() {
return traceid;
}
public void setTraceid(String traceid) {
this.traceid = traceid;
}
public String getRequestURI() {
return requestURI;
}

View File

@@ -2,6 +2,8 @@
*/
package org.redkale.util;
import java.util.function.Supplier;
/**
* 创建traceid工具类
*
@@ -15,21 +17,23 @@ public class Traces {
private static final boolean disabled = !Boolean.getBoolean("redkale.trace.enable");
private static ThreadLocal<String> localTrace = new ThreadLocal<>();
private static final ThreadLocal<String> localTrace = new ThreadLocal<>();
private static final Supplier<String> tidSupplier = () -> Utility.uuid();
public static boolean enable() {
return !disabled;
}
public static String onceTraceid() {
return disabled ? null : Utility.uuid();
return disabled ? null : tidSupplier.get();
}
public static String createTraceid() {
if (disabled) return null;
String traceid = localTrace.get();
if (traceid == null) {
traceid = Utility.uuid();
traceid = tidSupplier.get();
localTrace.set(traceid);
}
return traceid;