diff --git a/src/main/java/org/redkale/mq/spi/MessageAgent.java b/src/main/java/org/redkale/mq/spi/MessageAgent.java index 865b2e4c8..341c775be 100644 --- a/src/main/java/org/redkale/mq/spi/MessageAgent.java +++ b/src/main/java/org/redkale/mq/spi/MessageAgent.java @@ -268,7 +268,8 @@ public abstract class MessageAgent implements MessageManager { topics.add(topic); if (map.containsKey(topic.trim())) { throw new RedkaleException(MessageConsumer.class.getSimpleName() - + " consume topic (" + topic + ") repeat with " + map.get(topic).getClass().getName() + " and " + consumer.getClass().getName()); + + " consume topic (" + topic + ") repeat with " + + map.get(topic).getClass().getName() + " and " + consumer.getClass().getName()); } for (MessageConsumerWrapper wrapper : map.values()) { if (!Objects.equals(res.convertType(), wrapper.convertType)) { diff --git a/src/main/java/org/redkale/mq/spi/MessageCoder.java b/src/main/java/org/redkale/mq/spi/MessageCoder.java index ac35071cf..b7fad0570 100644 --- a/src/main/java/org/redkale/mq/spi/MessageCoder.java +++ b/src/main/java/org/redkale/mq/spi/MessageCoder.java @@ -216,7 +216,7 @@ public interface MessageCoder { } } else { buffer.putShort((short) 0); - putBigString(buffer, value == null ? null : value.toString()); + putBigString(buffer, value.toString()); } } @@ -244,7 +244,7 @@ public interface MessageCoder { } return c; } else { - return 4 + (value == null ? 0 : Utility.encodeUTF8Length(value.toString())); + return 4 + Utility.encodeUTF8Length(value.toString()); } } diff --git a/src/main/java/org/redkale/mq/spi/MessageModuleEngine.java b/src/main/java/org/redkale/mq/spi/MessageModuleEngine.java index 78c4ed3b8..fd39ed1d9 100644 --- a/src/main/java/org/redkale/mq/spi/MessageModuleEngine.java +++ b/src/main/java/org/redkale/mq/spi/MessageModuleEngine.java @@ -47,7 +47,7 @@ public class MessageModuleEngine extends ModuleEngine { //MQ管理配置资源 //@since 2.8.0 - private Properties messageProperties = new Properties(); + private final Properties messageProperties = new Properties(); // private final Map> agentConsumers = new ConcurrentHashMap<>(); @@ -68,6 +68,7 @@ public class MessageModuleEngine extends ModuleEngine { * * @return 方法动态扩展器 */ + @Override public AsmMethodBoost createAsmMethodBoost(boolean remote, Class serviceClass) { return new MessageAsmMethodBoost(remote, serviceClass, this); } @@ -190,7 +191,8 @@ public class MessageModuleEngine extends ModuleEngine { @Override public void load(ResourceFactory rf, String srcResourceName, Object srcObj, ResourceProducer annotation, Field field, Object attachment) { if (field.getType() != MessageProducer.class) { - throw new RestException("@" + ResourceProducer.class.getSimpleName() + " must on " + MessageProducer.class.getName() + " type field, but on " + field); + throw new RestException("@" + ResourceProducer.class.getSimpleName() + + " must on " + MessageProducer.class.getName() + " type field, but on " + field); } MessageAgent agent = resourceFactory.find(annotation.mq(), MessageAgent.class); if (!annotation.required() && agent == null) { @@ -244,6 +246,7 @@ public class MessageModuleEngine extends ModuleEngine { * @param namespace 命名空间 * @param events 变更项 */ + @Override public void onEnvironmentChanged(String namespace, List events) { Set messageRemovedKeys = new HashSet<>(); Properties messageChangedProps = new Properties(); @@ -340,7 +343,7 @@ public class MessageModuleEngine extends ModuleEngine { } } } - messageRemovedKeys.forEach(k -> this.messageProperties.remove(k)); + messageRemovedKeys.forEach(this.messageProperties::remove); this.messageProperties.putAll(messageChangedProps); } @@ -349,6 +352,7 @@ public class MessageModuleEngine extends ModuleEngine { /** * 服务全部启动后被调用 */ + @Override public void onServersPostStart() { if (this.messageAgents == null) { return; @@ -425,6 +429,7 @@ public class MessageModuleEngine extends ModuleEngine { /** * 服务全部停掉前被调用 */ + @Override public void onServersPreStop() { if (application.isCompileMode() && this.messageAgents != null) { Set names = new HashSet<>(); @@ -445,6 +450,7 @@ public class MessageModuleEngine extends ModuleEngine { /** * 服务全部停掉后被调用 */ + @Override public void onServersPostStop() { if (this.messageAgents != null) { Set names = new HashSet<>(); diff --git a/src/main/java/org/redkale/mq/spi/MessageRecord.java b/src/main/java/org/redkale/mq/spi/MessageRecord.java index f625f88b8..0fafe1182 100644 --- a/src/main/java/org/redkale/mq/spi/MessageRecord.java +++ b/src/main/java/org/redkale/mq/spi/MessageRecord.java @@ -102,11 +102,13 @@ public class MessageRecord implements Serializable { this(seqid, ctype, 1, 0, System.currentTimeMillis(), 0, null, topic, respTopic, traceid, content); } - protected MessageRecord(long seqid, byte ctype, int flag, Serializable userid, String groupid, String topic, String respTopic, String traceid, byte[] content) { + protected MessageRecord(long seqid, byte ctype, int flag, Serializable userid, + String groupid, String topic, String respTopic, String traceid, byte[] content) { this(seqid, ctype, 1, flag, System.currentTimeMillis(), userid, groupid, topic, respTopic, traceid, content); } - protected MessageRecord(long seqid, byte ctype, int version, int flag, long createTime, Serializable userid, String groupid, String topic, String respTopic, String traceid, byte[] content) { + protected MessageRecord(long seqid, byte ctype, int version, int flag, long createTime, Serializable userid, + String groupid, String topic, String respTopic, String traceid, byte[] content) { this.seqid = seqid; this.ctype = ctype; this.version = version; @@ -364,7 +366,4 @@ public class MessageRecord implements Serializable { return sb.toString(); } -// public static void main(String[] args) throws Throwable { -// System.out.println(new MessageRecord(333, 2, 3, null, "tt", null, "xxx".getBytes())); -// } } diff --git a/src/main/java/org/redkale/mq/spi/MessageRespFuture.java b/src/main/java/org/redkale/mq/spi/MessageRespFuture.java index 6a7375988..8cbd14d6e 100644 --- a/src/main/java/org/redkale/mq/spi/MessageRespFuture.java +++ b/src/main/java/org/redkale/mq/spi/MessageRespFuture.java @@ -44,8 +44,11 @@ public class MessageRespFuture implements Runnable { public void run() { //timeout messageClient.respQueue.remove(this.seqid); future.completeExceptionally(new TimeoutException("message-record: " + message)); - messageClient.logger.log(Level.WARNING, getClass().getSimpleName() + " wait msg: " + message + " timeout " + (System.currentTimeMillis() - createTime) + "ms" - + (message.userid != null || (message.groupid != null && !message.groupid.isEmpty()) ? (message.userid != null ? (", userid:" + message.userid) : (", groupid:" + message.groupid)) : "")); + messageClient.logger.log(Level.WARNING, getClass().getSimpleName() + " wait msg: " + message + + " timeout " + (System.currentTimeMillis() - createTime) + "ms" + + (message.userid != null || (message.groupid != null && !message.groupid.isEmpty()) + ? (message.userid != null ? (", userid:" + message.userid) + : (", groupid:" + message.groupid)) : "")); } public long getSeqid() { diff --git a/src/main/java/org/redkale/mq/spi/MessageRespProcessor.java b/src/main/java/org/redkale/mq/spi/MessageRespProcessor.java index 31618bd63..e68aa35ad 100644 --- a/src/main/java/org/redkale/mq/spi/MessageRespProcessor.java +++ b/src/main/java/org/redkale/mq/spi/MessageRespProcessor.java @@ -41,18 +41,22 @@ public class MessageRespProcessor implements MessageProcessor { } final long deplay = now - msg.createTime; if (finest) { - logger.log(Level.FINEST, getClass().getSimpleName() + ".MessageRespFuture.receive (mq.delay = " + deplay + "ms, mq.seqid = " + msg.getSeqid() + ")"); + logger.log(Level.FINEST, getClass().getSimpleName() + ".MessageRespFuture.receive (mq.delay = " + deplay + + "ms, mq.seqid = " + msg.getSeqid() + ")"); } messageClient.getMessageAgent().execute(() -> { Traces.currentTraceid(traceid); resp.future.complete(msg); long comems = System.currentTimeMillis() - now; if ((deplay > 1000 || comems > 1000) && logger.isLoggable(Level.FINE)) { - logger.log(Level.FINE, getClass().getSimpleName() + ".MessageRespFuture.complete (mq.delay-slower = " + deplay + "ms, mq.complete-slower = " + comems + "ms) mqresp.msg: " + msg); + logger.log(Level.FINE, getClass().getSimpleName() + ".MessageRespFuture.complete (mq.delay-slower = " + deplay + + "ms, mq.complete-slower = " + comems + "ms) mqresp.msg: " + msg); } else if ((deplay > 50 || comems > 50) && logger.isLoggable(Level.FINER)) { - logger.log(Level.FINER, getClass().getSimpleName() + ".MessageRespFuture.complete (mq.delay-slowly = " + deplay + "ms, mq.complete-slowly = " + comems + "ms) mqresp.msg: " + msg); + logger.log(Level.FINER, getClass().getSimpleName() + ".MessageRespFuture.complete (mq.delay-slowly = " + deplay + + "ms, mq.complete-slowly = " + comems + "ms) mqresp.msg: " + msg); } else if (finest) { - logger.log(Level.FINEST, getClass().getSimpleName() + ".MessageRespFuture.complete (mq.delay-normal = " + deplay + "ms, mq.complete-normal = " + comems + "ms) mqresp.msg: " + msg); + logger.log(Level.FINEST, getClass().getSimpleName() + ".MessageRespFuture.complete (mq.delay-normal = " + deplay + + "ms, mq.complete-normal = " + comems + "ms) mqresp.msg: " + msg); } Traces.removeTraceid(); }); diff --git a/src/main/java/org/redkale/mq/spi/MessageServlet.java b/src/main/java/org/redkale/mq/spi/MessageServlet.java index 57c0ea739..b7eaf0953 100644 --- a/src/main/java/org/redkale/mq/spi/MessageServlet.java +++ b/src/main/java/org/redkale/mq/spi/MessageServlet.java @@ -61,17 +61,21 @@ public abstract class MessageServlet implements MessageProcessor { context.execute(servlet, request, response); long exems = System.currentTimeMillis() - now; if ((delay > 1000 || block > 100 || exems > 1000) && logger.isLoggable(Level.FINE)) { - logger.log(Level.FINE, getClass().getSimpleName() + ".process (mq.delay-slower = " + delay + " ms, mq.block = " + block + " ms, mq.executes = " + exems + " ms) message: " + message); + logger.log(Level.FINE, getClass().getSimpleName() + ".process (mq.delay-slower = " + delay + + " ms, mq.block = " + block + " ms, mq.executes = " + exems + " ms) message: " + message); } else if ((delay > 50 || block > 10 || exems > 50) && logger.isLoggable(Level.FINER)) { - logger.log(Level.FINER, getClass().getSimpleName() + ".process (mq.delay-slowly = " + delay + " ms, mq.block = " + block + " ms, mq.executes = " + exems + " ms) message: " + message); + logger.log(Level.FINER, getClass().getSimpleName() + ".process (mq.delay-slowly = " + delay + + " ms, mq.block = " + block + " ms, mq.executes = " + exems + " ms) message: " + message); } else if (logger.isLoggable(Level.FINEST)) { - logger.log(Level.FINEST, getClass().getSimpleName() + ".process (mq.delay-normal = " + delay + " ms, mq.block = " + block + " ms, mq.execute = " + exems + " ms) message: " + message); + logger.log(Level.FINEST, getClass().getSimpleName() + ".process (mq.delay-normal = " + delay + + " ms, mq.block = " + block + " ms, mq.execute = " + exems + " ms) message: " + message); } } catch (Throwable ex) { if (response != null) { onError(response, message, ex); } - logger.log(Level.SEVERE, getClass().getSimpleName() + " process error, message=" + message, ex instanceof CompletionException ? ((CompletionException) ex).getCause() : ex); + logger.log(Level.SEVERE, getClass().getSimpleName() + " process error, message=" + + message, ex instanceof CompletionException ? ((CompletionException) ex).getCause() : ex); } finally { Traces.removeTraceid(); } diff --git a/src/main/java/org/redkale/mq/spi/SncpMessageResponse.java b/src/main/java/org/redkale/mq/spi/SncpMessageResponse.java index bbc1bca28..2121053e3 100644 --- a/src/main/java/org/redkale/mq/spi/SncpMessageResponse.java +++ b/src/main/java/org/redkale/mq/spi/SncpMessageResponse.java @@ -36,11 +36,13 @@ public class SncpMessageResponse extends SncpResponse { if (out == null) { final ByteArray result = new ByteArray(headerSize).putPlaceholder(headerSize); writeHeader(result, 0, retcode); - messageClient.getProducer().apply(messageClient.createMessageRecord(message.getSeqid(), MessageRecord.CTYPE_BSON, message.getRespTopic(), null, (byte[]) null)); + messageClient.getProducer().apply(messageClient.createMessageRecord(message.getSeqid(), + MessageRecord.CTYPE_BSON, message.getRespTopic(), null, (byte[]) null)); return; } final ByteArray result = out.toByteArray(); writeHeader(result, result.length() - headerSize, retcode); - messageClient.getProducer().apply(messageClient.createMessageRecord(message.getSeqid(), MessageRecord.CTYPE_BSON, message.getRespTopic(), null, result.getBytes())); + messageClient.getProducer().apply(messageClient.createMessageRecord(message.getSeqid(), + MessageRecord.CTYPE_BSON, message.getRespTopic(), null, result.getBytes())); } } diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java index 09d581a56..1bc57846d 100644 --- a/src/main/java/org/redkale/net/client/ClientCodec.java +++ b/src/main/java/org/redkale/net/client/ClientCodec.java @@ -31,7 +31,8 @@ public abstract class ClientCodec> respPool = ObjectPool.createUnsafePool(256, t -> new ClientResponse(), ClientResponse::prepare, ClientResponse::recycle); + private final ObjectPool> respPool = + ObjectPool.createUnsafePool(256, t -> new ClientResponse(), ClientResponse::prepare, ClientResponse::recycle); protected final ClientConnection connection; diff --git a/src/main/java/org/redkale/net/sncp/Sncp.java b/src/main/java/org/redkale/net/sncp/Sncp.java index 2118fd6fd..864c5da37 100644 --- a/src/main/java/org/redkale/net/sncp/Sncp.java +++ b/src/main/java/org/redkale/net/sncp/Sncp.java @@ -675,7 +675,8 @@ public abstract class Sncp { List params = methodBean.getParams(); for (int i = 0; i < paramTypes.length; i++) { AsmMethodParam param = params.get(i); - mv.visitLocalVariable(param.getName(), param.description(paramTypes[i]), param.signature(paramTypes[i]), l0, l2, insns.get(i)); + mv.visitLocalVariable(param.getName(), param.description(paramTypes[i]), + param.signature(paramTypes[i]), l0, l2, insns.get(i)); } } mv.visitMaxs(20, 20); diff --git a/src/main/java/org/redkale/net/sncp/SncpAsyncHandler.java b/src/main/java/org/redkale/net/sncp/SncpAsyncHandler.java index 30010dbf7..8580132bd 100644 --- a/src/main/java/org/redkale/net/sncp/SncpAsyncHandler.java +++ b/src/main/java/org/redkale/net/sncp/SncpAsyncHandler.java @@ -65,7 +65,9 @@ public interface SncpAsyncHandler extends CompletionHandler { FieldVisitor fv; MethodDebugVisitor mv; AnnotationVisitor av0; - cw.visit(V11, ACC_PUBLIC + ACC_FINAL + ACC_SUPER, newDynName, null, handlerInterface ? "java/lang/Object" : handlerClassName, handlerInterface && handlerClass != sncpHandlerClass ? new String[]{handlerClassName, sncpHandlerName} : new String[]{sncpHandlerName}); + cw.visit(V11, ACC_PUBLIC + ACC_FINAL + ACC_SUPER, newDynName, null, + handlerInterface ? "java/lang/Object" : handlerClassName, + handlerInterface && handlerClass != sncpHandlerClass ? new String[]{handlerClassName, sncpHandlerName} : new String[]{sncpHandlerName}); { //handler 属性 fv = cw.visitField(ACC_PRIVATE, "factHandler", realHandlerDesc, null, null); @@ -182,6 +184,10 @@ public interface SncpAsyncHandler extends CompletionHandler { static class HandlerInner { static final Map> creatorMap = new ConcurrentHashMap<>(); + + private HandlerInner() { + //do nothing + } } } diff --git a/src/main/java/org/redkale/net/sncp/SncpHeader.java b/src/main/java/org/redkale/net/sncp/SncpHeader.java index 1315f9703..b630c61c0 100644 --- a/src/main/java/org/redkale/net/sncp/SncpHeader.java +++ b/src/main/java/org/redkale/net/sncp/SncpHeader.java @@ -135,15 +135,18 @@ public class SncpHeader { } public ByteArray writeTo(ByteArray array, SncpClientRequest clientRequest, byte keepAlive, int bodyLength, int retcode) { - return writeTo(array, this.addrBytes, this.addrPort, (Long) clientRequest.getRequestid(), clientRequest.traceBytes(), keepAlive, bodyLength, retcode); + return writeTo(array, this.addrBytes, this.addrPort, (Long) clientRequest.getRequestid(), + clientRequest.traceBytes(), keepAlive, bodyLength, retcode); } public ByteArray writeTo(ByteArray array, SncpResponse response, byte keepAlive, int bodyLength, int retcode) { SncpRequest request = response.request(); - return writeTo(array, response.addrBytes, response.addrPort, (Long) request.getRequestid(), request.traceBytes(), keepAlive, bodyLength, retcode); + return writeTo(array, response.addrBytes, response.addrPort, (Long) request.getRequestid(), + request.traceBytes(), keepAlive, bodyLength, retcode); } - private ByteArray writeTo(ByteArray array, byte[] newAddrBytes, int newAddrPort, long newSeqid, byte[] traces, byte keepAlive, int bodyLength, int retcode) { + private ByteArray writeTo(ByteArray array, byte[] newAddrBytes, int newAddrPort, long newSeqid, + byte[] traces, byte keepAlive, int bodyLength, int retcode) { if (newAddrBytes.length != 4) { throw new SncpException("address bytes length must be 4, but " + newAddrBytes.length); } diff --git a/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java b/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java index c70a7b372..dfbffe195 100644 --- a/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java +++ b/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java @@ -90,7 +90,8 @@ public class SncpRemoteInfo { this.topic = messageAgent == null ? null : Sncp.generateSncpReqTopic(resourceName, resourceType, messageAgent.getNodeid()); for (Map.Entry en : loadRemoteMethodActions(Sncp.getServiceType(serviceImplClass)).entrySet()) { - this.actions.put(en.getKey().toString(), new SncpRemoteAction(serviceImplClass, resourceType, en.getValue(), serviceid, en.getKey(), sncpClient)); + this.actions.put(en.getKey().toString(), + new SncpRemoteAction(serviceImplClass, resourceType, en.getValue(), serviceid, en.getKey(), sncpClient)); } } @@ -187,8 +188,10 @@ public class SncpRemoteInfo { final int retcode = header.getRetcode(); if (retcode != 0) { logger.log(Level.SEVERE, action.method + " sncp (params: " + JsonConvert.root().convertTo(params) - + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + "), params=" + JsonConvert.root().convertTo(params)); - throw new SncpException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")"); + + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + + "), params=" + JsonConvert.root().convertTo(params)); + throw new SncpException("remote service(" + action.method + ") deal error (retcode=" + retcode + + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")"); } final int respBodyLength = header.getBodyLength(); byte[] body = new byte[respBodyLength]; @@ -332,7 +335,8 @@ public class SncpRemoteInfo { protected final SncpHeader header; @SuppressWarnings("unchecked") - SncpRemoteAction(final Class serviceImplClass, Class resourceType, Method method, Uint128 serviceid, Uint128 actionid, final SncpClient sncpClient) { + SncpRemoteAction(final Class serviceImplClass, Class resourceType, Method method, + Uint128 serviceid, Uint128 actionid, final SncpClient sncpClient) { this.actionid = actionid == null ? Sncp.actionid(method) : actionid; Type rt = TypeToken.getGenericType(method.getGenericReturnType(), serviceImplClass); this.returnObjectType = rt == void.class || rt == Void.class ? null : rt; @@ -425,7 +429,8 @@ public class SncpRemoteInfo { this.paramHandlerClass = handlerFuncClass; this.paramHandlerResultType = handlerResultType; this.paramHandlerAttachIndex = handlerAttachIndex; - this.header = SncpHeader.create(sncpClient == null ? null : sncpClient.getClientSncpAddress(), serviceid, resourceType.getName(), actionid, method.getName()); + this.header = SncpHeader.create(sncpClient == null ? null : sncpClient.getClientSncpAddress(), + serviceid, resourceType.getName(), actionid, method.getName()); if (this.paramHandlerIndex >= 0 && method.getReturnType() != void.class) { throw new SncpException(method + " have CompletionHandler type parameter but return type is not void"); } diff --git a/src/main/java/org/redkale/net/sncp/SncpServer.java b/src/main/java/org/redkale/net/sncp/SncpServer.java index 4fff82ef6..856624d8b 100644 --- a/src/main/java/org/redkale/net/sncp/SncpServer.java +++ b/src/main/java/org/redkale/net/sncp/SncpServer.java @@ -129,8 +129,7 @@ public class SncpServer extends Server createSafeResponsePool(LongAdder createCounter, LongAdder cycleCounter, int responsePoolSize) { Creator creator = (Object... params) -> new SncpResponse(this.context, new SncpRequest(this.context)); - ObjectPool pool = ObjectPool.createSafePool(createCounter, cycleCounter, responsePoolSize, creator, SncpResponse::prepare, SncpResponse::recycle); - return pool; + return ObjectPool.createSafePool(createCounter, cycleCounter, responsePoolSize, creator, SncpResponse::prepare, SncpResponse::recycle); } }