This commit is contained in:
地平线
2015-07-21 11:03:18 +08:00
parent 54a7c94cf5
commit 29e5695060
9 changed files with 357 additions and 291 deletions

View File

@@ -180,7 +180,7 @@ public final class Application {
final File root = new File(System.getProperty(RESNAME_HOME));
File persist = new File(root, "conf/persistence.xml");
if (persist.isFile()) System.setProperty(DataJDBCSource.DATASOURCE_CONFPATH, persist.getCanonicalPath());
if (persist.isFile()) System.setProperty(DataDefaultSource.DATASOURCE_CONFPATH, persist.getCanonicalPath());
logger.log(Level.INFO, RESNAME_HOME + "=" + root.getCanonicalPath() + "\r\n" + RESNAME_ADDR + "=" + this.localAddress.getHostAddress());
String lib = config.getValue("lib", "").trim().replace("${APP_HOME}", root.getCanonicalPath());
lib = lib.isEmpty() ? (root.getCanonicalPath() + "/conf") : (lib + ";" + root.getCanonicalPath() + "/conf");

View File

@@ -92,27 +92,21 @@ public final class HttpResourceServlet extends HttpServlet {
protected WatchThread watchThread;
protected List<SimpleEntry<File, WatchThread>> resx;
protected Predicate<String> ranges;
@Override
public void init(Context context, AnyValue config) {
String[] rootstrs = null;
if (config != null) {
rootstrs = config.getValue("webroot", "root").trim().split(";");
for (int i = 0; i < rootstrs.length; i++) {
String rootstr = rootstrs[i];
if (rootstr.indexOf(':') < 0 && rootstr.indexOf('/') != 0 && System.getProperty("APP_HOME") != null) {
rootstrs[i] = new File(System.getProperty("APP_HOME"), rootstr).getPath();
}
String rootstr = config.getValue("webroot", "root");
if (rootstr.indexOf(':') < 0 && rootstr.indexOf('/') != 0 && System.getProperty("APP_HOME") != null) {
rootstr = new File(System.getProperty("APP_HOME"), rootstr).getPath();
}
String rangesValue = config.getValue("ranges");
this.ranges = rangesValue != null ? Pattern.compile(rangesValue).asPredicate() : null;
try {
this.root = new File(rootstrs[0]).getCanonicalFile();
this.root = new File(rootstr).getCanonicalFile();
} catch (IOException ioe) {
this.root = new File(rootstrs[0]);
this.root = new File(rootstr);
}
AnyValue cacheconf = config.getAnyValue("caches");
if (cacheconf != null) {
@@ -131,7 +125,7 @@ public final class HttpResourceServlet extends HttpServlet {
}
this.locationRewrites = locations.isEmpty() ? null : locations.toArray(new SimpleEntry[locations.size()]);
}
if (this.cachelimit < 1) return;
if (this.cachelimit < 1) return; //不缓存不需要开启WatchThread监听
if (this.root != null) {
try {
this.watchThread = new WatchThread(this.root);
@@ -139,19 +133,6 @@ public final class HttpResourceServlet extends HttpServlet {
} catch (IOException ex) {
logger.log(Level.WARNING, HttpResourceServlet.class.getSimpleName() + " start watch-thread error", ex);
}
if (rootstrs != null && rootstrs.length > 1) {
resx = new ArrayList<>(rootstrs.length - 1);
for (int i = 1; i < rootstrs.length; i++) {
try {
File f = new File(rootstrs[i]).getCanonicalFile();
WatchThread t = new WatchThread(f);
t.start();
resx.add(new SimpleEntry<>(f, t));
} catch (IOException ioe) {
ioe.printStackTrace();
}
}
}
}
}
@@ -193,10 +174,15 @@ public final class HttpResourceServlet extends HttpServlet {
}
if (uri.length() == 0 || uri.equals("/")) uri = "/index.html";
//System.out.println(request);
FileEntry entry = watchThread == null ? createFileEntry(uri) : files.get(uri);
if (entry == null) {
FileEntry entry;
if (watchThread == null) {
entry = createFileEntry(uri);
if (entry != null && watchThread != null) files.put(uri, entry);
} else { //有缓存
entry = files.get(uri);
if (entry == null) {
entry = createFileEntry(uri);
if (entry != null) files.put(uri, entry);
}
}
if (entry == null) {
response.finish404();
@@ -207,32 +193,15 @@ public final class HttpResourceServlet extends HttpServlet {
private FileEntry createFileEntry(String uri) {
File file = new File(root, uri);
if (!file.isFile() || !file.canRead()) {
if (resx != null) {
for (SimpleEntry<File, WatchThread> en : resx) {
File f = new File(en.getKey(), uri);
if (f.isFile() && f.canRead()) {
FileEntry fe = new FileEntry(this, f);
if (watchThread == null) return fe;
try {
Path p = f.getParentFile().toPath();
keymaps.put(p.register(en.getValue().watcher, ENTRY_MODIFY, ENTRY_DELETE), p);
} catch (IOException e) {
logger.log(Level.INFO, HttpResourceServlet.class.getSimpleName() + " create FileEntry(" + uri + ") erroneous", e);
}
return fe;
}
}
}
return null;
}
if (file.isDirectory()) file = new File(file, "index.html");
if (!file.isFile() || !file.canRead()) return null;
FileEntry en = new FileEntry(this, file);
if (watchThread == null) return en;
try {
Path p = file.getParentFile().toPath();
keymaps.put(p.register(watchThread.watcher, ENTRY_MODIFY, ENTRY_DELETE), p);
} catch (IOException e) {
logger.log(Level.INFO, HttpResourceServlet.class.getSimpleName() + " create FileEntry(" + uri + ") erroneous", e);
logger.log(Level.INFO, HttpResourceServlet.class.getSimpleName() + " watch FileEntry(" + uri + ") erroneous", e);
}
return en;
}
@@ -258,7 +227,7 @@ public final class HttpResourceServlet extends HttpServlet {
}
long length = this.file.length();
if (length > this.servlet.cachelengthmax) return;
if (this.servlet.cachedLength.longValue() + length > this.servlet.cachelimit) return;
if (this.servlet.cachedLength.longValue() + length > this.servlet.cachelimit) return; //超过缓存总容量
try {
FileInputStream in = new FileInputStream(file);
ByteArrayOutputStream out = new ByteArrayOutputStream((int) file.length());

View File

@@ -133,7 +133,7 @@ public class DataCacheListenerService implements DataCacheListener, Service {
public final <T> void onSendInsert(String group, boolean ignoreRemote, String sourceName, Class<T> clazz, T... entitys) {
if (finest) logger.finest(DataSource.class.getSimpleName() + "(" + this.localNodeName + "," + sourceName + ") onSendInsert " + Arrays.toString(entitys));
((DataJDBCSource) sourcesmap.get(sourceName)).insertCache(entitys);
((DataDefaultSource) sourcesmap.get(sourceName)).insertCache(entitys);
if (!this.localGroupName.equals(group)) sendInsert(this.localGroupName, true, sourceName, clazz, entitys); //不是同一机房来的资源需要同步到其他同机房的节点上
}
@@ -213,7 +213,7 @@ public class DataCacheListenerService implements DataCacheListener, Service {
public final <T> void onSendUpdate(String group, boolean ignoreRemote, String sourceName, Class<T> clazz, T... entitys) {
if (finest) logger.finest(DataSource.class.getSimpleName() + "(" + group + "--" + this.localNodeName + "," + sourceName + ") onSendUpdate " + Arrays.toString(entitys));
((DataJDBCSource) sourcesmap.get(sourceName)).updateCache(clazz, entitys);
((DataDefaultSource) sourcesmap.get(sourceName)).updateCache(clazz, entitys);
if (!this.localGroupName.equals(group)) sendUpdate(this.localGroupName, true, sourceName, clazz, entitys); //不是同一机房来的资源需要同步到其他同机房的节点上
}
@@ -293,7 +293,7 @@ public class DataCacheListenerService implements DataCacheListener, Service {
public final <T> void onSendDelete(String group, boolean ignoreRemote, String sourceName, Class<T> clazz, Serializable... ids) {
if (finest) logger.finest(DataSource.class.getSimpleName() + "(" + group + "--" + this.localNodeName + "," + sourceName + ") onSendDelete " + clazz.getName() + " " + Arrays.toString(ids));
((DataJDBCSource) sourcesmap.get(sourceName)).deleteCache(clazz, ids);
((DataDefaultSource) sourcesmap.get(sourceName)).deleteCache(clazz, ids);
if (!this.localGroupName.equals(group)) sendDelete(this.localGroupName, true, sourceName, clazz, ids); //不是同一机房来的资源需要同步到其他同机房的节点上
}
}

View File

@@ -7,7 +7,7 @@ package com.wentch.redkale.service;
import com.wentch.redkale.source.DataSQLListener;
import com.wentch.redkale.source.DataSource;
import com.wentch.redkale.source.DataJDBCSource;
import com.wentch.redkale.source.DataDefaultSource;
import com.wentch.redkale.util.AnyValue;
import com.wentch.redkale.util.AutoLoad;
import java.io.*;
@@ -172,7 +172,7 @@ public class DataSQLListenerService implements DataSQLListener, Service {
}
public final void onSend(String sourceName, String... sqls) {
((DataJDBCSource) sourcemaps.get(sourceName)).execute(sqls);
((DataDefaultSource) sourcemaps.get(sourceName)).execute(sqls);
}
}

View File

@@ -24,7 +24,7 @@ import javax.xml.stream.*;
* @author zhangjx
*/
@SuppressWarnings("unchecked")
public final class DataJDBCSource implements DataSource {
public final class DataDefaultSource implements DataSource {
public static final String DATASOURCE_CONFPATH = "DATASOURCE_CONFPATH";
@@ -42,7 +42,7 @@ public final class DataJDBCSource implements DataSource {
private static final Flipper FLIPPER_ONE = new Flipper(1);
final Logger logger = Logger.getLogger(DataJDBCSource.class.getSimpleName());
final Logger logger = Logger.getLogger(DataDefaultSource.class.getSimpleName());
final AtomicBoolean debug = new AtomicBoolean(logger.isLoggable(Level.FINEST));
@@ -55,13 +55,13 @@ public final class DataJDBCSource implements DataSource {
private final JDBCPoolSource writePool;
@Resource(name = "property.datasource.nodeid")
int nodeid;
private int nodeid;
@Resource
DataSQLListener writeListener;
private DataSQLListener writeListener;
@Resource
DataCacheListener cacheListener;
private DataCacheListener cacheListener;
private static class DataJDBCConnection extends DataConnection {
@@ -108,17 +108,17 @@ public final class DataJDBCSource implements DataSource {
private final Function<Class, List> fullloader = (t) -> queryList(t, (FilterNode) null);
public DataJDBCSource() throws IOException {
public DataDefaultSource() throws IOException {
this("");
}
public DataJDBCSource(final String unitName) throws IOException {
public DataDefaultSource(final String unitName) throws IOException {
this(unitName, System.getProperty(DATASOURCE_CONFPATH) == null
? DataJDBCSource.class.getResource("/META-INF/persistence.xml")
? DataDefaultSource.class.getResource("/META-INF/persistence.xml")
: new File(System.getProperty(DATASOURCE_CONFPATH)).toURI().toURL());
}
public DataJDBCSource(final String unitName, URL url) throws IOException {
public DataDefaultSource(final String unitName, URL url) throws IOException {
if (url == null) url = this.getClass().getResource("/persistence.xml");
InputStream in = url.openStream();
Map<String, Properties> map = loadProperties(in);
@@ -156,7 +156,7 @@ public final class DataJDBCSource implements DataSource {
EntityInfo.cacheForbidden = "NONE".equalsIgnoreCase(readprop.getProperty("shared-cache-mode"));
}
public DataJDBCSource(String unitName, Properties readprop, Properties writeprop) {
public DataDefaultSource(String unitName, Properties readprop, Properties writeprop) {
this.name = unitName;
this.conf = null;
this.readPool = new JDBCPoolSource(this, "read", readprop);
@@ -164,7 +164,7 @@ public final class DataJDBCSource implements DataSource {
EntityInfo.cacheForbidden = "NONE".equalsIgnoreCase(readprop.getProperty("shared-cache-mode"));
}
public static Map<String, DataJDBCSource> create(final InputStream in) {
public static Map<String, DataDefaultSource> create(final InputStream in) {
Map<String, Properties> map = loadProperties(in);
Map<String, Properties[]> maps = new HashMap<>();
map.entrySet().stream().forEach((en) -> {
@@ -179,9 +179,9 @@ public final class DataJDBCSource implements DataSource {
maps.put(en.getKey(), new Properties[]{en.getValue(), en.getValue()});
}
});
Map<String, DataJDBCSource> result = new HashMap<>();
Map<String, DataDefaultSource> result = new HashMap<>();
maps.entrySet().stream().forEach((en) -> {
result.put(en.getKey(), new DataJDBCSource(en.getKey(), en.getValue()[0], en.getValue()[1]));
result.put(en.getKey(), new DataDefaultSource(en.getKey(), en.getValue()[0], en.getValue()[1]));
});
return result;
}
@@ -280,11 +280,12 @@ public final class DataJDBCSource implements DataSource {
return readPool.poll();
}
private Connection createWriteSQLConnection() {
private <T> Connection createWriteSQLConnection() {
return writePool.poll();
}
private void closeSQLConnection(final Connection sqlconn) {
if (sqlconn == null) return;
try {
sqlconn.close();
} catch (Exception e) {
@@ -349,9 +350,15 @@ public final class DataJDBCSource implements DataSource {
*/
@Override
public <T> void insert(T... values) {
if (values.length == 0) return;
final EntityInfo<T> info = loadEntityInfo((Class<T>) values[0].getClass());
if (info.isVirtualEntity()) {
insert(null, info, values);
return;
}
Connection conn = createWriteSQLConnection();
try {
insert(conn, values);
insert(conn, info, values);
} finally {
closeSQLConnection(conn);
}
@@ -366,121 +373,123 @@ public final class DataJDBCSource implements DataSource {
*/
@Override
public <T> void insert(final DataConnection conn, T... values) {
insert((Connection) conn.getConnection(), values);
if (values.length == 0) return;
final EntityInfo<T> info = loadEntityInfo((Class<T>) values[0].getClass());
insert((Connection) conn.getConnection(), info, values);
}
private <T> void insert(final Connection conn, T... values) {
private <T> void insert(final Connection conn, final EntityInfo<T> info, T... values) {
if (values.length == 0) return;
try {
final Class<T> clazz = (Class<T>) values[0].getClass();
final EntityInfo<T> info = loadEntityInfo(clazz);
final EntityCache<T> cache = info.getCache();
final String sql = info.insertSQL;
if (debug.get()) logger.finest(clazz.getSimpleName() + " insert sql=" + sql);
final PreparedStatement prestmt = info.autoGenerated
? conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS) : conn.prepareStatement(sql);
final Class primaryType = info.getPrimary().type();
final Attribute primary = info.getPrimary();
final boolean distributed = info.distributed;
Attribute<T, Serializable>[] attrs = info.insertAttributes;
String[] sqls = null;
if (distributed && !info.initedPrimaryValue && primaryType.isPrimitive()) {
synchronized (info) {
if (!info.initedPrimaryValue) {
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT MAX(" + info.getPrimarySQLColumn() + ") FROM " + info.getTable());
if (rs.next()) {
if (primaryType == int.class) {
int v = rs.getInt(1) / info.allocationSize;
if (v > info.primaryValue.get()) info.primaryValue.set(v);
} else {
long v = rs.getLong(1) / info.allocationSize;
if (v > info.primaryValue.get()) info.primaryValue.set(v);
}
}
rs.close();
stmt.close();
if (info.distributeTables != null) {
for (final Class t : info.distributeTables) {
EntityInfo<T> infox = loadEntityInfo(t);
stmt = conn.createStatement();
rs = stmt.executeQuery("SELECT MAX(" + info.getPrimarySQLColumn() + ") FROM " + infox.getTable()); // 必须是同一字段名
if (rs.next()) {
if (primaryType == int.class) {
int v = rs.getInt(1) / info.allocationSize;
if (v > info.primaryValue.get()) info.primaryValue.set(v);
} else {
long v = rs.getLong(1) / info.allocationSize;
if (v > info.primaryValue.get()) info.primaryValue.set(v);
}
if (!info.isVirtualEntity()) {
final String sql = info.insertSQL;
if (debug.get()) logger.finest(info.getType().getSimpleName() + " insert sql=" + sql);
final PreparedStatement prestmt = info.autoGenerated
? conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS) : conn.prepareStatement(sql);
final Class primaryType = info.getPrimary().type();
final Attribute primary = info.getPrimary();
final boolean distributed = info.distributed;
Attribute<T, Serializable>[] attrs = info.insertAttributes;
String[] sqls = null;
if (distributed && !info.initedPrimaryValue && primaryType.isPrimitive()) {
synchronized (info) {
if (!info.initedPrimaryValue) {
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT MAX(" + info.getPrimarySQLColumn() + ") FROM " + info.getTable());
if (rs.next()) {
if (primaryType == int.class) {
int v = rs.getInt(1) / info.allocationSize;
if (v > info.primaryValue.get()) info.primaryValue.set(v);
} else {
long v = rs.getLong(1) / info.allocationSize;
if (v > info.primaryValue.get()) info.primaryValue.set(v);
}
rs.close();
stmt.close();
}
rs.close();
stmt.close();
if (info.distributeTables != null) {
for (final Class t : info.distributeTables) {
EntityInfo<T> infox = loadEntityInfo(t);
stmt = conn.createStatement();
rs = stmt.executeQuery("SELECT MAX(" + info.getPrimarySQLColumn() + ") FROM " + infox.getTable()); // 必须是同一字段名
if (rs.next()) {
if (primaryType == int.class) {
int v = rs.getInt(1) / info.allocationSize;
if (v > info.primaryValue.get()) info.primaryValue.set(v);
} else {
long v = rs.getLong(1) / info.allocationSize;
if (v > info.primaryValue.get()) info.primaryValue.set(v);
}
}
rs.close();
stmt.close();
}
}
info.initedPrimaryValue = true;
}
}
}
if (writeListener == null) {
for (final T value : values) {
int i = 0;
if (distributed) info.createPrimaryValue(value);
for (Attribute<T, Serializable> attr : attrs) {
prestmt.setObject(++i, attr.get(value));
}
prestmt.addBatch();
}
} else {
char[] sqlchars = sql.toCharArray();
sqls = new String[values.length];
String[] ps = new String[attrs.length];
int index = 0;
for (final T value : values) {
int i = 0;
if (distributed) info.createPrimaryValue(value);
for (Attribute<T, Serializable> attr : attrs) {
Object a = attr.get(value);
ps[i] = formatToString(a);
prestmt.setObject(++i, a);
}
prestmt.addBatch();
//-----------------------------
StringBuilder sb = new StringBuilder(128);
i = 0;
for (char ch : sqlchars) {
if (ch == '?') {
sb.append(ps[i++]);
} else {
sb.append(ch);
}
}
info.initedPrimaryValue = true;
sqls[index++] = sb.toString();
}
}
}
if (writeListener == null) {
for (final T value : values) {
int i = 0;
if (distributed) info.createPrimaryValue(value);
for (Attribute<T, Serializable> attr : attrs) {
prestmt.setObject(++i, attr.get(value));
}
prestmt.addBatch();
}
} else {
char[] sqlchars = sql.toCharArray();
sqls = new String[values.length];
String[] ps = new String[attrs.length];
int index = 0;
for (final T value : values) {
int i = 0;
if (distributed) info.createPrimaryValue(value);
for (Attribute<T, Serializable> attr : attrs) {
Object a = attr.get(value);
ps[i] = formatToString(a);
prestmt.setObject(++i, a);
}
prestmt.addBatch();
//-----------------------------
StringBuilder sb = new StringBuilder(128);
i = 0;
for (char ch : sqlchars) {
if (ch == '?') {
sb.append(ps[i++]);
prestmt.executeBatch();
if (writeListener != null) writeListener.insert(name, sqls);
if (info.autoGenerated) {
ResultSet set = prestmt.getGeneratedKeys();
int i = -1;
while (set.next()) {
if (primaryType == int.class) {
primary.set(values[++i], set.getInt(1));
} else if (primaryType == long.class) {
primary.set(values[++i], set.getLong(1));
} else {
sb.append(ch);
primary.set(values[++i], set.getObject(1));
}
}
sqls[index++] = sb.toString();
set.close();
}
}
prestmt.executeBatch();
if (writeListener != null) writeListener.insert(name, sqls);
if (info.autoGenerated) {
ResultSet set = prestmt.getGeneratedKeys();
int i = -1;
while (set.next()) {
if (primaryType == int.class) {
primary.set(values[++i], set.getInt(1));
} else if (primaryType == long.class) {
primary.set(values[++i], set.getLong(1));
} else {
primary.set(values[++i], set.getObject(1));
}
}
set.close();
prestmt.close();
}
if (cache != null) {
for (final T value : values) {
cache.insert(value);
}
if (cacheListener != null) cacheListener.insert(name, clazz, values);
if (cacheListener != null) cacheListener.insert(name, info.getType(), values);
}
prestmt.close();
} catch (SQLException e) {
throw new RuntimeException(e);
}
@@ -505,9 +514,15 @@ public final class DataJDBCSource implements DataSource {
*/
@Override
public <T> void delete(T... values) {
if (values.length == 0) return;
final EntityInfo<T> info = loadEntityInfo((Class<T>) values[0].getClass());
if (info.isVirtualEntity()) {
delete(null, info, values);
return;
}
Connection conn = createWriteSQLConnection();
try {
delete(conn, values);
delete(conn, info, values);
} finally {
closeSQLConnection(conn);
}
@@ -515,27 +530,32 @@ public final class DataJDBCSource implements DataSource {
@Override
public <T> void delete(final DataConnection conn, T... values) {
delete((Connection) conn.getConnection(), values);
if (values.length == 0) return;
final EntityInfo<T> info = loadEntityInfo((Class<T>) values[0].getClass());
delete((Connection) conn.getConnection(), info, values);
}
private <T> void delete(final Connection conn, T... values) {
private <T> void delete(final Connection conn, final EntityInfo<T> info, T... values) {
if (values.length == 0) return;
final Class clazz = values[0].getClass();
final EntityInfo<T> info = loadEntityInfo(clazz);
final Attribute primary = info.getPrimary();
Serializable[] ids = new Serializable[values.length];
int i = 0;
for (final T value : values) {
ids[i++] = (Serializable) primary.get(value);
}
delete(conn, clazz, ids);
delete(conn, info, ids);
}
@Override
public <T> void delete(Class<T> clazz, Serializable... ids) {
final EntityInfo<T> info = loadEntityInfo(clazz);
if (info.isVirtualEntity()) {
delete(null, info, ids);
return;
}
Connection conn = createWriteSQLConnection();
try {
delete(conn, clazz, ids);
delete(conn, info, ids);
} finally {
closeSQLConnection(conn);
}
@@ -543,27 +563,28 @@ public final class DataJDBCSource implements DataSource {
@Override
public <T> void delete(final DataConnection conn, Class<T> clazz, Serializable... ids) {
delete((Connection) conn.getConnection(), clazz, ids);
delete((Connection) conn.getConnection(), loadEntityInfo(clazz), ids);
}
private <T> void delete(final Connection conn, Class<T> clazz, Serializable... keys) {
private <T> void delete(final Connection conn, final EntityInfo<T> info, Serializable... keys) {
if (keys.length == 0) return;
try {
final EntityInfo<T> info = loadEntityInfo(clazz);
String sql = "DELETE FROM " + info.getTable() + " WHERE " + info.getPrimarySQLColumn()
+ " IN " + formatToString(keys);
if (debug.get()) logger.finest(clazz.getSimpleName() + " delete sql=" + sql);
final Statement stmt = conn.createStatement();
stmt.execute(sql);
stmt.close();
if (writeListener != null) writeListener.delete(name, sql);
if (!info.isVirtualEntity()) {
String sql = "DELETE FROM " + info.getTable() + " WHERE " + info.getPrimarySQLColumn()
+ " IN " + formatToString(keys);
if (debug.get()) logger.finest(info.getType().getSimpleName() + " delete sql=" + sql);
final Statement stmt = conn.createStatement();
stmt.execute(sql);
stmt.close();
if (writeListener != null) writeListener.delete(name, sql);
}
//------------------------------------
final EntityCache<T> cache = info.getCache();
if (cache == null) return;
final Attribute<T, Serializable> attr = info.getPrimary();
final Serializable[] keys2 = keys;
Serializable[] ids = cache.delete((T t) -> Arrays.binarySearch(keys2, attr.get(t)) >= 0);
if (cacheListener != null) cacheListener.delete(name, clazz, ids);
if (cacheListener != null) cacheListener.delete(name, info.getType(), ids);
} catch (SQLException e) {
throw new RuntimeException(e);
}
@@ -571,9 +592,14 @@ public final class DataJDBCSource implements DataSource {
@Override
public <T> void delete(Class<T> clazz, FilterNode node) {
final EntityInfo<T> info = loadEntityInfo(clazz);
if (info.isVirtualEntity()) {
delete(null, info, node);
return;
}
Connection conn = createWriteSQLConnection();
try {
delete(conn, clazz, node);
delete(conn, info, node);
} finally {
closeSQLConnection(conn);
}
@@ -581,23 +607,24 @@ public final class DataJDBCSource implements DataSource {
@Override
public <T> void delete(final DataConnection conn, Class<T> clazz, FilterNode node) {
delete((Connection) conn.getConnection(), clazz, node);
delete((Connection) conn.getConnection(), loadEntityInfo(clazz), node);
}
private <T> void delete(final Connection conn, Class<T> clazz, FilterNode node) {
private <T> void delete(final Connection conn, final EntityInfo<T> info, FilterNode node) {
try {
final EntityInfo<T> info = loadEntityInfo(clazz);
String sql = "DELETE FROM " + info.getTable() + " a" + node.createFilterSQLExpress(info, null);
if (debug.get()) logger.finest(clazz.getSimpleName() + " delete sql=" + sql);
final Statement stmt = conn.createStatement();
stmt.execute(sql);
stmt.close();
if (writeListener != null) writeListener.delete(name, sql);
if (!info.isVirtualEntity()) {
String sql = "DELETE FROM " + info.getTable() + " a" + node.createFilterSQLExpress(info, null);
if (debug.get()) logger.finest(info.getType().getSimpleName() + " delete sql=" + sql);
final Statement stmt = conn.createStatement();
stmt.execute(sql);
stmt.close();
if (writeListener != null) writeListener.delete(name, sql);
}
//------------------------------------
final EntityCache<T> cache = info.getCache();
if (cache == null) return;
Serializable[] ids = cache.delete(node.createFilterPredicate(info, null));
if (cacheListener != null) cacheListener.delete(name, clazz, ids);
if (cacheListener != null) cacheListener.delete(name, info.getType(), ids);
} catch (SQLException e) {
throw new RuntimeException(e);
}
@@ -622,9 +649,15 @@ public final class DataJDBCSource implements DataSource {
*/
@Override
public <T> void update(T... values) {
if (values.length == 0) return;
final EntityInfo<T> info = loadEntityInfo((Class<T>) values[0].getClass());
if (info.isVirtualEntity()) {
update(null, info, values);
return;
}
Connection conn = createWriteSQLConnection();
try {
update(conn, values);
update(conn, info, values);
} finally {
closeSQLConnection(conn);
}
@@ -632,57 +665,60 @@ public final class DataJDBCSource implements DataSource {
@Override
public <T> void update(final DataConnection conn, T... values) {
update((Connection) conn.getConnection(), values);
if (values.length == 0) return;
final EntityInfo<T> info = loadEntityInfo((Class<T>) values[0].getClass());
update((Connection) conn.getConnection(), info, values);
}
private <T> void update(final Connection conn, T... values) {
private <T> void update(final Connection conn, final EntityInfo<T> info, T... values) {
try {
Class clazz = values[0].getClass();
final EntityInfo<T> info = loadEntityInfo(clazz);
if (debug.get()) logger.finest(clazz.getSimpleName() + " update sql=" + info.updateSQL);
final Attribute<T, Serializable> primary = info.getPrimary();
final PreparedStatement prestmt = conn.prepareStatement(info.updateSQL);
Attribute<T, Serializable>[] attrs = info.updateAttributes;
String[] sqls = null;
if (writeListener == null) {
for (final T value : values) {
int i = 0;
for (Attribute<T, Serializable> attr : attrs) {
prestmt.setObject(++i, attr.get(value));
}
prestmt.setObject(++i, primary.get(value));
prestmt.addBatch();
}
} else {
char[] sqlchars = info.updateSQL.toCharArray();
sqls = new String[values.length];
String[] ps = new String[attrs.length];
int index = 0;
for (final T value : values) {
int i = 0;
for (Attribute<T, Serializable> attr : attrs) {
Object a = attr.get(value);
ps[i] = formatToString(a);
prestmt.setObject(++i, a);
}
prestmt.setObject(++i, primary.get(value));
prestmt.addBatch();
//-----------------------------
StringBuilder sb = new StringBuilder(128);
i = 0;
for (char ch : sqlchars) {
if (ch == '?') {
sb.append(ps[i++]);
} else {
sb.append(ch);
Class clazz = info.getType();
if (!info.isVirtualEntity()) {
if (debug.get()) logger.finest(clazz.getSimpleName() + " update sql=" + info.updateSQL);
final Attribute<T, Serializable> primary = info.getPrimary();
final PreparedStatement prestmt = conn.prepareStatement(info.updateSQL);
Attribute<T, Serializable>[] attrs = info.updateAttributes;
String[] sqls = null;
if (writeListener == null) {
for (final T value : values) {
int i = 0;
for (Attribute<T, Serializable> attr : attrs) {
prestmt.setObject(++i, attr.get(value));
}
prestmt.setObject(++i, primary.get(value));
prestmt.addBatch();
}
} else {
char[] sqlchars = info.updateSQL.toCharArray();
sqls = new String[values.length];
String[] ps = new String[attrs.length];
int index = 0;
for (final T value : values) {
int i = 0;
for (Attribute<T, Serializable> attr : attrs) {
Object a = attr.get(value);
ps[i] = formatToString(a);
prestmt.setObject(++i, a);
}
prestmt.setObject(++i, primary.get(value));
prestmt.addBatch();
//-----------------------------
StringBuilder sb = new StringBuilder(128);
i = 0;
for (char ch : sqlchars) {
if (ch == '?') {
sb.append(ps[i++]);
} else {
sb.append(ch);
}
}
sqls[index++] = sb.toString();
}
sqls[index++] = sb.toString();
}
prestmt.executeBatch();
prestmt.close();
if (writeListener != null) writeListener.update(name, sqls);
}
prestmt.executeBatch();
prestmt.close();
if (writeListener != null) writeListener.update(name, sqls);
//---------------------------------------------------
final EntityCache<T> cache = info.getCache();
if (cache == null) return;
@@ -706,9 +742,14 @@ public final class DataJDBCSource implements DataSource {
*/
@Override
public <T> void updateColumn(Class<T> clazz, Serializable id, String column, Serializable value) {
final EntityInfo<T> info = loadEntityInfo(clazz);
if (info.isVirtualEntity()) {
updateColumn(null, info, id, column, value);
return;
}
Connection conn = createWriteSQLConnection();
try {
updateColumn(conn, clazz, id, column, value);
updateColumn(conn, info, id, column, value);
} finally {
closeSQLConnection(conn);
}
@@ -716,24 +757,25 @@ public final class DataJDBCSource implements DataSource {
@Override
public <T> void updateColumn(DataConnection conn, Class<T> clazz, Serializable id, String column, Serializable value) {
updateColumn((Connection) conn.getConnection(), clazz, id, column, value);
updateColumn((Connection) conn.getConnection(), loadEntityInfo(clazz), id, column, value);
}
private <T> void updateColumn(Connection conn, Class<T> clazz, Serializable id, String column, Serializable value) {
private <T> void updateColumn(Connection conn, final EntityInfo<T> info, Serializable id, String column, Serializable value) {
try {
final EntityInfo<T> info = loadEntityInfo(clazz);
String sql = "UPDATE " + info.getTable() + " SET " + info.getSQLColumn(column) + " = "
+ formatToString(value) + " WHERE " + info.getPrimarySQLColumn() + " = " + formatToString(id);
if (debug.get()) logger.finest(clazz.getSimpleName() + " update sql=" + sql);
final Statement stmt = conn.createStatement();
stmt.execute(sql);
stmt.close();
if (writeListener != null) writeListener.update(name, sql);
if (!info.isVirtualEntity()) {
String sql = "UPDATE " + info.getTable() + " SET " + info.getSQLColumn(column) + " = "
+ formatToString(value) + " WHERE " + info.getPrimarySQLColumn() + " = " + formatToString(id);
if (debug.get()) logger.finest(info.getType().getSimpleName() + " update sql=" + sql);
final Statement stmt = conn.createStatement();
stmt.execute(sql);
stmt.close();
if (writeListener != null) writeListener.update(name, sql);
}
//---------------------------------------------------
final EntityCache<T> cache = info.getCache();
if (cache == null) return;
T rs = cache.update(id, (Attribute<T, Serializable>) info.getAttribute(column), value);
if (cacheListener != null) cacheListener.update(name, clazz, rs);
if (cacheListener != null) cacheListener.update(name, info.getType(), rs);
} catch (SQLException e) {
throw new RuntimeException(e);
} finally {
@@ -752,9 +794,14 @@ public final class DataJDBCSource implements DataSource {
*/
@Override
public <T> void updateColumnIncrement(Class<T> clazz, Serializable id, String column, long incvalue) {
final EntityInfo<T> info = loadEntityInfo(clazz);
if (info.isVirtualEntity()) {
updateColumnIncrement(null, info, id, column, incvalue);
return;
}
Connection conn = createWriteSQLConnection();
try {
updateColumnIncrement(conn, clazz, id, column, incvalue);
updateColumnIncrement(conn, info, id, column, incvalue);
} finally {
closeSQLConnection(conn);
}
@@ -762,26 +809,27 @@ public final class DataJDBCSource implements DataSource {
@Override
public <T> void updateColumnIncrement(DataConnection conn, Class<T> clazz, Serializable id, String column, long incvalue) {
updateColumnIncrement((Connection) conn.getConnection(), clazz, id, column, incvalue);
updateColumnIncrement((Connection) conn.getConnection(), loadEntityInfo(clazz), id, column, incvalue);
}
private <T> void updateColumnIncrement(Connection conn, Class<T> clazz, Serializable id, String column, long incvalue) {
private <T> void updateColumnIncrement(Connection conn, final EntityInfo<T> info, Serializable id, String column, long incvalue) {
try {
final EntityInfo<T> info = loadEntityInfo(clazz);
String col = info.getSQLColumn(column);
String sql = "UPDATE " + info.getTable() + " SET " + col + " = " + col + " + (" + incvalue
+ ") WHERE " + info.getPrimarySQLColumn() + " = " + formatToString(id);
if (debug.get()) logger.finest(clazz.getSimpleName() + " update sql=" + sql);
final Statement stmt = conn.createStatement();
stmt.execute(sql);
stmt.close();
if (writeListener != null) writeListener.update(name, sql);
if (!info.isVirtualEntity()) {
String col = info.getSQLColumn(column);
String sql = "UPDATE " + info.getTable() + " SET " + col + " = " + col + " + (" + incvalue
+ ") WHERE " + info.getPrimarySQLColumn() + " = " + formatToString(id);
if (debug.get()) logger.finest(info.getType().getSimpleName() + " update sql=" + sql);
final Statement stmt = conn.createStatement();
stmt.execute(sql);
stmt.close();
if (writeListener != null) writeListener.update(name, sql);
}
//---------------------------------------------------
final EntityCache<T> cache = info.getCache();
if (cache == null) return;
Attribute<T, Serializable> attr = info.getAttribute(column);
T value = cache.updateColumnIncrement(id, attr, incvalue);
if (value != null && cacheListener != null) cacheListener.update(name, clazz, value);
if (value != null && cacheListener != null) cacheListener.update(name, info.getType(), value);
} catch (SQLException e) {
throw new RuntimeException(e);
} finally {
@@ -798,9 +846,14 @@ public final class DataJDBCSource implements DataSource {
*/
@Override
public <T> void updateColumns(final T value, final String... columns) {
final EntityInfo<T> info = loadEntityInfo((Class<T>) value.getClass());
if (info.isVirtualEntity()) {
updateColumns(null, info, value, columns);
return;
}
Connection conn = createWriteSQLConnection();
try {
updateColumns(conn, value, columns);
updateColumns(conn, info, value, columns);
} finally {
closeSQLConnection(conn);
}
@@ -808,31 +861,35 @@ public final class DataJDBCSource implements DataSource {
@Override
public <T> void updateColumns(final DataConnection conn, final T value, final String... columns) {
updateColumns((Connection) conn.getConnection(), value, columns);
updateColumns((Connection) conn.getConnection(), loadEntityInfo((Class<T>) value.getClass()), value, columns);
}
private <T> void updateColumns(final Connection conn, final T value, final String... columns) {
private <T> void updateColumns(final Connection conn, final EntityInfo<T> info, final T value, final String... columns) {
if (value == null || columns.length < 1) return;
try {
final Class<T> clazz = (Class<T>) value.getClass();
final EntityInfo<T> info = loadEntityInfo(clazz);
StringBuilder setsql = new StringBuilder();
final Serializable id = info.getPrimary().get(value);
final List<Attribute<T, Serializable>> attrs = new ArrayList<>();
final boolean virtual = info.isVirtualEntity();
for (String col : columns) {
Attribute<T, Serializable> attr = info.getUpdateAttribute(col);
if (attr == null) continue;
if (setsql.length() > 0) setsql.append(',');
setsql.append(info.getSQLColumn(col)).append(" = ").append(formatToString(attr.get(value)));
attrs.add(attr);
if (!virtual) {
if (setsql.length() > 0) setsql.append(',');
setsql.append(info.getSQLColumn(col)).append(" = ").append(formatToString(attr.get(value)));
}
}
if (!virtual) {
String sql = "UPDATE " + info.getTable() + " SET " + setsql
+ " WHERE " + info.getPrimarySQLColumn() + " = " + formatToString(id);
if (debug.get()) logger.finest(value.getClass().getSimpleName() + ": " + sql);
final Statement stmt = conn.createStatement();
stmt.execute(sql);
stmt.close();
if (writeListener != null) writeListener.update(name, sql);
}
String sql = "UPDATE " + info.getTable() + " SET " + setsql
+ " WHERE " + info.getPrimarySQLColumn() + " = " + formatToString(id);
if (debug.get()) logger.finest(value.getClass().getSimpleName() + ": " + sql);
final Statement stmt = conn.createStatement();
stmt.execute(sql);
stmt.close();
if (writeListener != null) writeListener.update(name, sql);
//---------------------------------------------------
final EntityCache<T> cache = info.getCache();
if (cache == null) return;
@@ -887,7 +944,7 @@ public final class DataJDBCSource implements DataSource {
final EntityInfo<T> info = loadEntityInfo(entityClass);
if (node == null && bean != null) node = loadFilterBeanNode(bean.getClass());
final EntityCache<T> cache = info.getCache();
if (cache != null && cache.isFullLoaded()) {
if (cache != null && (info.isVirtualEntity() || cache.isFullLoaded())) {
Predicate<T> filter = node == null ? null : node.createFilterPredicate(info, bean);
if (node == null || node.isJoinAllCached()) {
return cache.getNumberResult(reckon, column == null ? null : info.getAttribute(column), filter);
@@ -934,7 +991,7 @@ public final class DataJDBCSource implements DataSource {
final EntityInfo info = loadEntityInfo(entityClass);
if (node == null && bean != null) node = loadFilterBeanNode(bean.getClass());
final EntityCache cache = info.getCache();
if (cache != null && cache.isFullLoaded()) {
if (cache != null && (info.isVirtualEntity() || cache.isFullLoaded())) {
Predicate filter = node == null ? null : node.createFilterPredicate(info, bean);
if (node == null || node.isJoinAllCached()) {
return cache.getMapResult(info.getAttribute(keyColumn), reckon, reckonColumn == null ? null : info.getAttribute(reckonColumn), filter);
@@ -1049,6 +1106,7 @@ public final class DataJDBCSource implements DataSource {
/**
* 根据过滤对象FilterBean查询对象集合
*
* @param <K>
* @param <T>
* @param clazz
* @param bean
@@ -1219,7 +1277,7 @@ public final class DataJDBCSource implements DataSource {
Predicate<T> filter = node == null ? null : node.createFilterPredicate(info, bean);
if (node == null || node.isJoinAllCached()) {
Sheet<T> sheet = cache.querySheet(selects, filter, flipper, FilterNode.createFilterComparator(info, flipper));
if (!sheet.isEmpty() || cache.isFullLoaded()) return sheet;
if (!sheet.isEmpty() || info.isVirtualEntity() || cache.isFullLoaded()) return sheet;
}
}
final Connection conn = createReadSQLConnection();

View File

@@ -31,7 +31,7 @@ public abstract class DataSourceFactory {
// }
// if (jpa) return new DataJPASource(unitName);
try {
return new DataJDBCSource(unitName);
return new DataDefaultSource(unitName);
} catch (IOException ex) {
logger.log(Level.WARNING, "cannot create DataSource (" + unitName + ")", ex);
return null;

View File

@@ -34,7 +34,7 @@ public final class EntityInfo<T> {
//Entity类的类名
private final Class<T> type;
//类对应的数据表名
//类对应的数据表名, 如果是VirtualEntity 类, 则该字段为null
private final String table;
private final Creator<T> creator;
@@ -115,7 +115,11 @@ public final class EntityInfo<T> {
this.logLevel = ll == null ? Integer.MIN_VALUE : Level.parse(ll.value()).intValue();
//---------------------------------------------
Table t = type.getAnnotation(Table.class);
this.table = (t == null) ? type.getSimpleName().toLowerCase() : (t.catalog().isEmpty()) ? t.name() : (t.catalog() + '.' + t.name());
if (type.getAnnotation(VirtualEntity.class) != null) {
this.table = null;
} else {
this.table = (t == null) ? type.getSimpleName().toLowerCase() : (t.catalog().isEmpty()) ? t.name() : (t.catalog() + '.' + t.name());
}
this.creator = Creator.create(type);
Attribute idAttr0 = null;
Map<String, String> aliasmap0 = null;
@@ -186,10 +190,10 @@ public final class EntityInfo<T> {
} while ((cltmp = cltmp.getSuperclass()) != Object.class);
this.primary = idAttr0;
this.aliasmap = aliasmap0;
{
this.queryAttributes = queryattrs.toArray(new Attribute[queryattrs.size()]);
this.insertAttributes = insertattrs.toArray(new Attribute[insertattrs.size()]);
this.updateAttributes = updateattrs.toArray(new Attribute[updateattrs.size()]);
this.queryAttributes = queryattrs.toArray(new Attribute[queryattrs.size()]);
this.insertAttributes = insertattrs.toArray(new Attribute[insertattrs.size()]);
this.updateAttributes = updateattrs.toArray(new Attribute[updateattrs.size()]);
if (table != null) {
StringBuilder insertsb = new StringBuilder();
StringBuilder insertsb2 = new StringBuilder();
for (String col : insertcols) {
@@ -207,13 +211,18 @@ public final class EntityInfo<T> {
this.updateSQL = "UPDATE " + table + " SET " + updatesb + " WHERE " + getPrimarySQLColumn() + " = ?";
this.deleteSQL = "DELETE FROM " + table + " WHERE " + getPrimarySQLColumn() + " = ?";
this.querySQL = "SELECT * FROM " + table + " WHERE " + getPrimarySQLColumn() + " = ?";
} else {
this.insertSQL = null;
this.updateSQL = null;
this.deleteSQL = null;
this.querySQL = null;
}
this.autoGenerated = auto;
this.distributed = sqldistribute;
this.allocationSize = allocationSize0;
//----------------cache--------------
Cacheable c = type.getAnnotation(Cacheable.class);
if (!cacheForbidden && c != null && c.value()) {
if (this.table == null || (!cacheForbidden && c != null && c.value())) {
this.cache = new EntityCache<>(type, creator, primary, attributes);
} else {
this.cache = null;
@@ -241,6 +250,15 @@ public final class EntityInfo<T> {
return type;
}
/**
* 是否虚拟类
* <p>
* @return
*/
public boolean isVirtualEntity() {
return table == null;
}
public String getTable() {
return table;
}

View File

@@ -5,7 +5,7 @@
*/
package com.wentch.redkale.source;
import static com.wentch.redkale.source.DataJDBCSource.*;
import static com.wentch.redkale.source.DataDefaultSource.*;
import java.io.*;
import java.lang.ref.WeakReference;
import java.lang.reflect.Method;
@@ -40,7 +40,7 @@ public class JDBCPoolSource {
private final ConnectionEventListener listener;
private final DataJDBCSource dataSource;
private final DataDefaultSource dataSource;
private final String stype; // "" 或 "read" 或 "write"
@@ -52,7 +52,7 @@ public class JDBCPoolSource {
private String password;
public JDBCPoolSource(DataJDBCSource source, String stype, Properties prop) {
public JDBCPoolSource(DataDefaultSource source, String stype, Properties prop) {
this.dataSource = source;
this.stype = stype;
this.source = createDataSource(prop);

View File

@@ -0,0 +1,21 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package com.wentch.redkale.source;
import java.lang.annotation.*;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
/**
*
* @author zhangjx
*/
@Documented
@Target(TYPE)
@Retention(RUNTIME)
public @interface VirtualEntity {
}