From 78f969d2cc501fdc9e5c725f8f650051e5915e8e Mon Sep 17 00:00:00 2001 From: redkale Date: Wed, 1 Feb 2023 19:17:32 +0800 Subject: [PATCH] =?UTF-8?q?synchronized=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/boot/NodeHttpServer.java | 18 ++++++--------- .../boot/watch/TransportWatchService.java | 14 ++++++++--- .../org/redkale/convert/ConvertFactory.java | 20 +++------------- .../java/org/redkale/mq/MessageClient.java | 8 ++++++- .../java/org/redkale/mq/MessageProducers.java | 12 +++------- src/main/java/org/redkale/net/Transport.java | 18 ++++++++++++--- .../org/redkale/net/TransportGroupInfo.java | 23 +++++++++++++++---- .../org/redkale/net/client/ClientAddress.java | 10 +++----- 8 files changed, 68 insertions(+), 55 deletions(-) diff --git a/src/main/java/org/redkale/boot/NodeHttpServer.java b/src/main/java/org/redkale/boot/NodeHttpServer.java index ae45bd459..f3523bf50 100644 --- a/src/main/java/org/redkale/boot/NodeHttpServer.java +++ b/src/main/java/org/redkale/boot/NodeHttpServer.java @@ -9,7 +9,7 @@ import java.lang.annotation.Annotation; import java.lang.reflect.*; import java.net.*; import java.util.*; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.*; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.stream.Stream; @@ -250,8 +250,8 @@ public class NodeHttpServer extends NodeServer { ss.add(new AbstractMap.SimpleEntry<>("HttpServlet (type=" + clazz.getName() + ")", mappings)); } } - final List> rests = sb == null ? null : new ArrayList<>(); - final List> webss = sb == null ? null : new ArrayList<>(); + final CopyOnWriteArrayList> rests = sb == null ? null : new CopyOnWriteArrayList<>(); + final CopyOnWriteArrayList> webss = sb == null ? null : new CopyOnWriteArrayList<>(); if (rest && serverConf != null) { final List restedObjects = new ArrayList<>(); final ReentrantLock restedLock = new ReentrantLock(); @@ -343,8 +343,8 @@ public class NodeHttpServer extends NodeServer { protected void loadRestServlet(final ClassFilter webSocketFilter, final AnyValue restConf, final List restedObjects, final ReentrantLock restedLock, final StringBuilder sb, - final List> rests, - final List> webss) throws Exception { + final CopyOnWriteArrayList> rests, + final CopyOnWriteArrayList> webss) throws Exception { if (!rest) { return; } @@ -440,9 +440,7 @@ public class NodeHttpServer extends NodeServer { for (int i = 0; i < mappings.length; i++) { mappings[i] = prefix2 + mappings[i]; } - synchronized (rests) { - rests.add(new AbstractMap.SimpleEntry<>(Sncp.getResourceType(service).getName() + "#" + name, mappings)); - } + rests.add(new AbstractMap.SimpleEntry<>(Sncp.getResourceType(service).getName() + "#" + name, mappings)); } } finally { scdl.countDown(); @@ -512,9 +510,7 @@ public class NodeHttpServer extends NodeServer { for (int i = 0; i < mappings.length; i++) { mappings[i] = prefix2 + mappings[i]; } - synchronized (webss) { - webss.add(new AbstractMap.SimpleEntry<>(stype.getName() + "#" + rs.name(), mappings)); - } + webss.add(new AbstractMap.SimpleEntry<>(stype.getName() + "#" + rs.name(), mappings)); } } } diff --git a/src/main/java/org/redkale/boot/watch/TransportWatchService.java b/src/main/java/org/redkale/boot/watch/TransportWatchService.java index 3fde080f8..705daad7d 100644 --- a/src/main/java/org/redkale/boot/watch/TransportWatchService.java +++ b/src/main/java/org/redkale/boot/watch/TransportWatchService.java @@ -10,7 +10,7 @@ import java.net.InetSocketAddress; import java.nio.channels.AsynchronousSocketChannel; import java.util.List; import java.util.concurrent.TimeUnit; -import org.redkale.annotation.Comment; +import java.util.concurrent.locks.ReentrantLock; import org.redkale.annotation.*; import org.redkale.boot.Application; import org.redkale.net.*; @@ -36,6 +36,8 @@ public class TransportWatchService extends AbstractWatchService { @Comment("Node节点IP地址已存在") public static final int RET_TRANSPORT_ADDR_EXISTS = 1606_0003; + protected final ReentrantLock lock = new ReentrantLock(); + @Resource protected Application application; @@ -63,7 +65,8 @@ public class TransportWatchService extends AbstractWatchService { if (transportFactory.findGroupName(address) != null) { return new RetResult(RET_TRANSPORT_ADDR_ILLEGAL, "InetSocketAddress(addr=" + addr + ", port=" + port + ") is exists"); } - synchronized (this) { + lock.lock(); + try { if (transportFactory.findGroupInfo(group) == null) { return new RetResult(RET_TRANSPORT_GROUP_NOT_EXISTS, "not found group (" + group + ")"); } @@ -87,6 +90,8 @@ public class TransportWatchService extends AbstractWatchService { } } //application.restoreConfig(); + } finally { + lock.unlock(); } return RetResult.success(); } @@ -102,7 +107,8 @@ public class TransportWatchService extends AbstractWatchService { if (!group.equals(transportFactory.findGroupName(address))) { return new RetResult(RET_TRANSPORT_ADDR_ILLEGAL, "InetSocketAddress(addr=" + addr + ", port=" + port + ") not belong to group(" + group + ")"); } - synchronized (this) { + lock.lock(); + try { if (transportFactory.findGroupInfo(group) == null) { return new RetResult(RET_TRANSPORT_GROUP_NOT_EXISTS, "not found group (" + group + ")"); } @@ -125,6 +131,8 @@ public class TransportWatchService extends AbstractWatchService { } } //application.restoreConfig(); + } finally { + lock.unlock(); } return RetResult.success(); } diff --git a/src/main/java/org/redkale/convert/ConvertFactory.java b/src/main/java/org/redkale/convert/ConvertFactory.java index bd97f2667..880320f24 100644 --- a/src/main/java/org/redkale/convert/ConvertFactory.java +++ b/src/main/java/org/redkale/convert/ConvertFactory.java @@ -12,7 +12,7 @@ import java.net.*; import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; @@ -835,25 +835,11 @@ public abstract class ConvertFactory { * @param excludeColumns 需要排除的字段名 */ public final void registerIgnoreAll(final Class type, String... excludeColumns) { - Set set = ignoreAlls.get(type); - if (set == null) { - ignoreAlls.put(type, new HashSet<>(Arrays.asList(excludeColumns))); - } else { - synchronized (set) { - set.addAll(Arrays.asList(excludeColumns)); - } - } + ignoreAlls.computeIfAbsent(type, t -> new CopyOnWriteArraySet<>()).addAll(Arrays.asList(excludeColumns)); } public final void registerIgnoreAll(final Class type, Collection excludeColumns) { - Set set = ignoreAlls.get(type); - if (set == null) { - ignoreAlls.put(type, new HashSet<>(excludeColumns)); - } else { - synchronized (set) { - set.addAll(new ArrayList(excludeColumns)); - } - } + ignoreAlls.computeIfAbsent(type, t -> new CopyOnWriteArraySet<>()).addAll(excludeColumns); } public final void register(final Class type, boolean ignore, String... columns) { diff --git a/src/main/java/org/redkale/mq/MessageClient.java b/src/main/java/org/redkale/mq/MessageClient.java index 55c973a0a..3647c683f 100644 --- a/src/main/java/org/redkale/mq/MessageClient.java +++ b/src/main/java/org/redkale/mq/MessageClient.java @@ -8,6 +8,7 @@ package org.redkale.mq; import java.nio.charset.StandardCharsets; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import org.redkale.convert.Convert; import org.redkale.convert.json.JsonConvert; @@ -28,6 +29,8 @@ public abstract class MessageClient { protected final ConcurrentHashMap respNodes = new ConcurrentHashMap<>(); + private final ReentrantLock lock = new ReentrantLock(); + protected final MessageAgent messageAgent; protected final AtomicLong msgSeqno; @@ -58,7 +61,8 @@ public abstract class MessageClient { boolean finest = messageAgent != null && messageAgent.logger.isLoggable(Level.FINEST); try { if (this.respConsumer == null) { - synchronized (this) { + lock.lock(); + try { if (this.respConsumerid == null) { this.respConsumerid = "consumer-" + this.respTopic; } @@ -100,6 +104,8 @@ public abstract class MessageClient { } this.respConsumer = one; } + } finally { + lock.unlock(); } } if (needresp && (message.getRespTopic() == null || message.getRespTopic().isEmpty())) { diff --git a/src/main/java/org/redkale/mq/MessageProducers.java b/src/main/java/org/redkale/mq/MessageProducers.java index 91a0507c8..764979730 100644 --- a/src/main/java/org/redkale/mq/MessageProducers.java +++ b/src/main/java/org/redkale/mq/MessageProducers.java @@ -28,16 +28,10 @@ public class MessageProducers { } public MessageProducer getProducer(MessageRecord message) { - if (this.producers.length == 1) return this.producers[0]; - int hash = index.incrementAndGet(); - if (index.get() > 1000 * producers.length) { - synchronized (index) { - if (index.get() > 1000 * producers.length) { - index.addAndGet(-1000 * producers.length); - } - } + if (this.producers.length == 1) { + return this.producers[0]; } - return producers[hash % producers.length]; + return producers[Math.abs(index.incrementAndGet()) % producers.length]; } public CompletableFuture apply(MessageRecord message) { diff --git a/src/main/java/org/redkale/net/Transport.java b/src/main/java/org/redkale/net/Transport.java index 932797808..e3d286fb3 100644 --- a/src/main/java/org/redkale/net/Transport.java +++ b/src/main/java/org/redkale/net/Transport.java @@ -13,6 +13,7 @@ import java.nio.channels.CompletionHandler; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import java.util.logging.Level; import javax.net.ssl.SSLContext; @@ -35,6 +36,8 @@ public final class Transport { protected final AtomicInteger seq = new AtomicInteger(-1); + protected final ReentrantLock lock = new ReentrantLock(); + protected final TransportFactory factory; protected final String name; //即的name属性 @@ -88,7 +91,8 @@ public final class Transport { public final InetSocketAddress[] updateRemoteAddresses(final Collection addresses) { final TransportNode[] oldNodes = this.transportNodes; - synchronized (this) { + lock.lock(); + try { boolean same = false; if (this.transportNodes != null && addresses != null && this.transportNodes.length == addresses.size()) { same = true; @@ -122,6 +126,8 @@ public final class Transport { } this.transportNodes = list.toArray(new TransportNode[list.size()]); } + } finally { + lock.unlock(); } InetSocketAddress[] rs = new InetSocketAddress[oldNodes.length]; for (int i = 0; i < rs.length; i++) { @@ -137,7 +143,8 @@ public final class Transport { if (clientAddress != null && clientAddress.equals(addr)) { return false; } - synchronized (this) { + lock.lock(); + try { if (this.transportNodes.length == 0) { this.transportNodes = new TransportNode[]{new TransportNode(factory.poolMaxConns, addr)}; } else { @@ -149,6 +156,8 @@ public final class Transport { this.transportNodes = Utility.append(transportNodes, new TransportNode(factory.poolMaxConns, addr)); } return true; + } finally { + lock.unlock(); } } @@ -156,8 +165,11 @@ public final class Transport { if (addr == null) { return false; } - synchronized (this) { + lock.lock(); + try { this.transportNodes = Utility.remove(transportNodes, new TransportNode(factory.poolMaxConns, addr)); + } finally { + lock.unlock(); } return true; } diff --git a/src/main/java/org/redkale/net/TransportGroupInfo.java b/src/main/java/org/redkale/net/TransportGroupInfo.java index db7a45f0f..959f6b138 100644 --- a/src/main/java/org/redkale/net/TransportGroupInfo.java +++ b/src/main/java/org/redkale/net/TransportGroupInfo.java @@ -7,6 +7,7 @@ package org.redkale.net; import java.net.InetSocketAddress; import java.util.*; +import java.util.concurrent.locks.ReentrantLock; import org.redkale.convert.json.JsonConvert; import org.redkale.util.Utility; @@ -20,6 +21,8 @@ import org.redkale.util.Utility; */ public class TransportGroupInfo { + protected final ReentrantLock lock = new ReentrantLock(); + protected String name; //地址 protected String protocol; //协议 取值范围: TCP、UDP @@ -78,11 +81,14 @@ public class TransportGroupInfo { } public boolean containsAddress(InetSocketAddress addr) { - synchronized (this) { + lock.lock(); + try { if (this.addresses == null) { return false; } return this.addresses.contains(addr); + } finally { + lock.unlock(); } } @@ -90,11 +96,14 @@ public class TransportGroupInfo { if (addr == null) { return; } - synchronized (this) { + lock.lock(); + try { if (this.addresses == null) { return; } this.addresses.remove(addr); + } finally { + lock.unlock(); } } @@ -102,11 +111,14 @@ public class TransportGroupInfo { if (addr == null) { return; } - synchronized (this) { + lock.lock(); + try { if (this.addresses == null) { this.addresses = new HashSet<>(); } this.addresses.add(addr); + } finally { + lock.unlock(); } } @@ -114,11 +126,14 @@ public class TransportGroupInfo { if (addrs == null) { return; } - synchronized (this) { + lock.lock(); + try { if (this.addresses == null) { this.addresses = new HashSet<>(); } this.addresses.addAll(addrs); + } finally { + lock.unlock(); } } diff --git a/src/main/java/org/redkale/net/client/ClientAddress.java b/src/main/java/org/redkale/net/client/ClientAddress.java index fa4c5538f..06e1d68c3 100644 --- a/src/main/java/org/redkale/net/client/ClientAddress.java +++ b/src/main/java/org/redkale/net/client/ClientAddress.java @@ -15,7 +15,7 @@ import org.redkale.net.*; * 详情见: https://redkale.org * * @author zhangjx - * + * * @since 2.7.0 */ public class ClientAddress implements java.io.Serializable { @@ -66,12 +66,8 @@ public class ClientAddress implements java.io.Serializable { if (addr == null) { SocketAddress[] addrs = this.addresses; if (addrs == null) { - synchronized (this) { - if (this.addresses == null) { - this.addresses = createAddressArray(this.weights); - addrs = this.addresses; - } - } + this.addresses = createAddressArray(this.weights); + addrs = this.addresses; } addr = addrs[ThreadLocalRandom.current().nextInt(addrs.length)]; }