This commit is contained in:
Redkale
2018-04-02 13:22:45 +08:00
parent dab8ed8ceb
commit 93b1c9f0d9
2 changed files with 34 additions and 5 deletions

View File

@@ -9,7 +9,7 @@ import java.io.*;
import java.net.URL;
import java.sql.*;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.function.*;
import java.util.logging.*;
@@ -41,6 +41,10 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
protected URL conf;
protected int threads;
protected ExecutorService executor;
protected boolean cacheForbidden;
protected PoolJdbcSource readPool;
@@ -82,10 +86,35 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
if (pn == null || pv == null) continue;
readprop.put(pn, pv);
}
final AtomicInteger counter = new AtomicInteger();
this.threads = Integer.decode(readprop.getProperty(JDBC_CONNECTIONSMAX, "" + Runtime.getRuntime().availableProcessors() * 16));
this.executor = Executors.newFixedThreadPool(threads, (Runnable r) -> {
Thread t = new Thread(r);
t.setDaemon(true);
String s = "" + counter.incrementAndGet();
if (s.length() == 1) {
s = "00" + s;
} else if (s.length() == 2) {
s = "0" + s;
}
t.setName("DataJdbcSource-Thread-" + s);
return t;
});
if (writeprop.isEmpty()) writeprop = readprop;
this.initByProperties(unitName, readprop, writeprop);
}
@Override
protected ExecutorService getExecutor() {
return executor;
}
@Override
public void destroy(AnyValue config) {
if (this.executor != null) this.executor.shutdownNow();
}
//构造前调用
protected void preConstruct(String unitName, Properties readprop, Properties writeprop) {
}

View File

@@ -47,7 +47,7 @@ public class PoolJdbcSource {
private final String stype; // "" 或 "read" 或 "write"
private final int max;
private final int maxconns;
private String url;
@@ -65,8 +65,8 @@ public class PoolJdbcSource {
this.url = prop.getProperty(JDBC_URL);
this.user = prop.getProperty(JDBC_USER);
this.password = prop.getProperty(JDBC_PWD);
this.max = Integer.decode(prop.getProperty(JDBC_CONNECTIONSMAX, "" + Runtime.getRuntime().availableProcessors() * 16));
this.queue = new ArrayBlockingQueue<>(this.max);
this.maxconns = Integer.decode(prop.getProperty(JDBC_CONNECTIONSMAX, "" + Runtime.getRuntime().availableProcessors() * 16));
this.queue = new ArrayBlockingQueue<>(this.maxconns);
this.listener = new ConnectionEventListener() {
@Override
@@ -287,7 +287,7 @@ public class PoolJdbcSource {
}
PooledConnection result = queue.poll();
if (result == null) {
if (usingCounter.get() >= max) {
if (usingCounter.get() >= maxconns) {
try {
result = queue.poll(6, TimeUnit.SECONDS);
} catch (Exception t) {