168 lines
5.7 KiB
Java
168 lines
5.7 KiB
Java
package net.tccn.dbq.jdbc.api;
|
||
|
||
import net.tccn.base.Kv;

import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.logging.Level;
import java.util.logging.Logger;
|
||
|
||
/**
|
||
* Created by liangxianyou at 2019/3/12 14:20.
|
||
*/
|
||
@SuppressWarnings("Duplicates")
|
||
public class DbSourceMysql implements DbSource {
|
||
|
||
private static ConcurrentHashMap<String, AtomicReferenceArray<Connection>> conns = new ConcurrentHashMap<>();
|
||
private static ConcurrentHashMap<String, AtomicInteger> counter = new ConcurrentHashMap<>();
|
||
|
||
private String accountKey;
|
||
private DbAccount dbAccount;
|
||
|
||
public DbSourceMysql(DbAccount dbAccount) {
|
||
this.dbAccount = dbAccount;
|
||
this.accountKey = dbAccount.accountKey();
|
||
}
|
||
|
||
@Override
|
||
public <T> List<T> findList(String sql, Class<T> type) {
|
||
Connection connection = connection();
|
||
try (
|
||
PreparedStatement ps = connection.prepareStatement(sql);
|
||
ResultSet rs = ps.executeQuery()) {
|
||
List list = new ArrayList();
|
||
while (rs.next()) {
|
||
ResultSetMetaData metaData = rs.getMetaData();
|
||
int count = metaData.getColumnCount();
|
||
|
||
Map row = new HashMap();
|
||
for (int i = 1; i <= count; i++) {
|
||
String columnTypeName = metaData.getColumnTypeName(i);
|
||
String columnName = metaData.getColumnName(i);
|
||
row.put(columnName, null);
|
||
|
||
if (rs.getObject(i) != null) {
|
||
switch (columnTypeName) {
|
||
case "DATETIME":
|
||
case "TIMESTAMP":
|
||
case "DATE":
|
||
row.put(columnName, rs.getTimestamp(i).getTime()); break;
|
||
default:
|
||
row.put(columnName, rs.getObject(i));
|
||
}
|
||
}
|
||
}
|
||
list.add(Map.class == type ? row : Kv.toBean(row, type));
|
||
}
|
||
|
||
return list;
|
||
} catch (SQLException e) {
|
||
e.printStackTrace();
|
||
return null;
|
||
} finally {
|
||
release(connection);
|
||
}
|
||
}
|
||
|
||
@Override
|
||
public <T> T findfirst(String sql, Class<T> type) {
|
||
List<T> list = findList(sql, type);
|
||
return list.size() > 0 ? list.get(0) : null;
|
||
}
|
||
|
||
@Override
|
||
public <T> T queryColumn(String sql, Class<T> type) {
|
||
Connection connection = connection();
|
||
try (
|
||
PreparedStatement ps = connection.prepareStatement(sql);
|
||
ResultSet rs = ps.executeQuery()
|
||
) {
|
||
Object v = null;
|
||
while (rs.next()) {
|
||
ResultSetMetaData metaData = rs.getMetaData();
|
||
int count = metaData.getColumnCount();
|
||
|
||
for (int i = 1; i <= count; i++) {
|
||
String columnTypeName = metaData.getColumnTypeName(i);
|
||
if (rs.getObject(i) != null) {
|
||
switch (columnTypeName) {
|
||
case "DATETIME":
|
||
case "TIMESTAMP":
|
||
case "DATE":
|
||
v = rs.getTimestamp(i).getTime(); break;
|
||
default:
|
||
v = rs.getObject(i);
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
return Kv.toAs(v, type);
|
||
} catch (SQLException e) {
|
||
e.printStackTrace();
|
||
return null;
|
||
} finally {
|
||
release(connection);
|
||
}
|
||
}
|
||
|
||
private Connection connection() {
|
||
return connection(0);
|
||
}
|
||
|
||
private Connection connection(int n) {
|
||
AtomicReferenceArray<Connection> arr = conns.getOrDefault(accountKey, new AtomicReferenceArray<>(15));
|
||
Connection connection = null;
|
||
AtomicInteger num = counter.getOrDefault(accountKey, new AtomicInteger(0));
|
||
for (int i = 0; num.get() > 0 && i < arr.length() && connection == null; i++) {
|
||
try {
|
||
connection = arr.getAndUpdate(i, null);
|
||
} catch (Exception e) {
|
||
System.out.println("getAndUpdate exception");
|
||
}
|
||
}
|
||
if (connection == null) {
|
||
try {
|
||
if (num.get() < 15) {
|
||
connection = DriverManager.getConnection(dbAccount.getUrl(), dbAccount.getUser(), dbAccount.getPwd());
|
||
num.getAndIncrement();
|
||
} else {
|
||
//连接被全部使用中,等待1s后再次获取连接,直到得到连接 !!!
|
||
Thread.sleep(1000);
|
||
if (++n > 3)
|
||
connection(n);
|
||
}
|
||
} catch (SQLException e) {
|
||
e.printStackTrace();
|
||
throw new IllegalArgumentException("获取数据库连接失败");
|
||
} catch (InterruptedException e) {
|
||
e.printStackTrace();
|
||
}
|
||
}
|
||
|
||
return connection;
|
||
}
|
||
private void release(Connection connection) {
|
||
AtomicReferenceArray<Connection> arr = conns.getOrDefault(accountKey, new AtomicReferenceArray<>(15));
|
||
|
||
int i = 0;
|
||
boolean bool = false;
|
||
while (i < arr.length() && !bool){
|
||
bool = arr.compareAndSet(i++, null, connection);
|
||
}
|
||
|
||
//如果没成功释放,关系连接
|
||
if (!bool && connection != null) {
|
||
try {
|
||
connection.close();
|
||
} catch (SQLException e) {
|
||
e.printStackTrace();
|
||
}
|
||
}
|
||
}
|
||
}
|