This commit is contained in:
Redkale
2020-10-01 22:49:51 +08:00
parent c7b05e530d
commit 9c5e23090c
13 changed files with 162 additions and 209 deletions

View File

@@ -10,7 +10,6 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger; import java.util.logging.Logger;
import org.redkale.convert.ConvertType;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
import org.redkale.net.http.*; import org.redkale.net.http.*;
@@ -56,177 +55,129 @@ public class HttpMessageClient extends MessageClient {
} }
public final void produceMessage(HttpSimpleRequest request) { public final void produceMessage(HttpSimpleRequest request) {
produceMessage(generateHttpReqTopic(request, null), ConvertType.JSON, 0, null, request, null); produceMessage(generateHttpReqTopic(request, null), 0, null, request, null);
} }
public final void produceMessage(HttpSimpleRequest request, AtomicLong counter) { public final void produceMessage(HttpSimpleRequest request, AtomicLong counter) {
produceMessage(generateHttpReqTopic(request, null), ConvertType.JSON, 0, null, request, counter); produceMessage(generateHttpReqTopic(request, null), 0, null, request, counter);
} }
public final void produceMessage(int userid, HttpSimpleRequest request) { public final void produceMessage(int userid, HttpSimpleRequest request) {
produceMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, null, request, null); produceMessage(generateHttpReqTopic(request, null), userid, null, request, null);
} }
public final void produceMessage(int userid, String groupid, HttpSimpleRequest request) { public final void produceMessage(int userid, String groupid, HttpSimpleRequest request) {
produceMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, groupid, request, null); produceMessage(generateHttpReqTopic(request, null), userid, groupid, request, null);
} }
public final void produceMessage(int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { public final void produceMessage(int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
produceMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, groupid, request, counter); produceMessage(generateHttpReqTopic(request, null), userid, groupid, request, counter);
} }
public final void produceMessage(String topic, HttpSimpleRequest request) { public final void produceMessage(String topic, HttpSimpleRequest request) {
produceMessage(topic, ConvertType.JSON, 0, null, request, null); produceMessage(topic, 0, null, request, null);
} }
public final void produceMessage(String topic, HttpSimpleRequest request, AtomicLong counter) { public final void produceMessage(String topic, HttpSimpleRequest request, AtomicLong counter) {
produceMessage(topic, ConvertType.JSON, 0, null, request, counter); produceMessage(topic, 0, null, request, counter);
}
public final void produceMessage(String topic, ConvertType convertType, HttpSimpleRequest request) {
produceMessage(topic, convertType, 0, null, request, null);
}
public final void produceMessage(String topic, ConvertType convertType, HttpSimpleRequest request, AtomicLong counter) {
produceMessage(topic, convertType, 0, null, request, counter);
} }
public final void produceMessage(String topic, int userid, String groupid, HttpSimpleRequest request) { public final void produceMessage(String topic, int userid, String groupid, HttpSimpleRequest request) {
produceMessage(topic, ConvertType.JSON, userid, groupid, request, null); produceMessage(topic, userid, groupid, request, null);
}
public final void produceMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
produceMessage(topic, ConvertType.JSON, userid, groupid, request, counter);
}
public final void produceMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request) {
produceMessage(topic, convertType, userid, groupid, request, null);
} }
public final void broadcastMessage(HttpSimpleRequest request) { public final void broadcastMessage(HttpSimpleRequest request) {
broadcastMessage(generateHttpReqTopic(request, null), ConvertType.JSON, 0, null, request, null); broadcastMessage(generateHttpReqTopic(request, null), 0, null, request, null);
} }
public final void broadcastMessage(HttpSimpleRequest request, AtomicLong counter) { public final void broadcastMessage(HttpSimpleRequest request, AtomicLong counter) {
broadcastMessage(generateHttpReqTopic(request, null), ConvertType.JSON, 0, null, request, counter); broadcastMessage(generateHttpReqTopic(request, null), 0, null, request, counter);
} }
public final void broadcastMessage(int userid, HttpSimpleRequest request) { public final void broadcastMessage(int userid, HttpSimpleRequest request) {
broadcastMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, null, request, null); broadcastMessage(generateHttpReqTopic(request, null), userid, null, request, null);
} }
public final void broadcastMessage(int userid, String groupid, HttpSimpleRequest request) { public final void broadcastMessage(int userid, String groupid, HttpSimpleRequest request) {
broadcastMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, groupid, request, null); broadcastMessage(generateHttpReqTopic(request, null), userid, groupid, request, null);
} }
public final void broadcastMessage(int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { public final void broadcastMessage(int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
broadcastMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, groupid, request, counter); broadcastMessage(generateHttpReqTopic(request, null), userid, groupid, request, counter);
} }
public final void broadcastMessage(String topic, HttpSimpleRequest request) { public final void broadcastMessage(String topic, HttpSimpleRequest request) {
broadcastMessage(topic, ConvertType.JSON, 0, null, request, null); broadcastMessage(topic, 0, null, request, null);
} }
public final void broadcastMessage(String topic, HttpSimpleRequest request, AtomicLong counter) { public final void broadcastMessage(String topic, HttpSimpleRequest request, AtomicLong counter) {
broadcastMessage(topic, ConvertType.JSON, 0, null, request, counter); broadcastMessage(topic, 0, null, request, counter);
}
public final void broadcastMessage(String topic, ConvertType convertType, HttpSimpleRequest request) {
broadcastMessage(topic, convertType, 0, null, request, null);
}
public final void broadcastMessage(String topic, ConvertType convertType, HttpSimpleRequest request, AtomicLong counter) {
broadcastMessage(topic, convertType, 0, null, request, counter);
} }
public final void broadcastMessage(String topic, int userid, String groupid, HttpSimpleRequest request) { public final void broadcastMessage(String topic, int userid, String groupid, HttpSimpleRequest request) {
broadcastMessage(topic, ConvertType.JSON, userid, groupid, request, null); broadcastMessage(topic, userid, groupid, request, null);
}
public final void broadcastMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
broadcastMessage(topic, ConvertType.JSON, userid, groupid, request, counter);
}
public final void broadcastMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request) {
broadcastMessage(topic, convertType, userid, groupid, request, null);
} }
public final <T> CompletableFuture<T> sendMessage(HttpSimpleRequest request, Type type) { public final <T> CompletableFuture<T> sendMessage(HttpSimpleRequest request, Type type) {
return sendMessage(generateHttpReqTopic(request, null), ConvertType.JSON, 0, null, request, null).thenApply((HttpResult<byte[]> httbs) -> { return sendMessage(generateHttpReqTopic(request, null), 0, null, request, null).thenApply((HttpResult<byte[]> httbs) -> {
if (httbs == null || httbs.getResult() == null) return null; if (httbs == null || httbs.getResult() == null) return null;
return JsonConvert.root().convertFrom(type, httbs.getResult()); return JsonConvert.root().convertFrom(type, httbs.getResult());
}); });
} }
public final <T> CompletableFuture<T> sendMessage(int userid, HttpSimpleRequest request, Type type) { public final <T> CompletableFuture<T> sendMessage(int userid, HttpSimpleRequest request, Type type) {
return sendMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, null, request, null).thenApply((HttpResult<byte[]> httbs) -> { return sendMessage(generateHttpReqTopic(request, null), userid, null, request, null).thenApply((HttpResult<byte[]> httbs) -> {
if (httbs == null || httbs.getResult() == null) return null; if (httbs == null || httbs.getResult() == null) return null;
return JsonConvert.root().convertFrom(type, httbs.getResult()); return JsonConvert.root().convertFrom(type, httbs.getResult());
}); });
} }
public final CompletableFuture<HttpResult<byte[]>> sendMessage(HttpSimpleRequest request) { public final CompletableFuture<HttpResult<byte[]>> sendMessage(HttpSimpleRequest request) {
return sendMessage(generateHttpReqTopic(request, null), ConvertType.JSON, 0, null, request, null); return sendMessage(generateHttpReqTopic(request, null), 0, null, request, null);
} }
public final CompletableFuture<HttpResult<byte[]>> sendMessage(HttpSimpleRequest request, AtomicLong counter) { public final CompletableFuture<HttpResult<byte[]>> sendMessage(HttpSimpleRequest request, AtomicLong counter) {
return sendMessage(generateHttpReqTopic(request, null), ConvertType.JSON, 0, null, request, counter); return sendMessage(generateHttpReqTopic(request, null), 0, null, request, counter);
} }
public final CompletableFuture<HttpResult<byte[]>> sendMessage(int userid, HttpSimpleRequest request) { public final CompletableFuture<HttpResult<byte[]>> sendMessage(int userid, HttpSimpleRequest request) {
return sendMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, null, request, null); return sendMessage(generateHttpReqTopic(request, null), userid, null, request, null);
} }
public final CompletableFuture<HttpResult<byte[]>> sendMessage(int userid, String groupid, HttpSimpleRequest request) { public final CompletableFuture<HttpResult<byte[]>> sendMessage(int userid, String groupid, HttpSimpleRequest request) {
return sendMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, groupid, request, null); return sendMessage(generateHttpReqTopic(request, null), userid, groupid, request, null);
} }
public final CompletableFuture<HttpResult<byte[]>> sendMessage(int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { public final CompletableFuture<HttpResult<byte[]>> sendMessage(int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
return sendMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, groupid, request, counter); return sendMessage(generateHttpReqTopic(request, null), userid, groupid, request, counter);
} }
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, HttpSimpleRequest request) { public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, HttpSimpleRequest request) {
return sendMessage(topic, ConvertType.JSON, 0, null, request, null); return sendMessage(topic, 0, null, request, null);
} }
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, HttpSimpleRequest request, AtomicLong counter) { public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, HttpSimpleRequest request, AtomicLong counter) {
return sendMessage(topic, ConvertType.JSON, 0, null, request, counter); return sendMessage(topic, 0, null, request, counter);
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, ConvertType convertType, HttpSimpleRequest request) {
return sendMessage(topic, convertType, 0, null, request, null);
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, ConvertType convertType, HttpSimpleRequest request, AtomicLong counter) {
return sendMessage(topic, convertType, 0, null, request, counter);
} }
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, int userid, String groupid, HttpSimpleRequest request) { public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, int userid, String groupid, HttpSimpleRequest request) {
return sendMessage(topic, ConvertType.JSON, userid, groupid, request, null); return sendMessage(topic, userid, null, request, (AtomicLong) null);
} }
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { public CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
return sendMessage(topic, ConvertType.JSON, userid, groupid, request, counter); MessageRecord message = new MessageRecord(topic, null, HttpSimpleRequestCoder.getInstance().encode(request));
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request) {
return sendMessage(topic, convertType, userid, groupid, request, null);
}
public CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
MessageRecord message = new MessageRecord(convertType, topic, null, HttpSimpleRequestCoder.getInstance().encode(request));
message.userid(userid).groupid(groupid); message.userid(userid).groupid(groupid);
return sendMessage(message, true, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance())); return sendMessage(message, true, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance()));
} }
public void broadcastMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { public void broadcastMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
MessageRecord message = new MessageRecord(convertType, topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); MessageRecord message = new MessageRecord(topic, null, HttpSimpleRequestCoder.getInstance().encode(request));
message.userid(userid).groupid(groupid); message.userid(userid).groupid(groupid);
sendMessage(message, false, counter); sendMessage(message, false, counter);
} }
public void produceMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { public void produceMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
MessageRecord message = new MessageRecord(convertType, topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); MessageRecord message = new MessageRecord(topic, null, HttpSimpleRequestCoder.getInstance().encode(request));
message.userid(userid).groupid(groupid); message.userid(userid).groupid(groupid);
sendMessage(message, false, counter); sendMessage(message, false, counter);
} }
@@ -234,5 +185,5 @@ public class HttpMessageClient extends MessageClient {
@Override @Override
protected MessageProducers getProducer() { protected MessageProducers getProducer() {
return messageAgent.getHttpProducer(); return messageAgent.getHttpProducer();
} }
} }

