优化SncpClient

This commit is contained in:
redkale
2023-03-22 23:24:01 +08:00
parent 039ed0f569
commit e2f331ab6b
15 changed files with 127 additions and 246 deletions

View File

@@ -19,7 +19,6 @@ import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.logging.*;
import javax.net.ssl.SSLContext;
import org.redkale.annotation.Resource;
import org.redkale.boot.ClassFilter.FilterEntry;
import org.redkale.cluster.*;
@@ -160,9 +159,6 @@ public final class Application {
//NodeServer 资源, 顺序必须是sncps, others, watchs
final List<NodeServer> servers = new CopyOnWriteArrayList<>();
//SNCP传输端的TransportFactory, 注意: 只给SNCP使用
private final TransportFactory sncpTransportFactory;
//配置项里的group信息, 注意: 只给SNCP使用
private final SncpRpcGroups sncpRpcGroups = new SncpRpcGroups();
@@ -452,33 +448,19 @@ public final class Application {
throw new RedkaleException(e);
}
//------------------------------------ 配置 <transport> 节点 ------------------------------------
TransportStrategy strategy = null;
String excludelib0 = null;
ClusterAgent cluster = null;
MessageAgent[] mqs = null;
int bufferCapacity = 32 * 1024;
int bufferPoolSize = Utility.cpus() * 8;
int readTimeoutSeconds = TransportFactory.DEFAULT_READTIMEOUTSECONDS;
int writeTimeoutSeconds = TransportFactory.DEFAULT_WRITETIMEOUTSECONDS;
int readTimeoutSeconds = 6;
int writeTimeoutSeconds = 6;
AnyValue executorConf = null;
executorConf = config.getAnyValue("executor");
AnyValue excludelibConf = config.getAnyValue("excludelibs");
if (excludelibConf != null) {
excludelib0 = excludelibConf.getValue("value");
}
AnyValue transportConf = config.getAnyValue("transport");
int groupSize = config.getAnyValues("group").length;
if (groupSize > 0 && transportConf == null) {
transportConf = new DefaultAnyValue();
}
if (transportConf != null) {
//--------------transportBufferPool-----------
bufferCapacity = Math.max(parseLenth(transportConf.getValue("bufferCapacity"), bufferCapacity), 32 * 1024);
readTimeoutSeconds = transportConf.getIntValue("readTimeoutSeconds", readTimeoutSeconds);
writeTimeoutSeconds = transportConf.getIntValue("writeTimeoutSeconds", writeTimeoutSeconds);
final int threads = parseLenth(transportConf.getValue("threads"), groupSize * Utility.cpus() * 2);
bufferPoolSize = parseLenth(transportConf.getValue("bufferPoolSize"), threads * 4);
}
AnyValue clusterConf = config.getAnyValue("cluster");
if (clusterConf != null) {
@@ -611,11 +593,6 @@ public final class Application {
this.resourceFactory.register(RESNAME_APP_CLIENT_ASYNCGROUP, AsyncGroup.class, this.clientAsyncGroup);
this.excludelibs = excludelib0;
this.sncpTransportFactory = TransportFactory.create(this.clientAsyncGroup, (SSLContext) null, Transport.DEFAULT_NETPROTOCOL, readTimeoutSeconds, writeTimeoutSeconds, strategy);
DefaultAnyValue tarnsportConf = DefaultAnyValue.create(TransportFactory.NAME_POOLMAXCONNS, System.getProperty("redkale.net.transport.pool.maxconns", "100"))
.addValue(TransportFactory.NAME_PINGINTERVAL, System.getProperty("redkale.net.transport.ping.interval", "30"))
.addValue(TransportFactory.NAME_CHECKINTERVAL, System.getProperty("redkale.net.transport.check.interval", "30"));
this.sncpTransportFactory.init(tarnsportConf, ByteBuffer.wrap(Sncp.getPingBytes()).asReadOnlyBuffer(), Sncp.getPongBytes().length);
this.clusterAgent = cluster;
this.messageAgents = mqs;
if (compileMode || this.classLoader instanceof RedkaleClassLoader.RedkaleCacheClassLoader) {
@@ -1047,9 +1024,6 @@ public final class Application {
ResourceFactory rs = serv ? rf : (resName.isEmpty() ? application.resourceFactory : null);
field.set(srcObj, rs);
return rs;
} else if (type == TransportFactory.class) {
field.set(srcObj, application.sncpTransportFactory);
return application.sncpTransportFactory;
} else if (type == NodeSncpServer.class) {
NodeServer server = null;
for (NodeServer ns : application.getNodeServers()) {
@@ -1105,7 +1079,7 @@ public final class Application {
return false;
}
}, Application.class, ResourceFactory.class, TransportFactory.class, NodeSncpServer.class, NodeHttpServer.class, NodeWatchServer.class);
}, Application.class, ResourceFactory.class, NodeSncpServer.class, NodeHttpServer.class, NodeWatchServer.class);
//------------------------------------ 注册 java.net.http.HttpClient ------------------------------------
resourceFactory.register((ResourceFactory rf, String srcResourceName, final Object srcObj, String resourceName, Field field, final Object attachment) -> {
@@ -1334,18 +1308,14 @@ public final class Application {
if (group.indexOf('$') >= 0) {
throw new RedkaleException("<group> name cannot contains '$' in " + group);
}
final String protocol = conf.getValue("protocol", Transport.DEFAULT_NETPROTOCOL).toUpperCase();
final String protocol = conf.getValue("protocol", "TCP").toUpperCase();
if (!"TCP".equalsIgnoreCase(protocol) && !"UDP".equalsIgnoreCase(protocol)) {
throw new RedkaleException("Not supported Transport Protocol " + conf.getValue("protocol"));
}
SncpRpcGroup rg = sncpRpcGroups.computeIfAbsent(group, protocol);
TransportGroupInfo ginfo = new TransportGroupInfo(group, protocol, new LinkedHashSet<>());
for (AnyValue node : conf.getAnyValues("node")) {
final InetSocketAddress addr = new InetSocketAddress(node.getValue("addr"), node.getIntValue("port"));
ginfo.putAddress(addr);
rg.putAddress(addr);
rg.putAddress(new InetSocketAddress(node.getValue("addr"), node.getIntValue("port")));
}
sncpTransportFactory.addGroupInfo(ginfo);
}
for (AnyValue conf : config.getAnyValues("listener")) {
final String listenClass = conf.getValue("value", "");
@@ -2515,7 +2485,6 @@ public final class Application {
this.clientAsyncGroup.dispose();
logger.info("AsyncGroup destroy in " + (System.currentTimeMillis() - s) + " ms");
}
this.sncpTransportFactory.shutdownNow();
long intms = System.currentTimeMillis() - f;
String ms = String.valueOf(intms);
@@ -2537,11 +2506,6 @@ public final class Application {
return resourceFactory;
}
@Deprecated
public TransportFactory getSncpTransportFactory2() {
return sncpTransportFactory;
}
public ClusterAgent getClusterAgent() {
return clusterAgent;
}

View File

@@ -1,39 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.boot.watch;
import org.redkale.annotation.Resource;
import org.redkale.boot.Application;
import org.redkale.net.TransportFactory;
import org.redkale.net.http.*;
/**
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
@RestService(name = "servlet", catalog = "watch", repair = false)
public class ServletWatchService extends AbstractWatchService {
@Resource
protected Application application;
@Resource
protected TransportFactory transportFactory;
//
// @RestMapping(name = "loadServlet", auth = false, comment = "动态增加Servlet")
// public RetResult loadServlet(String type, @RestUploadFile(maxLength = 10 * 1024 * 1024, fileNameReg = "\\.jar$") byte[] jar) {
// //待开发
// return RetResult.success();
// }
//
// @RestMapping(name = "stopServlet", auth = false, comment = "动态停止Servlet")
// public RetResult stopServlet(String type) {
// //待开发
// return RetResult.success();
// }
}

View File

@@ -19,7 +19,7 @@ import static org.redkale.boot.Application.*;
import org.redkale.convert.ConvertDisabled;
import org.redkale.convert.json.JsonConvert;
import org.redkale.mq.MessageMultiConsumer;
import org.redkale.net.*;
import org.redkale.net.Server;
import org.redkale.net.http.*;
import org.redkale.net.sncp.*;
import org.redkale.service.*;
@@ -407,7 +407,7 @@ public abstract class ClusterAgent {
this.address = addr;
this.serviceRef = new WeakReference(service);
Server server = ns.getServer();
this.netProtocol = server instanceof SncpServer ? ((SncpServer) server).getNetprotocol() : Transport.DEFAULT_NETPROTOCOL;
this.netProtocol = server instanceof SncpServer ? ((SncpServer) server).getNetprotocol() : "TCP";
}
@Override

View File

@@ -37,6 +37,12 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
protected volatile long writeTime;
protected volatile boolean connectPending;
protected volatile boolean readPending;
protected volatile boolean writePending;
private Map<String, Object> attributes; //用于存储绑定在Connection上的对象集合
private Object subobject; //用于存储绑定在Connection上的对象 同attributes 只绑定单个对象时尽量使用subobject而非attributes
@@ -265,6 +271,20 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
}
}
public final void readInIOThreadSafe(CompletionHandler<Integer, ByteBuffer> handler) {
if (inCurrReadThread()) {
if (!readPending) {
read(handler);
}
} else {
executeRead(() -> {
if (!readPending) {
read(handler);
}
});
}
}
//src写完才会回调
public final <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (sslEngine == null) {

View File

@@ -35,8 +35,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
protected CompletionHandler<Void, Object> connectCompletionHandler;
protected volatile boolean connectPending;
protected SelectionKey connectKey;
//-------------------------------- 读操作 --------------------------------------
@@ -48,8 +46,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
protected CompletionHandler<Integer, ByteBuffer> readCompletionHandler;
protected volatile boolean readPending;
protected SelectionKey readKey;
//-------------------------------- 写操作 --------------------------------------
@@ -88,8 +84,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
protected CompletionHandler<Integer, Object> writeCompletionHandler;
protected volatile boolean writePending;
protected SelectionKey writeKey;
public AsyncNioConnection(boolean clientMode, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread,
@@ -157,7 +151,10 @@ abstract class AsyncNioConnection extends AsyncConnection {
}
@Override
public void write(byte[] headerContent, int headerOffset, int headerLength, byte[] bodyContent, int bodyOffset, int bodyLength, Consumer bodyCallback, Object bodyAttachment, CompletionHandler<Integer, Void> handler) {
public void write(byte[] headerContent, int headerOffset, int headerLength,
byte[] bodyContent, int bodyOffset, int bodyLength,
Consumer bodyCallback, Object bodyAttachment, CompletionHandler<Integer, Void> handler) {
if (sslEngine != null) {
super.write(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength, bodyCallback, bodyAttachment, handler);
return;
@@ -289,6 +286,8 @@ abstract class AsyncNioConnection extends AsyncConnection {
boolean hasRemain = true;
boolean writeCompleted = true;
if (direct) {
int batchOffset = writeOffset;
int batchLength = writeLength;
while (hasRemain) { //必须要将buffer写完为止
if (writeByteTuple1Array != null) {
final ByteBuffer buffer = pollWriteBuffer();
@@ -345,20 +344,24 @@ abstract class AsyncNioConnection extends AsyncConnection {
writeCount = implWrite(writeByteBuffer);
hasRemain = writeByteBuffer.hasRemaining();
} else {
writeCount = implWrite(writeByteBuffers, writeOffset, writeLength);
writeCount = implWrite(writeByteBuffers, batchOffset, batchLength);
boolean remain = false;
for (int i = writeByteBuffers.length - 1; i >= writeOffset; i--) {
if (writeByteBuffers[i].hasRemaining()) {
for (int i = 0; i < batchLength; i++) {
if (writeByteBuffers[batchOffset + i].hasRemaining()) {
remain = true;
batchOffset += i;
batchLength -= i;
break;
}
}
hasRemain = remain;
}
if (writeCount == 0) {
if (hasRemain) {
writeCompleted = false;
writeTotal = totalCount;
//writeCompleted = false;
//writeTotal = totalCount;
continue; //要全部输出完才返回
}
break;
} else if (writeCount < 0) {

View File

@@ -292,7 +292,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
AsyncConnection conn = removeChannel();
if (conn != null && conn.protocolCodec != null) {
this.responseConsumer.accept(this);
conn.readInIOThread(conn.protocolCodec);
conn.readInIOThreadSafe(conn.protocolCodec);
} else {
Supplier<Response> poolSupplier = this.responseSupplier;
Consumer<Response> poolConsumer = this.responseConsumer;
@@ -300,6 +300,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
new ProtocolCodec(context, poolSupplier, poolConsumer, conn).response(this).run(null);
}
} else {
new Exception().printStackTrace();
this.responseConsumer.accept(this);
}
}
@@ -332,23 +333,23 @@ public abstract class Response<C extends Context, R extends Request<C>> {
boolean allCompleted = this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs, offset, length);
if (allCompleted) {
request.pipelineCompleted = true;
this.channel.writePipeline(this.finishBytesIOThreadHandler);
this.channel.writePipelineInIOThread(this.finishBytesIOThreadHandler);
} else {
removeChannel();
this.responseConsumer.accept(this);
}
} else if (this.channel.hasPipelineData()) {
this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs, offset, length);
this.channel.writePipeline(this.finishBytesIOThreadHandler);
this.channel.writePipelineInIOThread(this.finishBytesIOThreadHandler);
} else {
ByteBuffer buffer = this.writeBuffer;
if (buffer != null && buffer.capacity() >= length) {
buffer.clear();
buffer.put(bs, offset, length);
buffer.flip();
this.channel.write(buffer, buffer, finishBufferIOThreadHandler);
this.channel.writeInIOThread(buffer, buffer, finishBufferIOThreadHandler);
} else {
this.channel.write(bs, offset, length, finishBytesIOThreadHandler);
this.channel.writeInIOThread(bs, offset, length, finishBytesIOThreadHandler);
}
}
}
@@ -361,16 +362,16 @@ public abstract class Response<C extends Context, R extends Request<C>> {
boolean allCompleted = this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2);
if (allCompleted) {
request.pipelineCompleted = true;
this.channel.writePipeline(this.finishBytesIOThreadHandler);
this.channel.writePipelineInIOThread(this.finishBytesIOThreadHandler);
} else {
removeChannel();
this.responseConsumer.accept(this);
}
} else if (this.channel.hasPipelineData()) {
this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2);
this.channel.writePipeline(this.finishBytesIOThreadHandler);
this.channel.writePipelineInIOThread(this.finishBytesIOThreadHandler);
} else {
this.channel.write(bs1, offset1, length1, bs2, offset2, length2, callback, attachment, finishBytesIOThreadHandler);
this.channel.writeInIOThread(bs1, offset1, length1, bs2, offset2, length2, callback, attachment, finishBytesIOThreadHandler);
}
}

View File

@@ -8,6 +8,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redkale.net.AsyncIOThread;
import org.redkale.util.*;
@@ -23,6 +24,8 @@ import org.redkale.util.*;
*/
public class ClientWriteIOThread extends AsyncIOThread {
private final AtomicBoolean writingFlag = new AtomicBoolean();
private final BlockingQueue<ClientFuture> requestQueue = new LinkedBlockingQueue<>();
public ClientWriteIOThread(ThreadGroup g, String name, int index, int threads,
@@ -109,6 +112,10 @@ public class ClientWriteIOThread extends AsyncIOThread {
conn.pauseRequests.addAll(list.subList(i, list.size()));
break;
}
if (writeArray.length() > capacity) { //合并的数据包不能太大
conn.channel.write(writeArray, conn, writeHandler);
writeArray.clear();
}
}
listPool.accept(list);
//channel.write

View File

@@ -16,7 +16,7 @@ import org.redkale.annotation.Resource;
import org.redkale.convert.bson.*;
import org.redkale.convert.json.*;
import org.redkale.mq.*;
import org.redkale.net.*;
import org.redkale.net.AsyncConnection;
import org.redkale.net.sncp.Sncp.SncpDyn;
import static org.redkale.net.sncp.SncpHeader.HEADER_SIZE;
import org.redkale.net.sncp.SncpRemoteInfo.SncpRemoteAction;
@@ -69,9 +69,9 @@ public final class OldSncpClient {
protected Set<String> remoteGroups;
//远程模式, 可能为null
protected Transport remoteGroupTransport;
//protected Transport remoteGroupTransport;
public <T extends Service> OldSncpClient(final String serviceResourceName, final Class<T> serviceTypeOrImplClass, final T service, MessageAgent messageAgent, final TransportFactory factory,
public <T extends Service> OldSncpClient(final String serviceResourceName, final Class<T> serviceTypeOrImplClass, final T service, MessageAgent messageAgent,
final boolean remote, final Class serviceClass, final InetSocketAddress clientSncpAddress) {
this.remote = remote;
this.messageAgent = messageAgent;
@@ -124,14 +124,6 @@ public final class OldSncpClient {
this.remoteGroups = remoteGroups;
}
public Transport getRemoteGroupTransport() {
return remoteGroupTransport;
}
public void setRemoteGroupTransport(Transport remoteGroupTransport) {
this.remoteGroupTransport = remoteGroupTransport;
}
@Override
public String toString() {
String service = serviceClass.getName();
@@ -155,7 +147,6 @@ public final class OldSncpClient {
return service + "(name = '" + name + "', serviceid = " + serviceid + ", serviceVersion = " + serviceVersion
+ ", clientaddr = " + (clientSncpAddress == null ? "" : (clientSncpAddress.getHostString() + ":" + clientSncpAddress.getPort()))
+ ((remoteGroups == null || remoteGroups.isEmpty()) ? "" : ", remoteGroups = " + remoteGroups)
+ (remoteGroupTransport == null ? "" : ", remoteGroupTransport = " + Arrays.toString(remoteGroupTransport.getRemoteAddresses()))
+ ", actions.size = " + actions.length + ")";
}
@@ -167,7 +158,7 @@ public final class OldSncpClient {
params[action.paramHandlerIndex] = null;
}
final BsonReader reader = bsonConvert.pollReader();
CompletableFuture<byte[]> future = remote0(handlerFunc, remoteGroupTransport, null, action, params);
CompletableFuture<byte[]> future = remote0(handlerFunc, null, action, params);
if (action.returnFutureResultType != null) { //与handlerFuncIndex互斥
CompletableFuture result = (CompletableFuture) action.returnFutureCreator.create();
future.whenComplete((v, e) -> {
@@ -207,7 +198,7 @@ public final class OldSncpClient {
}
}
private CompletableFuture<byte[]> remote0(final CompletionHandler handler, final Transport transport, final SocketAddress addr0, final SncpRemoteAction action, final Object... params) {
private CompletableFuture<byte[]> remote0(final CompletionHandler handler, final SocketAddress addr0, final SncpRemoteAction action, final Object... params) {
final String traceid = Traces.currTraceid();
final Type[] myparamtypes = action.paramTypes;
final Class[] myparamclass = action.paramClasses;
@@ -266,7 +257,7 @@ public final class OldSncpClient {
});
}
final SocketAddress addr = addr0 == null ? (action.paramAddressTargetIndex >= 0 ? (SocketAddress) params[action.paramAddressTargetIndex] : null) : addr0;
CompletableFuture<AsyncConnection> connFuture = transport.pollConnection(addr);
CompletableFuture<AsyncConnection> connFuture = null; //transport.pollConnection(addr);
return connFuture.thenCompose(conn0 -> {
final CompletableFuture<byte[]> future = new CompletableFuture();
if (conn0 == null) {
@@ -298,7 +289,7 @@ public final class OldSncpClient {
if (count < 1 && buffer.remaining() == buffer.limit()) { //没有数据可读
future.completeExceptionally(new RpcRemoteException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote no response data, params=" + JsonConvert.root().convertTo(params)));
conn.offerReadBuffer(buffer);
transport.offerConnection(true, conn);
//transport.offerConnection(true, conn);
return;
}
if (received < 1 && buffer.limit() < buffer.remaining() + HEADER_SIZE) { //header都没读全
@@ -347,7 +338,7 @@ public final class OldSncpClient {
} catch (Throwable e) {
e.printStackTrace();
future.completeExceptionally(new RpcRemoteException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote response error, params=" + JsonConvert.root().convertTo(params)));
transport.offerConnection(true, conn);
//transport.offerConnection(true, conn);
if (handler != null) {
final Object handlerAttach = action.paramHandlerAttachIndex >= 0 ? params[action.paramHandlerAttachIndex] : null;
handler.failed(e, handlerAttach);
@@ -359,7 +350,7 @@ public final class OldSncpClient {
@SuppressWarnings("unchecked")
public void success() {
future.complete(this.body);
transport.offerConnection(false, conn);
//transport.offerConnection(false, conn);
if (handler != null) {
final Object handlerAttach = action.paramHandlerAttachIndex >= 0 ? params[action.paramHandlerAttachIndex] : null;
final BsonReader reader = bsonConvert.pollReader();
@@ -380,7 +371,7 @@ public final class OldSncpClient {
public void failed(Throwable exc, ByteBuffer attachment2) {
future.completeExceptionally(new RpcRemoteException(action.method + " sncp remote exec failed, params=" + JsonConvert.root().convertTo(params)));
conn.offerReadBuffer(attachment2);
transport.offerConnection(true, conn);
//transport.offerConnection(true, conn);
if (handler != null) {
final Object handlerAttach = action.paramHandlerAttachIndex >= 0 ? params[action.paramHandlerAttachIndex] : null;
handler.failed(exc, handlerAttach);
@@ -393,7 +384,7 @@ public final class OldSncpClient {
@Override
public void failed(Throwable exc, Void attachment) {
future.completeExceptionally(new RpcRemoteException(action.method + " sncp remote exec failed, params=" + JsonConvert.root().convertTo(params)));
transport.offerConnection(true, conn);
//transport.offerConnection(true, conn);
if (handler != null) {
final Object handlerAttach = action.paramHandlerAttachIndex >= 0 ? params[action.paramHandlerAttachIndex] : null;
handler.failed(exc, handlerAttach);

View File

@@ -80,13 +80,13 @@ public class SncpClient extends Client<SncpClientConnection, SncpClientRequest,
} else if (action.returnFutureClass != null) { //返回类型为CompletableFuture
if (action.returnFutureClass == CompletableFuture.class) {
//v,length-1为了读掉(byte)0
return (T) future.thenApply(v -> v == null ? null : convert.convertFrom(action.paramHandlerResultType, v, 1, v.length - 1));
return (T) future.thenApply(v -> v == null ? null : convert.convertFrom(action.returnFutureResultType, v, 1, v.length - 1));
} else {
final CompletableFuture returnFuture = action.returnFutureCreator.create();
future.whenComplete((v, t) -> {
if (t == null) {
//v,length-1为了读掉(byte)0
returnFuture.complete(v == null ? null : convert.convertFrom(action.paramHandlerResultType, v, 1, v.length - 1));
returnFuture.complete(v == null ? null : convert.convertFrom(action.returnFutureResultType, v, 1, v.length - 1));
} else {
returnFuture.completeExceptionally(t);
}
@@ -130,6 +130,7 @@ public class SncpClient extends Client<SncpClientConnection, SncpClientRequest,
}
}
requet.prepare(action.header, seqid, traceid, (ByteTuple) writer);
//writer没有回收待优化
final SocketAddress addr = action.paramAddressTargetIndex >= 0 ? (SocketAddress) params[action.paramAddressTargetIndex] : info.nextRemoteAddress();
return super.connect(addr).thenCompose(conn -> writeChannel(conn, requet).thenApply(rs -> rs.getBodyContent()));
}

View File

@@ -87,7 +87,7 @@ public class SncpClientCodec extends ClientCodec<SncpClientRequest, SncpClientRe
halfBodyBytes.put(buffer);
return;
}
halfBodyBytes.put(buffer, lastResult.getBodyLength() - halfHeaderBytes.length());
halfBodyBytes.put(buffer, lastResult.getBodyLength() - halfBodyBytes.length());
//读取完整body
lastResult.setBodyContent(halfBodyBytes.getBytes());
halfBodyBytes = null;

View File

@@ -3,6 +3,7 @@
*/
package org.redkale.net.sncp;
import java.io.Serializable;
import java.util.Objects;
import org.redkale.net.client.*;
import org.redkale.util.*;
@@ -50,6 +51,11 @@ public class SncpClientRequest extends ClientRequest {
return rs;
}
@Override
public Serializable getRequestid() {
return seqid;
}
@Override
public void writeTo(ClientConnection conn, ByteArray array) {
array.putPlaceholder(SncpHeader.HEADER_SIZE);

View File

@@ -64,7 +64,9 @@ public class SncpHeader {
this.serviceid = Uint128.read(buffer); //16
this.serviceVersion = buffer.getInt(); //4
this.actionid = Uint128.read(buffer); //16
this.addrBytes = new byte[4];
if (this.addrBytes == null) {
this.addrBytes = new byte[4];
}
buffer.get(this.addrBytes); //addr 4
this.addrPort = buffer.getChar(); //port 2
this.abilities = buffer.getInt(); //4

View File

@@ -1,75 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.test.net;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import org.redkale.net.*;
import org.redkale.net.http.HttpServer;
import org.redkale.net.sncp.Sncp;
import org.redkale.util.AnyValue.DefaultAnyValue;
/**
*
* @author zhangjx
*/
public class TransportTest {
private static final String format = "%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%tL";
public static void main(String[] args) throws Throwable {
System.setProperty("net.transport.checkinterval", "2");
List<InetSocketAddress> addrs = new ArrayList<>();
addrs.add(new InetSocketAddress("127.0.0.1", 22001));
addrs.add(new InetSocketAddress("127.0.0.1", 22002));
addrs.add(new InetSocketAddress("127.0.0.1", 22003));
addrs.add(new InetSocketAddress("127.0.0.1", 22004));
for (InetSocketAddress servaddr : addrs) {
//if (servaddr.getPort() % 100 == 4) continue;
HttpServer server = new HttpServer();
DefaultAnyValue servconf = DefaultAnyValue.create("port", servaddr.getPort());
server.init(servconf);
server.start();
}
addrs.add(new InetSocketAddress("127.0.0.1", 22005));
final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16);
asyncGroup.start();
Thread.sleep(1000);
TransportFactory factory = TransportFactory.create(asyncGroup, 0, 0);
DefaultAnyValue conf = DefaultAnyValue.create(TransportFactory.NAME_PINGINTERVAL, 5);
factory.init(conf, ByteBuffer.wrap(Sncp.getPingBytes()).asReadOnlyBuffer(), Sncp.getPingBytes().length);
Transport transport = factory.createTransportTCP("", null, addrs);
System.out.println(String.format(format, System.currentTimeMillis()));
try {
CountDownLatch cdl = new CountDownLatch(20);
for (int i = 0; i < 20; i++) {
transport.pollConnection(null).whenComplete((r, t) -> {
cdl.countDown();
System.out.println("连接: " + r.getRemoteAddress());
});
}
cdl.await();
HttpServer server = new HttpServer();
DefaultAnyValue servconf = DefaultAnyValue.create("port", 22005);
server.init(servconf);
server.start();
Thread.sleep(4000);
CountDownLatch cdl2 = new CountDownLatch(20);
for (int i = 0; i < 20; i++) {
transport.pollConnection(null).whenComplete((r, t) -> {
cdl2.countDown();
System.out.println("连接: " + r.getRemoteAddress());
});
}
cdl2.await();
} finally {
System.out.println(String.format(format, System.currentTimeMillis()));
}
}
}

View File

@@ -5,16 +5,14 @@
*/
package org.redkale.test.sncp;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import org.redkale.boot.*;
import org.redkale.convert.bson.*;
import org.redkale.net.*;
import org.redkale.net.sncp.SncpServer;
import org.redkale.net.client.ClientAddress;
import org.redkale.net.sncp.*;
import org.redkale.service.Service;
import org.redkale.util.*;
@@ -34,12 +32,20 @@ public class SncpTest {
private static final int clientCapacity = protocol.endsWith(".UDP") ? AsyncGroup.UDP_BUFFER_CAPACITY : 8192;
private static final ResourceFactory factory = ResourceFactory.create();
private static ResourceFactory factory;
private static Application application;
private static SncpRpcGroups rpcGroups;
public static void main(String[] args) throws Exception {
LoggingBaseHandler.initDebugLogConfig();
application = Application.create(true);
rpcGroups = application.getSncpRpcGroups();
factory = application.getResourceFactory();
factory.register("", BsonConvert.class, BsonFactory.root().getConvert());
factory.register("", Application.class, Application.create(true));
factory.register("", Application.class, application);
if (System.getProperty("client") == null) {
runServer();
if (port2 > 0) {
@@ -54,29 +60,19 @@ public class SncpTest {
}
}
public static AsynchronousChannelGroup newChannelGroup() throws IOException {
final AtomicInteger counter = new AtomicInteger();
ExecutorService transportExec = Executors.newFixedThreadPool(16, (Runnable r) -> {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("Transport-Thread-" + counter.incrementAndGet());
return t;
});
return AsynchronousChannelGroup.withCachedThreadPool(transportExec, 1);
}
private static void runClient() throws Exception {
InetSocketAddress addr = new InetSocketAddress(myhost, port);
Set<InetSocketAddress> set = new LinkedHashSet<>();
set.add(addr);
rpcGroups.computeIfAbsent("client", protocol.endsWith(".UDP") ? "UDP" : "TCP").putAddress(addr);
if (port2 > 0) {
set.add(new InetSocketAddress(myhost, port2));
rpcGroups.computeIfAbsent("client", protocol.endsWith(".UDP") ? "UDP" : "TCP").putAddress(new InetSocketAddress(myhost, port2));
}
final AsyncIOGroup asyncGroup = new AsyncIOGroup(clientCapacity, 16);
asyncGroup.start();
final TransportFactory transFactory = TransportFactory.create(asyncGroup, protocol.endsWith(".UDP") ? "UDP" : "TCP", 0, 0);
transFactory.addGroupInfo("client", set);
final SncpTestIService service = null;//Sncp.createSimpleRemoteService(SncpTestIService.class, null, transFactory, addr, "client");
InetSocketAddress sncpAddress = addr;
final SncpClient client = new SncpClient("", asyncGroup, sncpAddress, new ClientAddress(sncpAddress), protocol.endsWith(".UDP") ? "UDP" : "TCP", 16, 100);
final SncpTestIService service = Sncp.createSimpleRemoteService(SncpTestIService.class, factory, rpcGroups, client, "client");//Sncp.createSimpleRemoteService(SncpTestIService.class, null, transFactory, addr, "client");
factory.inject(service);
// SncpTestBean bean = new SncpTestBean();
@@ -93,7 +89,7 @@ public class SncpTest {
callbean = service.insert(callbean);
System.out.println("bean " + callbean);
System.out.println("---------------------------------------------------");
System.out.println("\r\n\r\n\r\n\r\n---------------------------------------------------");
Thread.sleep(200);
final int count = 10;
final CountDownLatch cld = new CountDownLatch(count);
@@ -111,8 +107,8 @@ public class SncpTest {
bean.setContent("数据: " + k);
StringBuilder sb = new StringBuilder();
sb.append(k).append("--------");
for (int j = 0; j < 2000; j++) {
sb.append("_").append(j).append("_").append(k).append("_0123456789");
for (int j = 0; j < 1000; j++) {
sb.append("_").append(j % 10).append("_").append(k).append("7890_0123456789");
}
bean.setContent(sb.toString());
@@ -120,6 +116,7 @@ public class SncpTest {
//service.updateBean(bean);
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
} finally {
long a = ai.incrementAndGet();
System.out.println("运行了 " + (a == 100 ? "--------------------------------------------------" : "") + a);
@@ -134,11 +131,12 @@ public class SncpTest {
System.exit(0);
return;
}
Thread.sleep(200);
final CountDownLatch cld2 = new CountDownLatch(1);
long s2 = System.currentTimeMillis();
final CompletableFuture<String> future = service.queryResultAsync(callbean);
future.whenComplete((v, e) -> {
System.out.println("异步执行完毕: " + v + ", 异常为: " + e + ", 耗时: " + (System.currentTimeMillis() - s2) / 1000.0 + "s");
System.out.println("异步执行结果: " + v + ", 异常为: " + e + ", 耗时: " + (System.currentTimeMillis() - s2) / 1000.0 + "s");
cld2.countDown();
});
cld2.await();
@@ -165,13 +163,11 @@ public class SncpTest {
conf.addValue("protocol", protocol);
conf.addValue("maxbody", "" + (100 * 1024 * 1024));
SncpServer server = new SncpServer(null, System.currentTimeMillis(), conf, factory);
Set<InetSocketAddress> set = new LinkedHashSet<>();
if (port2 > 0) {
set.add(new InetSocketAddress(myhost, port2));
rpcGroups.computeIfAbsent("server", protocol.endsWith(".UDP") ? "UDP" : "TCP").putAddress(new InetSocketAddress(myhost, port2));
}
final TransportFactory transFactory = TransportFactory.create(asyncGroup, protocol.endsWith(".UDP") ? "UDP" : "TCP", 0, 0);
transFactory.addGroupInfo("server", set);
SncpTestIService service = null;//Sncp.createSimpleLocalService(SncpTestServiceImpl.class, null, factory, transFactory, addr, "server");
SncpTestIService service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, factory); //Sncp.createSimpleLocalService(SncpTestServiceImpl.class, null, factory, transFactory, addr, "server");
factory.inject(service);
server.addSncpServlet(service);
System.out.println(service);
@@ -206,12 +202,9 @@ public class SncpTest {
conf.addValue("protocol", protocol);
conf.addValue("maxbody", "" + (100 * 1024 * 1024));
SncpServer server = new SncpServer(null, System.currentTimeMillis(), conf, factory);
Set<InetSocketAddress> set = new LinkedHashSet<>();
set.add(new InetSocketAddress(myhost, port));
rpcGroups.computeIfAbsent("server", protocol.endsWith(".UDP") ? "UDP" : "TCP").putAddress(new InetSocketAddress(myhost, port));
final TransportFactory transFactory = TransportFactory.create(asyncGroup, protocol.endsWith(".UDP") ? "UDP" : "TCP", 0, 0);
transFactory.addGroupInfo("server", set);
Service service = null;//Sncp.createSimpleLocalService(SncpTestServiceImpl.class, null, factory, transFactory, addr, "server");
Service service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, factory); //Sncp.createSimpleLocalService(SncpTestServiceImpl.class, null, factory, transFactory, addr, "server");
server.addSncpServlet(service);
server.init(conf);
server.start();

View File

@@ -10,9 +10,12 @@ import java.net.InetSocketAddress;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture;
import org.redkale.annotation.ResourceType;
import org.redkale.net.*;
import org.redkale.net.sncp.Sncp;
import org.redkale.boot.Application;
import org.redkale.net.AsyncIOGroup;
import org.redkale.net.client.ClientAddress;
import org.redkale.net.sncp.*;
import org.redkale.service.*;
import org.redkale.util.ResourceFactory;
/**
*
@@ -64,7 +67,7 @@ public class SncpTestServiceImpl implements SncpTestIService {
@Override
public String queryResult(SncpTestBean bean) {
System.out.println(Thread.currentThread().getName() + " 运行了queryResult方法 content-length: " + bean.getContent().length());
return "result: " + bean.getContent();
return "result-content: " + bean.getContent();
}
public void queryResult(CompletionHandler<String, SncpTestBean> handler, @RpcAttachment SncpTestBean bean) {
@@ -83,12 +86,16 @@ public class SncpTestServiceImpl implements SncpTestIService {
public static void main(String[] args) throws Exception {
final Application application = Application.create(true);
final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16);
asyncGroup.start();
final TransportFactory transFactory = TransportFactory.create(asyncGroup, 0, 0);
final ResourceFactory factory = ResourceFactory.create();
final SncpRpcGroups rpcGroups = application.getSncpRpcGroups();
InetSocketAddress sncpAddress = new InetSocketAddress("127.0.0.1", 7070);
rpcGroups.computeIfAbsent("g70", "TCP").putAddress(sncpAddress);
final SncpClient client = new SncpClient("", asyncGroup, sncpAddress, new ClientAddress(sncpAddress), "TCP", 16, 100);
transFactory.addGroupInfo("g70", new InetSocketAddress("127.0.0.1", 7070));
Service service = null;// Sncp.createSimpleLocalService(SncpTestServiceImpl.class, null, ResourceFactory.create(), transFactory, new InetSocketAddress("127.0.0.1", 7070), "g70");
Service service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, factory);
for (Method method : service.getClass().getDeclaredMethods()) {
System.out.println(method);
}
@@ -97,7 +104,7 @@ public class SncpTestServiceImpl implements SncpTestIService {
System.out.println(method);
}
System.out.println("-----------------------------------");
service = null;//Sncp.createSimpleRemoteService(SncpTestServiceImpl.class, null, transFactory, new InetSocketAddress("127.0.0.1", 7070), "g70");
service = Sncp.createSimpleRemoteService(SncpTestServiceImpl.class, factory, rpcGroups, client, "g70");
for (Method method : service.getClass().getDeclaredMethods()) {
System.out.println(method);
}
@@ -106,7 +113,7 @@ public class SncpTestServiceImpl implements SncpTestIService {
System.out.println(method);
}
System.out.println("-----------------------------------");
service = null;//Sncp.createSimpleRemoteService(SncpTestIService.class, null, transFactory, new InetSocketAddress("127.0.0.1", 7070), "g70");
service = Sncp.createSimpleRemoteService(SncpTestIService.class, factory, rpcGroups, client, "g70");
for (Method method : service.getClass().getDeclaredMethods()) {
System.out.println(method);
}