This commit is contained in:
Redkale
2018-05-12 14:24:59 +08:00
parent 513183f55c
commit 350ece1533
5 changed files with 36 additions and 18 deletions

View File

@@ -131,8 +131,8 @@ public class DataJdbcOldSource extends AbstractService implements DataSource, Da
protected void initByProperties(String unitName, Properties readprop, Properties writeprop) {
this.name = unitName;
this.cacheForbidden = "NONE".equalsIgnoreCase(readprop.getProperty(JDBC_CACHE_MODE));
this.readPool = new PoolJdbcSource(unitName, persistxml, "read", readprop, logger);
this.writePool = new PoolJdbcSource(unitName, persistxml, "write", writeprop, logger);
this.readPool = new PoolJdbcSource(unitName, persistxml, "read", null, readprop, logger);
this.writePool = new PoolJdbcSource(unitName, persistxml, "write", null, writeprop, logger);
}
@Local

View File

@@ -9,7 +9,7 @@ import java.io.Serializable;
import java.net.URL;
import java.sql.*;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.function.Consumer;
import java.util.logging.Level;
@@ -45,8 +45,8 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
}
@Override
protected PoolSource<Connection> createPoolSource(DataSource source, String rwtype, Properties prop) {
return new PoolJdbcSource(this.name, this.persistxml, rwtype, prop, this.logger);
protected PoolSource<Connection> createPoolSource(DataSource source, String rwtype, ArrayBlockingQueue queue, Properties prop) {
return new PoolJdbcSource(this.name, this.persistxml, rwtype, queue, prop, this.logger);
}
@Override

View File

@@ -69,8 +69,10 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
public DataSqlSource(String unitName, URL persistxml, Properties readprop, Properties writeprop) {
final AtomicInteger counter = new AtomicInteger();
this.threads = Integer.decode(readprop.getProperty(JDBC_CONNECTIONSMAX, "" + Runtime.getRuntime().availableProcessors() * 16));
int maxconns = Math.max(8, Integer.decode(readprop.getProperty(JDBC_CONNECTIONSMAX, "" + Runtime.getRuntime().availableProcessors() * 16)));
if (readprop != writeprop) {
this.threads += Integer.decode(writeprop.getProperty(JDBC_CONNECTIONSMAX, "" + Runtime.getRuntime().availableProcessors() * 16));
maxconns = 0;
}
final String cname = this.getClass().getSimpleName();
final Thread.UncaughtExceptionHandler ueh = (t, e) -> {
@@ -99,8 +101,9 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
this.name = unitName;
this.persistxml = persistxml;
this.cacheForbidden = "NONE".equalsIgnoreCase(readprop.getProperty(JDBC_CACHE_MODE));
this.readPool = createPoolSource(this, "read", readprop);
this.writePool = createPoolSource(this, "write", writeprop);
ArrayBlockingQueue<DBChannel> queue = maxconns > 0 ? new ArrayBlockingQueue(maxconns) : null;
this.readPool = createPoolSource(this, "read", queue, readprop);
this.writePool = createPoolSource(this, "write", queue, writeprop);
}
//是否异步, 为true则只能调用pollAsync方法为false则只能调用poll方法
@@ -110,7 +113,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
protected abstract String prepareParamSign(int index);
//创建连接池
protected abstract PoolSource<DBChannel> createPoolSource(DataSource source, String rwtype, Properties prop);
protected abstract PoolSource<DBChannel> createPoolSource(DataSource source, String rwtype, ArrayBlockingQueue queue, Properties prop);
//插入纪录
protected abstract <T> CompletableFuture<Integer> insertDB(final EntityInfo<T> info, T... values);

View File

@@ -39,12 +39,12 @@ public class PoolJdbcSource extends PoolSource<Connection> {
private final URL persistxml;
public PoolJdbcSource(String unitName, URL persistxml, String rwtype, Properties prop, Logger logger) {
public PoolJdbcSource(String unitName, URL persistxml, String rwtype, ArrayBlockingQueue aqueue, Properties prop, Logger logger) {
super(rwtype, prop, logger);
this.unitName = unitName;
this.persistxml = persistxml;
this.source = createDataSource(prop);
this.queue = new ArrayBlockingQueue<>(this.maxconns);
this.queue = aqueue == null ? new ArrayBlockingQueue<>(this.maxconns) : aqueue;
this.listener = new ConnectionEventListener() {
@Override

View File

@@ -33,7 +33,7 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
protected final ArrayBlockingQueue<AsyncConnection> connQueue;
public PoolTcpSource(String rwtype, Properties prop, Logger logger, ObjectPool<ByteBuffer> bufferPool, ThreadPoolExecutor executor) {
public PoolTcpSource(String rwtype, ArrayBlockingQueue queue, Properties prop, Logger logger, ObjectPool<ByteBuffer> bufferPool, ThreadPoolExecutor executor) {
super(rwtype, prop, logger);
this.bufferPool = bufferPool;
this.executor = executor;
@@ -42,7 +42,7 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
} catch (IOException e) {
throw new RuntimeException(e);
}
this.connQueue = new ArrayBlockingQueue<>(this.maxconns);
this.connQueue = queue == null ? new ArrayBlockingQueue<>(this.maxconns) : queue;
}
@Override
@@ -53,7 +53,18 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
usingCounter.decrementAndGet();
} else {
//usingCounter 会在close方法中执行
conn.dispose();
CompletableFuture<AsyncConnection> future = null;
try {
future = sendCloseCommand(conn);
} catch (Exception e) {
}
if (future == null) {
conn.dispose();
} else {
future.whenComplete((c, t) -> {
if (c != null) c.dispose();
});
}
}
}
@@ -167,16 +178,20 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
@Override
public void close() {
connQueue.stream().forEach(x -> {
CompletableFuture<AsyncConnection> future = null;
try {
sendCloseCommand(x);
future = sendCloseCommand(x);
} catch (Exception e) {
}
try {
x.close();
} catch (Exception e) {
if (future == null) {
x.dispose();
} else {
future.whenComplete((c, t) -> {
if (c != null) c.dispose();
});
}
});
}
protected abstract void sendCloseCommand(final AsyncConnection conn);
protected abstract CompletableFuture<AsyncConnection> sendCloseCommand(final AsyncConnection conn);
}