From 1fa17bf10c14234363a873a0d7fdec3d479fab08 Mon Sep 17 00:00:00 2001 From: redkale Date: Sun, 8 Oct 2023 12:11:49 +0800 Subject: [PATCH] mq --- .../java/org/redkale/boot/Application.java | 11 +--- .../java/org/redkale/boot/NodeHttpServer.java | 2 +- .../java/org/redkale/mq/MessageAgent.java | 56 ++++++------------- .../java/org/redkale/mq/MessageClient.java | 2 +- .../org/redkale/mq/MessageClientConsumer.java | 13 +++-- src/main/java/org/redkale/net/sncp/Sncp.java | 1 - 6 files changed, 29 insertions(+), 56 deletions(-) diff --git a/src/main/java/org/redkale/boot/Application.java b/src/main/java/org/redkale/boot/Application.java index a9fc50ef4..d7ddbd9b3 100644 --- a/src/main/java/org/redkale/boot/Application.java +++ b/src/main/java/org/redkale/boot/Application.java @@ -1720,16 +1720,7 @@ public final class Application { consumer.init(consumerConf); } } - Map map = agent.start(consumers); - AtomicInteger maxlen = new AtomicInteger(); - map.keySet().forEach(str -> { - if (str.length() > maxlen.get()) { - maxlen.set(str.length()); - } - }); - new TreeMap<>(map).forEach((topic, ms) -> sb.append(MessageClientConsumer.class.getSimpleName()).append("(topic=") - .append(alignString(topic, maxlen.get())).append(") init and start in ").append(ms).append(" ms\r\n") - ); + agent.start(consumers); } if (sb.length() > 0) { logger.info(sb.toString().trim()); diff --git a/src/main/java/org/redkale/boot/NodeHttpServer.java b/src/main/java/org/redkale/boot/NodeHttpServer.java index 2a2e5309c..7c24ea67f 100644 --- a/src/main/java/org/redkale/boot/NodeHttpServer.java +++ b/src/main/java/org/redkale/boot/NodeHttpServer.java @@ -333,7 +333,7 @@ public class NodeHttpServer extends NodeServer { for (AbstractMap.SimpleEntry en : webss) { StringBuilder sub = new StringBuilder(); int pos = en.getKey().indexOf(':'); - sub.append("RestWebSocket (type=").append(en.getKey().substring(0, pos)); + sub.append("RestWebSocket (type=").append(en.getKey().substring(0, pos)); for (int i = 0; i < maxTypeLength - pos; i++) { sub.append(' '); } diff --git a/src/main/java/org/redkale/mq/MessageAgent.java b/src/main/java/org/redkale/mq/MessageAgent.java index 71131a1b7..32717cdeb 100644 --- a/src/main/java/org/redkale/mq/MessageAgent.java +++ b/src/main/java/org/redkale/mq/MessageAgent.java @@ -13,7 +13,6 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.IntFunction; import java.util.logging.*; import java.util.stream.Collectors; import org.redkale.annotation.*; @@ -140,20 +139,17 @@ public abstract class MessageAgent implements Resourcable { return workExecutor.submit(event); } - public Map start(List consumers) { + public void start(List consumers) { StringBuilder loginfo = initMessageConsumer(consumers); startMessageConsumer(); if (loginfo.length() > 0) { logger.log(Level.INFO, loginfo.toString()); } - final LinkedHashMap map = new LinkedHashMap<>(); this.clientConsumerNodes.values().forEach(node -> { long s = System.currentTimeMillis(); node.consumer.start(); long e = System.currentTimeMillis() - s; - map.put(node.consumer.consumerid, e); }); - return map; } //Application.stop 在执行server.shutdown之前执行 @@ -167,40 +163,29 @@ public abstract class MessageAgent implements Resourcable { //Application.stop 在所有server.shutdown执行后执行 public void destroy(AnyValue config) { - logger.log(Level.FINE, "MessageConsumer destroying"); for (MessageConsumer consumer : messageConsumerList) { consumer.destroy(config); } this.messageConsumerList.clear(); this.messageConsumerMap.clear(); - logger.log(Level.FINE, "MessageConsumer destroyed"); this.httpMessageClient.close(); this.sncpMessageClient.close(); - logger.log(Level.FINE, "httpMessageClient and sncpMessageClient destroyed"); if (this.httpClientProducer != null) { - logger.log(Level.FINE, "httpClientProducer stoping"); this.httpClientProducer.stop(); - logger.log(Level.FINE, "httpClientProducer stoped"); } if (this.sncpClientProducer != null) { - logger.log(Level.FINE, "sncpClientProducer stoping"); this.sncpClientProducer.stop(); - logger.log(Level.FINE, "sncpClientProducer stoped"); } if (this.clientMessageCoder instanceof Service) { - logger.log(Level.FINE, "clientMessageCoder destroying"); ((Service) this.clientMessageCoder).destroy(config); - logger.log(Level.FINE, "clientMessageCoder destroyed"); } if (this.timeoutExecutor != null) { this.timeoutExecutor.shutdownNow(); - logger.log(Level.FINE, "timeoutExecutor shutdownNow"); } if (this.workExecutor != null && this.workExecutor != application.getWorkExecutor()) { this.workExecutor.shutdownNow(); - logger.log(Level.FINE, "workExecutor shutdownNow"); } } @@ -445,7 +430,7 @@ public abstract class MessageAgent implements Resourcable { protected abstract MessageClientProducer createMessageClientProducer(String producerName); //创建指定topic的消费处理器 - public abstract MessageClientConsumer createMessageClientConsumer(String[] topics, String group, MessageClientProcessor processor); + public abstract MessageClientConsumer createMessageClientConsumer(String topic, String group, MessageClientProcessor processor); public final void putService(NodeHttpServer ns, Service service, HttpServlet servlet) { AutoLoad al = service.getClass().getAnnotation(AutoLoad.class); @@ -462,15 +447,15 @@ public abstract class MessageAgent implements Resourcable { return; } } - String[] topics = generateHttpReqTopics(service); - String consumerid = generateHttpConsumerid(topics, service); + String topic = generateHttpReqTopic(service); + String consumerid = generateHttpConsumerid(topic, service); serviceLock.lock(); try { if (clientConsumerNodes.containsKey(consumerid)) { throw new RedkaleException("consumerid(" + consumerid + ") is repeat"); } HttpMessageClientProcessor processor = new HttpMessageClientProcessor(this.logger, httpMessageClient, getHttpMessageClientProducer(), ns, service, servlet); - this.clientConsumerNodes.put(consumerid, new MessageClientConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(topics, consumerid, processor))); + this.clientConsumerNodes.put(consumerid, new MessageClientConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(topic, consumerid, processor))); } finally { serviceLock.unlock(); } @@ -493,7 +478,7 @@ public abstract class MessageAgent implements Resourcable { throw new RedkaleException("consumerid(" + consumerid + ") is repeat"); } SncpMessageClientProcessor processor = new SncpMessageClientProcessor(this.logger, sncpMessageClient, getSncpMessageClientProducer(), ns, service, servlet); - this.clientConsumerNodes.put(consumerid, new MessageClientConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(new String[]{topic}, consumerid, processor))); + this.clientConsumerNodes.put(consumerid, new MessageClientConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(topic, consumerid, processor))); } finally { serviceLock.unlock(); } @@ -533,10 +518,10 @@ public abstract class MessageAgent implements Resourcable { } //格式: http.req.module.user - protected String[] generateHttpReqTopics(Service service) { + protected String generateHttpReqTopic(Service service) { String resname = Sncp.getResourceName(service); String module = Rest.getRestModule(service).toLowerCase(); - return new String[]{"http.req.module." + module + (resname.isEmpty() ? "" : ("-" + resname))}; + return "http.req.module." + module + (resname.isEmpty() ? "" : ("-" + resname)); } //格式: consumer-sncp.req.module.user 不提供外部使用 @@ -545,7 +530,7 @@ public abstract class MessageAgent implements Resourcable { } //格式: consumer-http.req.module.user - protected String generateHttpConsumerid(String[] topics, Service service) { + protected String generateHttpConsumerid(String topic, Service service) { String resname = Sncp.getResourceName(service); String key = Rest.getRestModule(service).toLowerCase(); return "consumer-http.req.module." + key + (resname.isEmpty() ? "" : ("-" + resname)); @@ -564,8 +549,6 @@ public abstract class MessageAgent implements Resourcable { private final Type messageType; - private final IntFunction arrayCreator; - public MessageConsumerWrapper(MessageAgent messageAgent, MessageConsumer consumer, ConvertType convertType) { Objects.requireNonNull(messageAgent); Objects.requireNonNull(consumer); @@ -575,7 +558,6 @@ public abstract class MessageAgent implements Resourcable { this.consumer = consumer; this.convert = ConvertFactory.findConvert(convertType); this.messageType = parseMessageType(consumer.getClass()); - this.arrayCreator = Creator.funcArray(TypeToken.typeToClass(messageType)); } private static Type parseMessageType(Class clazz) { @@ -602,18 +584,16 @@ public abstract class MessageAgent implements Resourcable { public Future onMessage(MessageConext context, List messages) { return messageAgent.submit(() -> { - try { - T[] msgs = this.arrayCreator.apply(messages.size()); - int index = -1; - for (byte[] bs : messages) { - msgs[++index] = (T) convert.convertFrom(messageType, bs); + Convert c = this.convert; + MessageConsumer m = this.consumer; + for (byte[] bs : messages) { + try { + m.onMessage(context, (T) c.convertFrom(messageType, bs)); + } catch (Throwable t) { + messageAgent.getLogger().log(Level.SEVERE, m.getClass().getSimpleName() + + " onMessage error, topic: " + context.getTopic() + + ", messages: " + new String(bs, StandardCharsets.UTF_8)); } - for (T msg : msgs) { - consumer.onMessage(context, msg); - } - } catch (Throwable t) { - messageAgent.getLogger().log(Level.SEVERE, MessageConsumer.class.getSimpleName() + " execute error, topic: " + context.getTopic() - + ", messages: " + messages.stream().map(v -> new String(v, StandardCharsets.UTF_8)).collect(Collectors.toList())); } }); } diff --git a/src/main/java/org/redkale/mq/MessageClient.java b/src/main/java/org/redkale/mq/MessageClient.java index 04c3d3989..cd1ba2711 100644 --- a/src/main/java/org/redkale/mq/MessageClient.java +++ b/src/main/java/org/redkale/mq/MessageClient.java @@ -99,7 +99,7 @@ public abstract class MessageClient { } }; long ones = System.currentTimeMillis(); - MessageClientConsumer one = messageAgent.createMessageClientConsumer(new String[]{appRespTopic}, appRespConsumerid, processor); + MessageClientConsumer one = messageAgent.createMessageClientConsumer(appRespTopic, appRespConsumerid, processor); one.start(); long onee = System.currentTimeMillis() - ones; if (finest) { diff --git a/src/main/java/org/redkale/mq/MessageClientConsumer.java b/src/main/java/org/redkale/mq/MessageClientConsumer.java index cdbef7348..93240efbe 100644 --- a/src/main/java/org/redkale/mq/MessageClientConsumer.java +++ b/src/main/java/org/redkale/mq/MessageClientConsumer.java @@ -5,6 +5,9 @@ */ package org.redkale.mq; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Objects; import java.util.logging.Logger; @@ -20,7 +23,7 @@ import java.util.logging.Logger; */ public abstract class MessageClientConsumer { - protected final String[] topics; + protected final List topics; protected final String consumerid; @@ -32,13 +35,13 @@ public abstract class MessageClientConsumer { protected volatile boolean closed; - protected MessageClientConsumer(MessageAgent messageAgent, String[] topics, final String consumerid, MessageClientProcessor processor) { + protected MessageClientConsumer(MessageAgent messageAgent, String topic, final String consumerid, MessageClientProcessor processor) { Objects.requireNonNull(messageAgent); - Objects.requireNonNull(topics); + Objects.requireNonNull(topic); Objects.requireNonNull(consumerid); Objects.requireNonNull(processor); this.messageAgent = messageAgent; - this.topics = topics; + this.topics = Collections.unmodifiableList(Arrays.asList(topic)); this.consumerid = consumerid; this.processor = processor; } @@ -47,7 +50,7 @@ public abstract class MessageClientConsumer { return processor; } - public String[] getTopics() { + public List getTopics() { return topics; } diff --git a/src/main/java/org/redkale/net/sncp/Sncp.java b/src/main/java/org/redkale/net/sncp/Sncp.java index 30f80e568..39307e725 100644 --- a/src/main/java/org/redkale/net/sncp/Sncp.java +++ b/src/main/java/org/redkale/net/sncp/Sncp.java @@ -251,7 +251,6 @@ public abstract class Sncp { public static Class getServiceType(Class serviceImplClass) { SncpDyn dyn = serviceImplClass.getAnnotation(SncpDyn.class); - System.out.println("dyn = " + dyn + ", serviceImplClass = " + serviceImplClass + ", type = " + (dyn == null ? "ddd" : dyn.type())); return dyn != null ? dyn.type() : serviceImplClass; }