diff --git a/src/main/java/org/redkale/boot/Application.java b/src/main/java/org/redkale/boot/Application.java index 1c8547c87..31ccc12eb 100644 --- a/src/main/java/org/redkale/boot/Application.java +++ b/src/main/java/org/redkale/boot/Application.java @@ -16,6 +16,7 @@ import java.nio.file.Path; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.logging.*; import javax.net.ssl.SSLContext; @@ -148,9 +149,13 @@ public final class Application { //CacheSource 资源 final List cacheSources = new CopyOnWriteArrayList<>(); + private final ReentrantLock cacheSourceLock = new ReentrantLock(); + //DataSource 资源 final List dataSources = new CopyOnWriteArrayList<>(); + private final ReentrantLock dataSourceLock = new ReentrantLock(); + //NodeServer 资源, 顺序必须是sncps, others, watchs final List servers = new CopyOnWriteArrayList<>(); @@ -167,6 +172,8 @@ public final class Application { //只存放不以system.property.、mimetype.property.、redkale.开头的配置项 private final Properties envProperties = new Properties(); + private final ReentrantLock envPropertiesLock = new ReentrantLock(); + //配置信息,只读版Properties private final Environment environment; @@ -1223,7 +1230,8 @@ public final class Application { } CacheSource loadCacheSource(final String sourceName, boolean autoMemory) { - synchronized (cacheSources) { + cacheSourceLock.lock(); + try { long st = System.currentTimeMillis(); CacheSource old = resourceFactory.find(sourceName, CacheSource.class); if (old != null) { @@ -1262,11 +1270,14 @@ public final class Application { logger.log(Level.SEVERE, "load application CaheSource error: " + sourceConf, e); } return null; + } finally { + cacheSourceLock.unlock(); } } DataSource loadDataSource(final String sourceName, boolean autoMemory) { - synchronized (dataSources) { + dataSourceLock.lock(); + try { DataSource old = resourceFactory.find(sourceName, DataSource.class); if (old != null) { return old; @@ -1313,6 +1324,8 @@ public final class Application { logger.log(Level.SEVERE, "load application DataSource error: " + sourceConf, e); } return null; + } finally { + dataSourceLock.unlock(); } } @@ -1647,6 +1660,7 @@ public final class Application { this.servicecdl = new CountDownLatch(serconfs.size()); CountDownLatch sercdl = new CountDownLatch(serconfs.size()); final AtomicBoolean inited = new AtomicBoolean(false); + final ReentrantLock nodeLock = new ReentrantLock(); final Map> nodeClasses = new HashMap<>(); for (final AnyValue serconf : serconfs) { Thread thread = new Thread() { @@ -1677,7 +1691,8 @@ public final class Application { server = new NodeHttpServer(Application.this, serconf); } else { if (!inited.get()) { - synchronized (nodeClasses) { + nodeLock.lock(); + try { if (!inited.getAndSet(true)) { //加载自定义的协议,如:SOCKS ClassFilter profilter = new ClassFilter(classLoader, NodeProtocol.class, NodeServer.class, (Class[]) null); ClassFilter.Loader.load(home, classLoader, ((excludelibs != null ? (excludelibs + ";") : "") + serconf.getValue("excludelibs", "")).split(";"), profilter); @@ -1696,6 +1711,8 @@ public final class Application { nodeClasses.put(p, type); } } + } finally { + nodeLock.unlock(); } } Class nodeClass = nodeClasses.get(protocol); @@ -1955,7 +1972,8 @@ public final class Application { if (events == null || events.isEmpty()) { return; } - synchronized (envProperties) { + envPropertiesLock.lock(); + try { Properties envRegisterProps = new Properties(); Set envRemovedKeys = new HashSet<>(); Properties envChangedProps = new Properties(); @@ -2349,6 +2367,8 @@ public final class Application { clusterRemovedKeys.forEach(k -> this.clusterProperties.remove(k)); this.clusterProperties.putAll(clusterChangedProps); } + } finally { + envPropertiesLock.unlock(); } } diff --git a/src/main/java/org/redkale/boot/LoggingSearchHandler.java b/src/main/java/org/redkale/boot/LoggingSearchHandler.java index 1c261a6b5..67376385f 100644 --- a/src/main/java/org/redkale/boot/LoggingSearchHandler.java +++ b/src/main/java/org/redkale/boot/LoggingSearchHandler.java @@ -79,14 +79,18 @@ public class LoggingSearchHandler extends LoggingBaseHandler { int size = batchSize; while (--size > 0) { log = logqueue.poll(); - if (log == null) break; + if (log == null) { + break; + } logList.add(log); } source.insert(logList); } } catch (Exception e) { ErrorManager err = getErrorManager(); - if (err != null) err.error(null, e, ErrorManager.WRITE_FAILURE); + if (err != null) { + err.error(null, e, ErrorManager.WRITE_FAILURE); + } } finally { logList.clear(); } @@ -96,31 +100,49 @@ public class LoggingSearchHandler extends LoggingBaseHandler { }.start(); } - private synchronized void initSource() { - if (retryCount.get() < 1) return; + private void initSource() { + if (retryCount.get() < 1) { + return; + } try { Utility.sleep(3000); //如果SearchSource自身在打印日志,需要停顿一点时间让SearchSource初始化完成 if (application == null) { Utility.sleep(3000); } - if (application == null) return; + if (application == null) { + return; + } this.source = (SearchSource) application.loadDataSource(sourceResourceName, false); - if (retryCount.get() == 1 && this.source == null) System.err.println("ERROR: not load logging.source(" + sourceResourceName + ")"); + if (retryCount.get() == 1 && this.source == null) { + System.err.println("ERROR: not load logging.source(" + sourceResourceName + ")"); + } } catch (Exception t) { ErrorManager err = getErrorManager(); - if (err != null) err.error(null, t, ErrorManager.WRITE_FAILURE); + if (err != null) { + err.error(null, t, ErrorManager.WRITE_FAILURE); + } } finally { retryCount.decrementAndGet(); } } private static boolean checkTagName(String name) { //只能是字母、数字、短横、点、%、$和下划线 - if (name.isEmpty()) return false; + if (name.isEmpty()) { + return false; + } for (char ch : name.toCharArray()) { - if (ch >= '0' && ch <= '9') continue; - if (ch >= 'a' && ch <= 'z') continue; - if (ch >= 'A' && ch <= 'Z') continue; - if (ch == '_' || ch == '-' || ch == '%' || ch == '$' || ch == '.') continue; + if (ch >= '0' && ch <= '9') { + continue; + } + if (ch >= 'a' && ch <= 'z') { + continue; + } + if (ch >= 'A' && ch <= 'Z') { + continue; + } + if (ch == '_' || ch == '-' || ch == '%' || ch == '$' || ch == '.') { + continue; + } return false; } return true; @@ -135,7 +157,9 @@ public class LoggingSearchHandler extends LoggingBaseHandler { } String tagstr = manager.getProperty(cname + ".tag"); if (tagstr != null && !tagstr.isEmpty()) { - if (!checkTagName(tagstr.replaceAll("\\$\\{.+\\}", ""))) throw new RuntimeException("found illegal logging.property " + cname + ".tag = " + tagstr); + if (!checkTagName(tagstr.replaceAll("\\$\\{.+\\}", ""))) { + throw new RuntimeException("found illegal logging.property " + cname + ".tag = " + tagstr); + } this.tag = tagstr.replace("${" + RESNAME_APP_NAME + "}", System.getProperty(RESNAME_APP_NAME, "")); if (this.tag.contains("%")) { this.tagDateFormat = this.tag; @@ -169,11 +193,15 @@ public class LoggingSearchHandler extends LoggingBaseHandler { } } catch (Exception e) { } - if (getFormatter() == null) setFormatter(new SimpleFormatter()); + if (getFormatter() == null) { + setFormatter(new SimpleFormatter()); + } String encodingstr = manager.getProperty(cname + ".encoding"); try { - if (encodingstr != null) setEncoding(encodingstr); + if (encodingstr != null) { + setEncoding(encodingstr); + } } catch (Exception e) { } @@ -188,12 +216,18 @@ public class LoggingSearchHandler extends LoggingBaseHandler { @Override public void publish(LogRecord log) { - if (!isLoggable(log)) return; - if (denyRegx != null && denyRegx.matcher(log.getMessage()).find()) return; + if (!isLoggable(log)) { + return; + } + if (denyRegx != null && denyRegx.matcher(log.getMessage()).find()) { + return; + } if (log.getSourceClassName() != null) { StackTraceElement[] ses = new Throwable().getStackTrace(); for (int i = 2; i < ses.length; i++) { - if (ses[i].getClassName().startsWith("java.util.logging")) continue; + if (ses[i].getClassName().startsWith("java.util.logging")) { + continue; + } log.setSourceClassName(ses[i].getClassName()); log.setSourceMethodName(ses[i].getMethodName()); break; diff --git a/src/main/java/org/redkale/boot/NodeHttpServer.java b/src/main/java/org/redkale/boot/NodeHttpServer.java index 23993cb97..ae45bd459 100644 --- a/src/main/java/org/redkale/boot/NodeHttpServer.java +++ b/src/main/java/org/redkale/boot/NodeHttpServer.java @@ -10,6 +10,7 @@ import java.lang.reflect.*; import java.net.*; import java.util.*; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.stream.Stream; import org.redkale.annotation.*; @@ -130,7 +131,8 @@ public class NodeHttpServer extends NodeServer { if (loader != null) { nodeService = (Service) loader.load(sncpResFactory, srcResourceName, srcObj, resourceName, field, attachment); } - synchronized (regFactory) { + regFactory.lock(); + try { if (nodeService == null) { nodeService = (Service) rf.find(resourceName, WebSocketNode.class); } @@ -156,6 +158,8 @@ public class NodeHttpServer extends NodeServer { field.set(srcObj, nodeService); logger.fine("Load Service " + nodeService); return nodeService; + } finally { + regFactory.unlock(); } } catch (Exception e) { logger.log(Level.SEVERE, "WebSocketNode inject error", e); @@ -250,8 +254,9 @@ public class NodeHttpServer extends NodeServer { final List> webss = sb == null ? null : new ArrayList<>(); if (rest && serverConf != null) { final List restedObjects = new ArrayList<>(); + final ReentrantLock restedLock = new ReentrantLock(); for (AnyValue restConf : serverConf.getAnyValues("rest")) { - loadRestServlet(webSocketFilter, restConf, restedObjects, sb, rests, webss); + loadRestServlet(webSocketFilter, restConf, restedObjects, restedLock, sb, rests, webss); } } int max = 0; @@ -335,8 +340,11 @@ public class NodeHttpServer extends NodeServer { } @SuppressWarnings("unchecked") - protected void loadRestServlet(final ClassFilter webSocketFilter, final AnyValue restConf, final List restedObjects, final StringBuilder sb, - final List> rests, final List> webss) throws Exception { + protected void loadRestServlet(final ClassFilter webSocketFilter, + final AnyValue restConf, final List restedObjects, + final ReentrantLock restedLock, final StringBuilder sb, + final List> rests, + final List> webss) throws Exception { if (!rest) { return; } @@ -402,12 +410,15 @@ public class NodeHttpServer extends NodeServer { if (!restFilter.accept(stypename)) { return; } - synchronized (restedObjects) { + restedLock.lock(); + try { if (restedObjects.contains(service)) { logger.log(Level.WARNING, stype.getName() + " repeat create rest servlet, so ignore"); return; } restedObjects.add(service); //避免重复创建Rest对象 + } finally { + restedLock.unlock(); } HttpServlet servlet = httpServer.addRestServlet(serverClassLoader, service, userType, baseServletType, prefix); if (servlet == null) { diff --git a/src/main/java/org/redkale/convert/ArrayDecoder.java b/src/main/java/org/redkale/convert/ArrayDecoder.java index 1437e4ac1..70191f9e6 100644 --- a/src/main/java/org/redkale/convert/ArrayDecoder.java +++ b/src/main/java/org/redkale/convert/ArrayDecoder.java @@ -7,6 +7,7 @@ package org.redkale.convert; import java.lang.reflect.*; import java.util.*; +import java.util.concurrent.locks.*; import java.util.function.IntFunction; import org.redkale.util.Creator; @@ -36,7 +37,9 @@ public class ArrayDecoder implements Decodeable { protected volatile boolean inited = false; - protected final Object lock = new Object(); + private final ReentrantLock lock = new ReentrantLock(); + + private final Condition condition = lock.newCondition(); public ArrayDecoder(final ConvertFactory factory, final Type type) { this.type = type; @@ -59,8 +62,11 @@ public class ArrayDecoder implements Decodeable { this.componentArrayFunction = Creator.arrayFunction(this.componentClass); } finally { inited = true; - synchronized (lock) { - lock.notifyAll(); + lock.lock(); + try { + condition.signalAll(); + } finally { + lock.unlock(); } } } @@ -83,12 +89,12 @@ public class ArrayDecoder implements Decodeable { } if (this.componentDecoder == null) { if (!this.inited) { - synchronized (lock) { - try { - lock.wait(); - } catch (Exception e) { - e.printStackTrace(); - } + lock.lock(); + try { + condition.await(); + } catch (Exception e) { + } finally { + lock.unlock(); } } } diff --git a/src/main/java/org/redkale/convert/ArrayEncoder.java b/src/main/java/org/redkale/convert/ArrayEncoder.java index 7c618da5f..a8fdca3f9 100644 --- a/src/main/java/org/redkale/convert/ArrayEncoder.java +++ b/src/main/java/org/redkale/convert/ArrayEncoder.java @@ -6,6 +6,7 @@ package org.redkale.convert; import java.lang.reflect.*; +import java.util.concurrent.locks.*; /** * 数组的序列化操作类
@@ -33,7 +34,9 @@ public class ArrayEncoder implements Encodeable { protected volatile boolean inited = false; - protected final Object lock = new Object(); + private final ReentrantLock lock = new ReentrantLock(); + + private final Condition condition = lock.newCondition(); public ArrayEncoder(final ConvertFactory factory, final Type type) { this.type = type; @@ -55,8 +58,11 @@ public class ArrayEncoder implements Encodeable { this.subTypeFinal = (this.componentType instanceof Class) && Modifier.isFinal(((Class) this.componentType).getModifiers()); } finally { inited = true; - synchronized (lock) { - lock.notifyAll(); + lock.lock(); + try { + condition.signalAll(); + } finally { + lock.unlock(); } } } @@ -80,12 +86,12 @@ public class ArrayEncoder implements Encodeable { Encodeable itemEncoder = this.componentEncoder; if (itemEncoder == null) { if (!this.inited) { - synchronized (lock) { - try { - lock.wait(); - } catch (Exception e) { - e.printStackTrace(); - } + lock.lock(); + try { + condition.await(); + } catch (Exception e) { + } finally { + lock.unlock(); } itemEncoder = this.componentEncoder; } diff --git a/src/main/java/org/redkale/convert/CollectionDecoder.java b/src/main/java/org/redkale/convert/CollectionDecoder.java index 86eba87f2..d190d5544 100644 --- a/src/main/java/org/redkale/convert/CollectionDecoder.java +++ b/src/main/java/org/redkale/convert/CollectionDecoder.java @@ -7,6 +7,7 @@ package org.redkale.convert; import java.lang.reflect.*; import java.util.*; +import java.util.concurrent.locks.*; import org.redkale.util.Creator; /** @@ -32,7 +33,9 @@ public class CollectionDecoder implements Decodeable> { protected volatile boolean inited = false; - protected final Object lock = new Object(); + private final ReentrantLock lock = new ReentrantLock(); + + private final Condition condition = lock.newCondition(); public CollectionDecoder(final ConvertFactory factory, final Type type) { this.type = type; @@ -53,8 +56,11 @@ public class CollectionDecoder implements Decodeable> { } } finally { inited = true; - synchronized (lock) { - lock.notifyAll(); + lock.lock(); + try { + condition.signalAll(); + } finally { + lock.unlock(); } } } @@ -88,12 +94,12 @@ public class CollectionDecoder implements Decodeable> { } if (this.componentDecoder == null) { if (!this.inited) { - synchronized (lock) { - try { - lock.wait(); - } catch (Exception e) { - e.printStackTrace(); - } + lock.lock(); + try { + condition.await(); + } catch (Exception e) { + } finally { + lock.unlock(); } } } diff --git a/src/main/java/org/redkale/convert/CollectionEncoder.java b/src/main/java/org/redkale/convert/CollectionEncoder.java index e70ed8955..20bce50d4 100644 --- a/src/main/java/org/redkale/convert/CollectionEncoder.java +++ b/src/main/java/org/redkale/convert/CollectionEncoder.java @@ -7,6 +7,7 @@ package org.redkale.convert; import java.lang.reflect.*; import java.util.Collection; +import java.util.concurrent.locks.*; /** * Collection的序列化操作类
@@ -27,7 +28,9 @@ public class CollectionEncoder implements Encodeable> { protected volatile boolean inited = false; - protected final Object lock = new Object(); + private final ReentrantLock lock = new ReentrantLock(); + + private final Condition condition = lock.newCondition(); public CollectionEncoder(final ConvertFactory factory, final Type type) { this.type = type; @@ -44,8 +47,11 @@ public class CollectionEncoder implements Encodeable> { } } finally { inited = true; - synchronized (lock) { - lock.notifyAll(); + lock.lock(); + try { + condition.signalAll(); + } finally { + lock.unlock(); } } } @@ -67,12 +73,12 @@ public class CollectionEncoder implements Encodeable> { } if (this.componentEncoder == null) { if (!this.inited) { - synchronized (lock) { - try { - lock.wait(); - } catch (Exception e) { - e.printStackTrace(); - } + lock.lock(); + try { + condition.await(); + } catch (Exception e) { + } finally { + lock.unlock(); } } } diff --git a/src/main/java/org/redkale/convert/ConvertFactory.java b/src/main/java/org/redkale/convert/ConvertFactory.java index b8338adf7..bd97f2667 100644 --- a/src/main/java/org/redkale/convert/ConvertFactory.java +++ b/src/main/java/org/redkale/convert/ConvertFactory.java @@ -14,6 +14,7 @@ import java.nio.channels.CompletionHandler; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; import java.util.stream.*; import org.redkale.annotation.ConstructorParameters; @@ -64,6 +65,8 @@ public abstract class ConvertFactory { final Set ignoreMapColumns = new HashSet(); + final ReentrantLock ignoreMapColumnLock = new ReentrantLock(); + //key:需要屏蔽的字段;value:排除的字段名 private final ConcurrentHashMap> ignoreAlls = new ConcurrentHashMap(); @@ -222,18 +225,15 @@ public abstract class ConvertFactory { return defProtobufConvert; } } - synchronized (loaderInited) { - if (!loaderInited.get()) { - Iterator it = ServiceLoader.load(ConvertProvider.class).iterator(); - RedkaleClassLoader.putServiceLoader(ConvertProvider.class); - while (it.hasNext()) { - ConvertProvider cl = it.next(); - RedkaleClassLoader.putReflectionPublicConstructors(cl.getClass(), cl.getClass().getName()); - if (cl.type() == ConvertType.PROTOBUF) { - defProtobufConvert = cl.convert(); - } + if (loaderInited.compareAndSet(false, true)) { + Iterator it = ServiceLoader.load(ConvertProvider.class).iterator(); + RedkaleClassLoader.putServiceLoader(ConvertProvider.class); + while (it.hasNext()) { + ConvertProvider cl = it.next(); + RedkaleClassLoader.putReflectionPublicConstructors(cl.getClass(), cl.getClass().getName()); + if (cl.type() == ConvertType.PROTOBUF) { + defProtobufConvert = cl.convert(); } - loaderInited.set(true); } } return type == ConvertType.PROTOBUF ? defProtobufConvert : null; @@ -858,7 +858,8 @@ public abstract class ConvertFactory { public final void register(final Class type, boolean ignore, String... columns) { if (type == Map.class) { - synchronized (ignoreMapColumns) { + ignoreMapColumnLock.lock(); + try { if (ignore) { for (String column : columns) { ignoreMapColumns.add(column); @@ -868,6 +869,8 @@ public abstract class ConvertFactory { ignoreMapColumns.remove(column); } } + } finally { + ignoreMapColumnLock.unlock(); } return; } @@ -878,7 +881,8 @@ public abstract class ConvertFactory { public final void register(final Class type, boolean ignore, Collection columns) { if (type == Map.class) { - synchronized (ignoreMapColumns) { + ignoreMapColumnLock.lock(); + try { if (ignore) { for (String column : columns) { ignoreMapColumns.add(column); @@ -888,6 +892,8 @@ public abstract class ConvertFactory { ignoreMapColumns.remove(column); } } + } finally { + ignoreMapColumnLock.unlock(); } return; } diff --git a/src/main/java/org/redkale/convert/MapDecoder.java b/src/main/java/org/redkale/convert/MapDecoder.java index 07ecd1e51..b40c3ef2d 100644 --- a/src/main/java/org/redkale/convert/MapDecoder.java +++ b/src/main/java/org/redkale/convert/MapDecoder.java @@ -7,6 +7,7 @@ package org.redkale.convert; import java.lang.reflect.*; import java.util.*; +import java.util.concurrent.locks.*; import org.redkale.util.Creator; /** @@ -36,7 +37,9 @@ public class MapDecoder implements Decodeable> { protected volatile boolean inited = false; - protected final Object lock = new Object(); + private final ReentrantLock lock = new ReentrantLock(); + + private final Condition condition = lock.newCondition(); public MapDecoder(final ConvertFactory factory, final Type type) { this.type = type; @@ -67,8 +70,11 @@ public class MapDecoder implements Decodeable> { } } finally { inited = true; - synchronized (lock) { - lock.notifyAll(); + lock.lock(); + try { + condition.signalAll(); + } finally { + lock.unlock(); } } } @@ -95,12 +101,12 @@ public class MapDecoder implements Decodeable> { public Map convertFrom(Reader in, DeMember member) { if (this.keyDecoder == null || this.valueDecoder == null) { if (!this.inited) { - synchronized (lock) { - try { - lock.wait(); - } catch (Exception e) { - e.printStackTrace(); - } + lock.lock(); + try { + condition.await(); + } catch (Exception e) { + } finally { + lock.unlock(); } } } diff --git a/src/main/java/org/redkale/convert/MapEncoder.java b/src/main/java/org/redkale/convert/MapEncoder.java index d0aaaf420..5a04ae97a 100644 --- a/src/main/java/org/redkale/convert/MapEncoder.java +++ b/src/main/java/org/redkale/convert/MapEncoder.java @@ -7,6 +7,7 @@ package org.redkale.convert; import java.lang.reflect.*; import java.util.*; +import java.util.concurrent.locks.*; import java.util.function.BiFunction; /** @@ -30,7 +31,9 @@ public class MapEncoder implements Encodeable> { protected volatile boolean inited = false; - protected final Object lock = new Object(); + private final ReentrantLock lock = new ReentrantLock(); + + private final Condition condition = lock.newCondition(); protected final Set ignoreMapColumns; @@ -45,13 +48,19 @@ public class MapEncoder implements Encodeable> { this.keyEncoder = factory.getAnyEncoder(); this.valueEncoder = factory.getAnyEncoder(); } - synchronized (factory.ignoreMapColumns) { + factory.ignoreMapColumnLock.lock(); + try { this.ignoreMapColumns = factory.ignoreMapColumns.isEmpty() ? null : new HashSet<>(factory.ignoreMapColumns); + } finally { + factory.ignoreMapColumnLock.unlock(); } } finally { inited = true; - synchronized (lock) { - lock.notifyAll(); + lock.lock(); + try { + condition.signalAll(); + } finally { + lock.unlock(); } } } @@ -70,12 +79,12 @@ public class MapEncoder implements Encodeable> { if (this.keyEncoder == null || this.valueEncoder == null) { if (!this.inited) { - synchronized (lock) { - try { - lock.wait(); - } catch (Exception e) { - e.printStackTrace(); - } + lock.lock(); + try { + condition.await(); + } catch (Exception e) { + } finally { + lock.unlock(); } } } diff --git a/src/main/java/org/redkale/convert/ObjectDecoder.java b/src/main/java/org/redkale/convert/ObjectDecoder.java index 31023ce04..2498e991f 100644 --- a/src/main/java/org/redkale/convert/ObjectDecoder.java +++ b/src/main/java/org/redkale/convert/ObjectDecoder.java @@ -7,6 +7,7 @@ package org.redkale.convert; import java.lang.reflect.*; import java.util.*; +import java.util.concurrent.locks.*; import org.redkale.convert.ext.StringSimpledCoder; import org.redkale.util.*; @@ -41,7 +42,9 @@ public class ObjectDecoder implements Decodeable { protected volatile boolean inited = false; - protected final Object lock = new Object(); + private final ReentrantLock lock = new ReentrantLock(); + + private final Condition condition = lock.newCondition(); protected ObjectDecoder(Type type) { this.type = ((type instanceof Class) && ((Class) type).isInterface()) ? Object.class : type; @@ -295,8 +298,11 @@ public class ObjectDecoder implements Decodeable { } } finally { inited = true; - synchronized (lock) { - lock.notifyAll(); + lock.lock(); + try { + condition.signalAll(); + } finally { + lock.unlock(); } } } @@ -319,12 +325,12 @@ public class ObjectDecoder implements Decodeable { return (T) factory.loadDecoder(factory.getEntityAlias(clazz)).convertFrom(objin); } if (!this.inited) { - synchronized (lock) { - try { - lock.wait(); - } catch (Exception e) { - e.printStackTrace(); - } + lock.lock(); + try { + condition.await(); + } catch (Exception e) { + } finally { + lock.unlock(); } } if (this.creator == null) { diff --git a/src/main/java/org/redkale/convert/ObjectEncoder.java b/src/main/java/org/redkale/convert/ObjectEncoder.java index d9e7d74a5..2ea97c22e 100644 --- a/src/main/java/org/redkale/convert/ObjectEncoder.java +++ b/src/main/java/org/redkale/convert/ObjectEncoder.java @@ -7,6 +7,7 @@ package org.redkale.convert; import java.lang.reflect.*; import java.util.*; +import java.util.concurrent.locks.*; import org.redkale.annotation.ConstructorParameters; import org.redkale.convert.ext.StringSimpledCoder; import org.redkale.util.*; @@ -36,7 +37,9 @@ public class ObjectEncoder implements Encodeable { protected volatile boolean inited = false; - protected final Object lock = new Object(); + private final ReentrantLock lock = new ReentrantLock(); + + private final Condition condition = lock.newCondition(); protected ObjectEncoder(Type type) { this.type = type; @@ -252,8 +255,11 @@ public class ObjectEncoder implements Encodeable { } } finally { inited = true; - synchronized (lock) { - lock.notifyAll(); + lock.lock(); + try { + condition.signalAll(); + } finally { + lock.unlock(); } } } @@ -265,12 +271,12 @@ public class ObjectEncoder implements Encodeable { return; } if (!this.inited) { - synchronized (lock) { - try { - lock.wait(); - } catch (Exception e) { - e.printStackTrace(); - } + lock.lock(); + try { + condition.await(); + } catch (Exception e) { + } finally { + lock.unlock(); } } if (value.getClass() != this.typeClass && !this.type.equals(out.specify())) { diff --git a/src/main/java/org/redkale/convert/OptionalCoder.java b/src/main/java/org/redkale/convert/OptionalCoder.java index 13f5d51f0..308133e53 100644 --- a/src/main/java/org/redkale/convert/OptionalCoder.java +++ b/src/main/java/org/redkale/convert/OptionalCoder.java @@ -6,7 +6,8 @@ package org.redkale.convert; import java.lang.reflect.*; -import java.util.*; +import java.util.Optional; +import java.util.concurrent.locks.*; /** * Optional 的SimpledCoder实现 @@ -30,7 +31,9 @@ public class OptionalCoder extends Simple protected volatile boolean inited = false; - private final Object lock = new Object(); + private final ReentrantLock lock = new ReentrantLock(); + + private final Condition condition = lock.newCondition(); @SuppressWarnings("unchecked") public OptionalCoder(final ConvertFactory factory, final Type type) { @@ -61,8 +64,11 @@ public class OptionalCoder extends Simple } } finally { inited = true; - synchronized (lock) { - lock.notifyAll(); + lock.lock(); + try { + condition.signalAll(); + } finally { + lock.unlock(); } } } @@ -75,12 +81,12 @@ public class OptionalCoder extends Simple } if (this.encoder == null) { if (!this.inited) { - synchronized (lock) { - try { - lock.wait(); - } catch (Exception e) { - e.printStackTrace(); - } + lock.lock(); + try { + condition.await(); + } catch (Exception e) { + } finally { + lock.unlock(); } } } @@ -91,12 +97,12 @@ public class OptionalCoder extends Simple public Optional convertFrom(R in) { if (this.decoder == null) { if (!this.inited) { - synchronized (lock) { - try { - lock.wait(); - } catch (Exception e) { - e.printStackTrace(); - } + lock.lock(); + try { + condition.await(); + } catch (Exception e) { + } finally { + lock.unlock(); } } } diff --git a/src/main/java/org/redkale/convert/StreamDecoder.java b/src/main/java/org/redkale/convert/StreamDecoder.java index bc1629883..f9e0ae542 100644 --- a/src/main/java/org/redkale/convert/StreamDecoder.java +++ b/src/main/java/org/redkale/convert/StreamDecoder.java @@ -5,10 +5,9 @@ */ package org.redkale.convert; -import org.redkale.util.Creator; -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; +import java.lang.reflect.*; import java.util.*; +import java.util.concurrent.locks.*; import java.util.stream.Stream; /** @@ -32,7 +31,9 @@ public class StreamDecoder implements Decodeable> { protected volatile boolean inited = false; - protected final Object lock = new Object(); + private final ReentrantLock lock = new ReentrantLock(); + + private final Condition condition = lock.newCondition(); public StreamDecoder(final ConvertFactory factory, final Type type) { this.type = type; @@ -47,8 +48,11 @@ public class StreamDecoder implements Decodeable> { } } finally { inited = true; - synchronized (lock) { - lock.notifyAll(); + lock.lock(); + try { + condition.signalAll(); + } finally { + lock.unlock(); } } } @@ -69,12 +73,12 @@ public class StreamDecoder implements Decodeable> { } if (this.componentDecoder == null) { if (!this.inited) { - synchronized (lock) { - try { - lock.wait(); - } catch (Exception e) { - e.printStackTrace(); - } + lock.lock(); + try { + condition.await(); + } catch (Exception e) { + } finally { + lock.unlock(); } } } diff --git a/src/main/java/org/redkale/convert/StreamEncoder.java b/src/main/java/org/redkale/convert/StreamEncoder.java index b2c707ea8..663c47bf5 100644 --- a/src/main/java/org/redkale/convert/StreamEncoder.java +++ b/src/main/java/org/redkale/convert/StreamEncoder.java @@ -6,6 +6,7 @@ package org.redkale.convert; import java.lang.reflect.*; +import java.util.concurrent.locks.*; import java.util.stream.Stream; /** @@ -27,7 +28,9 @@ public class StreamEncoder implements Encodeable> { protected volatile boolean inited = false; - protected final Object lock = new Object(); + private final ReentrantLock lock = new ReentrantLock(); + + private final Condition condition = lock.newCondition(); public StreamEncoder(final ConvertFactory factory, final Type type) { this.type = type; @@ -44,8 +47,11 @@ public class StreamEncoder implements Encodeable> { } } finally { inited = true; - synchronized (lock) { - lock.notifyAll(); + lock.lock(); + try { + condition.signalAll(); + } finally { + lock.unlock(); } } } @@ -68,12 +74,12 @@ public class StreamEncoder implements Encodeable> { } if (this.componentEncoder == null) { if (!this.inited) { - synchronized (lock) { - try { - lock.wait(); - } catch (Exception e) { - e.printStackTrace(); - } + lock.lock(); + try { + condition.await(); + } catch (Exception e) { + } finally { + lock.unlock(); } } } diff --git a/src/main/java/org/redkale/mq/MessageAgent.java b/src/main/java/org/redkale/mq/MessageAgent.java index ce851a884..39debe132 100644 --- a/src/main/java/org/redkale/mq/MessageAgent.java +++ b/src/main/java/org/redkale/mq/MessageAgent.java @@ -8,6 +8,7 @@ package org.redkale.mq; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.*; import java.util.stream.Collectors; import org.redkale.annotation.AutoLoad; @@ -46,9 +47,13 @@ public abstract class MessageAgent implements Resourcable { protected MessageProducers sncpProducer; - protected final Object httpProducerLock = new Object(); + protected final ReentrantLock httpProducerLock = new ReentrantLock(); - protected final Object sncpProducerLock = new Object(); + protected final ReentrantLock sncpProducerLock = new ReentrantLock(); + + protected final ReentrantLock httpNodesLock = new ReentrantLock(); + + protected final ReentrantLock sncpNodesLock = new ReentrantLock(); protected final AtomicLong msgSeqno = new AtomicLong(System.nanoTime()); @@ -222,7 +227,8 @@ public abstract class MessageAgent implements Resourcable { //获取指定topic的生产处理器 public MessageProducers getSncpProducer() { if (this.sncpProducer == null) { - synchronized (sncpProducerLock) { + sncpProducerLock.lock(); + try { if (this.sncpProducer == null) { long s = System.currentTimeMillis(); MessageProducer[] producers = new MessageProducer[producerCount]; @@ -237,6 +243,8 @@ public abstract class MessageAgent implements Resourcable { } this.sncpProducer = new MessageProducers(producers); } + } finally { + sncpProducerLock.unlock(); } } return this.sncpProducer; @@ -244,7 +252,8 @@ public abstract class MessageAgent implements Resourcable { public MessageProducers getHttpProducer() { if (this.httpProducer == null) { - synchronized (httpProducerLock) { + httpProducerLock.lock(); + try { if (this.httpProducer == null) { long s = System.currentTimeMillis(); MessageProducer[] producers = new MessageProducer[producerCount]; @@ -259,6 +268,8 @@ public abstract class MessageAgent implements Resourcable { } this.httpProducer = new MessageProducers(producers); } + } finally { + httpProducerLock.unlock(); } } return this.httpProducer; @@ -282,7 +293,7 @@ public abstract class MessageAgent implements Resourcable { //创建指定topic的消费处理器 public abstract MessageConsumer createConsumer(String[] topics, String group, MessageProcessor processor); - public final synchronized void putService(NodeHttpServer ns, Service service, HttpServlet servlet) { + public final void putService(NodeHttpServer ns, Service service, HttpServlet servlet) { AutoLoad al = service.getClass().getAnnotation(AutoLoad.class); if (al != null && !al.value() && service.getClass().getAnnotation(Local.class) != null) { return; @@ -299,14 +310,19 @@ public abstract class MessageAgent implements Resourcable { } String[] topics = generateHttpReqTopics(service); String consumerid = generateHttpConsumerid(topics, service); - if (messageNodes.containsKey(consumerid)) { - throw new RuntimeException("consumerid(" + consumerid + ") is repeat"); + httpNodesLock.lock(); + try { + if (messageNodes.containsKey(consumerid)) { + throw new RuntimeException("consumerid(" + consumerid + ") is repeat"); + } + HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, httpMessageClient, getHttpProducer(), ns, service, servlet); + this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topics, consumerid, processor))); + } finally { + httpNodesLock.unlock(); } - HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, httpMessageClient, getHttpProducer(), ns, service, servlet); - this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topics, consumerid, processor))); } - public final synchronized void putService(NodeSncpServer ns, Service service, SncpServlet servlet) { + public final void putService(NodeSncpServer ns, Service service, SncpServlet servlet) { AutoLoad al = service.getClass().getAnnotation(AutoLoad.class); if (al != null && !al.value() && service.getClass().getAnnotation(Local.class) != null) { return; @@ -317,11 +333,16 @@ public abstract class MessageAgent implements Resourcable { } String topic = generateSncpReqTopic(service); String consumerid = generateSncpConsumerid(topic, service); - if (messageNodes.containsKey(consumerid)) { - throw new RuntimeException("consumerid(" + consumerid + ") is repeat"); + sncpNodesLock.lock(); + try { + if (messageNodes.containsKey(consumerid)) { + throw new RuntimeException("consumerid(" + consumerid + ") is repeat"); + } + SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, sncpMessageClient, getSncpProducer(), ns, service, servlet); + this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(new String[]{topic}, consumerid, processor))); + } finally { + sncpNodesLock.unlock(); } - SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, sncpMessageClient, getSncpProducer(), ns, service, servlet); - this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(new String[]{topic}, consumerid, processor))); } //格式: sncp.req.user diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index bf50f40e3..a46ffbba9 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -11,6 +11,7 @@ import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.*; import javax.net.ssl.SSLEngineResult.HandshakeStatus; import javax.net.ssl.*; @@ -58,7 +59,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl private Consumer writeBufferConsumer; - private final Object pipelineLock = new Object(); + private final ReentrantLock pipelineLock = new ReentrantLock(); private ByteBufferWriter pipelineWriter; @@ -519,7 +520,8 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl //返回pipelineCount个数数据是否全部写入完毕 public boolean appendPipeline(int pipelineIndex, int pipelineCount, byte[] bs, int offset, int length) { - synchronized (pipelineLock) { + pipelineLock.lock(); + try { ByteBufferWriter writer = this.pipelineWriter; if (writer == null) { writer = ByteBufferWriter.create(getWriteBufferSupplier()); @@ -547,6 +549,8 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl } return false; } + } finally { + pipelineLock.unlock(); } } @@ -557,7 +561,8 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl //返回pipelineCount个数数据是否全部写入完毕 public boolean appendPipeline(int pipelineIndex, int pipelineCount, byte[] headerContent, int headerOffset, int headerLength, byte[] bodyContent, int bodyOffset, int bodyLength) { - synchronized (pipelineLock) { + pipelineLock.lock(); + try { ByteBufferWriter writer = this.pipelineWriter; if (writer == null) { writer = ByteBufferWriter.create(getWriteBufferSupplier()); @@ -585,6 +590,8 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl } return false; } + } finally { + pipelineLock.unlock(); } } diff --git a/src/main/java/org/redkale/net/AsyncIOGroup.java b/src/main/java/org/redkale/net/AsyncIOGroup.java index dc63b379b..97c88b532 100644 --- a/src/main/java/org/redkale/net/AsyncIOGroup.java +++ b/src/main/java/org/redkale/net/AsyncIOGroup.java @@ -31,10 +31,10 @@ public class AsyncIOGroup extends AsyncGroup { private boolean started; - private boolean closed; - private boolean skipClose; + private final AtomicBoolean closed = new AtomicBoolean(); + final AsyncIOThread[] ioReadThreads; final AsyncIOThread[] ioWriteThreads; @@ -132,7 +132,7 @@ public class AsyncIOGroup extends AsyncGroup { if (started) { return this; } - if (closed) { + if (closed.get()) { throw new RuntimeException("group is closed"); } for (int i = 0; i < this.ioReadThreads.length; i++) { @@ -162,21 +162,19 @@ public class AsyncIOGroup extends AsyncGroup { return this; } - public synchronized AsyncIOGroup dispose() { - if (closed) { - return this; + public AsyncIOGroup dispose() { + if (closed.compareAndSet(false, true)) { + for (AsyncIOThread t : this.ioReadThreads) { + t.close(); + } + for (AsyncIOThread t : this.ioWriteThreads) { + t.close(); + } + if (connectThread != null) { + connectThread.close(); + } + this.timeoutExecutor.shutdownNow(); } - for (AsyncIOThread t : this.ioReadThreads) { - t.close(); - } - for (AsyncIOThread t : this.ioWriteThreads) { - t.close(); - } - if (connectThread != null) { - connectThread.close(); - } - this.timeoutExecutor.shutdownNow(); - closed = true; return this; } diff --git a/src/main/java/org/redkale/net/AsyncIOThread.java b/src/main/java/org/redkale/net/AsyncIOThread.java index 1e54f2c90..d4138b455 100644 --- a/src/main/java/org/redkale/net/AsyncIOThread.java +++ b/src/main/java/org/redkale/net/AsyncIOThread.java @@ -10,6 +10,7 @@ import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.*; import java.util.logging.*; import org.redkale.util.*; @@ -38,7 +39,7 @@ public class AsyncIOThread extends WorkThread { private final Queue> registerQueue = Utility.unsafe() != null ? new MpscGrowableArrayQueue<>(16, 1 << 16) : new ConcurrentLinkedQueue<>(); - private boolean closed; + private final AtomicBoolean closed = new AtomicBoolean(); public AsyncIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ObjectPool safeBufferPool) throws IOException { super(g, name, index, threads, workExecutor, null); @@ -50,7 +51,7 @@ public class AsyncIOThread extends WorkThread { } protected boolean isClosed() { - return closed; + return closed.get(); } public static AsyncIOThread currAsyncIOThread() { @@ -148,7 +149,7 @@ public class AsyncIOThread extends WorkThread { try { register.accept(selector); } catch (Throwable t) { - if (!this.closed) { + if (!this.closed.get()) { logger.log(Level.INFO, getName() + " register run failed", t); } } @@ -158,7 +159,7 @@ public class AsyncIOThread extends WorkThread { try { command.run(); } catch (Throwable t) { - if (!this.closed) { + if (!this.closed.get()) { logger.log(Level.INFO, getName() + " command run failed", t); } } @@ -202,16 +203,15 @@ public class AsyncIOThread extends WorkThread { } } } catch (Throwable ex) { - if (!this.closed) { + if (!this.closed.get()) { logger.log(Level.FINE, getName() + " selector run failed", ex); } } } } - public synchronized void close() { - if (!this.closed) { - this.closed = true; + public void close() { + if (this.closed.compareAndSet(false, true)) { try { this.selector.close(); } catch (Exception e) { diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index 655f0fa4b..942cff445 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -471,7 +471,7 @@ abstract class AsyncNioConnection extends AsyncConnection { ByteBuffer bb; @Override - public synchronized int read() throws IOException { + public int read() throws IOException { if (bb == null || !bb.hasRemaining()) { int r = readBuffer(); if (r < 1) { @@ -482,7 +482,7 @@ abstract class AsyncNioConnection extends AsyncConnection { } @Override - public synchronized int read(byte b[], int off, int len) throws IOException { + public int read(byte b[], int off, int len) throws IOException { if (b == null) { throw new NullPointerException(); } else if (off < 0 || len < 0 || len > b.length - off) { diff --git a/src/main/java/org/redkale/net/DispatcherServlet.java b/src/main/java/org/redkale/net/DispatcherServlet.java index e5426bffc..ccf8025ca 100644 --- a/src/main/java/org/redkale/net/DispatcherServlet.java +++ b/src/main/java/org/redkale/net/DispatcherServlet.java @@ -8,6 +8,7 @@ package org.redkale.net; import java.io.Serializable; import java.util.*; import java.util.concurrent.atomic.LongAdder; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; import java.util.logging.Level; import java.util.stream.Stream; @@ -36,11 +37,11 @@ public abstract class DispatcherServlet servlets = new HashSet<>(); - private final Object mappingLock = new Object(); + private final ReentrantLock mappingLock = new ReentrantLock(); private Map mappings = new HashMap<>(); @@ -48,6 +49,8 @@ public abstract class DispatcherServlet> filters = new ArrayList<>(); + protected final ReentrantLock filtersLock = new ReentrantLock(); + protected Application application; protected Filter headFilter; @@ -61,65 +64,84 @@ public abstract class DispatcherServlet newservlets = new HashSet<>(servlets); newservlets.add(servlet); this.servlets = newservlets; + } finally { + servletLock.unlock(); } } protected void removeServlet(S servlet) { - synchronized (servletLock) { + servletLock.lock(); + try { Set newservlets = new HashSet<>(servlets); newservlets.remove(servlet); this.servlets = newservlets; doAfterRemove(servlet); + } finally { + servletLock.unlock(); } } public boolean containsServlet(Class servletClass) { - synchronized (servletLock) { + servletLock.lock(); + try { for (S servlet : new HashSet<>(servlets)) { if (servlet.getClass().equals(servletClass)) { return true; } } return false; + } finally { + servletLock.unlock(); } } public boolean containsServlet(String servletClassName) { - synchronized (servletLock) { + servletLock.lock(); + try { for (S servlet : new HashSet<>(servlets)) { if (servlet.getClass().getName().equals(servletClassName)) { return true; } } return false; + } finally { + servletLock.unlock(); } } protected void putMapping(K key, S servlet) { - synchronized (mappingLock) { + mappingLock.lock(); + try { Map newmappings = new HashMap<>(mappings); newmappings.put(key, servlet); this.mappings = newmappings; + } finally { + mappingLock.unlock(); } } protected void removeMapping(K key) { - synchronized (mappingLock) { + mappingLock.lock(); + try { if (mappings.containsKey(key)) { Map newmappings = new HashMap<>(mappings); S s = newmappings.remove(key); this.mappings = newmappings; doAfterRemove(s); } + } finally { + mappingLock.unlock(); } } protected void removeMapping(S servlet) { - synchronized (mappingLock) { + mappingLock.lock(); + try { List keys = new ArrayList<>(); Map newmappings = new HashMap<>(mappings); for (Map.Entry en : newmappings.entrySet()) { @@ -130,6 +152,8 @@ public abstract class DispatcherServlet filter : filters) { @@ -159,28 +184,36 @@ public abstract class DispatcherServlet filter, AnyValue conf) { filter._conf = conf; - synchronized (filters) { + filtersLock.lock(); + try { this.filters.add(filter); this.allFilterAsync = this.allFilterAsync && isAsync(filter); Collections.sort(this.filters); + } finally { + filtersLock.unlock(); } } @@ -224,7 +257,8 @@ public abstract class DispatcherServlet, R extends ClientR private final AtomicInteger connSeqno = new AtomicInteger(); - private boolean closed; + private final AtomicBoolean closed = new AtomicBoolean(); protected ScheduledFuture timeoutFuture; @@ -180,28 +180,26 @@ public abstract class Client, R extends ClientR protected void handlePingResult(C conn, P result) { } - public synchronized void close() { - if (this.closed) { - return; - } - this.timeoutScheduler.shutdownNow(); - for (ClientConnection conn : this.connArray) { - if (conn == null) { - continue; - } - final R closeReq = closeRequestSupplier == null ? null : closeRequestSupplier.get(); - if (closeReq == null) { - conn.dispose(null); - } else { - try { - conn.writeChannel(closeReq).get(1, TimeUnit.SECONDS); - } catch (Exception e) { + public void close() { + if (closed.compareAndSet(false, true)) { + this.timeoutScheduler.shutdownNow(); + for (ClientConnection conn : this.connArray) { + if (conn == null) { + continue; + } + final R closeReq = closeRequestSupplier == null ? null : closeRequestSupplier.get(); + if (closeReq == null) { + conn.dispose(null); + } else { + try { + conn.writeChannel(closeReq).get(1, TimeUnit.SECONDS); + } catch (Exception e) { + } + conn.dispose(null); } - conn.dispose(null); } + group.close(); } - group.close(); - this.closed = true; } public final CompletableFuture

sendAsync(R request) { diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 3f649803b..000f1d3ef 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -10,6 +10,7 @@ import java.nio.channels.ClosedChannelException; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; import java.util.function.*; import org.redkale.net.*; @@ -39,6 +40,10 @@ public abstract class ClientConnection implements Co protected final List pauseRequests = new CopyOnWriteArrayList<>(); + private final ReentrantLock pauseLock = new ReentrantLock(); + + private final Condition pauseCondition = pauseLock.newCondition(); + protected final AsyncConnection channel; private final ClientCodec codec; @@ -181,6 +186,25 @@ public abstract class ClientConnection implements Co } } + void signalPauseRequest() { + pauseLock.lock(); + try { + pauseCondition.signalAll(); + } finally { + pauseLock.unlock(); + } + } + + void awaitPauseRequest() { + pauseLock.lock(); + try { + pauseCondition.await(3_000, TimeUnit.SECONDS); + } catch (Exception e) { + } finally { + pauseLock.unlock(); + } + } + public boolean isAuthenticated() { return authenticated; } diff --git a/src/main/java/org/redkale/net/client/ClientWriteIOThread.java b/src/main/java/org/redkale/net/client/ClientWriteIOThread.java index ff8e6936d..1139488bd 100644 --- a/src/main/java/org/redkale/net/client/ClientWriteIOThread.java +++ b/src/main/java/org/redkale/net/client/ClientWriteIOThread.java @@ -51,9 +51,7 @@ public class ClientWriteIOThread extends AsyncIOThread { } finally { conn.pauseResuming.set(false); conn.pauseWriting.set(false); - synchronized (conn.pauseRequests) { - conn.pauseRequests.notify(); - } + conn.signalPauseRequest(); } } } @@ -77,12 +75,7 @@ public class ClientWriteIOThread extends AsyncIOThread { entry.conn.offerRespFuture(entry); if (entry.conn.pauseWriting.get()) { if (entry.conn.pauseResuming.get()) { - try { - synchronized (entry.conn.pauseRequests) { - entry.conn.pauseRequests.wait(3_000); - } - } catch (InterruptedException ie) { - } + entry.conn.awaitPauseRequest(); } entry.conn.pauseRequests.add(entry); } else { @@ -94,12 +87,7 @@ public class ClientWriteIOThread extends AsyncIOThread { entry.conn.offerRespFuture(entry); if (entry.conn.pauseWriting.get()) { if (entry.conn.pauseResuming.get()) { - try { - synchronized (entry.conn.pauseRequests) { - entry.conn.pauseRequests.wait(3_000); - } - } catch (InterruptedException ie) { - } + entry.conn.awaitPauseRequest(); } entry.conn.pauseRequests.add(entry); } else { diff --git a/src/main/java/org/redkale/net/http/HttpContext.java b/src/main/java/org/redkale/net/http/HttpContext.java index ab38c4ef6..23db82030 100644 --- a/src/main/java/org/redkale/net/http/HttpContext.java +++ b/src/main/java/org/redkale/net/http/HttpContext.java @@ -63,14 +63,12 @@ public class HttpContext extends Context { // // protected void addRequestURINode(String path) { // RequestURINode node = new RequestURINode(path); -// synchronized (this) { // if (this.uriCacheNodes != null) { // for (int i = 0; i < uriCacheNodes.length; i++) { // if (uriCacheNodes[i].path.equals(path)) return; // } // } // this.uriCacheNodes = Utility.append(this.uriCacheNodes, node); -// } // } @Override protected void updateReadIOThread(AsyncConnection conn, AsyncIOThread ioReadThread) { @@ -85,7 +83,7 @@ public class HttpContext extends Context { protected void updateWebSocketWriteIOThread(WebSocket webSocket) { WebSocketWriteIOThread writeIOThread = webSocketWriterIOThreadFunc.apply(webSocket); updateWriteIOThread(webSocket._channel, writeIOThread); - webSocket._writeIOThread = writeIOThread; + webSocket._writeIOThread = writeIOThread; } protected String createSessionid() { @@ -96,16 +94,11 @@ public class HttpContext extends Context { @SuppressWarnings("unchecked") protected Creator loadAsyncHandlerCreator(Class handlerClass) { - Creator creator = asyncHandlerCreators.get(handlerClass); - if (creator == null) { - creator = createAsyncHandlerCreator(handlerClass); - asyncHandlerCreators.put(handlerClass, creator); - } - return creator; + return asyncHandlerCreators.computeIfAbsent(handlerClass, c -> createAsyncHandlerCreator(c)); } @SuppressWarnings("unchecked") - private static synchronized Creator createAsyncHandlerCreator(Class handlerClass) { + private static Creator createAsyncHandlerCreator(Class handlerClass) { //生成规则与SncpAsyncHandler.Factory 很类似 //------------------------------------------------------------- final boolean handlerinterface = handlerClass.isInterface(); diff --git a/src/main/java/org/redkale/net/http/HttpDispatcherServlet.java b/src/main/java/org/redkale/net/http/HttpDispatcherServlet.java index f73c1dbc6..978854fa8 100644 --- a/src/main/java/org/redkale/net/http/HttpDispatcherServlet.java +++ b/src/main/java/org/redkale/net/http/HttpDispatcherServlet.java @@ -7,6 +7,7 @@ package org.redkale.net.http; import java.io.IOException; import java.util.*; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.*; import java.util.logging.*; import java.util.regex.Pattern; @@ -40,7 +41,9 @@ public class HttpDispatcherServlet extends DispatcherServlet allMapStrings = new HashMap<>(); - private final Object excludeLock = new Object(); + private final ReentrantLock allMapLock = new ReentrantLock(); + + private final ReentrantLock excludeLock = new ReentrantLock(); protected HttpContext context; @@ -54,7 +57,8 @@ public class HttpDispatcherServlet extends DispatcherServlet removeHttpServlet(final Predicate predicateEntry, final Predicate> predicateFilter) { List servlets = new ArrayList<>(); - synchronized (allMapStrings) { + allMapLock.lock(); + try { List keys = new ArrayList<>(); if (regxArray != null) { for (MappingEntry me : regxArray) { @@ -96,6 +100,8 @@ public class HttpDispatcherServlet extends DispatcherServlet { diff --git a/src/main/java/org/redkale/net/http/HttpServer.java b/src/main/java/org/redkale/net/http/HttpServer.java index aa542e95d..83e8bb204 100644 --- a/src/main/java/org/redkale/net/http/HttpServer.java +++ b/src/main/java/org/redkale/net/http/HttpServer.java @@ -14,6 +14,7 @@ import static java.time.format.DateTimeFormatter.RFC_1123_DATE_TIME; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.LongAdder; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import java.util.logging.Level; import org.redkale.boot.Application; @@ -43,7 +44,7 @@ public class HttpServer extends Server safeBufferPool; - private final Object groupLock = new Object(); + private final ReentrantLock groupLock = new ReentrantLock(); private WebSocketAsyncGroup asyncGroup; @@ -546,12 +547,15 @@ public class HttpServer extends Server { if (asyncGroup == null) { - synchronized (groupLock) { + groupLock.lock(); + try { if (asyncGroup == null) { WebSocketAsyncGroup g = new WebSocketAsyncGroup("Redkale-HTTP:" + address.getPort() + "-WebSocketWriteIOThread-%s", workExecutor, bufferCapacity, safeBufferPool); g.start(); asyncGroup = g; } + } finally { + groupLock.unlock(); } } return (WebSocketWriteIOThread) asyncGroup.nextWriteIOThread(); diff --git a/src/main/java/org/redkale/net/http/WebSocket.java b/src/main/java/org/redkale/net/http/WebSocket.java index e413441f5..27a7e9092 100644 --- a/src/main/java/org/redkale/net/http/WebSocket.java +++ b/src/main/java/org/redkale/net/http/WebSocket.java @@ -11,6 +11,7 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.*; import java.util.logging.*; import java.util.stream.Stream; @@ -126,7 +127,7 @@ public abstract class WebSocket { boolean initiateClosed; //收到客户端发送的CLOSE消息 - private boolean closed = false; + private final AtomicBoolean closed = new AtomicBoolean(); protected WebSocket() { } @@ -956,14 +957,7 @@ public abstract class WebSocket { //closeRunner CompletableFuture kill(int code, String reason) { - if (closed) { - return null; - } - synchronized (this) { - if (closed) { - return null; - } - closed = true; + if (closed.compareAndSet(false, true)) { if (_channel == null) { return null; } @@ -974,6 +968,8 @@ public abstract class WebSocket { return future; } return CompletableFuture.allOf(future, closeFuture); + } else { + return null; } } @@ -983,7 +979,7 @@ public abstract class WebSocket { * @return boolean */ public final boolean isClosed() { - return this.closed; + return this.closed.get(); } @Override diff --git a/src/main/java/org/redkale/net/http/WebSocketServlet.java b/src/main/java/org/redkale/net/http/WebSocketServlet.java index dccd5c967..2418b279d 100644 --- a/src/main/java/org/redkale/net/http/WebSocketServlet.java +++ b/src/main/java/org/redkale/net/http/WebSocketServlet.java @@ -71,8 +71,6 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); - private final MessageDigest digest = getMessageDigest(); - private final BiConsumer restMessageConsumer = createRestOnMessageConsumer(); protected Type messageRestType; //RestWebSocket时会被修改 @@ -276,10 +274,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl //onOpen成功或者存在delayPackets webSocket._sessionid = sessionid; request.setKeepAlive(true); - byte[] bytes = (key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").getBytes(); - synchronized (digest) { - bytes = digest.digest(bytes); - } + byte[] bytes = sha1(key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"); + response.setStatus(101); response.setHeader("Connection", "Upgrade"); response.addHeader("Upgrade", "websocket"); @@ -295,7 +291,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl webSocket._readHandler = new WebSocketReadHandler(response.getContext(), webSocket, restMessageConsumer); //webSocket._writeHandler = new WebSocketWriteHandler(response.getContext(), webSocket); response.getContext().updateWebSocketWriteIOThread(webSocket); - + Runnable createUseridHandler = () -> { CompletableFuture userFuture = webSocket.createUserid(); if (userFuture == null) { @@ -422,9 +418,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl return null; } - private static MessageDigest getMessageDigest() { + private static byte[] sha1(String str) { try { - return MessageDigest.getInstance("SHA-1"); + return MessageDigest.getInstance("SHA-1").digest(str.getBytes()); } catch (Exception e) { throw new HttpException(e); } diff --git a/src/main/java/org/redkale/net/sncp/SncpDispatcherServlet.java b/src/main/java/org/redkale/net/sncp/SncpDispatcherServlet.java index 9c14389cd..8d2c2d8ac 100644 --- a/src/main/java/org/redkale/net/sncp/SncpDispatcherServlet.java +++ b/src/main/java/org/redkale/net/sncp/SncpDispatcherServlet.java @@ -6,6 +6,7 @@ package org.redkale.net.sncp; import java.io.IOException; +import java.util.concurrent.locks.ReentrantLock; import org.redkale.net.DispatcherServlet; import org.redkale.service.Service; import org.redkale.util.*; @@ -19,13 +20,14 @@ import org.redkale.util.*; */ public class SncpDispatcherServlet extends DispatcherServlet { - private final Object sncplock = new Object(); + private final ReentrantLock sncplock = new ReentrantLock(); private final byte[] pongBytes = Sncp.getPongBytes(); @Override public void addServlet(SncpServlet servlet, Object attachment, AnyValue conf, Uint128... mappings) { - synchronized (sncplock) { + sncplock.lock(); + try { for (SncpServlet s : getServlets()) { if (s.service == servlet.service) { throw new SncpException(s.service + " repeat addSncpServlet"); @@ -34,12 +36,15 @@ public class SncpDispatcherServlet extends DispatcherServlet SncpServlet removeSncpServlet(Service service) { SncpServlet rs = null; - synchronized (sncplock) { + sncplock.lock(); + try { for (SncpServlet servlet : getServlets()) { if (servlet.service == service) { rs = servlet; @@ -50,6 +55,8 @@ public class SncpDispatcherServlet extends DispatcherServlet> container = new ConcurrentHashMap<>(); + protected final ReentrantLock containerLock = new ReentrantLock(); + protected final BiConsumer futureCompleteConsumer = (r, t) -> { if (t != null) { logger.log(Level.SEVERE, "CompletableFuture complete error", (Throwable) t); @@ -196,7 +199,8 @@ public final class CacheMemorySource extends AbstractCacheSource { public long hincrby(final String key, String field, long num) { CacheEntry entry = container.get(key); if (entry == null) { - synchronized (container) { + containerLock.lock(); + try { entry = container.get(key); if (entry == null) { ConcurrentHashMap map = new ConcurrentHashMap(); @@ -204,11 +208,14 @@ public final class CacheMemorySource extends AbstractCacheSource { entry = new CacheEntry(CacheEntryType.MAP, key, new AtomicLong(), null, null, map); container.put(key, entry); } + } finally { + containerLock.unlock(); } } Serializable val = (Serializable) entry.mapValue.computeIfAbsent(field, f -> new AtomicLong()); if (!(val instanceof AtomicLong)) { - synchronized (entry.mapValue) { + entry.mapLock.lock(); + try { if (!(val instanceof AtomicLong)) { if (val == null) { val = new AtomicLong(); @@ -217,6 +224,8 @@ public final class CacheMemorySource extends AbstractCacheSource { } entry.mapValue.put(field, val); } + } finally { + entry.mapLock.unlock(); } } return ((AtomicLong) entry.mapValue.get(field)).addAndGet(num); @@ -226,7 +235,8 @@ public final class CacheMemorySource extends AbstractCacheSource { public double hincrbyFloat(final String key, String field, double num) { CacheEntry entry = container.get(key); if (entry == null) { - synchronized (container) { + containerLock.lock(); + try { entry = container.get(key); if (entry == null) { ConcurrentHashMap map = new ConcurrentHashMap(); @@ -234,11 +244,14 @@ public final class CacheMemorySource extends AbstractCacheSource { entry = new CacheEntry(CacheEntryType.MAP, key, new AtomicLong(), null, null, map); container.put(key, entry); } + } finally { + containerLock.unlock(); } } Serializable val = (Serializable) entry.mapValue.computeIfAbsent(field, f -> new AtomicLong()); if (!(val instanceof AtomicLong)) { - synchronized (entry.mapValue) { + entry.mapLock.lock(); + try { if (!(val instanceof AtomicLong)) { if (val == null) { val = new AtomicLong(); @@ -247,6 +260,8 @@ public final class CacheMemorySource extends AbstractCacheSource { } entry.mapValue.put(field, val); } + } finally { + entry.mapLock.unlock(); } } return Double.longBitsToDouble(((AtomicLong) entry.mapValue.get(field)).addAndGet(Double.doubleToLongBits(num))); @@ -1068,12 +1083,15 @@ public final class CacheMemorySource extends AbstractCacheSource { public long incrby(final String key, long num) { CacheEntry entry = container.get(key); if (entry == null) { - synchronized (container) { + containerLock.lock(); + try { entry = container.get(key); if (entry == null) { entry = new CacheEntry(CacheEntryType.ATOMIC, key, new AtomicLong(), null, null, null); container.put(key, entry); } + } finally { + containerLock.unlock(); } } return ((AtomicLong) entry.objectValue).addAndGet(num); @@ -1083,12 +1101,15 @@ public final class CacheMemorySource extends AbstractCacheSource { public double incrbyFloat(final String key, double num) { CacheEntry entry = container.get(key); if (entry == null) { - synchronized (container) { + containerLock.lock(); + try { entry = container.get(key); if (entry == null) { entry = new CacheEntry(CacheEntryType.DOUBLE, key, new AtomicLong(), null, null, null); container.put(key, entry); } + } finally { + containerLock.unlock(); } } Long v = ((AtomicLong) entry.objectValue).addAndGet(Double.doubleToLongBits(num)); @@ -1744,6 +1765,8 @@ public final class CacheMemorySource extends AbstractCacheSource { ConcurrentHashMap mapValue; + final ReentrantLock mapLock = new ReentrantLock(); + CopyOnWriteArraySet csetValue; ConcurrentLinkedQueue listValue; diff --git a/src/main/java/org/redkale/source/DataJdbcSource.java b/src/main/java/org/redkale/source/DataJdbcSource.java index af435a047..8bb439619 100644 --- a/src/main/java/org/redkale/source/DataJdbcSource.java +++ b/src/main/java/org/redkale/source/DataJdbcSource.java @@ -284,7 +284,8 @@ public class DataJdbcSource extends AbstractDataSqlSource { } st.close(); } else { //分库分表 - synchronized (info.disTableLock()) { + info.disTableLock().lock(); + try { final Set newCatalogs = new LinkedHashSet<>(); final List tableCopys = new ArrayList<>(); prepareInfos.forEach((t, p) -> { @@ -375,6 +376,8 @@ public class DataJdbcSource extends AbstractDataSqlSource { } } } + } finally { + info.disTableLock().unlock(); } } if (info.getTableStrategy() == null) { @@ -2439,7 +2442,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } @ResourceListener - public synchronized void onResourceChange(ResourceEvent[] events) { + public void onResourceChange(ResourceEvent[] events) { String newUrl = this.url; int newConnectTimeoutSeconds = this.connectTimeoutSeconds; int newMaxconns = this.maxConns; @@ -2481,7 +2484,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } } - public synchronized Connection pollConnection() { + public Connection pollConnection() { Connection conn = queue.poll(); if (conn == null) { if (usingCounter.intValue() >= maxConns) { diff --git a/src/main/java/org/redkale/source/EntityInfo.java b/src/main/java/org/redkale/source/EntityInfo.java index b85411045..49e33b4fd 100644 --- a/src/main/java/org/redkale/source/EntityInfo.java +++ b/src/main/java/org/redkale/source/EntityInfo.java @@ -9,6 +9,7 @@ import java.io.Serializable; import java.lang.reflect.*; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.*; import java.util.logging.*; import org.redkale.annotation.Comment; @@ -36,6 +37,8 @@ public final class EntityInfo { //全局静态资源 private static final ConcurrentHashMap entityInfos = new ConcurrentHashMap<>(); + private static final ReentrantLock infosLock = new ReentrantLock(); + //日志 private static final Logger logger = Logger.getLogger(EntityInfo.class.getSimpleName()); @@ -104,6 +107,8 @@ public final class EntityInfo { //用于存在database.table_20160202类似这种分布式表, 服务分布式部署时不存在的表名不一定真实不存在 private final Set disTables = new CopyOnWriteArraySet<>(); + private final ReentrantLock disTableLock = new ReentrantLock(); + //不能为null的字段名 private final Set notNullColumns = new CopyOnWriteArraySet<>(); @@ -226,7 +231,8 @@ public final class EntityInfo { if (rs != null && (rs.cache == null || rs.cache.isFullLoaded())) { return rs; } - synchronized (entityInfos) { + infosLock.lock(); + try { rs = entityInfos.get(clazz); if (rs == null) { rs = new EntityInfo(clazz, cacheForbidden, conf, source, fullloader); @@ -239,6 +245,8 @@ public final class EntityInfo { rs.cache.fullLoadAsync(); } return rs; + } finally { + infosLock.unlock(); } } @@ -771,8 +779,8 @@ public final class EntityInfo { return tableStrategy; } - public Object disTableLock() { - return disTables; + public ReentrantLock disTableLock() { + return disTableLock; } public boolean containsDisTable(String tableKey) { diff --git a/src/main/java/org/redkale/source/FilterNodeBean.java b/src/main/java/org/redkale/source/FilterNodeBean.java index abd405551..0bdb3955c 100644 --- a/src/main/java/org/redkale/source/FilterNodeBean.java +++ b/src/main/java/org/redkale/source/FilterNodeBean.java @@ -9,6 +9,7 @@ import java.io.Serializable; import java.lang.reflect.*; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; import org.redkale.persistence.Transient; import static org.redkale.source.FilterExpress.*; import org.redkale.util.*; @@ -23,7 +24,9 @@ import org.redkale.util.*; */ public final class FilterNodeBean implements Comparable> { - private static final ConcurrentHashMap beanodes = new ConcurrentHashMap<>(); + private static final ReentrantLock beanLock = new ReentrantLock(); + + private static final ConcurrentHashMap beanNodes = new ConcurrentHashMap<>(); private Attribute beanAttr; @@ -147,17 +150,20 @@ public final class FilterNodeBean implements Comparable clazz) { - FilterNodeBean rs = beanodes.get(clazz); + FilterNodeBean rs = beanNodes.get(clazz); if (rs != null) { return rs; } - synchronized (beanodes) { - rs = beanodes.get(clazz); + beanLock.lock(); + try { + rs = beanNodes.get(clazz); if (rs == null) { rs = createFilterNodeBean(clazz); - beanodes.put(clazz, rs); + beanNodes.put(clazz, rs); } return rs; + } finally { + beanLock.unlock(); } } diff --git a/src/main/java/org/redkale/util/NonBlockingHashMap.java b/src/main/java/org/redkale/util/NonBlockingHashMap.java index fc6dfc1d4..256189149 100644 --- a/src/main/java/org/redkale/util/NonBlockingHashMap.java +++ b/src/main/java/org/redkale/util/NonBlockingHashMap.java @@ -1042,7 +1042,6 @@ public class NonBlockingHashMap extends AbstractMap return newkvs; // Use the new table already // TODO - use a wait with timeout, so we'll wakeup as soon as the new table // is ready, or after the timeout in any case. - //synchronized( this ) { wait(8*megs); } // Timeout - we always wakeup // For now, sleep a tad and see if the 2 guys already trying to make // the table actually get around to making it happen. try { @@ -1875,10 +1874,8 @@ public class NonBlockingHashMap extends AbstractMap // // We could use a wait with timeout, so we'll wakeup as soon as the new // // table is ready, or after the timeout in any case. Annoyingly, this // // breaks the non-blocking property - so for now we just briefly sleep. - // //synchronized( this ) { wait(8*megs); } // Timeout - we always wakeup // try { Thread.sleep(r>>17); } catch( InterruptedException e ) { } // if( master._cat != this ) return old; - //} CAT newcat = new CAT(this, t.length * 2, 0); // Take 1 stab at updating the CAT with the new larger size. If this diff --git a/src/main/java/org/redkale/util/RedkaleClassLoader.java b/src/main/java/org/redkale/util/RedkaleClassLoader.java index ae5a3a215..daaae5561 100644 --- a/src/main/java/org/redkale/util/RedkaleClassLoader.java +++ b/src/main/java/org/redkale/util/RedkaleClassLoader.java @@ -11,8 +11,8 @@ import java.net.*; import java.nio.file.Paths; import java.util.*; import java.util.concurrent.*; -import java.util.function.BiConsumer; -import java.util.function.Consumer; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.*; /** * Redkale内部ClassLoader @@ -65,6 +65,8 @@ public class RedkaleClassLoader extends URLClassLoader { private static final ConcurrentHashMap> reflectionMap = new ConcurrentHashMap<>(); + private static final ReentrantLock reflectionLock = new ReentrantLock(); + public RedkaleClassLoader(ClassLoader parent) { super(new URL[0], parent); } @@ -74,7 +76,9 @@ public class RedkaleClassLoader extends URLClassLoader { } public static URI getConfResourceAsURI(String confURI, String file) { - if (file.startsWith("http:") || file.startsWith("https:") || file.startsWith("ftp:")) return URI.create(file); + if (file.startsWith("http:") || file.startsWith("https:") || file.startsWith("ftp:")) { + return URI.create(file); + } if (confURI != null && !confURI.contains("!")) { //带!的是 /usr/xxx.jar!/META-INF/conf/xxx File f = new File(URI.create(confURI).getPath(), file); if (f.isFile() && f.canRead()) { @@ -146,13 +150,16 @@ public class RedkaleClassLoader extends URLClassLoader { } public static void putReflectionClass(String name) { - synchronized (reflectionMap) { + reflectionLock.lock(); + try { Map map = reflectionMap.get(name); if (map == null) { map = new LinkedHashMap<>(); map.put("name", name); reflectionMap.put(name, map); } + } finally { + reflectionLock.unlock(); } } @@ -166,7 +173,8 @@ public class RedkaleClassLoader extends URLClassLoader { } public static void putReflectionField(String name, Field field) { - synchronized (reflectionMap) { + reflectionLock.lock(); + try { Map map = reflectionMap.get(name); if (map == null) { map = new LinkedHashMap(); @@ -186,13 +194,18 @@ public class RedkaleClassLoader extends URLClassLoader { break; } } - if (!contains) list.add((Map) Utility.ofMap("name", field.getName())); + if (!contains) { + list.add((Map) Utility.ofMap("name", field.getName())); + } } + } finally { + reflectionLock.unlock(); } } public static void putReflectionMethod(String name, Method method) { - synchronized (reflectionMap) { + reflectionLock.lock(); + try { Map map = reflectionMap.get(name); if (map == null) { map = new LinkedHashMap(); @@ -218,13 +231,18 @@ public class RedkaleClassLoader extends URLClassLoader { break; } } - if (!contains) list.add(createMap(method.getName(), method.getParameterTypes())); + if (!contains) { + list.add(createMap(method.getName(), method.getParameterTypes())); + } } + } finally { + reflectionLock.unlock(); } } public static void putReflectionDeclaredConstructors(Class clazz, String name, Class... cts) { - synchronized (reflectionMap) { + reflectionLock.lock(); + try { Map map = reflectionMap.get(name); if (map == null) { map = new LinkedHashMap(); @@ -234,8 +252,12 @@ public class RedkaleClassLoader extends URLClassLoader { map.put("allDeclaredConstructors", true); if (clazz != null) { - if (clazz.isInterface()) return; - if (Modifier.isAbstract(clazz.getModifiers())) return; + if (clazz.isInterface()) { + return; + } + if (Modifier.isAbstract(clazz.getModifiers())) { + return; + } try { clazz.getDeclaredConstructor(cts); } catch (Throwable t) { @@ -259,13 +281,18 @@ public class RedkaleClassLoader extends URLClassLoader { break; } } - if (!contains) list.add((Map) Utility.ofMap("name", "", "parameterTypes", types)); + if (!contains) { + list.add((Map) Utility.ofMap("name", "", "parameterTypes", types)); + } } + } finally { + reflectionLock.unlock(); } } public static void putReflectionPublicConstructors(Class clazz, String name) { - synchronized (reflectionMap) { + reflectionLock.lock(); + try { Map map = reflectionMap.get(name); if (map == null) { map = new LinkedHashMap(); @@ -275,8 +302,12 @@ public class RedkaleClassLoader extends URLClassLoader { map.put("allPublicConstructors", true); if (clazz != null) { - if (clazz.isInterface()) return; - if (Modifier.isAbstract(clazz.getModifiers())) return; + if (clazz.isInterface()) { + return; + } + if (Modifier.isAbstract(clazz.getModifiers())) { + return; + } try { clazz.getConstructor(); } catch (Throwable t) { @@ -296,13 +327,18 @@ public class RedkaleClassLoader extends URLClassLoader { break; } } - if (!contains) list.add((Map) Utility.ofMap("name", "", "parameterTypes", new String[0])); + if (!contains) { + list.add((Map) Utility.ofMap("name", "", "parameterTypes", new String[0])); + } } + } finally { + reflectionLock.unlock(); } } public static void putReflectionDeclaredMethods(String name) { - synchronized (reflectionMap) { + reflectionLock.lock(); + try { Map map = reflectionMap.get(name); if (map == null) { map = new LinkedHashMap(); @@ -310,11 +346,14 @@ public class RedkaleClassLoader extends URLClassLoader { reflectionMap.put(name, map); } map.put("allDeclaredMethods", true); + } finally { + reflectionLock.unlock(); } } public static void putReflectionPublicMethods(String name) { - synchronized (reflectionMap) { + reflectionLock.lock(); + try { Map map = reflectionMap.get(name); if (map == null) { map = new LinkedHashMap(); @@ -322,11 +361,14 @@ public class RedkaleClassLoader extends URLClassLoader { reflectionMap.put(name, map); } map.put("allPublicMethods", true); + } finally { + reflectionLock.unlock(); } } public static void putReflectionDeclaredFields(String name) { - synchronized (reflectionMap) { + reflectionLock.lock(); + try { Map map = reflectionMap.get(name); if (map == null) { map = new LinkedHashMap(); @@ -334,11 +376,14 @@ public class RedkaleClassLoader extends URLClassLoader { reflectionMap.put(name, map); } map.put("allDeclaredFields", true); + } finally { + reflectionLock.unlock(); } } public static void putReflectionPublicFields(String name) { - synchronized (reflectionMap) { + reflectionLock.lock(); + try { Map map = reflectionMap.get(name); if (map == null) { map = new LinkedHashMap(); @@ -346,11 +391,14 @@ public class RedkaleClassLoader extends URLClassLoader { reflectionMap.put(name, map); } map.put("allPublicFields", true); + } finally { + reflectionLock.unlock(); } } public static void putReflectionDeclaredClasses(String name) { - synchronized (reflectionMap) { + reflectionLock.lock(); + try { Map map = reflectionMap.get(name); if (map == null) { map = new LinkedHashMap(); @@ -358,11 +406,14 @@ public class RedkaleClassLoader extends URLClassLoader { reflectionMap.put(name, map); } map.put("allDeclaredClasses", true); + } finally { + reflectionLock.unlock(); } } public static void putReflectionPublicClasses(String name) { - synchronized (reflectionMap) { + reflectionLock.lock(); + try { Map map = reflectionMap.get(name); if (map == null) { map = new LinkedHashMap(); @@ -370,6 +421,8 @@ public class RedkaleClassLoader extends URLClassLoader { reflectionMap.put(name, map); } map.put("allPublicClasses", true); + } finally { + reflectionLock.unlock(); } } @@ -423,7 +476,9 @@ public class RedkaleClassLoader extends URLClassLoader { } do { String loaderName = loader.getClass().getName(); - if (loaderName.startsWith("sun.") && loaderName.contains("ExtClassLoader")) continue; + if (loaderName.startsWith("sun.") && loaderName.contains("ExtClassLoader")) { + continue; + } if (loader instanceof URLClassLoader) { for (URL url : ((URLClassLoader) loader).getURLs()) { set.add(url); diff --git a/src/main/java/org/redkale/util/ResourceFactory.java b/src/main/java/org/redkale/util/ResourceFactory.java index e947df754..b1618b74f 100644 --- a/src/main/java/org/redkale/util/ResourceFactory.java +++ b/src/main/java/org/redkale/util/ResourceFactory.java @@ -12,6 +12,7 @@ import java.math.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.*; import java.util.logging.*; import org.redkale.annotation.*; @@ -44,6 +45,8 @@ public final class ResourceFactory { private static final Logger logger = Logger.getLogger(ResourceFactory.class.getSimpleName()); + private final ReentrantLock lock = new ReentrantLock(); + private final ResourceFactory parent; private final List> chidren = new CopyOnWriteArrayList<>(); @@ -112,6 +115,14 @@ public final class ResourceFactory { this.store.clear(); } + public void lock() { + lock.lock(); + } + + public void unlock() { + lock.unlock(); + } + /** * 检查资源名是否合法 *

@@ -1116,6 +1127,8 @@ public final class ResourceFactory {
 
     private static class ResourceElement {
 
+        private static final ReentrantLock syncLock = new ReentrantLock();
+
         private static final HashMap listenerMethods = new HashMap<>(); //不使用ConcurrentHashMap是因为value不能存null
 
         public final WeakReference dest;
@@ -1140,7 +1153,8 @@ public final class ResourceFactory {
         }
 
         private static Method findListener(Class clazz, Class fieldType, AtomicBoolean diff) {
-            synchronized (listenerMethods) {
+            syncLock.lock();
+            try {
                 Class loop = clazz;
                 Method m = listenerMethods.get(clazz.getName() + "-" + fieldType.getName());
                 if (m != null) {
@@ -1167,6 +1181,8 @@ public final class ResourceFactory {
                 } while ((loop = loop.getSuperclass()) != Object.class);
                 listenerMethods.put(clazz.getName() + "-" + fieldType.getName(), m);
                 return m;
+            } finally {
+                syncLock.unlock();
             }
         }
     }
diff --git a/src/main/java/org/redkale/util/SignalShutDown.java b/src/main/java/org/redkale/util/SignalShutDown.java
index c5f8612cc..45d1932fe 100644
--- a/src/main/java/org/redkale/util/SignalShutDown.java
+++ b/src/main/java/org/redkale/util/SignalShutDown.java
@@ -43,7 +43,7 @@
 //    }
 //
 //    @Override
-//    public synchronized void handle(sun.misc.Signal sig) {
+//    public void handle(sun.misc.Signal sig) {
 //        String sigstr = sig + "," + sig.getName() + "," + sig.getNumber();
 //        shutdownConsumer.accept(sigstr);
 //    }
diff --git a/src/main/java/org/redkale/util/SimpleProxySelector.java b/src/main/java/org/redkale/util/SimpleProxySelector.java
index d38466f48..d31214d74 100644
--- a/src/main/java/org/redkale/util/SimpleProxySelector.java
+++ b/src/main/java/org/redkale/util/SimpleProxySelector.java
@@ -36,7 +36,7 @@ public class SimpleProxySelector extends ProxySelector {
     }
 
     @Override
-    public synchronized List select(URI uri) {
+    public List select(URI uri) {
         String scheme = uri.getScheme().toLowerCase();
         if (scheme.equals("http") || scheme.equals("https")) {
             return list;
diff --git a/src/main/java/org/redkale/util/TypeToken.java b/src/main/java/org/redkale/util/TypeToken.java
index 78a08dbd4..30493d5ec 100644
--- a/src/main/java/org/redkale/util/TypeToken.java
+++ b/src/main/java/org/redkale/util/TypeToken.java
@@ -7,6 +7,7 @@ package org.redkale.util;
 import java.lang.reflect.Type;
 import java.lang.reflect.*;
 import java.util.*;
+import java.util.concurrent.locks.ReentrantLock;
 import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES;
 import org.redkale.asm.*;
 import static org.redkale.asm.Opcodes.*;
@@ -24,6 +25,8 @@ import static org.redkale.asm.Opcodes.*;
 @SuppressWarnings("unchecked")
 public abstract class TypeToken {
 
+    private static final ReentrantLock syncLock = new ReentrantLock();
+
     private final Type type;
 
     public TypeToken() {
@@ -487,7 +490,12 @@ public abstract class TypeToken {
                 }
             }
             if (count == actualTypeArguments0.length) {
-                return createParameterizedType0((Class) rawType0, actualTypeArguments0);
+                syncLock.lock();
+                try {
+                    return createParameterizedType0((Class) rawType0, actualTypeArguments0);
+                } finally {
+                    syncLock.unlock();
+                }
             }
         }
         return new ParameterizedType() {
@@ -557,7 +565,7 @@ public abstract class TypeToken {
     }
 
     // 注意:  RetResult[]> 这种泛型带[]的尚未实现支持
-    private static synchronized Type createParameterizedType0(final Class rawType, final Type... actualTypeArguments) {
+    private static Type createParameterizedType0(final Class rawType, final Type... actualTypeArguments) {
         ClassLoader loader = Thread.currentThread().getContextClassLoader();
         StringBuilder tmpps = new StringBuilder(getClassTypeDescriptor(rawType));
         for (Type cz : actualTypeArguments) {
diff --git a/src/main/java/org/redkale/util/Utility.java b/src/main/java/org/redkale/util/Utility.java
index 8657d2b92..79d4ffc29 100644
--- a/src/main/java/org/redkale/util/Utility.java
+++ b/src/main/java/org/redkale/util/Utility.java
@@ -16,6 +16,7 @@ import java.security.*;
 import java.time.*;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.*;
 import java.util.stream.Stream;
 import java.util.zip.GZIPInputStream;
@@ -83,7 +84,7 @@ public final class Utility {
 
     private static final ToLongFunction bufferAddrFunction;
 
-    private static final Object clientLock = new Object();
+    private static final ReentrantLock clientLock = new ReentrantLock();
 
     private static HttpClient httpClient;
 
@@ -4500,10 +4501,13 @@ public final class Utility {
         }
         java.net.http.HttpClient c = client == null ? httpClient : client;
         if (c == null) {
-            synchronized (clientLock) {
+            clientLock.lock();
+            try {
                 if (httpClient == null) {
                     httpClient = java.net.http.HttpClient.newHttpClient();
                 }
+            } finally {
+                clientLock.unlock();
             }
             c = httpClient;
         }