This commit is contained in:
Redkale
2017-08-06 12:25:56 +08:00
parent d6df2055b2
commit e62f7ea63d
4 changed files with 42 additions and 31 deletions

View File

@@ -3,13 +3,15 @@
<persistence>
<!-- 系统基本库 -->
<persistence-unit name="demouser">
<!-- 为NONE表示不启动缓存@Cacheable 失效; 非NONE值(通常用ALL)表示开启缓存。 -->
<shared-cache-mode>NONE</shared-cache-mode>
<properties>
<!--
DataSource的实现类没有设置默认为org.redkale.source.DataJdbcSource的实现使用常规基于JDBC的数据库驱动一般无需设置
-->
<property name="javax.persistence.datasource" value="org.redkale.source.DataJdbcSource"/>
<!--
是否开启缓存(标记为@Cacheable的Entity类),值目前只支持两种: ALL: 所有开启缓存。 NONE: 关闭所有缓存
-->
<property name="javax.persistence.cachemode" value="ALL"/>
<property name="javax.persistence.jdbc.url" value="jdbc:mysql://127.0.0.1:3306/dbuser?characterEncoding=utf8"/>
<!--
@@ -42,7 +44,6 @@
</persistence-unit>
<!-- IM消息库 -->
<persistence-unit name="demoim">
<shared-cache-mode>NONE</shared-cache-mode>
<properties>
<!-- jdbc:mysql://127.0.0.1:3306/dbim?autoReconnect=true&amp;autoReconnectForPools=true&amp;characterEncoding=utf8 -->
<property name="javax.persistence.jdbc.url" value="jdbc:mysql://127.0.0.1:3306/dbim?characterEncoding=utf8"/>

View File

