From 849d628a757cd0d2dfa5202b8a779be12848842c Mon Sep 17 00:00:00 2001 From: redkale Date: Mon, 3 Apr 2023 11:02:15 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=B5=8B=E8=AF=95=E7=94=A8?= =?UTF-8?q?=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/redkale/source/DataJdbcSource.java | 9 ++- .../redkale/test/http/RestSleepService.java | 37 ++++++++++ .../org/redkale/test/http/RestSleepTest.java | 73 +++++++++++++++++++ .../redkale/test/sncp/SncpSleepService.java | 36 +++++++++ .../org/redkale/test/sncp/SncpSleepTest.java | 65 +++++++++++++++++ 5 files changed, 219 insertions(+), 1 deletion(-) create mode 100644 src/test/java/org/redkale/test/http/RestSleepService.java create mode 100644 src/test/java/org/redkale/test/http/RestSleepTest.java create mode 100644 src/test/java/org/redkale/test/sncp/SncpSleepService.java create mode 100644 src/test/java/org/redkale/test/sncp/SncpSleepTest.java diff --git a/src/main/java/org/redkale/source/DataJdbcSource.java b/src/main/java/org/redkale/source/DataJdbcSource.java index 860eb801a..b4ecc96c7 100644 --- a/src/main/java/org/redkale/source/DataJdbcSource.java +++ b/src/main/java/org/redkale/source/DataJdbcSource.java @@ -202,7 +202,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { final DefaultDataBatch dataBatch = (DefaultDataBatch) batch; if (dataBatch.actions.isEmpty()) { return 0; - } + } int c = 0; Connection conn = null; try { @@ -2548,6 +2548,13 @@ public class DataJdbcSource extends AbstractDataSqlSource { public ConnectionPool(Properties prop) { this.connectTimeoutSeconds = Integer.decode(prop.getProperty(DATA_SOURCE_CONNECTTIMEOUT_SECONDS, "6")); int defMaxConns = Utility.cpus() * 4; + if (workExecutor instanceof ThreadPoolExecutor) { + defMaxConns = ((ThreadPoolExecutor) workExecutor).getCorePoolSize(); + } else if (workExecutor instanceof ThreadHashExecutor) { + defMaxConns = ((ThreadHashExecutor) workExecutor).getCorePoolSize(); + } else if (workExecutor != null) { //maybe virtual thread pool + defMaxConns = Math.min(1000, Utility.cpus() * 100); + } this.maxConns = Math.max(1, Integer.decode(prop.getProperty(DATA_SOURCE_MAXCONNS, "" + defMaxConns))); this.queue = new ArrayBlockingQueue<>(maxConns); this.url = prop.getProperty(DATA_SOURCE_URL); diff --git a/src/test/java/org/redkale/test/http/RestSleepService.java b/src/test/java/org/redkale/test/http/RestSleepService.java new file mode 100644 index 000000000..38b100aeb --- /dev/null +++ b/src/test/java/org/redkale/test/http/RestSleepService.java @@ -0,0 +1,37 @@ +/* + * + */ +package org.redkale.test.http; + +import org.redkale.net.http.RestService; +import org.redkale.service.AbstractService; +import org.redkale.util.Utility; + +/** + * + * @author zhangjx + */ +@RestService(name = "test", automapping = true) +public class RestSleepService extends AbstractService { + + public String sleep200() { + Utility.sleep(200); + System.out.println("当前执行线程: " + Thread.currentThread().getName()); + return "ok200"; + } + + public String sleep300() { + Utility.sleep(300); + return "ok300"; + } + + public String sleep400() { + Utility.sleep(400); + return "ok400"; + } + + public String sleep500() { + Utility.sleep(500); + return "ok500"; + } +} diff --git a/src/test/java/org/redkale/test/http/RestSleepTest.java b/src/test/java/org/redkale/test/http/RestSleepTest.java new file mode 100644 index 000000000..9bb41e1c2 --- /dev/null +++ b/src/test/java/org/redkale/test/http/RestSleepTest.java @@ -0,0 +1,73 @@ +/* + * + */ +package org.redkale.test.http; + +import java.io.*; +import java.net.*; +import org.junit.jupiter.api.*; +import org.redkale.boot.Application; +import org.redkale.convert.bson.BsonConvert; +import org.redkale.convert.json.JsonConvert; +import org.redkale.net.AsyncIOGroup; +import org.redkale.net.http.*; +import org.redkale.net.sncp.Sncp; +import org.redkale.util.*; + +/** + * + * @author zhangjx + */ +public class RestSleepTest { + + private boolean main; + + public static void main(String[] args) throws Throwable { + RestSleepTest test = new RestSleepTest(); + test.main = true; + test.run(); + } + + @Test + public void run() throws Exception { + System.out.println("------------------- 并发调用 -----------------------------------"); + final Application application = Application.create(true); + final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16); + asyncGroup.start(); + final ResourceFactory resFactory = ResourceFactory.create(); + resFactory.register(JsonConvert.root()); + resFactory.register(BsonConvert.root()); + + //------------------------ 初始化 CService ------------------------------------ + RestSleepService service = Sncp.createSimpleLocalService(RestSleepService.class, resFactory); + HttpServer server = new HttpServer(application, System.currentTimeMillis(), resFactory); + server.getResourceFactory().register(application); + System.out.println("servlet = " + server.addRestServlet(null, service, null, HttpServlet.class, "")); + server.init(AnyValue.DefaultAnyValue.create("port", 0)); + server.start(); + + int port = server.getSocketAddress().getPort(); + System.out.println("服务器启动端口: " + port); + InetSocketAddress httpAddress = new InetSocketAddress("127.0.0.1", port); + Socket socket = new Socket(httpAddress.getAddress(), port); + OutputStream out = socket.getOutputStream(); + out.write(("GET /test/sleep200 HTTP/1.1\r\n" + + "Connection: Keep-Alive\r\n" + + "\r\n" + + "GET /test/sleep300 HTTP/1.1\r\n" + + "Connection: Keep-Alive\r\n" + + "\r\n" + + "GET /test/sleep500 HTTP/1.1\r\n" + + "Connection: Keep-Alive\r\n" + + "\r\n").getBytes()); + InputStream in = socket.getInputStream(); + byte[] bytes = new byte[8192]; + long s = System.currentTimeMillis(); + int pos = in.read(bytes); + long e = System.currentTimeMillis() - s; + System.out.println("返回结果: " + new String(bytes, 0, pos)); + System.out.println("耗时: " + e + " ms"); + server.shutdown(); + Assertions.assertTrue(e < 600); + } +} diff --git a/src/test/java/org/redkale/test/sncp/SncpSleepService.java b/src/test/java/org/redkale/test/sncp/SncpSleepService.java new file mode 100644 index 000000000..a056a5e10 --- /dev/null +++ b/src/test/java/org/redkale/test/sncp/SncpSleepService.java @@ -0,0 +1,36 @@ +/* + * + */ +package org.redkale.test.sncp; + +import java.util.concurrent.CompletableFuture; +import org.redkale.service.AbstractService; +import org.redkale.util.Utility; + +/** + * + * @author zhangjx + */ +public class SncpSleepService extends AbstractService { + + public CompletableFuture sleep200() { + return (CompletableFuture) CompletableFuture.supplyAsync(() -> { + Utility.sleep(200); + return "ok200"; + }); + } + + public CompletableFuture sleep300() { + return (CompletableFuture) CompletableFuture.supplyAsync(() -> { + Utility.sleep(300); + return "ok300"; + }); + } + + public CompletableFuture sleep500() { + return (CompletableFuture) CompletableFuture.supplyAsync(() -> { + Utility.sleep(500); + return "ok500"; + }); + } +} diff --git a/src/test/java/org/redkale/test/sncp/SncpSleepTest.java b/src/test/java/org/redkale/test/sncp/SncpSleepTest.java new file mode 100644 index 000000000..79e75b887 --- /dev/null +++ b/src/test/java/org/redkale/test/sncp/SncpSleepTest.java @@ -0,0 +1,65 @@ +package org.redkale.test.sncp; + +import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; +import org.junit.jupiter.api.*; +import org.redkale.boot.Application; +import org.redkale.convert.bson.BsonConvert; +import org.redkale.convert.json.JsonConvert; +import org.redkale.net.AsyncIOGroup; +import org.redkale.net.client.ClientAddress; +import org.redkale.net.sncp.*; +import org.redkale.util.*; + +/** + * + * @author zhangjx + */ +public class SncpSleepTest { + + private boolean main; + + public static void main(String[] args) throws Throwable { + SncpSleepTest test = new SncpSleepTest(); + test.main = true; + test.run(); + } + + @Test + public void run() throws Exception { + System.out.println("------------------- 并发调用 -----------------------------------"); + final Application application = Application.create(true); + final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16); + asyncGroup.start(); + final ResourceFactory resFactory = ResourceFactory.create(); + resFactory.register(JsonConvert.root()); + resFactory.register(BsonConvert.root()); + + //------------------------ 初始化 CService ------------------------------------ + SncpSleepService service = Sncp.createSimpleLocalService(SncpSleepService.class, resFactory); + SncpServer server = new SncpServer(application, System.currentTimeMillis(), null, resFactory); + server.getResourceFactory().register(application); + server.addSncpServlet(service); + server.init(AnyValue.DefaultAnyValue.create("port", 0)); + server.start(); + + int port = server.getSocketAddress().getPort(); + System.out.println("服务器启动端口: " + port); + InetSocketAddress sncpAddress = new InetSocketAddress("127.0.0.1", port); + final SncpClient client = new SncpClient("", asyncGroup, sncpAddress, new ClientAddress(sncpAddress), "TCP", 16, 100); + final SncpRpcGroups rpcGroups = application.getSncpRpcGroups(); + rpcGroups.computeIfAbsent("cs", "TCP").putAddress(sncpAddress); + SncpSleepService remoteCService = Sncp.createSimpleRemoteService(SncpSleepService.class, resFactory, rpcGroups, client, "cs"); + long s = System.currentTimeMillis(); + CompletableFuture[] futures = new CompletableFuture[]{ + remoteCService.sleep200(), + remoteCService.sleep300(), + remoteCService.sleep500() + }; + CompletableFuture.allOf(futures).join(); + long e = System.currentTimeMillis() - s; + System.out.println("耗时: " + e + " ms"); + server.shutdown(); + Assertions.assertTrue(e < 600); + } +}