格式化

This commit is contained in:
redkale
2024-01-18 11:26:49 +08:00
parent 636927e4eb
commit e347a43738
14 changed files with 70 additions and 36 deletions

View File

@@ -268,7 +268,8 @@ public abstract class MessageAgent implements MessageManager {
topics.add(topic);
if (map.containsKey(topic.trim())) {
throw new RedkaleException(MessageConsumer.class.getSimpleName()
+ " consume topic (" + topic + ") repeat with " + map.get(topic).getClass().getName() + " and " + consumer.getClass().getName());
+ " consume topic (" + topic + ") repeat with "
+ map.get(topic).getClass().getName() + " and " + consumer.getClass().getName());
}
for (MessageConsumerWrapper wrapper : map.values()) {
if (!Objects.equals(res.convertType(), wrapper.convertType)) {

View File

@@ -216,7 +216,7 @@ public interface MessageCoder<T> {
}
} else {
buffer.putShort((short) 0);
putBigString(buffer, value == null ? null : value.toString());
putBigString(buffer, value.toString());
}
}
@@ -244,7 +244,7 @@ public interface MessageCoder<T> {
}
return c;
} else {
return 4 + (value == null ? 0 : Utility.encodeUTF8Length(value.toString()));
return 4 + Utility.encodeUTF8Length(value.toString());
}
}

View File

