This commit is contained in:
redkale
2023-10-08 12:11:49 +08:00
parent 33e751a44c
commit 1fa17bf10c
6 changed files with 29 additions and 56 deletions

View File

@@ -1720,16 +1720,7 @@ public final class Application {
consumer.init(consumerConf);
}
}
Map<String, Long> map = agent.start(consumers);
AtomicInteger maxlen = new AtomicInteger();
map.keySet().forEach(str -> {
if (str.length() > maxlen.get()) {
maxlen.set(str.length());
}
});
new TreeMap<>(map).forEach((topic, ms) -> sb.append(MessageClientConsumer.class.getSimpleName()).append("(topic=")
.append(alignString(topic, maxlen.get())).append(") init and start in ").append(ms).append(" ms\r\n")
);
agent.start(consumers);
}
if (sb.length() > 0) {
logger.info(sb.toString().trim());

View File

@@ -333,7 +333,7 @@ public class NodeHttpServer extends NodeServer {
for (AbstractMap.SimpleEntry<String, String[]> en : webss) {
StringBuilder sub = new StringBuilder();
int pos = en.getKey().indexOf(':');
sub.append("RestWebSocket (type=").append(en.getKey().substring(0, pos));
sub.append("RestWebSocket (type=").append(en.getKey().substring(0, pos));
for (int i = 0; i < maxTypeLength - pos; i++) {
sub.append(' ');
}

View File

@@ -13,7 +13,6 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.IntFunction;
import java.util.logging.*;
import java.util.stream.Collectors;
import org.redkale.annotation.*;
@@ -140,20 +139,17 @@ public abstract class MessageAgent implements Resourcable {
return workExecutor.submit(event);
}
public Map<String, Long> start(List<MessageConsumer> consumers) {
public void start(List<MessageConsumer> consumers) {
StringBuilder loginfo = initMessageConsumer(consumers);
startMessageConsumer();
if (loginfo.length() > 0) {
logger.log(Level.INFO, loginfo.toString());
}
final LinkedHashMap<String, Long> map = new LinkedHashMap<>();
this.clientConsumerNodes.values().forEach(node -> {
long s = System.currentTimeMillis();
node.consumer.start();
long e = System.currentTimeMillis() - s;
map.put(node.consumer.consumerid, e);
});
return map;
}
//Application.stop 在执行server.shutdown之前执行
@@ -167,40 +163,29 @@ public abstract class MessageAgent implements Resourcable {
//Application.stop 在所有server.shutdown执行后执行
public void destroy(AnyValue config) {
logger.log(Level.FINE, "MessageConsumer destroying");
for (MessageConsumer consumer : messageConsumerList) {
consumer.destroy(config);
}
this.messageConsumerList.clear();
this.messageConsumerMap.clear();
logger.log(Level.FINE, "MessageConsumer destroyed");
this.httpMessageClient.close();
this.sncpMessageClient.close();
logger.log(Level.FINE, "httpMessageClient and sncpMessageClient destroyed");
if (this.httpClientProducer != null) {
logger.log(Level.FINE, "httpClientProducer stoping");
this.httpClientProducer.stop();
logger.log(Level.FINE, "httpClientProducer stoped");
}
if (this.sncpClientProducer != null) {
logger.log(Level.FINE, "sncpClientProducer stoping");
this.sncpClientProducer.stop();
logger.log(Level.FINE, "sncpClientProducer stoped");
}
if (this.clientMessageCoder instanceof Service) {
logger.log(Level.FINE, "clientMessageCoder destroying");
((Service) this.clientMessageCoder).destroy(config);
logger.log(Level.FINE, "clientMessageCoder destroyed");
}
if (this.timeoutExecutor != null) {
this.timeoutExecutor.shutdownNow();
logger.log(Level.FINE, "timeoutExecutor shutdownNow");
}
if (this.workExecutor != null && this.workExecutor != application.getWorkExecutor()) {
this.workExecutor.shutdownNow();
logger.log(Level.FINE, "workExecutor shutdownNow");
}
}
@@ -445,7 +430,7 @@ public abstract class MessageAgent implements Resourcable {
protected abstract MessageClientProducer createMessageClientProducer(String producerName);
//创建指定topic的消费处理器
public abstract MessageClientConsumer createMessageClientConsumer(String[] topics, String group, MessageClientProcessor processor);
public abstract MessageClientConsumer createMessageClientConsumer(String topic, String group, MessageClientProcessor processor);
public final void putService(NodeHttpServer ns, Service service, HttpServlet servlet) {
AutoLoad al = service.getClass().getAnnotation(AutoLoad.class);
@@ -462,15 +447,15 @@ public abstract class MessageAgent implements Resourcable {
return;
}
}
String[] topics = generateHttpReqTopics(service);
String consumerid = generateHttpConsumerid(topics, service);
String topic = generateHttpReqTopic(service);
String consumerid = generateHttpConsumerid(topic, service);
serviceLock.lock();
try {
if (clientConsumerNodes.containsKey(consumerid)) {
throw new RedkaleException("consumerid(" + consumerid + ") is repeat");
}
HttpMessageClientProcessor processor = new HttpMessageClientProcessor(this.logger, httpMessageClient, getHttpMessageClientProducer(), ns, service, servlet);
this.clientConsumerNodes.put(consumerid, new MessageClientConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(topics, consumerid, processor)));
this.clientConsumerNodes.put(consumerid, new MessageClientConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(topic, consumerid, processor)));
} finally {
serviceLock.unlock();
}
@@ -493,7 +478,7 @@ public abstract class MessageAgent implements Resourcable {
throw new RedkaleException("consumerid(" + consumerid + ") is repeat");
}
SncpMessageClientProcessor processor = new SncpMessageClientProcessor(this.logger, sncpMessageClient, getSncpMessageClientProducer(), ns, service, servlet);
this.clientConsumerNodes.put(consumerid, new MessageClientConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(new String[]{topic}, consumerid, processor)));
this.clientConsumerNodes.put(consumerid, new MessageClientConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(topic, consumerid, processor)));
} finally {
serviceLock.unlock();
}
@@ -533,10 +518,10 @@ public abstract class MessageAgent implements Resourcable {
}
//格式: http.req.module.user
protected String[] generateHttpReqTopics(Service service) {
protected String generateHttpReqTopic(Service service) {
String resname = Sncp.getResourceName(service);
String module = Rest.getRestModule(service).toLowerCase();
return new String[]{"http.req.module." + module + (resname.isEmpty() ? "" : ("-" + resname))};
return "http.req.module." + module + (resname.isEmpty() ? "" : ("-" + resname));
}
//格式: consumer-sncp.req.module.user 不提供外部使用
@@ -545,7 +530,7 @@ public abstract class MessageAgent implements Resourcable {
}
//格式: consumer-http.req.module.user
protected String generateHttpConsumerid(String[] topics, Service service) {
protected String generateHttpConsumerid(String topic, Service service) {
String resname = Sncp.getResourceName(service);
String key = Rest.getRestModule(service).toLowerCase();
return "consumer-http.req.module." + key + (resname.isEmpty() ? "" : ("-" + resname));
@@ -564,8 +549,6 @@ public abstract class MessageAgent implements Resourcable {
private final Type messageType;
private final IntFunction<T[]> arrayCreator;
public MessageConsumerWrapper(MessageAgent messageAgent, MessageConsumer<T> consumer, ConvertType convertType) {
Objects.requireNonNull(messageAgent);
Objects.requireNonNull(consumer);
@@ -575,7 +558,6 @@ public abstract class MessageAgent implements Resourcable {
this.consumer = consumer;
this.convert = ConvertFactory.findConvert(convertType);
this.messageType = parseMessageType(consumer.getClass());
this.arrayCreator = Creator.funcArray(TypeToken.typeToClass(messageType));
}
private static Type parseMessageType(Class<? extends MessageConsumer> clazz) {
@@ -602,18 +584,16 @@ public abstract class MessageAgent implements Resourcable {
public Future onMessage(MessageConext context, List<byte[]> messages) {
return messageAgent.submit(() -> {
try {
T[] msgs = this.arrayCreator.apply(messages.size());
int index = -1;
for (byte[] bs : messages) {
msgs[++index] = (T) convert.convertFrom(messageType, bs);
Convert c = this.convert;
MessageConsumer m = this.consumer;
for (byte[] bs : messages) {
try {
m.onMessage(context, (T) c.convertFrom(messageType, bs));
} catch (Throwable t) {
messageAgent.getLogger().log(Level.SEVERE, m.getClass().getSimpleName()
+ " onMessage error, topic: " + context.getTopic()
+ ", messages: " + new String(bs, StandardCharsets.UTF_8));
}
for (T msg : msgs) {
consumer.onMessage(context, msg);
}
} catch (Throwable t) {
messageAgent.getLogger().log(Level.SEVERE, MessageConsumer.class.getSimpleName() + " execute error, topic: " + context.getTopic()
+ ", messages: " + messages.stream().map(v -> new String(v, StandardCharsets.UTF_8)).collect(Collectors.toList()));
}
});
}

View File

@@ -99,7 +99,7 @@ public abstract class MessageClient {
}
};
long ones = System.currentTimeMillis();
MessageClientConsumer one = messageAgent.createMessageClientConsumer(new String[]{appRespTopic}, appRespConsumerid, processor);
MessageClientConsumer one = messageAgent.createMessageClientConsumer(appRespTopic, appRespConsumerid, processor);
one.start();
long onee = System.currentTimeMillis() - ones;
if (finest) {

View File

@@ -5,6 +5,9 @@
*/
package org.redkale.mq;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.logging.Logger;
@@ -20,7 +23,7 @@ import java.util.logging.Logger;
*/
public abstract class MessageClientConsumer {
protected final String[] topics;
protected final List<String> topics;
protected final String consumerid;
@@ -32,13 +35,13 @@ public abstract class MessageClientConsumer {
protected volatile boolean closed;
protected MessageClientConsumer(MessageAgent messageAgent, String[] topics, final String consumerid, MessageClientProcessor processor) {
protected MessageClientConsumer(MessageAgent messageAgent, String topic, final String consumerid, MessageClientProcessor processor) {
Objects.requireNonNull(messageAgent);
Objects.requireNonNull(topics);
Objects.requireNonNull(topic);
Objects.requireNonNull(consumerid);
Objects.requireNonNull(processor);
this.messageAgent = messageAgent;
this.topics = topics;
this.topics = Collections.unmodifiableList(Arrays.asList(topic));
this.consumerid = consumerid;
this.processor = processor;
}
@@ -47,7 +50,7 @@ public abstract class MessageClientConsumer {
return processor;
}
public String[] getTopics() {
public List<String> getTopics() {
return topics;
}

View File

@@ -251,7 +251,6 @@ public abstract class Sncp {
public static <T extends Service> Class getServiceType(Class<T> serviceImplClass) {
SncpDyn dyn = serviceImplClass.getAnnotation(SncpDyn.class);
System.out.println("dyn = " + dyn + ", serviceImplClass = " + serviceImplClass + ", type = " + (dyn == null ? "ddd" : dyn.type()));
return dyn != null ? dyn.type() : serviceImplClass;
}