View File

@@ -13,7 +13,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
import java.util.logging.Level; import java.util.logging.Level;
import org.redkale.cluster.ClusterAgent; import org.redkale.cluster.ClusterAgent;
import org.redkale.convert.ConvertType;
import org.redkale.net.http.*; import org.redkale.net.http.*;
/** /**
@@ -45,17 +44,17 @@ public class HttpMessageClusterClient extends HttpMessageClient {
} }
@Override @Override
public CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { public CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
return httpAsync(userid, request); return httpAsync(userid, request);
} }
@Override @Override
public void produceMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { public void produceMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
httpAsync(userid, request); httpAsync(userid, request);
} }
@Override @Override
public void broadcastMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { public void broadcastMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
mqtpAsync(userid, request); mqtpAsync(userid, request);
} }
@@ -70,6 +69,9 @@ public class HttpMessageClusterClient extends HttpMessageClient {
if (addrmap == null || addrmap.isEmpty()) return new HttpResult().status(404).toAnyFuture(); if (addrmap == null || addrmap.isEmpty()) return new HttpResult().status(404).toAnyFuture();
java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder().timeout(Duration.ofMillis(30000)); java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder().timeout(Duration.ofMillis(30000));
if (req.isRpc()) builder.header(Rest.REST_HEADER_RPC_NAME, "true"); if (req.isRpc()) builder.header(Rest.REST_HEADER_RPC_NAME, "true");
if (req.isFrombody()) builder.header(Rest.REST_HEADER_PARAM_FROM_BODY, "true");
if (req.getReqConvertType() != null) builder.header(Rest.REST_HEADER_REQ_CONVERT_TYPE, req.getReqConvertType().toString());
if (req.getRespConvertType() != null) builder.header(Rest.REST_HEADER_RESP_CONVERT_TYPE, req.getRespConvertType().toString());
if (userid != 0) builder.header(Rest.REST_HEADER_CURRUSERID_NAME, "" + userid); if (userid != 0) builder.header(Rest.REST_HEADER_CURRUSERID_NAME, "" + userid);
if (headers != null) headers.forEach((n, v) -> { if (headers != null) headers.forEach((n, v) -> {
if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase())) builder.header(n, v); if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase())) builder.header(n, v);
@@ -115,6 +117,9 @@ public class HttpMessageClusterClient extends HttpMessageClient {
if (addrs == null || addrs.isEmpty()) return new HttpResult().status(404).toAnyFuture(); if (addrs == null || addrs.isEmpty()) return new HttpResult().status(404).toAnyFuture();
java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder().timeout(Duration.ofMillis(30000)); java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder().timeout(Duration.ofMillis(30000));
if (req.isRpc()) builder.header(Rest.REST_HEADER_RPC_NAME, "true"); if (req.isRpc()) builder.header(Rest.REST_HEADER_RPC_NAME, "true");
if (req.isFrombody()) builder.header(Rest.REST_HEADER_PARAM_FROM_BODY, "true");
if (req.getReqConvertType() != null) builder.header(Rest.REST_HEADER_REQ_CONVERT_TYPE, req.getReqConvertType().toString());
if (req.getRespConvertType() != null) builder.header(Rest.REST_HEADER_RESP_CONVERT_TYPE, req.getRespConvertType().toString());
if (userid != 0) builder.header(Rest.REST_HEADER_CURRUSERID_NAME, "" + userid); if (userid != 0) builder.header(Rest.REST_HEADER_CURRUSERID_NAME, "" + userid);
if (headers != null) headers.forEach((n, v) -> { if (headers != null) headers.forEach((n, v) -> {
if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase())) builder.header(n, v); if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase())) builder.header(n, v);

View File

@@ -5,7 +5,6 @@
*/ */
package org.redkale.mq; package org.redkale.mq;
import org.redkale.convert.*;
import org.redkale.net.http.*; import org.redkale.net.http.*;
/** /**
@@ -21,45 +20,14 @@ public class HttpMessageRequest extends HttpRequest {
protected MessageRecord message; protected MessageRecord message;
protected Convert diyConvert;
public HttpMessageRequest(HttpContext context, MessageRecord message) { public HttpMessageRequest(HttpContext context, MessageRecord message) {
super(context, message.decodeContent(HttpSimpleRequestCoder.getInstance())); super(context, message.decodeContent(HttpSimpleRequestCoder.getInstance()));
this.message = message; this.message = message;
this.currentUserid = message.getUserid(); this.currentUserid = message.getUserid();
if (message.getFormat() != ConvertType.JSON) {
this.diyConvert = ConvertFactory.findConvert(message.getFormat());
}
} }
public void setRequestURI(String uri) { public void setRequestURI(String uri) {
this.requestURI = uri; this.requestURI = uri;
} }
@Override
public <T> T getBodyJson(java.lang.reflect.Type type) {
if (diyConvert != null) return (T) diyConvert.convertFrom(type, getBody());
return super.getBodyJson(type);
}
@Override
public String getParameter(String name) {
if (diyConvert != null) return (String) diyConvert.convertFrom(String.class, getBody());
return super.getParameter(name);
}
@Override
public String getParameter(String name, String defaultValue) {
if (diyConvert != null) {
String val = (String) diyConvert.convertFrom(String.class, getBody());
return val == null ? defaultValue : val;
}
return super.getParameter(name, defaultValue);
}
@Override
public <T> T getJsonParameter(java.lang.reflect.Type type, String name) {
if (diyConvert != null) return (T) diyConvert.convertFrom(type, getBody());
return super.getJsonParameter(type, name);
}
} }

View File

@@ -62,15 +62,13 @@ public class HttpMessageResponse extends HttpResponse {
//必须要塞入retcode 开发者可以无需反序列化ret便可确定操作是否返回成功 //必须要塞入retcode 开发者可以无需反序列化ret便可确定操作是否返回成功
if (!ret.isSuccess()) result.header("retcode", String.valueOf(ret.getRetcode())); if (!ret.isSuccess()) result.header("retcode", String.valueOf(ret.getRetcode()));
} }
if (msg.format == ConvertType.PROTOBUF && result.convert() == null) result.convert(ConvertFactory.findConvert(msg.format));
ConvertType format = result.convert() == null ? null : result.convert().getFactory().getConvertType();
if (finest) { if (finest) {
Object innerrs = result.getResult(); Object innerrs = result.getResult();
if (innerrs instanceof byte[]) innerrs = new String((byte[]) innerrs, StandardCharsets.UTF_8); if (innerrs instanceof byte[]) innerrs = new String((byte[]) innerrs, StandardCharsets.UTF_8);
producer.logger.log(Level.FINEST, "HttpMessageProcessor.process seqid=" + msg.getSeqid() + ", content: " + innerrs + ", status: " + result.getStatus() + ", headers: " + result.getHeaders()); producer.logger.log(Level.FINEST, "HttpMessageProcessor.process seqid=" + msg.getSeqid() + ", content: " + innerrs + ", status: " + result.getStatus() + ", headers: " + result.getHeaders());
} }
byte[] content = HttpResultCoder.getInstance().encode(result); byte[] content = HttpResultCoder.getInstance().encode(result);
producer.apply(new MessageRecord(msg.getSeqid(), format, resptopic, null, content)); producer.apply(new MessageRecord(msg.getSeqid(), resptopic, null, content));
} }
@Override @Override

View File

@@ -7,6 +7,7 @@ package org.redkale.mq;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import org.redkale.convert.ConvertType;
import org.redkale.net.http.HttpSimpleRequest; import org.redkale.net.http.HttpSimpleRequest;
/** /**
@@ -37,11 +38,18 @@ public class HttpSimpleRequestCoder implements MessageCoder<HttpSimpleRequest> {
byte[] headers = MessageCoder.getBytes(data.getHeaders()); byte[] headers = MessageCoder.getBytes(data.getHeaders());
byte[] params = MessageCoder.getBytes(data.getParams()); byte[] params = MessageCoder.getBytes(data.getParams());
byte[] body = MessageCoder.getBytes(data.getBody()); byte[] body = MessageCoder.getBytes(data.getBody());
int count = 1 + 4 + requestURI.length + 2 + path.length + 2 + remoteAddr.length + 2 + sessionid.length int count = 1 //rpc
+ 1 //frombody
+ 4 //reqConvertType
+ 4 //respConvertType
+ 4 + requestURI.length + 2 + path.length + 2 + remoteAddr.length + 2 + sessionid.length
+ 2 + contentType.length + 4 + headers.length + params.length + 4 + body.length; + 2 + contentType.length + 4 + headers.length + params.length + 4 + body.length;
byte[] bs = new byte[count]; byte[] bs = new byte[count];
ByteBuffer buffer = ByteBuffer.wrap(bs); ByteBuffer buffer = ByteBuffer.wrap(bs);
buffer.put((byte) (data.isRpc() ? 'T' : 'F')); buffer.put((byte) (data.isRpc() ? 'T' : 'F'));
buffer.put((byte) (data.isFrombody() ? 'T' : 'F'));
buffer.putInt(data.getReqConvertType() == null ? 0 : data.getReqConvertType().getValue());
buffer.putInt(data.getRespConvertType() == null ? 0 : data.getRespConvertType().getValue());
buffer.putInt(requestURI.length); buffer.putInt(requestURI.length);
if (requestURI.length > 0) buffer.put(requestURI); if (requestURI.length > 0) buffer.put(requestURI);
buffer.putChar((char) path.length); buffer.putChar((char) path.length);
@@ -66,6 +74,11 @@ public class HttpSimpleRequestCoder implements MessageCoder<HttpSimpleRequest> {
ByteBuffer buffer = ByteBuffer.wrap(data); ByteBuffer buffer = ByteBuffer.wrap(data);
HttpSimpleRequest req = new HttpSimpleRequest(); HttpSimpleRequest req = new HttpSimpleRequest();
req.setRpc(buffer.get() == 'T'); req.setRpc(buffer.get() == 'T');
req.setFrombody(buffer.get() == 'T');
int reqformat = buffer.getInt();
int respformat = buffer.getInt();
if (reqformat != 0) req.setReqConvertType(ConvertType.find(reqformat));
if (respformat != 0) req.setRespConvertType(ConvertType.find(respformat));
req.setRequestURI(MessageCoder.getLongString(buffer)); req.setRequestURI(MessageCoder.getLongString(buffer));
req.setPath(MessageCoder.getShortString(buffer)); req.setPath(MessageCoder.getShortString(buffer));
req.setRemoteAddr(MessageCoder.getShortString(buffer)); req.setRemoteAddr(MessageCoder.getShortString(buffer));

View File

@@ -8,7 +8,6 @@ package org.redkale.mq;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level; import java.util.logging.Level;
import org.redkale.convert.ConvertType;
/** /**
* *
@@ -31,8 +30,6 @@ public abstract class MessageClient {
protected String respConsumerid; protected String respConsumerid;
protected ConvertType convertType;
protected MessageClient(MessageAgent messageAgent) { protected MessageClient(MessageAgent messageAgent) {
this.messageAgent = messageAgent; this.messageAgent = messageAgent;
} }
@@ -65,7 +62,6 @@ public abstract class MessageClient {
} }
} }
} }
if (convertType != null) message.setFormat(convertType);
if (needresp && (message.getResptopic() == null || message.getResptopic().isEmpty())) { if (needresp && (message.getResptopic() == null || message.getResptopic().isEmpty())) {
message.setResptopic(respTopic); message.setResptopic(respTopic);
} }

View File

@@ -8,7 +8,6 @@ package org.redkale.mq;
import java.io.Serializable; import java.io.Serializable;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import org.redkale.convert.*; import org.redkale.convert.*;
import org.redkale.convert.json.JsonConvert;
import org.redkale.util.Comment; import org.redkale.util.Comment;
/** /**
@@ -36,34 +35,30 @@ public class MessageRecord implements Serializable {
protected int version; protected int version;
@ConvertColumn(index = 3) @ConvertColumn(index = 3)
@Comment("内容的格式, 只能是JSON、BSON、PROTOBUF、DIY和null, 普通文本也归于JSON")
protected ConvertType format;
@ConvertColumn(index = 4)
@Comment("标记位, 自定义时使用") @Comment("标记位, 自定义时使用")
protected int flag; protected int flag;
@ConvertColumn(index = 5) @ConvertColumn(index = 4)
@Comment("创建时间") @Comment("创建时间")
protected long createtime; protected long createtime;
@ConvertColumn(index = 6) @ConvertColumn(index = 5)
@Comment("用户ID无用户信息视为0") @Comment("用户ID无用户信息视为0")
protected int userid; protected int userid;
@ConvertColumn(index = 7) @ConvertColumn(index = 6)
@Comment("组ID") @Comment("组ID")
protected String groupid; protected String groupid;
@ConvertColumn(index = 8) @ConvertColumn(index = 7)
@Comment("当前topic") @Comment("当前topic")
protected String topic; protected String topic;
@ConvertColumn(index = 9) @ConvertColumn(index = 8)
@Comment("目标topic, 为空表示无目标topic") @Comment("目标topic, 为空表示无目标topic")
protected String resptopic; protected String resptopic;
@ConvertColumn(index = 10) @ConvertColumn(index = 9)
@Comment("消息内容") @Comment("消息内容")
protected byte[] content; protected byte[] content;
@@ -71,53 +66,48 @@ public class MessageRecord implements Serializable {
} }
public MessageRecord(String resptopic, String content) { public MessageRecord(String resptopic, String content) {
this(System.nanoTime(), content == null ? null : ConvertType.JSON, 0, 0, null, null, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); this(System.nanoTime(), 1, 0, System.currentTimeMillis(), 0, null, null, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
} }
public MessageRecord(String topic, String resptopic, String content) { public MessageRecord(String topic, String resptopic, String content) {
this(System.nanoTime(), content == null ? null : ConvertType.JSON, 0, 0, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); this(System.nanoTime(), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
} }
public MessageRecord(int userid, String topic, String resptopic, String content) { public MessageRecord(int userid, String topic, String resptopic, String content) {
this(System.nanoTime(), content == null ? null : ConvertType.JSON, 0, userid, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); this(System.nanoTime(), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
}
public MessageRecord(ConvertType format, String topic, String resptopic, byte[] content) {
this(System.nanoTime(), format, 0, 0, null, topic, resptopic, content);
}
public MessageRecord(long seqid, ConvertType format, String topic, String resptopic, byte[] content) {
this(seqid, format, 0, null, topic, resptopic, content);
}
public MessageRecord(long seqid, ConvertType format, int userid, String groupid, String topic, String resptopic, byte[] content) {
this(seqid, format, 0, userid, groupid, topic, resptopic, content);
} }
public MessageRecord(String topic, String resptopic, Convert convert, Object bean) { public MessageRecord(String topic, String resptopic, Convert convert, Object bean) {
this(0, null, topic, resptopic, convert, bean); this(System.nanoTime(), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, convert.convertToBytes(bean));
} }
public MessageRecord(int userid, String topic, String resptopic, Convert convert, Object bean) { public MessageRecord(int userid, String topic, String resptopic, Convert convert, Object bean) {
this(userid, null, topic, resptopic, convert, bean); this(System.nanoTime(), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, convert.convertToBytes(bean));
} }
public MessageRecord(int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) { public MessageRecord(int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) {
this(0, userid, groupid, topic, resptopic, convert, bean); this(System.nanoTime(), 1, 0, System.currentTimeMillis(), userid, groupid, topic, resptopic, convert.convertToBytes(bean));
} }
public MessageRecord(int flag, int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) { public MessageRecord(int flag, int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) {
this(System.nanoTime(), convert.getFactory().getConvertType(), flag, userid, groupid, topic, resptopic, convert.convertToBytes(bean)); this(System.nanoTime(), 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, convert.convertToBytes(bean));
} }
public MessageRecord(long seqid, ConvertType format, int flag, int userid, String groupid, String topic, String resptopic, byte[] content) { public MessageRecord(String topic, String resptopic, byte[] content) {
this(seqid, 1, format, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, content); this(System.nanoTime(), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, content);
} }
public MessageRecord(long seqid, int version, ConvertType format, int flag, long createtime, int userid, String groupid, String topic, String resptopic, byte[] content) { public MessageRecord(long seqid, String topic, String resptopic, byte[] content) {
this(seqid, 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, content);
}
public MessageRecord(long seqid, int flag, int userid, String groupid, String topic, String resptopic, byte[] content) {
this(seqid, 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, content);
}
public MessageRecord(long seqid, int version, int flag, long createtime, int userid, String groupid, String topic, String resptopic, byte[] content) {
this.seqid = seqid; this.seqid = seqid;
this.version = version; this.version = version;
this.format = format;
this.flag = flag; this.flag = flag;
this.createtime = createtime; this.createtime = createtime;
this.userid = userid; this.userid = userid;
@@ -171,11 +161,6 @@ public class MessageRecord implements Serializable {
return this; return this;
} }
public MessageRecord format(ConvertType format) {
this.format = format;
return this;
}
public MessageRecord flag(int flag) { public MessageRecord flag(int flag) {
this.flag = flag; this.flag = flag;
return this; return this;
@@ -232,14 +217,6 @@ public class MessageRecord implements Serializable {
this.version = version; this.version = version;
} }
public ConvertType getFormat() {
return format;
}
public void setFormat(ConvertType format) {
this.format = format;
}
public int getFlag() { public int getFlag() {
return flag; return flag;
} }
@@ -302,19 +279,18 @@ public class MessageRecord implements Serializable {
StringBuilder sb = new StringBuilder(128); StringBuilder sb = new StringBuilder(128);
sb.append("{\"seqid\":").append(this.seqid); sb.append("{\"seqid\":").append(this.seqid);
sb.append(",\"version\":").append(this.version); sb.append(",\"version\":").append(this.version);
if (this.format != null) sb.append(",\"format\":\"").append(this.format).append("\"");
if (this.flag != 0) sb.append(",\"flag\":").append(this.flag); if (this.flag != 0) sb.append(",\"flag\":").append(this.flag);
if (this.createtime != 0) sb.append(",\"createtime\":").append(this.createtime); if (this.createtime != 0) sb.append(",\"createtime\":").append(this.createtime);
if (this.userid != 0) sb.append(",\"userid\":").append(this.userid); if (this.userid != 0) sb.append(",\"userid\":").append(this.userid);
if (this.groupid != null) sb.append(",\"groupid\":\"").append(this.groupid).append("\""); if (this.groupid != null) sb.append(",\"groupid\":\"").append(this.groupid).append("\"");
if (this.topic != null) sb.append(",\"topic\":\"").append(this.topic).append("\""); if (this.topic != null) sb.append(",\"topic\":\"").append(this.topic).append("\"");
if (this.resptopic != null) sb.append(",\"resptopic\":\"").append(this.resptopic).append("\""); if (this.resptopic != null) sb.append(",\"resptopic\":\"").append(this.resptopic).append("\"");
if (this.content != null) sb.append(",\"content\":").append(this.format == ConvertType.JSON ? ("\"" + new String(this.content, StandardCharsets.UTF_8) + "\"") : JsonConvert.root().convertTo(this.content)); if (this.content != null) sb.append(",\"content\":").append(new String(this.content, StandardCharsets.UTF_8)).append("\"");
sb.append("}"); sb.append("}");
return sb.toString(); return sb.toString();
} }
// public static void main(String[] args) throws Throwable { // public static void main(String[] args) throws Throwable {
// System.out.println(new MessageRecord(333, ConvertType.JSON, 2, 3, null, "tt", null, "xxx".getBytes())); // System.out.println(new MessageRecord(333, 2, 3, null, "tt", null, "xxx".getBytes()));
// } // }
} }

View File

@@ -6,7 +6,6 @@
package org.redkale.mq; package org.redkale.mq;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.redkale.convert.ConvertType;
/** /**
* MessageRecord的MessageCoder实现 * MessageRecord的MessageCoder实现
@@ -32,12 +31,11 @@ public class MessageRecordCoder implements MessageCoder<MessageRecord> {
byte[] stopics = MessageCoder.getBytes(data.getTopic()); byte[] stopics = MessageCoder.getBytes(data.getTopic());
byte[] dtopics = MessageCoder.getBytes(data.getResptopic()); byte[] dtopics = MessageCoder.getBytes(data.getResptopic());
byte[] groupid = MessageCoder.getBytes(data.getGroupid()); byte[] groupid = MessageCoder.getBytes(data.getGroupid());
int count = 8 + 4 + 4 + 4 + 8 + 4 + 2 + stopics.length + 2 + dtopics.length + 2 + groupid.length + 4 + (data.getContent() == null ? 0 : data.getContent().length); int count = 8 + 4 + 4 + 8 + 4 + 2 + stopics.length + 2 + dtopics.length + 2 + groupid.length + 4 + (data.getContent() == null ? 0 : data.getContent().length);
final byte[] bs = new byte[count]; final byte[] bs = new byte[count];
ByteBuffer buffer = ByteBuffer.wrap(bs); ByteBuffer buffer = ByteBuffer.wrap(bs);
buffer.putLong(data.getSeqid()); buffer.putLong(data.getSeqid());
buffer.putInt(data.getVersion()); buffer.putInt(data.getVersion());
buffer.putInt(data.getFormat() == null ? 0 : data.getFormat().getValue());
buffer.putInt(data.getFlag()); buffer.putInt(data.getFlag());
buffer.putLong(data.getCreatetime()); buffer.putLong(data.getCreatetime());
buffer.putInt(data.getUserid()); buffer.putInt(data.getUserid());
@@ -62,7 +60,6 @@ public class MessageRecordCoder implements MessageCoder<MessageRecord> {
ByteBuffer buffer = ByteBuffer.wrap(data); ByteBuffer buffer = ByteBuffer.wrap(data);
long seqid = buffer.getLong(); long seqid = buffer.getLong();
int version = buffer.getInt(); int version = buffer.getInt();
ConvertType format = ConvertType.find(buffer.getInt());
int flag = buffer.getInt(); int flag = buffer.getInt();
long createtime = buffer.getLong(); long createtime = buffer.getLong();
int userid = buffer.getInt(); int userid = buffer.getInt();
@@ -77,8 +74,7 @@ public class MessageRecordCoder implements MessageCoder<MessageRecord> {
content = new byte[contentlen]; content = new byte[contentlen];
buffer.get(content); buffer.get(content);
} }
return new MessageRecord(seqid, version, format, flag, return new MessageRecord(seqid, version, flag, createtime, userid, groupid, topic, resptopic, content);
createtime, userid, groupid, topic, resptopic, content);
} }
} }

View File

@@ -7,7 +7,6 @@ package org.redkale.mq;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.redkale.convert.ConvertType;
/** /**
* *
@@ -23,7 +22,6 @@ public class SncpMessageClient extends MessageClient {
protected SncpMessageClient(MessageAgent messageAgent) { protected SncpMessageClient(MessageAgent messageAgent) {
super(messageAgent); super(messageAgent);
this.respTopic = messageAgent.generateSncpRespTopic(); this.respTopic = messageAgent.generateSncpRespTopic();
this.convertType = ConvertType.BSON;
} }
@Override @Override

View File

@@ -6,7 +6,6 @@
package org.redkale.mq; package org.redkale.mq;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.redkale.convert.ConvertType;
import org.redkale.convert.bson.BsonWriter; import org.redkale.convert.bson.BsonWriter;
import org.redkale.net.Response; import org.redkale.net.Response;
import org.redkale.net.sncp.*; import org.redkale.net.sncp.*;
@@ -50,12 +49,12 @@ public class SncpMessageResponse extends SncpResponse {
if (out == null) { if (out == null) {
final byte[] result = new byte[SncpRequest.HEADER_SIZE]; final byte[] result = new byte[SncpRequest.HEADER_SIZE];
fillHeader(ByteBuffer.wrap(result), 0, retcode); fillHeader(ByteBuffer.wrap(result), 0, retcode);
producer.apply(new MessageRecord(message.getSeqid(), ConvertType.BSON, message.getResptopic(), null, (byte[]) null)); producer.apply(new MessageRecord(message.getSeqid(), message.getResptopic(), null, (byte[]) null));
return; return;
} }
final int respBodyLength = out.count(); //body总长度 final int respBodyLength = out.count(); //body总长度
final byte[] result = out.toArray(); final byte[] result = out.toArray();
fillHeader(ByteBuffer.wrap(result), respBodyLength - HEADER_SIZE, retcode); fillHeader(ByteBuffer.wrap(result), respBodyLength - HEADER_SIZE, retcode);
producer.apply(new MessageRecord(message.getSeqid(), ConvertType.BSON, message.getResptopic(), null, result)); producer.apply(new MessageRecord(message.getSeqid(), message.getResptopic(), null, result));
} }
} }

View File

@@ -110,18 +110,10 @@ public class HttpRequest extends Request<HttpContext> {
if (req != null) { if (req != null) {
this.rpc = req.rpc; this.rpc = req.rpc;
if (req.getBody() != null) this.array.write(req.getBody()); if (req.getBody() != null) this.array.write(req.getBody());
if (req.getHeaders() != null) { if (req.getHeaders() != null) this.headers.putAll(req.getHeaders());
this.headers.putAll(req.getHeaders()); this.frombody = req.isFrombody();
if (this.headers.containsKey(Rest.REST_HEADER_PARAM_FROM_BODY)) { this.reqConvert = req.getReqConvertType() == null ? null : ConvertFactory.findConvert(req.getReqConvertType());
this.frombody = "true".equals(this.headers.get(Rest.REST_HEADER_RESP_CONVERT_TYPE)); this.respConvert = req.getRespConvertType() == null ? null : ConvertFactory.findConvert(req.getRespConvertType());
}
if (this.headers.containsKey(Rest.REST_HEADER_RESP_CONVERT_TYPE)) {
this.respConvert = ConvertFactory.findConvert(ConvertType.valueOf(this.headers.get(Rest.REST_HEADER_RESP_CONVERT_TYPE)));
}
if (this.headers.containsKey(Rest.REST_HEADER_REQ_CONVERT_TYPE)) {
this.reqConvert = ConvertFactory.findConvert(ConvertType.valueOf(this.headers.get(Rest.REST_HEADER_REQ_CONVERT_TYPE)));
}
}
if (req.getParams() != null) this.params.putAll(req.getParams()); if (req.getParams() != null) this.params.putAll(req.getParams());
if (req.getCurrentUserid() != 0) this.currentUserid = req.getCurrentUserid(); if (req.getCurrentUserid() != 0) this.currentUserid = req.getCurrentUserid();
this.contentType = req.getContentType(); this.contentType = req.getContentType();

View File

@@ -30,37 +30,49 @@ public class HttpSimpleRequest implements java.io.Serializable {
protected boolean rpc = true; protected boolean rpc = true;
@ConvertColumn(index = 2) @ConvertColumn(index = 2)
@Comment("是否从body中获取参数比如protobuf数据格式")
protected boolean frombody;
@ConvertColumn(index = 3)
@Comment("请求参数的ConvertType")
protected ConvertType reqConvertType;
@ConvertColumn(index = 4)
@Comment("输出结果的ConvertType")
protected ConvertType respConvertType;
@ConvertColumn(index = 5)
@Comment("请求的URI") @Comment("请求的URI")
protected String requestURI; protected String requestURI;
@ConvertColumn(index = 3) @ConvertColumn(index = 6)
@Comment("请求的前缀") @Comment("请求的前缀")
protected String path; protected String path;
@ConvertColumn(index = 4) @ConvertColumn(index = 7)
@Comment("客户端IP") @Comment("客户端IP")
protected String remoteAddr; protected String remoteAddr;
@ConvertColumn(index = 5) @ConvertColumn(index = 8)
@Comment("会话ID") @Comment("会话ID")
protected String sessionid; protected String sessionid;
@ConvertColumn(index = 6) @ConvertColumn(index = 9)
@Comment("Content-Type") @Comment("Content-Type")
protected String contentType; protected String contentType;
@ConvertColumn(index = 7) @ConvertColumn(index = 10)
protected int currentUserid; protected int currentUserid;
@ConvertColumn(index = 8) @ConvertColumn(index = 11)
@Comment("http header信息") @Comment("http header信息")
protected Map<String, String> headers; protected Map<String, String> headers;
@ConvertColumn(index = 9) @ConvertColumn(index = 12)
@Comment("参数信息") @Comment("参数信息")
protected Map<String, String> params; protected Map<String, String> params;
@ConvertColumn(index = 10) @ConvertColumn(index = 13)
@Comment("http body信息") @Comment("http body信息")
protected byte[] body; //对应HttpRequest.array protected byte[] body; //对应HttpRequest.array
@@ -105,6 +117,32 @@ public class HttpSimpleRequest implements java.io.Serializable {
return this; return this;
} }
public HttpSimpleRequest requestURI(boolean frombody) {
this.frombody = frombody;
return this;
}
public HttpSimpleRequest frombody(boolean frombody) {
this.frombody = frombody;
return this;
}
public HttpSimpleRequest bothConvertType(ConvertType convertType) {
this.reqConvertType = convertType;
this.respConvertType = convertType;
return this;
}
public HttpSimpleRequest reqConvertType(ConvertType reqConvertType) {
this.reqConvertType = reqConvertType;
return this;
}
public HttpSimpleRequest respConvertType(ConvertType respConvertType) {
this.respConvertType = respConvertType;
return this;
}
public HttpSimpleRequest remoteAddr(String remoteAddr) { public HttpSimpleRequest remoteAddr(String remoteAddr) {
this.remoteAddr = remoteAddr; this.remoteAddr = remoteAddr;
return this; return this;
@@ -309,6 +347,30 @@ public class HttpSimpleRequest implements java.io.Serializable {
this.body = body; this.body = body;
} }
public boolean isFrombody() {
return frombody;
}
public void setFrombody(boolean frombody) {
this.frombody = frombody;
}
public ConvertType getReqConvertType() {
return reqConvertType;
}
public void setReqConvertType(ConvertType reqConvertType) {
this.reqConvertType = reqConvertType;
}
public ConvertType getRespConvertType() {
return respConvertType;
}
public void setRespConvertType(ConvertType respConvertType) {
this.respConvertType = respConvertType;
}
@Override @Override
public String toString() { public String toString() {
return JsonConvert.root().convertTo(this); return JsonConvert.root().convertTo(this);

View File

@@ -15,7 +15,6 @@ import java.util.concurrent.*;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.logging.*; import java.util.logging.*;
import javax.annotation.Resource; import javax.annotation.Resource;
import org.redkale.convert.ConvertType;
import org.redkale.convert.bson.*; import org.redkale.convert.bson.*;
import org.redkale.convert.json.*; import org.redkale.convert.json.*;
import org.redkale.mq.*; import org.redkale.mq.*;
@@ -288,7 +287,7 @@ public final class SncpClient {
fillHeader(ByteBuffer.wrap(reqbytes), seqid, actionid, reqBodyLength); fillHeader(ByteBuffer.wrap(reqbytes), seqid, actionid, reqBodyLength);
String targetTopic = action.topicTargetParamIndex >= 0 ? (String) params[action.topicTargetParamIndex] : this.topic; String targetTopic = action.topicTargetParamIndex >= 0 ? (String) params[action.topicTargetParamIndex] : this.topic;
if (targetTopic == null) targetTopic = this.topic; if (targetTopic == null) targetTopic = this.topic;
MessageRecord message = new MessageRecord(ConvertType.BSON, targetTopic, null, reqbytes); MessageRecord message = new MessageRecord(targetTopic, null, reqbytes);
return messageClient.sendMessage(message).thenApply(msg -> { return messageClient.sendMessage(message).thenApply(msg -> {
ByteBuffer buffer = ByteBuffer.wrap(msg.getContent()); ByteBuffer buffer = ByteBuffer.wrap(msg.getContent());
checkResult(seqid, action, buffer); checkResult(seqid, action, buffer);