@@ -56,7 +56,7 @@ public abstract class NodeServer {
//ClassLoader
protected RedkaleClassLoader serverClassLoader;
protected final Thread serverThread;
//当前Server的SNCP协议的组
@@ -173,7 +173,7 @@ public abstract class NodeServer {
final TransportFactory appTranFactory = application.getTransportFactory();
final AnyValue resources = application.config.getAnyValue("resources");
final Map<String, AnyValue> cacheResource = new HashMap<>();
//final Map<String, AnyValue> dataResources = new HashMap<>();
final Map<String, AnyValue> dataResources = new HashMap<>();
if (resources != null) {
for (AnyValue sourceConf : resources.getAnyValues("source")) {
try {
@@ -183,9 +183,7 @@ public abstract class NodeServer {
} else if (CacheSource.class.isAssignableFrom(type)) {
cacheResource.put(sourceConf.getValue("name", ""), sourceConf);
} else if (DataSource.class.isAssignableFrom(type)) {
//dataResources.put(sourceConf.getValue("name", ""), sourceConf);
//暂时不支持DataSource通过<resources>设置
logger.log(Level.SEVERE, "load application source resource, but not CacheSource error: " + sourceConf);
dataResources.put(sourceConf.getValue("name", ""), sourceConf);
} else {
logger.log(Level.SEVERE, "load application source resource, but not CacheSource error: " + sourceConf);
}
@@ -258,23 +256,32 @@ public abstract class NodeServer {
final Service srcService = (Service) src;
SncpClient client = Sncp.getSncpClient(srcService);
final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress();
final AnyValue sourceConf = cacheResource.get(resourceName);
final Class sourceType = sourceConf == null ? CacheMemorySource.class : serverClassLoader.loadClass(sourceConf.getValue("type"));
final Set<String> groups = new HashSet<>();
if (client != null && client.getSameGroup() != null) groups.add(client.getSameGroup());
if (client != null && client.getDiffGroups() != null) groups.addAll(client.getDiffGroups());
final CacheSource source = (CacheSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appTranFactory, sncpAddr, groups, Sncp.getConf(srcService));
Type genericType = field.getGenericType();
ParameterizedType pt = (genericType instanceof ParameterizedType) ? (ParameterizedType) genericType : null;
Type valType = pt == null ? null : pt.getActualTypeArguments()[1];
if (sourceType == CacheMemorySource.class) {
CacheMemorySource memorySource = (CacheMemorySource) source;
memorySource.setStoreType(pt == null ? Serializable.class : (Class) pt.getActualTypeArguments()[0], valType instanceof Class ? (Class) valType : Object.class);
if (field.getAnnotation(Transient.class) != null) memorySource.setNeedStore(false); //必须在setStoreType之后
AnyValue sourceConf = cacheResource.get(resourceName);
if (sourceConf == null) sourceConf = dataResources.get(resourceName);
final Class sourceType = sourceConf == null ? CacheMemorySource.class : serverClassLoader.loadClass(sourceConf.getValue("value"));
Object source;
if (DataSource.class.isAssignableFrom(sourceType)) { // DataSource
source = (DataSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appTranFactory, sncpAddr, groups, Sncp.getConf(srcService));
application.dataSources.add((DataSource) source);
appResFactory.register(resourceName, DataSource.class, source);
} else { // CacheSource
source = (CacheSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appTranFactory, sncpAddr, groups, Sncp.getConf(srcService));
Type genericType = field.getGenericType();
ParameterizedType pt = (genericType instanceof ParameterizedType) ? (ParameterizedType) genericType : null;
Type valType = pt == null ? null : pt.getActualTypeArguments()[1];
if (sourceType == CacheMemorySource.class) {
CacheMemorySource memorySource = (CacheMemorySource) source;
memorySource.setStoreType(pt == null ? Serializable.class : (Class) pt.getActualTypeArguments()[0], valType instanceof Class ? (Class) valType : Object.class);
if (field.getAnnotation(Transient.class) != null) memorySource.setNeedStore(false); //必须在setStoreType之后
}
application.cacheSources.add((CacheSource) source);
appResFactory.register(resourceName, genericType, source);
appResFactory.register(resourceName, CacheSource.class, source);
}
application.cacheSources.add(source);
appResFactory.register(resourceName, genericType, source);
appResFactory.register(resourceName, CacheSource.class, source);
field.set(src, source);
rf.inject(source, self); //
if (source instanceof Service) ((Service) source).init(sourceConf);
@@ -531,7 +538,7 @@ public abstract class NodeServer {
public void setServerClassLoader(RedkaleClassLoader serverClassLoader) {
Objects.requireNonNull(this.serverClassLoader);
this.serverClassLoader = serverClassLoader;
this.serverThread.setContextClassLoader(serverClassLoader);
this.serverThread.setContextClassLoader(serverClassLoader);
}
public InetSocketAddress getSncpAddress() {

View File

@@ -15,6 +15,7 @@ import java.util.function.*;
import java.util.logging.*;
import javax.annotation.Resource;
import org.redkale.service.*;
import static org.redkale.source.DataSources.*;
import org.redkale.util.*;
/**
@@ -29,7 +30,7 @@ import org.redkale.util.*;
@AutoLoad(false)
@SuppressWarnings("unchecked")
@ResourceType(DataSource.class)
public class DataJdbcSource extends AbstractService implements DataSource, Service, DataCacheListener, Function<Class, EntityInfo>, AutoCloseable, Resourcable {
public class DataJdbcSource extends AbstractService implements DataSource, DataCacheListener, Function<Class, EntityInfo>, AutoCloseable, Resourcable {
protected static final Flipper FLIPPER_ONE = new Flipper(1);
@@ -57,7 +58,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
this.conf = null;
this.readPool = new PoolJdbcSource(this, "read", readprop);
this.writePool = new PoolJdbcSource(this, "write", writeprop);
this.cacheForbidden = "NONE".equalsIgnoreCase(readprop.getProperty("shared-cache-mode"));
this.cacheForbidden = "NONE".equalsIgnoreCase(readprop.getProperty(JDBC_CACHE_MODE));
}
@Override
@@ -447,7 +448,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
}
String sql = "DELETE " + (this.readPool.isMysql() ? "a" : "") + " FROM " + info.getTable(node) + " a" + (join1 == null ? "" : (", " + join1))
+ ((where == null || where.length() == 0) ? (join2 == null ? "" : (" WHERE " + join2))
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2)))) + info.createSQLOrderby(flipper)
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2)))) + info.createSQLOrderby(flipper)
+ ((flipper == null || flipper.getLimit() < 1) ? "" : (" LIMIT " + flipper.getLimit()));
if (debug.get() && info.isLoggable(Level.FINEST)) logger.finest(info.getType().getSimpleName() + " delete sql=" + sql);
conn.setReadOnly(false);
@@ -703,7 +704,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
String sql = "UPDATE " + info.getTable(node) + " a " + (join1 == null ? "" : (", " + join1))
+ " SET " + info.getSQLColumn("a", column) + " = ?"
+ ((where == null || where.length() == 0) ? (join2 == null ? "" : (" WHERE " + join2))
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2))));
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2))));
if (debug.get() && info.isLoggable(Level.FINEST)) logger.finest(info.getType().getSimpleName() + " update sql=" + sql);
conn.setReadOnly(false);
Blob blob = conn.createBlob();
@@ -716,7 +717,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
String sql = "UPDATE " + info.getTable(node) + " a " + (join1 == null ? "" : (", " + join1))
+ " SET " + info.getSQLColumn("a", column) + " = " + info.formatToString(value)
+ ((where == null || where.length() == 0) ? (join2 == null ? "" : (" WHERE " + join2))
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2))));
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2))));
if (debug.get() && info.isLoggable(Level.FINEST)) logger.finest(info.getType().getSimpleName() + " update sql=" + sql);
conn.setReadOnly(false);
final Statement stmt = conn.createStatement();
@@ -921,7 +922,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
}
String sql = "UPDATE " + info.getTable(node) + " a " + (join1 == null ? "" : (", " + join1)) + " SET " + setsql
+ ((where == null || where.length() == 0) ? (join2 == null ? "" : (" WHERE " + join2))
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2))));
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2))));
//注LIMIT 仅支持MySQL 且在多表关联式会异常, 该BUG尚未解决
sql += info.createSQLOrderby(flipper) + ((flipper == null || flipper.getLimit() < 1) ? "" : (" LIMIT " + flipper.getLimit()));
if (debug.get() && info.isLoggable(Level.FINEST)) logger.finest(info.getType().getSimpleName() + " update sql=" + sql);
@@ -1107,7 +1108,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
}
String sql = "UPDATE " + info.getTable(node) + " a " + (join1 == null ? "" : (", " + join1)) + " SET " + setsql
+ ((where == null || where.length() == 0) ? (join2 == null ? "" : (" WHERE " + join2))
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2))));
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2))));
if (debug.get() && info.isLoggable(Level.FINEST)) logger.finest(info.getType().getSimpleName() + " update sql=" + sql);
conn.setReadOnly(false);
if (blobs != null) {

View File

@@ -21,6 +21,8 @@ public final class DataSources {
public static final String JDBC_DATASOURCE_CLASS = "javax.persistence.datasource";
public static final String JDBC_CACHE_MODE = "javax.persistence.cachemode";
public static final String JDBC_CONNECTIONSMAX = "javax.persistence.connections.limit";
public static final String JDBC_CONTAIN_SQLTEMPLATE = "javax.persistence.contain.sqltemplate";
@@ -122,8 +124,8 @@ public final class DataSources {
String value = reader.getAttributeValue(null, "value");
if (name == null) continue;
result.put(name, value);
} else if (flag && "shared-cache-mode".equalsIgnoreCase(reader.getLocalName())) {
result.put(reader.getLocalName(), reader.getElementText());
} else if (flag && "shared-cache-mode".equalsIgnoreCase(reader.getLocalName())) { //兼容shared-cache-mode属性
result.put(JDBC_CACHE_MODE, reader.getElementText());
}
}
}