This commit is contained in:
Redkale
2018-05-02 09:22:10 +08:00
parent 5e5280a7fd
commit d934f615ca
2 changed files with 93 additions and 25 deletions

View File

@@ -5,6 +5,7 @@
*/ */
package org.redkale.source; package org.redkale.source;
import java.io.Serializable;
import java.net.URL; import java.net.URL;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
@@ -23,12 +24,13 @@ import org.redkale.util.*;
* 详情见: https://redkale.org * 详情见: https://redkale.org
* *
* @author zhangjx * @author zhangjx
* @param <DBChannel> 数据库连接
*/ */
@Local @Local
@AutoLoad(false) @AutoLoad(false)
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ResourceType(DataSource.class) @ResourceType(DataSource.class)
public abstract class DataSqlSource<Conn> extends AbstractService implements DataSource, DataCacheListener, Function<Class, EntityInfo>, AutoCloseable, Resourcable { public abstract class DataSqlSource<DBChannel> extends AbstractService implements DataSource, DataCacheListener, Function<Class, EntityInfo>, AutoCloseable, Resourcable {
protected static final Flipper FLIPPER_ONE = new Flipper(1); protected static final Flipper FLIPPER_ONE = new Flipper(1);
@@ -36,7 +38,7 @@ public abstract class DataSqlSource<Conn> extends AbstractService implements Dat
protected String name; protected String name;
protected URL conf; protected URL confxml;
protected int threads; protected int threads;
@@ -44,9 +46,9 @@ public abstract class DataSqlSource<Conn> extends AbstractService implements Dat
protected boolean cacheForbidden; protected boolean cacheForbidden;
protected PoolSource<Conn> readPool; protected PoolSource<DBChannel> readPool;
protected PoolSource<Conn> writePool; protected PoolSource<DBChannel> writePool;
@Resource(name = "$") @Resource(name = "$")
protected DataCacheListener cacheListener; protected DataCacheListener cacheListener;
@@ -57,8 +59,8 @@ public abstract class DataSqlSource<Conn> extends AbstractService implements Dat
protected final BiFunction<DataSource, Class, List> fullloader = (s, t) -> querySheet(false, false, t, null, null, (FilterNode) null).list(true); protected final BiFunction<DataSource, Class, List> fullloader = (s, t) -> querySheet(false, false, t, null, null, (FilterNode) null).list(true);
@SuppressWarnings("OverridableMethodCallInConstructor") @SuppressWarnings({"OverridableMethodCallInConstructor", "LeakingThisInConstructor"})
public DataSqlSource(String unitName, Properties readprop, Properties writeprop) { public DataSqlSource(String unitName, URL confxml, Properties readprop, Properties writeprop) {
final AtomicInteger counter = new AtomicInteger(); final AtomicInteger counter = new AtomicInteger();
this.threads = Integer.decode(readprop.getProperty(JDBC_CONNECTIONSMAX, "" + Runtime.getRuntime().availableProcessors() * 16)); this.threads = Integer.decode(readprop.getProperty(JDBC_CONNECTIONSMAX, "" + Runtime.getRuntime().availableProcessors() * 16));
if (readprop != writeprop) { if (readprop != writeprop) {
@@ -82,7 +84,7 @@ public abstract class DataSqlSource<Conn> extends AbstractService implements Dat
return t; return t;
}); });
this.name = unitName; this.name = unitName;
this.conf = null; this.confxml = confxml;
this.cacheForbidden = "NONE".equalsIgnoreCase(readprop.getProperty(JDBC_CACHE_MODE)); this.cacheForbidden = "NONE".equalsIgnoreCase(readprop.getProperty(JDBC_CACHE_MODE));
this.readPool = createReadPoolSource(this, "read", readprop); this.readPool = createReadPoolSource(this, "read", readprop);
this.writePool = createWritePoolSource(this, "write", writeprop); this.writePool = createWritePoolSource(this, "write", writeprop);
@@ -91,9 +93,9 @@ public abstract class DataSqlSource<Conn> extends AbstractService implements Dat
//是否异步, 为true则只能调用pollAsync方法为false则只能调用poll方法 //是否异步, 为true则只能调用pollAsync方法为false则只能调用poll方法
protected abstract boolean isAysnc(); protected abstract boolean isAysnc();
protected abstract PoolSource<Conn> createReadPoolSource(DataSource source, String stype, Properties prop); protected abstract PoolSource<DBChannel> createReadPoolSource(DataSource source, String stype, Properties prop);
protected abstract PoolSource<Conn> createWritePoolSource(DataSource source, String stype, Properties prop); protected abstract PoolSource<DBChannel> createWritePoolSource(DataSource source, String stype, Properties prop);
@Override @Override
protected ExecutorService getExecutor() { protected ExecutorService getExecutor() {
@@ -124,12 +126,12 @@ public abstract class DataSqlSource<Conn> extends AbstractService implements Dat
} }
@Local @Local
public PoolSource<Conn> getReadPoolSource() { public PoolSource<DBChannel> getReadPoolSource() {
return readPool; return readPool;
} }
@Local @Local
public PoolSource<Conn> getWritePoolSource() { public PoolSource<DBChannel> getWritePoolSource() {
return writePool; return writePool;
} }
@@ -160,7 +162,7 @@ public abstract class DataSqlSource<Conn> extends AbstractService implements Dat
cache.fullLoad(); cache.fullLoad();
} }
//----------------------insertCache----------------------------- //----------------------------- insert -----------------------------
/** /**
* 新增对象, 必须是Entity对象 * 新增对象, 必须是Entity对象
* *
@@ -169,25 +171,43 @@ public abstract class DataSqlSource<Conn> extends AbstractService implements Dat
*/ */
@Override @Override
public <T> void insert(@RpcCall(DataCallArrayAttribute.class) T... values) { public <T> void insert(@RpcCall(DataCallArrayAttribute.class) T... values) {
if (isAysnc()) {
insertAsync(values).join();
} else {
if (values.length == 0) return; if (values.length == 0) return;
insert(readPool.poll(), loadEntityInfo((Class<T>) values[0].getClass()), values); final EntityInfo<T> info = loadEntityInfo((Class<T>) values[0].getClass());
if (info.isVirtualEntity()) {
if (isAysnc()) {
insert(null, info, values).join();
} else {
insert(null, info, values);
}
} else {
if (isAysnc()) {
writePool.pollAsync().thenApply(conn -> insert(conn, info, values)).join();
} else {
insert(writePool.poll(), info, values);
}
} }
} }
@Override @Override
public <T> CompletableFuture<Void> insertAsync(@RpcCall(DataCallArrayAttribute.class) T... values) { public <T> CompletableFuture<Void> insertAsync(@RpcCall(DataCallArrayAttribute.class) T... values) {
if (values.length == 0) return completeVoidFuture();
final EntityInfo<T> info = loadEntityInfo((Class<T>) values[0].getClass()); final EntityInfo<T> info = loadEntityInfo((Class<T>) values[0].getClass());
if (info.isVirtualEntity()) {
if (isAysnc()) { if (isAysnc()) {
return readPool.pollAsync().thenApply(conn -> insert(conn, info, values)).whenComplete(futureCompleteConsumer); return insert(null, info, values).whenComplete(futureCompleteConsumer);
} else { } else {
return CompletableFuture.runAsync(() -> insert(readPool.poll(), info, values), getExecutor()).whenComplete(futureCompleteConsumer); return CompletableFuture.runAsync(() -> insert(null, info, values), getExecutor()).whenComplete(futureCompleteConsumer);
}
} else {
if (isAysnc()) {
return writePool.pollAsync().thenApply(conn -> insert(conn, info, values)).whenComplete(futureCompleteConsumer);
} else {
return CompletableFuture.runAsync(() -> insert(writePool.poll(), info, values), getExecutor()).whenComplete(futureCompleteConsumer);
}
} }
} }
protected <T> CompletableFuture<Void> insert(final Conn conn, final EntityInfo<T> info, T... values) { protected <T> CompletableFuture<Void> insert(final DBChannel conn, final EntityInfo<T> info, T... values) {
if (values.length == 0) return completeVoidFuture(); if (values.length == 0) return completeVoidFuture();
if (values.length > 1) { //检查对象是否都是同一个Entity类 if (values.length > 1) { //检查对象是否都是同一个Entity类
Class clazz = null; Class clazz = null;
@@ -219,6 +239,54 @@ public abstract class DataSqlSource<Conn> extends AbstractService implements Dat
return completeVoidFuture(); return completeVoidFuture();
} }
@Override
public <T> int insertCache(Class<T> clazz, T... values) {
if (values.length == 0) return 0;
final EntityInfo<T> info = loadEntityInfo(clazz);
final EntityCache<T> cache = info.getCache();
if (cache == null) return -1;
int c = 0;
for (T value : values) {
c += cache.insert(value);
}
return c;
}
//----------------------------- delete -----------------------------
/**
* 删除对象, 必须是Entity对象
*
* @param <T> Entity类泛型
* @param values Entity对象
*
* @return 删除的数据条数
*/
@Override
public <T> int delete(T... values) {
if (values.length == 0) return -1;
return -1;
}
protected <T> CompletableFuture<Integer> delete(final DBChannel conn, final EntityInfo<T> info, Serializable... keys) {
if (keys.length == 0) return CompletableFuture.completedFuture(-1);
if (info.isVirtualEntity()) {
final EntityCache<T> cache = info.getCache();
if (cache == null) return CompletableFuture.completedFuture(-1);
int c = 0;
for (Serializable key : keys) {
c += cache.delete(key);
}
if (cacheListener != null) cacheListener.deleteCache(info.getType(), keys);
return CompletableFuture.completedFuture(c);
}
if (isAysnc()) { //异步模式
} else {
}
return CompletableFuture.completedFuture(-1);
}
protected <T> Sheet<T> querySheet(final boolean readcache, final boolean needtotal, final Class<T> clazz, final SelectColumn selects, final Flipper flipper, final FilterNode node) { protected <T> Sheet<T> querySheet(final boolean readcache, final boolean needtotal, final Class<T> clazz, final SelectColumn selects, final Flipper flipper, final FilterNode node) {
return null; return null;
} }

View File

@@ -19,9 +19,9 @@ import static org.redkale.source.DataSources.*;
* 详情见: https://redkale.org * 详情见: https://redkale.org
* *
* @author zhangjx * @author zhangjx
* @param <T> 连接泛型 * @param <DBChannel> 连接泛型
*/ */
public abstract class PoolSource<T> { public abstract class PoolSource<DBChannel> {
protected final AtomicLong usingCounter = new AtomicLong(); protected final AtomicLong usingCounter = new AtomicLong();
@@ -135,11 +135,11 @@ public abstract class PoolSource<T> {
public abstract void change(Properties property); public abstract void change(Properties property);
public abstract T poll(); public abstract DBChannel poll();
public abstract CompletableFuture<T> pollAsync(); public abstract CompletableFuture<DBChannel> pollAsync();
public abstract void closeConnection(final T conn); public abstract void closeConnection(final DBChannel conn);
public abstract void close(); public abstract void close();