1、包结构修改

2、dict的bug修改
3、arangodb查询find()默认1000条修改为库里所有数据
This commit is contained in:
2019-07-02 17:37:55 +08:00
parent bbf9b8b9c4
commit 972fb387c7
44 changed files with 397 additions and 248 deletions

View File

@@ -0,0 +1,37 @@
package net.tccn.base.dbq.jdbc.api;
import lombok.Data;
import net.tccn.base.arango.Doc;
import javax.persistence.Table;
/**
* 数据库平台
* @author: liangxianyou at 2018/11/14 12:58.
*/
@Data
@Table(name = "db_plat", catalog = "db_dev")
public class DbAccount extends Doc<DbAccount> {
public static DbAccount dao = dao(DbAccount.class);
private String name; //名称
private String cate; //类型 mysql|ArangoDb
private String remark; //备注
private String url; //数据库连接地址
private String user; //账号
private String pwd; //密码
private String[] catalogs; //库
//----------------------------
public String accountKey() {
int start = url.indexOf("//") + 2;
int end = url.indexOf("/", start);
int endDef = url.indexOf("?", end);
if (endDef == -1) {
endDef = url.length();
}
String host = url.substring(start, end == -1 ? url.length() : end);
return user + "@" + host;
}
}

View File

@@ -0,0 +1,104 @@
package net.tccn.base.dbq.jdbc.api;
import net.tccn.base.X;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
* Db 最终执行层
* Created by liangxianyou at 2019/3/12 14:11.
*/
public class DbKit implements DbSource{
private DbAccount dbAccount;
private DbSource dbSource;
private String catalog;
/*public DbKit(DbAccount dbAccount) {
this.dbAccount = dbAccount;
try {
DbSource dbSource = X.getDbSource(DbSource.class, dbAccount.getCate());
dbSource.setDbAccount(dbAccount);
this.dbSource = dbSource;
} catch (Exception e) {
throw new IllegalArgumentException(String.format("创建DbKit失败数据库类型[cate:%s]未知", dbAccount.getCate()));
}
}*/
public DbKit(DbAccount dbAccount, String catalog) {
this.dbAccount = dbAccount;
this.catalog = catalog;
try {
DbSource dbSource = X.getDbSource(DbSource.class, dbAccount.getCate());
dbSource.setDbAccount(dbAccount);
dbSource.setCatalog(catalog);
this.dbSource = dbSource;
} catch (Exception e) {
throw new IllegalArgumentException(String.format("创建DbKit失败数据库类型[cate:%s]未知", dbAccount.getCate()));
}
}
@Override
public void setDbAccount(DbAccount dbAccount) {
this.dbAccount = dbAccount;
dbSource.setDbAccount(dbAccount);
}
@Override
public void setCatalog(String catelog) {
this.catalog = catelog;
dbSource.setCatalog(catalog);
}
@Override
public <T> List<T> findList(String sql, Class<T> type) {
return dbSource.findList(sql, type);
}
@Override
public <T> T findFirst(String sql, Class<T> type) {
return dbSource.findFirst(sql, type);
}
@Override
public <T> T queryColumn(String sql, Class<T> type) {
return dbSource.queryColumn(sql, type);
}
@Override
public void createTable(String sql) {
dbSource.createTable(sql);
}
@Override
public void dropTable(String tableName) {
dbSource.dropTable(tableName);
}
public void exetute(String sql) {
dbSource.exetute(sql);
}
// -----------------------------------------
public <T> CompletableFuture<T> findfirstAsync(String sql, Class<T> type) {
return CompletableFuture.supplyAsync(() -> findFirst(sql, type));
}
public <T> CompletableFuture<List<T>> findListAsync(String sql, Class<T> type) {
return CompletableFuture.supplyAsync(() -> findList(sql, type));
}
public <T> CompletableFuture<T> queryColumnAsync(String sql, Class<T> type) {
return CompletableFuture.supplyAsync(() -> queryColumn(sql, type));
}
public CompletableFuture<Void> exetuteAsync(String sql) {
return CompletableFuture.runAsync(() -> exetute(sql));
}
@Override
public String getType() {
return null;
}
}

View File

@@ -0,0 +1,45 @@
package net.tccn.base.dbq.jdbc.api;
import net.tccn.base.IService;
import java.util.Date;
import java.util.List;
/**
* Created by liangxianyou at 2019/3/12 14:07.
*/
public interface DbSource extends IService {
void setDbAccount(DbAccount dbAccount);
void setCatalog(String catelog);
<T> List<T> findList(String sql, Class<T> type);
<T> T findFirst(String sql, Class<T> type);
<T> T queryColumn(String sql, Class<T> type);
//待实现
default <T> void save(String tableName, T t) {}
//待实现
default <T> void update(String tableName, T t) {}
default int queryInt(String sql) {
return queryColumn(sql, int.class);
}
default long queryLong(String sql) {
return queryColumn(sql, long.class);
}
default double queryDouble(String sql) {
return queryColumn(sql, double.class);
}
default Date queryDate(String sql) {
return queryColumn(sql, Date.class);
}
void createTable(String sql);
void dropTable(String tableName);
void exetute(String sql);
}

View File

