mq命名优化

This commit is contained in:
redkale
2023-04-03 09:32:44 +08:00
parent 188a74423b
commit 22fd2212de
10 changed files with 43 additions and 37 deletions

View File

@@ -23,7 +23,7 @@ import org.redkale.util.*;
*
* @since 2.1.0
*/
public class HttpMessageProcessor implements MessageProcessor {
public class HttpMessageClientProcessor implements MessageClientProcessor {
protected final Logger logger;
@@ -59,7 +59,7 @@ public class HttpMessageProcessor implements MessageProcessor {
}
};
public HttpMessageProcessor(Logger logger, HttpMessageClient messageClient, MessageClientProducers producers, NodeHttpServer server, Service service, HttpServlet servlet) {
public HttpMessageClientProcessor(Logger logger, HttpMessageClient messageClient, MessageClientProducers producers, NodeHttpServer server, Service service, HttpServlet servlet) {
this.logger = logger;
this.messageClient = messageClient;
this.producers = producers;
@@ -118,7 +118,7 @@ public class HttpMessageProcessor implements MessageProcessor {
HttpMessageResponse.finishHttpResult(logger.isLoggable(Level.FINEST), request == null ? null : request.getRespConvert(),
null, message, callback, messageClient, producers.getProducer(message), message.getRespTopic(), new HttpResult().status(500));
}
logger.log(Level.SEVERE, HttpMessageProcessor.class.getSimpleName() + " process error, message=" + message, ex instanceof CompletionException ? ((CompletionException) ex).getCause() : ex);
logger.log(Level.SEVERE, HttpMessageClientProcessor.class.getSimpleName() + " process error, message=" + message, ex instanceof CompletionException ? ((CompletionException) ex).getCause() : ex);
}
}
@@ -128,7 +128,7 @@ public class HttpMessageProcessor implements MessageProcessor {
try {
this.cdl.await(30, TimeUnit.SECONDS);
} catch (Exception ex) {
logger.log(Level.SEVERE, HttpMessageProcessor.class.getSimpleName() + " commit error, restmodule=" + this.restModule, ex);
logger.log(Level.SEVERE, HttpMessageClientProcessor.class.getSimpleName() + " commit error, restmodule=" + this.restModule, ex);
}
this.cdl = null;
}

View File

@@ -67,8 +67,8 @@ public abstract class MessageAgent implements Resourcable {
protected MessageCoder<MessageRecord> messageCoder = MessageRecordCoder.getInstance();
//本地Service消息接收处理器 key:consumer
protected HashMap<String, MessageConsumerNode> messageNodes = new LinkedHashMap<>();
//本地Service消息接收处理器 key:consumerid
protected HashMap<String, MessageClientConsumerNode> clientConsumerNodes = new LinkedHashMap<>();
public void init(ResourceFactory factory, AnyValue config) {
this.name = checkName(config.getValue("name", ""));
@@ -96,7 +96,7 @@ public abstract class MessageAgent implements Resourcable {
}
// application (it doesn't execute completion handlers).
this.timeoutExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, (Runnable r) -> {
Thread t = new Thread(r, "Redkale-MessageAgent-Timeout-Thread");
Thread t = new Thread(r, "Redkale-MessageAgent-[" + name + "]-Timeout-Thread");
t.setDaemon(true);
return t;
});
@@ -109,7 +109,7 @@ public abstract class MessageAgent implements Resourcable {
public CompletableFuture<Map<String, Long>> start() {
final LinkedHashMap<String, Long> map = new LinkedHashMap<>();
final List<CompletableFuture> futures = new ArrayList<>();
this.messageNodes.values().forEach(node -> {
this.clientConsumerNodes.values().forEach(node -> {
long s = System.currentTimeMillis();
futures.add(node.consumer.startup().whenComplete((r, t) -> map.put(node.consumer.consumerid, System.currentTimeMillis() - s)));
});
@@ -119,7 +119,7 @@ public abstract class MessageAgent implements Resourcable {
//Application.shutdown 在执行server.shutdown之前执行
public CompletableFuture<Void> stop() {
List<CompletableFuture> futures = new ArrayList<>();
this.messageNodes.values().forEach(node -> {
this.clientConsumerNodes.values().forEach(node -> {
futures.add(node.consumer.shutdown());
});
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
@@ -153,7 +153,7 @@ public abstract class MessageAgent implements Resourcable {
if (one != null) {
consumers.add(one);
}
consumers.addAll(messageNodes.values().stream().map(mcn -> mcn.consumer).collect(Collectors.toList()));
consumers.addAll(clientConsumerNodes.values().stream().map(mcn -> mcn.consumer).collect(Collectors.toList()));
return consumers;
}
@@ -291,7 +291,7 @@ public abstract class MessageAgent implements Resourcable {
public abstract boolean acceptsConf(AnyValue config);
//创建指定topic的消费处理器
public abstract MessageClientConsumer createMessageClientConsumer(String[] topics, String group, MessageProcessor processor);
public abstract MessageClientConsumer createMessageClientConsumer(String[] topics, String group, MessageClientProcessor processor);
public final void putService(NodeHttpServer ns, Service service, HttpServlet servlet) {
AutoLoad al = service.getClass().getAnnotation(AutoLoad.class);
@@ -312,11 +312,11 @@ public abstract class MessageAgent implements Resourcable {
String consumerid = generateHttpConsumerid(topics, service);
httpNodesLock.lock();
try {
if (messageNodes.containsKey(consumerid)) {
if (clientConsumerNodes.containsKey(consumerid)) {
throw new RedkaleException("consumerid(" + consumerid + ") is repeat");
}
HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, httpMessageClient, getHttpMessageClientProducer(), ns, service, servlet);
this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(topics, consumerid, processor)));
HttpMessageClientProcessor processor = new HttpMessageClientProcessor(this.logger, httpMessageClient, getHttpMessageClientProducer(), ns, service, servlet);
this.clientConsumerNodes.put(consumerid, new MessageClientConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(topics, consumerid, processor)));
} finally {
httpNodesLock.unlock();
}
@@ -335,11 +335,11 @@ public abstract class MessageAgent implements Resourcable {
String consumerid = generateSncpConsumerid(topic, service);
sncpNodesLock.lock();
try {
if (messageNodes.containsKey(consumerid)) {
if (clientConsumerNodes.containsKey(consumerid)) {
throw new RedkaleException("consumerid(" + consumerid + ") is repeat");
}
SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, sncpMessageClient, getSncpMessageClientProducer(), ns, service, servlet);
this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(new String[]{topic}, consumerid, processor)));
SncpMessageClientProcessor processor = new SncpMessageClientProcessor(this.logger, sncpMessageClient, getSncpMessageClientProducer(), ns, service, servlet);
this.clientConsumerNodes.put(consumerid, new MessageClientConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(new String[]{topic}, consumerid, processor)));
} finally {
sncpNodesLock.unlock();
}
@@ -407,7 +407,7 @@ public abstract class MessageAgent implements Resourcable {
return protocol + ".resp.node" + nodeid;
}
protected static class MessageConsumerNode {
protected static class MessageClientConsumerNode {
public final NodeServer server;
@@ -415,11 +415,11 @@ public abstract class MessageAgent implements Resourcable {
public final Servlet servlet;
public final MessageProcessor processor;
public final MessageClientProcessor processor;
public final MessageClientConsumer consumer;
public MessageConsumerNode(NodeServer server, Service service, Servlet servlet, MessageProcessor processor, MessageClientConsumer consumer) {
public MessageClientConsumerNode(NodeServer server, Service service, Servlet servlet, MessageClientProcessor processor, MessageClientConsumer consumer) {
this.server = server;
this.service = service;
this.servlet = servlet;

View File

@@ -67,7 +67,7 @@ public abstract class MessageClient {
this.respConsumerid = "consumer-" + this.respTopic;
}
if (this.respConsumer == null) {
MessageProcessor processor = (msg, callback) -> {
MessageClientProcessor processor = (msg, callback) -> {
long now = System.currentTimeMillis();
MessageRespFutureNode node = respNodes.remove(msg.getSeqid());
if (node == null) {

View File

@@ -27,13 +27,13 @@ public abstract class MessageClientConsumer {
protected MessageAgent messageAgent;
protected final MessageProcessor processor;
protected final MessageClientProcessor processor;
protected final Logger logger;
protected volatile boolean closed;
protected MessageClientConsumer(MessageAgent messageAgent, String[] topics, final String consumerid, MessageProcessor processor) {
protected MessageClientConsumer(MessageAgent messageAgent, String[] topics, final String consumerid, MessageClientProcessor processor) {
Objects.requireNonNull(messageAgent);
Objects.requireNonNull(topics);
Objects.requireNonNull(consumerid);
@@ -45,7 +45,7 @@ public abstract class MessageClientConsumer {
this.processor = processor;
}
public MessageProcessor getProcessor() {
public MessageClientProcessor getProcessor() {
return processor;
}

View File

@@ -15,7 +15,7 @@ package org.redkale.mq;
*
* @since 2.1.0
*/
public interface MessageProcessor {
public interface MessageClientProcessor {
default void begin(int size, long starttime) {
}

View File

@@ -22,7 +22,7 @@ import org.redkale.util.Traces;
*
* @since 2.1.0
*/
public class SncpMessageProcessor implements MessageProcessor {
public class SncpMessageClientProcessor implements MessageClientProcessor {
protected final Logger logger;
@@ -46,7 +46,7 @@ public class SncpMessageProcessor implements MessageProcessor {
}
};
public SncpMessageProcessor(Logger logger, SncpMessageClient messageClient, MessageClientProducers producer, NodeSncpServer server, Service service, SncpServlet servlet) {
public SncpMessageClientProcessor(Logger logger, SncpMessageClient messageClient, MessageClientProducers producer, NodeSncpServer server, Service service, SncpServlet servlet) {
this.logger = logger;
this.messageClient = messageClient;
this.producer = producer;
@@ -90,7 +90,7 @@ public class SncpMessageProcessor implements MessageProcessor {
if (response != null) {
response.finish(SncpResponse.RETCODE_ILLSERVICEID, null);
}
logger.log(Level.SEVERE, SncpMessageProcessor.class.getSimpleName() + " process error, message=" + message, ex instanceof CompletionException ? ((CompletionException) ex).getCause() : ex);
logger.log(Level.SEVERE, SncpMessageClientProcessor.class.getSimpleName() + " process error, message=" + message, ex instanceof CompletionException ? ((CompletionException) ex).getCause() : ex);
}
}

View File

@@ -50,14 +50,13 @@ public class SncpMessageResponse extends SncpResponse {
callback.run();
}
if (out == null) {
final ByteArray result = new ByteArray(SncpHeader.HEADER_SIZE);
final ByteArray result = onlyHeaderData;
fillHeader(result, 0, retcode);
producer.apply(messageClient.createMessageRecord(message.getSeqid(), message.getRespTopic(), null, (byte[]) null));
return;
}
final int respBodyLength = out.count(); //body总长度
final ByteArray result = out.toByteArray();
fillHeader(result, respBodyLength - SncpHeader.HEADER_SIZE, retcode);
fillHeader(result, result.length() - SncpHeader.HEADER_SIZE, retcode);
producer.apply(messageClient.createMessageRecord(message.getSeqid(), message.getRespTopic(), null, result.getBytes()));
}
}

View File

@@ -36,7 +36,9 @@ import org.redkale.util.*;
*/
public abstract class Sncp {
private static final byte[] PING_BYTES = new SncpHeader(null, Uint128.ZERO, Uint128.ZERO).writeTo(new ByteArray(SncpHeader.HEADER_SIZE), null, 0, 0, 0).getBytes();
private static final byte[] PING_BYTES = new SncpHeader(null, Uint128.ZERO, Uint128.ZERO)
.writeTo(new ByteArray(SncpHeader.HEADER_SIZE).putPlaceholder(SncpHeader.HEADER_SIZE), null, 0, 0, 0)
.getBytes();
private static final byte[] PONG_BYTES = Arrays.copyOf(PING_BYTES, PING_BYTES.length);

View File

@@ -118,6 +118,9 @@ public class SncpHeader {
if (newAddrBytes.length != 4) {
throw new SncpException("address bytes length must be 4, but " + newAddrBytes.length);
}
if (array.length() < HEADER_SIZE) {
throw new SncpException("ByteArray length must more " + HEADER_SIZE);
}
int offset = 0;
array.putLong(offset, newSeqid); //8
offset += 8;

View File

@@ -36,6 +36,8 @@ public class SncpResponse extends Response<SncpContext, SncpRequest> {
protected final BsonWriter writer = new BsonWriter();
protected final ByteArray onlyHeaderData = new ByteArray(HEADER_SIZE).putPlaceholder(HEADER_SIZE);
protected final CompletionHandler realHandler = new CompletionHandler() {
@Override
public void completed(Object result, Object attachment) {
@@ -168,9 +170,9 @@ public class SncpResponse extends Response<SncpContext, SncpRequest> {
//调用此方法时out已写入SncpHeader
public void finish(final int retcode, final BsonWriter out) {
if (out == null) {
final ByteArray buffer = new ByteArray(HEADER_SIZE);
fillHeader(buffer, 0, retcode);
finish(buffer);
final ByteArray array = onlyHeaderData;
fillHeader(array, 0, retcode);
finish(array);
return;
}
final ByteArray array = out.toByteArray();
@@ -179,9 +181,9 @@ public class SncpResponse extends Response<SncpContext, SncpRequest> {
finish(array);
}
protected void fillHeader(ByteArray buffer, int bodyLength, int retcode) {
protected void fillHeader(ByteArray array, int bodyLength, int retcode) {
SncpHeader header = request.getHeader();
header.writeTo(buffer, this.addrBytes, this.addrPort, header.getSeqid(), bodyLength, retcode);
header.writeTo(array, this.addrBytes, this.addrPort, header.getSeqid(), bodyLength, retcode);
}
}