From 21c00b34199b4290178544b3f7b91bf51d920785 Mon Sep 17 00:00:00 2001 From: redkale Date: Wed, 9 Oct 2024 21:29:04 +0800 Subject: [PATCH] sncp --- .../java/org/redkale/boot/Application.java | 15 +- .../org/redkale/net/sncp/SncpRemoteInfo.java | 2 + .../java/org/redkale/util/Jdk21Inners.java | 248 +++++++++--------- .../org/redkale/test/sncp/SncpSleepTest.java | 2 +- 4 files changed, 136 insertions(+), 131 deletions(-) diff --git a/src/main/java/org/redkale/boot/Application.java b/src/main/java/org/redkale/boot/Application.java index 5d7daedb6..a81c590ed 100644 --- a/src/main/java/org/redkale/boot/Application.java +++ b/src/main/java/org/redkale/boot/Application.java @@ -32,6 +32,7 @@ import org.redkale.cluster.spi.HttpClusterRpcClient; import org.redkale.cluster.spi.HttpLocalRpcClient; import org.redkale.convert.Convert; import org.redkale.convert.json.*; +import org.redkale.convert.pb.ProtobufConvert; import org.redkale.convert.pb.ProtobufFactory; import org.redkale.inject.ResourceAnnotationLoader; import org.redkale.inject.ResourceEvent; @@ -305,12 +306,14 @@ public final class Application { this.resourceFactory.register(Environment.class, environment); this.resourceFactory.register(JsonFactory.root()); this.resourceFactory.register(ProtobufFactory.root()); - this.resourceFactory.register(JsonFactory.root().getConvert()); - this.resourceFactory.register(ProtobufFactory.root().getConvert()); - this.resourceFactory.register( - "jsonconvert", Convert.class, JsonFactory.root().getConvert()); - this.resourceFactory.register( - "protobufconvert", Convert.class, ProtobufFactory.root().getConvert()); + JsonConvert jsonConvert = JsonFactory.root().getConvert(); + jsonConvert.offerWriter(jsonConvert.pollWriter()); + this.resourceFactory.register(jsonConvert); + ProtobufConvert protobufConvert = ProtobufFactory.root().getConvert(); + protobufConvert.offerWriter(protobufConvert.pollWriter()); + this.resourceFactory.register(protobufConvert); + this.resourceFactory.register("jsonconvert", Convert.class, jsonConvert); + this.resourceFactory.register("protobufconvert", Convert.class, protobufConvert); JsonFactory.root().registerFieldFuncConsumer(resourceFactory::inject); ProtobufFactory.root().registerFieldFuncConsumer(resourceFactory::inject); diff --git a/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java b/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java index 6f9a7dd97..7bcb15aa5 100644 --- a/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java +++ b/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java @@ -11,6 +11,7 @@ import java.nio.channels.CompletionHandler; import java.util.*; import java.util.concurrent.*; import java.util.logging.*; +import org.redkale.annotation.ClassDepends; import org.redkale.convert.json.JsonConvert; import org.redkale.convert.pb.ProtobufConvert; import org.redkale.convert.pb.ProtobufWriter; @@ -107,6 +108,7 @@ public class SncpRemoteInfo { } // 由远程模式的DyncRemoveService调用 + @ClassDepends public T remote(final String actionid, final Object... params) { final SncpRemoteAction action = this.actions.get(actionid); CompletionHandler callbackHandler = null; diff --git a/src/main/java/org/redkale/util/Jdk21Inners.java b/src/main/java/org/redkale/util/Jdk21Inners.java index f0819160a..6d7eac5bd 100644 --- a/src/main/java/org/redkale/util/Jdk21Inners.java +++ b/src/main/java/org/redkale/util/Jdk21Inners.java @@ -1,124 +1,124 @@ -///* -// * Copyright (c) 2016-2116 Redkale -// * All rights reserved. -// */ -//package org.redkale.util; -// -//import java.util.concurrent.Executor; -//import java.util.concurrent.ExecutorService; -//import java.util.concurrent.Executors; -//import java.util.concurrent.ThreadFactory; -//import java.util.function.Function; -//import java.util.function.Supplier; -// -///** -// * -// * @author zhangjx -// */ -//class Jdk21Inners { -// -// static { -// // 加载时进行可用性判断 -// Thread.currentThread().isVirtual(); -// } -// -// private Jdk21Inners() { -// // do nothing -// } -// -// public static Executor createExecutor() { -// return new VirtualExecutor(); -// } -// -// public static Function createPoolFunction() { -// return new VirtualPoolFunction(); -// } -// -// public static Function createThreadLocalFunction() { -// return new VirtualThreadLocal(() -> null); -// } -// -// public static Function createThreadFactoryFunction() { -// return new VirtualThreadFactory(""); -// } -// -// static class VirtualExecutor implements Executor { -// -// @Override -// public void execute(Runnable t) { -// Thread.ofVirtual().name("Redkale-VirtualThread").start(t); -// } -// } -// -// static class VirtualPoolFunction implements Function { -// -// @Override -// public ExecutorService apply(String threadNameFormat) { -// final ThreadFactory factory = Thread.ofVirtual().factory(); -// final String threadName = String.format(threadNameFormat, "Virtual"); -// return Executors.newThreadPerTaskExecutor(r -> { -// Thread t = factory.newThread(r); -// t.setName(threadName); -// return t; -// }); -// } -// } -// -// static class VirtualThreadLocal extends ThreadLocal implements Function, ThreadLocal> { -// -// private final Supplier supplier; -// -// public VirtualThreadLocal(Supplier supplier) { -// this.supplier = supplier; -// } -// -// @Override -// public ThreadLocal apply(Supplier supplier) { -// return new VirtualThreadLocal<>(supplier); -// } -// -// @Override -// protected T initialValue() { -// return supplier.get(); -// } -// -// @Override -// public void set(T value) { -// Thread t = Thread.currentThread(); -// if (!t.isVirtual()) { -// super.set(value); -// } -// } -// -// @Override -// public T get() { -// Thread t = Thread.currentThread(); -// return t.isVirtual() ? initialValue() : super.get(); -// } -// } -// -// static class VirtualThreadFactory implements ThreadFactory, Function { -// -// private final ThreadFactory factory = Thread.ofVirtual().factory(); -// -// private final String name; -// -// public VirtualThreadFactory(String name) { -// this.name = name; -// } -// -// @Override -// public ThreadFactory apply(String name) { -// return new VirtualThreadFactory(name); -// } -// -// @Override -// public Thread newThread(Runnable r) { -// Thread t = factory.newThread(r); -// if (name != null) { -// t.setName(name); -// } -// return t; -// } -// } -//} +/* + * Copyright (c) 2016-2116 Redkale + * All rights reserved. + */ +package org.redkale.util; + +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * + * @author zhangjx + */ +class Jdk21Inners { + + static { + // 加载时进行可用性判断 + Thread.currentThread().isVirtual(); + } + + private Jdk21Inners() { + // do nothing + } + + public static Executor createExecutor() { + return new VirtualExecutor(); + } + + public static Function createPoolFunction() { + return new VirtualPoolFunction(); + } + + public static Function createThreadLocalFunction() { + return new VirtualThreadLocal(() -> null); + } + + public static Function createThreadFactoryFunction() { + return new VirtualThreadFactory(""); + } + + static class VirtualExecutor implements Executor { + + @Override + public void execute(Runnable t) { + Thread.ofVirtual().name("Redkale-VirtualThread").start(t); + } + } + + static class VirtualPoolFunction implements Function { + + @Override + public ExecutorService apply(String threadNameFormat) { + final ThreadFactory factory = Thread.ofVirtual().factory(); + final String threadName = String.format(threadNameFormat, "Virtual"); + return Executors.newThreadPerTaskExecutor(r -> { + Thread t = factory.newThread(r); + t.setName(threadName); + return t; + }); + } + } + + static class VirtualThreadLocal extends ThreadLocal implements Function, ThreadLocal> { + + private final Supplier supplier; + + public VirtualThreadLocal(Supplier supplier) { + this.supplier = supplier; + } + + @Override + public ThreadLocal apply(Supplier supplier) { + return new VirtualThreadLocal<>(supplier); + } + + @Override + protected T initialValue() { + return supplier.get(); + } + + @Override + public void set(T value) { + Thread t = Thread.currentThread(); + if (!t.isVirtual()) { + super.set(value); + } + } + + @Override + public T get() { + Thread t = Thread.currentThread(); + return t.isVirtual() ? initialValue() : super.get(); + } + } + + static class VirtualThreadFactory implements ThreadFactory, Function { + + private final ThreadFactory factory = Thread.ofVirtual().factory(); + + private final String name; + + public VirtualThreadFactory(String name) { + this.name = name; + } + + @Override + public ThreadFactory apply(String name) { + return new VirtualThreadFactory(name); + } + + @Override + public Thread newThread(Runnable r) { + Thread t = factory.newThread(r); + if (name != null) { + t.setName(name); + } + return t; + } + } +} diff --git a/src/test/java/org/redkale/test/sncp/SncpSleepTest.java b/src/test/java/org/redkale/test/sncp/SncpSleepTest.java index 5b5b8602d..183f049ee 100644 --- a/src/test/java/org/redkale/test/sncp/SncpSleepTest.java +++ b/src/test/java/org/redkale/test/sncp/SncpSleepTest.java @@ -64,6 +64,6 @@ public class SncpSleepTest { System.out.println("耗时: " + e + " ms"); server.shutdown(); workExecutor.shutdown(); - Assertions.assertTrue(e < 660); + Assertions.assertTrue(e < 600); } }