diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index 60f393ef7..ce8af7a16 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -362,9 +362,9 @@ public final class Application { if (classval == null || classval.isEmpty()) { Iterator it = ServiceLoader.load(MessageAgent.class, classLoader).iterator(); while (it.hasNext()) { - MessageAgent agent = it.next(); - if (agent.match(mqConf)) { - mqs[i] = agent; + MessageAgent messageAgent = it.next(); + if (messageAgent.match(mqConf)) { + mqs[i] = messageAgent; mqs[i].setConfig(mqConf); break; } diff --git a/src/org/redkale/boot/NodeHttpServer.java b/src/org/redkale/boot/NodeHttpServer.java index f30bfee4d..694f3d8e2 100644 --- a/src/org/redkale/boot/NodeHttpServer.java +++ b/src/org/redkale/boot/NodeHttpServer.java @@ -234,7 +234,7 @@ public class NodeHttpServer extends NodeServer { agent0 = application.getMessageAgent(mqname); if (agent0 == null) throw new RuntimeException("not found " + MessageAgent.class.getSimpleName() + " config for (name=" + mqname + ")"); } - final MessageAgent agent = agent0; + final MessageAgent messageAgent = agent0; final boolean autoload = restConf.getBoolValue("autoload", true); { //加载RestService String userTypeStr = restConf.getValue("usertype"); @@ -278,7 +278,7 @@ public class NodeHttpServer extends NodeServer { if (ws != null && !ws.repair()) prefix2 = ""; resourceFactory.inject(servlet, NodeHttpServer.this); dynServletMap.put(service, servlet); - if (agent != null) agent.putService(this, service, servlet); + if (messageAgent != null) messageAgent.putService(this, service, servlet); //if (finest) logger.finest(localThreadName + " Create RestServlet(resource.name='" + name + "') = " + servlet); if (ss != null) { String[] mappings = servlet.getClass().getAnnotation(WebServlet.class).value(); @@ -332,7 +332,7 @@ public class NodeHttpServer extends NodeServer { return; } restedObjects.add(stype); //避免重复创建Rest对象 - WebSocketServlet servlet = httpServer.addRestWebSocketServlet(serverClassLoader, stype, prefix, en.getProperty()); + WebSocketServlet servlet = httpServer.addRestWebSocketServlet(serverClassLoader, stype, prefix, en.getProperty(), messageAgent); if (servlet == null) return; //没有RestOnMessage方法的HttpServlet调用Rest.createRestWebSocketServlet就会返回null String prefix2 = prefix; WebServlet ws = servlet.getClass().getAnnotation(WebServlet.class); @@ -348,7 +348,7 @@ public class NodeHttpServer extends NodeServer { } } } - if (agent != null) this.messageAgents.put(agent.getName(), agent); + if (messageAgent != null) this.messageAgents.put(messageAgent.getName(), messageAgent); //输出信息 if (ss != null && !ss.isEmpty() && sb != null) { ss.sort((AbstractMap.SimpleEntry o1, AbstractMap.SimpleEntry o2) -> o1.getKey().compareTo(o2.getKey())); diff --git a/src/org/redkale/boot/NodeSncpServer.java b/src/org/redkale/boot/NodeSncpServer.java index 41bae9ce8..e45278e82 100644 --- a/src/org/redkale/boot/NodeSncpServer.java +++ b/src/org/redkale/boot/NodeSncpServer.java @@ -33,9 +33,9 @@ public class NodeSncpServer extends NodeServer { private NodeSncpServer(Application application, AnyValue serconf) { super(application, createServer(application, serconf)); this.sncpServer = (SncpServer) this.server; - this.consumer = sncpServer == null || application.singletonrun ? null : (agent, x) -> { + this.consumer = sncpServer == null || application.singletonrun ? null : (agent, x) -> {//singleton模式下不生成SncpServlet if (x.getClass().getAnnotation(Local.class) != null) return; - SncpDynServlet servlet = sncpServer.addSncpServlet(x); //singleton模式下不生成SncpServlet + SncpDynServlet servlet = sncpServer.addSncpServlet(x); dynServletMap.put(x, servlet); if (agent != null) agent.putService(this, x, servlet); }; @@ -54,8 +54,8 @@ public class NodeSncpServer extends NodeServer { return sncpServer == null ? null : sncpServer.getSocketAddress(); } - public void consumerAccept(MessageAgent agent, Service service) { - if (this.consumer != null) this.consumer.accept(agent, service); + public void consumerAccept(MessageAgent messageAgent, Service service) { + if (this.consumer != null) this.consumer.accept(messageAgent, service); } @Override diff --git a/src/org/redkale/mq/HttpRespProcessor.java b/src/org/redkale/mq/HttpRespProcessor.java index a6f5f97af..d7067d65c 100644 --- a/src/org/redkale/mq/HttpRespProcessor.java +++ b/src/org/redkale/mq/HttpRespProcessor.java @@ -22,13 +22,13 @@ public class HttpRespProcessor implements MessageProcessor { protected final Logger logger; - protected final MessageAgent agent; + protected final MessageAgent messageAgent; protected final ConcurrentHashMap respNodes = new ConcurrentHashMap<>(); - public HttpRespProcessor(Logger logger, MessageAgent agent) { + public HttpRespProcessor(Logger logger, MessageAgent messageAgent) { this.logger = logger; - this.agent = agent; + this.messageAgent = messageAgent; } @Override diff --git a/src/org/redkale/mq/MessageConsumer.java b/src/org/redkale/mq/MessageConsumer.java index 1e15b33a8..31cac56de 100644 --- a/src/org/redkale/mq/MessageConsumer.java +++ b/src/org/redkale/mq/MessageConsumer.java @@ -23,7 +23,7 @@ public abstract class MessageConsumer { protected final String topic; - protected MessageAgent agent; + protected MessageAgent messageAgent; protected final MessageProcessor processor; @@ -31,11 +31,11 @@ public abstract class MessageConsumer { protected volatile boolean closed; - protected MessageConsumer(MessageAgent agent, String topic, MessageProcessor processor) { - Objects.requireNonNull(agent); + protected MessageConsumer(MessageAgent messageAgent, String topic, MessageProcessor processor) { + Objects.requireNonNull(messageAgent); Objects.requireNonNull(topic); Objects.requireNonNull(processor); - this.agent = agent; + this.messageAgent = messageAgent; this.topic = topic; this.processor = processor; } diff --git a/src/org/redkale/mq/SncpRespProcessor.java b/src/org/redkale/mq/SncpRespProcessor.java index c4d14b142..271d563df 100644 --- a/src/org/redkale/mq/SncpRespProcessor.java +++ b/src/org/redkale/mq/SncpRespProcessor.java @@ -23,13 +23,13 @@ public class SncpRespProcessor implements MessageProcessor { protected final Logger logger; - protected final MessageAgent agent; + protected final MessageAgent messageAgent; protected final ConcurrentHashMap respNodes = new ConcurrentHashMap<>(); - public SncpRespProcessor(Logger logger, MessageAgent agent) { + public SncpRespProcessor(Logger logger, MessageAgent messageAgent) { this.logger = logger; - this.agent = agent; + this.messageAgent = messageAgent; } @Override diff --git a/src/org/redkale/net/http/HttpServer.java b/src/org/redkale/net/http/HttpServer.java index 90fa4efa4..ed0815f0c 100644 --- a/src/org/redkale/net/http/HttpServer.java +++ b/src/org/redkale/net/http/HttpServer.java @@ -16,6 +16,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.logging.Level; +import org.redkale.mq.MessageAgent; import org.redkale.net.*; import org.redkale.net.http.HttpContext.HttpContextConfig; import org.redkale.net.http.HttpResponse.HttpResponseConfig; @@ -217,11 +218,12 @@ public class HttpServer extends Server T addRestWebSocketServlet(final ClassLoader classLoader, final Class webSocketType, final String prefix, final AnyValue conf) { - T servlet = Rest.createRestWebSocketServlet(classLoader, webSocketType); + public T addRestWebSocketServlet(final ClassLoader classLoader, final Class webSocketType, final String prefix, final AnyValue conf, final MessageAgent messageAgent) { + T servlet = Rest.createRestWebSocketServlet(classLoader, webSocketType, messageAgent); if (servlet != null) this.prepare.addServlet(servlet, prefix, conf); return servlet; } diff --git a/src/org/redkale/net/http/Rest.java b/src/org/redkale/net/http/Rest.java index dcb063fb0..47aad7e30 100644 --- a/src/org/redkale/net/http/Rest.java +++ b/src/org/redkale/net/http/Rest.java @@ -22,6 +22,7 @@ import static org.redkale.asm.Opcodes.*; import org.redkale.asm.Type; import org.redkale.convert.*; import org.redkale.convert.json.*; +import org.redkale.mq.MessageAgent; import org.redkale.net.Cryptor; import org.redkale.net.sncp.Sncp; import org.redkale.service.*; @@ -218,7 +219,7 @@ public final class Rest { return t == null ? defValue : t; } - public static T createRestWebSocketServlet(final ClassLoader classLoader, final Class webSocketType) { + public static T createRestWebSocketServlet(final ClassLoader classLoader, final Class webSocketType, final MessageAgent messageAgent) { if (webSocketType == null) throw new RuntimeException("Rest WebSocket Class is null on createRestWebSocketServlet"); if (Modifier.isAbstract(webSocketType.getModifiers())) throw new RuntimeException("Rest WebSocket Class(" + webSocketType + ") cannot abstract on createRestWebSocketServlet"); if (Modifier.isFinal(webSocketType.getModifiers())) throw new RuntimeException("Rest WebSocket Class(" + webSocketType + ") cannot final on createRestWebSocketServlet"); @@ -719,6 +720,11 @@ public final class Rest { Class newClazz = newLoader.loadClass(newDynName.replace('/', '.'), cw.toByteArray()); try { T servlet = (T) newClazz.getDeclaredConstructor().newInstance(); + if (messageAgent != null) { + Field agentField = servlet.getClass().getDeclaredField("messageAgent"); + agentField.setAccessible(true); + agentField.set(servlet, messageAgent); + } newClazz.getField("_redkale_annotations").set(null, msgclassToAnnotations); if (rws.cryptor() != Cryptor.class) { Cryptor cryptor = rws.cryptor().getDeclaredConstructor().newInstance(); diff --git a/src/org/redkale/net/http/WebSocketServlet.java b/src/org/redkale/net/http/WebSocketServlet.java index 2d68d989f..caf51708b 100644 --- a/src/org/redkale/net/http/WebSocketServlet.java +++ b/src/org/redkale/net/http/WebSocketServlet.java @@ -18,6 +18,7 @@ import java.util.logging.*; import java.util.zip.*; import javax.annotation.*; import org.redkale.convert.Convert; +import org.redkale.mq.MessageAgent; import org.redkale.net.Cryptor; import org.redkale.service.*; import org.redkale.util.*; @@ -115,6 +116,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl @Resource(name = "$") protected WebSocketNode node; + protected MessageAgent messageAgent; + @Resource(name = "SERVER_RESFACTORY") protected ResourceFactory resourceFactory; diff --git a/src/org/redkale/net/sncp/Sncp.java b/src/org/redkale/net/sncp/Sncp.java index 5b81eab8b..902ef7a96 100644 --- a/src/org/redkale/net/sncp/Sncp.java +++ b/src/org/redkale/net/sncp/Sncp.java @@ -494,9 +494,9 @@ public abstract class Sncp { } } - public static T createSimpleRemoteService(final Class serviceImplClass, final MessageAgent agent, + public static T createSimpleRemoteService(final Class serviceImplClass, final MessageAgent messageAgent, final TransportFactory transportFactory, final InetSocketAddress clientSncpAddress, final String... groups) { - return createRemoteService(null, "", serviceImplClass, agent, transportFactory, clientSncpAddress, Utility.ofSet(groups), null); + return createRemoteService(null, "", serviceImplClass, messageAgent, transportFactory, clientSncpAddress, Utility.ofSet(groups), null); } /** @@ -533,7 +533,7 @@ public abstract class Sncp { * @param classLoader ClassLoader * @param name 资源名 * @param serviceTypeOrImplClass Service类 - * @param agent MQ管理器 + * @param messageAgent MQ管理器 * @param transportFactory TransportFactory * @param clientAddress 本地IP地址 * @param groups0 所有的组节点,包含自身 @@ -547,7 +547,7 @@ public abstract class Sncp { final ClassLoader classLoader, final String name, final Class serviceTypeOrImplClass, - final MessageAgent agent, + final MessageAgent messageAgent, final TransportFactory transportFactory, final InetSocketAddress clientAddress, final Set groups0, @@ -570,7 +570,7 @@ public abstract class Sncp { try { Class newClazz = loader.loadClass(newDynName.replace('/', '.')); T service = (T) newClazz.getDeclaredConstructor().newInstance(); - SncpClient client = new SncpClient(name, serviceTypeOrImplClass, service, agent, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress); + SncpClient client = new SncpClient(name, serviceTypeOrImplClass, service, messageAgent, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress); client.setRemoteGroups(groups); client.setRemoteGroupTransport(transportFactory.loadTransport(clientAddress, groups)); Field c = newClazz.getDeclaredField(FIELDPREFIX + "_client"); @@ -751,7 +751,7 @@ public abstract class Sncp { }.loadClass(newDynName.replace('/', '.'), bytes); try { T service = (T) newClazz.getDeclaredConstructor().newInstance(); - SncpClient client = new SncpClient(name, serviceTypeOrImplClass, service, agent, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress); + SncpClient client = new SncpClient(name, serviceTypeOrImplClass, service, messageAgent, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress); client.setRemoteGroups(groups); client.setRemoteGroupTransport(transportFactory.loadTransport(clientAddress, groups)); { diff --git a/src/org/redkale/net/sncp/SncpClient.java b/src/org/redkale/net/sncp/SncpClient.java index bd8c3752f..f54702283 100644 --- a/src/org/redkale/net/sncp/SncpClient.java +++ b/src/org/redkale/net/sncp/SncpClient.java @@ -58,7 +58,7 @@ public final class SncpClient { protected final ExecutorService executor; - protected final MessageAgent agent; + protected final MessageAgent messageAgent; protected final String topic; @@ -76,13 +76,13 @@ public final class SncpClient { //远程模式, 可能为null protected Transport remoteGroupTransport; - public SncpClient(final String serviceName, final Class serviceTypeOrImplClass, final T service, MessageAgent agent, final TransportFactory factory, + public SncpClient(final String serviceName, final Class serviceTypeOrImplClass, final T service, MessageAgent messageAgent, final TransportFactory factory, final boolean remote, final Class serviceClass, final InetSocketAddress clientSncpAddress) { this.remote = remote; this.executor = factory.getExecutor(); this.bufferSupplier = factory.getBufferSupplier(); - this.agent = agent; - this.topic = agent == null ? null : agent.generateSncpReqTopic(service); + this.messageAgent = messageAgent; + this.topic = messageAgent == null ? null : messageAgent.generateSncpReqTopic(service); Class tn = serviceTypeOrImplClass; Version ver = tn.getAnnotation(Version.class); this.serviceClass = serviceClass; @@ -113,7 +113,7 @@ public final class SncpClient { } public MessageAgent getMessageAgent() { - return agent; + return messageAgent; } public InetSocketAddress getClientAddress() { @@ -265,7 +265,7 @@ public final class SncpClient { final Class[] myparamclass = action.paramClass; if (action.addressSourceParamIndex >= 0) params[action.addressSourceParamIndex] = this.clientSncpAddress; if (bsonConvert == null) bsonConvert = BsonConvert.root(); - final BsonWriter writer = agent == null ? bsonConvert.pollBsonWriter() : bsonConvert.pollBsonWriter(transport.getBufferSupplier()); // 将head写入 + final BsonWriter writer = messageAgent == null ? bsonConvert.pollBsonWriter() : bsonConvert.pollBsonWriter(transport.getBufferSupplier()); // 将head写入 writer.writeTo(DEFAULT_HEADER); for (int i = 0; i < params.length; i++) { //params 可能包含: 3 个 boolean BsonConvert bcc = bsonConvert; @@ -278,11 +278,11 @@ public final class SncpClient { final int reqBodyLength = writer.count() - HEADER_SIZE; //body总长度 final long seqid = System.nanoTime(); final DLong actionid = action.actionid; - if (agent != null) { //MQ模式 + if (messageAgent != null) { //MQ模式 final byte[] reqbytes = writer.toArray(); fillHeader(ByteBuffer.wrap(reqbytes), seqid, actionid, reqBodyLength); MessageRecord message = new MessageRecord(ConvertType.BSON, this.topic, null, reqbytes); - return agent.sendRemoteSncp(null, message).thenApply(msg -> { + return messageAgent.sendRemoteSncp(null, message).thenApply(msg -> { ByteBuffer buffer = ByteBuffer.wrap(msg.getContent()); checkResult(seqid, action, buffer);