diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index e5a5fa5d6..951f83396 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -12,6 +12,7 @@ import java.util.logging.Logger; import javax.annotation.Resource; import org.redkale.boot.*; import static org.redkale.boot.Application.RESNAME_APP_NODEID; +import org.redkale.convert.ConvertType; import org.redkale.net.Servlet; import org.redkale.net.http.*; import org.redkale.net.sncp.*; @@ -41,6 +42,8 @@ public abstract class MessageAgent { protected MessageProducer producer; + protected String sncpRespTopic; + protected MessageConsumer sncpRespConsumer; protected SncpRespProcessor sncpRespProcessor; @@ -148,6 +151,17 @@ public abstract class MessageAgent { this.sncpRespConsumer = createConsumer(generateSncpRespTopic(), sncpRespProcessor); } + public CompletableFuture sendRemoteSncp(AtomicLong counter, MessageRecord message) { + if (this.sncpRespConsumer == null) { + CompletableFuture future = new CompletableFuture(); + future.completeExceptionally(new RuntimeException("Not open sncp consumer")); + return future; + } + message.setFormat(ConvertType.BSON); + message.setResptopic(generateSncpRespTopic()); + return this.sncpRespProcessor.createFuture(message.getSeqid(), counter); + } + public final synchronized void putService(NodeHttpServer ns, Service service, HttpServlet servlet) { String topic = generateHttpReqTopic(service); if (messageNodes.containsKey(topic)) throw new RuntimeException("topic(" + topic + ") is repeat"); @@ -165,13 +179,15 @@ public abstract class MessageAgent { //格式: sncp.req.user public String generateSncpReqTopic(Service service) { String resname = Sncp.getResourceName(service); - if (service instanceof WebSocketNode) return "sncp.req.wsn" + (resname.isEmpty() ? "" : ("-" + resname)); + if (service instanceof WebSocketNode) return "sncp.req.ws" + (resname.isEmpty() ? "" : ("-" + resname)); return "sncp.req." + Sncp.getResourceType(service).getSimpleName().replaceAll("Service.*$", "").toLowerCase() + (resname.isEmpty() ? "" : ("-" + resname)); } //格式: sncp.resp.node10 - protected String generateSncpRespTopic() { - return "sncp.resp.node" + nodeid; + private String generateSncpRespTopic() { + if (this.sncpRespTopic != null) return this.sncpRespTopic; + this.sncpRespTopic = "sncp.resp.node" + nodeid; + return this.sncpRespTopic; } //格式: http.req.user diff --git a/src/org/redkale/net/sncp/Sncp.java b/src/org/redkale/net/sncp/Sncp.java index 970d99253..5b81eab8b 100644 --- a/src/org/redkale/net/sncp/Sncp.java +++ b/src/org/redkale/net/sncp/Sncp.java @@ -449,7 +449,7 @@ public abstract class Sncp { final AnyValue conf) { try { final Class newClazz = createLocalServiceClass(classLoader, name, serviceImplClass); - T rs = (T) newClazz.getDeclaredConstructor().newInstance(); + T service = (T) newClazz.getDeclaredConstructor().newInstance(); //-------------------------------------- Service remoteService = null; { @@ -464,7 +464,7 @@ public abstract class Sncp { if (remoteService == null && clientSncpAddress != null) { remoteService = createRemoteService(classLoader, name, serviceImplClass, remoteAgent, transportFactory, clientSncpAddress, groups, conf); } - if (remoteService != null) field.set(rs, remoteService); + if (remoteService != null) field.set(service, remoteService); } } while ((loop = loop.getSuperclass()) != Object.class); } @@ -473,20 +473,20 @@ public abstract class Sncp { try { Field e = newClazz.getDeclaredField(FIELDPREFIX + "_client"); e.setAccessible(true); - client = new SncpClient(name, serviceImplClass, rs, remoteAgent, transportFactory, false, newClazz, clientSncpAddress); - e.set(rs, client); - transportFactory.addSncpService(rs); + client = new SncpClient(name, serviceImplClass, service, remoteAgent, transportFactory, false, newClazz, clientSncpAddress); + e.set(service, client); + if (transportFactory != null) transportFactory.addSncpService(service); } catch (NoSuchFieldException ne) { ne.printStackTrace(); } } - if (client == null) return rs; + if (client == null) return service; { Field c = newClazz.getDeclaredField(FIELDPREFIX + "_conf"); c.setAccessible(true); - c.set(rs, conf); + c.set(service, conf); } - return rs; + return service; } catch (RuntimeException rex) { throw rex; } catch (Exception ex) { @@ -569,15 +569,15 @@ public abstract class Sncp { String newDynName = supDynName.substring(0, supDynName.lastIndexOf('/') + 1) + REMOTEPREFIX + serviceTypeOrImplClass.getSimpleName(); try { Class newClazz = loader.loadClass(newDynName.replace('/', '.')); - T rs = (T) newClazz.getDeclaredConstructor().newInstance(); - SncpClient client = new SncpClient(name, serviceTypeOrImplClass, rs, agent, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress); + T service = (T) newClazz.getDeclaredConstructor().newInstance(); + SncpClient client = new SncpClient(name, serviceTypeOrImplClass, service, agent, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress); client.setRemoteGroups(groups); client.setRemoteGroupTransport(transportFactory.loadTransport(clientAddress, groups)); Field c = newClazz.getDeclaredField(FIELDPREFIX + "_client"); c.setAccessible(true); - c.set(rs, client); - transportFactory.addSncpService(rs); - return rs; + c.set(service, client); + if (transportFactory != null) transportFactory.addSncpService(service); + return service; } catch (Throwable ex) { } //------------------------------------------------------------------------------ @@ -750,22 +750,22 @@ public abstract class Sncp { } }.loadClass(newDynName.replace('/', '.'), bytes); try { - T rs = (T) newClazz.getDeclaredConstructor().newInstance(); - SncpClient client = new SncpClient(name, serviceTypeOrImplClass, rs, agent, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress); + T service = (T) newClazz.getDeclaredConstructor().newInstance(); + SncpClient client = new SncpClient(name, serviceTypeOrImplClass, service, agent, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress); client.setRemoteGroups(groups); client.setRemoteGroupTransport(transportFactory.loadTransport(clientAddress, groups)); { Field c = newClazz.getDeclaredField(FIELDPREFIX + "_client"); c.setAccessible(true); - c.set(rs, client); + c.set(service, client); } { Field c = newClazz.getDeclaredField(FIELDPREFIX + "_conf"); c.setAccessible(true); - c.set(rs, conf); + c.set(service, conf); } - transportFactory.addSncpService(rs); - return rs; + if (transportFactory != null) transportFactory.addSncpService(service); + return service; } catch (Exception ex) { throw new RuntimeException(ex); } diff --git a/src/org/redkale/net/sncp/SncpClient.java b/src/org/redkale/net/sncp/SncpClient.java index 3ba3b0e1c..bd8c3752f 100644 --- a/src/org/redkale/net/sncp/SncpClient.java +++ b/src/org/redkale/net/sncp/SncpClient.java @@ -15,9 +15,10 @@ import java.util.concurrent.*; import java.util.function.Supplier; import java.util.logging.*; import javax.annotation.Resource; +import org.redkale.convert.ConvertType; import org.redkale.convert.bson.*; import org.redkale.convert.json.*; -import org.redkale.mq.MessageAgent; +import org.redkale.mq.*; import org.redkale.net.*; import static org.redkale.net.sncp.SncpRequest.*; import org.redkale.service.*; @@ -59,6 +60,8 @@ public final class SncpClient { protected final MessageAgent agent; + protected final String topic; + protected final Supplier bufferSupplier; @Resource @@ -79,6 +82,7 @@ public final class SncpClient { this.executor = factory.getExecutor(); this.bufferSupplier = factory.getBufferSupplier(); this.agent = agent; + this.topic = agent == null ? null : agent.generateSncpReqTopic(service); Class tn = serviceTypeOrImplClass; Version ver = tn.getAnnotation(Version.class); this.serviceClass = serviceClass; @@ -261,7 +265,7 @@ public final class SncpClient { final Class[] myparamclass = action.paramClass; if (action.addressSourceParamIndex >= 0) params[action.addressSourceParamIndex] = this.clientSncpAddress; if (bsonConvert == null) bsonConvert = BsonConvert.root(); - final BsonWriter writer = bsonConvert.pollBsonWriter(transport.getBufferSupplier()); // 将head写入 + final BsonWriter writer = agent == null ? bsonConvert.pollBsonWriter() : bsonConvert.pollBsonWriter(transport.getBufferSupplier()); // 将head写入 writer.writeTo(DEFAULT_HEADER); for (int i = 0; i < params.length; i++) { //params 可能包含: 3 个 boolean BsonConvert bcc = bsonConvert; @@ -274,6 +278,25 @@ public final class SncpClient { final int reqBodyLength = writer.count() - HEADER_SIZE; //body总长度 final long seqid = System.nanoTime(); final DLong actionid = action.actionid; + if (agent != null) { //MQ模式 + final byte[] reqbytes = writer.toArray(); + fillHeader(ByteBuffer.wrap(reqbytes), seqid, actionid, reqBodyLength); + MessageRecord message = new MessageRecord(ConvertType.BSON, this.topic, null, reqbytes); + return agent.sendRemoteSncp(null, message).thenApply(msg -> { + ByteBuffer buffer = ByteBuffer.wrap(msg.getContent()); + checkResult(seqid, action, buffer); + + final int respBodyLength = buffer.getInt(); + final int retcode = buffer.getInt(); + if (retcode != 0) { + logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + "), params=" + JsonConvert.root().convertTo(params)); + throw new RuntimeException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")"); + } + byte[] body = new byte[respBodyLength]; + buffer.get(body, 0, respBodyLength); + return body; + }); + } final SocketAddress addr = addr0 == null ? (action.addressTargetParamIndex >= 0 ? (SocketAddress) params[action.addressTargetParamIndex] : null) : addr0; CompletableFuture connFuture = transport.pollConnection(addr); return connFuture.thenCompose(conn0 -> {