This commit is contained in:
Redkale
2018-05-01 12:14:52 +08:00
parent e642854e99
commit 0392fd68a8
5 changed files with 87 additions and 9 deletions

View File

@@ -12,6 +12,7 @@ import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import javax.net.ssl.SSLContext;
import static org.redkale.net.ProtocolServer.*;
@@ -40,6 +41,8 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
//关闭数
protected AtomicLong closedCounter;
protected Consumer<AsyncConnection> beforeCloseListener;
public final long getLastReadTime() {
return readtime;
}
@@ -89,6 +92,11 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
}
}
public AsyncConnection beforeCloseListener(Consumer<AsyncConnection> beforeCloseListener) {
this.beforeCloseListener = beforeCloseListener;
return this;
}
@Override
public void close() throws IOException {
if (closedCounter != null) {
@@ -99,6 +107,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
livingCounter.decrementAndGet();
livingCounter = null;
}
if (beforeCloseListener != null) beforeCloseListener.accept(this);
if (attributes == null) return;
try {
for (Object obj : attributes.values()) {

View File

@@ -95,12 +95,6 @@ public abstract class DataSqlSource<Conn> extends AbstractService implements Dat
protected abstract PoolSource<Conn> createWritePoolSource(DataSource source, String stype, Properties prop);
@Local
protected abstract void closeReadConnection(final Conn conn);
@Local
protected abstract void closeWriteConnection(final Conn conn);
@Override
protected ExecutorService getExecutor() {
return executor;

View File

@@ -250,6 +250,16 @@ public class PoolJdbcSource extends PoolSource<Connection> {
}
}
@Override
public void closeConnection(final Connection conn) {
if (conn == null) return;
try {
conn.close();
} catch (Exception e) {
logger.log(Level.WARNING, "closeSQLConnection abort", e);
}
}
@Override
public Connection poll() {
return poll(0, null);

View File

@@ -70,7 +70,7 @@ public abstract class PoolSource<T> {
this.connectTimeoutSeconds = Integer.decode(prop.getProperty(JDBC_CONNECTTIMEOUT_SECONDS, "3"));
this.readTimeoutSeconds = Integer.decode(prop.getProperty(JDBC_READTIMEOUT_SECONDS, "3"));
this.writeTimeoutSeconds = Integer.decode(prop.getProperty(JDBC_WRITETIMEOUT_SECONDS, "3"));
this.maxconns = Integer.decode(prop.getProperty(JDBC_CONNECTIONSMAX, "" + Runtime.getRuntime().availableProcessors() * 16));
this.maxconns = Math.max(8, Integer.decode(prop.getProperty(JDBC_CONNECTIONSMAX, "" + Runtime.getRuntime().availableProcessors() * 16)));
String dbtype0 = "";
{ //jdbc:mysql:// jdbc:microsoft:sqlserver:// 取://之前的到最后一个:之间的字符串
int pos = this.url.indexOf("://");
@@ -141,6 +141,8 @@ public abstract class PoolSource<T> {
public abstract void close();
public abstract void closeConnection(final T conn);
public final String getDbtype() {
return dbtype;
}

View File

@@ -8,10 +8,10 @@ package org.redkale.source;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.sql.SQLException;
import java.sql.*;
import java.util.Properties;
import java.util.concurrent.*;
import java.util.logging.Logger;
import java.util.logging.*;
import org.redkale.net.AsyncConnection;
import org.redkale.util.ObjectPool;
@@ -30,6 +30,8 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
//TCP Channel组
protected AsynchronousChannelGroup group;
protected final ArrayBlockingQueue<AsyncConnection> connQueue;
public PoolTcpSource(String rwtype, Properties prop, Logger logger, ObjectPool<ByteBuffer> bufferPool, ThreadPoolExecutor executor) {
super(rwtype, prop, logger);
this.bufferPool = bufferPool;
@@ -39,6 +41,19 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
} catch (IOException e) {
throw new RuntimeException(e);
}
this.connQueue = new ArrayBlockingQueue<>(this.maxconns);
}
@Override
public void closeConnection(final AsyncConnection conn) {
if (conn == null) return;
if (connQueue.offer(conn)) {
saveCounter.incrementAndGet();
usingCounter.decrementAndGet();
} else {
//usingCounter 会在close方法中执行
conn.dispose();
}
}
@Override
@@ -52,7 +67,40 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
@Override
public CompletableFuture<AsyncConnection> pollAsync() {
return pollAsync(0);
}
protected CompletableFuture<AsyncConnection> pollAsync(final int count) {
if (count >= 3) {
logger.log(Level.WARNING, "create datasource connection error");
CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
future.completeExceptionally(new SQLException("create datasource connection error"));
return future;
}
AsyncConnection conn0 = connQueue.poll();
if (conn0 != null) {
cycleCounter.incrementAndGet();
usingCounter.incrementAndGet();
return CompletableFuture.completedFuture(conn0);
}
if (usingCounter.get() >= maxconns && count < 2) {
return CompletableFuture.supplyAsync(() -> {
try {
return connQueue.poll(3, TimeUnit.SECONDS);
} catch (Exception t) {
return null;
}
}, executor).thenCompose((conn2) -> {
if (conn2 != null) {
cycleCounter.incrementAndGet();
return CompletableFuture.completedFuture(conn2);
}
return pollAsync(count + 1);
});
}
return AsyncConnection.createTCP(group, this.servaddr, this.readTimeoutSeconds, this.writeTimeoutSeconds).thenCompose(conn -> {
conn.beforeCloseListener((c) -> usingCounter.decrementAndGet());
CompletableFuture<AsyncConnection> future = new CompletableFuture();
final ByteBuffer buffer = reqConnectBuffer(conn);
conn.write(buffer, null, new CompletionHandler<Integer, Void>() {
@@ -95,6 +143,21 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
}
});
return future;
}).whenComplete((c, t) -> {
if (t == null) {
creatCounter.incrementAndGet();
usingCounter.incrementAndGet();
}
});
}
@Override
public void close() {
connQueue.stream().forEach(x -> {
try {
x.close();
} catch (Exception e) {
}
});
}
}