SncpClient优化

This commit is contained in:
redkale
2023-03-23 11:56:13 +08:00
parent f34554f2cb
commit 80bad30811
20 changed files with 213 additions and 188 deletions

View File

@@ -7,10 +7,6 @@ redkale.lib = ./
redkale.executor.threads = 4
redkale.executor.hash = false
#\u3010transport\u8282\u70b9\u5168\u5c40\u552f\u4e00\u3011
redkale.transport.bufferCapacity = 32k
redkale.transport.bufferPoolSize = 32
#\u3010excludelibs\u8282\u70b9\u5168\u5c40\u552f\u4e00\u3011
redkale.excludelibs.value = ^.*mysql.*$;^.*kafka.*$

View File

@@ -42,18 +42,6 @@
-->
<executor threads="4" hash="false"/>
<!--
【节点全局唯一】
transport节点只能有一个用于配置所有Transport的池参数没配置该节点将自动创建一个。
threads 线程总数, 默认: <group>节点数*CPU核数*2
bufferCapacity: ByteBuffer的初始化大小 默认: 32K;
bufferPoolSize ByteBuffer池的大小默认: 线程总数*4
readTimeoutSeconds: TCP读取超时秒数, 默认为6秒 为0表示无超时限制
writeTimeoutSeconds: TCP写入超时秒数, 默认为6秒 为0表示无超时限制
strategy: 远程请求的负载均衡策略, 必须是org.redkale.net.TransportStrategy的实现类
-->
<transport bufferCapacity="32K" bufferPoolSize="32" threads="32" readTimeoutSeconds="6" writeTimeoutSeconds="6"/>
<!--
【节点全局唯一】
自动扫描时排除部分包路径
@@ -127,9 +115,6 @@
load: 加载文件,多个用;隔开。
其他属性: 供org.redkale.boot.PropertiesAgentProvider使用判断
默认置入的system.property.的有:
System.setProperty("redkale.net.transport.poolmaxconns", "100");
System.setProperty("redkale.net.transport.pinginterval", "30");
System.setProperty("redkale.net.transport.checkinterval", "30");
System.setProperty("redkale.convert.pool.size", "128");
System.setProperty("redkale.convert.writer.buffer.defsize", "4096");
System.setProperty("redkale.trace.enable", "false");

View File

@@ -303,15 +303,6 @@ public final class Application {
}
}
//设置默认系统变量
if (System.getProperty("redkale.net.transport.poolmaxconns") == null) {
System.setProperty("redkale.net.transport.poolmaxconns", "100");
}
if (System.getProperty("redkale.net.transport.pinginterval") == null) {
System.setProperty("redkale.net.transport.pinginterval", "30");
}
if (System.getProperty("redkale.net.transport.checkinterval") == null) {
System.setProperty("redkale.net.transport.checkinterval", "30");
}
if (System.getProperty("redkale.convert.pool.size") == null) {
System.setProperty("redkale.convert.pool.size", "128");
}

View File

@@ -928,9 +928,6 @@ public abstract class NodeServer {
if ("executor".equals(key)) {
return AnyValue.MergeFunction.REPLACE;
}
if ("transport".equals(key)) {
return AnyValue.MergeFunction.REPLACE;
}
if ("excludelibs".equals(key)) {
return AnyValue.MergeFunction.REPLACE;
}

View File

@@ -74,15 +74,6 @@ public class BsonWriter extends Writer implements ByteTuple {
return new ByteArray(this);
}
public byte[] toArray() {
if (count == content.length) {
return content;
}
byte[] newdata = new byte[count];
System.arraycopy(content, 0, newdata, 0, count);
return newdata;
}
public ByteBuffer[] toBuffers() {
return new ByteBuffer[]{ByteBuffer.wrap(content, 0, count)};
}

View File

@@ -276,7 +276,8 @@ public abstract class Response<C extends Context, R extends Request<C>> {
private void completeInIOThread(boolean kill) {
if (!this.inited) {
return; //避免重复关闭
} //System.println("耗时: " + (System.currentTimeMillis() - request.createtime));
}
//System.println("耗时: " + (System.currentTimeMillis() - request.createtime));
if (kill) {
refuseAlive();
}
@@ -300,7 +301,6 @@ public abstract class Response<C extends Context, R extends Request<C>> {
new ProtocolCodec(context, poolSupplier, poolConsumer, conn).response(this).run(null);
}
} else {
new Exception().printStackTrace();
this.responseConsumer.accept(this);
}
}

