This commit is contained in:
redkale
2023-09-30 09:12:38 +08:00
parent 1eb153f51f
commit dbf4c2e8ca
16 changed files with 288 additions and 128 deletions

View File

@@ -75,12 +75,24 @@
-->
<mq name="" type="org.redkalex.mq.kafka.KafkaMessageAgent">
<servers value="127.0.0.1:9101"/>
<consumer>
<!--
加载所有的MessageConsumer实例;
autoload="true" 默认值. 自动加载classpath下所有的MessageConsumer类
autoload="false" 需要显著的指定MessageConsumer类
includes 当autoload="true" 拉取类名与includes中的正则表达式匹配的类, 多个正则表达式用分号;隔开
excludes 当autoload="true" 排除类名与excludes中的正则表达式匹配的类, 多个正则表达式用分号;隔开
-->
<consumer autoload="true" includes="" excludes=""/>
<!--
MQ实现方的配置项
type: 配置项类型值只能是consumer或producer
-->
<config type="consumer">
<property name="xxxxxx" value="XXXXXXXX"/>
</consumer>
<producer>
</config>
<config type="producer">
<property name="xxxxxx" value="XXXXXXXX"/>
</producer>
</config>
</mq>
<!--

View File

@@ -250,6 +250,20 @@ public final class Application {
//Server根ClassLoader
private final RedkaleClassLoader serverClassLoader;
/**
* 初始化步骤: <br>
* 1、基本环境变量设置 <br>
* 2、ClassLoader初始化 <br>
* 3、日志配置初始化 <br>
* 4、本地和远程配置文件读取 <br>
* 5、ClusterAgent和MessageAgent实例化 <br>
* 6、Work线程池初始化
* 7、原生sql解析器初始化 <br>
*
* @param singletonMode 是否测试模式
* @param compileMode 是否编译模式
* @param config 启动配置
*/
@SuppressWarnings("UseSpecificCatch") //config: 不带redkale的配置项
Application(final boolean singletonMode, boolean compileMode, final AnyValue config) {
this.singletonMode = singletonMode;
@@ -322,11 +336,11 @@ public final class Application {
System.setProperty("redkale.convert.writer.buffer.defsize", "4096");
}
sysProperties.forEach((key, value) -> {
System.setProperty(key.toString(), replaceValue(value.toString(), sysProperties));
System.setProperty(key.toString(), getPropertyValue(value.toString(), sysProperties));
});
}
String localaddr = replaceValue(config.getValue("address", "").trim());
String localaddr = getPropertyValue(config.getValue("address", "").trim());
InetAddress addr = localaddr.isEmpty() ? Utility.localInetAddress() : new InetSocketAddress(localaddr, config.getIntValue("port")).getAddress();
this.localAddress = new InetSocketAddress(addr, config.getIntValue("port"));
this.resourceFactory.register(RESNAME_APP_ADDR, addr.getHostAddress());
@@ -392,7 +406,7 @@ public final class Application {
properties0.load(fin);
fin.close();
Properties logProps = new Properties();
properties0.forEach((k, v) -> logProps.put(k.toString(), replaceValue(v.toString(), properties0)));
properties0.forEach((k, v) -> logProps.put(k.toString(), getPropertyValue(v.toString(), properties0)));
reconfigLogging(logProps);
} catch (IOException e) {
Logger.getLogger(this.getClass().getSimpleName()).log(Level.WARNING, "init logger configuration error", e);
@@ -428,7 +442,7 @@ public final class Application {
+ RESNAME_APP_CONF_DIR + " = " + confDir.substring(confDir.indexOf('!') + 1));
if (!compileMode && !(classLoader instanceof RedkaleClassLoader.RedkaleCacheClassLoader)) {
String lib = replaceValue(config.getValue("lib", "${APP_HOME}/libs/*").trim());
String lib = getPropertyValue(config.getValue("lib", "${APP_HOME}/libs/*").trim());
lib = lib.isEmpty() ? confDir : (lib + ";" + confDir);
Server.loadLib(classLoader, logger, lib);
}
@@ -462,7 +476,7 @@ public final class Application {
AnyValue clusterConf = config.getAnyValue("cluster");
if (clusterConf != null) {
try {
String classVal = replaceValue(clusterConf.getValue("type", clusterConf.getValue("value"))); //兼容value字段
String classVal = getPropertyValue(clusterConf.getValue("type", clusterConf.getValue("value"))); //兼容value字段
if (classVal == null || classVal.isEmpty() || classVal.indexOf('.') < 0) { //不包含.表示非类名,比如值: consul, nacos
Iterator<ClusterAgentProvider> it = ServiceLoader.load(ClusterAgentProvider.class, classLoader).iterator();
RedkaleClassLoader.putServiceLoader(ClusterAgentProvider.class);
@@ -509,7 +523,7 @@ public final class Application {
Set<String> mqnames = new HashSet<>();
for (int i = 0; i < mqConfs.length; i++) {
AnyValue mqConf = mqConfs[i];
String names = replaceValue(mqConf.getValue("name")); //含,或者;表示多个别名使用同一mq对象
String names = getPropertyValue(mqConf.getValue("name")); //含,或者;表示多个别名使用同一mq对象
if (names != null && !names.isEmpty()) {
for (String n : names.replace(',', ';').split(";")) {
if (n.trim().isEmpty()) {
@@ -522,7 +536,7 @@ public final class Application {
}
}
try {
String classVal = replaceValue(mqConf.getValue("type", mqConf.getValue("value"))); //兼容value字段
String classVal = getPropertyValue(mqConf.getValue("type", mqConf.getValue("value"))); //兼容value字段
if (classVal == null || classVal.isEmpty() || classVal.indexOf('.') < 0) { //不包含.表示非类名,比如值: kafka, pulsar
Iterator<MessageAgentProvider> it = ServiceLoader.load(MessageAgentProvider.class, classLoader).iterator();
RedkaleClassLoader.putServiceLoader(MessageAgentProvider.class);
@@ -800,7 +814,7 @@ public final class Application {
if (!dyncProps.isEmpty()) {
Properties newDyncProps = new Properties();
dyncProps.forEach((k, v) -> newDyncProps.put(k.toString(), replaceValue(v.toString(), dyncProps)));
dyncProps.forEach((k, v) -> newDyncProps.put(k.toString(), getPropertyValue(v.toString(), dyncProps)));
//合并配置
this.config.merge(AnyValue.loadFromProperties(newDyncProps).getAnyValue("redkale"), NodeServer.appConfigmergeFunction);
newDyncProps.forEach((key, val) -> {
@@ -834,7 +848,7 @@ public final class Application {
if (key == null) {
continue;
}
value = value == null ? value : replaceValue(value, dyncProps);
value = value == null ? value : getPropertyValue(value, dyncProps);
if (key.startsWith("system.property.")) {
String propName = key.substring("system.property.".length());
if (System.getProperty(propName) == null) { //命令行传参数优先级高
@@ -1328,6 +1342,10 @@ public final class Application {
}
try {
DataSource source = AbstractDataSource.createDataSource(serverClassLoader, resourceFactory, sourceConf, sourceName, compileMode);
if (!compileMode && source instanceof Service) {
resourceFactory.inject(sourceName, source);
((Service) source).init(sourceConf);
}
dataSources.add(source);
if (source instanceof DataMemorySource && source instanceof SearchSource) {
resourceFactory.register(sourceName, SearchSource.class, source);
@@ -1606,33 +1624,12 @@ public final class Application {
runServers(timecd, others);
runServers(timecd, watchs); //必须在所有服务都启动后再启动WATCH服务
timecd.await();
if (this.messageAgents != null) {
this.startMessageAgent();
}
if (this.clusterAgent != null) {
this.clusterAgent.start();
}
if (this.messageAgents != null) {
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "MessageAgent starting");
}
long s = System.currentTimeMillis();
final StringBuffer sb = new StringBuffer();
Set<String> names = new HashSet<>();
for (MessageAgent agent : this.messageAgents) {
names.add(agent.getName());
Map<String, Long> map = agent.start().join();
AtomicInteger maxlen = new AtomicInteger();
map.keySet().forEach(str -> {
if (str.length() > maxlen.get()) {
maxlen.set(str.length());
}
});
new TreeMap<>(map).forEach((topic, ms) -> sb.append("MessageConsumer(topic=").append(alignString(topic, maxlen.get())).append(") init and start in ").append(ms).append(" ms\r\n")
);
}
if (sb.length() > 0) {
logger.info(sb.toString().trim());
}
logger.info("MessageAgent(names=" + JsonConvert.root().convertTo(names) + ") start in " + (System.currentTimeMillis() - s) + " ms");
}
for (ApplicationListener listener : this.listeners) {
listener.postStart(this);
@@ -1649,6 +1646,90 @@ public final class Application {
}
}
private void startMessageAgent() throws Exception {
if (this.messageAgents == null) {
return;
}
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, MessageAgent.class.getSimpleName() + " starting");
}
long s = System.currentTimeMillis();
final StringBuffer sb = new StringBuffer();
Set<String> names = new HashSet<>();
ResourceFactory resourceFactory = ResourceFactory.create();
List<ResourceFactory> factorys = new ArrayList<>();
for (NodeServer ns : this.servers) {
factorys.add(ns.getResourceFactory());
}
resourceFactory.register(new ResourceTypeLoader() {
@Override
public Object load(ResourceFactory factory, String srcResourceName, Object srcObj, String resourceName, Field field, Object attachment) {
for (ResourceFactory f : factorys) {
Object val = f.find(resourceName, field.getGenericType());
if (val != null) {
return val;
}
}
return null;
}
@Override
public boolean autoNone() {
return false;
}
}, Object.class);
for (MessageAgent agent : this.messageAgents) {
names.add(agent.getName());
List<MessageConsumer> consumers = new ArrayList<>();
AnyValue consumerConf = agent.getConfig().getAnyValue("consumer");
if (consumerConf != null) { //加载 MessageConsumer
ClassFilter filter = new ClassFilter(this.serverClassLoader, ResourceConsumer.class, MessageConsumer.class, null, null);
if (consumerConf.getBoolValue("autoload", true)) {
String includes = consumerConf.getValue("includes", "");
String excludes = consumerConf.getValue("excludes", "");
filter.setIncludePatterns(includes.split(";"));
filter.setExcludePatterns(excludes.split(";"));
} else {
filter.setRefused(true);
}
loadClassesByFilters(null, filter);
List<FilterEntry<? extends MessageConsumer>> entrys = new ArrayList(filter.getFilterEntrys());
for (FilterEntry<? extends MessageConsumer> en : entrys) {
Class<? extends MessageConsumer> clazz = en.getType();
ResourceConsumer res = clazz.getAnnotation(ResourceConsumer.class);
if (!Objects.equals(agent.getName(), getPropertyValue(res.mq()))) {
continue;
}
RedkaleClassLoader.putReflectionDeclaredConstructors(clazz, clazz.getName());
final MessageConsumer consumer = clazz.getDeclaredConstructor().newInstance();
resourceFactory.inject(consumer);
consumers.add(consumer);
}
for (MessageConsumer consumer : consumers) {
consumer.init(consumerConf);
}
}
Map<String, Long> map = agent.start(consumers);
AtomicInteger maxlen = new AtomicInteger();
map.keySet().forEach(str -> {
if (str.length() > maxlen.get()) {
maxlen.set(str.length());
}
});
new TreeMap<>(map).forEach((topic, ms) -> sb.append(MessageClientConsumer.class.getSimpleName()).append("(topic=")
.append(alignString(topic, maxlen.get())).append(") init and start in ").append(ms).append(" ms\r\n")
);
}
if (sb.length() > 0) {
logger.info(sb.toString().trim());
}
logger.info("MessageAgent(names=" + JsonConvert.root().convertTo(names) + ") start in " + (System.currentTimeMillis() - s) + " ms");
}
void loadClassesByFilters(String excludelibs, final ClassFilter... filters) throws IOException {
ClassFilter.Loader.load(getHome(), this.serverClassLoader, ((this.excludelibs != null ? (this.excludelibs + ";") : "") + (excludelibs == null ? "" : excludelibs)).split(";"), filters);
}
private static String alignString(String value, int maxlen) {
StringBuilder sb = new StringBuilder(maxlen);
sb.append(value);
@@ -1988,7 +2069,7 @@ public final class Application {
System.exit(0); //必须要有
}
private String replaceValue(String value, Properties... envs) {
public String getPropertyValue(String value, Properties... envs) {
if (value == null || value.isBlank()) {
return value;
}
@@ -2015,9 +2096,9 @@ public final class Application {
list.add(this.clusterProperties);
list.add(this.loggingProperties);
list.add(this.messageProperties);
for (Properties prop : envs) {
for (Properties prop : list) {
if (prop.containsKey(key)) {
newVal = replaceValue(prop.getProperty(key), envs);
newVal = getPropertyValue(prop.getProperty(key), envs);
break;
}
}
@@ -2028,7 +2109,7 @@ public final class Application {
if (newVal == null) {
throw new RedkaleException("Not found '" + key + "' value");
}
return replaceValue(val.substring(0, pos2) + newVal + val.substring(pos2 + 1), envs);
return getPropertyValue(val.substring(0, pos2) + newVal + val.substring(pos2 + 1), envs);
} else if ((pos1 >= 0 && pos2 < 0) || (pos1 < 0 && pos2 >= 0)) {
throw new RedkaleException(value + " is illegal naming");
}
@@ -2499,7 +2580,7 @@ public final class Application {
long s = System.currentTimeMillis();
for (MessageAgent agent : this.messageAgents) {
names.add(agent.getName());
agent.stop().join();
agent.stop();
}
logger.info("MessageAgent(names=" + JsonConvert.root().convertTo(names) + ") stop in " + (System.currentTimeMillis() - s) + " ms");
}

View File

@@ -16,10 +16,10 @@ import java.util.function.Predicate;
import java.util.jar.*;
import java.util.logging.*;
import java.util.regex.Pattern;
import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.*;
import org.redkale.util.AnyValue.DefaultAnyValue;
import org.redkale.annotation.AutoLoad;
import org.redkale.util.*;
import org.redkale.util.AnyValue.DefaultAnyValue;
/**
* class过滤器 符合条件的class会保留下来存入FilterEntry。
@@ -76,10 +76,10 @@ public final class ClassFilter<T> {
this.classLoader = classLoader == null ? Thread.currentThread().getContextClassLoader() : classLoader;
}
public static ClassFilter create(RedkaleClassLoader classLoader, Class[] excludeSuperClasses, String includeregs, String excluderegs, Set<String> includeValues, Set<String> excludeValues) {
public static ClassFilter create(RedkaleClassLoader classLoader, Class[] excludeSuperClasses, String includeRegxs, String excludeRegxs, Set<String> includeValues, Set<String> excludeValues) {
ClassFilter filter = new ClassFilter(classLoader, null, null, excludeSuperClasses);
filter.setIncludePatterns(includeregs == null ? null : includeregs.split(";"));
filter.setExcludePatterns(excluderegs == null ? null : excluderegs.split(";"));
filter.setIncludePatterns(includeRegxs == null ? null : includeRegxs.split(";"));
filter.setExcludePatterns(excludeRegxs == null ? null : excludeRegxs.split(";"));
filter.setPrivilegeIncludes(includeValues);
filter.setPrivilegeExcludes(excludeValues);
return filter;
@@ -165,45 +165,45 @@ public final class ClassFilter<T> {
* 过滤指定的class
*
* @param property application.xml中对应class节点下的property属性项
* @param clazzname class名称
* @param clazzName class名称
* @param autoscan 为true表示自动扫描的 false表示显著调用filter AutoLoad的注解将被忽略
*/
public final void filter(AnyValue property, String clazzname, boolean autoscan) {
filter(property, clazzname, autoscan, null);
public final void filter(AnyValue property, String clazzName, boolean autoscan) {
filter(property, clazzName, autoscan, null);
}
/**
* 过滤指定的class
*
* @param property application.xml中对应class节点下的property属性项
* @param clazzname class名称
* @param clazzName class名称
* @param autoScan 为true表示自动扫描的 false表示显著调用filter AutoLoad的注解将被忽略
* @param url URL
*/
public final void filter(AnyValue property, String clazzname, boolean autoScan, URL url) {
boolean r = accept0(property, clazzname);
public final void filter(AnyValue property, String clazzName, boolean autoScan, URL url) {
boolean r = accept0(property, clazzName);
ClassFilter cf = r ? this : null;
if (r && ands != null) {
for (ClassFilter filter : ands) {
if (!filter.accept(property, clazzname)) {
if (!filter.accept(property, clazzName)) {
return;
}
}
}
if (!r && ors != null) {
for (ClassFilter filter : ors) {
if (filter.accept(filter.conf, clazzname)) {
if (filter.accept(filter.conf, clazzName)) {
cf = filter;
property = cf.conf;
break;
}
}
}
if (cf == null || clazzname.startsWith("sun.") || clazzname.contains("module-info")) {
if (cf == null || clazzName.startsWith("sun.") || clazzName.contains("module-info")) {
return;
}
try {
Class clazz = classLoader.loadClass(clazzname);
Class clazz = classLoader.loadClass(clazzName);
if (!cf.accept(property, clazz, autoScan)) {
return;
}
@@ -222,17 +222,17 @@ public final class ClassFilter<T> {
AutoLoad auto = (AutoLoad) clazz.getAnnotation(AutoLoad.class);
org.redkale.util.AutoLoad auto2 = (org.redkale.util.AutoLoad) clazz.getAnnotation(org.redkale.util.AutoLoad.class);
if ((expectPredicate != null && expectPredicate.test(clazzname)) || (autoScan && auto != null && !auto.value())
if ((expectPredicate != null && expectPredicate.test(clazzName)) || (autoScan && auto != null && !auto.value())
|| (autoScan && auto2 != null && !auto2.value())) { //自动扫描且被标记为@AutoLoad(false)的
expectEntrys.add(new FilterEntry(clazz, autoScan, true, property));
} else {
entrys.add(new FilterEntry(clazz, autoScan, false, property));
}
} catch (Throwable cfe) {
if (logger.isLoggable(Level.FINEST) && !clazzname.startsWith("sun.") && !clazzname.startsWith("javax.")
&& !clazzname.startsWith("com.sun.") && !clazzname.startsWith("jdk.") && !clazzname.startsWith("META-INF")
&& !clazzname.startsWith("com.mysql.") && !clazzname.startsWith("com.microsoft.") && !clazzname.startsWith("freemarker.")
&& !clazzname.startsWith("org.redkale") && (clazzname.contains("Service") || clazzname.contains("Servlet"))) {
if (logger.isLoggable(Level.FINEST) && !clazzName.startsWith("sun.") && !clazzName.startsWith("javax.")
&& !clazzName.startsWith("com.sun.") && !clazzName.startsWith("jdk.") && !clazzName.startsWith("META-INF")
&& !clazzName.startsWith("com.mysql.") && !clazzName.startsWith("com.microsoft.") && !clazzName.startsWith("freemarker.")
&& !clazzName.startsWith("org.redkale") && (clazzName.contains("Service") || clazzName.contains("Servlet"))) {
if (cfe instanceof NoClassDefFoundError) {
String msg = ((NoClassDefFoundError) cfe).getMessage();
if (msg.startsWith("java.lang.NoClassDefFoundError: java") || msg.startsWith("javax/")) {
@@ -240,7 +240,7 @@ public final class ClassFilter<T> {
}
}
//&& (!(cfe instanceof NoClassDefFoundError) || (cfe instanceof UnsupportedClassVersionError) || ((NoClassDefFoundError) cfe).getMessage().startsWith("java.lang.NoClassDefFoundError: java"))) {
logger.log(Level.FINEST, ClassFilter.class.getSimpleName() + " filter error for class: " + clazzname + (url == null ? "" : (" in " + url)), cfe);
logger.log(Level.FINEST, ClassFilter.class.getSimpleName() + " filter error for class: " + clazzName + (url == null ? "" : (" in " + url)), cfe);
}
}
}
@@ -601,7 +601,7 @@ public final class ClassFilter<T> {
continue;
}
//常见的jar跳过
if (classname.startsWith("com.redkaledyn.")) {
if (classname.startsWith("org.redkaledyn.")) {
break; //redkale动态生成的类
}
if (classname.startsWith("com.mysql.")) {

View File

@@ -14,15 +14,15 @@ import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.*;
import org.redkale.annotation.*;
import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.Command;
import org.redkale.annotation.*;
import static org.redkale.boot.Application.*;
import org.redkale.boot.ClassFilter.FilterEntry;
import org.redkale.cluster.ClusterAgent;
import org.redkale.mq.*;
import org.redkale.net.Filter;
import org.redkale.net.*;
import org.redkale.net.Filter;
import org.redkale.net.client.ClientAddress;
import org.redkale.net.http.*;
import org.redkale.net.sncp.*;
@@ -212,7 +212,7 @@ public abstract class NodeServer {
filters.addAll(otherFilters);
}
long s = System.currentTimeMillis();
ClassFilter.Loader.load(application.getHome(), serverClassLoader, ((application.excludelibs != null ? (application.excludelibs + ";") : "") + serverConf.getValue("excludelibs", "")).split(";"), filters.toArray(new ClassFilter[filters.size()]));
application.loadClassesByFilters(serverConf.getValue("excludelibs", ""), filters.toArray(new ClassFilter[filters.size()]));
long e = System.currentTimeMillis() - s;
logger.info(this.getClass().getSimpleName() + " load filter class in " + e + " ms");
loadService(serviceFilter); //必须在servlet之前

View File

@@ -77,7 +77,7 @@ public abstract class MessageAgent implements Resourcable {
protected final ReentrantLock producerLock = new ReentrantLock();
protected final ReentrantLock nodesLock = new ReentrantLock();
protected final ReentrantLock serviceLock = new ReentrantLock();
protected final List<MessageConsumer> consumerListeners = new CopyOnWriteArrayList<>();
@@ -122,29 +122,34 @@ public abstract class MessageAgent implements Resourcable {
this.timeoutExecutor.setRemoveOnCancelPolicy(true);
}
public CompletableFuture<Map<String, Long>> start() {
public Map<String, Long> start(List<MessageConsumer> consumers) {
this.consumerListeners.addAll(consumers);
final LinkedHashMap<String, Long> map = new LinkedHashMap<>();
final List<CompletableFuture> futures = new ArrayList<>();
this.clientConsumerNodes.values().forEach(node -> {
long s = System.currentTimeMillis();
futures.add(node.consumer.startup().whenComplete((r, t) -> map.put(node.consumer.consumerid, System.currentTimeMillis() - s)));
node.consumer.startup();
long e = System.currentTimeMillis() - s;
map.put(node.consumer.consumerid, e);
});
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).thenApply(r -> map);
return map;
}
//Application.shutdown 在执行server.shutdown之前执行
public CompletableFuture<Void> stop() {
List<CompletableFuture> futures = new ArrayList<>();
public void stop() {
this.clientConsumerNodes.values().forEach(node -> {
futures.add(node.consumer.shutdown());
node.consumer.shutdown();
});
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
}
//Application.shutdown 在所有server.shutdown执行后执行
public void destroy(AnyValue config) {
this.httpMessageClient.close().join();
this.sncpMessageClient.close().join();
this.httpMessageClient.close();
this.sncpMessageClient.close();
for (MessageConsumer consumer : consumerListeners) {
consumer.destroy(config);
}
this.consumerListeners.clear();
if (this.timeoutExecutor != null) {
this.timeoutExecutor.shutdown();
}
@@ -374,7 +379,7 @@ public abstract class MessageAgent implements Resourcable {
}
String[] topics = generateHttpReqTopics(service);
String consumerid = generateHttpConsumerid(topics, service);
nodesLock.lock();
serviceLock.lock();
try {
if (clientConsumerNodes.containsKey(consumerid)) {
throw new RedkaleException("consumerid(" + consumerid + ") is repeat");
@@ -382,7 +387,7 @@ public abstract class MessageAgent implements Resourcable {
HttpMessageClientProcessor processor = new HttpMessageClientProcessor(this.logger, httpMessageClient, getHttpMessageClientProducer(), ns, service, servlet);
this.clientConsumerNodes.put(consumerid, new MessageClientConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(topics, consumerid, processor)));
} finally {
nodesLock.unlock();
serviceLock.unlock();
}
}
@@ -397,7 +402,7 @@ public abstract class MessageAgent implements Resourcable {
}
String topic = generateSncpReqTopic(service);
String consumerid = generateSncpConsumerid(topic, service);
nodesLock.lock();
serviceLock.lock();
try {
if (clientConsumerNodes.containsKey(consumerid)) {
throw new RedkaleException("consumerid(" + consumerid + ") is repeat");
@@ -405,7 +410,7 @@ public abstract class MessageAgent implements Resourcable {
SncpMessageClientProcessor processor = new SncpMessageClientProcessor(this.logger, sncpMessageClient, getSncpMessageClientProducer(), ns, service, servlet);
this.clientConsumerNodes.put(consumerid, new MessageClientConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(new String[]{topic}, consumerid, processor)));
} finally {
nodesLock.unlock();
serviceLock.unlock();
}
}
@@ -466,6 +471,46 @@ public abstract class MessageAgent implements Resourcable {
}
protected static class MessageConsumerWrapper {
private final MessageConsumer consumer;
public MessageConsumerWrapper(MessageConsumer consumer) {
Objects.requireNonNull(consumer);
this.consumer = consumer;
}
public void init(AnyValue config) {
consumer.init(config);
}
public void onMessage(MessageConext context, Object[] messages) {
consumer.onMessage(context, messages);
}
public void destroy(AnyValue config) {
consumer.destroy(config);
}
@Override
public int hashCode() {
return Objects.hashCode(this.consumer);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
final MessageConsumerWrapper other = (MessageConsumerWrapper) obj;
return Objects.equals(this.consumer.getClass(), other.consumer.getClass());
}
}
protected static class ConvertMessageProducer implements MessageProducer {
private final MessageProducer producer;

View File

@@ -49,11 +49,10 @@ public abstract class MessageClient {
this.clazzName = getClass().getSimpleName();
}
protected CompletableFuture<Void> close() {
if (this.respConsumer == null) {
return CompletableFuture.completedFuture(null);
protected void close() {
if (this.respConsumer != null) {
this.respConsumer.shutdown();
}
return this.respConsumer.shutdown();
}
protected CompletableFuture<MessageRecord> sendMessage(final MessageRecord message, boolean needresp) {
@@ -101,7 +100,7 @@ public abstract class MessageClient {
};
long ones = System.currentTimeMillis();
MessageClientConsumer one = messageAgent.createMessageClientConsumer(new String[]{appRespTopic}, appRespConsumerid, processor);
one.startup().join();
one.startup();
long onee = System.currentTimeMillis() - ones;
if (finest) {
messageAgent.logger.log(Level.FINEST, clazzName + ".MessageRespFutureNode.startup " + onee + "ms ");

View File

@@ -6,7 +6,6 @@
package org.redkale.mq;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Logger;
/**
@@ -53,9 +52,9 @@ public abstract class MessageClientConsumer {
return topics;
}
public abstract CompletableFuture<Void> startup();
public abstract void startup();
public abstract CompletableFuture<Void> shutdown();
public abstract void shutdown();
public boolean isClosed() {
return closed;

View File

@@ -354,7 +354,7 @@ public final class Rest {
if (!valid) {
throw new RestException("Rest WebSocket Class(" + webSocketType + ") must have public or protected Constructor on createRestWebSocketServlet");
}
final String rwsname = ResourceFactory.formatResourceName(rws.name());
final String rwsname = ResourceFactory.getResourceName(rws.name());
if (!checkName(rws.catalog())) {
throw new RestException(webSocketType.getName() + " have illegal " + RestWebSocket.class.getSimpleName() + ".catalog, only 0-9 a-z A-Z _ cannot begin 0-9");
}

View File

@@ -12,9 +12,9 @@ import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.*;
import java.util.stream.Stream;
import org.redkale.annotation.*;
import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.Comment;
import org.redkale.annotation.*;
import org.redkale.annotation.ResourceListener;
import org.redkale.annotation.ResourceType;
import static org.redkale.boot.Application.RESNAME_APP_EXECUTOR;
@@ -309,6 +309,21 @@ public abstract class AbstractDataSource extends AbstractService implements Data
return executor;
}
protected String executorToString() {
ExecutorService executor = this.sourceExecutor;
if (executor == null) {
return "";
}
if (executor.getClass().getSimpleName().contains("ThreadPerTaskExecutor")) {
return ", thread-pool=[virtual]";
}
if (executor instanceof ThreadPoolExecutor) {
ThreadPoolExecutor re = (ThreadPoolExecutor) executor;
return ", pool-size=" + re.getMaximumPoolSize();
}
return "";
}
/**
* 是否虚拟化的持久对象
*

View File

@@ -14,8 +14,8 @@ import java.util.concurrent.atomic.*;
import java.util.function.*;
import java.util.logging.*;
import java.util.stream.Stream;
import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.*;
import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.ResourceListener;
import org.redkale.annotation.ResourceType;
import static org.redkale.boot.Application.*;
@@ -304,7 +304,7 @@ public abstract class AbstractDataSqlSource extends AbstractDataSource implement
if (pos > 0) {
url = url.substring(0, pos) + "...";
}
return getClass().getSimpleName() + "{url=" + url + ", maxconns=" + readMaxConns() + "}";
return getClass().getSimpleName() + "{url=" + url + ", maxconns=" + readMaxConns() + executorToString() + "}";
} else {
String readUrl = readConfProps.getProperty(DATA_SOURCE_URL);
int pos = readUrl.indexOf('?');
@@ -316,7 +316,7 @@ public abstract class AbstractDataSqlSource extends AbstractDataSource implement
if (pos > 0) {
writeUrl = writeUrl.substring(0, pos) + "...";
}
return getClass().getSimpleName() + "{read-url=" + readUrl + ", read-maxconns=" + readMaxConns() + ",write-url=" + writeUrl + ", write-maxconns=" + writeMaxConns() + "}";
return getClass().getSimpleName() + "{read-url=" + readUrl + ", read-maxconns=" + readMaxConns() + executorToString() + ",write-url=" + writeUrl + ", write-maxconns=" + writeMaxConns() + "}";
}
}

View File

@@ -13,8 +13,8 @@ import java.util.concurrent.atomic.*;
import java.util.function.*;
import java.util.logging.Level;
import java.util.stream.Stream;
import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.*;
import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.ResourceListener;
import org.redkale.annotation.ResourceType;
import org.redkale.service.Local;
@@ -2789,7 +2789,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
protected static class DataJdbcResultSet implements DataResultSet {
EntityInfo info;
final EntityInfo info;
ResultSet rr;
@@ -2913,7 +2913,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
protected int maxConns;
protected Semaphore maxSemaphore;
protected Semaphore newSemaphore;
protected String url;
@@ -2928,7 +2928,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
defMaxConns = Math.min(1000, Utility.cpus() * 100);
}
this.maxConns = Math.max(1, Integer.decode(prop.getProperty(DATA_SOURCE_MAXCONNS, "" + defMaxConns)));
this.maxSemaphore = new Semaphore(this.maxConns);
this.newSemaphore = new Semaphore(this.maxConns);
this.queue = new ArrayBlockingQueue<>(maxConns);
this.url = prop.getProperty(DATA_SOURCE_URL);
String username = prop.getProperty(DATA_SOURCE_USER, "");
@@ -3011,10 +3011,10 @@ public class DataJdbcSource extends AbstractDataSqlSource {
private void changeMaxConns(int newMaxconns) {
ArrayBlockingQueue<SourceConnection> newQueue = new ArrayBlockingQueue<>(newMaxconns);
ArrayBlockingQueue<SourceConnection> oldQueue = this.queue;
Semaphore oldSemaphore = this.maxSemaphore;
Semaphore oldSemaphore = this.newSemaphore;
this.queue = newQueue;
this.maxConns = newMaxconns;
this.maxSemaphore = new Semaphore(this.maxConns);
this.newSemaphore = new Semaphore(this.maxConns);
SourceConnection c;
while ((c = oldQueue.poll()) != null) {
c.version = -1;
@@ -3056,12 +3056,13 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
private SourceConnection newConnection(ArrayBlockingQueue<SourceConnection> queue) {
Semaphore semaphore = this.maxSemaphore;
Semaphore semaphore = this.newSemaphore;
SourceConnection conn = null;
if (semaphore.tryAcquire()) {
try {
conn = new SourceConnection(driver.connect(url, connectAttrs), this.urlVersion.get());
} catch (SQLException ex) {
semaphore.release();
throw new SourceException(ex);
}
usingCounter.increment();
@@ -3071,21 +3072,21 @@ public class DataJdbcSource extends AbstractDataSqlSource {
try {
conn = queue.poll(connectTimeoutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException t) {
logger.log(Level.WARNING, "take pooled connection error", t);
logger.log(Level.WARNING, "take pooled jdbc connection error", t);
}
if (conn == null) {
throw new SourceException("create pooled connection timeout");
throw new SourceException("create pooled jdbc connection timeout");
}
return conn;
}
}
public <C> void offerConnection(final C connection) {
offerConnection(connection, this.maxSemaphore, this.queue);
offerConnection(connection, this.newSemaphore, this.queue);
}
public <C> void offerTransConnection(final C connection) {
offerConnection(connection, this.maxSemaphore, this.queue);
offerConnection(connection, this.newSemaphore, this.queue);
}
private <C> void offerConnection(final C connection, Semaphore semaphore, Queue<SourceConnection> queue) {

View File

@@ -183,6 +183,15 @@ public class EntityBuilder<T> {
unconstructorAttributes, attributeMap, queryAttrs.toArray(new Attribute[queryAttrs.size()]));
}
/**
* 将数据ResultSet转成对象集合
*
* @param <T> 泛型
* @param type 实体类或JavaBean
* @param rset 数据ResultSet
*
* @return 对象集合
*/
public static <T> List<T> getListValue(Class<T> type, final DataResultSet rset) {
if (type == byte[].class || type == String.class || type.isPrimitive() || Number.class.isAssignableFrom(type)
|| (!Map.class.isAssignableFrom(type) && type.getName().startsWith("java."))) {
@@ -195,6 +204,15 @@ public class EntityBuilder<T> {
return EntityBuilder.load(type).getObjectList(rset);
}
/**
* 将数据ResultSet转成单个对象
*
* @param <T> 泛型
* @param type 实体类或JavaBean
* @param rset 数据ResultSet
*
* @return 单个对象
*/
public static <T> T getOneValue(Class<T> type, final DataResultSet rset) {
if (!rset.next()) {
return null;
@@ -397,19 +415,17 @@ public class EntityBuilder<T> {
Object[] cps = new Object[this.constructorParameters.length];
for (int i = 0; i < constructorAttrs.length; i++) {
Attribute<T, Serializable> attr = constructorAttrs[i];
if (attr == null) {
continue;
if (attr != null) {
cps[i] = values[++index];
}
cps[i] = values[++index];
}
obj = creator.create(cps);
}
if (unconstructorAttrs != null) {
for (Attribute<T, Serializable> attr : unconstructorAttrs) {
if (attr == null) {
continue;
if (attr != null) {
attr.set(obj, values[++index]);
}
attr.set(obj, values[++index]);
}
}
return obj;

View File

@@ -1677,7 +1677,7 @@ public final class EntityInfo<T> {
&& !value.getClass().getName().startsWith("java.sql.") && !value.getClass().getName().startsWith("java.time.")) {
return new StringBuilder().append('\'').append(jsonConvert.convertTo(value).replace("'", "\\'")).append('\'').toString();
}
return String.valueOf(value);
return value.toString();
}
@Override

View File

@@ -5,9 +5,9 @@
*/
package org.redkale.source;
import java.lang.annotation.*;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.*;
/**
* 过滤字段标记
@@ -56,12 +56,6 @@ public @interface FilterColumn {
*/
FilterExpress express() default FilterExpress.EQUAL;
/**
* 当标记的字段类型是数组/Collection类型且express不是IN/NOTIN时则构建过滤条件时会遍历字段值的元素来循环构建表达式元素之间的关系是AND或OR由该值来确定
*
* @return 数组元素间的表达式是否AND关系
*/
boolean itemand() default true;
/**
* 判断字段是否必需for OpenAPI Specification 3.1.0

View File

@@ -776,11 +776,11 @@ public final class ResourceFactory {
return inject(srcResourceName, srcObj, attachment, consumer, new ArrayList());
}
public static String formatResourceName(String name) {
return formatResourceName(null, name);
public static String getResourceName(String name) {
return getResourceName(null, name);
}
public static String formatResourceName(String parent, String name) {
public static String getResourceName(String parent, String name) {
if (name == null) {
return null;
}
@@ -796,7 +796,7 @@ public final class ResourceFactory {
}
String postfix = subName.substring(pos + 1);
String property = subName.substring(0, pos);
return formatResourceName(parent, prefix + System.getProperty(property, "") + postfix);
return getResourceName(parent, prefix + System.getProperty(property, "") + postfix);
}
private <T> boolean inject(String srcResourceName, final Object srcObj, final T attachment, final BiConsumer<Object, Field> consumer, final List<Object> list) {
@@ -899,7 +899,7 @@ public final class ResourceFactory {
}
boolean autoRegNull = true;
final String rcname = formatResourceName(srcResourceName, tname);
final String rcname = getResourceName(srcResourceName, tname);
Object rs = null;
if (rcname.startsWith("system.property.")) {
rs = System.getProperty(rcname.substring("system.property.".length()));
@@ -1027,9 +1027,7 @@ public final class ResourceFactory {
}
public <T extends Annotation> void register(final ResourceAnnotationProvider<T> loader) {
if (loader == null) {
return;
}
Objects.requireNonNull(loader);
parentRoot().resAnnotationProviderMap.put(loader.annotationType(), loader);
}

File diff suppressed because one or more lines are too long