This commit is contained in:
Redkale
2020-06-06 22:04:18 +08:00
parent 64500b113a
commit 8d5e61a9a2
11 changed files with 49 additions and 38 deletions

View File

@@ -362,9 +362,9 @@ public final class Application {
if (classval == null || classval.isEmpty()) { if (classval == null || classval.isEmpty()) {
Iterator<MessageAgent> it = ServiceLoader.load(MessageAgent.class, classLoader).iterator(); Iterator<MessageAgent> it = ServiceLoader.load(MessageAgent.class, classLoader).iterator();
while (it.hasNext()) { while (it.hasNext()) {
MessageAgent agent = it.next(); MessageAgent messageAgent = it.next();
if (agent.match(mqConf)) { if (messageAgent.match(mqConf)) {
mqs[i] = agent; mqs[i] = messageAgent;
mqs[i].setConfig(mqConf); mqs[i].setConfig(mqConf);
break; break;
} }

View File

@@ -234,7 +234,7 @@ public class NodeHttpServer extends NodeServer {
agent0 = application.getMessageAgent(mqname); agent0 = application.getMessageAgent(mqname);
if (agent0 == null) throw new RuntimeException("not found " + MessageAgent.class.getSimpleName() + " config for (name=" + mqname + ")"); if (agent0 == null) throw new RuntimeException("not found " + MessageAgent.class.getSimpleName() + " config for (name=" + mqname + ")");
} }
final MessageAgent agent = agent0; final MessageAgent messageAgent = agent0;
final boolean autoload = restConf.getBoolValue("autoload", true); final boolean autoload = restConf.getBoolValue("autoload", true);
{ //加载RestService { //加载RestService
String userTypeStr = restConf.getValue("usertype"); String userTypeStr = restConf.getValue("usertype");
@@ -278,7 +278,7 @@ public class NodeHttpServer extends NodeServer {
if (ws != null && !ws.repair()) prefix2 = ""; if (ws != null && !ws.repair()) prefix2 = "";
resourceFactory.inject(servlet, NodeHttpServer.this); resourceFactory.inject(servlet, NodeHttpServer.this);
dynServletMap.put(service, servlet); dynServletMap.put(service, servlet);
if (agent != null) agent.putService(this, service, servlet); if (messageAgent != null) messageAgent.putService(this, service, servlet);
//if (finest) logger.finest(localThreadName + " Create RestServlet(resource.name='" + name + "') = " + servlet); //if (finest) logger.finest(localThreadName + " Create RestServlet(resource.name='" + name + "') = " + servlet);
if (ss != null) { if (ss != null) {
String[] mappings = servlet.getClass().getAnnotation(WebServlet.class).value(); String[] mappings = servlet.getClass().getAnnotation(WebServlet.class).value();
@@ -332,7 +332,7 @@ public class NodeHttpServer extends NodeServer {
return; return;
} }
restedObjects.add(stype); //避免重复创建Rest对象 restedObjects.add(stype); //避免重复创建Rest对象
WebSocketServlet servlet = httpServer.addRestWebSocketServlet(serverClassLoader, stype, prefix, en.getProperty()); WebSocketServlet servlet = httpServer.addRestWebSocketServlet(serverClassLoader, stype, prefix, en.getProperty(), messageAgent);
if (servlet == null) return; //没有RestOnMessage方法的HttpServlet调用Rest.createRestWebSocketServlet就会返回null if (servlet == null) return; //没有RestOnMessage方法的HttpServlet调用Rest.createRestWebSocketServlet就会返回null
String prefix2 = prefix; String prefix2 = prefix;
WebServlet ws = servlet.getClass().getAnnotation(WebServlet.class); WebServlet ws = servlet.getClass().getAnnotation(WebServlet.class);
@@ -348,7 +348,7 @@ public class NodeHttpServer extends NodeServer {
} }
} }
} }
if (agent != null) this.messageAgents.put(agent.getName(), agent); if (messageAgent != null) this.messageAgents.put(messageAgent.getName(), messageAgent);
//输出信息 //输出信息
if (ss != null && !ss.isEmpty() && sb != null) { if (ss != null && !ss.isEmpty() && sb != null) {
ss.sort((AbstractMap.SimpleEntry<String, String[]> o1, AbstractMap.SimpleEntry<String, String[]> o2) -> o1.getKey().compareTo(o2.getKey())); ss.sort((AbstractMap.SimpleEntry<String, String[]> o1, AbstractMap.SimpleEntry<String, String[]> o2) -> o1.getKey().compareTo(o2.getKey()));

View File

@@ -33,9 +33,9 @@ public class NodeSncpServer extends NodeServer {
private NodeSncpServer(Application application, AnyValue serconf) { private NodeSncpServer(Application application, AnyValue serconf) {
super(application, createServer(application, serconf)); super(application, createServer(application, serconf));
this.sncpServer = (SncpServer) this.server; this.sncpServer = (SncpServer) this.server;
this.consumer = sncpServer == null || application.singletonrun ? null : (agent, x) -> { this.consumer = sncpServer == null || application.singletonrun ? null : (agent, x) -> {//singleton模式下不生成SncpServlet
if (x.getClass().getAnnotation(Local.class) != null) return; if (x.getClass().getAnnotation(Local.class) != null) return;
SncpDynServlet servlet = sncpServer.addSncpServlet(x); //singleton模式下不生成SncpServlet SncpDynServlet servlet = sncpServer.addSncpServlet(x);
dynServletMap.put(x, servlet); dynServletMap.put(x, servlet);
if (agent != null) agent.putService(this, x, servlet); if (agent != null) agent.putService(this, x, servlet);
}; };
@@ -54,8 +54,8 @@ public class NodeSncpServer extends NodeServer {
return sncpServer == null ? null : sncpServer.getSocketAddress(); return sncpServer == null ? null : sncpServer.getSocketAddress();
} }
public void consumerAccept(MessageAgent agent, Service service) { public void consumerAccept(MessageAgent messageAgent, Service service) {
if (this.consumer != null) this.consumer.accept(agent, service); if (this.consumer != null) this.consumer.accept(messageAgent, service);
} }
@Override @Override

View File

@@ -22,13 +22,13 @@ public class HttpRespProcessor implements MessageProcessor {
protected final Logger logger; protected final Logger logger;
protected final MessageAgent agent; protected final MessageAgent messageAgent;
protected final ConcurrentHashMap<Long, RespFutureNode> respNodes = new ConcurrentHashMap<>(); protected final ConcurrentHashMap<Long, RespFutureNode> respNodes = new ConcurrentHashMap<>();
public HttpRespProcessor(Logger logger, MessageAgent agent) { public HttpRespProcessor(Logger logger, MessageAgent messageAgent) {
this.logger = logger; this.logger = logger;
this.agent = agent; this.messageAgent = messageAgent;
} }
@Override @Override

View File

@@ -23,7 +23,7 @@ public abstract class MessageConsumer {
protected final String topic; protected final String topic;
protected MessageAgent agent; protected MessageAgent messageAgent;
protected final MessageProcessor processor; protected final MessageProcessor processor;
@@ -31,11 +31,11 @@ public abstract class MessageConsumer {
protected volatile boolean closed; protected volatile boolean closed;
protected MessageConsumer(MessageAgent agent, String topic, MessageProcessor processor) { protected MessageConsumer(MessageAgent messageAgent, String topic, MessageProcessor processor) {
Objects.requireNonNull(agent); Objects.requireNonNull(messageAgent);
Objects.requireNonNull(topic); Objects.requireNonNull(topic);
Objects.requireNonNull(processor); Objects.requireNonNull(processor);
this.agent = agent; this.messageAgent = messageAgent;
this.topic = topic; this.topic = topic;
this.processor = processor; this.processor = processor;
} }

View File

@@ -23,13 +23,13 @@ public class SncpRespProcessor implements MessageProcessor {
protected final Logger logger; protected final Logger logger;
protected final MessageAgent agent; protected final MessageAgent messageAgent;
protected final ConcurrentHashMap<Long, MessageRespFutureNode> respNodes = new ConcurrentHashMap<>(); protected final ConcurrentHashMap<Long, MessageRespFutureNode> respNodes = new ConcurrentHashMap<>();
public SncpRespProcessor(Logger logger, MessageAgent agent) { public SncpRespProcessor(Logger logger, MessageAgent messageAgent) {
this.logger = logger; this.logger = logger;
this.agent = agent; this.messageAgent = messageAgent;
} }
@Override @Override

View File

@@ -16,6 +16,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.logging.Level; import java.util.logging.Level;
import org.redkale.mq.MessageAgent;
import org.redkale.net.*; import org.redkale.net.*;
import org.redkale.net.http.HttpContext.HttpContextConfig; import org.redkale.net.http.HttpContext.HttpContextConfig;
import org.redkale.net.http.HttpResponse.HttpResponseConfig; import org.redkale.net.http.HttpResponse.HttpResponseConfig;
@@ -217,11 +218,12 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
* @param webSocketType WebSocket的类型 * @param webSocketType WebSocket的类型
* @param prefix url前缀 * @param prefix url前缀
* @param conf 配置信息 * @param conf 配置信息
* @param messageAgent MessageAgent
* *
* @return RestServlet * @return RestServlet
*/ */
public <S extends WebSocket, T extends WebSocketServlet> T addRestWebSocketServlet(final ClassLoader classLoader, final Class<S> webSocketType, final String prefix, final AnyValue conf) { public <S extends WebSocket, T extends WebSocketServlet> T addRestWebSocketServlet(final ClassLoader classLoader, final Class<S> webSocketType, final String prefix, final AnyValue conf, final MessageAgent messageAgent) {
T servlet = Rest.createRestWebSocketServlet(classLoader, webSocketType); T servlet = Rest.createRestWebSocketServlet(classLoader, webSocketType, messageAgent);
if (servlet != null) this.prepare.addServlet(servlet, prefix, conf); if (servlet != null) this.prepare.addServlet(servlet, prefix, conf);
return servlet; return servlet;
} }

View File

@@ -22,6 +22,7 @@ import static org.redkale.asm.Opcodes.*;
import org.redkale.asm.Type; import org.redkale.asm.Type;
import org.redkale.convert.*; import org.redkale.convert.*;
import org.redkale.convert.json.*; import org.redkale.convert.json.*;
import org.redkale.mq.MessageAgent;
import org.redkale.net.Cryptor; import org.redkale.net.Cryptor;
import org.redkale.net.sncp.Sncp; import org.redkale.net.sncp.Sncp;
import org.redkale.service.*; import org.redkale.service.*;
@@ -218,7 +219,7 @@ public final class Rest {
return t == null ? defValue : t; return t == null ? defValue : t;
} }
public static <T extends WebSocketServlet> T createRestWebSocketServlet(final ClassLoader classLoader, final Class<? extends WebSocket> webSocketType) { public static <T extends WebSocketServlet> T createRestWebSocketServlet(final ClassLoader classLoader, final Class<? extends WebSocket> webSocketType, final MessageAgent messageAgent) {
if (webSocketType == null) throw new RuntimeException("Rest WebSocket Class is null on createRestWebSocketServlet"); if (webSocketType == null) throw new RuntimeException("Rest WebSocket Class is null on createRestWebSocketServlet");
if (Modifier.isAbstract(webSocketType.getModifiers())) throw new RuntimeException("Rest WebSocket Class(" + webSocketType + ") cannot abstract on createRestWebSocketServlet"); if (Modifier.isAbstract(webSocketType.getModifiers())) throw new RuntimeException("Rest WebSocket Class(" + webSocketType + ") cannot abstract on createRestWebSocketServlet");
if (Modifier.isFinal(webSocketType.getModifiers())) throw new RuntimeException("Rest WebSocket Class(" + webSocketType + ") cannot final on createRestWebSocketServlet"); if (Modifier.isFinal(webSocketType.getModifiers())) throw new RuntimeException("Rest WebSocket Class(" + webSocketType + ") cannot final on createRestWebSocketServlet");
@@ -719,6 +720,11 @@ public final class Rest {
Class<?> newClazz = newLoader.loadClass(newDynName.replace('/', '.'), cw.toByteArray()); Class<?> newClazz = newLoader.loadClass(newDynName.replace('/', '.'), cw.toByteArray());
try { try {
T servlet = (T) newClazz.getDeclaredConstructor().newInstance(); T servlet = (T) newClazz.getDeclaredConstructor().newInstance();
if (messageAgent != null) {
Field agentField = servlet.getClass().getDeclaredField("messageAgent");
agentField.setAccessible(true);
agentField.set(servlet, messageAgent);
}
newClazz.getField("_redkale_annotations").set(null, msgclassToAnnotations); newClazz.getField("_redkale_annotations").set(null, msgclassToAnnotations);
if (rws.cryptor() != Cryptor.class) { if (rws.cryptor() != Cryptor.class) {
Cryptor cryptor = rws.cryptor().getDeclaredConstructor().newInstance(); Cryptor cryptor = rws.cryptor().getDeclaredConstructor().newInstance();

View File

@@ -18,6 +18,7 @@ import java.util.logging.*;
import java.util.zip.*; import java.util.zip.*;
import javax.annotation.*; import javax.annotation.*;
import org.redkale.convert.Convert; import org.redkale.convert.Convert;
import org.redkale.mq.MessageAgent;
import org.redkale.net.Cryptor; import org.redkale.net.Cryptor;
import org.redkale.service.*; import org.redkale.service.*;
import org.redkale.util.*; import org.redkale.util.*;
@@ -115,6 +116,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
@Resource(name = "$") @Resource(name = "$")
protected WebSocketNode node; protected WebSocketNode node;
protected MessageAgent messageAgent;
@Resource(name = "SERVER_RESFACTORY") @Resource(name = "SERVER_RESFACTORY")
protected ResourceFactory resourceFactory; protected ResourceFactory resourceFactory;

View File

@@ -494,9 +494,9 @@ public abstract class Sncp {
} }
} }
public static <T extends Service> T createSimpleRemoteService(final Class<T> serviceImplClass, final MessageAgent agent, public static <T extends Service> T createSimpleRemoteService(final Class<T> serviceImplClass, final MessageAgent messageAgent,
final TransportFactory transportFactory, final InetSocketAddress clientSncpAddress, final String... groups) { final TransportFactory transportFactory, final InetSocketAddress clientSncpAddress, final String... groups) {
return createRemoteService(null, "", serviceImplClass, agent, transportFactory, clientSncpAddress, Utility.ofSet(groups), null); return createRemoteService(null, "", serviceImplClass, messageAgent, transportFactory, clientSncpAddress, Utility.ofSet(groups), null);
} }
/** /**
@@ -533,7 +533,7 @@ public abstract class Sncp {
* @param classLoader ClassLoader * @param classLoader ClassLoader
* @param name 资源名 * @param name 资源名
* @param serviceTypeOrImplClass Service类 * @param serviceTypeOrImplClass Service类
* @param agent MQ管理器 * @param messageAgent MQ管理器
* @param transportFactory TransportFactory * @param transportFactory TransportFactory
* @param clientAddress 本地IP地址 * @param clientAddress 本地IP地址
* @param groups0 所有的组节点,包含自身 * @param groups0 所有的组节点,包含自身
@@ -547,7 +547,7 @@ public abstract class Sncp {
final ClassLoader classLoader, final ClassLoader classLoader,
final String name, final String name,
final Class<T> serviceTypeOrImplClass, final Class<T> serviceTypeOrImplClass,
final MessageAgent agent, final MessageAgent messageAgent,
final TransportFactory transportFactory, final TransportFactory transportFactory,
final InetSocketAddress clientAddress, final InetSocketAddress clientAddress,
final Set<String> groups0, final Set<String> groups0,
@@ -570,7 +570,7 @@ public abstract class Sncp {
try { try {
Class newClazz = loader.loadClass(newDynName.replace('/', '.')); Class newClazz = loader.loadClass(newDynName.replace('/', '.'));
T service = (T) newClazz.getDeclaredConstructor().newInstance(); T service = (T) newClazz.getDeclaredConstructor().newInstance();
SncpClient client = new SncpClient(name, serviceTypeOrImplClass, service, agent, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress); SncpClient client = new SncpClient(name, serviceTypeOrImplClass, service, messageAgent, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress);
client.setRemoteGroups(groups); client.setRemoteGroups(groups);
client.setRemoteGroupTransport(transportFactory.loadTransport(clientAddress, groups)); client.setRemoteGroupTransport(transportFactory.loadTransport(clientAddress, groups));
Field c = newClazz.getDeclaredField(FIELDPREFIX + "_client"); Field c = newClazz.getDeclaredField(FIELDPREFIX + "_client");
@@ -751,7 +751,7 @@ public abstract class Sncp {
}.loadClass(newDynName.replace('/', '.'), bytes); }.loadClass(newDynName.replace('/', '.'), bytes);
try { try {
T service = (T) newClazz.getDeclaredConstructor().newInstance(); T service = (T) newClazz.getDeclaredConstructor().newInstance();
SncpClient client = new SncpClient(name, serviceTypeOrImplClass, service, agent, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress); SncpClient client = new SncpClient(name, serviceTypeOrImplClass, service, messageAgent, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress);
client.setRemoteGroups(groups); client.setRemoteGroups(groups);
client.setRemoteGroupTransport(transportFactory.loadTransport(clientAddress, groups)); client.setRemoteGroupTransport(transportFactory.loadTransport(clientAddress, groups));
{ {

View File

@@ -58,7 +58,7 @@ public final class SncpClient {
protected final ExecutorService executor; protected final ExecutorService executor;
protected final MessageAgent agent; protected final MessageAgent messageAgent;
protected final String topic; protected final String topic;
@@ -76,13 +76,13 @@ public final class SncpClient {
//远程模式, 可能为null //远程模式, 可能为null
protected Transport remoteGroupTransport; protected Transport remoteGroupTransport;
public <T extends Service> SncpClient(final String serviceName, final Class<T> serviceTypeOrImplClass, final T service, MessageAgent agent, final TransportFactory factory, public <T extends Service> SncpClient(final String serviceName, final Class<T> serviceTypeOrImplClass, final T service, MessageAgent messageAgent, final TransportFactory factory,
final boolean remote, final Class serviceClass, final InetSocketAddress clientSncpAddress) { final boolean remote, final Class serviceClass, final InetSocketAddress clientSncpAddress) {
this.remote = remote; this.remote = remote;
this.executor = factory.getExecutor(); this.executor = factory.getExecutor();
this.bufferSupplier = factory.getBufferSupplier(); this.bufferSupplier = factory.getBufferSupplier();
this.agent = agent; this.messageAgent = messageAgent;
this.topic = agent == null ? null : agent.generateSncpReqTopic(service); this.topic = messageAgent == null ? null : messageAgent.generateSncpReqTopic(service);
Class<?> tn = serviceTypeOrImplClass; Class<?> tn = serviceTypeOrImplClass;
Version ver = tn.getAnnotation(Version.class); Version ver = tn.getAnnotation(Version.class);
this.serviceClass = serviceClass; this.serviceClass = serviceClass;
@@ -113,7 +113,7 @@ public final class SncpClient {
} }
public MessageAgent getMessageAgent() { public MessageAgent getMessageAgent() {
return agent; return messageAgent;
} }
public InetSocketAddress getClientAddress() { public InetSocketAddress getClientAddress() {
@@ -265,7 +265,7 @@ public final class SncpClient {
final Class[] myparamclass = action.paramClass; final Class[] myparamclass = action.paramClass;
if (action.addressSourceParamIndex >= 0) params[action.addressSourceParamIndex] = this.clientSncpAddress; if (action.addressSourceParamIndex >= 0) params[action.addressSourceParamIndex] = this.clientSncpAddress;
if (bsonConvert == null) bsonConvert = BsonConvert.root(); if (bsonConvert == null) bsonConvert = BsonConvert.root();
final BsonWriter writer = agent == null ? bsonConvert.pollBsonWriter() : bsonConvert.pollBsonWriter(transport.getBufferSupplier()); // 将head写入 final BsonWriter writer = messageAgent == null ? bsonConvert.pollBsonWriter() : bsonConvert.pollBsonWriter(transport.getBufferSupplier()); // 将head写入
writer.writeTo(DEFAULT_HEADER); writer.writeTo(DEFAULT_HEADER);
for (int i = 0; i < params.length; i++) { //params 可能包含: 3 个 boolean for (int i = 0; i < params.length; i++) { //params 可能包含: 3 个 boolean
BsonConvert bcc = bsonConvert; BsonConvert bcc = bsonConvert;
@@ -278,11 +278,11 @@ public final class SncpClient {
final int reqBodyLength = writer.count() - HEADER_SIZE; //body总长度 final int reqBodyLength = writer.count() - HEADER_SIZE; //body总长度
final long seqid = System.nanoTime(); final long seqid = System.nanoTime();
final DLong actionid = action.actionid; final DLong actionid = action.actionid;
if (agent != null) { //MQ模式 if (messageAgent != null) { //MQ模式
final byte[] reqbytes = writer.toArray(); final byte[] reqbytes = writer.toArray();
fillHeader(ByteBuffer.wrap(reqbytes), seqid, actionid, reqBodyLength); fillHeader(ByteBuffer.wrap(reqbytes), seqid, actionid, reqBodyLength);
MessageRecord message = new MessageRecord(ConvertType.BSON, this.topic, null, reqbytes); MessageRecord message = new MessageRecord(ConvertType.BSON, this.topic, null, reqbytes);
return agent.sendRemoteSncp(null, message).thenApply(msg -> { return messageAgent.sendRemoteSncp(null, message).thenApply(msg -> {
ByteBuffer buffer = ByteBuffer.wrap(msg.getContent()); ByteBuffer buffer = ByteBuffer.wrap(msg.getContent());
checkResult(seqid, action, buffer); checkResult(seqid, action, buffer);