@@ -47,7 +47,7 @@ public class MessageModuleEngine extends ModuleEngine {
//MQ管理配置资源
//@since 2.8.0
private Properties messageProperties = new Properties();
private final Properties messageProperties = new Properties();
//
private final Map<String, List<MessageConsumer>> agentConsumers = new ConcurrentHashMap<>();
@@ -68,6 +68,7 @@ public class MessageModuleEngine extends ModuleEngine {
*
* @return 方法动态扩展器
*/
@Override
public AsmMethodBoost createAsmMethodBoost(boolean remote, Class serviceClass) {
return new MessageAsmMethodBoost(remote, serviceClass, this);
}
@@ -190,7 +191,8 @@ public class MessageModuleEngine extends ModuleEngine {
@Override
public void load(ResourceFactory rf, String srcResourceName, Object srcObj, ResourceProducer annotation, Field field, Object attachment) {
if (field.getType() != MessageProducer.class) {
throw new RestException("@" + ResourceProducer.class.getSimpleName() + " must on " + MessageProducer.class.getName() + " type field, but on " + field);
throw new RestException("@" + ResourceProducer.class.getSimpleName()
+ " must on " + MessageProducer.class.getName() + " type field, but on " + field);
}
MessageAgent agent = resourceFactory.find(annotation.mq(), MessageAgent.class);
if (!annotation.required() && agent == null) {
@@ -244,6 +246,7 @@ public class MessageModuleEngine extends ModuleEngine {
* @param namespace 命名空间
* @param events 变更项
*/
@Override
public void onEnvironmentChanged(String namespace, List<ResourceEvent> events) {
Set<String> messageRemovedKeys = new HashSet<>();
Properties messageChangedProps = new Properties();
@@ -340,7 +343,7 @@ public class MessageModuleEngine extends ModuleEngine {
}
}
}
messageRemovedKeys.forEach(k -> this.messageProperties.remove(k));
messageRemovedKeys.forEach(this.messageProperties::remove);
this.messageProperties.putAll(messageChangedProps);
}
@@ -349,6 +352,7 @@ public class MessageModuleEngine extends ModuleEngine {
/**
* 服务全部启动后被调用
*/
@Override
public void onServersPostStart() {
if (this.messageAgents == null) {
return;
@@ -425,6 +429,7 @@ public class MessageModuleEngine extends ModuleEngine {
/**
* 服务全部停掉前被调用
*/
@Override
public void onServersPreStop() {
if (application.isCompileMode() && this.messageAgents != null) {
Set<String> names = new HashSet<>();
@@ -445,6 +450,7 @@ public class MessageModuleEngine extends ModuleEngine {
/**
* 服务全部停掉后被调用
*/
@Override
public void onServersPostStop() {
if (this.messageAgents != null) {
Set<String> names = new HashSet<>();

View File

@@ -102,11 +102,13 @@ public class MessageRecord implements Serializable {
this(seqid, ctype, 1, 0, System.currentTimeMillis(), 0, null, topic, respTopic, traceid, content);
}
protected MessageRecord(long seqid, byte ctype, int flag, Serializable userid, String groupid, String topic, String respTopic, String traceid, byte[] content) {
protected MessageRecord(long seqid, byte ctype, int flag, Serializable userid,
String groupid, String topic, String respTopic, String traceid, byte[] content) {
this(seqid, ctype, 1, flag, System.currentTimeMillis(), userid, groupid, topic, respTopic, traceid, content);
}
protected MessageRecord(long seqid, byte ctype, int version, int flag, long createTime, Serializable userid, String groupid, String topic, String respTopic, String traceid, byte[] content) {
protected MessageRecord(long seqid, byte ctype, int version, int flag, long createTime, Serializable userid,
String groupid, String topic, String respTopic, String traceid, byte[] content) {
this.seqid = seqid;
this.ctype = ctype;
this.version = version;
@@ -364,7 +366,4 @@ public class MessageRecord implements Serializable {
return sb.toString();
}
// public static void main(String[] args) throws Throwable {
// System.out.println(new MessageRecord(333, 2, 3, null, "tt", null, "xxx".getBytes()));
// }
}

View File

@@ -44,8 +44,11 @@ public class MessageRespFuture implements Runnable {
public void run() { //timeout
messageClient.respQueue.remove(this.seqid);
future.completeExceptionally(new TimeoutException("message-record: " + message));
messageClient.logger.log(Level.WARNING, getClass().getSimpleName() + " wait msg: " + message + " timeout " + (System.currentTimeMillis() - createTime) + "ms"
+ (message.userid != null || (message.groupid != null && !message.groupid.isEmpty()) ? (message.userid != null ? (", userid:" + message.userid) : (", groupid:" + message.groupid)) : ""));
messageClient.logger.log(Level.WARNING, getClass().getSimpleName() + " wait msg: " + message
+ " timeout " + (System.currentTimeMillis() - createTime) + "ms"
+ (message.userid != null || (message.groupid != null && !message.groupid.isEmpty())
? (message.userid != null ? (", userid:" + message.userid)
: (", groupid:" + message.groupid)) : ""));
}
public long getSeqid() {

View File

@@ -41,18 +41,22 @@ public class MessageRespProcessor implements MessageProcessor {
}
final long deplay = now - msg.createTime;
if (finest) {
logger.log(Level.FINEST, getClass().getSimpleName() + ".MessageRespFuture.receive (mq.delay = " + deplay + "ms, mq.seqid = " + msg.getSeqid() + ")");
logger.log(Level.FINEST, getClass().getSimpleName() + ".MessageRespFuture.receive (mq.delay = " + deplay
+ "ms, mq.seqid = " + msg.getSeqid() + ")");
}
messageClient.getMessageAgent().execute(() -> {
Traces.currentTraceid(traceid);
resp.future.complete(msg);
long comems = System.currentTimeMillis() - now;
if ((deplay > 1000 || comems > 1000) && logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, getClass().getSimpleName() + ".MessageRespFuture.complete (mq.delay-slower = " + deplay + "ms, mq.complete-slower = " + comems + "ms) mqresp.msg: " + msg);
logger.log(Level.FINE, getClass().getSimpleName() + ".MessageRespFuture.complete (mq.delay-slower = " + deplay
+ "ms, mq.complete-slower = " + comems + "ms) mqresp.msg: " + msg);
} else if ((deplay > 50 || comems > 50) && logger.isLoggable(Level.FINER)) {
logger.log(Level.FINER, getClass().getSimpleName() + ".MessageRespFuture.complete (mq.delay-slowly = " + deplay + "ms, mq.complete-slowly = " + comems + "ms) mqresp.msg: " + msg);
logger.log(Level.FINER, getClass().getSimpleName() + ".MessageRespFuture.complete (mq.delay-slowly = " + deplay
+ "ms, mq.complete-slowly = " + comems + "ms) mqresp.msg: " + msg);
} else if (finest) {
logger.log(Level.FINEST, getClass().getSimpleName() + ".MessageRespFuture.complete (mq.delay-normal = " + deplay + "ms, mq.complete-normal = " + comems + "ms) mqresp.msg: " + msg);
logger.log(Level.FINEST, getClass().getSimpleName() + ".MessageRespFuture.complete (mq.delay-normal = " + deplay
+ "ms, mq.complete-normal = " + comems + "ms) mqresp.msg: " + msg);
}
Traces.removeTraceid();
});

View File

@@ -61,17 +61,21 @@ public abstract class MessageServlet implements MessageProcessor {
context.execute(servlet, request, response);
long exems = System.currentTimeMillis() - now;
if ((delay > 1000 || block > 100 || exems > 1000) && logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, getClass().getSimpleName() + ".process (mq.delay-slower = " + delay + " ms, mq.block = " + block + " ms, mq.executes = " + exems + " ms) message: " + message);
logger.log(Level.FINE, getClass().getSimpleName() + ".process (mq.delay-slower = " + delay
+ " ms, mq.block = " + block + " ms, mq.executes = " + exems + " ms) message: " + message);
} else if ((delay > 50 || block > 10 || exems > 50) && logger.isLoggable(Level.FINER)) {
logger.log(Level.FINER, getClass().getSimpleName() + ".process (mq.delay-slowly = " + delay + " ms, mq.block = " + block + " ms, mq.executes = " + exems + " ms) message: " + message);
logger.log(Level.FINER, getClass().getSimpleName() + ".process (mq.delay-slowly = " + delay
+ " ms, mq.block = " + block + " ms, mq.executes = " + exems + " ms) message: " + message);
} else if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, getClass().getSimpleName() + ".process (mq.delay-normal = " + delay + " ms, mq.block = " + block + " ms, mq.execute = " + exems + " ms) message: " + message);
logger.log(Level.FINEST, getClass().getSimpleName() + ".process (mq.delay-normal = " + delay
+ " ms, mq.block = " + block + " ms, mq.execute = " + exems + " ms) message: " + message);
}
} catch (Throwable ex) {
if (response != null) {
onError(response, message, ex);
}
logger.log(Level.SEVERE, getClass().getSimpleName() + " process error, message=" + message, ex instanceof CompletionException ? ((CompletionException) ex).getCause() : ex);
logger.log(Level.SEVERE, getClass().getSimpleName() + " process error, message="
+ message, ex instanceof CompletionException ? ((CompletionException) ex).getCause() : ex);
} finally {
Traces.removeTraceid();
}

View File

@@ -36,11 +36,13 @@ public class SncpMessageResponse extends SncpResponse {
if (out == null) {
final ByteArray result = new ByteArray(headerSize).putPlaceholder(headerSize);
writeHeader(result, 0, retcode);
messageClient.getProducer().apply(messageClient.createMessageRecord(message.getSeqid(), MessageRecord.CTYPE_BSON, message.getRespTopic(), null, (byte[]) null));
messageClient.getProducer().apply(messageClient.createMessageRecord(message.getSeqid(),
MessageRecord.CTYPE_BSON, message.getRespTopic(), null, (byte[]) null));
return;
}
final ByteArray result = out.toByteArray();
writeHeader(result, result.length() - headerSize, retcode);
messageClient.getProducer().apply(messageClient.createMessageRecord(message.getSeqid(), MessageRecord.CTYPE_BSON, message.getRespTopic(), null, result.getBytes()));
messageClient.getProducer().apply(messageClient.createMessageRecord(message.getSeqid(),
MessageRecord.CTYPE_BSON, message.getRespTopic(), null, result.getBytes()));
}
}

View File

@@ -31,7 +31,8 @@ public abstract class ClientCodec<R extends ClientRequest, P extends ClientResul
private final ByteArray readArray = new ByteArray();
private final ObjectPool<ClientResponse<R, P>> respPool = ObjectPool.createUnsafePool(256, t -> new ClientResponse(), ClientResponse::prepare, ClientResponse::recycle);
private final ObjectPool<ClientResponse<R, P>> respPool =
ObjectPool.createUnsafePool(256, t -> new ClientResponse(), ClientResponse::prepare, ClientResponse::recycle);
protected final ClientConnection<R, P> connection;

View File

@@ -675,7 +675,8 @@ public abstract class Sncp {
List<AsmMethodParam> params = methodBean.getParams();
for (int i = 0; i < paramTypes.length; i++) {
AsmMethodParam param = params.get(i);
mv.visitLocalVariable(param.getName(), param.description(paramTypes[i]), param.signature(paramTypes[i]), l0, l2, insns.get(i));
mv.visitLocalVariable(param.getName(), param.description(paramTypes[i]),
param.signature(paramTypes[i]), l0, l2, insns.get(i));
}
}
mv.visitMaxs(20, 20);

View File

@@ -65,7 +65,9 @@ public interface SncpAsyncHandler<V, A> extends CompletionHandler<V, A> {
FieldVisitor fv;
MethodDebugVisitor mv;
AnnotationVisitor av0;
cw.visit(V11, ACC_PUBLIC + ACC_FINAL + ACC_SUPER, newDynName, null, handlerInterface ? "java/lang/Object" : handlerClassName, handlerInterface && handlerClass != sncpHandlerClass ? new String[]{handlerClassName, sncpHandlerName} : new String[]{sncpHandlerName});
cw.visit(V11, ACC_PUBLIC + ACC_FINAL + ACC_SUPER, newDynName, null,
handlerInterface ? "java/lang/Object" : handlerClassName,
handlerInterface && handlerClass != sncpHandlerClass ? new String[]{handlerClassName, sncpHandlerName} : new String[]{sncpHandlerName});
{ //handler 属性
fv = cw.visitField(ACC_PRIVATE, "factHandler", realHandlerDesc, null, null);
@@ -182,6 +184,10 @@ public interface SncpAsyncHandler<V, A> extends CompletionHandler<V, A> {
static class HandlerInner {
static final Map<Class, Creator<SncpAsyncHandler>> creatorMap = new ConcurrentHashMap<>();
private HandlerInner() {
//do nothing
}
}
}

View File

@@ -135,15 +135,18 @@ public class SncpHeader {
}
public ByteArray writeTo(ByteArray array, SncpClientRequest clientRequest, byte keepAlive, int bodyLength, int retcode) {
return writeTo(array, this.addrBytes, this.addrPort, (Long) clientRequest.getRequestid(), clientRequest.traceBytes(), keepAlive, bodyLength, retcode);
return writeTo(array, this.addrBytes, this.addrPort, (Long) clientRequest.getRequestid(),
clientRequest.traceBytes(), keepAlive, bodyLength, retcode);
}
public ByteArray writeTo(ByteArray array, SncpResponse response, byte keepAlive, int bodyLength, int retcode) {
SncpRequest request = response.request();
return writeTo(array, response.addrBytes, response.addrPort, (Long) request.getRequestid(), request.traceBytes(), keepAlive, bodyLength, retcode);
return writeTo(array, response.addrBytes, response.addrPort, (Long) request.getRequestid(),
request.traceBytes(), keepAlive, bodyLength, retcode);
}
private ByteArray writeTo(ByteArray array, byte[] newAddrBytes, int newAddrPort, long newSeqid, byte[] traces, byte keepAlive, int bodyLength, int retcode) {
private ByteArray writeTo(ByteArray array, byte[] newAddrBytes, int newAddrPort, long newSeqid,
byte[] traces, byte keepAlive, int bodyLength, int retcode) {
if (newAddrBytes.length != 4) {
throw new SncpException("address bytes length must be 4, but " + newAddrBytes.length);
}

View File

@@ -90,7 +90,8 @@ public class SncpRemoteInfo<S extends Service> {
this.topic = messageAgent == null ? null : Sncp.generateSncpReqTopic(resourceName, resourceType, messageAgent.getNodeid());
for (Map.Entry<Uint128, Method> en : loadRemoteMethodActions(Sncp.getServiceType(serviceImplClass)).entrySet()) {
this.actions.put(en.getKey().toString(), new SncpRemoteAction(serviceImplClass, resourceType, en.getValue(), serviceid, en.getKey(), sncpClient));
this.actions.put(en.getKey().toString(),
new SncpRemoteAction(serviceImplClass, resourceType, en.getValue(), serviceid, en.getKey(), sncpClient));
}
}
@@ -187,8 +188,10 @@ public class SncpRemoteInfo<S extends Service> {
final int retcode = header.getRetcode();
if (retcode != 0) {
logger.log(Level.SEVERE, action.method + " sncp (params: " + JsonConvert.root().convertTo(params)
+ ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + "), params=" + JsonConvert.root().convertTo(params));
throw new SncpException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")");
+ ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode)
+ "), params=" + JsonConvert.root().convertTo(params));
throw new SncpException("remote service(" + action.method + ") deal error (retcode=" + retcode
+ ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")");
}
final int respBodyLength = header.getBodyLength();
byte[] body = new byte[respBodyLength];
@@ -332,7 +335,8 @@ public class SncpRemoteInfo<S extends Service> {
protected final SncpHeader header;
@SuppressWarnings("unchecked")
SncpRemoteAction(final Class serviceImplClass, Class resourceType, Method method, Uint128 serviceid, Uint128 actionid, final SncpClient sncpClient) {
SncpRemoteAction(final Class serviceImplClass, Class resourceType, Method method,
Uint128 serviceid, Uint128 actionid, final SncpClient sncpClient) {
this.actionid = actionid == null ? Sncp.actionid(method) : actionid;
Type rt = TypeToken.getGenericType(method.getGenericReturnType(), serviceImplClass);
this.returnObjectType = rt == void.class || rt == Void.class ? null : rt;
@@ -425,7 +429,8 @@ public class SncpRemoteInfo<S extends Service> {
this.paramHandlerClass = handlerFuncClass;
this.paramHandlerResultType = handlerResultType;
this.paramHandlerAttachIndex = handlerAttachIndex;
this.header = SncpHeader.create(sncpClient == null ? null : sncpClient.getClientSncpAddress(), serviceid, resourceType.getName(), actionid, method.getName());
this.header = SncpHeader.create(sncpClient == null ? null : sncpClient.getClientSncpAddress(),
serviceid, resourceType.getName(), actionid, method.getName());
if (this.paramHandlerIndex >= 0 && method.getReturnType() != void.class) {
throw new SncpException(method + " have CompletionHandler type parameter but return type is not void");
}

View File

@@ -129,8 +129,7 @@ public class SncpServer extends Server<Uint128, SncpContext, SncpRequest, SncpRe
@Override
protected ObjectPool<SncpResponse> createSafeResponsePool(LongAdder createCounter, LongAdder cycleCounter, int responsePoolSize) {
Creator<SncpResponse> creator = (Object... params) -> new SncpResponse(this.context, new SncpRequest(this.context));
ObjectPool<SncpResponse> pool = ObjectPool.createSafePool(createCounter, cycleCounter, responsePoolSize, creator, SncpResponse::prepare, SncpResponse::recycle);
return pool;
return ObjectPool.createSafePool(createCounter, cycleCounter, responsePoolSize, creator, SncpResponse::prepare, SncpResponse::recycle);
}
}