增加MessageConsumer、MessageProducer功能

This commit is contained in:
redkale
2023-04-20 14:30:39 +08:00
parent 5e9c85e0fc
commit 698269f8b0
10 changed files with 202 additions and 28 deletions

View File

@@ -0,0 +1,24 @@
package org.redkale.annotation;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.*;
/**
* 标记Component的Service类特点: <br>
* 1、直接构造, 不使用Sncp动态构建对象 <br>
* 2、不会生成对应协议的Servlet <br>
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
* @since 2.8.0
*/
@Inherited
@Documented
@Target({TYPE})
@Retention(RUNTIME)
public @interface Component {
}

View File

@@ -21,7 +21,7 @@ import org.redkale.annotation.*;
import static org.redkale.boot.Application.*;
import org.redkale.boot.ClassFilter.FilterEntry;
import org.redkale.cluster.ClusterAgent;
import org.redkale.mq.MessageAgent;
import org.redkale.mq.*;
import org.redkale.net.Filter;
import org.redkale.net.*;
import org.redkale.net.client.ClientAddress;
@@ -297,7 +297,9 @@ public abstract class NodeServer {
}
//ResourceFactory resfactory = (isSNCP() ? appResFactory : resourceFactory);
Service service = Modifier.isFinal(resServiceType.getModifiers()) ? (Service) resServiceType.getConstructor().newInstance() : Sncp.createLocalService(serverClassLoader, resourceName, resServiceType, appResFactory, application.getSncpRpcGroups(), sncpClient, null, null, null);
Service service = Modifier.isFinal(resServiceType.getModifiers()) || resServiceType.getAnnotation(Component.class) != null
? (Service) resServiceType.getConstructor().newInstance()
: Sncp.createLocalService(serverClassLoader, resourceName, resServiceType, appResFactory, application.getSncpRpcGroups(), sncpClient, null, null, null);
appResFactory.register(resourceName, resServiceType, service);
field.set(srcObj, service);
@@ -459,9 +461,11 @@ public abstract class NodeServer {
boolean isLocalGroup0 = rpcGroups.isLocalGroup(this.sncpGroup, this.sncpAddress, entry);
final String group = isLocalGroup0 ? null : entry.getGroup();
final boolean localMode = serviceImplClass.getAnnotation(Local.class) != null || isLocalGroup0;//本地模式
if (localMode && (serviceImplClass.isInterface() || Modifier.isAbstract(serviceImplClass.getModifiers()))) {
continue; //本地模式不能实例化接口和抽象类的Service类
if ((localMode || serviceImplClass.getAnnotation(Component.class) != null)
&& (serviceImplClass.isInterface() || Modifier.isAbstract(serviceImplClass.getModifiers()))) {
continue; //本地模式或Component不能实例化接口和抽象类的Service类
}
final ResourceTypeLoader resourceLoader = (ResourceFactory rf, String srcResourceName, final Object srcObj, final String resourceName, Field field, final Object attachment) -> {
try {
if (Sncp.loadMethodActions(Sncp.getResourceType(serviceImplClass)).isEmpty()
@@ -475,7 +479,14 @@ public abstract class NodeServer {
MessageAgent agent = getMessageAgent(entry.getProperty());
Service service;
final boolean ws = srcObj instanceof WebSocketServlet;
if (ws || localMode) { //本地模式
final boolean component = serviceImplClass.getAnnotation(Component.class) != null;
if (component) { //Component
RedkaleClassLoader.putReflectionPublicConstructors(serviceImplClass, serviceImplClass.getName());
if (!acceptsComponent(serviceImplClass)) {
return null;
}
service = serviceImplClass.getDeclaredConstructor().newInstance();
} else if (ws || localMode) { //本地模式
service = Sncp.createLocalService(serverClassLoader, resourceName, serviceImplClass, appResourceFactory, rpcGroups, this.sncpClient, agent, group, entry.getProperty());
} else {
service = Sncp.createRemoteService(serverClassLoader, resourceName, serviceImplClass, appResourceFactory, rpcGroups, this.sncpClient, agent, group, entry.getProperty());
@@ -496,7 +507,9 @@ public abstract class NodeServer {
rf.inject(resourceName, service); //动态加载的Service也存在按需加载的注入资源
}
localServices.add(service);
interceptorServices.add(service);
if (!component) {
interceptorServices.add(service);
}
if (consumer != null) {
consumer.accept(agent, service);
}
@@ -585,6 +598,17 @@ public abstract class NodeServer {
slist.add(new StringBuilder().append(serstr).append(" load and init in ").append(e < 10 ? " " : (e < 100 ? " " : "")).append(e).append(" ms").append(LINE_SEPARATOR).toString());
}
});
localServices.stream().forEach(y -> {
if (y.getClass().getAnnotation(Component.class) != null) {
long s = System.currentTimeMillis();
interceptComponent(y);
long e = System.currentTimeMillis() - s;
String serstr = Sncp.toSimpleString(y, maxNameLength, maxTypeLength);
if (slist != null) {
slist.add(new StringBuilder().append(serstr).append(" component-start in ").append(e < 10 ? " " : (e < 100 ? " " : "")).append(e).append(" ms").append(LINE_SEPARATOR).toString());
}
}
});
}
if (slist != null && sb != null) {
List<String> wlist = new ArrayList<>(slist); //直接使用CopyOnWriteArrayList偶尔会出现莫名的异常(CopyOnWriteArrayList源码1185行)
@@ -606,6 +630,28 @@ public abstract class NodeServer {
maxTypeLength = Math.max(maxTypeLength, Sncp.getResourceType(y).getName().length() + 1);
}
protected boolean acceptsComponent(Class<? extends Service> serviceImplClass) {
if (MessageConsumerListener.class.isAssignableFrom(serviceImplClass)) {
MessageConsumer mqConsumer = serviceImplClass.getAnnotation(MessageConsumer.class);
if (mqConsumer == null) {
return false;
}
MessageAgent mqAgent = application.getMessageAgent(mqConsumer.mq());
if (mqAgent == null) {
return false;
}
}
return true;
}
protected void interceptComponent(Service service) {
if (service instanceof MessageConsumerListener) {
MessageConsumer mqConsumer = service.getClass().getAnnotation(MessageConsumer.class);
MessageAgent mqAgent = application.getMessageAgent(mqConsumer.mq());
mqAgent.addConsumerListener((MessageConsumerListener) service);
}
}
protected MessageAgent getMessageAgent(AnyValue serviceConf) {
MessageAgent agent = null;
if (serviceConf != null && serviceConf.getValue("mq") != null) {

View File

@@ -55,6 +55,8 @@ public abstract class MessageAgent implements Resourcable {
protected final ReentrantLock sncpNodesLock = new ReentrantLock();
protected final List<MessageConsumerListener> consumerListeners = new CopyOnWriteArrayList<>();
protected final AtomicLong msgSeqno = new AtomicLong(System.nanoTime());
protected HttpMessageClient httpMessageClient;
@@ -103,9 +105,6 @@ public abstract class MessageAgent implements Resourcable {
this.timeoutExecutor.setRemoveOnCancelPolicy(true);
}
@ResourceListener
public abstract void onResourceChange(ResourceEvent[] events);
public CompletableFuture<Map<String, Long>> start() {
final LinkedHashMap<String, Long> map = new LinkedHashMap<>();
final List<CompletableFuture> futures = new ArrayList<>();
@@ -276,7 +275,7 @@ public abstract class MessageAgent implements Resourcable {
}
//创建指定topic的生产处理器
protected abstract MessageClientProducer createMessageClientProducer(String name);
protected abstract MessageClientProducer createMessageClientProducer(String producerName);
//创建topic如果已存在则跳过
public abstract boolean createTopic(String... topics);
@@ -293,6 +292,13 @@ public abstract class MessageAgent implements Resourcable {
//创建指定topic的消费处理器
public abstract MessageClientConsumer createMessageClientConsumer(String[] topics, String group, MessageClientProcessor processor);
@ResourceListener
public abstract void onResourceChange(ResourceEvent[] events);
public void addConsumerListener(MessageConsumerListener listener) {
}
public final void putService(NodeHttpServer ns, Service service, HttpServlet servlet) {
AutoLoad al = service.getClass().getAnnotation(AutoLoad.class);
if (al != null && !al.value() && service.getClass().getAnnotation(Local.class) != null) {

View File

@@ -0,0 +1,30 @@
/*
*
*/
package org.redkale.mq;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.*;
/**
* MQ资源注解
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.8.0
*/
@Documented
@Target({TYPE})
@Retention(RUNTIME)
public @interface MessageConsumer {
String mq();
String group() default "";
String[] topics();
}

View File

@@ -0,0 +1,24 @@
/*
*
*/
package org.redkale.mq;
import org.redkale.annotation.Component;
import org.redkale.service.*;
/**
* MQ资源注解
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.8.0
*/
@Local
@Component
public interface MessageConsumerListener<T> extends Service {
public void onMessage(String topic, T message);
}

View File

@@ -0,0 +1,26 @@
/*
*
*/
package org.redkale.mq;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.*;
/**
* MQ资源注解, 只能标记在MessageProducerSender类型字段上
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.8.0
*/
@Documented
@Target({FIELD})
@Retention(RUNTIME)
public @interface MessageProducer {
String mq();
}

View File

@@ -0,0 +1,31 @@
/*
*
*/
package org.redkale.mq;
import java.lang.reflect.Type;
import java.util.concurrent.CompletableFuture;
import org.redkale.convert.Convert;
/**
* MQ消息发送器
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.8.0
*/
public interface MessageProducerSender {
public CompletableFuture<Void> send(String topic, Object value);
default CompletableFuture<Void> send(String topic, Convert convert, Object value) {
return send(topic, convert.convertToBytes(value));
}
default CompletableFuture<Void> send(String topic, Convert convert, Type type, Object value) {
return send(topic, convert.convertToBytes(type, value));
}
}

View File

@@ -9,6 +9,7 @@ import java.io.Serializable;
import java.lang.reflect.Type;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import org.redkale.annotation.Component;
import org.redkale.convert.Convert;
import org.redkale.util.*;
@@ -26,6 +27,7 @@ import org.redkale.util.*;
*
* @author zhangjx
*/
@Component
public interface CacheSource extends Resourcable {
public String getType();

View File

@@ -218,7 +218,6 @@ public class DataJdbcSource extends AbstractDataSqlSource {
Connection conn = null;
try {
conn = writePool.pollConnection();
conn.setReadOnly(false);
conn.setAutoCommit(false);
for (BatchAction action : dataBatch.actions) {
if (action instanceof InsertBatchAction1) {
@@ -314,7 +313,6 @@ public class DataJdbcSource extends AbstractDataSqlSource {
Connection conn = null;
try {
conn = writePool.pollConnection();
conn.setReadOnly(false);
conn.setAutoCommit(false);
int c = insertDB(false, conn, info, entitys);
conn.commit();
@@ -596,7 +594,6 @@ public class DataJdbcSource extends AbstractDataSqlSource {
Connection conn = null;
try {
conn = writePool.pollConnection();
conn.setReadOnly(false);
conn.setAutoCommit(false);
int c = deleteDB(false, conn, info, tables, flipper, node, pkmap, sqls);
conn.commit();
@@ -735,7 +732,6 @@ public class DataJdbcSource extends AbstractDataSqlSource {
final long s = System.currentTimeMillis();
try {
conn = writePool.pollConnection();
conn.setReadOnly(false);
conn.setAutoCommit(false);
int c;
if (sqls.length == 1) {
@@ -841,7 +837,6 @@ public class DataJdbcSource extends AbstractDataSqlSource {
final long s = System.currentTimeMillis();
try {
conn = writePool.pollConnection();
conn.setReadOnly(false);
conn.setAutoCommit(false);
int c;
if (copyTableSql == null) {
@@ -970,7 +965,6 @@ public class DataJdbcSource extends AbstractDataSqlSource {
final long s = System.currentTimeMillis();
try {
conn = writePool.pollConnection();
conn.setReadOnly(false);
conn.setAutoCommit(false);
int c;
if (sqls.length == 1) {
@@ -1069,7 +1063,6 @@ public class DataJdbcSource extends AbstractDataSqlSource {
Connection conn = null;
try {
conn = writePool.pollConnection();
conn.setReadOnly(false);
conn.setAutoCommit(false);
int c = updateEntityDB(false, conn, info, entitys);
conn.commit();
@@ -1477,7 +1470,6 @@ public class DataJdbcSource extends AbstractDataSqlSource {
Statement stmt = null;
try {
conn = readPool.pollConnection();
//conn.setReadOnly(true);
stmt = conn.createStatement();
ResultSet set = stmt.executeQuery(sql);
if (set.next()) {
@@ -1580,7 +1572,6 @@ public class DataJdbcSource extends AbstractDataSqlSource {
Statement stmt = null;
try {
conn = readPool.pollConnection();
//conn.setReadOnly(true);
stmt = conn.createStatement();
Number rs = defVal;
ResultSet set = stmt.executeQuery(sql);
@@ -1671,7 +1662,6 @@ public class DataJdbcSource extends AbstractDataSqlSource {
Statement stmt = null;
try {
conn = readPool.pollConnection();
//conn.setReadOnly(true);
stmt = conn.createStatement();
ResultSet set = stmt.executeQuery(sql);
ResultSetMetaData rsd = set.getMetaData();
@@ -1759,7 +1749,6 @@ public class DataJdbcSource extends AbstractDataSqlSource {
Statement stmt = null;
try {
conn = readPool.pollConnection();
//conn.setReadOnly(true);
stmt = conn.createStatement();
ResultSet set = stmt.executeQuery(sql);
ResultSetMetaData rsd = set.getMetaData();
@@ -1908,7 +1897,6 @@ public class DataJdbcSource extends AbstractDataSqlSource {
PreparedStatement ps = null;
try {
conn = readPool.pollConnection();
//conn.setReadOnly(true);
ps = conn.prepareStatement(sql);
ps.setFetchSize(1);
final DataResultSet set = createDataResultSet(info, ps.executeQuery());
@@ -1989,7 +1977,6 @@ public class DataJdbcSource extends AbstractDataSqlSource {
final Attribute<T, Serializable> attr = info.getAttribute(column);
try {
conn = readPool.pollConnection();
//conn.setReadOnly(true);
ps = conn.prepareStatement(sql);
ps.setFetchSize(1);
final DataResultSet set = createDataResultSet(info, ps.executeQuery());
@@ -2075,7 +2062,6 @@ public class DataJdbcSource extends AbstractDataSqlSource {
PreparedStatement ps = null;
try {
conn = readPool.pollConnection();
//conn.setReadOnly(true);
ps = conn.prepareStatement(sql);
final ResultSet set = ps.executeQuery();
boolean rs = set.next() ? (set.getInt(1) > 0) : false;
@@ -2339,7 +2325,6 @@ public class DataJdbcSource extends AbstractDataSqlSource {
slowLog(s, listSql);
return new Sheet<>(total, list);
} else {
//conn.setReadOnly(true);
PreparedStatement ps = conn.prepareStatement(listSql);
if (flipper != null && flipper.getLimit() > 0) {
ps.setFetchSize(flipper.getLimit());
@@ -2502,7 +2487,6 @@ public class DataJdbcSource extends AbstractDataSqlSource {
final long s = System.currentTimeMillis();
Connection conn = writePool.pollConnection();
try {
conn.setReadOnly(false);
conn.setAutoCommit(false);
final Statement stmt = conn.createStatement();
final int[] rs = new int[sqls.length];
@@ -2546,7 +2530,6 @@ public class DataJdbcSource extends AbstractDataSqlSource {
if (logger.isLoggable(Level.FINEST)) {
logger.finest("direct query sql=" + sql);
}
//conn.setReadOnly(true);
final Statement statement = conn.createStatement();
//final PreparedStatement statement = conn.prepareStatement(sql);
final ResultSet set = statement.executeQuery(sql);// ps.executeQuery();
@@ -2730,7 +2713,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
} else if (workExecutor instanceof ThreadHashExecutor) {
defMaxConns = ((ThreadHashExecutor) workExecutor).getCorePoolSize();
} else if (workExecutor != null) { //maybe virtual thread pool
defMaxConns = Math.min(1000, Utility.cpus() * 100);
defMaxConns = Math.min(1024, Utility.cpus() * 100);
}
this.maxConns = Math.max(1, Integer.decode(prop.getProperty(DATA_SOURCE_MAXCONNS, "" + defMaxConns)));
this.canNewSemaphore = new Semaphore(this.maxConns);

View File

@@ -9,6 +9,7 @@ import java.io.Serializable;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import org.redkale.annotation.Component;
import org.redkale.util.*;
/**
@@ -39,6 +40,7 @@ import org.redkale.util.*;
*
* @author zhangjx
*/
@Component
@SuppressWarnings("unchecked")
public interface DataSource extends Resourcable {