@@ -0,0 +1,226 @@
package net.tccn.base.dbq.jdbc.api;
import net.tccn.base.CfgException;
import net.tccn.base.Kv;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by liangxianyou at 2019/3/12 14:20.
*/
public class DbSourceMysql implements DbSource {
private static ConcurrentHashMap<String, LinkedBlockingQueue<Connection>> conns = new ConcurrentHashMap<>();
private static ConcurrentHashMap<String, AtomicInteger> counter = new ConcurrentHashMap<>();
private String accountKey;
private DbAccount dbAccount;
private String catalog;
public DbSourceMysql() {
}
public void setDbAccount(DbAccount dbAccount) {
this.dbAccount = dbAccount;
this.accountKey = dbAccount.accountKey();
}
@Override
public void setCatalog(String catalog) {
this.catalog = catalog;
}
public DbSourceMysql(DbAccount dbAccount) {
this.dbAccount = dbAccount;
this.accountKey = dbAccount.accountKey();
}
public DbSourceMysql(DbAccount dbAccount, String catalog) {
this.dbAccount = dbAccount;
this.catalog = catalog;
this.accountKey = dbAccount.accountKey();
}
@Override
public String getType() {
return "mysql";
}
@Override
public <T> List<T> findList(String sql, Class<T> type) {
Connection connection = connection();
try (
PreparedStatement ps = connection.prepareStatement(sql);
ResultSet rs = ps.executeQuery()) {
List list = new ArrayList();
while (rs.next()) {
ResultSetMetaData metaData = rs.getMetaData();
int count = metaData.getColumnCount();
Map row = new HashMap();
for (int i = 1; i <= count; i++) {
String columnTypeName = metaData.getColumnTypeName(i);
//String columnName = metaData.getColumnName(i);
String columnLabel = metaData.getColumnLabel(i);
row.put(columnLabel, null);
if (rs.getObject(i) != null) {
switch (columnTypeName) {
case "DATETIME":
case "TIMESTAMP":
case "DATE":
row.put(columnLabel, rs.getTimestamp(i).getTime()); break;
default:
row.put(columnLabel, rs.getObject(i));
}
}
}
list.add(Map.class == type ? row : Kv.toBean(row, type));
}
return list;
} catch (SQLException e) {
e.printStackTrace();
return null;
} finally {
release(connection);
}
}
@Override
public <T> T findFirst(String sql, Class<T> type) {
List<T> list = findList(sql, type);
return list.size() > 0 ? list.get(0) : null;
}
@Override
public <T> T queryColumn(String sql, Class<T> type) {
Connection connection = connection();
try (
PreparedStatement ps = connection.prepareStatement(sql);
ResultSet rs = ps.executeQuery()
) {
Object v = null;
while (rs.next()) {
ResultSetMetaData metaData = rs.getMetaData();
int count = metaData.getColumnCount();
for (int i = 1; i <= count; i++) {
String columnTypeName = metaData.getColumnTypeName(i);
if (rs.getObject(i) != null) {
switch (columnTypeName) {
case "DATETIME":
case "TIMESTAMP":
case "DATE":
v = rs.getTimestamp(i).getTime(); break;
default:
v = rs.getObject(i);
}
}
}
}
return Kv.toAs(v, type);
} catch (SQLException e) {
e.printStackTrace();
return null;
} finally {
release(connection);
}
}
@Override
public void createTable(String sql) {
new RuntimeException("DbSourceMysql.createTable NOT SUPPORT right now" ); // todo:
}
@Override
public void dropTable(String tableName) {
new RuntimeException("[DbSourceMysql.dropTable] NOT SUPPORT right now" ); // todo:
}
@Override
public void exetute(String sql) {
Connection connection = connection();
try (
PreparedStatement ps = connection.prepareStatement(sql);
){
ps.execute();
//ps.executeUpdate();
} catch (SQLException e) {
throw new CfgException("SQL 执行失败:", sql);
} finally {
release(connection);
}
}
//fixme: lxy 处理连接超过8小时失效问题
private Connection connection() {
Connection connection = connection(0);
if (connection != null && catalog != null && !catalog.isEmpty()) {
try {
connection.setCatalog(catalog); //还回连接的时候是否需要重置catalog 后续观察
} catch (SQLException e) {
e.printStackTrace();
}
}
return connection;
}
private Connection connection(int n) {
LinkedBlockingQueue<Connection> queue = conns.getOrDefault(accountKey, new LinkedBlockingQueue<>(15));
Connection conn = null;
AtomicInteger num = counter.getOrDefault(accountKey, new AtomicInteger(0));
try {
if (queue.size() == 0 && num.get() < 15) {
conn = DriverManager.getConnection(dbAccount.getUrl(), dbAccount.getUser(), dbAccount.getPwd());
int x = num.incrementAndGet();
counter.put(accountKey, num);
System.out.println("创建新的连接:" + x);
} else {
conn = queue.take();
if (conn.isClosed()) {
System.out.println("connetion had closed,");
conn = connection(n);
}
}
} catch (SQLException | InterruptedException e) {
if (e instanceof InterruptedException) {
try {
conn = DriverManager.getConnection(dbAccount.getUrl(), dbAccount.getUser(), dbAccount.getPwd());
num.getAndIncrement();
if (conn != null) {
System.out.println("获取连接异常,并重新创建成功");
}
} catch (SQLException ex) {
new IllegalArgumentException("创建连接失败", e);
}
num.getAndIncrement();
counter.put(accountKey, num);
} else {
new IllegalArgumentException("获取连接失败", e);
}
}
conns.put(accountKey, queue);
return conn;
}
private void release(Connection connection) {
LinkedBlockingQueue<Connection> queue = conns.getOrDefault(accountKey, new LinkedBlockingQueue<>(15));
try {
if (connection != null) {
queue.put(connection);
conns.put(accountKey, queue);
//System.out.println("还回连接:" + connection);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}