synchronized优化

This commit is contained in:
redkale
2023-02-01 19:17:32 +08:00
parent ad981835f9
commit 78f969d2cc
8 changed files with 68 additions and 55 deletions

View File

@@ -9,7 +9,7 @@ import java.lang.annotation.Annotation;
import java.lang.reflect.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.stream.Stream;
@@ -250,8 +250,8 @@ public class NodeHttpServer extends NodeServer {
ss.add(new AbstractMap.SimpleEntry<>("HttpServlet (type=" + clazz.getName() + ")", mappings));
}
}
final List<AbstractMap.SimpleEntry<String, String[]>> rests = sb == null ? null : new ArrayList<>();
final List<AbstractMap.SimpleEntry<String, String[]>> webss = sb == null ? null : new ArrayList<>();
final CopyOnWriteArrayList<AbstractMap.SimpleEntry<String, String[]>> rests = sb == null ? null : new CopyOnWriteArrayList<>();
final CopyOnWriteArrayList<AbstractMap.SimpleEntry<String, String[]>> webss = sb == null ? null : new CopyOnWriteArrayList<>();
if (rest && serverConf != null) {
final List<Object> restedObjects = new ArrayList<>();
final ReentrantLock restedLock = new ReentrantLock();
@@ -343,8 +343,8 @@ public class NodeHttpServer extends NodeServer {
protected void loadRestServlet(final ClassFilter<? extends WebSocket> webSocketFilter,
final AnyValue restConf, final List<Object> restedObjects,
final ReentrantLock restedLock, final StringBuilder sb,
final List<AbstractMap.SimpleEntry<String, String[]>> rests,
final List<AbstractMap.SimpleEntry<String, String[]>> webss) throws Exception {
final CopyOnWriteArrayList<AbstractMap.SimpleEntry<String, String[]>> rests,
final CopyOnWriteArrayList<AbstractMap.SimpleEntry<String, String[]>> webss) throws Exception {
if (!rest) {
return;
}
@@ -440,9 +440,7 @@ public class NodeHttpServer extends NodeServer {
for (int i = 0; i < mappings.length; i++) {
mappings[i] = prefix2 + mappings[i];
}
synchronized (rests) {
rests.add(new AbstractMap.SimpleEntry<>(Sncp.getResourceType(service).getName() + "#" + name, mappings));
}
rests.add(new AbstractMap.SimpleEntry<>(Sncp.getResourceType(service).getName() + "#" + name, mappings));
}
} finally {
scdl.countDown();
@@ -512,9 +510,7 @@ public class NodeHttpServer extends NodeServer {
for (int i = 0; i < mappings.length; i++) {
mappings[i] = prefix2 + mappings[i];
}
synchronized (webss) {
webss.add(new AbstractMap.SimpleEntry<>(stype.getName() + "#" + rs.name(), mappings));
}
webss.add(new AbstractMap.SimpleEntry<>(stype.getName() + "#" + rs.name(), mappings));
}
}
}

View File

@@ -10,7 +10,7 @@ import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.redkale.annotation.Comment;
import java.util.concurrent.locks.ReentrantLock;
import org.redkale.annotation.*;
import org.redkale.boot.Application;
import org.redkale.net.*;
@@ -36,6 +36,8 @@ public class TransportWatchService extends AbstractWatchService {
@Comment("Node节点IP地址已存在")
public static final int RET_TRANSPORT_ADDR_EXISTS = 1606_0003;
protected final ReentrantLock lock = new ReentrantLock();
@Resource
protected Application application;
@@ -63,7 +65,8 @@ public class TransportWatchService extends AbstractWatchService {
if (transportFactory.findGroupName(address) != null) {
return new RetResult(RET_TRANSPORT_ADDR_ILLEGAL, "InetSocketAddress(addr=" + addr + ", port=" + port + ") is exists");
}
synchronized (this) {
lock.lock();
try {
if (transportFactory.findGroupInfo(group) == null) {
return new RetResult(RET_TRANSPORT_GROUP_NOT_EXISTS, "not found group (" + group + ")");
}
@@ -87,6 +90,8 @@ public class TransportWatchService extends AbstractWatchService {
}
}
//application.restoreConfig();
} finally {
lock.unlock();
}
return RetResult.success();
}
@@ -102,7 +107,8 @@ public class TransportWatchService extends AbstractWatchService {
if (!group.equals(transportFactory.findGroupName(address))) {
return new RetResult(RET_TRANSPORT_ADDR_ILLEGAL, "InetSocketAddress(addr=" + addr + ", port=" + port + ") not belong to group(" + group + ")");
}
synchronized (this) {
lock.lock();
try {
if (transportFactory.findGroupInfo(group) == null) {
return new RetResult(RET_TRANSPORT_GROUP_NOT_EXISTS, "not found group (" + group + ")");
}
@@ -125,6 +131,8 @@ public class TransportWatchService extends AbstractWatchService {
}
}
//application.restoreConfig();
} finally {
lock.unlock();
}
return RetResult.success();
}

View File

@@ -12,7 +12,7 @@ import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
@@ -835,25 +835,11 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
* @param excludeColumns 需要排除的字段名
*/
public final void registerIgnoreAll(final Class type, String... excludeColumns) {
Set<String> set = ignoreAlls.get(type);
if (set == null) {
ignoreAlls.put(type, new HashSet<>(Arrays.asList(excludeColumns)));
} else {
synchronized (set) {
set.addAll(Arrays.asList(excludeColumns));
}
}
ignoreAlls.computeIfAbsent(type, t -> new CopyOnWriteArraySet<>()).addAll(Arrays.asList(excludeColumns));
}
public final void registerIgnoreAll(final Class type, Collection<String> excludeColumns) {
Set<String> set = ignoreAlls.get(type);
if (set == null) {
ignoreAlls.put(type, new HashSet<>(excludeColumns));
} else {
synchronized (set) {
set.addAll(new ArrayList(excludeColumns));
}
}
ignoreAlls.computeIfAbsent(type, t -> new CopyOnWriteArraySet<>()).addAll(excludeColumns);
}
public final void register(final Class type, boolean ignore, String... columns) {

View File

@@ -8,6 +8,7 @@ package org.redkale.mq;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import org.redkale.convert.Convert;
import org.redkale.convert.json.JsonConvert;
@@ -28,6 +29,8 @@ public abstract class MessageClient {
protected final ConcurrentHashMap<Long, MessageRespFutureNode> respNodes = new ConcurrentHashMap<>();
private final ReentrantLock lock = new ReentrantLock();
protected final MessageAgent messageAgent;
protected final AtomicLong msgSeqno;
@@ -58,7 +61,8 @@ public abstract class MessageClient {
boolean finest = messageAgent != null && messageAgent.logger.isLoggable(Level.FINEST);
try {
if (this.respConsumer == null) {
synchronized (this) {
lock.lock();
try {
if (this.respConsumerid == null) {
this.respConsumerid = "consumer-" + this.respTopic;
}
@@ -100,6 +104,8 @@ public abstract class MessageClient {
}
this.respConsumer = one;
}
} finally {
lock.unlock();
}
}
if (needresp && (message.getRespTopic() == null || message.getRespTopic().isEmpty())) {

View File

@@ -28,16 +28,10 @@ public class MessageProducers {
}
public MessageProducer getProducer(MessageRecord message) {
if (this.producers.length == 1) return this.producers[0];
int hash = index.incrementAndGet();
if (index.get() > 1000 * producers.length) {
synchronized (index) {
if (index.get() > 1000 * producers.length) {
index.addAndGet(-1000 * producers.length);
}
}
if (this.producers.length == 1) {
return this.producers[0];
}
return producers[hash % producers.length];
return producers[Math.abs(index.incrementAndGet()) % producers.length];
}
public CompletableFuture<Void> apply(MessageRecord message) {

View File

@@ -13,6 +13,7 @@ import java.nio.channels.CompletionHandler;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import java.util.logging.Level;
import javax.net.ssl.SSLContext;
@@ -35,6 +36,8 @@ public final class Transport {
protected final AtomicInteger seq = new AtomicInteger(-1);
protected final ReentrantLock lock = new ReentrantLock();
protected final TransportFactory factory;
protected final String name; //即<group>的name属性
@@ -88,7 +91,8 @@ public final class Transport {
public final InetSocketAddress[] updateRemoteAddresses(final Collection<InetSocketAddress> addresses) {
final TransportNode[] oldNodes = this.transportNodes;
synchronized (this) {
lock.lock();
try {
boolean same = false;
if (this.transportNodes != null && addresses != null && this.transportNodes.length == addresses.size()) {
same = true;
@@ -122,6 +126,8 @@ public final class Transport {
}
this.transportNodes = list.toArray(new TransportNode[list.size()]);
}
} finally {
lock.unlock();
}
InetSocketAddress[] rs = new InetSocketAddress[oldNodes.length];
for (int i = 0; i < rs.length; i++) {
@@ -137,7 +143,8 @@ public final class Transport {
if (clientAddress != null && clientAddress.equals(addr)) {
return false;
}
synchronized (this) {
lock.lock();
try {
if (this.transportNodes.length == 0) {
this.transportNodes = new TransportNode[]{new TransportNode(factory.poolMaxConns, addr)};
} else {
@@ -149,6 +156,8 @@ public final class Transport {
this.transportNodes = Utility.append(transportNodes, new TransportNode(factory.poolMaxConns, addr));
}
return true;
} finally {
lock.unlock();
}
}
@@ -156,8 +165,11 @@ public final class Transport {
if (addr == null) {
return false;
}
synchronized (this) {
lock.lock();
try {
this.transportNodes = Utility.remove(transportNodes, new TransportNode(factory.poolMaxConns, addr));
} finally {
lock.unlock();
}
return true;
}

View File

@@ -7,6 +7,7 @@ package org.redkale.net;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.locks.ReentrantLock;
import org.redkale.convert.json.JsonConvert;
import org.redkale.util.Utility;
@@ -20,6 +21,8 @@ import org.redkale.util.Utility;
*/
public class TransportGroupInfo {
protected final ReentrantLock lock = new ReentrantLock();
protected String name; //地址
protected String protocol; //协议 取值范围: TCP、UDP
@@ -78,11 +81,14 @@ public class TransportGroupInfo {
}
public boolean containsAddress(InetSocketAddress addr) {
synchronized (this) {
lock.lock();
try {
if (this.addresses == null) {
return false;
}
return this.addresses.contains(addr);
} finally {
lock.unlock();
}
}
@@ -90,11 +96,14 @@ public class TransportGroupInfo {
if (addr == null) {
return;
}
synchronized (this) {
lock.lock();
try {
if (this.addresses == null) {
return;
}
this.addresses.remove(addr);
} finally {
lock.unlock();
}
}
@@ -102,11 +111,14 @@ public class TransportGroupInfo {
if (addr == null) {
return;
}
synchronized (this) {
lock.lock();
try {
if (this.addresses == null) {
this.addresses = new HashSet<>();
}
this.addresses.add(addr);
} finally {
lock.unlock();
}
}
@@ -114,11 +126,14 @@ public class TransportGroupInfo {
if (addrs == null) {
return;
}
synchronized (this) {
lock.lock();
try {
if (this.addresses == null) {
this.addresses = new HashSet<>();
}
this.addresses.addAll(addrs);
} finally {
lock.unlock();
}
}

View File

@@ -15,7 +15,7 @@ import org.redkale.net.*;
* 详情见: https://redkale.org
*
* @author zhangjx
*
*
* @since 2.7.0
*/
public class ClientAddress implements java.io.Serializable {
@@ -66,12 +66,8 @@ public class ClientAddress implements java.io.Serializable {
if (addr == null) {
SocketAddress[] addrs = this.addresses;
if (addrs == null) {
synchronized (this) {
if (this.addresses == null) {
this.addresses = createAddressArray(this.weights);
addrs = this.addresses;
}
}
this.addresses = createAddressArray(this.weights);
addrs = this.addresses;
}
addr = addrs[ThreadLocalRandom.current().nextInt(addrs.length)];
}