View File

@@ -23,7 +23,7 @@ import org.redkale.convert.json.JsonConvert;
import org.redkale.util.*;
/**
* 传输客户端
* 被net.client模块代替
*
* <p>
* 详情见: https://redkale.org

View File

@@ -19,8 +19,7 @@ import org.redkale.service.Service;
import org.redkale.util.*;
/**
* System.getProperty("redkale.net.transport.ping.interval", "30") 心跳周期默认30秒
* System.getProperty("redkale.net.transport.check.interval", "30") 检查不可用地址周期默认30秒
* 被net.client模块代替
*
* <p>
* 详情见: https://redkale.org

View File

@@ -12,7 +12,7 @@ import org.redkale.convert.json.JsonConvert;
import org.redkale.util.Utility;
/**
* 协议地址组合对象, 对应application.xml 中 resources-&#62;group 节点信息
* 被net.client模块代替
*
* <p>
* 详情见: https://redkale.org

View File

@@ -9,7 +9,7 @@ import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
/**
* 远程请求的负载均衡策略
* 被net.client模块代替
*
* <p>
* 详情见: https://redkale.org

View File

@@ -69,7 +69,7 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
for (ClientResponse<R, P> cr : respResults) {
connection.doneResponseCounter.increment();
if (cr.isError()) {
connection.dispose(null);
connection.dispose(cr.exc);
return;
} else {
ClientFuture<R, P> respFuture = connection.pollRespFuture(cr.getRequestid());

View File

@@ -3,16 +3,10 @@
*/
package org.redkale.net.sncp;
import java.lang.reflect.Type;
import java.net.*;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture;
import org.redkale.convert.*;
import org.redkale.convert.bson.BsonConvert;
import org.redkale.net.*;
import org.redkale.net.client.*;
import org.redkale.net.sncp.SncpRemoteInfo.SncpRemoteAction;
import org.redkale.util.*;
/**
* SNCP版Client, 一个SncpServer只能对应一个SncpClient
@@ -26,7 +20,7 @@ import org.redkale.util.*;
*/
public class SncpClient extends Client<SncpClientConnection, SncpClientRequest, SncpClientResult> {
private final InetSocketAddress clientSncpAddress;
final InetSocketAddress clientSncpAddress;
public SncpClient(String name, AsyncGroup group, InetSocketAddress clientSncpAddress, ClientAddress address, String netprotocol, int maxConns, int maxPipelines) {
super(name, group, "TCP".equalsIgnoreCase(netprotocol), address, maxConns, maxPipelines, null, null, null); //maxConns
@@ -47,91 +41,9 @@ public class SncpClient extends Client<SncpClientConnection, SncpClientRequest,
return super.connect(addr);
}
//只给远程模式调用的
public <T> T remote(final SncpRemoteInfo info, final int index, final Object[] params) {
final Convert convert = info.convert;
final SncpRemoteAction action = info.actions[index];
CompletionHandler callbackHandler = null;
Object callbackHandlerAttach = null;
if (action.paramHandlerIndex >= 0) {
callbackHandler = (CompletionHandler) params[action.paramHandlerIndex];
params[action.paramHandlerIndex] = null;
if (action.paramHandlerAttachIndex >= 0) {
callbackHandlerAttach = params[action.paramHandlerAttachIndex];
params[action.paramHandlerAttachIndex] = null;
}
}
final CompletableFuture<byte[]> future = remote(info, action, convert, Traces.currTraceid(), params);
if (action.paramHandlerIndex >= 0) { //参数中存在CompletionHandler
final CompletionHandler handler = callbackHandler;
final Object attach = callbackHandlerAttach;
if (handler == null) { //传入的CompletionHandler参数为null
future.join();
} else {
future.whenComplete((v, t) -> {
if (t == null) {
//v,length-1为了读掉(byte)0
handler.completed(v == null ? null : convert.convertFrom(action.paramHandlerResultType, v, 1, v.length - 1), attach);
} else {
handler.failed(t, attach);
}
});
}
} else if (action.returnFutureClass != null) { //返回类型为CompletableFuture
if (action.returnFutureClass == CompletableFuture.class) {
//v,length-1为了读掉(byte)0
return (T) future.thenApply(v -> v == null ? null : convert.convertFrom(action.returnFutureResultType, v, 1, v.length - 1));
} else {
final CompletableFuture returnFuture = action.returnFutureCreator.create();
future.whenComplete((v, t) -> {
if (t == null) {
//v,length-1为了读掉(byte)0
returnFuture.complete(v == null ? null : convert.convertFrom(action.returnFutureResultType, v, 1, v.length - 1));
} else {
returnFuture.completeExceptionally(t);
}
});
return (T) returnFuture;
}
} else if (action.returnObjectType != null) { //返回类型为JavaBean
//v,length-1为了读掉(byte)0
return (T) future.thenApply(v -> v == null ? null : convert.convertFrom(action.returnObjectType, v, 1, v.length - 1)).join();
} else { //返回类型为void
future.join();
}
return null;
@Override
protected CompletableFuture<SncpClientResult> writeChannel(ClientConnection conn, SncpClientRequest request) {
return super.writeChannel(conn, request);
}
private CompletableFuture<byte[]> remote(
final SncpRemoteInfo info,
final SncpRemoteAction action,
final Convert convert,
final String traceid,
final Object[] params) {
final Type[] myParamTypes = action.paramTypes;
final Class[] myParamClass = action.paramClasses;
if (action.paramAddressSourceIndex >= 0) {
params[action.paramAddressSourceIndex] = this.clientSncpAddress;
}
final long seqid = System.nanoTime();
final SncpClientRequest requet = new SncpClientRequest();
Writer writer = null;
if (myParamTypes.length > 0) {
writer = convert.pollWriter();
for (int i = 0; i < params.length; i++) { //service方法的参数
Convert bcc = convert;
if (params[i] instanceof org.redkale.service.RetResult) {
org.redkale.convert.Convert cc = ((org.redkale.service.RetResult) params[i]).convert();
if (cc instanceof BsonConvert) {
bcc = (BsonConvert) cc;
}
}
bcc.convertTo(writer, CompletionHandler.class.isAssignableFrom(myParamClass[i]) ? CompletionHandler.class : myParamTypes[i], params[i]);
}
}
requet.prepare(action.header, seqid, traceid, (ByteTuple) writer);
//writer没有回收待优化
final SocketAddress addr = action.paramAddressTargetIndex >= 0 ? (SocketAddress) params[action.paramAddressTargetIndex] : info.nextRemoteAddress();
return super.connect(addr).thenCompose(conn -> writeChannel(conn, requet).thenApply(rs -> rs.getBodyContent()));
}
}

View File

@@ -72,6 +72,9 @@ public class SncpClientCodec extends ClientCodec<SncpClientRequest, SncpClientRe
occurError(null, new SncpException("sncp header length must be " + HEADER_SIZE + ", but " + headerSize)); //request不一定存在
return;
}
//if (halfHeaderBytes.length() != HEADER_SIZE) {
// logger.log(Level.SEVERE, "halfHeaderBytes.length must be " + HEADER_SIZE + ", but " + halfHeaderBytes.length());
//}
halfHeaderBytes = null;
if (result.getBodyLength() < 1) {
addMessage(findRequest(result.getRequestid()), result);
@@ -90,6 +93,9 @@ public class SncpClientCodec extends ClientCodec<SncpClientRequest, SncpClientRe
halfBodyBytes.put(buffer, lastResult.getBodyLength() - halfBodyBytes.length());
//读取完整body
lastResult.setBodyContent(halfBodyBytes.getBytes());
//if (halfBodyBytes.length() != lastResult.getBodyLength()) {
// logger.log(Level.SEVERE, "halfBodyBytes.length must be " + lastResult.getBodyLength() + ", but " + halfBodyBytes.length());
//}
halfBodyBytes = null;
addMessage(findRequest(lastResult.getRequestid()), lastResult);
lastResult = null;
@@ -100,8 +106,12 @@ public class SncpClientCodec extends ClientCodec<SncpClientRequest, SncpClientRe
halfBodyBytes.put(buffer);
return;
}
//有足够的数据读取完整body
lastResult.readBody(buffer);
halfBodyBytes = null;
//if (lastResult.getBodyContent().length != lastResult.getBodyLength()) {
// logger.log(Level.SEVERE, "lastResult.length must be " + lastResult.getBodyLength() + ", but " + lastResult.getBodyContent().length);
//}
addMessage(findRequest(lastResult.getRequestid()), lastResult);
lastResult = null;
continue;

