From e2f331ab6b4b65be877257e5cf37e3ffa63da195 Mon Sep 17 00:00:00 2001 From: redkale Date: Wed, 22 Mar 2023 23:24:01 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96SncpClient?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/boot/Application.java | 46 ++---------- .../boot/watch/ServletWatchService.java | 39 ---------- .../org/redkale/cluster/ClusterAgent.java | 4 +- .../java/org/redkale/net/AsyncConnection.java | 20 +++++ .../org/redkale/net/AsyncNioConnection.java | 27 ++++--- src/main/java/org/redkale/net/Response.java | 17 +++-- .../net/client/ClientWriteIOThread.java | 7 ++ .../org/redkale/net/sncp/OldSncpClient.java | 31 +++----- .../java/org/redkale/net/sncp/SncpClient.java | 5 +- .../org/redkale/net/sncp/SncpClientCodec.java | 2 +- .../redkale/net/sncp/SncpClientRequest.java | 6 ++ .../java/org/redkale/net/sncp/SncpHeader.java | 4 +- .../org/redkale/test/net/TransportTest.java | 75 ------------------- .../java/org/redkale/test/sncp/SncpTest.java | 67 ++++++++--------- .../test/sncp/SncpTestServiceImpl.java | 23 ++++-- 15 files changed, 127 insertions(+), 246 deletions(-) delete mode 100644 src/main/java/org/redkale/boot/watch/ServletWatchService.java delete mode 100644 src/test/java/org/redkale/test/net/TransportTest.java diff --git a/src/main/java/org/redkale/boot/Application.java b/src/main/java/org/redkale/boot/Application.java index b53381552..df7e06cee 100644 --- a/src/main/java/org/redkale/boot/Application.java +++ b/src/main/java/org/redkale/boot/Application.java @@ -19,7 +19,6 @@ import java.util.concurrent.atomic.*; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.logging.*; -import javax.net.ssl.SSLContext; import org.redkale.annotation.Resource; import org.redkale.boot.ClassFilter.FilterEntry; import org.redkale.cluster.*; @@ -160,9 +159,6 @@ public final class Application { //NodeServer 资源, 顺序必须是sncps, others, watchs final List servers = new CopyOnWriteArrayList<>(); - //SNCP传输端的TransportFactory, 注意: 只给SNCP使用 - private final TransportFactory sncpTransportFactory; - //配置项里的group信息, 注意: 只给SNCP使用 private final SncpRpcGroups sncpRpcGroups = new SncpRpcGroups(); @@ -452,33 +448,19 @@ public final class Application { throw new RedkaleException(e); } //------------------------------------ 配置 节点 ------------------------------------ - TransportStrategy strategy = null; String excludelib0 = null; ClusterAgent cluster = null; MessageAgent[] mqs = null; int bufferCapacity = 32 * 1024; int bufferPoolSize = Utility.cpus() * 8; - int readTimeoutSeconds = TransportFactory.DEFAULT_READTIMEOUTSECONDS; - int writeTimeoutSeconds = TransportFactory.DEFAULT_WRITETIMEOUTSECONDS; + int readTimeoutSeconds = 6; + int writeTimeoutSeconds = 6; AnyValue executorConf = null; executorConf = config.getAnyValue("executor"); AnyValue excludelibConf = config.getAnyValue("excludelibs"); if (excludelibConf != null) { excludelib0 = excludelibConf.getValue("value"); } - AnyValue transportConf = config.getAnyValue("transport"); - int groupSize = config.getAnyValues("group").length; - if (groupSize > 0 && transportConf == null) { - transportConf = new DefaultAnyValue(); - } - if (transportConf != null) { - //--------------transportBufferPool----------- - bufferCapacity = Math.max(parseLenth(transportConf.getValue("bufferCapacity"), bufferCapacity), 32 * 1024); - readTimeoutSeconds = transportConf.getIntValue("readTimeoutSeconds", readTimeoutSeconds); - writeTimeoutSeconds = transportConf.getIntValue("writeTimeoutSeconds", writeTimeoutSeconds); - final int threads = parseLenth(transportConf.getValue("threads"), groupSize * Utility.cpus() * 2); - bufferPoolSize = parseLenth(transportConf.getValue("bufferPoolSize"), threads * 4); - } AnyValue clusterConf = config.getAnyValue("cluster"); if (clusterConf != null) { @@ -611,11 +593,6 @@ public final class Application { this.resourceFactory.register(RESNAME_APP_CLIENT_ASYNCGROUP, AsyncGroup.class, this.clientAsyncGroup); this.excludelibs = excludelib0; - this.sncpTransportFactory = TransportFactory.create(this.clientAsyncGroup, (SSLContext) null, Transport.DEFAULT_NETPROTOCOL, readTimeoutSeconds, writeTimeoutSeconds, strategy); - DefaultAnyValue tarnsportConf = DefaultAnyValue.create(TransportFactory.NAME_POOLMAXCONNS, System.getProperty("redkale.net.transport.pool.maxconns", "100")) - .addValue(TransportFactory.NAME_PINGINTERVAL, System.getProperty("redkale.net.transport.ping.interval", "30")) - .addValue(TransportFactory.NAME_CHECKINTERVAL, System.getProperty("redkale.net.transport.check.interval", "30")); - this.sncpTransportFactory.init(tarnsportConf, ByteBuffer.wrap(Sncp.getPingBytes()).asReadOnlyBuffer(), Sncp.getPongBytes().length); this.clusterAgent = cluster; this.messageAgents = mqs; if (compileMode || this.classLoader instanceof RedkaleClassLoader.RedkaleCacheClassLoader) { @@ -1047,9 +1024,6 @@ public final class Application { ResourceFactory rs = serv ? rf : (resName.isEmpty() ? application.resourceFactory : null); field.set(srcObj, rs); return rs; - } else if (type == TransportFactory.class) { - field.set(srcObj, application.sncpTransportFactory); - return application.sncpTransportFactory; } else if (type == NodeSncpServer.class) { NodeServer server = null; for (NodeServer ns : application.getNodeServers()) { @@ -1105,7 +1079,7 @@ public final class Application { return false; } - }, Application.class, ResourceFactory.class, TransportFactory.class, NodeSncpServer.class, NodeHttpServer.class, NodeWatchServer.class); + }, Application.class, ResourceFactory.class, NodeSncpServer.class, NodeHttpServer.class, NodeWatchServer.class); //------------------------------------ 注册 java.net.http.HttpClient ------------------------------------ resourceFactory.register((ResourceFactory rf, String srcResourceName, final Object srcObj, String resourceName, Field field, final Object attachment) -> { @@ -1334,18 +1308,14 @@ public final class Application { if (group.indexOf('$') >= 0) { throw new RedkaleException(" name cannot contains '$' in " + group); } - final String protocol = conf.getValue("protocol", Transport.DEFAULT_NETPROTOCOL).toUpperCase(); + final String protocol = conf.getValue("protocol", "TCP").toUpperCase(); if (!"TCP".equalsIgnoreCase(protocol) && !"UDP".equalsIgnoreCase(protocol)) { throw new RedkaleException("Not supported Transport Protocol " + conf.getValue("protocol")); } SncpRpcGroup rg = sncpRpcGroups.computeIfAbsent(group, protocol); - TransportGroupInfo ginfo = new TransportGroupInfo(group, protocol, new LinkedHashSet<>()); for (AnyValue node : conf.getAnyValues("node")) { - final InetSocketAddress addr = new InetSocketAddress(node.getValue("addr"), node.getIntValue("port")); - ginfo.putAddress(addr); - rg.putAddress(addr); + rg.putAddress(new InetSocketAddress(node.getValue("addr"), node.getIntValue("port"))); } - sncpTransportFactory.addGroupInfo(ginfo); } for (AnyValue conf : config.getAnyValues("listener")) { final String listenClass = conf.getValue("value", ""); @@ -2515,7 +2485,6 @@ public final class Application { this.clientAsyncGroup.dispose(); logger.info("AsyncGroup destroy in " + (System.currentTimeMillis() - s) + " ms"); } - this.sncpTransportFactory.shutdownNow(); long intms = System.currentTimeMillis() - f; String ms = String.valueOf(intms); @@ -2537,11 +2506,6 @@ public final class Application { return resourceFactory; } - @Deprecated - public TransportFactory getSncpTransportFactory2() { - return sncpTransportFactory; - } - public ClusterAgent getClusterAgent() { return clusterAgent; } diff --git a/src/main/java/org/redkale/boot/watch/ServletWatchService.java b/src/main/java/org/redkale/boot/watch/ServletWatchService.java deleted file mode 100644 index d02c2c93f..000000000 --- a/src/main/java/org/redkale/boot/watch/ServletWatchService.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.boot.watch; - -import org.redkale.annotation.Resource; -import org.redkale.boot.Application; -import org.redkale.net.TransportFactory; -import org.redkale.net.http.*; - -/** - *

- * 详情见: https://redkale.org - * - * @author zhangjx - */ -@RestService(name = "servlet", catalog = "watch", repair = false) -public class ServletWatchService extends AbstractWatchService { - - @Resource - protected Application application; - - @Resource - protected TransportFactory transportFactory; -// -// @RestMapping(name = "loadServlet", auth = false, comment = "动态增加Servlet") -// public RetResult loadServlet(String type, @RestUploadFile(maxLength = 10 * 1024 * 1024, fileNameReg = "\\.jar$") byte[] jar) { -// //待开发 -// return RetResult.success(); -// } -// -// @RestMapping(name = "stopServlet", auth = false, comment = "动态停止Servlet") -// public RetResult stopServlet(String type) { -// //待开发 -// return RetResult.success(); -// } -} diff --git a/src/main/java/org/redkale/cluster/ClusterAgent.java b/src/main/java/org/redkale/cluster/ClusterAgent.java index 13c2487ba..4733d5cf8 100644 --- a/src/main/java/org/redkale/cluster/ClusterAgent.java +++ b/src/main/java/org/redkale/cluster/ClusterAgent.java @@ -19,7 +19,7 @@ import static org.redkale.boot.Application.*; import org.redkale.convert.ConvertDisabled; import org.redkale.convert.json.JsonConvert; import org.redkale.mq.MessageMultiConsumer; -import org.redkale.net.*; +import org.redkale.net.Server; import org.redkale.net.http.*; import org.redkale.net.sncp.*; import org.redkale.service.*; @@ -407,7 +407,7 @@ public abstract class ClusterAgent { this.address = addr; this.serviceRef = new WeakReference(service); Server server = ns.getServer(); - this.netProtocol = server instanceof SncpServer ? ((SncpServer) server).getNetprotocol() : Transport.DEFAULT_NETPROTOCOL; + this.netProtocol = server instanceof SncpServer ? ((SncpServer) server).getNetprotocol() : "TCP"; } @Override diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index 8be90e1b6..5d55cfe70 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -37,6 +37,12 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { protected volatile long writeTime; + protected volatile boolean connectPending; + + protected volatile boolean readPending; + + protected volatile boolean writePending; + private Map attributes; //用于存储绑定在Connection上的对象集合 private Object subobject; //用于存储绑定在Connection上的对象, 同attributes, 只绑定单个对象时尽量使用subobject而非attributes @@ -265,6 +271,20 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } } + public final void readInIOThreadSafe(CompletionHandler handler) { + if (inCurrReadThread()) { + if (!readPending) { + read(handler); + } + } else { + executeRead(() -> { + if (!readPending) { + read(handler); + } + }); + } + } + //src写完才会回调 public final void write(ByteBuffer src, A attachment, CompletionHandler handler) { if (sslEngine == null) { diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index 6a90c86f8..d5fb2a638 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -35,8 +35,6 @@ abstract class AsyncNioConnection extends AsyncConnection { protected CompletionHandler connectCompletionHandler; - protected volatile boolean connectPending; - protected SelectionKey connectKey; //-------------------------------- 读操作 -------------------------------------- @@ -48,8 +46,6 @@ abstract class AsyncNioConnection extends AsyncConnection { protected CompletionHandler readCompletionHandler; - protected volatile boolean readPending; - protected SelectionKey readKey; //-------------------------------- 写操作 -------------------------------------- @@ -88,8 +84,6 @@ abstract class AsyncNioConnection extends AsyncConnection { protected CompletionHandler writeCompletionHandler; - protected volatile boolean writePending; - protected SelectionKey writeKey; public AsyncNioConnection(boolean clientMode, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, @@ -157,7 +151,10 @@ abstract class AsyncNioConnection extends AsyncConnection { } @Override - public void write(byte[] headerContent, int headerOffset, int headerLength, byte[] bodyContent, int bodyOffset, int bodyLength, Consumer bodyCallback, Object bodyAttachment, CompletionHandler handler) { + public void write(byte[] headerContent, int headerOffset, int headerLength, + byte[] bodyContent, int bodyOffset, int bodyLength, + Consumer bodyCallback, Object bodyAttachment, CompletionHandler handler) { + if (sslEngine != null) { super.write(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength, bodyCallback, bodyAttachment, handler); return; @@ -289,6 +286,8 @@ abstract class AsyncNioConnection extends AsyncConnection { boolean hasRemain = true; boolean writeCompleted = true; if (direct) { + int batchOffset = writeOffset; + int batchLength = writeLength; while (hasRemain) { //必须要将buffer写完为止 if (writeByteTuple1Array != null) { final ByteBuffer buffer = pollWriteBuffer(); @@ -345,20 +344,24 @@ abstract class AsyncNioConnection extends AsyncConnection { writeCount = implWrite(writeByteBuffer); hasRemain = writeByteBuffer.hasRemaining(); } else { - writeCount = implWrite(writeByteBuffers, writeOffset, writeLength); + writeCount = implWrite(writeByteBuffers, batchOffset, batchLength); boolean remain = false; - for (int i = writeByteBuffers.length - 1; i >= writeOffset; i--) { - if (writeByteBuffers[i].hasRemaining()) { + for (int i = 0; i < batchLength; i++) { + if (writeByteBuffers[batchOffset + i].hasRemaining()) { remain = true; + batchOffset += i; + batchLength -= i; break; } } hasRemain = remain; } + if (writeCount == 0) { if (hasRemain) { - writeCompleted = false; - writeTotal = totalCount; + //writeCompleted = false; + //writeTotal = totalCount; + continue; //要全部输出完才返回 } break; } else if (writeCount < 0) { diff --git a/src/main/java/org/redkale/net/Response.java b/src/main/java/org/redkale/net/Response.java index f9bd002da..f9d9cc734 100644 --- a/src/main/java/org/redkale/net/Response.java +++ b/src/main/java/org/redkale/net/Response.java @@ -292,7 +292,7 @@ public abstract class Response> { AsyncConnection conn = removeChannel(); if (conn != null && conn.protocolCodec != null) { this.responseConsumer.accept(this); - conn.readInIOThread(conn.protocolCodec); + conn.readInIOThreadSafe(conn.protocolCodec); } else { Supplier poolSupplier = this.responseSupplier; Consumer poolConsumer = this.responseConsumer; @@ -300,6 +300,7 @@ public abstract class Response> { new ProtocolCodec(context, poolSupplier, poolConsumer, conn).response(this).run(null); } } else { + new Exception().printStackTrace(); this.responseConsumer.accept(this); } } @@ -332,23 +333,23 @@ public abstract class Response> { boolean allCompleted = this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs, offset, length); if (allCompleted) { request.pipelineCompleted = true; - this.channel.writePipeline(this.finishBytesIOThreadHandler); + this.channel.writePipelineInIOThread(this.finishBytesIOThreadHandler); } else { removeChannel(); this.responseConsumer.accept(this); } } else if (this.channel.hasPipelineData()) { this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs, offset, length); - this.channel.writePipeline(this.finishBytesIOThreadHandler); + this.channel.writePipelineInIOThread(this.finishBytesIOThreadHandler); } else { ByteBuffer buffer = this.writeBuffer; if (buffer != null && buffer.capacity() >= length) { buffer.clear(); buffer.put(bs, offset, length); buffer.flip(); - this.channel.write(buffer, buffer, finishBufferIOThreadHandler); + this.channel.writeInIOThread(buffer, buffer, finishBufferIOThreadHandler); } else { - this.channel.write(bs, offset, length, finishBytesIOThreadHandler); + this.channel.writeInIOThread(bs, offset, length, finishBytesIOThreadHandler); } } } @@ -361,16 +362,16 @@ public abstract class Response> { boolean allCompleted = this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2); if (allCompleted) { request.pipelineCompleted = true; - this.channel.writePipeline(this.finishBytesIOThreadHandler); + this.channel.writePipelineInIOThread(this.finishBytesIOThreadHandler); } else { removeChannel(); this.responseConsumer.accept(this); } } else if (this.channel.hasPipelineData()) { this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2); - this.channel.writePipeline(this.finishBytesIOThreadHandler); + this.channel.writePipelineInIOThread(this.finishBytesIOThreadHandler); } else { - this.channel.write(bs1, offset1, length1, bs2, offset2, length2, callback, attachment, finishBytesIOThreadHandler); + this.channel.writeInIOThread(bs1, offset1, length1, bs2, offset2, length2, callback, attachment, finishBytesIOThreadHandler); } } diff --git a/src/main/java/org/redkale/net/client/ClientWriteIOThread.java b/src/main/java/org/redkale/net/client/ClientWriteIOThread.java index b6e07bbd5..5caa01f97 100644 --- a/src/main/java/org/redkale/net/client/ClientWriteIOThread.java +++ b/src/main/java/org/redkale/net/client/ClientWriteIOThread.java @@ -8,6 +8,7 @@ import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import org.redkale.net.AsyncIOThread; import org.redkale.util.*; @@ -23,6 +24,8 @@ import org.redkale.util.*; */ public class ClientWriteIOThread extends AsyncIOThread { + private final AtomicBoolean writingFlag = new AtomicBoolean(); + private final BlockingQueue requestQueue = new LinkedBlockingQueue<>(); public ClientWriteIOThread(ThreadGroup g, String name, int index, int threads, @@ -109,6 +112,10 @@ public class ClientWriteIOThread extends AsyncIOThread { conn.pauseRequests.addAll(list.subList(i, list.size())); break; } + if (writeArray.length() > capacity) { //合并的数据包不能太大 + conn.channel.write(writeArray, conn, writeHandler); + writeArray.clear(); + } } listPool.accept(list); //channel.write diff --git a/src/main/java/org/redkale/net/sncp/OldSncpClient.java b/src/main/java/org/redkale/net/sncp/OldSncpClient.java index 023360982..cc0d6d8cd 100644 --- a/src/main/java/org/redkale/net/sncp/OldSncpClient.java +++ b/src/main/java/org/redkale/net/sncp/OldSncpClient.java @@ -16,7 +16,7 @@ import org.redkale.annotation.Resource; import org.redkale.convert.bson.*; import org.redkale.convert.json.*; import org.redkale.mq.*; -import org.redkale.net.*; +import org.redkale.net.AsyncConnection; import org.redkale.net.sncp.Sncp.SncpDyn; import static org.redkale.net.sncp.SncpHeader.HEADER_SIZE; import org.redkale.net.sncp.SncpRemoteInfo.SncpRemoteAction; @@ -69,9 +69,9 @@ public final class OldSncpClient { protected Set remoteGroups; //远程模式, 可能为null - protected Transport remoteGroupTransport; + //protected Transport remoteGroupTransport; - public OldSncpClient(final String serviceResourceName, final Class serviceTypeOrImplClass, final T service, MessageAgent messageAgent, final TransportFactory factory, + public OldSncpClient(final String serviceResourceName, final Class serviceTypeOrImplClass, final T service, MessageAgent messageAgent, final boolean remote, final Class serviceClass, final InetSocketAddress clientSncpAddress) { this.remote = remote; this.messageAgent = messageAgent; @@ -124,14 +124,6 @@ public final class OldSncpClient { this.remoteGroups = remoteGroups; } - public Transport getRemoteGroupTransport() { - return remoteGroupTransport; - } - - public void setRemoteGroupTransport(Transport remoteGroupTransport) { - this.remoteGroupTransport = remoteGroupTransport; - } - @Override public String toString() { String service = serviceClass.getName(); @@ -155,7 +147,6 @@ public final class OldSncpClient { return service + "(name = '" + name + "', serviceid = " + serviceid + ", serviceVersion = " + serviceVersion + ", clientaddr = " + (clientSncpAddress == null ? "" : (clientSncpAddress.getHostString() + ":" + clientSncpAddress.getPort())) + ((remoteGroups == null || remoteGroups.isEmpty()) ? "" : ", remoteGroups = " + remoteGroups) - + (remoteGroupTransport == null ? "" : ", remoteGroupTransport = " + Arrays.toString(remoteGroupTransport.getRemoteAddresses())) + ", actions.size = " + actions.length + ")"; } @@ -167,7 +158,7 @@ public final class OldSncpClient { params[action.paramHandlerIndex] = null; } final BsonReader reader = bsonConvert.pollReader(); - CompletableFuture future = remote0(handlerFunc, remoteGroupTransport, null, action, params); + CompletableFuture future = remote0(handlerFunc, null, action, params); if (action.returnFutureResultType != null) { //与handlerFuncIndex互斥 CompletableFuture result = (CompletableFuture) action.returnFutureCreator.create(); future.whenComplete((v, e) -> { @@ -207,7 +198,7 @@ public final class OldSncpClient { } } - private CompletableFuture remote0(final CompletionHandler handler, final Transport transport, final SocketAddress addr0, final SncpRemoteAction action, final Object... params) { + private CompletableFuture remote0(final CompletionHandler handler, final SocketAddress addr0, final SncpRemoteAction action, final Object... params) { final String traceid = Traces.currTraceid(); final Type[] myparamtypes = action.paramTypes; final Class[] myparamclass = action.paramClasses; @@ -266,7 +257,7 @@ public final class OldSncpClient { }); } final SocketAddress addr = addr0 == null ? (action.paramAddressTargetIndex >= 0 ? (SocketAddress) params[action.paramAddressTargetIndex] : null) : addr0; - CompletableFuture connFuture = transport.pollConnection(addr); + CompletableFuture connFuture = null; //transport.pollConnection(addr); return connFuture.thenCompose(conn0 -> { final CompletableFuture future = new CompletableFuture(); if (conn0 == null) { @@ -298,7 +289,7 @@ public final class OldSncpClient { if (count < 1 && buffer.remaining() == buffer.limit()) { //没有数据可读 future.completeExceptionally(new RpcRemoteException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote no response data, params=" + JsonConvert.root().convertTo(params))); conn.offerReadBuffer(buffer); - transport.offerConnection(true, conn); + //transport.offerConnection(true, conn); return; } if (received < 1 && buffer.limit() < buffer.remaining() + HEADER_SIZE) { //header都没读全 @@ -347,7 +338,7 @@ public final class OldSncpClient { } catch (Throwable e) { e.printStackTrace(); future.completeExceptionally(new RpcRemoteException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote response error, params=" + JsonConvert.root().convertTo(params))); - transport.offerConnection(true, conn); + //transport.offerConnection(true, conn); if (handler != null) { final Object handlerAttach = action.paramHandlerAttachIndex >= 0 ? params[action.paramHandlerAttachIndex] : null; handler.failed(e, handlerAttach); @@ -359,7 +350,7 @@ public final class OldSncpClient { @SuppressWarnings("unchecked") public void success() { future.complete(this.body); - transport.offerConnection(false, conn); + //transport.offerConnection(false, conn); if (handler != null) { final Object handlerAttach = action.paramHandlerAttachIndex >= 0 ? params[action.paramHandlerAttachIndex] : null; final BsonReader reader = bsonConvert.pollReader(); @@ -380,7 +371,7 @@ public final class OldSncpClient { public void failed(Throwable exc, ByteBuffer attachment2) { future.completeExceptionally(new RpcRemoteException(action.method + " sncp remote exec failed, params=" + JsonConvert.root().convertTo(params))); conn.offerReadBuffer(attachment2); - transport.offerConnection(true, conn); + //transport.offerConnection(true, conn); if (handler != null) { final Object handlerAttach = action.paramHandlerAttachIndex >= 0 ? params[action.paramHandlerAttachIndex] : null; handler.failed(exc, handlerAttach); @@ -393,7 +384,7 @@ public final class OldSncpClient { @Override public void failed(Throwable exc, Void attachment) { future.completeExceptionally(new RpcRemoteException(action.method + " sncp remote exec failed, params=" + JsonConvert.root().convertTo(params))); - transport.offerConnection(true, conn); + //transport.offerConnection(true, conn); if (handler != null) { final Object handlerAttach = action.paramHandlerAttachIndex >= 0 ? params[action.paramHandlerAttachIndex] : null; handler.failed(exc, handlerAttach); diff --git a/src/main/java/org/redkale/net/sncp/SncpClient.java b/src/main/java/org/redkale/net/sncp/SncpClient.java index 9fd882267..a9c41f04b 100644 --- a/src/main/java/org/redkale/net/sncp/SncpClient.java +++ b/src/main/java/org/redkale/net/sncp/SncpClient.java @@ -80,13 +80,13 @@ public class SncpClient extends Client v == null ? null : convert.convertFrom(action.paramHandlerResultType, v, 1, v.length - 1)); + return (T) future.thenApply(v -> v == null ? null : convert.convertFrom(action.returnFutureResultType, v, 1, v.length - 1)); } else { final CompletableFuture returnFuture = action.returnFutureCreator.create(); future.whenComplete((v, t) -> { if (t == null) { //v,length-1为了读掉(byte)0 - returnFuture.complete(v == null ? null : convert.convertFrom(action.paramHandlerResultType, v, 1, v.length - 1)); + returnFuture.complete(v == null ? null : convert.convertFrom(action.returnFutureResultType, v, 1, v.length - 1)); } else { returnFuture.completeExceptionally(t); } @@ -130,6 +130,7 @@ public class SncpClient extends Client= 0 ? (SocketAddress) params[action.paramAddressTargetIndex] : info.nextRemoteAddress(); return super.connect(addr).thenCompose(conn -> writeChannel(conn, requet).thenApply(rs -> rs.getBodyContent())); } diff --git a/src/main/java/org/redkale/net/sncp/SncpClientCodec.java b/src/main/java/org/redkale/net/sncp/SncpClientCodec.java index 713716259..6427901a2 100644 --- a/src/main/java/org/redkale/net/sncp/SncpClientCodec.java +++ b/src/main/java/org/redkale/net/sncp/SncpClientCodec.java @@ -87,7 +87,7 @@ public class SncpClientCodec extends ClientCodec addrs = new ArrayList<>(); - addrs.add(new InetSocketAddress("127.0.0.1", 22001)); - addrs.add(new InetSocketAddress("127.0.0.1", 22002)); - addrs.add(new InetSocketAddress("127.0.0.1", 22003)); - addrs.add(new InetSocketAddress("127.0.0.1", 22004)); - for (InetSocketAddress servaddr : addrs) { - //if (servaddr.getPort() % 100 == 4) continue; - HttpServer server = new HttpServer(); - DefaultAnyValue servconf = DefaultAnyValue.create("port", servaddr.getPort()); - server.init(servconf); - server.start(); - } - addrs.add(new InetSocketAddress("127.0.0.1", 22005)); - final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16); - asyncGroup.start(); - Thread.sleep(1000); - TransportFactory factory = TransportFactory.create(asyncGroup, 0, 0); - DefaultAnyValue conf = DefaultAnyValue.create(TransportFactory.NAME_PINGINTERVAL, 5); - factory.init(conf, ByteBuffer.wrap(Sncp.getPingBytes()).asReadOnlyBuffer(), Sncp.getPingBytes().length); - Transport transport = factory.createTransportTCP("", null, addrs); - System.out.println(String.format(format, System.currentTimeMillis())); - try { - CountDownLatch cdl = new CountDownLatch(20); - for (int i = 0; i < 20; i++) { - transport.pollConnection(null).whenComplete((r, t) -> { - cdl.countDown(); - System.out.println("连接: " + r.getRemoteAddress()); - }); - } - cdl.await(); - HttpServer server = new HttpServer(); - DefaultAnyValue servconf = DefaultAnyValue.create("port", 22005); - server.init(servconf); - server.start(); - Thread.sleep(4000); - CountDownLatch cdl2 = new CountDownLatch(20); - for (int i = 0; i < 20; i++) { - transport.pollConnection(null).whenComplete((r, t) -> { - cdl2.countDown(); - System.out.println("连接: " + r.getRemoteAddress()); - }); - } - cdl2.await(); - } finally { - System.out.println(String.format(format, System.currentTimeMillis())); - } - } - -} diff --git a/src/test/java/org/redkale/test/sncp/SncpTest.java b/src/test/java/org/redkale/test/sncp/SncpTest.java index 5ab471f2f..46d751e17 100644 --- a/src/test/java/org/redkale/test/sncp/SncpTest.java +++ b/src/test/java/org/redkale/test/sncp/SncpTest.java @@ -5,16 +5,14 @@ */ package org.redkale.test.sncp; -import java.io.IOException; import java.net.InetSocketAddress; -import java.nio.channels.AsynchronousChannelGroup; -import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import org.redkale.boot.*; import org.redkale.convert.bson.*; import org.redkale.net.*; -import org.redkale.net.sncp.SncpServer; +import org.redkale.net.client.ClientAddress; +import org.redkale.net.sncp.*; import org.redkale.service.Service; import org.redkale.util.*; @@ -34,12 +32,20 @@ public class SncpTest { private static final int clientCapacity = protocol.endsWith(".UDP") ? AsyncGroup.UDP_BUFFER_CAPACITY : 8192; - private static final ResourceFactory factory = ResourceFactory.create(); + private static ResourceFactory factory; + + private static Application application; + + private static SncpRpcGroups rpcGroups; public static void main(String[] args) throws Exception { LoggingBaseHandler.initDebugLogConfig(); + application = Application.create(true); + rpcGroups = application.getSncpRpcGroups(); + factory = application.getResourceFactory(); factory.register("", BsonConvert.class, BsonFactory.root().getConvert()); - factory.register("", Application.class, Application.create(true)); + factory.register("", Application.class, application); + if (System.getProperty("client") == null) { runServer(); if (port2 > 0) { @@ -54,29 +60,19 @@ public class SncpTest { } } - public static AsynchronousChannelGroup newChannelGroup() throws IOException { - final AtomicInteger counter = new AtomicInteger(); - ExecutorService transportExec = Executors.newFixedThreadPool(16, (Runnable r) -> { - Thread t = new Thread(r); - t.setDaemon(true); - t.setName("Transport-Thread-" + counter.incrementAndGet()); - return t; - }); - return AsynchronousChannelGroup.withCachedThreadPool(transportExec, 1); - } - private static void runClient() throws Exception { InetSocketAddress addr = new InetSocketAddress(myhost, port); - Set set = new LinkedHashSet<>(); - set.add(addr); + rpcGroups.computeIfAbsent("client", protocol.endsWith(".UDP") ? "UDP" : "TCP").putAddress(addr); if (port2 > 0) { - set.add(new InetSocketAddress(myhost, port2)); + rpcGroups.computeIfAbsent("client", protocol.endsWith(".UDP") ? "UDP" : "TCP").putAddress(new InetSocketAddress(myhost, port2)); } final AsyncIOGroup asyncGroup = new AsyncIOGroup(clientCapacity, 16); asyncGroup.start(); - final TransportFactory transFactory = TransportFactory.create(asyncGroup, protocol.endsWith(".UDP") ? "UDP" : "TCP", 0, 0); - transFactory.addGroupInfo("client", set); - final SncpTestIService service = null;//Sncp.createSimpleRemoteService(SncpTestIService.class, null, transFactory, addr, "client"); + + InetSocketAddress sncpAddress = addr; + final SncpClient client = new SncpClient("", asyncGroup, sncpAddress, new ClientAddress(sncpAddress), protocol.endsWith(".UDP") ? "UDP" : "TCP", 16, 100); + + final SncpTestIService service = Sncp.createSimpleRemoteService(SncpTestIService.class, factory, rpcGroups, client, "client");//Sncp.createSimpleRemoteService(SncpTestIService.class, null, transFactory, addr, "client"); factory.inject(service); // SncpTestBean bean = new SncpTestBean(); @@ -93,7 +89,7 @@ public class SncpTest { callbean = service.insert(callbean); System.out.println("bean: " + callbean); - System.out.println("---------------------------------------------------"); + System.out.println("\r\n\r\n\r\n\r\n---------------------------------------------------"); Thread.sleep(200); final int count = 10; final CountDownLatch cld = new CountDownLatch(count); @@ -111,8 +107,8 @@ public class SncpTest { bean.setContent("数据: " + k); StringBuilder sb = new StringBuilder(); sb.append(k).append("--------"); - for (int j = 0; j < 2000; j++) { - sb.append("_").append(j).append("_").append(k).append("_0123456789"); + for (int j = 0; j < 1000; j++) { + sb.append("_").append(j % 10).append("_").append(k).append("7890_0123456789"); } bean.setContent(sb.toString()); @@ -120,6 +116,7 @@ public class SncpTest { //service.updateBean(bean); } catch (Exception e) { e.printStackTrace(); + System.exit(1); } finally { long a = ai.incrementAndGet(); System.out.println("运行了 " + (a == 100 ? "--------------------------------------------------" : "") + a); @@ -134,11 +131,12 @@ public class SncpTest { System.exit(0); return; } + Thread.sleep(200); final CountDownLatch cld2 = new CountDownLatch(1); long s2 = System.currentTimeMillis(); final CompletableFuture future = service.queryResultAsync(callbean); future.whenComplete((v, e) -> { - System.out.println("异步执行完毕: " + v + ", 异常为: " + e + ", 耗时: " + (System.currentTimeMillis() - s2) / 1000.0 + "s"); + System.out.println("异步执行结果: " + v + ", 异常为: " + e + ", 耗时: " + (System.currentTimeMillis() - s2) / 1000.0 + "s"); cld2.countDown(); }); cld2.await(); @@ -165,13 +163,11 @@ public class SncpTest { conf.addValue("protocol", protocol); conf.addValue("maxbody", "" + (100 * 1024 * 1024)); SncpServer server = new SncpServer(null, System.currentTimeMillis(), conf, factory); - Set set = new LinkedHashSet<>(); if (port2 > 0) { - set.add(new InetSocketAddress(myhost, port2)); + rpcGroups.computeIfAbsent("server", protocol.endsWith(".UDP") ? "UDP" : "TCP").putAddress(new InetSocketAddress(myhost, port2)); } - final TransportFactory transFactory = TransportFactory.create(asyncGroup, protocol.endsWith(".UDP") ? "UDP" : "TCP", 0, 0); - transFactory.addGroupInfo("server", set); - SncpTestIService service = null;//Sncp.createSimpleLocalService(SncpTestServiceImpl.class, null, factory, transFactory, addr, "server"); + + SncpTestIService service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, factory); //Sncp.createSimpleLocalService(SncpTestServiceImpl.class, null, factory, transFactory, addr, "server"); factory.inject(service); server.addSncpServlet(service); System.out.println(service); @@ -206,12 +202,9 @@ public class SncpTest { conf.addValue("protocol", protocol); conf.addValue("maxbody", "" + (100 * 1024 * 1024)); SncpServer server = new SncpServer(null, System.currentTimeMillis(), conf, factory); - Set set = new LinkedHashSet<>(); - set.add(new InetSocketAddress(myhost, port)); + rpcGroups.computeIfAbsent("server", protocol.endsWith(".UDP") ? "UDP" : "TCP").putAddress(new InetSocketAddress(myhost, port)); - final TransportFactory transFactory = TransportFactory.create(asyncGroup, protocol.endsWith(".UDP") ? "UDP" : "TCP", 0, 0); - transFactory.addGroupInfo("server", set); - Service service = null;//Sncp.createSimpleLocalService(SncpTestServiceImpl.class, null, factory, transFactory, addr, "server"); + Service service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, factory); //Sncp.createSimpleLocalService(SncpTestServiceImpl.class, null, factory, transFactory, addr, "server"); server.addSncpServlet(service); server.init(conf); server.start(); diff --git a/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java b/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java index 6030b2de0..8a620896d 100644 --- a/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java +++ b/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java @@ -10,9 +10,12 @@ import java.net.InetSocketAddress; import java.nio.channels.CompletionHandler; import java.util.concurrent.CompletableFuture; import org.redkale.annotation.ResourceType; -import org.redkale.net.*; -import org.redkale.net.sncp.Sncp; +import org.redkale.boot.Application; +import org.redkale.net.AsyncIOGroup; +import org.redkale.net.client.ClientAddress; +import org.redkale.net.sncp.*; import org.redkale.service.*; +import org.redkale.util.ResourceFactory; /** * @@ -64,7 +67,7 @@ public class SncpTestServiceImpl implements SncpTestIService { @Override public String queryResult(SncpTestBean bean) { System.out.println(Thread.currentThread().getName() + " 运行了queryResult方法 content-length: " + bean.getContent().length()); - return "result: " + bean.getContent(); + return "result-content: " + bean.getContent(); } public void queryResult(CompletionHandler handler, @RpcAttachment SncpTestBean bean) { @@ -83,12 +86,16 @@ public class SncpTestServiceImpl implements SncpTestIService { public static void main(String[] args) throws Exception { + final Application application = Application.create(true); final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16); asyncGroup.start(); - final TransportFactory transFactory = TransportFactory.create(asyncGroup, 0, 0); + final ResourceFactory factory = ResourceFactory.create(); + final SncpRpcGroups rpcGroups = application.getSncpRpcGroups(); + InetSocketAddress sncpAddress = new InetSocketAddress("127.0.0.1", 7070); + rpcGroups.computeIfAbsent("g70", "TCP").putAddress(sncpAddress); + final SncpClient client = new SncpClient("", asyncGroup, sncpAddress, new ClientAddress(sncpAddress), "TCP", 16, 100); - transFactory.addGroupInfo("g70", new InetSocketAddress("127.0.0.1", 7070)); - Service service = null;// Sncp.createSimpleLocalService(SncpTestServiceImpl.class, null, ResourceFactory.create(), transFactory, new InetSocketAddress("127.0.0.1", 7070), "g70"); + Service service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, factory); for (Method method : service.getClass().getDeclaredMethods()) { System.out.println(method); } @@ -97,7 +104,7 @@ public class SncpTestServiceImpl implements SncpTestIService { System.out.println(method); } System.out.println("-----------------------------------"); - service = null;//Sncp.createSimpleRemoteService(SncpTestServiceImpl.class, null, transFactory, new InetSocketAddress("127.0.0.1", 7070), "g70"); + service = Sncp.createSimpleRemoteService(SncpTestServiceImpl.class, factory, rpcGroups, client, "g70"); for (Method method : service.getClass().getDeclaredMethods()) { System.out.println(method); } @@ -106,7 +113,7 @@ public class SncpTestServiceImpl implements SncpTestIService { System.out.println(method); } System.out.println("-----------------------------------"); - service = null;//Sncp.createSimpleRemoteService(SncpTestIService.class, null, transFactory, new InetSocketAddress("127.0.0.1", 7070), "g70"); + service = Sncp.createSimpleRemoteService(SncpTestIService.class, factory, rpcGroups, client, "g70"); for (Method method : service.getClass().getDeclaredMethods()) { System.out.println(method); }