From 8d4ebe653dbe66f1a372c5ff19a051309f20feec Mon Sep 17 00:00:00 2001 From: Redkale Date: Fri, 27 Jan 2023 15:39:12 +0800 Subject: [PATCH] =?UTF-8?q?net=E6=B5=BC=E6=A8=BA=E5=AF=B2=E7=92=81?= =?UTF-8?q?=E2=84=83=E6=9A=9F=E9=8D=A3=3F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/redkale/boot/LoggingFileHandler.java | 16 +++++------ .../org/redkale/mq/HttpMessageClient.java | 28 +++++++++---------- .../redkale/mq/HttpMessageClusterClient.java | 8 +++--- .../redkale/mq/HttpMessageLocalClient.java | 8 +++--- .../java/org/redkale/mq/MessageClient.java | 10 +++---- .../org/redkale/mq/MessageRespFutureNode.java | 8 +++--- .../org/redkale/mq/SncpMessageClient.java | 6 ++-- src/main/java/org/redkale/net/AsyncGroup.java | 1 + .../java/org/redkale/net/AsyncIOThread.java | 6 ++-- .../redkale/net/AsyncNioTcpConnection.java | 6 ++-- 10 files changed, 48 insertions(+), 49 deletions(-) diff --git a/src/main/java/org/redkale/boot/LoggingFileHandler.java b/src/main/java/org/redkale/boot/LoggingFileHandler.java index 077d24cc2..3d4a509c9 100644 --- a/src/main/java/org/redkale/boot/LoggingFileHandler.java +++ b/src/main/java/org/redkale/boot/LoggingFileHandler.java @@ -94,9 +94,9 @@ public class LoggingFileHandler extends LoggingBaseHandler { protected Pattern denyregx; - private final AtomicLong loglength = new AtomicLong(); + private final AtomicLong logLength = new AtomicLong(); - private final AtomicLong logunusuallength = new AtomicLong(); + private final AtomicLong logUnusualLength = new AtomicLong(); private File logfile; @@ -139,7 +139,7 @@ public class LoggingFileHandler extends LoggingBaseHandler { while (true) { try { LogRecord log = logqueue.take(); - final boolean bigger = (limit > 0 && limit <= loglength.get()); + final boolean bigger = (limit > 0 && limit <= logLength.get()); final boolean changeday = tomorrow <= log.getMillis(); if (bigger || changeday) { updateTomorrow(); @@ -163,7 +163,7 @@ public class LoggingFileHandler extends LoggingBaseHandler { } if (unusual != null && changeday && logunusualstream != null) { logunusualstream.close(); - if (limit > 0 && limit <= logunusuallength.get()) { + if (limit > 0 && limit <= logUnusualLength.get()) { for (int i = Math.min(count - 2, logunusualindex.get() - 1); i > 0; i--) { File greater = new File(logunusualfile.getPath() + "." + i); if (greater.exists()) { @@ -182,14 +182,14 @@ public class LoggingFileHandler extends LoggingBaseHandler { logindex.incrementAndGet(); logfile = new File(patternDateFormat == null ? pattern : Utility.formatTime(patternDateFormat, -1, System.currentTimeMillis())); logfile.getParentFile().mkdirs(); - loglength.set(logfile.length()); + logLength.set(logfile.length()); logstream = new FileOutputStream(logfile, append); } if (unusual != null && logunusualstream == null) { logunusualindex.incrementAndGet(); logunusualfile = new File(unusualDateFormat == null ? unusual : Utility.formatTime(unusualDateFormat, -1, System.currentTimeMillis())); logunusualfile.getParentFile().mkdirs(); - logunusuallength.set(logunusualfile.length()); + logUnusualLength.set(logunusualfile.length()); logunusualstream = new FileOutputStream(logunusualfile, append); } //----------------------写日志------------------------- @@ -197,10 +197,10 @@ public class LoggingFileHandler extends LoggingBaseHandler { String encoding = getEncoding(); byte[] bytes = encoding == null ? message.getBytes() : message.getBytes(encoding); logstream.write(bytes); - loglength.addAndGet(bytes.length); + logLength.addAndGet(bytes.length); if (unusual != null && (log.getLevel() == Level.WARNING || log.getLevel() == Level.SEVERE)) { logunusualstream.write(bytes); - logunusuallength.addAndGet(bytes.length); + logUnusualLength.addAndGet(bytes.length); } } catch (Exception e) { ErrorManager err = getErrorManager(); diff --git a/src/main/java/org/redkale/mq/HttpMessageClient.java b/src/main/java/org/redkale/mq/HttpMessageClient.java index 95e1e506c..4aba7b4ab 100644 --- a/src/main/java/org/redkale/mq/HttpMessageClient.java +++ b/src/main/java/org/redkale/mq/HttpMessageClient.java @@ -9,7 +9,7 @@ import java.io.Serializable; import java.lang.reflect.Type; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import java.util.logging.Logger; import org.redkale.convert.json.JsonConvert; import static org.redkale.mq.MessageRecord.CTYPE_HTTP_REQUEST; @@ -62,7 +62,7 @@ public class HttpMessageClient extends MessageClient { produceMessage(generateHttpReqTopic(request, null), 0, null, request, null); } - public final void produceMessage(HttpSimpleRequest request, AtomicLong counter) { + public final void produceMessage(HttpSimpleRequest request, LongAdder counter) { produceMessage(generateHttpReqTopic(request, null), 0, null, request, counter); } @@ -74,7 +74,7 @@ public class HttpMessageClient extends MessageClient { produceMessage(generateHttpReqTopic(request, null), userid, groupid, request, null); } - public final void produceMessage(Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { + public final void produceMessage(Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { produceMessage(generateHttpReqTopic(request, null), userid, groupid, request, counter); } @@ -82,7 +82,7 @@ public class HttpMessageClient extends MessageClient { produceMessage(topic, 0, null, request, null); } - public final void produceMessage(String topic, HttpSimpleRequest request, AtomicLong counter) { + public final void produceMessage(String topic, HttpSimpleRequest request, LongAdder counter) { produceMessage(topic, 0, null, request, counter); } @@ -94,7 +94,7 @@ public class HttpMessageClient extends MessageClient { broadcastMessage(generateHttpReqTopic(request, null), 0, null, request, null); } - public final void broadcastMessage(HttpSimpleRequest request, AtomicLong counter) { + public final void broadcastMessage(HttpSimpleRequest request, LongAdder counter) { broadcastMessage(generateHttpReqTopic(request, null), 0, null, request, counter); } @@ -106,7 +106,7 @@ public class HttpMessageClient extends MessageClient { broadcastMessage(generateHttpReqTopic(request, null), userid, groupid, request, null); } - public final void broadcastMessage(Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { + public final void broadcastMessage(Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { broadcastMessage(generateHttpReqTopic(request, null), userid, groupid, request, counter); } @@ -114,7 +114,7 @@ public class HttpMessageClient extends MessageClient { broadcastMessage(topic, 0, null, request, null); } - public final void broadcastMessage(String topic, HttpSimpleRequest request, AtomicLong counter) { + public final void broadcastMessage(String topic, HttpSimpleRequest request, LongAdder counter) { broadcastMessage(topic, 0, null, request, counter); } @@ -153,7 +153,7 @@ public class HttpMessageClient extends MessageClient { return sendMessage(generateHttpReqTopic(request, null), 0, null, request, null); } - public final CompletableFuture> sendMessage(HttpSimpleRequest request, AtomicLong counter) { + public final CompletableFuture> sendMessage(HttpSimpleRequest request, LongAdder counter) { return sendMessage(generateHttpReqTopic(request, null), 0, null, request, counter); } @@ -165,7 +165,7 @@ public class HttpMessageClient extends MessageClient { return sendMessage(generateHttpReqTopic(request, null), userid, groupid, request, null); } - public final CompletableFuture> sendMessage(Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { + public final CompletableFuture> sendMessage(Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { return sendMessage(generateHttpReqTopic(request, null), userid, groupid, request, counter); } @@ -173,28 +173,28 @@ public class HttpMessageClient extends MessageClient { return sendMessage(topic, 0, null, request, null); } - public final CompletableFuture> sendMessage(String topic, HttpSimpleRequest request, AtomicLong counter) { + public final CompletableFuture> sendMessage(String topic, HttpSimpleRequest request, LongAdder counter) { return sendMessage(topic, 0, null, request, counter); } public final CompletableFuture> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) { - return sendMessage(topic, userid, null, request, (AtomicLong) null); + return sendMessage(topic, userid, null, request, (LongAdder) null); } - public CompletableFuture> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { + public CompletableFuture> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request)); message.userid(userid).groupid(groupid); //if (finest) logger.log(Level.FINEST, "HttpMessageClient.sendMessage: " + message); return sendMessage(message, true, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance())); } - public void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { + public void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request)); message.userid(userid).groupid(groupid); sendMessage(message, false, counter); } - public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { + public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request)); message.userid(userid).groupid(groupid); sendMessage(message, false, counter); diff --git a/src/main/java/org/redkale/mq/HttpMessageClusterClient.java b/src/main/java/org/redkale/mq/HttpMessageClusterClient.java index e12f337b2..1a54012b6 100644 --- a/src/main/java/org/redkale/mq/HttpMessageClusterClient.java +++ b/src/main/java/org/redkale/mq/HttpMessageClusterClient.java @@ -11,7 +11,7 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import java.util.logging.Level; import org.redkale.annotation.Resource; import org.redkale.boot.Application; @@ -55,7 +55,7 @@ public class HttpMessageClusterClient extends HttpMessageClient { } @Override - public CompletableFuture> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { + public CompletableFuture> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { if (topicServletMap.computeIfAbsent(topic, t -> localClient.findHttpServlet(t) != null)) { return localClient.sendMessage(topic, userid, groupid, request, counter); } else { @@ -64,7 +64,7 @@ public class HttpMessageClusterClient extends HttpMessageClient { } @Override - public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { + public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { if (topicServletMap.computeIfAbsent(topic, t -> localClient.findHttpServlet(t) != null)) { localClient.produceMessage(topic, userid, groupid, request, counter); } else { @@ -73,7 +73,7 @@ public class HttpMessageClusterClient extends HttpMessageClient { } @Override - public void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { + public void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { mqtpAsync(userid, request); } diff --git a/src/main/java/org/redkale/mq/HttpMessageLocalClient.java b/src/main/java/org/redkale/mq/HttpMessageLocalClient.java index 90b716a3e..0f3a7965d 100644 --- a/src/main/java/org/redkale/mq/HttpMessageLocalClient.java +++ b/src/main/java/org/redkale/mq/HttpMessageLocalClient.java @@ -10,7 +10,7 @@ import java.lang.reflect.Type; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import java.util.logging.Level; import org.redkale.boot.*; import org.redkale.convert.Convert; @@ -113,7 +113,7 @@ public class HttpMessageLocalClient extends HttpMessageClient { } @Override - public CompletableFuture> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { + public CompletableFuture> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { HttpServlet servlet = findHttpServlet(topic); if (servlet == null) { if (logger.isLoggable(Level.FINE)) { @@ -145,7 +145,7 @@ public class HttpMessageLocalClient extends HttpMessageClient { } @Override - public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { + public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { HttpDispatcherServlet ps = dispatcherServlet(); HttpServlet servlet = ps.findServletByTopic(topic); if (servlet == null) { @@ -164,7 +164,7 @@ public class HttpMessageLocalClient extends HttpMessageClient { } @Override - public void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { + public void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { HttpDispatcherServlet ps = dispatcherServlet(); HttpRequest req = new HttpMessageLocalRequest(context(), request, userid); HttpResponse resp = new HttpMessageLocalResponse(req, null); diff --git a/src/main/java/org/redkale/mq/MessageClient.java b/src/main/java/org/redkale/mq/MessageClient.java index d6745ff1a..55c973a0a 100644 --- a/src/main/java/org/redkale/mq/MessageClient.java +++ b/src/main/java/org/redkale/mq/MessageClient.java @@ -7,7 +7,7 @@ package org.redkale.mq; import java.nio.charset.StandardCharsets; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.*; import java.util.logging.Level; import org.redkale.convert.Convert; import org.redkale.convert.json.JsonConvert; @@ -53,7 +53,7 @@ public abstract class MessageClient { return this.respConsumer.shutdown(); } - protected CompletableFuture sendMessage(final MessageRecord message, boolean needresp, AtomicLong counter) { + protected CompletableFuture sendMessage(final MessageRecord message, boolean needresp, LongAdder counter) { CompletableFuture future = new CompletableFuture<>(); boolean finest = messageAgent != null && messageAgent.logger.isLoggable(Level.FINEST); try { @@ -73,9 +73,9 @@ public abstract class MessageClient { if (node.scheduledFuture != null) { node.scheduledFuture.cancel(true); } - AtomicLong ncer = node.getCounter(); + LongAdder ncer = node.getCounter(); if (ncer != null) { - ncer.decrementAndGet(); + ncer.decrement(); } final long cha = now - msg.createTime; if (finest) { @@ -106,7 +106,7 @@ public abstract class MessageClient { message.setRespTopic(respTopic); } if (counter != null) { - counter.incrementAndGet(); + counter.increment(); } getProducer().apply(message); if (needresp) { diff --git a/src/main/java/org/redkale/mq/MessageRespFutureNode.java b/src/main/java/org/redkale/mq/MessageRespFutureNode.java index e4f8202c0..265a2a8f2 100644 --- a/src/main/java/org/redkale/mq/MessageRespFutureNode.java +++ b/src/main/java/org/redkale/mq/MessageRespFutureNode.java @@ -6,7 +6,7 @@ package org.redkale.mq; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import java.util.logging.*; /** @@ -25,7 +25,7 @@ public class MessageRespFutureNode implements Runnable { protected final long createTime; - protected final AtomicLong counter; + protected final LongAdder counter; protected final CompletableFuture future; @@ -37,7 +37,7 @@ public class MessageRespFutureNode implements Runnable { protected ScheduledFuture scheduledFuture; - public MessageRespFutureNode(Logger logger, MessageRecord message, ConcurrentHashMap respNodes, AtomicLong counter, CompletableFuture future) { + public MessageRespFutureNode(Logger logger, MessageRecord message, ConcurrentHashMap respNodes, LongAdder counter, CompletableFuture future) { this.logger = logger; this.message = message; this.seqid = message.getSeqid(); @@ -63,7 +63,7 @@ public class MessageRespFutureNode implements Runnable { return createTime; } - public AtomicLong getCounter() { + public LongAdder getCounter() { return counter; } diff --git a/src/main/java/org/redkale/mq/SncpMessageClient.java b/src/main/java/org/redkale/mq/SncpMessageClient.java index 32bf5c580..1f9e2fd9f 100644 --- a/src/main/java/org/redkale/mq/SncpMessageClient.java +++ b/src/main/java/org/redkale/mq/SncpMessageClient.java @@ -6,7 +6,7 @@ package org.redkale.mq; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; /** * @@ -39,7 +39,7 @@ public class SncpMessageClient extends MessageClient { } //只发送消息,不需要响应 - public final void produceMessage(MessageRecord message, AtomicLong counter) { + public final void produceMessage(MessageRecord message, LongAdder counter) { sendMessage(message, false, counter); } @@ -49,7 +49,7 @@ public class SncpMessageClient extends MessageClient { } //发送消息,需要响应 - public final CompletableFuture sendMessage(MessageRecord message, AtomicLong counter) { + public final CompletableFuture sendMessage(MessageRecord message, LongAdder counter) { return sendMessage(message, true, counter); } diff --git a/src/main/java/org/redkale/net/AsyncGroup.java b/src/main/java/org/redkale/net/AsyncGroup.java index cfc24b7d9..1739540ab 100644 --- a/src/main/java/org/redkale/net/AsyncGroup.java +++ b/src/main/java/org/redkale/net/AsyncGroup.java @@ -63,4 +63,5 @@ public abstract class AsyncGroup { public abstract AsyncGroup start(); public abstract AsyncGroup close(); + } diff --git a/src/main/java/org/redkale/net/AsyncIOThread.java b/src/main/java/org/redkale/net/AsyncIOThread.java index 701d197a6..ed9b2b760 100644 --- a/src/main/java/org/redkale/net/AsyncIOThread.java +++ b/src/main/java/org/redkale/net/AsyncIOThread.java @@ -10,7 +10,7 @@ import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.LongAdder; import java.util.function.*; import java.util.logging.*; import org.redkale.util.*; @@ -32,7 +32,7 @@ public class AsyncIOThread extends WorkThread { final Selector selector; //如果有read/write两IOThread,只记readThread - final AtomicInteger connCounter = new AtomicInteger(); + final LongAdder connCounter = new LongAdder(); private final Supplier bufferSupplier; @@ -142,7 +142,7 @@ public class AsyncIOThread extends WorkThread { } public int currConnections() { - return connCounter.get(); + return connCounter.intValue(); } @Override diff --git a/src/main/java/org/redkale/net/AsyncNioTcpConnection.java b/src/main/java/org/redkale/net/AsyncNioTcpConnection.java index 9a3436af4..7d9e69312 100644 --- a/src/main/java/org/redkale/net/AsyncNioTcpConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioTcpConnection.java @@ -39,7 +39,7 @@ class AsyncNioTcpConnection extends AsyncNioConnection { } } this.remoteAddress = addr; - ioReadThread.connCounter.incrementAndGet(); + ioReadThread.connCounter.increment(); } @Override @@ -269,9 +269,7 @@ class AsyncNioTcpConnection extends AsyncNioConnection { @Override public final void close() throws IOException { super.close(); - ioReadThread.connCounter.decrementAndGet(); - channel.shutdownInput(); - channel.shutdownOutput(); + ioReadThread.connCounter.decrement(); channel.close(); if (this.connectKey != null) { this.connectKey.cancel();