This commit is contained in:
redkale
2023-10-06 12:15:05 +08:00
parent d5e193d0a8
commit 690acf286c
2 changed files with 38 additions and 3 deletions

View File

@@ -1739,7 +1739,7 @@ public final class Application {
ClassFilter.Loader.load(getHome(), this.serverClassLoader, ((this.excludelibs != null ? (this.excludelibs + ";") : "") + (excludelibs == null ? "" : excludelibs)).split(";"), filters);
}
private static String alignString(String value, int maxlen) {
private String alignString(String value, int maxlen) {
StringBuilder sb = new StringBuilder(maxlen);
sb.append(value);
for (int i = 0; i < maxlen - value.length(); i++) {

View File

@@ -10,6 +10,7 @@ import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.IntFunction;
@@ -139,8 +140,11 @@ public abstract class MessageAgent implements Resourcable {
}
public Map<String, Long> start(List<MessageConsumer> consumers) {
initMessageConsumer(consumers);
StringBuilder loginfo = initMessageConsumer(consumers);
startMessageConsumer();
if (loginfo.length() > 0) {
logger.log(Level.INFO, loginfo.toString());
}
final LinkedHashMap<String, Long> map = new LinkedHashMap<>();
this.clientConsumerNodes.values().forEach(node -> {
long s = System.currentTimeMillis();
@@ -209,10 +213,14 @@ public abstract class MessageAgent implements Resourcable {
return messageProducerMap.computeIfAbsent(ann.convertType(), t -> new ConvertMessageProducer(producer, ConvertFactory.findConvert(t)));
}
protected void initMessageConsumer(List<MessageConsumer> consumers) {
protected StringBuilder initMessageConsumer(List<MessageConsumer> consumers) {
final StringBuilder sb = new StringBuilder();
clientConsumerLock.lock();
try {
Map<String, Map<String, MessageConsumerWrapper>> maps = new HashMap<>();
AtomicInteger typeMax = new AtomicInteger();
AtomicInteger topicMax = new AtomicInteger();
Map<String, String> views = new LinkedHashMap<>();
for (MessageConsumer consumer : consumers) {
ResourceConsumer res = consumer.getClass().getAnnotation(ResourceConsumer.class);
String group = application.getPropertyValue(res.group());
@@ -220,9 +228,11 @@ public abstract class MessageAgent implements Resourcable {
group = consumer.getClass().getName();
}
Map<String, MessageConsumerWrapper> map = maps.computeIfAbsent(group, g -> new HashMap<>());
List<String> topics = new ArrayList<>();
for (String t : res.topics()) {
String topic = application.getPropertyValue(t);
if (!topic.trim().isEmpty()) {
topics.add(topic);
if (map.containsKey(topic.trim())) {
throw new RedkaleException(MessageConsumer.class.getSimpleName()
+ " consume topic (" + topic + ") repeat with " + map.get(topic).getClass().getName() + " and " + consumer.getClass().getName());
@@ -237,12 +247,37 @@ public abstract class MessageAgent implements Resourcable {
map.put(topic.trim(), new MessageConsumerWrapper(this, consumer, res.convertType()));
}
}
String typestr = consumer.getClass().getName();
String topicstr = topics.size() == 1 ? topics.get(0) : topics.toString();
if (typestr.length() > typeMax.get()) {
typeMax.set(typestr.length());
}
if (topicstr.length() > topicMax.get()) {
topicMax.set(topicstr.length());
}
views.put(typestr, topicstr);
}
views.forEach((typestr, topicstr) -> {
sb.append(MessageConsumer.class.getSimpleName())
.append(" (type=").append(alignString(typestr, typeMax.get()))
.append(", topics=").append(alignString(topicstr, topicMax.get()))
.append(") startuped\r\n");
});
messageConsumerList.addAll(consumers);
messageConsumerMap.putAll(maps);
} finally {
clientConsumerLock.unlock();
}
return sb;
}
static String alignString(String value, int maxlen) {
StringBuilder sb = new StringBuilder(maxlen);
sb.append(value);
for (int i = 0; i < maxlen - value.length(); i++) {
sb.append(' ');
}
return sb.toString();
}
@Override