View File

@@ -6,7 +6,7 @@ package org.redkale.net.sncp;
import java.io.Serializable;
import java.util.Objects;
import org.redkale.net.client.*;
import org.redkale.util.*;
import org.redkale.util.ByteArray;
/**
* client版请求
@@ -24,12 +24,12 @@ public class SncpClientRequest extends ClientRequest {
private long seqid;
private ByteTuple bodyContent;
private byte[] bodyContent;
public SncpClientRequest() {
}
public SncpClientRequest prepare(SncpHeader header, long seqid, String traceid, ByteTuple bodyContent) {
public SncpClientRequest prepare(SncpHeader header, long seqid, String traceid, byte[] bodyContent) {
super.prepare();
this.header = header;
this.seqid = seqid;
@@ -55,14 +55,14 @@ public class SncpClientRequest extends ClientRequest {
public Serializable getRequestid() {
return seqid;
}
@Override
public void writeTo(ClientConnection conn, ByteArray array) {
array.putPlaceholder(SncpHeader.HEADER_SIZE);
if (bodyContent == null || bodyContent.length() == 0) {
if (bodyContent == null) {
header.writeTo(array, header.getAddrBytes(), header.getAddrPort(), seqid, 0, 0);
} else {
header.writeTo(array, header.getAddrBytes(), header.getAddrPort(), seqid, bodyContent.length(), 0);
header.writeTo(array, header.getAddrBytes(), header.getAddrPort(), seqid, bodyContent.length, 0);
array.put(bodyContent);
}
}
@@ -71,7 +71,7 @@ public class SncpClientRequest extends ClientRequest {
public String toString() {
return getClass().getSimpleName() + "_" + Objects.hashCode(this) + "{"
+ "header=" + header + ", seqid =" + seqid
+ ", body=[" + (bodyContent == null ? -1 : bodyContent.length()) + "]"
+ ", body=[" + (bodyContent == null ? -1 : bodyContent.length) + "]"
+ "}";
}
@@ -83,7 +83,7 @@ public class SncpClientRequest extends ClientRequest {
return seqid;
}
public ByteTuple getBodyContent() {
public byte[] getBodyContent() {
return bodyContent;
}

View File

@@ -55,8 +55,12 @@ public class SncpHeader {
this.addrPort = clientSncpAddress == null ? 0 : clientSncpAddress.getPort();
this.serviceid = serviceid;
this.actionid = actionid;
if (addrBytes.length != 4) {
throw new SncpException("address bytes length must be 4, but " + addrBytes.length);
}
}
//返回Header Size
public int read(ByteBuffer buffer) {
this.seqid = buffer.getLong(); //8
int size = buffer.getChar();
@@ -76,6 +80,7 @@ public class SncpHeader {
return size;
}
//返回Header Size
public int read(ByteArray array) {
int offset = 0;
this.seqid = array.getLong(offset); //8
@@ -96,7 +101,7 @@ public class SncpHeader {
this.abilities = array.getInt(offset); //4
offset += 4;
this.timestamp = array.getLong(offset); //8
offset += 4;
offset += 8;
this.retcode = array.getInt(offset); //4
offset += 4;
this.bodyLength = array.getInt(offset); //4
@@ -110,6 +115,9 @@ public class SncpHeader {
}
public ByteArray writeTo(ByteArray array, byte[] newAddrBytes, int newAddrPort, long newSeqid, int bodyLength, int retcode) {
if (newAddrBytes.length != 4) {
throw new SncpException("address bytes length must be 4, but " + newAddrBytes.length);
}
int offset = 0;
array.putLong(offset, newSeqid); //8
offset += 8;
@@ -149,7 +157,7 @@ public class SncpHeader {
}
public InetSocketAddress getAddress() {
if (addrBytes == null || addrBytes[0] == 0) {
if (addrBytes == null) {
return null;
}
return new InetSocketAddress((0xff & addrBytes[0]) + "." + (0xff & addrBytes[1]) + "." + (0xff & addrBytes[2]) + "." + (0xff & addrBytes[3]), addrPort);

View File

@@ -6,12 +6,16 @@ package org.redkale.net.sncp;
import java.lang.annotation.Annotation;
import java.lang.reflect.*;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.*;
import java.util.concurrent.*;
import org.redkale.convert.Convert;
import java.util.logging.*;
import org.redkale.convert.*;
import org.redkale.convert.json.JsonConvert;
import org.redkale.mq.*;
import static org.redkale.net.sncp.Sncp.loadMethodActions;
import static org.redkale.net.sncp.SncpHeader.HEADER_SIZE;
import org.redkale.service.*;
import org.redkale.util.*;
@@ -26,7 +30,9 @@ import org.redkale.util.*;
*
* @since 2.8.0
*/
public final class SncpRemoteInfo<T extends Service> {
public class SncpRemoteInfo<T extends Service> {
protected static final Logger logger = Logger.getLogger(SncpRemoteInfo.class.getSimpleName());
protected final String name;
@@ -34,40 +40,43 @@ public final class SncpRemoteInfo<T extends Service> {
protected final Uint128 serviceid;
protected final String resourceid;
protected final int serviceVersion;
protected final SncpRemoteAction[] actions;
//MQ模式下此字段才有值
protected final String topic;
//默认值: BsonConvert.root()
protected final Convert convert;
//非MQ模式下此字段才有值
protected final SncpRpcGroups sncpRpcGroups;
//非MQ模式下此字段才有值
protected final SncpClient sncpClient;
//非MQ模式下此字段才有值, 可能为null
protected String remoteGroup;
//非MQ模式下此字段才有值, 可能为null
protected Set<InetSocketAddress> remoteAddresses;
//默认值: BsonConvert.root()
protected final Convert convert;
//MQ模式下此字段才有值
protected final String topic;
//MQ模式下此字段才有值
protected final MessageAgent messageAgent;
//MQ模式下此字段才有值
protected final SncpMessageClient messageClient;
//MQ模式下此字段才有值, 可能为null
protected String remoteGroup;
//MQ模式下此字段才有值, 可能为null
protected Set<InetSocketAddress> remoteAddresses;
SncpRemoteInfo(String resourceName, Class<T> resourceServiceType, Class<T> serviceImplClass, Convert convert,
SncpRemoteInfo(String resourceName, Class<T> resourceType, Class<T> serviceImplClass, Convert convert,
SncpRpcGroups sncpRpcGroups, SncpClient sncpClient, MessageAgent messageAgent, String remoteGroup) {
Objects.requireNonNull(sncpRpcGroups);
this.name = resourceName;
this.serviceType = resourceServiceType;
this.serviceid = Sncp.serviceid(resourceName, resourceServiceType);
this.serviceType = resourceType;
this.resourceid = Sncp.resourceid(resourceName, resourceType);
this.serviceid = Sncp.serviceid(resourceName, resourceType);
this.convert = convert;
this.serviceVersion = 0;
this.sncpRpcGroups = sncpRpcGroups;
@@ -75,16 +84,149 @@ public final class SncpRemoteInfo<T extends Service> {
this.messageAgent = messageAgent;
this.remoteGroup = remoteGroup;
this.messageClient = messageAgent == null ? null : messageAgent.getSncpMessageClient();
this.topic = messageAgent == null ? null : messageAgent.generateSncpReqTopic(resourceName, resourceServiceType);
this.topic = messageAgent == null ? null : messageAgent.generateSncpReqTopic(resourceName, resourceType);
final List<SncpRemoteAction> serviceActions = new ArrayList<>();
for (Map.Entry<Uint128, Method> en : loadMethodActions(resourceServiceType).entrySet()) {
for (Map.Entry<Uint128, Method> en : loadMethodActions(resourceType).entrySet()) {
serviceActions.add(new SncpRemoteAction(serviceImplClass, en.getValue(), serviceid, en.getKey()));
}
this.actions = serviceActions.toArray(new SncpRemoteAction[serviceActions.size()]);
}
public InetSocketAddress nextRemoteAddress() {
//由远程模式的DyncRemoveService调用
public <T> T remote(final int index, final Object... params) {
final SncpRemoteAction action = this.actions[index];
CompletionHandler callbackHandler = null;
Object callbackHandlerAttach = null;
if (action.paramHandlerIndex >= 0) {
callbackHandler = (CompletionHandler) params[action.paramHandlerIndex];
params[action.paramHandlerIndex] = null;
if (action.paramHandlerAttachIndex >= 0) {
callbackHandlerAttach = params[action.paramHandlerAttachIndex];
params[action.paramHandlerAttachIndex] = null;
}
}
final CompletableFuture<byte[]> future = remote(action, Traces.currTraceid(), params);
if (action.paramHandlerIndex >= 0) { //参数中存在CompletionHandler
final CompletionHandler handler = callbackHandler;
final Object attach = callbackHandlerAttach;
if (handler == null) { //传入的CompletionHandler参数为null
future.join();
} else {
future.whenComplete((v, t) -> {
if (t == null) {
//v,length-1为了读掉(byte)0
handler.completed(v == null ? null : convert.convertFrom(action.paramHandlerResultType, v, 1, v.length - 1), attach);
} else {
handler.failed(t, attach);
}
});
}
} else if (action.returnFutureClass != null) { //返回类型为CompletableFuture
if (action.returnFutureClass == CompletableFuture.class) {
//v,length-1为了读掉(byte)0
return (T) future.thenApply(v -> v == null ? null : convert.convertFrom(action.returnFutureResultType, v, 1, v.length - 1));
} else {
final CompletableFuture returnFuture = action.returnFutureCreator.create();
future.whenComplete((v, t) -> {
if (t == null) {
//v,length-1为了读掉(byte)0
returnFuture.complete(v == null ? null : convert.convertFrom(action.returnFutureResultType, v, 1, v.length - 1));
} else {
returnFuture.completeExceptionally(t);
}
});
return (T) returnFuture;
}
} else if (action.returnObjectType != null) { //返回类型为JavaBean
//v,length-1为了读掉(byte)0
return (T) future.thenApply(v -> v == null ? null : convert.convertFrom(action.returnObjectType, v, 1, v.length - 1)).join();
} else { //返回类型为void
future.join();
}
return null;
}
private CompletableFuture<byte[]> remote(final SncpRemoteAction action, final String traceid, final Object[] params) {
if (messageAgent != null) {
return remoteMessage(action, traceid, params);
} else {
return remoteClient(action, traceid, params);
}
}
//MQ模式RPC
protected CompletableFuture<byte[]> remoteMessage(final SncpRemoteAction action, final String traceid, final Object[] params) {
final SncpClientRequest request = createSncpClientRequest(action, this.sncpClient.clientSncpAddress, traceid, params);
String targetTopic = action.paramTopicTargetIndex >= 0 ? (String) params[action.paramTopicTargetIndex] : this.topic;
if (targetTopic == null) {
targetTopic = this.topic;
}
ByteArray array = new ByteArray();
request.writeTo(null, array);
MessageRecord message = messageClient.createMessageRecord(targetTopic, null, array.getBytes());
final String tt = targetTopic;
if (logger.isLoggable(Level.FINER)) {
message.attach(Utility.append(new Object[]{action.actionName()}, params));
} else {
message.attach(params);
}
return messageClient.sendMessage(message).thenApply(msg -> {
if (msg == null || msg.getContent() == null) {
logger.log(Level.SEVERE, action.method + " sncp mq(params: " + JsonConvert.root().convertTo(params) + ", message: " + message + ") deal error, this.topic = " + this.topic + ", targetTopic = " + tt + ", result = " + msg);
return null;
}
ByteBuffer buffer = ByteBuffer.wrap(msg.getContent());
SncpHeader header = new SncpHeader();
int headerSize = header.read(buffer);
if (headerSize != HEADER_SIZE) {
throw new SncpException("sncp header length must be " + HEADER_SIZE + ", but is " + headerSize);
}
if (!header.checkValid(action.header)) {
throw new SncpException("sncp header error, response-header:" + action.header + "+, response-header:" + header);
}
final int retcode = header.getRetcode();
if (retcode != 0) {
logger.log(Level.SEVERE, action.method + " sncp (params: " + JsonConvert.root().convertTo(params) + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + "), params=" + JsonConvert.root().convertTo(params));
throw new SncpException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")");
}
final int respBodyLength = header.getBodyLength();
byte[] body = new byte[respBodyLength];
buffer.get(body, 0, respBodyLength);
return body;
});
}
//Client模式RPC
protected CompletableFuture<byte[]> remoteClient(final SncpRemoteAction action, final String traceid, final Object[] params) {
final SncpClient client = this.sncpClient;
final SncpClientRequest request = createSncpClientRequest(action, client.clientSncpAddress, traceid, params);
final SocketAddress addr = action.paramAddressTargetIndex >= 0 ? (SocketAddress) params[action.paramAddressTargetIndex] : nextRemoteAddress();
return client.connect(addr).thenCompose(conn -> client.writeChannel(conn, request).thenApply(rs -> rs.getBodyContent()));
}
protected SncpClientRequest createSncpClientRequest(final SncpRemoteAction action, final InetSocketAddress clientSncpAddress, final String traceid, final Object[] params) {
final Type[] myParamTypes = action.paramTypes;
final Class[] myParamClass = action.paramClasses;
if (action.paramAddressSourceIndex >= 0) {
params[action.paramAddressSourceIndex] = clientSncpAddress;
}
final long seqid = System.nanoTime();
byte[] body = null;
if (myParamTypes.length > 0) {
Writer writer = convert.pollWriter();
for (int i = 0; i < params.length; i++) { //service方法的参数
convert.convertTo(writer, CompletionHandler.class.isAssignableFrom(myParamClass[i]) ? CompletionHandler.class : myParamTypes[i], params[i]);
}
body = ((ByteTuple) writer).toArray();
convert.offerWriter(writer);
}
final SncpClientRequest requet = new SncpClientRequest();
requet.prepare(action.header, seqid, traceid, body);
return requet;
}
protected InetSocketAddress nextRemoteAddress() {
SncpRpcGroup srg = sncpRpcGroups.getSncpRpcGroup(remoteGroup);
if (srg != null) {
Set<InetSocketAddress> addrs = srg.getAddresses();
@@ -98,19 +240,6 @@ public final class SncpRemoteInfo<T extends Service> {
throw new SncpException("Not found SocketAddress by remoteGroup = " + remoteGroup);
}
//由远程模式的DyncRemoveService调用
public <T> T remote(final int index, final Object... params) {
if (messageAgent != null) {
return remoteMessage(index, params);
} else {
return sncpClient.remote(this, index, params);
}
}
private <T> T remoteMessage(final int index, final Object[] params) {
throw new UnsupportedOperationException();
}
@Override
public String toString() {
InetSocketAddress clientSncpAddress = sncpClient == null ? null : sncpClient.getClientSncpAddress();

View File

@@ -5,6 +5,8 @@
*/
package org.redkale.util;
import java.util.Arrays;
/**
* 简单的byte[]数据接口。
*
@@ -21,4 +23,9 @@ public interface ByteTuple {
public int offset();
public int length();
default byte[] toArray() {
int o = offset();
return Arrays.copyOfRange(content(), o, o + length());
}
}

View File

@@ -49,11 +49,11 @@ public class SncpClientCodecTest {
SncpHeader header = new SncpHeader(sncpAddress, Uint128.ZERO, Uint128.ZERO);
SncpClientRequest request = new SncpClientRequest();
ByteArray writeArray = new ByteArray();
request.prepare(header, 1, "", new ByteArray().putPlaceholder(20));
request.prepare(header, 1, "", new byte[20]);
System.out.println("request.1 = " + request);
writeArray.put(new byte[SncpHeader.HEADER_SIZE]);
request.writeTo(conn, writeArray);
request.prepare(header, 2, "", new ByteArray().putPlaceholder(25));
request.prepare(header, 2, "", new byte[25]);
System.out.println("request.2 = " + request);
writeArray.put(new byte[SncpHeader.HEADER_SIZE]);
request.writeTo(conn, writeArray);

View File

@@ -28,7 +28,7 @@ public class SncpTest {
private static int port2 = 4240;
private static final String protocol = "SNCP.UDP";
private static final String protocol = "SNCP.TCP"; // TCP UDP
private static final int clientCapacity = protocol.endsWith(".UDP") ? AsyncGroup.UDP_BUFFER_CAPACITY : 8192;
@@ -91,7 +91,7 @@ public class SncpTest {
System.out.println("bean " + callbean);
System.out.println("\r\n\r\n\r\n\r\n---------------------------------------------------");
Thread.sleep(200);
final int count = 10;
final int count = 40;
final CountDownLatch cld = new CountDownLatch(count);
final AtomicInteger ai = new AtomicInteger();
long s = System.currentTimeMillis();

View File

@@ -33,7 +33,7 @@ public class SncpTestServiceImpl implements SncpTestIService {
try {
Thread.sleep(200);
System.out.println(Thread.currentThread().getName() + " sleep 200ms后运行了异步方法-----------queryResultAsync方法");
future.complete("异步result: " + bean);
future.complete("异步 result: " + bean);
} catch (Exception e) {
e.printStackTrace();
}