This commit is contained in:
Redkale
2018-05-02 14:59:57 +08:00
parent d934f615ca
commit 13bd467152
5 changed files with 164 additions and 33 deletions

View File

@@ -39,7 +39,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
protected String name;
protected URL conf;
protected URL persistxml;
protected int threads;
@@ -60,10 +60,10 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
protected final BiFunction<DataSource, Class, List> fullloader = (s, t) -> querySheet(false, false, t, null, null, (FilterNode) null).list(true);
public DataJdbcSource(String unitName, Properties readprop, Properties writeprop) {
public DataJdbcSource(String unitName, URL persistxml, Properties readprop, Properties writeprop) {
this.persistxml = persistxml;
this.preConstruct(unitName, readprop, writeprop);
this.initByProperties(unitName, readprop, writeprop);
}
public DataJdbcSource() {
@@ -130,10 +130,9 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
protected void initByProperties(String unitName, Properties readprop, Properties writeprop) {
this.name = unitName;
this.conf = null;
this.cacheForbidden = "NONE".equalsIgnoreCase(readprop.getProperty(JDBC_CACHE_MODE));
this.readPool = new PoolJdbcSource(this, "read", readprop);
this.writePool = new PoolJdbcSource(this, "write", writeprop);
this.readPool = new PoolJdbcSource(unitName, persistxml, "read", readprop, logger);
this.writePool = new PoolJdbcSource(unitName, persistxml, "write", writeprop, logger);
}
@Local

View File

@@ -25,6 +25,8 @@ public final class DataSources {
public static final String JDBC_CONNECTIONSMAX = "javax.persistence.connections.limit";
public static final String JDBC_CONNECTIONSCAPACITY = "javax.persistence.connections.bufcapacity";
public static final String JDBC_CONTAIN_SQLTEMPLATE = "javax.persistence.contain.sqltemplate";
public static final String JDBC_NOTCONTAIN_SQLTEMPLATE = "javax.persistence.notcontain.sqltemplate";
@@ -53,11 +55,11 @@ public final class DataSources {
}
public static DataSource createDataSource(final String unitName, Properties prop) throws IOException {
return new DataJdbcSource(unitName, prop, prop);
return new DataJdbcSource(unitName, null, prop, prop);
}
public static DataSource createDataSource(final String unitName, Properties readprop, Properties writeprop) throws IOException {
return new DataJdbcSource(unitName, readprop, writeprop);
return new DataJdbcSource(unitName, null, readprop, writeprop);
}
public static DataSource createDataSource(final String unitName) throws IOException {
@@ -66,9 +68,9 @@ public final class DataSources {
: new File(System.getProperty(DATASOURCE_CONFPATH)).toURI().toURL());
}
public static DataSource createDataSource(final String unitName, URL url) throws IOException {
if (url == null) url = DataSources.class.getResource("/persistence.xml");
InputStream in = url.openStream();
public static DataSource createDataSource(final String unitName, URL persistxml) throws IOException {
if (persistxml == null) persistxml = DataSources.class.getResource("/persistence.xml");
InputStream in = persistxml.openStream();
if (in == null) return null;
Map<String, Properties> map = loadPersistenceXml(in);
Properties readprop = null;
@@ -100,7 +102,7 @@ public final class DataSources {
if (readprop == null) throw new IOException("Cannot find (resource.name = '" + unitName + "') DataSource");
if (writeprop == null) writeprop = readprop;
String impl = readprop.getProperty(JDBC_DATASOURCE_CLASS, DataJdbcSource.class.getName());
if (DataJdbcSource.class.getName().equals(impl)) return new DataJdbcSource(unitName, readprop, writeprop);
if (DataJdbcSource.class.getName().equals(impl)) return new DataJdbcSource(unitName, persistxml, readprop, writeprop);
try {
Class ds = Thread.currentThread().getContextClassLoader().loadClass(impl);
for (Constructor d : ds.getConstructors()) {
@@ -111,6 +113,8 @@ public final class DataSources {
return (DataSource) d.newInstance(unitName, readprop);
} else if (paramtypes.length == 3 && paramtypes[0] == String.class && paramtypes[1] == Properties.class && paramtypes[2] == Properties.class) {
return (DataSource) d.newInstance(unitName, readprop, writeprop);
} else if (paramtypes.length == 4 && paramtypes[0] == String.class && paramtypes[1] == URL.class && paramtypes[2] == Properties.class && paramtypes[3] == Properties.class) {
return (DataSource) d.newInstance(unitName, persistxml, readprop, writeprop);
}
}
throw new IOException("DataSource impl class (" + impl + ") have no Constructor by (Properties prop) or (String name, Properties prop) or (String name, Properties readprop, Propertieswriteprop)");

View File

@@ -0,0 +1,41 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.source;
import java.net.URL;
import java.sql.Connection;
import java.util.Properties;
import org.redkale.service.Local;
import org.redkale.util.*;
/**
* DataSource的JDBC实现类
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
@Local
@AutoLoad(false)
@SuppressWarnings("unchecked")
@ResourceType(DataSource.class)
public abstract class DataSqlJdbcSource extends DataSqlSource<Connection> {
public DataSqlJdbcSource(String unitName, URL persistxml, Properties readprop, Properties writeprop) {
super(unitName, persistxml, readprop, writeprop);
}
@Override
protected final String getPrepareParamSign(int index) {
return "?";
}
@Override
protected final boolean isAysnc() {
return false;
}
}

View File

@@ -7,9 +7,10 @@ package org.redkale.source;
import java.io.Serializable;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.*;
import java.util.function.*;
import java.util.logging.*;
import javax.annotation.Resource;
@@ -38,11 +39,13 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
protected String name;
protected URL confxml;
protected URL persistxml;
protected int threads;
protected ExecutorService executor;
protected ObjectPool<ByteBuffer> bufferPool;
protected ThreadPoolExecutor executor;
protected boolean cacheForbidden;
@@ -60,7 +63,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
protected final BiFunction<DataSource, Class, List> fullloader = (s, t) -> querySheet(false, false, t, null, null, (FilterNode) null).list(true);
@SuppressWarnings({"OverridableMethodCallInConstructor", "LeakingThisInConstructor"})
public DataSqlSource(String unitName, URL confxml, Properties readprop, Properties writeprop) {
public DataSqlSource(String unitName, URL persistxml, Properties readprop, Properties writeprop) {
final AtomicInteger counter = new AtomicInteger();
this.threads = Integer.decode(readprop.getProperty(JDBC_CONNECTIONSMAX, "" + Runtime.getRuntime().availableProcessors() * 16));
if (readprop != writeprop) {
@@ -70,7 +73,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
final Thread.UncaughtExceptionHandler ueh = (t, e) -> {
logger.log(Level.SEVERE, cname + " error", e);
};
this.executor = Executors.newFixedThreadPool(threads, (Runnable r) -> {
this.executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(threads, (Runnable r) -> {
Thread t = new Thread(r);
t.setDaemon(true);
String s = "" + counter.incrementAndGet();
@@ -83,19 +86,28 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
t.setUncaughtExceptionHandler(ueh);
return t;
});
final int bufferCapacity = Integer.decode(readprop.getProperty(JDBC_CONNECTIONSCAPACITY, "" + 16 * 1024));
this.bufferPool = new ObjectPool<>(new AtomicLong(), new AtomicLong(), this.threads,
(Object... params) -> ByteBuffer.allocateDirect(bufferCapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != bufferCapacity) return false;
e.clear();
return true;
});
this.name = unitName;
this.confxml = confxml;
this.persistxml = persistxml;
this.cacheForbidden = "NONE".equalsIgnoreCase(readprop.getProperty(JDBC_CACHE_MODE));
this.readPool = createReadPoolSource(this, "read", readprop);
this.writePool = createWritePoolSource(this, "write", writeprop);
this.readPool = createPoolSource(this, "read", readprop);
this.writePool = createPoolSource(this, "write", writeprop);
}
//是否异步, 为true则只能调用pollAsync方法为false则只能调用poll方法
protected abstract boolean isAysnc();
protected abstract PoolSource<DBChannel> createReadPoolSource(DataSource source, String stype, Properties prop);
//index从1开始
protected abstract String getPrepareParamSign(int index);
protected abstract PoolSource<DBChannel> createWritePoolSource(DataSource source, String stype, Properties prop);
//创建连接池
protected abstract PoolSource<DBChannel> createPoolSource(DataSource source, String rwtype, Properties prop);
@Override
protected ExecutorService getExecutor() {
@@ -279,6 +291,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (cacheListener != null) cacheListener.deleteCache(info.getType(), keys);
return CompletableFuture.completedFuture(c);
}
//待实现
if (isAysnc()) { //异步模式
} else {
@@ -287,6 +300,76 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
return CompletableFuture.completedFuture(-1);
}
//----------------------------- find -----------------------------
/**
* 根据主键获取对象
*
* @param <T> Entity类的泛型
* @param clazz Entity类
* @param pk 主键值
*
* @return Entity对象
*/
@Override
public <T> T find(Class<T> clazz, Serializable pk) {
return find(clazz, (SelectColumn) null, pk);
}
@Override
public <T> CompletableFuture<T> findAsync(final Class<T> clazz, final Serializable pk) {
return findAsync(clazz, (SelectColumn) null, pk);
}
@Override
public <T> T find(Class<T> clazz, final SelectColumn selects, Serializable pk) {
final EntityInfo<T> info = loadEntityInfo(clazz);
final EntityCache<T> cache = info.getCache();
if (cache != null) {
T rs = cache.find(selects, pk);
if (cache.isFullLoaded() || rs != null) return rs;
}
if (info.isVirtualEntity()) {
return find(null, info, selects, pk).join();
} else {
if (isAysnc()) {
return readPool.pollAsync().thenCompose(conn -> find(conn, info, selects, pk)).join();
} else {
return find(readPool.poll(), info, selects, pk).join();
}
}
}
@Override
public <T> CompletableFuture<T> findAsync(final Class<T> clazz, final SelectColumn selects, final Serializable pk) {
final EntityInfo<T> info = loadEntityInfo(clazz);
final EntityCache<T> cache = info.getCache();
if (cache != null) {
T rs = cache.find(selects, pk);
if (cache.isFullLoaded() || rs != null) return CompletableFuture.completedFuture(rs);
}
if (info.isVirtualEntity()) {
if (isAysnc()) {
return find(null, info, selects, pk);
} else {
return CompletableFuture.supplyAsync(() -> find(null, info, selects, pk).join(), getExecutor());
}
} else {
if (isAysnc()) {
return readPool.pollAsync().thenCompose(conn -> find(conn, info, selects, pk));
} else {
return CompletableFuture.supplyAsync(() -> find(readPool.poll(), info, selects, pk).join(), getExecutor());
}
}
}
protected <T> CompletableFuture<T> find(final DBChannel conn, final EntityInfo<T> info, final SelectColumn selects, Serializable pk) {
final SelectColumn sels = selects;
final String sql = "SELECT " + info.getQueryColumns(null, sels) + " FROM " + info.getTable(pk) + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(pk);
if (info.isLoggable(logger, Level.FINEST)) logger.finest(info.getType().getSimpleName() + " find sql=" + sql);
//待实现
return null;
}
protected <T> Sheet<T> querySheet(final boolean readcache, final boolean needtotal, final Class<T> clazz, final SelectColumn selects, final Flipper flipper, final FilterNode node) {
return null;
}

View File

@@ -8,12 +8,13 @@ package org.redkale.source;
import java.io.*;
import java.lang.ref.WeakReference;
import java.lang.reflect.Method;
import java.net.URL;
import java.nio.file.*;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
import java.sql.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.logging.Level;
import java.util.logging.*;
import javax.sql.*;
import static org.redkale.source.DataSources.*;
@@ -34,11 +35,14 @@ public class PoolJdbcSource extends PoolSource<Connection> {
private final ConnectionEventListener listener;
private final DataJdbcSource dataSource;
private final String unitName;
public PoolJdbcSource(DataJdbcSource source, String stype, Properties prop) {
super(stype, prop, source.logger);
this.dataSource = source;
private final URL persistxml;
public PoolJdbcSource(String unitName, URL persistxml, String rwtype, Properties prop, Logger logger) {
super(rwtype, prop, logger);
this.unitName = unitName;
this.persistxml = persistxml;
this.source = createDataSource(prop);
this.queue = new ArrayBlockingQueue<>(this.maxconns);
this.listener = new ConnectionEventListener() {
@@ -68,7 +72,7 @@ public class PoolJdbcSource extends PoolSource<Connection> {
try {
this.watch();
} catch (Exception e) {
logger.log(Level.WARNING, DataSource.class.getSimpleName() + " watch " + dataSource.conf + " error", e);
logger.log(Level.WARNING, DataSource.class.getSimpleName() + " watch " + persistxml + " error", e);
}
}
@@ -165,8 +169,8 @@ public class PoolJdbcSource extends PoolSource<Connection> {
}
private void watch() throws IOException {
if (dataSource.conf == null || dataSource.name == null) return;
final String file = dataSource.conf.getFile();
if (persistxml == null || unitName == null) return;
final String file = persistxml.getFile();
final File f = new File(file);
if (!f.isFile() || !f.canRead()) return;
synchronized (maps) {
@@ -198,8 +202,8 @@ public class PoolJdbcSource extends PoolSource<Connection> {
PoolJdbcSource pool = ref.get();
if (pool == null) continue;
try {
Properties property = m.get(pool.dataSource.name);
if (property == null) property = m.get(pool.dataSource.name + "." + pool.rwtype);
Properties property = m.get(unitName);
if (property == null) property = m.get(unitName + "." + pool.rwtype);
if (property != null) pool.change(property);
} catch (Exception ex) {
logger.log(Level.INFO, event.context() + " occur error", ex);
@@ -244,7 +248,7 @@ public class PoolJdbcSource extends PoolSource<Connection> {
this.url = newurl;
this.username = newuser;
this.password = newpassword;
logger.log(Level.INFO, DataSource.class.getSimpleName() + "(" + dataSource.name + "." + rwtype + ") change (" + property + ")");
logger.log(Level.INFO, DataSource.class.getSimpleName() + "(" + unitName + "." + rwtype + ") change (" + property + ")");
} catch (Exception e) {
logger.log(Level.SEVERE, DataSource.class.getSimpleName() + " dynamic change JDBC (url userName password) error", e);
}