From 13bd4671521a7fbafa740419061f3d2066fe247f Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Wed, 2 May 2018 14:59:57 +0800 Subject: [PATCH] --- src/org/redkale/source/DataJdbcSource.java | 11 +- src/org/redkale/source/DataSources.java | 16 ++- src/org/redkale/source/DataSqlJdbcSource.java | 41 +++++++ src/org/redkale/source/DataSqlSource.java | 103 ++++++++++++++++-- src/org/redkale/source/PoolJdbcSource.java | 26 +++-- 5 files changed, 164 insertions(+), 33 deletions(-) create mode 100644 src/org/redkale/source/DataSqlJdbcSource.java diff --git a/src/org/redkale/source/DataJdbcSource.java b/src/org/redkale/source/DataJdbcSource.java index e71bb390c..cc24a1180 100644 --- a/src/org/redkale/source/DataJdbcSource.java +++ b/src/org/redkale/source/DataJdbcSource.java @@ -39,7 +39,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC protected String name; - protected URL conf; + protected URL persistxml; protected int threads; @@ -60,10 +60,10 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC protected final BiFunction fullloader = (s, t) -> querySheet(false, false, t, null, null, (FilterNode) null).list(true); - public DataJdbcSource(String unitName, Properties readprop, Properties writeprop) { + public DataJdbcSource(String unitName, URL persistxml, Properties readprop, Properties writeprop) { + this.persistxml = persistxml; this.preConstruct(unitName, readprop, writeprop); this.initByProperties(unitName, readprop, writeprop); - } public DataJdbcSource() { @@ -130,10 +130,9 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC protected void initByProperties(String unitName, Properties readprop, Properties writeprop) { this.name = unitName; - this.conf = null; this.cacheForbidden = "NONE".equalsIgnoreCase(readprop.getProperty(JDBC_CACHE_MODE)); - this.readPool = new PoolJdbcSource(this, "read", readprop); - this.writePool = new PoolJdbcSource(this, "write", writeprop); + this.readPool = new PoolJdbcSource(unitName, persistxml, "read", readprop, logger); + this.writePool = new PoolJdbcSource(unitName, persistxml, "write", writeprop, logger); } @Local diff --git a/src/org/redkale/source/DataSources.java b/src/org/redkale/source/DataSources.java index 08782bb10..999c2b033 100644 --- a/src/org/redkale/source/DataSources.java +++ b/src/org/redkale/source/DataSources.java @@ -25,6 +25,8 @@ public final class DataSources { public static final String JDBC_CONNECTIONSMAX = "javax.persistence.connections.limit"; + public static final String JDBC_CONNECTIONSCAPACITY = "javax.persistence.connections.bufcapacity"; + public static final String JDBC_CONTAIN_SQLTEMPLATE = "javax.persistence.contain.sqltemplate"; public static final String JDBC_NOTCONTAIN_SQLTEMPLATE = "javax.persistence.notcontain.sqltemplate"; @@ -53,11 +55,11 @@ public final class DataSources { } public static DataSource createDataSource(final String unitName, Properties prop) throws IOException { - return new DataJdbcSource(unitName, prop, prop); + return new DataJdbcSource(unitName, null, prop, prop); } public static DataSource createDataSource(final String unitName, Properties readprop, Properties writeprop) throws IOException { - return new DataJdbcSource(unitName, readprop, writeprop); + return new DataJdbcSource(unitName, null, readprop, writeprop); } public static DataSource createDataSource(final String unitName) throws IOException { @@ -66,9 +68,9 @@ public final class DataSources { : new File(System.getProperty(DATASOURCE_CONFPATH)).toURI().toURL()); } - public static DataSource createDataSource(final String unitName, URL url) throws IOException { - if (url == null) url = DataSources.class.getResource("/persistence.xml"); - InputStream in = url.openStream(); + public static DataSource createDataSource(final String unitName, URL persistxml) throws IOException { + if (persistxml == null) persistxml = DataSources.class.getResource("/persistence.xml"); + InputStream in = persistxml.openStream(); if (in == null) return null; Map map = loadPersistenceXml(in); Properties readprop = null; @@ -100,7 +102,7 @@ public final class DataSources { if (readprop == null) throw new IOException("Cannot find (resource.name = '" + unitName + "') DataSource"); if (writeprop == null) writeprop = readprop; String impl = readprop.getProperty(JDBC_DATASOURCE_CLASS, DataJdbcSource.class.getName()); - if (DataJdbcSource.class.getName().equals(impl)) return new DataJdbcSource(unitName, readprop, writeprop); + if (DataJdbcSource.class.getName().equals(impl)) return new DataJdbcSource(unitName, persistxml, readprop, writeprop); try { Class ds = Thread.currentThread().getContextClassLoader().loadClass(impl); for (Constructor d : ds.getConstructors()) { @@ -111,6 +113,8 @@ public final class DataSources { return (DataSource) d.newInstance(unitName, readprop); } else if (paramtypes.length == 3 && paramtypes[0] == String.class && paramtypes[1] == Properties.class && paramtypes[2] == Properties.class) { return (DataSource) d.newInstance(unitName, readprop, writeprop); + } else if (paramtypes.length == 4 && paramtypes[0] == String.class && paramtypes[1] == URL.class && paramtypes[2] == Properties.class && paramtypes[3] == Properties.class) { + return (DataSource) d.newInstance(unitName, persistxml, readprop, writeprop); } } throw new IOException("DataSource impl class (" + impl + ") have no Constructor by (Properties prop) or (String name, Properties prop) or (String name, Properties readprop, Propertieswriteprop)"); diff --git a/src/org/redkale/source/DataSqlJdbcSource.java b/src/org/redkale/source/DataSqlJdbcSource.java new file mode 100644 index 000000000..a5d131f4d --- /dev/null +++ b/src/org/redkale/source/DataSqlJdbcSource.java @@ -0,0 +1,41 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.source; + +import java.net.URL; +import java.sql.Connection; +import java.util.Properties; +import org.redkale.service.Local; +import org.redkale.util.*; + +/** + * DataSource的JDBC实现类 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + */ +@Local +@AutoLoad(false) +@SuppressWarnings("unchecked") +@ResourceType(DataSource.class) +public abstract class DataSqlJdbcSource extends DataSqlSource { + + public DataSqlJdbcSource(String unitName, URL persistxml, Properties readprop, Properties writeprop) { + super(unitName, persistxml, readprop, writeprop); + } + + @Override + protected final String getPrepareParamSign(int index) { + return "?"; + } + + @Override + protected final boolean isAysnc() { + return false; + } +} diff --git a/src/org/redkale/source/DataSqlSource.java b/src/org/redkale/source/DataSqlSource.java index 1f1b88d4b..58d10b619 100644 --- a/src/org/redkale/source/DataSqlSource.java +++ b/src/org/redkale/source/DataSqlSource.java @@ -7,9 +7,10 @@ package org.redkale.source; import java.io.Serializable; import java.net.URL; +import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.*; import java.util.function.*; import java.util.logging.*; import javax.annotation.Resource; @@ -38,11 +39,13 @@ public abstract class DataSqlSource extends AbstractService implement protected String name; - protected URL confxml; + protected URL persistxml; protected int threads; - protected ExecutorService executor; + protected ObjectPool bufferPool; + + protected ThreadPoolExecutor executor; protected boolean cacheForbidden; @@ -60,7 +63,7 @@ public abstract class DataSqlSource extends AbstractService implement protected final BiFunction fullloader = (s, t) -> querySheet(false, false, t, null, null, (FilterNode) null).list(true); @SuppressWarnings({"OverridableMethodCallInConstructor", "LeakingThisInConstructor"}) - public DataSqlSource(String unitName, URL confxml, Properties readprop, Properties writeprop) { + public DataSqlSource(String unitName, URL persistxml, Properties readprop, Properties writeprop) { final AtomicInteger counter = new AtomicInteger(); this.threads = Integer.decode(readprop.getProperty(JDBC_CONNECTIONSMAX, "" + Runtime.getRuntime().availableProcessors() * 16)); if (readprop != writeprop) { @@ -70,7 +73,7 @@ public abstract class DataSqlSource extends AbstractService implement final Thread.UncaughtExceptionHandler ueh = (t, e) -> { logger.log(Level.SEVERE, cname + " error", e); }; - this.executor = Executors.newFixedThreadPool(threads, (Runnable r) -> { + this.executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(threads, (Runnable r) -> { Thread t = new Thread(r); t.setDaemon(true); String s = "" + counter.incrementAndGet(); @@ -83,19 +86,28 @@ public abstract class DataSqlSource extends AbstractService implement t.setUncaughtExceptionHandler(ueh); return t; }); + final int bufferCapacity = Integer.decode(readprop.getProperty(JDBC_CONNECTIONSCAPACITY, "" + 16 * 1024)); + this.bufferPool = new ObjectPool<>(new AtomicLong(), new AtomicLong(), this.threads, + (Object... params) -> ByteBuffer.allocateDirect(bufferCapacity), null, (e) -> { + if (e == null || e.isReadOnly() || e.capacity() != bufferCapacity) return false; + e.clear(); + return true; + }); this.name = unitName; - this.confxml = confxml; + this.persistxml = persistxml; this.cacheForbidden = "NONE".equalsIgnoreCase(readprop.getProperty(JDBC_CACHE_MODE)); - this.readPool = createReadPoolSource(this, "read", readprop); - this.writePool = createWritePoolSource(this, "write", writeprop); + this.readPool = createPoolSource(this, "read", readprop); + this.writePool = createPoolSource(this, "write", writeprop); } //是否异步, 为true则只能调用pollAsync方法,为false则只能调用poll方法 protected abstract boolean isAysnc(); - protected abstract PoolSource createReadPoolSource(DataSource source, String stype, Properties prop); + //index从1开始 + protected abstract String getPrepareParamSign(int index); - protected abstract PoolSource createWritePoolSource(DataSource source, String stype, Properties prop); + //创建连接池 + protected abstract PoolSource createPoolSource(DataSource source, String rwtype, Properties prop); @Override protected ExecutorService getExecutor() { @@ -279,6 +291,7 @@ public abstract class DataSqlSource extends AbstractService implement if (cacheListener != null) cacheListener.deleteCache(info.getType(), keys); return CompletableFuture.completedFuture(c); } + //待实现 if (isAysnc()) { //异步模式 } else { @@ -287,6 +300,76 @@ public abstract class DataSqlSource extends AbstractService implement return CompletableFuture.completedFuture(-1); } + //----------------------------- find ----------------------------- + /** + * 根据主键获取对象 + * + * @param Entity类的泛型 + * @param clazz Entity类 + * @param pk 主键值 + * + * @return Entity对象 + */ + @Override + public T find(Class clazz, Serializable pk) { + return find(clazz, (SelectColumn) null, pk); + } + + @Override + public CompletableFuture findAsync(final Class clazz, final Serializable pk) { + return findAsync(clazz, (SelectColumn) null, pk); + } + + @Override + public T find(Class clazz, final SelectColumn selects, Serializable pk) { + final EntityInfo info = loadEntityInfo(clazz); + final EntityCache cache = info.getCache(); + if (cache != null) { + T rs = cache.find(selects, pk); + if (cache.isFullLoaded() || rs != null) return rs; + } + if (info.isVirtualEntity()) { + return find(null, info, selects, pk).join(); + } else { + if (isAysnc()) { + return readPool.pollAsync().thenCompose(conn -> find(conn, info, selects, pk)).join(); + } else { + return find(readPool.poll(), info, selects, pk).join(); + } + } + } + + @Override + public CompletableFuture findAsync(final Class clazz, final SelectColumn selects, final Serializable pk) { + final EntityInfo info = loadEntityInfo(clazz); + final EntityCache cache = info.getCache(); + if (cache != null) { + T rs = cache.find(selects, pk); + if (cache.isFullLoaded() || rs != null) return CompletableFuture.completedFuture(rs); + } + if (info.isVirtualEntity()) { + if (isAysnc()) { + return find(null, info, selects, pk); + } else { + return CompletableFuture.supplyAsync(() -> find(null, info, selects, pk).join(), getExecutor()); + } + } else { + if (isAysnc()) { + return readPool.pollAsync().thenCompose(conn -> find(conn, info, selects, pk)); + } else { + return CompletableFuture.supplyAsync(() -> find(readPool.poll(), info, selects, pk).join(), getExecutor()); + } + } + } + + protected CompletableFuture find(final DBChannel conn, final EntityInfo info, final SelectColumn selects, Serializable pk) { + final SelectColumn sels = selects; + final String sql = "SELECT " + info.getQueryColumns(null, sels) + " FROM " + info.getTable(pk) + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(pk); + if (info.isLoggable(logger, Level.FINEST)) logger.finest(info.getType().getSimpleName() + " find sql=" + sql); + //待实现 + return null; + } + protected Sheet querySheet(final boolean readcache, final boolean needtotal, final Class clazz, final SelectColumn selects, final Flipper flipper, final FilterNode node) { return null; } diff --git a/src/org/redkale/source/PoolJdbcSource.java b/src/org/redkale/source/PoolJdbcSource.java index 5ccdd3c56..ee86c5d3e 100644 --- a/src/org/redkale/source/PoolJdbcSource.java +++ b/src/org/redkale/source/PoolJdbcSource.java @@ -8,12 +8,13 @@ package org.redkale.source; import java.io.*; import java.lang.ref.WeakReference; import java.lang.reflect.Method; +import java.net.URL; import java.nio.file.*; import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY; import java.sql.*; import java.util.*; import java.util.concurrent.*; -import java.util.logging.Level; +import java.util.logging.*; import javax.sql.*; import static org.redkale.source.DataSources.*; @@ -34,11 +35,14 @@ public class PoolJdbcSource extends PoolSource { private final ConnectionEventListener listener; - private final DataJdbcSource dataSource; + private final String unitName; - public PoolJdbcSource(DataJdbcSource source, String stype, Properties prop) { - super(stype, prop, source.logger); - this.dataSource = source; + private final URL persistxml; + + public PoolJdbcSource(String unitName, URL persistxml, String rwtype, Properties prop, Logger logger) { + super(rwtype, prop, logger); + this.unitName = unitName; + this.persistxml = persistxml; this.source = createDataSource(prop); this.queue = new ArrayBlockingQueue<>(this.maxconns); this.listener = new ConnectionEventListener() { @@ -68,7 +72,7 @@ public class PoolJdbcSource extends PoolSource { try { this.watch(); } catch (Exception e) { - logger.log(Level.WARNING, DataSource.class.getSimpleName() + " watch " + dataSource.conf + " error", e); + logger.log(Level.WARNING, DataSource.class.getSimpleName() + " watch " + persistxml + " error", e); } } @@ -165,8 +169,8 @@ public class PoolJdbcSource extends PoolSource { } private void watch() throws IOException { - if (dataSource.conf == null || dataSource.name == null) return; - final String file = dataSource.conf.getFile(); + if (persistxml == null || unitName == null) return; + final String file = persistxml.getFile(); final File f = new File(file); if (!f.isFile() || !f.canRead()) return; synchronized (maps) { @@ -198,8 +202,8 @@ public class PoolJdbcSource extends PoolSource { PoolJdbcSource pool = ref.get(); if (pool == null) continue; try { - Properties property = m.get(pool.dataSource.name); - if (property == null) property = m.get(pool.dataSource.name + "." + pool.rwtype); + Properties property = m.get(unitName); + if (property == null) property = m.get(unitName + "." + pool.rwtype); if (property != null) pool.change(property); } catch (Exception ex) { logger.log(Level.INFO, event.context() + " occur error", ex); @@ -244,7 +248,7 @@ public class PoolJdbcSource extends PoolSource { this.url = newurl; this.username = newuser; this.password = newpassword; - logger.log(Level.INFO, DataSource.class.getSimpleName() + "(" + dataSource.name + "." + rwtype + ") change (" + property + ")"); + logger.log(Level.INFO, DataSource.class.getSimpleName() + "(" + unitName + "." + rwtype + ") change (" + property + ")"); } catch (Exception e) { logger.log(Level.SEVERE, DataSource.class.getSimpleName() + " dynamic change JDBC (url userName password) error", e); }