diff --git a/src/main/java/org/redkale/annotation/Component.java b/src/main/java/org/redkale/annotation/Component.java new file mode 100644 index 000000000..069385b86 --- /dev/null +++ b/src/main/java/org/redkale/annotation/Component.java @@ -0,0 +1,24 @@ +package org.redkale.annotation; + +import static java.lang.annotation.ElementType.TYPE; +import static java.lang.annotation.RetentionPolicy.RUNTIME; +import java.lang.annotation.*; + +/** + * 标记Component的Service类特点:
+ * 1、直接构造, 不使用Sncp动态构建对象
+ * 2、不会生成对应协议的Servlet
+ * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * @since 2.8.0 + */ +@Inherited +@Documented +@Target({TYPE}) +@Retention(RUNTIME) +public @interface Component { + +} diff --git a/src/main/java/org/redkale/boot/NodeServer.java b/src/main/java/org/redkale/boot/NodeServer.java index 27990858e..bc3c1f957 100644 --- a/src/main/java/org/redkale/boot/NodeServer.java +++ b/src/main/java/org/redkale/boot/NodeServer.java @@ -21,7 +21,7 @@ import org.redkale.annotation.*; import static org.redkale.boot.Application.*; import org.redkale.boot.ClassFilter.FilterEntry; import org.redkale.cluster.ClusterAgent; -import org.redkale.mq.MessageAgent; +import org.redkale.mq.*; import org.redkale.net.Filter; import org.redkale.net.*; import org.redkale.net.client.ClientAddress; @@ -297,7 +297,9 @@ public abstract class NodeServer { } //ResourceFactory resfactory = (isSNCP() ? appResFactory : resourceFactory); - Service service = Modifier.isFinal(resServiceType.getModifiers()) ? (Service) resServiceType.getConstructor().newInstance() : Sncp.createLocalService(serverClassLoader, resourceName, resServiceType, appResFactory, application.getSncpRpcGroups(), sncpClient, null, null, null); + Service service = Modifier.isFinal(resServiceType.getModifiers()) || resServiceType.getAnnotation(Component.class) != null + ? (Service) resServiceType.getConstructor().newInstance() + : Sncp.createLocalService(serverClassLoader, resourceName, resServiceType, appResFactory, application.getSncpRpcGroups(), sncpClient, null, null, null); appResFactory.register(resourceName, resServiceType, service); field.set(srcObj, service); @@ -459,9 +461,11 @@ public abstract class NodeServer { boolean isLocalGroup0 = rpcGroups.isLocalGroup(this.sncpGroup, this.sncpAddress, entry); final String group = isLocalGroup0 ? null : entry.getGroup(); final boolean localMode = serviceImplClass.getAnnotation(Local.class) != null || isLocalGroup0;//本地模式 - if (localMode && (serviceImplClass.isInterface() || Modifier.isAbstract(serviceImplClass.getModifiers()))) { - continue; //本地模式不能实例化接口和抽象类的Service类 + if ((localMode || serviceImplClass.getAnnotation(Component.class) != null) + && (serviceImplClass.isInterface() || Modifier.isAbstract(serviceImplClass.getModifiers()))) { + continue; //本地模式或Component不能实例化接口和抽象类的Service类 } + final ResourceTypeLoader resourceLoader = (ResourceFactory rf, String srcResourceName, final Object srcObj, final String resourceName, Field field, final Object attachment) -> { try { if (Sncp.loadMethodActions(Sncp.getResourceType(serviceImplClass)).isEmpty() @@ -475,7 +479,14 @@ public abstract class NodeServer { MessageAgent agent = getMessageAgent(entry.getProperty()); Service service; final boolean ws = srcObj instanceof WebSocketServlet; - if (ws || localMode) { //本地模式 + final boolean component = serviceImplClass.getAnnotation(Component.class) != null; + if (component) { //Component + RedkaleClassLoader.putReflectionPublicConstructors(serviceImplClass, serviceImplClass.getName()); + if (!acceptsComponent(serviceImplClass)) { + return null; + } + service = serviceImplClass.getDeclaredConstructor().newInstance(); + } else if (ws || localMode) { //本地模式 service = Sncp.createLocalService(serverClassLoader, resourceName, serviceImplClass, appResourceFactory, rpcGroups, this.sncpClient, agent, group, entry.getProperty()); } else { service = Sncp.createRemoteService(serverClassLoader, resourceName, serviceImplClass, appResourceFactory, rpcGroups, this.sncpClient, agent, group, entry.getProperty()); @@ -496,7 +507,9 @@ public abstract class NodeServer { rf.inject(resourceName, service); //动态加载的Service也存在按需加载的注入资源 } localServices.add(service); - interceptorServices.add(service); + if (!component) { + interceptorServices.add(service); + } if (consumer != null) { consumer.accept(agent, service); } @@ -585,6 +598,17 @@ public abstract class NodeServer { slist.add(new StringBuilder().append(serstr).append(" load and init in ").append(e < 10 ? " " : (e < 100 ? " " : "")).append(e).append(" ms").append(LINE_SEPARATOR).toString()); } }); + localServices.stream().forEach(y -> { + if (y.getClass().getAnnotation(Component.class) != null) { + long s = System.currentTimeMillis(); + interceptComponent(y); + long e = System.currentTimeMillis() - s; + String serstr = Sncp.toSimpleString(y, maxNameLength, maxTypeLength); + if (slist != null) { + slist.add(new StringBuilder().append(serstr).append(" component-start in ").append(e < 10 ? " " : (e < 100 ? " " : "")).append(e).append(" ms").append(LINE_SEPARATOR).toString()); + } + } + }); } if (slist != null && sb != null) { List wlist = new ArrayList<>(slist); //直接使用CopyOnWriteArrayList偶尔会出现莫名的异常(CopyOnWriteArrayList源码1185行) @@ -606,6 +630,28 @@ public abstract class NodeServer { maxTypeLength = Math.max(maxTypeLength, Sncp.getResourceType(y).getName().length() + 1); } + protected boolean acceptsComponent(Class serviceImplClass) { + if (MessageConsumerListener.class.isAssignableFrom(serviceImplClass)) { + MessageConsumer mqConsumer = serviceImplClass.getAnnotation(MessageConsumer.class); + if (mqConsumer == null) { + return false; + } + MessageAgent mqAgent = application.getMessageAgent(mqConsumer.mq()); + if (mqAgent == null) { + return false; + } + } + return true; + } + + protected void interceptComponent(Service service) { + if (service instanceof MessageConsumerListener) { + MessageConsumer mqConsumer = service.getClass().getAnnotation(MessageConsumer.class); + MessageAgent mqAgent = application.getMessageAgent(mqConsumer.mq()); + mqAgent.addConsumerListener((MessageConsumerListener) service); + } + } + protected MessageAgent getMessageAgent(AnyValue serviceConf) { MessageAgent agent = null; if (serviceConf != null && serviceConf.getValue("mq") != null) { diff --git a/src/main/java/org/redkale/mq/MessageAgent.java b/src/main/java/org/redkale/mq/MessageAgent.java index 9f3fe6ea5..c0afca85f 100644 --- a/src/main/java/org/redkale/mq/MessageAgent.java +++ b/src/main/java/org/redkale/mq/MessageAgent.java @@ -55,6 +55,8 @@ public abstract class MessageAgent implements Resourcable { protected final ReentrantLock sncpNodesLock = new ReentrantLock(); + protected final List consumerListeners = new CopyOnWriteArrayList<>(); + protected final AtomicLong msgSeqno = new AtomicLong(System.nanoTime()); protected HttpMessageClient httpMessageClient; @@ -103,9 +105,6 @@ public abstract class MessageAgent implements Resourcable { this.timeoutExecutor.setRemoveOnCancelPolicy(true); } - @ResourceListener - public abstract void onResourceChange(ResourceEvent[] events); - public CompletableFuture> start() { final LinkedHashMap map = new LinkedHashMap<>(); final List futures = new ArrayList<>(); @@ -276,7 +275,7 @@ public abstract class MessageAgent implements Resourcable { } //创建指定topic的生产处理器 - protected abstract MessageClientProducer createMessageClientProducer(String name); + protected abstract MessageClientProducer createMessageClientProducer(String producerName); //创建topic,如果已存在则跳过 public abstract boolean createTopic(String... topics); @@ -293,6 +292,13 @@ public abstract class MessageAgent implements Resourcable { //创建指定topic的消费处理器 public abstract MessageClientConsumer createMessageClientConsumer(String[] topics, String group, MessageClientProcessor processor); + @ResourceListener + public abstract void onResourceChange(ResourceEvent[] events); + + public void addConsumerListener(MessageConsumerListener listener) { + + } + public final void putService(NodeHttpServer ns, Service service, HttpServlet servlet) { AutoLoad al = service.getClass().getAnnotation(AutoLoad.class); if (al != null && !al.value() && service.getClass().getAnnotation(Local.class) != null) { diff --git a/src/main/java/org/redkale/mq/MessageConsumer.java b/src/main/java/org/redkale/mq/MessageConsumer.java new file mode 100644 index 000000000..6973335df --- /dev/null +++ b/src/main/java/org/redkale/mq/MessageConsumer.java @@ -0,0 +1,30 @@ +/* + * + */ +package org.redkale.mq; + +import static java.lang.annotation.ElementType.TYPE; +import static java.lang.annotation.RetentionPolicy.RUNTIME; +import java.lang.annotation.*; + +/** + * MQ资源注解 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.8.0 + */ +@Documented +@Target({TYPE}) +@Retention(RUNTIME) +public @interface MessageConsumer { + + String mq(); + + String group() default ""; + + String[] topics(); +} diff --git a/src/main/java/org/redkale/mq/MessageConsumerListener.java b/src/main/java/org/redkale/mq/MessageConsumerListener.java new file mode 100644 index 000000000..a3379dd0e --- /dev/null +++ b/src/main/java/org/redkale/mq/MessageConsumerListener.java @@ -0,0 +1,24 @@ +/* + * + */ +package org.redkale.mq; + +import org.redkale.annotation.Component; +import org.redkale.service.*; + +/** + * MQ资源注解 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.8.0 + */ +@Local +@Component +public interface MessageConsumerListener extends Service { + + public void onMessage(String topic, T message); +} diff --git a/src/main/java/org/redkale/mq/MessageProducer.java b/src/main/java/org/redkale/mq/MessageProducer.java new file mode 100644 index 000000000..3c55ec88f --- /dev/null +++ b/src/main/java/org/redkale/mq/MessageProducer.java @@ -0,0 +1,26 @@ +/* + * + */ +package org.redkale.mq; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.RetentionPolicy.RUNTIME; +import java.lang.annotation.*; + +/** + * MQ资源注解, 只能标记在MessageProducerSender类型字段上 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.8.0 + */ +@Documented +@Target({FIELD}) +@Retention(RUNTIME) +public @interface MessageProducer { + + String mq(); +} diff --git a/src/main/java/org/redkale/mq/MessageProducerSender.java b/src/main/java/org/redkale/mq/MessageProducerSender.java new file mode 100644 index 000000000..4eaa23849 --- /dev/null +++ b/src/main/java/org/redkale/mq/MessageProducerSender.java @@ -0,0 +1,31 @@ +/* + * + */ +package org.redkale.mq; + +import java.lang.reflect.Type; +import java.util.concurrent.CompletableFuture; +import org.redkale.convert.Convert; + +/** + * MQ消息发送器 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.8.0 + */ +public interface MessageProducerSender { + + public CompletableFuture send(String topic, Object value); + + default CompletableFuture send(String topic, Convert convert, Object value) { + return send(topic, convert.convertToBytes(value)); + } + + default CompletableFuture send(String topic, Convert convert, Type type, Object value) { + return send(topic, convert.convertToBytes(type, value)); + } +} diff --git a/src/main/java/org/redkale/source/CacheSource.java b/src/main/java/org/redkale/source/CacheSource.java index b073338e3..d5e29e5c9 100644 --- a/src/main/java/org/redkale/source/CacheSource.java +++ b/src/main/java/org/redkale/source/CacheSource.java @@ -9,6 +9,7 @@ import java.io.Serializable; import java.lang.reflect.Type; import java.util.*; import java.util.concurrent.CompletableFuture; +import org.redkale.annotation.Component; import org.redkale.convert.Convert; import org.redkale.util.*; @@ -26,6 +27,7 @@ import org.redkale.util.*; * * @author zhangjx */ +@Component public interface CacheSource extends Resourcable { public String getType(); diff --git a/src/main/java/org/redkale/source/DataJdbcSource.java b/src/main/java/org/redkale/source/DataJdbcSource.java index e77260c34..76cd8cac9 100644 --- a/src/main/java/org/redkale/source/DataJdbcSource.java +++ b/src/main/java/org/redkale/source/DataJdbcSource.java @@ -218,7 +218,6 @@ public class DataJdbcSource extends AbstractDataSqlSource { Connection conn = null; try { conn = writePool.pollConnection(); - conn.setReadOnly(false); conn.setAutoCommit(false); for (BatchAction action : dataBatch.actions) { if (action instanceof InsertBatchAction1) { @@ -314,7 +313,6 @@ public class DataJdbcSource extends AbstractDataSqlSource { Connection conn = null; try { conn = writePool.pollConnection(); - conn.setReadOnly(false); conn.setAutoCommit(false); int c = insertDB(false, conn, info, entitys); conn.commit(); @@ -596,7 +594,6 @@ public class DataJdbcSource extends AbstractDataSqlSource { Connection conn = null; try { conn = writePool.pollConnection(); - conn.setReadOnly(false); conn.setAutoCommit(false); int c = deleteDB(false, conn, info, tables, flipper, node, pkmap, sqls); conn.commit(); @@ -735,7 +732,6 @@ public class DataJdbcSource extends AbstractDataSqlSource { final long s = System.currentTimeMillis(); try { conn = writePool.pollConnection(); - conn.setReadOnly(false); conn.setAutoCommit(false); int c; if (sqls.length == 1) { @@ -841,7 +837,6 @@ public class DataJdbcSource extends AbstractDataSqlSource { final long s = System.currentTimeMillis(); try { conn = writePool.pollConnection(); - conn.setReadOnly(false); conn.setAutoCommit(false); int c; if (copyTableSql == null) { @@ -970,7 +965,6 @@ public class DataJdbcSource extends AbstractDataSqlSource { final long s = System.currentTimeMillis(); try { conn = writePool.pollConnection(); - conn.setReadOnly(false); conn.setAutoCommit(false); int c; if (sqls.length == 1) { @@ -1069,7 +1063,6 @@ public class DataJdbcSource extends AbstractDataSqlSource { Connection conn = null; try { conn = writePool.pollConnection(); - conn.setReadOnly(false); conn.setAutoCommit(false); int c = updateEntityDB(false, conn, info, entitys); conn.commit(); @@ -1477,7 +1470,6 @@ public class DataJdbcSource extends AbstractDataSqlSource { Statement stmt = null; try { conn = readPool.pollConnection(); - //conn.setReadOnly(true); stmt = conn.createStatement(); ResultSet set = stmt.executeQuery(sql); if (set.next()) { @@ -1580,7 +1572,6 @@ public class DataJdbcSource extends AbstractDataSqlSource { Statement stmt = null; try { conn = readPool.pollConnection(); - //conn.setReadOnly(true); stmt = conn.createStatement(); Number rs = defVal; ResultSet set = stmt.executeQuery(sql); @@ -1671,7 +1662,6 @@ public class DataJdbcSource extends AbstractDataSqlSource { Statement stmt = null; try { conn = readPool.pollConnection(); - //conn.setReadOnly(true); stmt = conn.createStatement(); ResultSet set = stmt.executeQuery(sql); ResultSetMetaData rsd = set.getMetaData(); @@ -1759,7 +1749,6 @@ public class DataJdbcSource extends AbstractDataSqlSource { Statement stmt = null; try { conn = readPool.pollConnection(); - //conn.setReadOnly(true); stmt = conn.createStatement(); ResultSet set = stmt.executeQuery(sql); ResultSetMetaData rsd = set.getMetaData(); @@ -1908,7 +1897,6 @@ public class DataJdbcSource extends AbstractDataSqlSource { PreparedStatement ps = null; try { conn = readPool.pollConnection(); - //conn.setReadOnly(true); ps = conn.prepareStatement(sql); ps.setFetchSize(1); final DataResultSet set = createDataResultSet(info, ps.executeQuery()); @@ -1989,7 +1977,6 @@ public class DataJdbcSource extends AbstractDataSqlSource { final Attribute attr = info.getAttribute(column); try { conn = readPool.pollConnection(); - //conn.setReadOnly(true); ps = conn.prepareStatement(sql); ps.setFetchSize(1); final DataResultSet set = createDataResultSet(info, ps.executeQuery()); @@ -2075,7 +2062,6 @@ public class DataJdbcSource extends AbstractDataSqlSource { PreparedStatement ps = null; try { conn = readPool.pollConnection(); - //conn.setReadOnly(true); ps = conn.prepareStatement(sql); final ResultSet set = ps.executeQuery(); boolean rs = set.next() ? (set.getInt(1) > 0) : false; @@ -2339,7 +2325,6 @@ public class DataJdbcSource extends AbstractDataSqlSource { slowLog(s, listSql); return new Sheet<>(total, list); } else { - //conn.setReadOnly(true); PreparedStatement ps = conn.prepareStatement(listSql); if (flipper != null && flipper.getLimit() > 0) { ps.setFetchSize(flipper.getLimit()); @@ -2502,7 +2487,6 @@ public class DataJdbcSource extends AbstractDataSqlSource { final long s = System.currentTimeMillis(); Connection conn = writePool.pollConnection(); try { - conn.setReadOnly(false); conn.setAutoCommit(false); final Statement stmt = conn.createStatement(); final int[] rs = new int[sqls.length]; @@ -2546,7 +2530,6 @@ public class DataJdbcSource extends AbstractDataSqlSource { if (logger.isLoggable(Level.FINEST)) { logger.finest("direct query sql=" + sql); } - //conn.setReadOnly(true); final Statement statement = conn.createStatement(); //final PreparedStatement statement = conn.prepareStatement(sql); final ResultSet set = statement.executeQuery(sql);// ps.executeQuery(); @@ -2730,7 +2713,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } else if (workExecutor instanceof ThreadHashExecutor) { defMaxConns = ((ThreadHashExecutor) workExecutor).getCorePoolSize(); } else if (workExecutor != null) { //maybe virtual thread pool - defMaxConns = Math.min(1000, Utility.cpus() * 100); + defMaxConns = Math.min(1024, Utility.cpus() * 100); } this.maxConns = Math.max(1, Integer.decode(prop.getProperty(DATA_SOURCE_MAXCONNS, "" + defMaxConns))); this.canNewSemaphore = new Semaphore(this.maxConns); diff --git a/src/main/java/org/redkale/source/DataSource.java b/src/main/java/org/redkale/source/DataSource.java index 0276f356b..298730c51 100644 --- a/src/main/java/org/redkale/source/DataSource.java +++ b/src/main/java/org/redkale/source/DataSource.java @@ -9,6 +9,7 @@ import java.io.Serializable; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.stream.Stream; +import org.redkale.annotation.Component; import org.redkale.util.*; /** @@ -39,6 +40,7 @@ import org.redkale.util.*; * * @author zhangjx */ +@Component @SuppressWarnings("unchecked") public interface DataSource extends Resourcable {