This commit is contained in:
Redkale
2020-06-03 19:26:38 +08:00
parent 982fc8440c
commit 6909748bda
8 changed files with 117 additions and 42 deletions

View File

@@ -13,7 +13,7 @@ import org.redkale.boot.ClassFilter.FilterEntry;
import org.redkale.mq.MessageAgent; import org.redkale.mq.MessageAgent;
import org.redkale.net.*; import org.redkale.net.*;
import org.redkale.net.sncp.*; import org.redkale.net.sncp.*;
import org.redkale.service.Service; import org.redkale.service.*;
import org.redkale.util.*; import org.redkale.util.*;
import org.redkale.util.AnyValue.DefaultAnyValue; import org.redkale.util.AnyValue.DefaultAnyValue;
@@ -34,6 +34,7 @@ public class NodeSncpServer extends NodeServer {
super(application, createServer(application, serconf)); super(application, createServer(application, serconf));
this.sncpServer = (SncpServer) this.server; this.sncpServer = (SncpServer) this.server;
this.consumer = sncpServer == null || application.singletonrun ? null : (agent, x) -> { this.consumer = sncpServer == null || application.singletonrun ? null : (agent, x) -> {
if (x.getClass().getAnnotation(Local.class) != null) return;
SncpDynServlet servlet = sncpServer.addSncpServlet(x); //singleton模式下不生成SncpServlet SncpDynServlet servlet = sncpServer.addSncpServlet(x); //singleton模式下不生成SncpServlet
if (agent != null) agent.putService(this, x, servlet); if (agent != null) agent.putService(this, x, servlet);
}; };

View File

@@ -18,12 +18,12 @@ import org.redkale.net.http.*;
*/ */
public class HttpMessageRequest extends HttpRequest { public class HttpMessageRequest extends HttpRequest {
protected MessageRecord reqMessage; protected MessageRecord message;
public HttpMessageRequest(HttpContext context, MessageRecord reqMessage) { public HttpMessageRequest(HttpContext context, MessageRecord message) {
super(context, reqMessage.decodeContent(HttpSimpleRequestCoder.getInstance())); super(context, message.decodeContent(HttpSimpleRequestCoder.getInstance()));
this.reqMessage = reqMessage; this.message = message;
this.currentUserid = reqMessage.getUserid(); this.currentUserid = message.getUserid();
} }
} }

View File

@@ -23,25 +23,25 @@ import org.redkale.util.ObjectPool;
*/ */
public class HttpMessageResponse extends HttpResponse { public class HttpMessageResponse extends HttpResponse {
protected MessageRecord reqMessage; protected MessageRecord message;
protected MessageProducer producer; protected MessageProducer producer;
public HttpMessageResponse(HttpContext context, HttpMessageRequest request, public HttpMessageResponse(HttpContext context, HttpMessageRequest request,
ObjectPool<Response> responsePool, HttpResponseConfig config, MessageProducer producer) { ObjectPool<Response> responsePool, HttpResponseConfig config, MessageProducer producer) {
super(context, request, responsePool, config); super(context, request, responsePool, config);
this.reqMessage = request.reqMessage; this.message = request.message;
this.producer = producer; this.producer = producer;
} }
public HttpMessageResponse(HttpContext context, MessageRecord reqMessage, HttpResponseConfig config, MessageProducer producer) { public HttpMessageResponse(HttpContext context, MessageRecord message, HttpResponseConfig config, MessageProducer producer) {
super(context, new HttpMessageRequest(context, reqMessage), null, config); super(context, new HttpMessageRequest(context, message), null, config);
this.reqMessage = reqMessage; this.message = message;
this.producer = producer; this.producer = producer;
} }
public void finishHttpResult(HttpResult result) { public void finishHttpResult(HttpResult result) {
finishHttpResult(this.producer, reqMessage.getResptopic(), result); finishHttpResult(this.producer, message.getResptopic(), result);
} }
public static void finishHttpResult(MessageProducer producer, String resptopic, HttpResult result) { public static void finishHttpResult(MessageProducer producer, String resptopic, HttpResult result) {
@@ -52,44 +52,44 @@ public class HttpMessageResponse extends HttpResponse {
@Override @Override
public void finishJson(org.redkale.service.RetResult ret) { public void finishJson(org.redkale.service.RetResult ret) {
if (reqMessage.isEmptyResptopic()) return; if (message.isEmptyResptopic()) return;
finishHttpResult(new HttpResult(ret.clearConvert(), ret)); finishHttpResult(new HttpResult(ret.clearConvert(), ret));
} }
@Override @Override
public void finish(String obj) { public void finish(String obj) {
if (reqMessage.isEmptyResptopic()) return; if (message.isEmptyResptopic()) return;
finishHttpResult(new HttpResult(obj == null ? "" : obj)); finishHttpResult(new HttpResult(obj == null ? "" : obj));
} }
@Override @Override
public void finish(int status, String message) { public void finish(int status, String message) {
if (reqMessage.isEmptyResptopic()) return; if (this.message.isEmptyResptopic()) return;
finishHttpResult(new HttpResult(message == null ? "" : message).status(status)); finishHttpResult(new HttpResult(message == null ? "" : message).status(status));
} }
@Override @Override
public void finish(final Convert convert, HttpResult result) { public void finish(final Convert convert, HttpResult result) {
if (reqMessage.isEmptyResptopic()) return; if (message.isEmptyResptopic()) return;
if (convert != null) result.convert(convert); if (convert != null) result.convert(convert);
finishHttpResult(result); finishHttpResult(result);
} }
@Override @Override
public void finish(final byte[] bs) { public void finish(final byte[] bs) {
if (reqMessage.isEmptyResptopic()) return; if (message.isEmptyResptopic()) return;
finishHttpResult(new HttpResult(bs)); finishHttpResult(new HttpResult(bs));
} }
@Override @Override
public void finish(final String contentType, final byte[] bs) { public void finish(final String contentType, final byte[] bs) {
if (reqMessage.isEmptyResptopic()) return; if (message.isEmptyResptopic()) return;
finishHttpResult(new HttpResult(bs).contentType(contentType)); finishHttpResult(new HttpResult(bs).contentType(contentType));
} }
@Override @Override
public void finish(boolean kill, ByteBuffer buffer) { public void finish(boolean kill, ByteBuffer buffer) {
if (reqMessage.isEmptyResptopic()) return; if (message.isEmptyResptopic()) return;
byte[] bs = new byte[buffer.remaining()]; byte[] bs = new byte[buffer.remaining()];
buffer.get(bs); buffer.get(bs);
finishHttpResult(new HttpResult(bs)); finishHttpResult(new HttpResult(bs));
@@ -97,7 +97,7 @@ public class HttpMessageResponse extends HttpResponse {
@Override @Override
public void finish(boolean kill, ByteBuffer... buffers) { public void finish(boolean kill, ByteBuffer... buffers) {
if (reqMessage.isEmptyResptopic()) return; if (message.isEmptyResptopic()) return;
int size = 0; int size = 0;
for (ByteBuffer buf : buffers) { for (ByteBuffer buf : buffers) {
size += buf.remaining(); size += buf.remaining();

View File

@@ -43,7 +43,7 @@ public abstract class MessageAgent {
protected MessageConsumer sncpRespConsumer; protected MessageConsumer sncpRespConsumer;
protected SncpMessageProcessor sncpRespProcessor; protected SncpRespProcessor sncpRespProcessor;
//sncpRespConsumer启动耗时 小于0表示未启动 //sncpRespConsumer启动耗时 小于0表示未启动
protected long sncpRespStartms = -1; protected long sncpRespStartms = -1;
@@ -141,21 +141,21 @@ public abstract class MessageAgent {
public final synchronized void putSncpResp(NodeSncpServer ns) { public final synchronized void putSncpResp(NodeSncpServer ns) {
if (this.sncpRespConsumer != null) return; if (this.sncpRespConsumer != null) return;
this.sncpRespProcessor = new SncpMessageProcessor(logger, this); this.sncpRespProcessor = new SncpRespProcessor(this.logger, this);
this.sncpRespConsumer = createConsumer(generateSncpRespTopic(), sncpRespProcessor); this.sncpRespConsumer = createConsumer(generateSncpRespTopic(), sncpRespProcessor);
} }
public final synchronized void putService(NodeHttpServer ns, Service service, HttpServlet servlet) { public final synchronized void putService(NodeHttpServer ns, Service service, HttpServlet servlet) {
String topic = generateHttpReqTopic(service); String topic = generateHttpReqTopic(service);
if (messageNodes.containsKey(topic)) throw new RuntimeException("topic(" + topic + ") is repeat"); if (messageNodes.containsKey(topic)) throw new RuntimeException("topic(" + topic + ") is repeat");
HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, this.producer, ns, service, servlet); HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, getProducer(), ns, service, servlet);
this.messageNodes.put(topic, new MessageNode(ns, service, servlet, processor, createConsumer(topic, processor))); this.messageNodes.put(topic, new MessageNode(ns, service, servlet, processor, createConsumer(topic, processor)));
} }
public final synchronized void putService(NodeSncpServer ns, Service service, SncpServlet servlet) { public final synchronized void putService(NodeSncpServer ns, Service service, SncpServlet servlet) {
String topic = generateSncpReqTopic(service); String topic = generateSncpReqTopic(service);
if (messageNodes.containsKey(topic)) throw new RuntimeException("topic(" + topic + ") is repeat"); if (messageNodes.containsKey(topic)) throw new RuntimeException("topic(" + topic + ") is repeat");
SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, this); SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, getProducer(), ns, service, servlet);
this.messageNodes.put(topic, new MessageNode(ns, service, servlet, processor, createConsumer(topic, processor))); this.messageNodes.put(topic, new MessageNode(ns, service, servlet, processor, createConsumer(topic, processor)));
} }

View File

@@ -5,7 +5,10 @@
*/ */
package org.redkale.mq; package org.redkale.mq;
import java.util.logging.Logger; import java.util.logging.*;
import org.redkale.boot.NodeSncpServer;
import org.redkale.net.sncp.*;
import org.redkale.service.Service;
/** /**
* *
@@ -20,15 +23,49 @@ public class SncpMessageProcessor implements MessageProcessor {
protected final Logger logger; protected final Logger logger;
protected final MessageAgent agent; protected final MessageProducer producer;
public SncpMessageProcessor(Logger logger, MessageAgent agent) { protected final NodeSncpServer server;
protected final Service service;
protected final SncpServlet servlet;
public SncpMessageProcessor(Logger logger, MessageProducer producer, NodeSncpServer server, Service service, SncpServlet servlet) {
this.logger = logger; this.logger = logger;
this.agent = agent; this.producer = producer;
this.server = server;
this.service = service;
this.servlet = servlet;
} }
@Override @Override
public void process(MessageRecord message) { public void process(MessageRecord message) {
SncpContext context = server.getSncpServer().getContext();
SncpMessageRequest request = new SncpMessageRequest(context, message);
SncpMessageResponse response = new SncpMessageResponse(context, request, null, producer);
try {
servlet.execute(request, response);
} catch (Exception ex) {
response.finish(SncpResponse.RETCODE_ILLSERVICEID, null);
logger.log(Level.SEVERE, SncpMessageProcessor.class.getSimpleName() + " process error, message=" + message, ex);
}
} }
public MessageProducer getProducer() {
return producer;
}
public NodeSncpServer getServer() {
return server;
}
public Service getService() {
return service;
}
public SncpServlet getServlet() {
return servlet;
}
} }

View File

@@ -19,12 +19,13 @@ import org.redkale.net.sncp.*;
*/ */
public class SncpMessageRequest extends SncpRequest { public class SncpMessageRequest extends SncpRequest {
public SncpMessageRequest(SncpContext context) { protected MessageRecord message;
@SuppressWarnings("OverridableMethodCallInConstructor")
public SncpMessageRequest(SncpContext context, MessageRecord message) {
super(context, null); super(context, null);
this.message = message;
readHeader(ByteBuffer.wrap(message.getContent()));
} }
@Override
public int readHeader(ByteBuffer buffer) {
return super.readHeader(buffer);
}
} }

View File

@@ -6,7 +6,7 @@
package org.redkale.mq; package org.redkale.mq;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.function.*; import org.redkale.convert.ConvertType;
import org.redkale.convert.bson.BsonWriter; import org.redkale.convert.bson.BsonWriter;
import org.redkale.net.Response; import org.redkale.net.Response;
import org.redkale.net.sncp.*; import org.redkale.net.sncp.*;
@@ -26,16 +26,18 @@ public class SncpMessageResponse extends SncpResponse {
protected MessageRecord message; protected MessageRecord message;
protected BiConsumer<MessageRecord, byte[]> resultConsumer; protected MessageProducer producer;
public SncpMessageResponse(SncpContext context, SncpMessageRequest request, ObjectPool<Response> responsePool) { public SncpMessageResponse(SncpContext context, SncpMessageRequest request, ObjectPool<Response> responsePool, MessageProducer producer) {
super(context, request, responsePool); super(context, request, responsePool);
this.message = request.message;
this.producer = producer;
} }
public SncpMessageResponse resultConsumer(MessageRecord message, BiConsumer<MessageRecord, byte[]> resultConsumer) { public SncpMessageResponse(SncpContext context, MessageRecord message, ObjectPool<Response> responsePool, MessageProducer producer) {
super(context, new SncpMessageRequest(context, message), responsePool);
this.message = message; this.message = message;
this.resultConsumer = resultConsumer; this.producer = producer;
return this;
} }
@Override @Override
@@ -43,12 +45,12 @@ public class SncpMessageResponse extends SncpResponse {
if (out == null) { if (out == null) {
final byte[] result = new byte[SncpRequest.HEADER_SIZE]; final byte[] result = new byte[SncpRequest.HEADER_SIZE];
fillHeader(ByteBuffer.wrap(result), 0, retcode); fillHeader(ByteBuffer.wrap(result), 0, retcode);
resultConsumer.accept(message, result); producer.apply(new MessageRecord(ConvertType.BSON, message.getResptopic(), null, (byte[]) null));
return; return;
} }
final int respBodyLength = out.count(); //body总长度 final int respBodyLength = out.count(); //body总长度
final byte[] result = out.toArray(); final byte[] result = out.toArray();
fillHeader(ByteBuffer.wrap(result), respBodyLength - HEADER_SIZE, retcode); fillHeader(ByteBuffer.wrap(result), respBodyLength - HEADER_SIZE, retcode);
resultConsumer.accept(message, result); producer.apply(new MessageRecord(ConvertType.BSON, message.getResptopic(), null, result));
} }
} }

View File

@@ -0,0 +1,34 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
import java.util.logging.Logger;
/**
* MQ管理器
*
*
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.1.0
*/
public class SncpRespProcessor implements MessageProcessor {
protected final Logger logger;
protected final MessageAgent agent;
public SncpRespProcessor(Logger logger, MessageAgent agent) {
this.logger = logger;
this.agent = agent;
}
@Override
public void process(MessageRecord message) {
}
}