synchronized优化

This commit is contained in:
redkale
2023-02-01 17:23:33 +08:00
parent f7f9e8db41
commit ad981835f9
42 changed files with 683 additions and 335 deletions

View File

@@ -16,6 +16,7 @@ import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.logging.*;
import javax.net.ssl.SSLContext;
@@ -148,9 +149,13 @@ public final class Application {
//CacheSource 资源
final List<CacheSource> cacheSources = new CopyOnWriteArrayList<>();
private final ReentrantLock cacheSourceLock = new ReentrantLock();
//DataSource 资源
final List<DataSource> dataSources = new CopyOnWriteArrayList<>();
private final ReentrantLock dataSourceLock = new ReentrantLock();
//NodeServer 资源, 顺序必须是sncps, others, watchs
final List<NodeServer> servers = new CopyOnWriteArrayList<>();
@@ -167,6 +172,8 @@ public final class Application {
//只存放不以system.property.、mimetype.property.、redkale.开头的配置项
private final Properties envProperties = new Properties();
private final ReentrantLock envPropertiesLock = new ReentrantLock();
//配置信息只读版Properties
private final Environment environment;
@@ -1223,7 +1230,8 @@ public final class Application {
}
CacheSource loadCacheSource(final String sourceName, boolean autoMemory) {
synchronized (cacheSources) {
cacheSourceLock.lock();
try {
long st = System.currentTimeMillis();
CacheSource old = resourceFactory.find(sourceName, CacheSource.class);
if (old != null) {
@@ -1262,11 +1270,14 @@ public final class Application {
logger.log(Level.SEVERE, "load application CaheSource error: " + sourceConf, e);
}
return null;
} finally {
cacheSourceLock.unlock();
}
}
DataSource loadDataSource(final String sourceName, boolean autoMemory) {
synchronized (dataSources) {
dataSourceLock.lock();
try {
DataSource old = resourceFactory.find(sourceName, DataSource.class);
if (old != null) {
return old;
@@ -1313,6 +1324,8 @@ public final class Application {
logger.log(Level.SEVERE, "load application DataSource error: " + sourceConf, e);
}
return null;
} finally {
dataSourceLock.unlock();
}
}
@@ -1647,6 +1660,7 @@ public final class Application {
this.servicecdl = new CountDownLatch(serconfs.size());
CountDownLatch sercdl = new CountDownLatch(serconfs.size());
final AtomicBoolean inited = new AtomicBoolean(false);
final ReentrantLock nodeLock = new ReentrantLock();
final Map<String, Class<? extends NodeServer>> nodeClasses = new HashMap<>();
for (final AnyValue serconf : serconfs) {
Thread thread = new Thread() {
@@ -1677,7 +1691,8 @@ public final class Application {
server = new NodeHttpServer(Application.this, serconf);
} else {
if (!inited.get()) {
synchronized (nodeClasses) {
nodeLock.lock();
try {
if (!inited.getAndSet(true)) { //加载自定义的协议SOCKS
ClassFilter profilter = new ClassFilter(classLoader, NodeProtocol.class, NodeServer.class, (Class[]) null);
ClassFilter.Loader.load(home, classLoader, ((excludelibs != null ? (excludelibs + ";") : "") + serconf.getValue("excludelibs", "")).split(";"), profilter);
@@ -1696,6 +1711,8 @@ public final class Application {
nodeClasses.put(p, type);
}
}
} finally {
nodeLock.unlock();
}
}
Class<? extends NodeServer> nodeClass = nodeClasses.get(protocol);
@@ -1955,7 +1972,8 @@ public final class Application {
if (events == null || events.isEmpty()) {
return;
}
synchronized (envProperties) {
envPropertiesLock.lock();
try {
Properties envRegisterProps = new Properties();
Set<String> envRemovedKeys = new HashSet<>();
Properties envChangedProps = new Properties();
@@ -2349,6 +2367,8 @@ public final class Application {
clusterRemovedKeys.forEach(k -> this.clusterProperties.remove(k));
this.clusterProperties.putAll(clusterChangedProps);
}
} finally {
envPropertiesLock.unlock();
}
}

View File

@@ -79,14 +79,18 @@ public class LoggingSearchHandler extends LoggingBaseHandler {
int size = batchSize;
while (--size > 0) {
log = logqueue.poll();
if (log == null) break;
if (log == null) {
break;
}
logList.add(log);
}
source.insert(logList);
}
} catch (Exception e) {
ErrorManager err = getErrorManager();
if (err != null) err.error(null, e, ErrorManager.WRITE_FAILURE);
if (err != null) {
err.error(null, e, ErrorManager.WRITE_FAILURE);
}
} finally {
logList.clear();
}
@@ -96,31 +100,49 @@ public class LoggingSearchHandler extends LoggingBaseHandler {
}.start();
}
private synchronized void initSource() {
if (retryCount.get() < 1) return;
private void initSource() {
if (retryCount.get() < 1) {
return;
}
try {
Utility.sleep(3000); //如果SearchSource自身在打印日志需要停顿一点时间让SearchSource初始化完成
if (application == null) {
Utility.sleep(3000);
}
if (application == null) return;
if (application == null) {
return;
}
this.source = (SearchSource) application.loadDataSource(sourceResourceName, false);
if (retryCount.get() == 1 && this.source == null) System.err.println("ERROR: not load logging.source(" + sourceResourceName + ")");
if (retryCount.get() == 1 && this.source == null) {
System.err.println("ERROR: not load logging.source(" + sourceResourceName + ")");
}
} catch (Exception t) {
ErrorManager err = getErrorManager();
if (err != null) err.error(null, t, ErrorManager.WRITE_FAILURE);
if (err != null) {
err.error(null, t, ErrorManager.WRITE_FAILURE);
}
} finally {
retryCount.decrementAndGet();
}
}
private static boolean checkTagName(String name) { //只能是字母、数字、短横、点、%、$和下划线
if (name.isEmpty()) return false;
if (name.isEmpty()) {
return false;
}
for (char ch : name.toCharArray()) {
if (ch >= '0' && ch <= '9') continue;
if (ch >= 'a' && ch <= 'z') continue;
if (ch >= 'A' && ch <= 'Z') continue;
if (ch == '_' || ch == '-' || ch == '%' || ch == '$' || ch == '.') continue;
if (ch >= '0' && ch <= '9') {
continue;
}
if (ch >= 'a' && ch <= 'z') {
continue;
}
if (ch >= 'A' && ch <= 'Z') {
continue;
}
if (ch == '_' || ch == '-' || ch == '%' || ch == '$' || ch == '.') {
continue;
}
return false;
}
return true;
@@ -135,7 +157,9 @@ public class LoggingSearchHandler extends LoggingBaseHandler {
}
String tagstr = manager.getProperty(cname + ".tag");
if (tagstr != null && !tagstr.isEmpty()) {
if (!checkTagName(tagstr.replaceAll("\\$\\{.+\\}", ""))) throw new RuntimeException("found illegal logging.property " + cname + ".tag = " + tagstr);
if (!checkTagName(tagstr.replaceAll("\\$\\{.+\\}", ""))) {
throw new RuntimeException("found illegal logging.property " + cname + ".tag = " + tagstr);
}
this.tag = tagstr.replace("${" + RESNAME_APP_NAME + "}", System.getProperty(RESNAME_APP_NAME, ""));
if (this.tag.contains("%")) {
this.tagDateFormat = this.tag;
@@ -169,11 +193,15 @@ public class LoggingSearchHandler extends LoggingBaseHandler {
}
} catch (Exception e) {
}
if (getFormatter() == null) setFormatter(new SimpleFormatter());
if (getFormatter() == null) {
setFormatter(new SimpleFormatter());
}
String encodingstr = manager.getProperty(cname + ".encoding");
try {
if (encodingstr != null) setEncoding(encodingstr);
if (encodingstr != null) {
setEncoding(encodingstr);
}
} catch (Exception e) {
}
@@ -188,12 +216,18 @@ public class LoggingSearchHandler extends LoggingBaseHandler {
@Override
public void publish(LogRecord log) {
if (!isLoggable(log)) return;
if (denyRegx != null && denyRegx.matcher(log.getMessage()).find()) return;
if (!isLoggable(log)) {
return;
}
if (denyRegx != null && denyRegx.matcher(log.getMessage()).find()) {
return;
}
if (log.getSourceClassName() != null) {
StackTraceElement[] ses = new Throwable().getStackTrace();
for (int i = 2; i < ses.length; i++) {
if (ses[i].getClassName().startsWith("java.util.logging")) continue;
if (ses[i].getClassName().startsWith("java.util.logging")) {
continue;
}
log.setSourceClassName(ses[i].getClassName());
log.setSourceMethodName(ses[i].getMethodName());
break;

View File

@@ -10,6 +10,7 @@ import java.lang.reflect.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.stream.Stream;
import org.redkale.annotation.*;
@@ -130,7 +131,8 @@ public class NodeHttpServer extends NodeServer {
if (loader != null) {
nodeService = (Service) loader.load(sncpResFactory, srcResourceName, srcObj, resourceName, field, attachment);
}
synchronized (regFactory) {
regFactory.lock();
try {
if (nodeService == null) {
nodeService = (Service) rf.find(resourceName, WebSocketNode.class);
}
@@ -156,6 +158,8 @@ public class NodeHttpServer extends NodeServer {
field.set(srcObj, nodeService);
logger.fine("Load Service " + nodeService);
return nodeService;
} finally {
regFactory.unlock();
}
} catch (Exception e) {
logger.log(Level.SEVERE, "WebSocketNode inject error", e);
@@ -250,8 +254,9 @@ public class NodeHttpServer extends NodeServer {
final List<AbstractMap.SimpleEntry<String, String[]>> webss = sb == null ? null : new ArrayList<>();
if (rest && serverConf != null) {
final List<Object> restedObjects = new ArrayList<>();
final ReentrantLock restedLock = new ReentrantLock();
for (AnyValue restConf : serverConf.getAnyValues("rest")) {
loadRestServlet(webSocketFilter, restConf, restedObjects, sb, rests, webss);
loadRestServlet(webSocketFilter, restConf, restedObjects, restedLock, sb, rests, webss);
}
}
int max = 0;
@@ -335,8 +340,11 @@ public class NodeHttpServer extends NodeServer {
}
@SuppressWarnings("unchecked")
protected void loadRestServlet(final ClassFilter<? extends WebSocket> webSocketFilter, final AnyValue restConf, final List<Object> restedObjects, final StringBuilder sb,
final List<AbstractMap.SimpleEntry<String, String[]>> rests, final List<AbstractMap.SimpleEntry<String, String[]>> webss) throws Exception {
protected void loadRestServlet(final ClassFilter<? extends WebSocket> webSocketFilter,
final AnyValue restConf, final List<Object> restedObjects,
final ReentrantLock restedLock, final StringBuilder sb,
final List<AbstractMap.SimpleEntry<String, String[]>> rests,
final List<AbstractMap.SimpleEntry<String, String[]>> webss) throws Exception {
if (!rest) {
return;
}
@@ -402,12 +410,15 @@ public class NodeHttpServer extends NodeServer {
if (!restFilter.accept(stypename)) {
return;
}
synchronized (restedObjects) {
restedLock.lock();
try {
if (restedObjects.contains(service)) {
logger.log(Level.WARNING, stype.getName() + " repeat create rest servlet, so ignore");
return;
}
restedObjects.add(service); //避免重复创建Rest对象
} finally {
restedLock.unlock();
}
HttpServlet servlet = httpServer.addRestServlet(serverClassLoader, service, userType, baseServletType, prefix);
if (servlet == null) {

View File

@@ -7,6 +7,7 @@ package org.redkale.convert;
import java.lang.reflect.*;
import java.util.*;
import java.util.concurrent.locks.*;
import java.util.function.IntFunction;
import org.redkale.util.Creator;
@@ -36,7 +37,9 @@ public class ArrayDecoder<T> implements Decodeable<Reader, T[]> {
protected volatile boolean inited = false;
protected final Object lock = new Object();
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
public ArrayDecoder(final ConvertFactory factory, final Type type) {
this.type = type;
@@ -59,8 +62,11 @@ public class ArrayDecoder<T> implements Decodeable<Reader, T[]> {
this.componentArrayFunction = Creator.arrayFunction(this.componentClass);
} finally {
inited = true;
synchronized (lock) {
lock.notifyAll();
lock.lock();
try {
condition.signalAll();
} finally {
lock.unlock();
}
}
}
@@ -83,12 +89,12 @@ public class ArrayDecoder<T> implements Decodeable<Reader, T[]> {
}
if (this.componentDecoder == null) {
if (!this.inited) {
synchronized (lock) {
try {
lock.wait();
} catch (Exception e) {
e.printStackTrace();
}
lock.lock();
try {
condition.await();
} catch (Exception e) {
} finally {
lock.unlock();
}
}
}

View File

@@ -6,6 +6,7 @@
package org.redkale.convert;
import java.lang.reflect.*;
import java.util.concurrent.locks.*;
/**
* 数组的序列化操作类 <br>
@@ -33,7 +34,9 @@ public class ArrayEncoder<T> implements Encodeable<Writer, T[]> {
protected volatile boolean inited = false;
protected final Object lock = new Object();
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
public ArrayEncoder(final ConvertFactory factory, final Type type) {
this.type = type;
@@ -55,8 +58,11 @@ public class ArrayEncoder<T> implements Encodeable<Writer, T[]> {
this.subTypeFinal = (this.componentType instanceof Class) && Modifier.isFinal(((Class) this.componentType).getModifiers());
} finally {
inited = true;
synchronized (lock) {
lock.notifyAll();
lock.lock();
try {
condition.signalAll();
} finally {
lock.unlock();
}
}
}
@@ -80,12 +86,12 @@ public class ArrayEncoder<T> implements Encodeable<Writer, T[]> {
Encodeable<Writer, Object> itemEncoder = this.componentEncoder;
if (itemEncoder == null) {
if (!this.inited) {
synchronized (lock) {
try {
lock.wait();
} catch (Exception e) {
e.printStackTrace();
}
lock.lock();
try {
condition.await();
} catch (Exception e) {
} finally {
lock.unlock();
}
itemEncoder = this.componentEncoder;
}

View File

@@ -7,6 +7,7 @@ package org.redkale.convert;
import java.lang.reflect.*;
import java.util.*;
import java.util.concurrent.locks.*;
import org.redkale.util.Creator;
/**
@@ -32,7 +33,9 @@ public class CollectionDecoder<T> implements Decodeable<Reader, Collection<T>> {
protected volatile boolean inited = false;
protected final Object lock = new Object();
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
public CollectionDecoder(final ConvertFactory factory, final Type type) {
this.type = type;
@@ -53,8 +56,11 @@ public class CollectionDecoder<T> implements Decodeable<Reader, Collection<T>> {
}
} finally {
inited = true;
synchronized (lock) {
lock.notifyAll();
lock.lock();
try {
condition.signalAll();
} finally {
lock.unlock();
}
}
}
@@ -88,12 +94,12 @@ public class CollectionDecoder<T> implements Decodeable<Reader, Collection<T>> {
}
if (this.componentDecoder == null) {
if (!this.inited) {
synchronized (lock) {
try {
lock.wait();
} catch (Exception e) {
e.printStackTrace();
}
lock.lock();
try {
condition.await();
} catch (Exception e) {
} finally {
lock.unlock();
}
}
}

View File

@@ -7,6 +7,7 @@ package org.redkale.convert;
import java.lang.reflect.*;
import java.util.Collection;
import java.util.concurrent.locks.*;
/**
* Collection的序列化操作类 <br>
@@ -27,7 +28,9 @@ public class CollectionEncoder<T> implements Encodeable<Writer, Collection<T>> {
protected volatile boolean inited = false;
protected final Object lock = new Object();
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
public CollectionEncoder(final ConvertFactory factory, final Type type) {
this.type = type;
@@ -44,8 +47,11 @@ public class CollectionEncoder<T> implements Encodeable<Writer, Collection<T>> {
}
} finally {
inited = true;
synchronized (lock) {
lock.notifyAll();
lock.lock();
try {
condition.signalAll();
} finally {
lock.unlock();
}
}
}
@@ -67,12 +73,12 @@ public class CollectionEncoder<T> implements Encodeable<Writer, Collection<T>> {
}
if (this.componentEncoder == null) {
if (!this.inited) {
synchronized (lock) {
try {
lock.wait();
} catch (Exception e) {
e.printStackTrace();
}
lock.lock();
try {
condition.await();
} catch (Exception e) {
} finally {
lock.unlock();
}
}
}

View File

@@ -14,6 +14,7 @@ import java.nio.channels.CompletionHandler;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import java.util.stream.*;
import org.redkale.annotation.ConstructorParameters;
@@ -64,6 +65,8 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
final Set<String> ignoreMapColumns = new HashSet();
final ReentrantLock ignoreMapColumnLock = new ReentrantLock();
//key:需要屏蔽的字段value排除的字段名
private final ConcurrentHashMap<Class, Set<String>> ignoreAlls = new ConcurrentHashMap();
@@ -222,18 +225,15 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
return defProtobufConvert;
}
}
synchronized (loaderInited) {
if (!loaderInited.get()) {
Iterator<ConvertProvider> it = ServiceLoader.load(ConvertProvider.class).iterator();
RedkaleClassLoader.putServiceLoader(ConvertProvider.class);
while (it.hasNext()) {
ConvertProvider cl = it.next();
RedkaleClassLoader.putReflectionPublicConstructors(cl.getClass(), cl.getClass().getName());
if (cl.type() == ConvertType.PROTOBUF) {
defProtobufConvert = cl.convert();
}
if (loaderInited.compareAndSet(false, true)) {
Iterator<ConvertProvider> it = ServiceLoader.load(ConvertProvider.class).iterator();
RedkaleClassLoader.putServiceLoader(ConvertProvider.class);
while (it.hasNext()) {
ConvertProvider cl = it.next();
RedkaleClassLoader.putReflectionPublicConstructors(cl.getClass(), cl.getClass().getName());
if (cl.type() == ConvertType.PROTOBUF) {
defProtobufConvert = cl.convert();
}
loaderInited.set(true);
}
}
return type == ConvertType.PROTOBUF ? defProtobufConvert : null;
@@ -858,7 +858,8 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
public final void register(final Class type, boolean ignore, String... columns) {
if (type == Map.class) {
synchronized (ignoreMapColumns) {
ignoreMapColumnLock.lock();
try {
if (ignore) {
for (String column : columns) {
ignoreMapColumns.add(column);
@@ -868,6 +869,8 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
ignoreMapColumns.remove(column);
}
}
} finally {
ignoreMapColumnLock.unlock();
}
return;
}
@@ -878,7 +881,8 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
public final void register(final Class type, boolean ignore, Collection<String> columns) {
if (type == Map.class) {
synchronized (ignoreMapColumns) {
ignoreMapColumnLock.lock();
try {
if (ignore) {
for (String column : columns) {
ignoreMapColumns.add(column);
@@ -888,6 +892,8 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
ignoreMapColumns.remove(column);
}
}
} finally {
ignoreMapColumnLock.unlock();
}
return;
}

View File

@@ -7,6 +7,7 @@ package org.redkale.convert;
import java.lang.reflect.*;
import java.util.*;
import java.util.concurrent.locks.*;
import org.redkale.util.Creator;
/**
@@ -36,7 +37,9 @@ public class MapDecoder<K, V> implements Decodeable<Reader, Map<K, V>> {
protected volatile boolean inited = false;
protected final Object lock = new Object();
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
public MapDecoder(final ConvertFactory factory, final Type type) {
this.type = type;
@@ -67,8 +70,11 @@ public class MapDecoder<K, V> implements Decodeable<Reader, Map<K, V>> {
}
} finally {
inited = true;
synchronized (lock) {
lock.notifyAll();
lock.lock();
try {
condition.signalAll();
} finally {
lock.unlock();
}
}
}
@@ -95,12 +101,12 @@ public class MapDecoder<K, V> implements Decodeable<Reader, Map<K, V>> {
public Map<K, V> convertFrom(Reader in, DeMember member) {
if (this.keyDecoder == null || this.valueDecoder == null) {
if (!this.inited) {
synchronized (lock) {
try {
lock.wait();
} catch (Exception e) {
e.printStackTrace();
}
lock.lock();
try {
condition.await();
} catch (Exception e) {
} finally {
lock.unlock();
}
}
}

View File

@@ -7,6 +7,7 @@ package org.redkale.convert;
import java.lang.reflect.*;
import java.util.*;
import java.util.concurrent.locks.*;
import java.util.function.BiFunction;
/**
@@ -30,7 +31,9 @@ public class MapEncoder<K, V> implements Encodeable<Writer, Map<K, V>> {
protected volatile boolean inited = false;
protected final Object lock = new Object();
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
protected final Set<String> ignoreMapColumns;
@@ -45,13 +48,19 @@ public class MapEncoder<K, V> implements Encodeable<Writer, Map<K, V>> {
this.keyEncoder = factory.getAnyEncoder();
this.valueEncoder = factory.getAnyEncoder();
}
synchronized (factory.ignoreMapColumns) {
factory.ignoreMapColumnLock.lock();
try {
this.ignoreMapColumns = factory.ignoreMapColumns.isEmpty() ? null : new HashSet<>(factory.ignoreMapColumns);
} finally {
factory.ignoreMapColumnLock.unlock();
}
} finally {
inited = true;
synchronized (lock) {
lock.notifyAll();
lock.lock();
try {
condition.signalAll();
} finally {
lock.unlock();
}
}
}
@@ -70,12 +79,12 @@ public class MapEncoder<K, V> implements Encodeable<Writer, Map<K, V>> {
if (this.keyEncoder == null || this.valueEncoder == null) {
if (!this.inited) {
synchronized (lock) {
try {
lock.wait();
} catch (Exception e) {
e.printStackTrace();
}
lock.lock();
try {
condition.await();
} catch (Exception e) {
} finally {
lock.unlock();
}
}
}

View File

@@ -7,6 +7,7 @@ package org.redkale.convert;
import java.lang.reflect.*;
import java.util.*;
import java.util.concurrent.locks.*;
import org.redkale.convert.ext.StringSimpledCoder;
import org.redkale.util.*;
@@ -41,7 +42,9 @@ public class ObjectDecoder<R extends Reader, T> implements Decodeable<R, T> {
protected volatile boolean inited = false;
protected final Object lock = new Object();
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
protected ObjectDecoder(Type type) {
this.type = ((type instanceof Class) && ((Class) type).isInterface()) ? Object.class : type;
@@ -295,8 +298,11 @@ public class ObjectDecoder<R extends Reader, T> implements Decodeable<R, T> {
}
} finally {
inited = true;
synchronized (lock) {
lock.notifyAll();
lock.lock();
try {
condition.signalAll();
} finally {
lock.unlock();
}
}
}
@@ -319,12 +325,12 @@ public class ObjectDecoder<R extends Reader, T> implements Decodeable<R, T> {
return (T) factory.loadDecoder(factory.getEntityAlias(clazz)).convertFrom(objin);
}
if (!this.inited) {
synchronized (lock) {
try {
lock.wait();
} catch (Exception e) {
e.printStackTrace();
}
lock.lock();
try {
condition.await();
} catch (Exception e) {
} finally {
lock.unlock();
}
}
if (this.creator == null) {

View File

@@ -7,6 +7,7 @@ package org.redkale.convert;
import java.lang.reflect.*;
import java.util.*;
import java.util.concurrent.locks.*;
import org.redkale.annotation.ConstructorParameters;
import org.redkale.convert.ext.StringSimpledCoder;
import org.redkale.util.*;
@@ -36,7 +37,9 @@ public class ObjectEncoder<W extends Writer, T> implements Encodeable<W, T> {
protected volatile boolean inited = false;
protected final Object lock = new Object();
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
protected ObjectEncoder(Type type) {
this.type = type;
@@ -252,8 +255,11 @@ public class ObjectEncoder<W extends Writer, T> implements Encodeable<W, T> {
}
} finally {
inited = true;
synchronized (lock) {
lock.notifyAll();
lock.lock();
try {
condition.signalAll();
} finally {
lock.unlock();
}
}
}
@@ -265,12 +271,12 @@ public class ObjectEncoder<W extends Writer, T> implements Encodeable<W, T> {
return;
}
if (!this.inited) {
synchronized (lock) {
try {
lock.wait();
} catch (Exception e) {
e.printStackTrace();
}
lock.lock();
try {
condition.await();
} catch (Exception e) {
} finally {
lock.unlock();
}
}
if (value.getClass() != this.typeClass && !this.type.equals(out.specify())) {

View File

@@ -6,7 +6,8 @@
package org.redkale.convert;
import java.lang.reflect.*;
import java.util.*;
import java.util.Optional;
import java.util.concurrent.locks.*;
/**
* Optional 的SimpledCoder实现
@@ -30,7 +31,9 @@ public class OptionalCoder<R extends Reader, W extends Writer, T> extends Simple
protected volatile boolean inited = false;
private final Object lock = new Object();
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
@SuppressWarnings("unchecked")
public OptionalCoder(final ConvertFactory factory, final Type type) {
@@ -61,8 +64,11 @@ public class OptionalCoder<R extends Reader, W extends Writer, T> extends Simple
}
} finally {
inited = true;
synchronized (lock) {
lock.notifyAll();
lock.lock();
try {
condition.signalAll();
} finally {
lock.unlock();
}
}
}
@@ -75,12 +81,12 @@ public class OptionalCoder<R extends Reader, W extends Writer, T> extends Simple
}
if (this.encoder == null) {
if (!this.inited) {
synchronized (lock) {
try {
lock.wait();
} catch (Exception e) {
e.printStackTrace();
}
lock.lock();
try {
condition.await();
} catch (Exception e) {
} finally {
lock.unlock();
}
}
}
@@ -91,12 +97,12 @@ public class OptionalCoder<R extends Reader, W extends Writer, T> extends Simple
public Optional<T> convertFrom(R in) {
if (this.decoder == null) {
if (!this.inited) {
synchronized (lock) {
try {
lock.wait();
} catch (Exception e) {
e.printStackTrace();
}
lock.lock();
try {
condition.await();
} catch (Exception e) {
} finally {
lock.unlock();
}
}
}

View File

@@ -5,10 +5,9 @@
*/
package org.redkale.convert;
import org.redkale.util.Creator;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.*;
import java.util.*;
import java.util.concurrent.locks.*;
import java.util.stream.Stream;
/**
@@ -32,7 +31,9 @@ public class StreamDecoder<T> implements Decodeable<Reader, Stream<T>> {
protected volatile boolean inited = false;
protected final Object lock = new Object();
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
public StreamDecoder(final ConvertFactory factory, final Type type) {
this.type = type;
@@ -47,8 +48,11 @@ public class StreamDecoder<T> implements Decodeable<Reader, Stream<T>> {
}
} finally {
inited = true;
synchronized (lock) {
lock.notifyAll();
lock.lock();
try {
condition.signalAll();
} finally {
lock.unlock();
}
}
}
@@ -69,12 +73,12 @@ public class StreamDecoder<T> implements Decodeable<Reader, Stream<T>> {
}
if (this.componentDecoder == null) {
if (!this.inited) {
synchronized (lock) {
try {
lock.wait();
} catch (Exception e) {
e.printStackTrace();
}
lock.lock();
try {
condition.await();
} catch (Exception e) {
} finally {
lock.unlock();
}
}
}

View File

@@ -6,6 +6,7 @@
package org.redkale.convert;
import java.lang.reflect.*;
import java.util.concurrent.locks.*;
import java.util.stream.Stream;
/**
@@ -27,7 +28,9 @@ public class StreamEncoder<T> implements Encodeable<Writer, Stream<T>> {
protected volatile boolean inited = false;
protected final Object lock = new Object();
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
public StreamEncoder(final ConvertFactory factory, final Type type) {
this.type = type;
@@ -44,8 +47,11 @@ public class StreamEncoder<T> implements Encodeable<Writer, Stream<T>> {
}
} finally {
inited = true;
synchronized (lock) {
lock.notifyAll();
lock.lock();
try {
condition.signalAll();
} finally {
lock.unlock();
}
}
}
@@ -68,12 +74,12 @@ public class StreamEncoder<T> implements Encodeable<Writer, Stream<T>> {
}
if (this.componentEncoder == null) {
if (!this.inited) {
synchronized (lock) {
try {
lock.wait();
} catch (Exception e) {
e.printStackTrace();
}
lock.lock();
try {
condition.await();
} catch (Exception e) {
} finally {
lock.unlock();
}
}
}

View File

@@ -8,6 +8,7 @@ package org.redkale.mq;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.*;
import java.util.stream.Collectors;
import org.redkale.annotation.AutoLoad;
@@ -46,9 +47,13 @@ public abstract class MessageAgent implements Resourcable {
protected MessageProducers sncpProducer;
protected final Object httpProducerLock = new Object();
protected final ReentrantLock httpProducerLock = new ReentrantLock();
protected final Object sncpProducerLock = new Object();
protected final ReentrantLock sncpProducerLock = new ReentrantLock();
protected final ReentrantLock httpNodesLock = new ReentrantLock();
protected final ReentrantLock sncpNodesLock = new ReentrantLock();
protected final AtomicLong msgSeqno = new AtomicLong(System.nanoTime());
@@ -222,7 +227,8 @@ public abstract class MessageAgent implements Resourcable {
//获取指定topic的生产处理器
public MessageProducers getSncpProducer() {
if (this.sncpProducer == null) {
synchronized (sncpProducerLock) {
sncpProducerLock.lock();
try {
if (this.sncpProducer == null) {
long s = System.currentTimeMillis();
MessageProducer[] producers = new MessageProducer[producerCount];
@@ -237,6 +243,8 @@ public abstract class MessageAgent implements Resourcable {
}
this.sncpProducer = new MessageProducers(producers);
}
} finally {
sncpProducerLock.unlock();
}
}
return this.sncpProducer;
@@ -244,7 +252,8 @@ public abstract class MessageAgent implements Resourcable {
public MessageProducers getHttpProducer() {
if (this.httpProducer == null) {
synchronized (httpProducerLock) {
httpProducerLock.lock();
try {
if (this.httpProducer == null) {
long s = System.currentTimeMillis();
MessageProducer[] producers = new MessageProducer[producerCount];
@@ -259,6 +268,8 @@ public abstract class MessageAgent implements Resourcable {
}
this.httpProducer = new MessageProducers(producers);
}
} finally {
httpProducerLock.unlock();
}
}
return this.httpProducer;
@@ -282,7 +293,7 @@ public abstract class MessageAgent implements Resourcable {
//创建指定topic的消费处理器
public abstract MessageConsumer createConsumer(String[] topics, String group, MessageProcessor processor);
public final synchronized void putService(NodeHttpServer ns, Service service, HttpServlet servlet) {
public final void putService(NodeHttpServer ns, Service service, HttpServlet servlet) {
AutoLoad al = service.getClass().getAnnotation(AutoLoad.class);
if (al != null && !al.value() && service.getClass().getAnnotation(Local.class) != null) {
return;
@@ -299,14 +310,19 @@ public abstract class MessageAgent implements Resourcable {
}
String[] topics = generateHttpReqTopics(service);
String consumerid = generateHttpConsumerid(topics, service);
if (messageNodes.containsKey(consumerid)) {
throw new RuntimeException("consumerid(" + consumerid + ") is repeat");
httpNodesLock.lock();
try {
if (messageNodes.containsKey(consumerid)) {
throw new RuntimeException("consumerid(" + consumerid + ") is repeat");
}
HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, httpMessageClient, getHttpProducer(), ns, service, servlet);
this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topics, consumerid, processor)));
} finally {
httpNodesLock.unlock();
}
HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, httpMessageClient, getHttpProducer(), ns, service, servlet);
this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topics, consumerid, processor)));
}
public final synchronized void putService(NodeSncpServer ns, Service service, SncpServlet servlet) {
public final void putService(NodeSncpServer ns, Service service, SncpServlet servlet) {
AutoLoad al = service.getClass().getAnnotation(AutoLoad.class);
if (al != null && !al.value() && service.getClass().getAnnotation(Local.class) != null) {
return;
@@ -317,11 +333,16 @@ public abstract class MessageAgent implements Resourcable {
}
String topic = generateSncpReqTopic(service);
String consumerid = generateSncpConsumerid(topic, service);
if (messageNodes.containsKey(consumerid)) {
throw new RuntimeException("consumerid(" + consumerid + ") is repeat");
sncpNodesLock.lock();
try {
if (messageNodes.containsKey(consumerid)) {
throw new RuntimeException("consumerid(" + consumerid + ") is repeat");
}
SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, sncpMessageClient, getSncpProducer(), ns, service, servlet);
this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(new String[]{topic}, consumerid, processor)));
} finally {
sncpNodesLock.unlock();
}
SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, sncpMessageClient, getSncpProducer(), ns, service, servlet);
this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(new String[]{topic}, consumerid, processor)));
}
//格式: sncp.req.user

View File

@@ -11,6 +11,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.*;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.*;
@@ -58,7 +59,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
private Consumer<ByteBuffer> writeBufferConsumer;
private final Object pipelineLock = new Object();
private final ReentrantLock pipelineLock = new ReentrantLock();
private ByteBufferWriter pipelineWriter;
@@ -519,7 +520,8 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
//返回pipelineCount个数数据是否全部写入完毕
public boolean appendPipeline(int pipelineIndex, int pipelineCount, byte[] bs, int offset, int length) {
synchronized (pipelineLock) {
pipelineLock.lock();
try {
ByteBufferWriter writer = this.pipelineWriter;
if (writer == null) {
writer = ByteBufferWriter.create(getWriteBufferSupplier());
@@ -547,6 +549,8 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
}
return false;
}
} finally {
pipelineLock.unlock();
}
}
@@ -557,7 +561,8 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
//返回pipelineCount个数数据是否全部写入完毕
public boolean appendPipeline(int pipelineIndex, int pipelineCount, byte[] headerContent, int headerOffset, int headerLength, byte[] bodyContent, int bodyOffset, int bodyLength) {
synchronized (pipelineLock) {
pipelineLock.lock();
try {
ByteBufferWriter writer = this.pipelineWriter;
if (writer == null) {
writer = ByteBufferWriter.create(getWriteBufferSupplier());
@@ -585,6 +590,8 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
}
return false;
}
} finally {
pipelineLock.unlock();
}
}

View File

@@ -31,10 +31,10 @@ public class AsyncIOGroup extends AsyncGroup {
private boolean started;
private boolean closed;
private boolean skipClose;
private final AtomicBoolean closed = new AtomicBoolean();
final AsyncIOThread[] ioReadThreads;
final AsyncIOThread[] ioWriteThreads;
@@ -132,7 +132,7 @@ public class AsyncIOGroup extends AsyncGroup {
if (started) {
return this;
}
if (closed) {
if (closed.get()) {
throw new RuntimeException("group is closed");
}
for (int i = 0; i < this.ioReadThreads.length; i++) {
@@ -162,21 +162,19 @@ public class AsyncIOGroup extends AsyncGroup {
return this;
}
public synchronized AsyncIOGroup dispose() {
if (closed) {
return this;
public AsyncIOGroup dispose() {
if (closed.compareAndSet(false, true)) {
for (AsyncIOThread t : this.ioReadThreads) {
t.close();
}
for (AsyncIOThread t : this.ioWriteThreads) {
t.close();
}
if (connectThread != null) {
connectThread.close();
}
this.timeoutExecutor.shutdownNow();
}
for (AsyncIOThread t : this.ioReadThreads) {
t.close();
}
for (AsyncIOThread t : this.ioWriteThreads) {
t.close();
}
if (connectThread != null) {
connectThread.close();
}
this.timeoutExecutor.shutdownNow();
closed = true;
return this;
}

View File

@@ -10,6 +10,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.*;
import java.util.logging.*;
import org.redkale.util.*;
@@ -38,7 +39,7 @@ public class AsyncIOThread extends WorkThread {
private final Queue<Consumer<Selector>> registerQueue = Utility.unsafe() != null ? new MpscGrowableArrayQueue<>(16, 1 << 16) : new ConcurrentLinkedQueue<>();
private boolean closed;
private final AtomicBoolean closed = new AtomicBoolean();
public AsyncIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ObjectPool<ByteBuffer> safeBufferPool) throws IOException {
super(g, name, index, threads, workExecutor, null);
@@ -50,7 +51,7 @@ public class AsyncIOThread extends WorkThread {
}
protected boolean isClosed() {
return closed;
return closed.get();
}
public static AsyncIOThread currAsyncIOThread() {
@@ -148,7 +149,7 @@ public class AsyncIOThread extends WorkThread {
try {
register.accept(selector);
} catch (Throwable t) {
if (!this.closed) {
if (!this.closed.get()) {
logger.log(Level.INFO, getName() + " register run failed", t);
}
}
@@ -158,7 +159,7 @@ public class AsyncIOThread extends WorkThread {
try {
command.run();
} catch (Throwable t) {
if (!this.closed) {
if (!this.closed.get()) {
logger.log(Level.INFO, getName() + " command run failed", t);
}
}
@@ -202,16 +203,15 @@ public class AsyncIOThread extends WorkThread {
}
}
} catch (Throwable ex) {
if (!this.closed) {
if (!this.closed.get()) {
logger.log(Level.FINE, getName() + " selector run failed", ex);
}
}
}
}
public synchronized void close() {
if (!this.closed) {
this.closed = true;
public void close() {
if (this.closed.compareAndSet(false, true)) {
try {
this.selector.close();
} catch (Exception e) {

View File

@@ -471,7 +471,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
ByteBuffer bb;
@Override
public synchronized int read() throws IOException {
public int read() throws IOException {
if (bb == null || !bb.hasRemaining()) {
int r = readBuffer();
if (r < 1) {
@@ -482,7 +482,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
}
@Override
public synchronized int read(byte b[], int off, int len) throws IOException {
public int read(byte b[], int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {

View File

@@ -8,6 +8,7 @@ package org.redkale.net;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.logging.Level;
import java.util.stream.Stream;
@@ -36,11 +37,11 @@ public abstract class DispatcherServlet<K extends Serializable, C extends Contex
private final LongAdder illegalRequestCounter = new LongAdder(); //错误请求次数
private final Object servletLock = new Object();
private final ReentrantLock servletLock = new ReentrantLock();
private Set<S> servlets = new HashSet<>();
private final Object mappingLock = new Object();
private final ReentrantLock mappingLock = new ReentrantLock();
private Map<K, S> mappings = new HashMap<>();
@@ -48,6 +49,8 @@ public abstract class DispatcherServlet<K extends Serializable, C extends Contex
private final List<Filter<C, R, P>> filters = new ArrayList<>();
protected final ReentrantLock filtersLock = new ReentrantLock();
protected Application application;
protected Filter<C, R, P> headFilter;
@@ -61,65 +64,84 @@ public abstract class DispatcherServlet<K extends Serializable, C extends Contex
}
protected void putServlet(S servlet) {
synchronized (servletLock) {
servletLock.lock();
try {
Set<S> newservlets = new HashSet<>(servlets);
newservlets.add(servlet);
this.servlets = newservlets;
} finally {
servletLock.unlock();
}
}
protected void removeServlet(S servlet) {
synchronized (servletLock) {
servletLock.lock();
try {
Set<S> newservlets = new HashSet<>(servlets);
newservlets.remove(servlet);
this.servlets = newservlets;
doAfterRemove(servlet);
} finally {
servletLock.unlock();
}
}
public boolean containsServlet(Class<? extends S> servletClass) {
synchronized (servletLock) {
servletLock.lock();
try {
for (S servlet : new HashSet<>(servlets)) {
if (servlet.getClass().equals(servletClass)) {
return true;
}
}
return false;
} finally {
servletLock.unlock();
}
}
public boolean containsServlet(String servletClassName) {
synchronized (servletLock) {
servletLock.lock();
try {
for (S servlet : new HashSet<>(servlets)) {
if (servlet.getClass().getName().equals(servletClassName)) {
return true;
}
}
return false;
} finally {
servletLock.unlock();
}
}
protected void putMapping(K key, S servlet) {
synchronized (mappingLock) {
mappingLock.lock();
try {
Map<K, S> newmappings = new HashMap<>(mappings);
newmappings.put(key, servlet);
this.mappings = newmappings;
} finally {
mappingLock.unlock();
}
}
protected void removeMapping(K key) {
synchronized (mappingLock) {
mappingLock.lock();
try {
if (mappings.containsKey(key)) {
Map<K, S> newmappings = new HashMap<>(mappings);
S s = newmappings.remove(key);
this.mappings = newmappings;
doAfterRemove(s);
}
} finally {
mappingLock.unlock();
}
}
protected void removeMapping(S servlet) {
synchronized (mappingLock) {
mappingLock.lock();
try {
List<K> keys = new ArrayList<>();
Map<K, S> newmappings = new HashMap<>(mappings);
for (Map.Entry<K, S> en : newmappings.entrySet()) {
@@ -130,6 +152,8 @@ public abstract class DispatcherServlet<K extends Serializable, C extends Contex
for (K key : keys) newmappings.remove(key);
this.mappings = newmappings;
doAfterRemove(servlet);
} finally {
mappingLock.unlock();
}
}
@@ -146,7 +170,8 @@ public abstract class DispatcherServlet<K extends Serializable, C extends Contex
if (application != null && application.isCompileMode()) {
return;
}
synchronized (filters) {
filtersLock.lock();
try {
if (!filters.isEmpty()) {
Collections.sort(filters);
for (Filter<C, R, P> filter : filters) {
@@ -159,28 +184,36 @@ public abstract class DispatcherServlet<K extends Serializable, C extends Contex
filter = filter._next;
}
}
} finally {
filtersLock.unlock();
}
}
@Override
@SuppressWarnings("unchecked")
public void destroy(C context, AnyValue config) {
synchronized (filters) {
filtersLock.lock();
try {
if (!filters.isEmpty()) {
for (Filter filter : filters) {
filter.destroy(context, config);
}
}
} finally {
filtersLock.unlock();
}
}
@SuppressWarnings("unchecked")
public void addFilter(Filter<C, R, P> filter, AnyValue conf) {
filter._conf = conf;
synchronized (filters) {
filtersLock.lock();
try {
this.filters.add(filter);
this.allFilterAsync = this.allFilterAsync && isAsync(filter);
Collections.sort(this.filters);
} finally {
filtersLock.unlock();
}
}
@@ -224,7 +257,8 @@ public abstract class DispatcherServlet<K extends Serializable, C extends Contex
if (this.headFilter == null || predicate == null) {
return null;
}
synchronized (filters) {
filtersLock.lock();
try {
Filter filter = this.headFilter;
Filter prev = null;
do {
@@ -251,6 +285,8 @@ public abstract class DispatcherServlet<K extends Serializable, C extends Contex
this.allFilterAsync = async;
}
return (T) filter;
} finally {
filtersLock.unlock();
}
}

View File

@@ -52,7 +52,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
private final AtomicInteger connSeqno = new AtomicInteger();
private boolean closed;
private final AtomicBoolean closed = new AtomicBoolean();
protected ScheduledFuture timeoutFuture;
@@ -180,28 +180,26 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
protected void handlePingResult(C conn, P result) {
}
public synchronized void close() {
if (this.closed) {
return;
}
this.timeoutScheduler.shutdownNow();
for (ClientConnection conn : this.connArray) {
if (conn == null) {
continue;
}
final R closeReq = closeRequestSupplier == null ? null : closeRequestSupplier.get();
if (closeReq == null) {
conn.dispose(null);
} else {
try {
conn.writeChannel(closeReq).get(1, TimeUnit.SECONDS);
} catch (Exception e) {
public void close() {
if (closed.compareAndSet(false, true)) {
this.timeoutScheduler.shutdownNow();
for (ClientConnection conn : this.connArray) {
if (conn == null) {
continue;
}
final R closeReq = closeRequestSupplier == null ? null : closeRequestSupplier.get();
if (closeReq == null) {
conn.dispose(null);
} else {
try {
conn.writeChannel(closeReq).get(1, TimeUnit.SECONDS);
} catch (Exception e) {
}
conn.dispose(null);
}
conn.dispose(null);
}
group.close();
}
group.close();
this.closed = true;
}
public final CompletableFuture<P> sendAsync(R request) {

View File

@@ -10,6 +10,7 @@ import java.nio.channels.ClosedChannelException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.*;
import java.util.function.*;
import org.redkale.net.*;
@@ -39,6 +40,10 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
protected final List<ClientFuture> pauseRequests = new CopyOnWriteArrayList<>();
private final ReentrantLock pauseLock = new ReentrantLock();
private final Condition pauseCondition = pauseLock.newCondition();
protected final AsyncConnection channel;
private final ClientCodec<R, P> codec;
@@ -181,6 +186,25 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
}
}
void signalPauseRequest() {
pauseLock.lock();
try {
pauseCondition.signalAll();
} finally {
pauseLock.unlock();
}
}
void awaitPauseRequest() {
pauseLock.lock();
try {
pauseCondition.await(3_000, TimeUnit.SECONDS);
} catch (Exception e) {
} finally {
pauseLock.unlock();
}
}
public boolean isAuthenticated() {
return authenticated;
}

View File

@@ -51,9 +51,7 @@ public class ClientWriteIOThread extends AsyncIOThread {
} finally {
conn.pauseResuming.set(false);
conn.pauseWriting.set(false);
synchronized (conn.pauseRequests) {
conn.pauseRequests.notify();
}
conn.signalPauseRequest();
}
}
}
@@ -77,12 +75,7 @@ public class ClientWriteIOThread extends AsyncIOThread {
entry.conn.offerRespFuture(entry);
if (entry.conn.pauseWriting.get()) {
if (entry.conn.pauseResuming.get()) {
try {
synchronized (entry.conn.pauseRequests) {
entry.conn.pauseRequests.wait(3_000);
}
} catch (InterruptedException ie) {
}
entry.conn.awaitPauseRequest();
}
entry.conn.pauseRequests.add(entry);
} else {
@@ -94,12 +87,7 @@ public class ClientWriteIOThread extends AsyncIOThread {
entry.conn.offerRespFuture(entry);
if (entry.conn.pauseWriting.get()) {
if (entry.conn.pauseResuming.get()) {
try {
synchronized (entry.conn.pauseRequests) {
entry.conn.pauseRequests.wait(3_000);
}
} catch (InterruptedException ie) {
}
entry.conn.awaitPauseRequest();
}
entry.conn.pauseRequests.add(entry);
} else {

View File

@@ -63,14 +63,12 @@ public class HttpContext extends Context {
//
// protected void addRequestURINode(String path) {
// RequestURINode node = new RequestURINode(path);
// synchronized (this) {
// if (this.uriCacheNodes != null) {
// for (int i = 0; i < uriCacheNodes.length; i++) {
// if (uriCacheNodes[i].path.equals(path)) return;
// }
// }
// this.uriCacheNodes = Utility.append(this.uriCacheNodes, node);
// }
// }
@Override
protected void updateReadIOThread(AsyncConnection conn, AsyncIOThread ioReadThread) {
@@ -85,7 +83,7 @@ public class HttpContext extends Context {
protected void updateWebSocketWriteIOThread(WebSocket webSocket) {
WebSocketWriteIOThread writeIOThread = webSocketWriterIOThreadFunc.apply(webSocket);
updateWriteIOThread(webSocket._channel, writeIOThread);
webSocket._writeIOThread = writeIOThread;
webSocket._writeIOThread = writeIOThread;
}
protected String createSessionid() {
@@ -96,16 +94,11 @@ public class HttpContext extends Context {
@SuppressWarnings("unchecked")
protected <H extends CompletionHandler> Creator<H> loadAsyncHandlerCreator(Class<H> handlerClass) {
Creator<H> creator = asyncHandlerCreators.get(handlerClass);
if (creator == null) {
creator = createAsyncHandlerCreator(handlerClass);
asyncHandlerCreators.put(handlerClass, creator);
}
return creator;
return asyncHandlerCreators.computeIfAbsent(handlerClass, c -> createAsyncHandlerCreator(c));
}
@SuppressWarnings("unchecked")
private static synchronized <H extends CompletionHandler> Creator<H> createAsyncHandlerCreator(Class<H> handlerClass) {
private static <H extends CompletionHandler> Creator<H> createAsyncHandlerCreator(Class<H> handlerClass) {
//生成规则与SncpAsyncHandler.Factory 很类似
//-------------------------------------------------------------
final boolean handlerinterface = handlerClass.isInterface();

View File

@@ -7,6 +7,7 @@ package org.redkale.net.http;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.*;
import java.util.logging.*;
import java.util.regex.Pattern;
@@ -40,7 +41,9 @@ public class HttpDispatcherServlet extends DispatcherServlet<String, HttpContext
protected final Map<String, Class> allMapStrings = new HashMap<>();
private final Object excludeLock = new Object();
private final ReentrantLock allMapLock = new ReentrantLock();
private final ReentrantLock excludeLock = new ReentrantLock();
protected HttpContext context;
@@ -54,7 +57,8 @@ public class HttpDispatcherServlet extends DispatcherServlet<String, HttpContext
private List<HttpServlet> removeHttpServlet(final Predicate<MappingEntry> predicateEntry, final Predicate<Map.Entry<String, WebSocketServlet>> predicateFilter) {
List<HttpServlet> servlets = new ArrayList<>();
synchronized (allMapStrings) {
allMapLock.lock();
try {
List<String> keys = new ArrayList<>();
if (regxArray != null) {
for (MappingEntry me : regxArray) {
@@ -96,6 +100,8 @@ public class HttpDispatcherServlet extends DispatcherServlet<String, HttpContext
}
}
this.lastRunServlet = null;
} finally {
allMapLock.unlock();
}
return servlets;
}
@@ -174,7 +180,8 @@ public class HttpDispatcherServlet extends DispatcherServlet<String, HttpContext
if (urlRegx == null || urlRegx.isEmpty()) {
return false;
}
synchronized (excludeLock) {
excludeLock.lock();
try {
if (forbidURIMaps != null && forbidURIMaps.containsKey(urlRegx)) {
return false;
}
@@ -198,6 +205,8 @@ public class HttpDispatcherServlet extends DispatcherServlet<String, HttpContext
forbidURIMaps.put(urlRegx, predicate);
forbidURIPredicates = Utility.append(forbidURIPredicates, predicate);
return true;
} finally {
excludeLock.unlock();
}
}
@@ -206,7 +215,8 @@ public class HttpDispatcherServlet extends DispatcherServlet<String, HttpContext
if (urlreg == null || urlreg.isEmpty()) {
return false;
}
synchronized (excludeLock) {
excludeLock.lock();
try {
if (forbidURIMaps == null || forbidURIPredicates == null || !forbidURIMaps.containsKey(urlreg)) {
return false;
}
@@ -231,6 +241,8 @@ public class HttpDispatcherServlet extends DispatcherServlet<String, HttpContext
}
}
return true;
} finally {
excludeLock.unlock();
}
}
@@ -375,7 +387,8 @@ public class HttpDispatcherServlet extends DispatcherServlet<String, HttpContext
context.lazyHeaders = false; //启动后运行过程中执行addServlet
}
}
synchronized (allMapStrings) { //需要整段锁住
allMapLock.lock();
try { //需要整段锁住
for (String mappingPath : mappingPaths) {
if (mappingPath == null) {
continue;
@@ -433,6 +446,8 @@ public class HttpDispatcherServlet extends DispatcherServlet<String, HttpContext
setServletConf(servlet, conf);
servlet._prefix = prefix.toString();
putServlet(servlet);
} finally {
allMapLock.unlock();
}
}
@@ -458,12 +473,15 @@ public class HttpDispatcherServlet extends DispatcherServlet<String, HttpContext
public void postStart(HttpContext context, AnyValue config) {
List filters = getFilters();
synchronized (filters) {
filtersLock.lock();
try {
if (!filters.isEmpty()) {
for (Object filter : filters) {
((HttpFilter) filter).postStart(context, config);
}
}
} finally {
filtersLock.unlock();
}
this.resourceHttpServlet.postStart(context, config);
getServlets().forEach(s -> {

View File

@@ -14,6 +14,7 @@ import static java.time.format.DateTimeFormatter.RFC_1123_DATE_TIME;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import java.util.logging.Level;
import org.redkale.boot.Application;
@@ -43,7 +44,7 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
private ObjectPool<ByteBuffer> safeBufferPool;
private final Object groupLock = new Object();
private final ReentrantLock groupLock = new ReentrantLock();
private WebSocketAsyncGroup asyncGroup;
@@ -546,12 +547,15 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
HttpContext rs = new HttpContext(contextConfig);
rs.webSocketWriterIOThreadFunc = ws -> {
if (asyncGroup == null) {
synchronized (groupLock) {
groupLock.lock();
try {
if (asyncGroup == null) {
WebSocketAsyncGroup g = new WebSocketAsyncGroup("Redkale-HTTP:" + address.getPort() + "-WebSocketWriteIOThread-%s", workExecutor, bufferCapacity, safeBufferPool);
g.start();
asyncGroup = g;
}
} finally {
groupLock.unlock();
}
}
return (WebSocketWriteIOThread) asyncGroup.nextWriteIOThread();

View File

@@ -11,6 +11,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.*;
import java.util.logging.*;
import java.util.stream.Stream;
@@ -126,7 +127,7 @@ public abstract class WebSocket<G extends Serializable, T> {
boolean initiateClosed; //收到客户端发送的CLOSE消息
private boolean closed = false;
private final AtomicBoolean closed = new AtomicBoolean();
protected WebSocket() {
}
@@ -956,14 +957,7 @@ public abstract class WebSocket<G extends Serializable, T> {
//closeRunner
CompletableFuture<Void> kill(int code, String reason) {
if (closed) {
return null;
}
synchronized (this) {
if (closed) {
return null;
}
closed = true;
if (closed.compareAndSet(false, true)) {
if (_channel == null) {
return null;
}
@@ -974,6 +968,8 @@ public abstract class WebSocket<G extends Serializable, T> {
return future;
}
return CompletableFuture.allOf(future, closeFuture);
} else {
return null;
}
}
@@ -983,7 +979,7 @@ public abstract class WebSocket<G extends Serializable, T> {
* @return boolean
*/
public final boolean isClosed() {
return this.closed;
return this.closed.get();
}
@Override

View File

@@ -71,8 +71,6 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
private final MessageDigest digest = getMessageDigest();
private final BiConsumer<WebSocket, Object> restMessageConsumer = createRestOnMessageConsumer();
protected Type messageRestType; //RestWebSocket时会被修改
@@ -276,10 +274,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
//onOpen成功或者存在delayPackets
webSocket._sessionid = sessionid;
request.setKeepAlive(true);
byte[] bytes = (key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").getBytes();
synchronized (digest) {
bytes = digest.digest(bytes);
}
byte[] bytes = sha1(key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11");
response.setStatus(101);
response.setHeader("Connection", "Upgrade");
response.addHeader("Upgrade", "websocket");
@@ -295,7 +291,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
webSocket._readHandler = new WebSocketReadHandler(response.getContext(), webSocket, restMessageConsumer);
//webSocket._writeHandler = new WebSocketWriteHandler(response.getContext(), webSocket);
response.getContext().updateWebSocketWriteIOThread(webSocket);
Runnable createUseridHandler = () -> {
CompletableFuture<Serializable> userFuture = webSocket.createUserid();
if (userFuture == null) {
@@ -422,9 +418,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
return null;
}
private static MessageDigest getMessageDigest() {
private static byte[] sha1(String str) {
try {
return MessageDigest.getInstance("SHA-1");
return MessageDigest.getInstance("SHA-1").digest(str.getBytes());
} catch (Exception e) {
throw new HttpException(e);
}

View File

@@ -6,6 +6,7 @@
package org.redkale.net.sncp;
import java.io.IOException;
import java.util.concurrent.locks.ReentrantLock;
import org.redkale.net.DispatcherServlet;
import org.redkale.service.Service;
import org.redkale.util.*;
@@ -19,13 +20,14 @@ import org.redkale.util.*;
*/
public class SncpDispatcherServlet extends DispatcherServlet<Uint128, SncpContext, SncpRequest, SncpResponse, SncpServlet> {
private final Object sncplock = new Object();
private final ReentrantLock sncplock = new ReentrantLock();
private final byte[] pongBytes = Sncp.getPongBytes();
@Override
public void addServlet(SncpServlet servlet, Object attachment, AnyValue conf, Uint128... mappings) {
synchronized (sncplock) {
sncplock.lock();
try {
for (SncpServlet s : getServlets()) {
if (s.service == servlet.service) {
throw new SncpException(s.service + " repeat addSncpServlet");
@@ -34,12 +36,15 @@ public class SncpDispatcherServlet extends DispatcherServlet<Uint128, SncpContex
setServletConf(servlet, conf);
putMapping(servlet.getServiceid(), servlet);
putServlet(servlet);
} finally {
sncplock.unlock();
}
}
public <T> SncpServlet removeSncpServlet(Service service) {
SncpServlet rs = null;
synchronized (sncplock) {
sncplock.lock();
try {
for (SncpServlet servlet : getServlets()) {
if (servlet.service == service) {
rs = servlet;
@@ -50,6 +55,8 @@ public class SncpDispatcherServlet extends DispatcherServlet<Uint128, SncpContex
removeMapping(rs);
removeServlet(rs);
}
} finally {
sncplock.unlock();
}
return rs;
}

View File

@@ -9,6 +9,7 @@ import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.*;
import java.util.stream.Stream;
import org.redkale.annotation.AutoLoad;
@@ -41,7 +42,7 @@ public abstract class AbstractDataSource extends AbstractService implements Data
//@since 2.8.0 复用另一source资源
public static final String DATA_SOURCE_RESOURCE = "resource";
//@since 2.7.0 格式: x.x.x.x:yyyy
public static final String DATA_SOURCE_PROXY_ADDRESS = "proxy-address";
@@ -111,7 +112,7 @@ public abstract class AbstractDataSource extends AbstractService implements Data
//@since 2.7.0
public static final String DATA_SOURCE_TABLECOPY_SQLTEMPLATE = "tablecopy-sqltemplate";
private final Object executorLock = new Object();
private final ReentrantLock executorLock = new ReentrantLock();
private int sourceThreads = Utility.cpus();
@@ -285,10 +286,13 @@ public abstract class AbstractDataSource extends AbstractService implements Data
protected ExecutorService getExecutor() {
ExecutorService executor = this.sourceExecutor;
if (executor == null) {
synchronized (executorLock) {
executorLock.lock();
try {
if (this.sourceExecutor == null) {
this.sourceExecutor = WorkThread.createExecutor(sourceThreads, "Redkale-DataSource-WorkThread-" + resourceName() + "-%s");
}
} finally {
executorLock.unlock();
}
executor = this.sourceExecutor;
}

View File

@@ -10,6 +10,7 @@ import java.lang.reflect.Type;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.*;
import java.util.logging.*;
import java.util.regex.Pattern;
@@ -54,6 +55,8 @@ public final class CacheMemorySource extends AbstractCacheSource {
protected final ConcurrentHashMap<String, CacheEntry<Object>> container = new ConcurrentHashMap<>();
protected final ReentrantLock containerLock = new ReentrantLock();
protected final BiConsumer futureCompleteConsumer = (r, t) -> {
if (t != null) {
logger.log(Level.SEVERE, "CompletableFuture complete error", (Throwable) t);
@@ -196,7 +199,8 @@ public final class CacheMemorySource extends AbstractCacheSource {
public long hincrby(final String key, String field, long num) {
CacheEntry entry = container.get(key);
if (entry == null) {
synchronized (container) {
containerLock.lock();
try {
entry = container.get(key);
if (entry == null) {
ConcurrentHashMap<String, Serializable> map = new ConcurrentHashMap();
@@ -204,11 +208,14 @@ public final class CacheMemorySource extends AbstractCacheSource {
entry = new CacheEntry(CacheEntryType.MAP, key, new AtomicLong(), null, null, map);
container.put(key, entry);
}
} finally {
containerLock.unlock();
}
}
Serializable val = (Serializable) entry.mapValue.computeIfAbsent(field, f -> new AtomicLong());
if (!(val instanceof AtomicLong)) {
synchronized (entry.mapValue) {
entry.mapLock.lock();
try {
if (!(val instanceof AtomicLong)) {
if (val == null) {
val = new AtomicLong();
@@ -217,6 +224,8 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry.mapValue.put(field, val);
}
} finally {
entry.mapLock.unlock();
}
}
return ((AtomicLong) entry.mapValue.get(field)).addAndGet(num);
@@ -226,7 +235,8 @@ public final class CacheMemorySource extends AbstractCacheSource {
public double hincrbyFloat(final String key, String field, double num) {
CacheEntry entry = container.get(key);
if (entry == null) {
synchronized (container) {
containerLock.lock();
try {
entry = container.get(key);
if (entry == null) {
ConcurrentHashMap<String, Serializable> map = new ConcurrentHashMap();
@@ -234,11 +244,14 @@ public final class CacheMemorySource extends AbstractCacheSource {
entry = new CacheEntry(CacheEntryType.MAP, key, new AtomicLong(), null, null, map);
container.put(key, entry);
}
} finally {
containerLock.unlock();
}
}
Serializable val = (Serializable) entry.mapValue.computeIfAbsent(field, f -> new AtomicLong());
if (!(val instanceof AtomicLong)) {
synchronized (entry.mapValue) {
entry.mapLock.lock();
try {
if (!(val instanceof AtomicLong)) {
if (val == null) {
val = new AtomicLong();
@@ -247,6 +260,8 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry.mapValue.put(field, val);
}
} finally {
entry.mapLock.unlock();
}
}
return Double.longBitsToDouble(((AtomicLong) entry.mapValue.get(field)).addAndGet(Double.doubleToLongBits(num)));
@@ -1068,12 +1083,15 @@ public final class CacheMemorySource extends AbstractCacheSource {
public long incrby(final String key, long num) {
CacheEntry entry = container.get(key);
if (entry == null) {
synchronized (container) {
containerLock.lock();
try {
entry = container.get(key);
if (entry == null) {
entry = new CacheEntry(CacheEntryType.ATOMIC, key, new AtomicLong(), null, null, null);
container.put(key, entry);
}
} finally {
containerLock.unlock();
}
}
return ((AtomicLong) entry.objectValue).addAndGet(num);
@@ -1083,12 +1101,15 @@ public final class CacheMemorySource extends AbstractCacheSource {
public double incrbyFloat(final String key, double num) {
CacheEntry entry = container.get(key);
if (entry == null) {
synchronized (container) {
containerLock.lock();
try {
entry = container.get(key);
if (entry == null) {
entry = new CacheEntry(CacheEntryType.DOUBLE, key, new AtomicLong(), null, null, null);
container.put(key, entry);
}
} finally {
containerLock.unlock();
}
}
Long v = ((AtomicLong) entry.objectValue).addAndGet(Double.doubleToLongBits(num));
@@ -1744,6 +1765,8 @@ public final class CacheMemorySource extends AbstractCacheSource {
ConcurrentHashMap<String, Serializable> mapValue;
final ReentrantLock mapLock = new ReentrantLock();
CopyOnWriteArraySet<T> csetValue;
ConcurrentLinkedQueue<T> listValue;

View File

@@ -284,7 +284,8 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
st.close();
} else { //分库分表
synchronized (info.disTableLock()) {
info.disTableLock().lock();
try {
final Set<String> newCatalogs = new LinkedHashSet<>();
final List<String> tableCopys = new ArrayList<>();
prepareInfos.forEach((t, p) -> {
@@ -375,6 +376,8 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
}
}
} finally {
info.disTableLock().unlock();
}
}
if (info.getTableStrategy() == null) {
@@ -2439,7 +2442,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
@ResourceListener
public synchronized void onResourceChange(ResourceEvent[] events) {
public void onResourceChange(ResourceEvent[] events) {
String newUrl = this.url;
int newConnectTimeoutSeconds = this.connectTimeoutSeconds;
int newMaxconns = this.maxConns;
@@ -2481,7 +2484,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
}
public synchronized Connection pollConnection() {
public Connection pollConnection() {
Connection conn = queue.poll();
if (conn == null) {
if (usingCounter.intValue() >= maxConns) {

View File

@@ -9,6 +9,7 @@ import java.io.Serializable;
import java.lang.reflect.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.*;
import java.util.logging.*;
import org.redkale.annotation.Comment;
@@ -36,6 +37,8 @@ public final class EntityInfo<T> {
//全局静态资源
private static final ConcurrentHashMap<Class, EntityInfo> entityInfos = new ConcurrentHashMap<>();
private static final ReentrantLock infosLock = new ReentrantLock();
//日志
private static final Logger logger = Logger.getLogger(EntityInfo.class.getSimpleName());
@@ -104,6 +107,8 @@ public final class EntityInfo<T> {
//用于存在database.table_20160202类似这种分布式表, 服务分布式部署时不存在的表名不一定真实不存在
private final Set<String> disTables = new CopyOnWriteArraySet<>();
private final ReentrantLock disTableLock = new ReentrantLock();
//不能为null的字段名
private final Set<String> notNullColumns = new CopyOnWriteArraySet<>();
@@ -226,7 +231,8 @@ public final class EntityInfo<T> {
if (rs != null && (rs.cache == null || rs.cache.isFullLoaded())) {
return rs;
}
synchronized (entityInfos) {
infosLock.lock();
try {
rs = entityInfos.get(clazz);
if (rs == null) {
rs = new EntityInfo(clazz, cacheForbidden, conf, source, fullloader);
@@ -239,6 +245,8 @@ public final class EntityInfo<T> {
rs.cache.fullLoadAsync();
}
return rs;
} finally {
infosLock.unlock();
}
}
@@ -771,8 +779,8 @@ public final class EntityInfo<T> {
return tableStrategy;
}
public Object disTableLock() {
return disTables;
public ReentrantLock disTableLock() {
return disTableLock;
}
public boolean containsDisTable(String tableKey) {

View File

@@ -9,6 +9,7 @@ import java.io.Serializable;
import java.lang.reflect.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import org.redkale.persistence.Transient;
import static org.redkale.source.FilterExpress.*;
import org.redkale.util.*;
@@ -23,7 +24,9 @@ import org.redkale.util.*;
*/
public final class FilterNodeBean<T extends FilterBean> implements Comparable<FilterNodeBean<T>> {
private static final ConcurrentHashMap<Class, FilterNodeBean> beanodes = new ConcurrentHashMap<>();
private static final ReentrantLock beanLock = new ReentrantLock();
private static final ConcurrentHashMap<Class, FilterNodeBean> beanNodes = new ConcurrentHashMap<>();
private Attribute<T, Serializable> beanAttr;
@@ -147,17 +150,20 @@ public final class FilterNodeBean<T extends FilterBean> implements Comparable<Fi
}
public static FilterNodeBean load(Class<? extends FilterBean> clazz) {
FilterNodeBean rs = beanodes.get(clazz);
FilterNodeBean rs = beanNodes.get(clazz);
if (rs != null) {
return rs;
}
synchronized (beanodes) {
rs = beanodes.get(clazz);
beanLock.lock();
try {
rs = beanNodes.get(clazz);
if (rs == null) {
rs = createFilterNodeBean(clazz);
beanodes.put(clazz, rs);
beanNodes.put(clazz, rs);
}
return rs;
} finally {
beanLock.unlock();
}
}

View File

@@ -1042,7 +1042,6 @@ public class NonBlockingHashMap<TypeK, TypeV> extends AbstractMap<TypeK, TypeV>
return newkvs; // Use the new table already
// TODO - use a wait with timeout, so we'll wakeup as soon as the new table
// is ready, or after the timeout in any case.
//synchronized( this ) { wait(8*megs); } // Timeout - we always wakeup
// For now, sleep a tad and see if the 2 guys already trying to make
// the table actually get around to making it happen.
try {
@@ -1875,10 +1874,8 @@ public class NonBlockingHashMap<TypeK, TypeV> extends AbstractMap<TypeK, TypeV>
// // We could use a wait with timeout, so we'll wakeup as soon as the new
// // table is ready, or after the timeout in any case. Annoyingly, this
// // breaks the non-blocking property - so for now we just briefly sleep.
// //synchronized( this ) { wait(8*megs); } // Timeout - we always wakeup
// try { Thread.sleep(r>>17); } catch( InterruptedException e ) { }
// if( master._cat != this ) return old;
//}
CAT newcat = new CAT(this, t.length * 2, 0);
// Take 1 stab at updating the CAT with the new larger size. If this

View File

@@ -11,8 +11,8 @@ import java.net.*;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.*;
/**
* Redkale内部ClassLoader
@@ -65,6 +65,8 @@ public class RedkaleClassLoader extends URLClassLoader {
private static final ConcurrentHashMap<String, Map<String, Object>> reflectionMap = new ConcurrentHashMap<>();
private static final ReentrantLock reflectionLock = new ReentrantLock();
public RedkaleClassLoader(ClassLoader parent) {
super(new URL[0], parent);
}
@@ -74,7 +76,9 @@ public class RedkaleClassLoader extends URLClassLoader {
}
public static URI getConfResourceAsURI(String confURI, String file) {
if (file.startsWith("http:") || file.startsWith("https:") || file.startsWith("ftp:")) return URI.create(file);
if (file.startsWith("http:") || file.startsWith("https:") || file.startsWith("ftp:")) {
return URI.create(file);
}
if (confURI != null && !confURI.contains("!")) { //带!的是 /usr/xxx.jar!/META-INF/conf/xxx
File f = new File(URI.create(confURI).getPath(), file);
if (f.isFile() && f.canRead()) {
@@ -146,13 +150,16 @@ public class RedkaleClassLoader extends URLClassLoader {
}
public static void putReflectionClass(String name) {
synchronized (reflectionMap) {
reflectionLock.lock();
try {
Map<String, Object> map = reflectionMap.get(name);
if (map == null) {
map = new LinkedHashMap<>();
map.put("name", name);
reflectionMap.put(name, map);
}
} finally {
reflectionLock.unlock();
}
}
@@ -166,7 +173,8 @@ public class RedkaleClassLoader extends URLClassLoader {
}
public static void putReflectionField(String name, Field field) {
synchronized (reflectionMap) {
reflectionLock.lock();
try {
Map<String, Object> map = reflectionMap.get(name);
if (map == null) {
map = new LinkedHashMap();
@@ -186,13 +194,18 @@ public class RedkaleClassLoader extends URLClassLoader {
break;
}
}
if (!contains) list.add((Map) Utility.ofMap("name", field.getName()));
if (!contains) {
list.add((Map) Utility.ofMap("name", field.getName()));
}
}
} finally {
reflectionLock.unlock();
}
}
public static void putReflectionMethod(String name, Method method) {
synchronized (reflectionMap) {
reflectionLock.lock();
try {
Map<String, Object> map = reflectionMap.get(name);
if (map == null) {
map = new LinkedHashMap();
@@ -218,13 +231,18 @@ public class RedkaleClassLoader extends URLClassLoader {
break;
}
}
if (!contains) list.add(createMap(method.getName(), method.getParameterTypes()));
if (!contains) {
list.add(createMap(method.getName(), method.getParameterTypes()));
}
}
} finally {
reflectionLock.unlock();
}
}
public static void putReflectionDeclaredConstructors(Class clazz, String name, Class... cts) {
synchronized (reflectionMap) {
reflectionLock.lock();
try {
Map<String, Object> map = reflectionMap.get(name);
if (map == null) {
map = new LinkedHashMap();
@@ -234,8 +252,12 @@ public class RedkaleClassLoader extends URLClassLoader {
map.put("allDeclaredConstructors", true);
if (clazz != null) {
if (clazz.isInterface()) return;
if (Modifier.isAbstract(clazz.getModifiers())) return;
if (clazz.isInterface()) {
return;
}
if (Modifier.isAbstract(clazz.getModifiers())) {
return;
}
try {
clazz.getDeclaredConstructor(cts);
} catch (Throwable t) {
@@ -259,13 +281,18 @@ public class RedkaleClassLoader extends URLClassLoader {
break;
}
}
if (!contains) list.add((Map) Utility.ofMap("name", "<init>", "parameterTypes", types));
if (!contains) {
list.add((Map) Utility.ofMap("name", "<init>", "parameterTypes", types));
}
}
} finally {
reflectionLock.unlock();
}
}
public static void putReflectionPublicConstructors(Class clazz, String name) {
synchronized (reflectionMap) {
reflectionLock.lock();
try {
Map<String, Object> map = reflectionMap.get(name);
if (map == null) {
map = new LinkedHashMap();
@@ -275,8 +302,12 @@ public class RedkaleClassLoader extends URLClassLoader {
map.put("allPublicConstructors", true);
if (clazz != null) {
if (clazz.isInterface()) return;
if (Modifier.isAbstract(clazz.getModifiers())) return;
if (clazz.isInterface()) {
return;
}
if (Modifier.isAbstract(clazz.getModifiers())) {
return;
}
try {
clazz.getConstructor();
} catch (Throwable t) {
@@ -296,13 +327,18 @@ public class RedkaleClassLoader extends URLClassLoader {
break;
}
}
if (!contains) list.add((Map) Utility.ofMap("name", "<init>", "parameterTypes", new String[0]));
if (!contains) {
list.add((Map) Utility.ofMap("name", "<init>", "parameterTypes", new String[0]));
}
}
} finally {
reflectionLock.unlock();
}
}
public static void putReflectionDeclaredMethods(String name) {
synchronized (reflectionMap) {
reflectionLock.lock();
try {
Map<String, Object> map = reflectionMap.get(name);
if (map == null) {
map = new LinkedHashMap();
@@ -310,11 +346,14 @@ public class RedkaleClassLoader extends URLClassLoader {
reflectionMap.put(name, map);
}
map.put("allDeclaredMethods", true);
} finally {
reflectionLock.unlock();
}
}
public static void putReflectionPublicMethods(String name) {
synchronized (reflectionMap) {
reflectionLock.lock();
try {
Map<String, Object> map = reflectionMap.get(name);
if (map == null) {
map = new LinkedHashMap();
@@ -322,11 +361,14 @@ public class RedkaleClassLoader extends URLClassLoader {
reflectionMap.put(name, map);
}
map.put("allPublicMethods", true);
} finally {
reflectionLock.unlock();
}
}
public static void putReflectionDeclaredFields(String name) {
synchronized (reflectionMap) {
reflectionLock.lock();
try {
Map<String, Object> map = reflectionMap.get(name);
if (map == null) {
map = new LinkedHashMap();
@@ -334,11 +376,14 @@ public class RedkaleClassLoader extends URLClassLoader {
reflectionMap.put(name, map);
}
map.put("allDeclaredFields", true);
} finally {
reflectionLock.unlock();
}
}
public static void putReflectionPublicFields(String name) {
synchronized (reflectionMap) {
reflectionLock.lock();
try {
Map<String, Object> map = reflectionMap.get(name);
if (map == null) {
map = new LinkedHashMap();
@@ -346,11 +391,14 @@ public class RedkaleClassLoader extends URLClassLoader {
reflectionMap.put(name, map);
}
map.put("allPublicFields", true);
} finally {
reflectionLock.unlock();
}
}
public static void putReflectionDeclaredClasses(String name) {
synchronized (reflectionMap) {
reflectionLock.lock();
try {
Map<String, Object> map = reflectionMap.get(name);
if (map == null) {
map = new LinkedHashMap();
@@ -358,11 +406,14 @@ public class RedkaleClassLoader extends URLClassLoader {
reflectionMap.put(name, map);
}
map.put("allDeclaredClasses", true);
} finally {
reflectionLock.unlock();
}
}
public static void putReflectionPublicClasses(String name) {
synchronized (reflectionMap) {
reflectionLock.lock();
try {
Map<String, Object> map = reflectionMap.get(name);
if (map == null) {
map = new LinkedHashMap();
@@ -370,6 +421,8 @@ public class RedkaleClassLoader extends URLClassLoader {
reflectionMap.put(name, map);
}
map.put("allPublicClasses", true);
} finally {
reflectionLock.unlock();
}
}
@@ -423,7 +476,9 @@ public class RedkaleClassLoader extends URLClassLoader {
}
do {
String loaderName = loader.getClass().getName();
if (loaderName.startsWith("sun.") && loaderName.contains("ExtClassLoader")) continue;
if (loaderName.startsWith("sun.") && loaderName.contains("ExtClassLoader")) {
continue;
}
if (loader instanceof URLClassLoader) {
for (URL url : ((URLClassLoader) loader).getURLs()) {
set.add(url);

View File

@@ -12,6 +12,7 @@ import java.math.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.*;
import java.util.logging.*;
import org.redkale.annotation.*;
@@ -44,6 +45,8 @@ public final class ResourceFactory {
private static final Logger logger = Logger.getLogger(ResourceFactory.class.getSimpleName());
private final ReentrantLock lock = new ReentrantLock();
private final ResourceFactory parent;
private final List<WeakReference<ResourceFactory>> chidren = new CopyOnWriteArrayList<>();
@@ -112,6 +115,14 @@ public final class ResourceFactory {
this.store.clear();
}
public void lock() {
lock.lock();
}
public void unlock() {
lock.unlock();
}
/**
* 检查资源名是否合法
* <blockquote><pre>
@@ -1116,6 +1127,8 @@ public final class ResourceFactory {
private static class ResourceElement<T> {
private static final ReentrantLock syncLock = new ReentrantLock();
private static final HashMap<String, Method> listenerMethods = new HashMap<>(); //不使用ConcurrentHashMap是因为value不能存null
public final WeakReference<T> dest;
@@ -1140,7 +1153,8 @@ public final class ResourceFactory {
}
private static Method findListener(Class clazz, Class fieldType, AtomicBoolean diff) {
synchronized (listenerMethods) {
syncLock.lock();
try {
Class loop = clazz;
Method m = listenerMethods.get(clazz.getName() + "-" + fieldType.getName());
if (m != null) {
@@ -1167,6 +1181,8 @@ public final class ResourceFactory {
} while ((loop = loop.getSuperclass()) != Object.class);
listenerMethods.put(clazz.getName() + "-" + fieldType.getName(), m);
return m;
} finally {
syncLock.unlock();
}
}
}

View File

@@ -43,7 +43,7 @@
// }
//
// @Override
// public synchronized void handle(sun.misc.Signal sig) {
// public void handle(sun.misc.Signal sig) {
// String sigstr = sig + "," + sig.getName() + "," + sig.getNumber();
// shutdownConsumer.accept(sigstr);
// }

View File

@@ -36,7 +36,7 @@ public class SimpleProxySelector extends ProxySelector {
}
@Override
public synchronized List<Proxy> select(URI uri) {
public List<Proxy> select(URI uri) {
String scheme = uri.getScheme().toLowerCase();
if (scheme.equals("http") || scheme.equals("https")) {
return list;

View File

@@ -7,6 +7,7 @@ package org.redkale.util;
import java.lang.reflect.Type;
import java.lang.reflect.*;
import java.util.*;
import java.util.concurrent.locks.ReentrantLock;
import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES;
import org.redkale.asm.*;
import static org.redkale.asm.Opcodes.*;
@@ -24,6 +25,8 @@ import static org.redkale.asm.Opcodes.*;
@SuppressWarnings("unchecked")
public abstract class TypeToken<T> {
private static final ReentrantLock syncLock = new ReentrantLock();
private final Type type;
public TypeToken() {
@@ -487,7 +490,12 @@ public abstract class TypeToken<T> {
}
}
if (count == actualTypeArguments0.length) {
return createParameterizedType0((Class) rawType0, actualTypeArguments0);
syncLock.lock();
try {
return createParameterizedType0((Class) rawType0, actualTypeArguments0);
} finally {
syncLock.unlock();
}
}
}
return new ParameterizedType() {
@@ -557,7 +565,7 @@ public abstract class TypeToken<T> {
}
// 注意: RetResult<Map<String, Long>[]> 这种泛型带[]的尚未实现支持
private static synchronized Type createParameterizedType0(final Class rawType, final Type... actualTypeArguments) {
private static Type createParameterizedType0(final Class rawType, final Type... actualTypeArguments) {
ClassLoader loader = Thread.currentThread().getContextClassLoader();
StringBuilder tmpps = new StringBuilder(getClassTypeDescriptor(rawType));
for (Type cz : actualTypeArguments) {

View File

@@ -16,6 +16,7 @@ import java.security.*;
import java.time.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.*;
import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;
@@ -83,7 +84,7 @@ public final class Utility {
private static final ToLongFunction<Object> bufferAddrFunction;
private static final Object clientLock = new Object();
private static final ReentrantLock clientLock = new ReentrantLock();
private static HttpClient httpClient;
@@ -4500,10 +4501,13 @@ public final class Utility {
}
java.net.http.HttpClient c = client == null ? httpClient : client;
if (c == null) {
synchronized (clientLock) {
clientLock.lock();
try {
if (httpClient == null) {
httpClient = java.net.http.HttpClient.newHttpClient();
}
} finally {
clientLock.unlock();
}
c = httpClient;
}