From d934f615caa6339d98a406f9c2047f74a51dee0c Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Wed, 2 May 2018 09:22:10 +0800 Subject: [PATCH] --- src/org/redkale/source/DataSqlSource.java | 108 ++++++++++++++++++---- src/org/redkale/source/PoolSource.java | 10 +- 2 files changed, 93 insertions(+), 25 deletions(-) diff --git a/src/org/redkale/source/DataSqlSource.java b/src/org/redkale/source/DataSqlSource.java index 5ea7a10a0..1f1b88d4b 100644 --- a/src/org/redkale/source/DataSqlSource.java +++ b/src/org/redkale/source/DataSqlSource.java @@ -5,6 +5,7 @@ */ package org.redkale.source; +import java.io.Serializable; import java.net.URL; import java.util.*; import java.util.concurrent.*; @@ -23,12 +24,13 @@ import org.redkale.util.*; * 详情见: https://redkale.org * * @author zhangjx + * @param 数据库连接 */ @Local @AutoLoad(false) @SuppressWarnings("unchecked") @ResourceType(DataSource.class) -public abstract class DataSqlSource extends AbstractService implements DataSource, DataCacheListener, Function, AutoCloseable, Resourcable { +public abstract class DataSqlSource extends AbstractService implements DataSource, DataCacheListener, Function, AutoCloseable, Resourcable { protected static final Flipper FLIPPER_ONE = new Flipper(1); @@ -36,7 +38,7 @@ public abstract class DataSqlSource extends AbstractService implements Dat protected String name; - protected URL conf; + protected URL confxml; protected int threads; @@ -44,9 +46,9 @@ public abstract class DataSqlSource extends AbstractService implements Dat protected boolean cacheForbidden; - protected PoolSource readPool; + protected PoolSource readPool; - protected PoolSource writePool; + protected PoolSource writePool; @Resource(name = "$") protected DataCacheListener cacheListener; @@ -57,8 +59,8 @@ public abstract class DataSqlSource extends AbstractService implements Dat protected final BiFunction fullloader = (s, t) -> querySheet(false, false, t, null, null, (FilterNode) null).list(true); - @SuppressWarnings("OverridableMethodCallInConstructor") - public DataSqlSource(String unitName, Properties readprop, Properties writeprop) { + @SuppressWarnings({"OverridableMethodCallInConstructor", "LeakingThisInConstructor"}) + public DataSqlSource(String unitName, URL confxml, Properties readprop, Properties writeprop) { final AtomicInteger counter = new AtomicInteger(); this.threads = Integer.decode(readprop.getProperty(JDBC_CONNECTIONSMAX, "" + Runtime.getRuntime().availableProcessors() * 16)); if (readprop != writeprop) { @@ -82,7 +84,7 @@ public abstract class DataSqlSource extends AbstractService implements Dat return t; }); this.name = unitName; - this.conf = null; + this.confxml = confxml; this.cacheForbidden = "NONE".equalsIgnoreCase(readprop.getProperty(JDBC_CACHE_MODE)); this.readPool = createReadPoolSource(this, "read", readprop); this.writePool = createWritePoolSource(this, "write", writeprop); @@ -91,9 +93,9 @@ public abstract class DataSqlSource extends AbstractService implements Dat //是否异步, 为true则只能调用pollAsync方法,为false则只能调用poll方法 protected abstract boolean isAysnc(); - protected abstract PoolSource createReadPoolSource(DataSource source, String stype, Properties prop); + protected abstract PoolSource createReadPoolSource(DataSource source, String stype, Properties prop); - protected abstract PoolSource createWritePoolSource(DataSource source, String stype, Properties prop); + protected abstract PoolSource createWritePoolSource(DataSource source, String stype, Properties prop); @Override protected ExecutorService getExecutor() { @@ -124,12 +126,12 @@ public abstract class DataSqlSource extends AbstractService implements Dat } @Local - public PoolSource getReadPoolSource() { + public PoolSource getReadPoolSource() { return readPool; } @Local - public PoolSource getWritePoolSource() { + public PoolSource getWritePoolSource() { return writePool; } @@ -160,7 +162,7 @@ public abstract class DataSqlSource extends AbstractService implements Dat cache.fullLoad(); } - //----------------------insertCache----------------------------- + //----------------------------- insert ----------------------------- /** * 新增对象, 必须是Entity对象 * @@ -169,25 +171,43 @@ public abstract class DataSqlSource extends AbstractService implements Dat */ @Override public void insert(@RpcCall(DataCallArrayAttribute.class) T... values) { - if (isAysnc()) { - insertAsync(values).join(); + if (values.length == 0) return; + final EntityInfo info = loadEntityInfo((Class) values[0].getClass()); + if (info.isVirtualEntity()) { + if (isAysnc()) { + insert(null, info, values).join(); + } else { + insert(null, info, values); + } } else { - if (values.length == 0) return; - insert(readPool.poll(), loadEntityInfo((Class) values[0].getClass()), values); + if (isAysnc()) { + writePool.pollAsync().thenApply(conn -> insert(conn, info, values)).join(); + } else { + insert(writePool.poll(), info, values); + } } } @Override public CompletableFuture insertAsync(@RpcCall(DataCallArrayAttribute.class) T... values) { + if (values.length == 0) return completeVoidFuture(); final EntityInfo info = loadEntityInfo((Class) values[0].getClass()); - if (isAysnc()) { - return readPool.pollAsync().thenApply(conn -> insert(conn, info, values)).whenComplete(futureCompleteConsumer); + if (info.isVirtualEntity()) { + if (isAysnc()) { + return insert(null, info, values).whenComplete(futureCompleteConsumer); + } else { + return CompletableFuture.runAsync(() -> insert(null, info, values), getExecutor()).whenComplete(futureCompleteConsumer); + } } else { - return CompletableFuture.runAsync(() -> insert(readPool.poll(), info, values), getExecutor()).whenComplete(futureCompleteConsumer); + if (isAysnc()) { + return writePool.pollAsync().thenApply(conn -> insert(conn, info, values)).whenComplete(futureCompleteConsumer); + } else { + return CompletableFuture.runAsync(() -> insert(writePool.poll(), info, values), getExecutor()).whenComplete(futureCompleteConsumer); + } } } - protected CompletableFuture insert(final Conn conn, final EntityInfo info, T... values) { + protected CompletableFuture insert(final DBChannel conn, final EntityInfo info, T... values) { if (values.length == 0) return completeVoidFuture(); if (values.length > 1) { //检查对象是否都是同一个Entity类 Class clazz = null; @@ -219,6 +239,54 @@ public abstract class DataSqlSource extends AbstractService implements Dat return completeVoidFuture(); } + @Override + public int insertCache(Class clazz, T... values) { + if (values.length == 0) return 0; + final EntityInfo info = loadEntityInfo(clazz); + final EntityCache cache = info.getCache(); + if (cache == null) return -1; + int c = 0; + for (T value : values) { + c += cache.insert(value); + } + return c; + } + + //----------------------------- delete ----------------------------- + /** + * 删除对象, 必须是Entity对象 + * + * @param Entity类泛型 + * @param values Entity对象 + * + * @return 删除的数据条数 + */ + @Override + public int delete(T... values) { + if (values.length == 0) return -1; + return -1; + } + + protected CompletableFuture delete(final DBChannel conn, final EntityInfo info, Serializable... keys) { + if (keys.length == 0) return CompletableFuture.completedFuture(-1); + if (info.isVirtualEntity()) { + final EntityCache cache = info.getCache(); + if (cache == null) return CompletableFuture.completedFuture(-1); + int c = 0; + for (Serializable key : keys) { + c += cache.delete(key); + } + if (cacheListener != null) cacheListener.deleteCache(info.getType(), keys); + return CompletableFuture.completedFuture(c); + } + if (isAysnc()) { //异步模式 + + } else { + + } + return CompletableFuture.completedFuture(-1); + } + protected Sheet querySheet(final boolean readcache, final boolean needtotal, final Class clazz, final SelectColumn selects, final Flipper flipper, final FilterNode node) { return null; } diff --git a/src/org/redkale/source/PoolSource.java b/src/org/redkale/source/PoolSource.java index e3f974f7d..8dccfe342 100644 --- a/src/org/redkale/source/PoolSource.java +++ b/src/org/redkale/source/PoolSource.java @@ -19,9 +19,9 @@ import static org.redkale.source.DataSources.*; * 详情见: https://redkale.org * * @author zhangjx - * @param 连接泛型 + * @param 连接泛型 */ -public abstract class PoolSource { +public abstract class PoolSource { protected final AtomicLong usingCounter = new AtomicLong(); @@ -135,11 +135,11 @@ public abstract class PoolSource { public abstract void change(Properties property); - public abstract T poll(); + public abstract DBChannel poll(); - public abstract CompletableFuture pollAsync(); + public abstract CompletableFuture pollAsync(); - public abstract void closeConnection(final T conn); + public abstract void closeConnection(final DBChannel conn); public abstract void close();