This commit is contained in:
@@ -143,7 +143,7 @@ public class DataSqlJdbcSource extends DataSqlSource<Connection> {
|
|||||||
future.completeExceptionally(e);
|
future.completeExceptionally(e);
|
||||||
return future;
|
return future;
|
||||||
} finally {
|
} finally {
|
||||||
if (conn != null) writePool.closeConnection(conn);
|
if (conn != null) writePool.offerConnection(conn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -191,7 +191,7 @@ public class DataSqlJdbcSource extends DataSqlSource<Connection> {
|
|||||||
future.completeExceptionally(e);
|
future.completeExceptionally(e);
|
||||||
return future;
|
return future;
|
||||||
} finally {
|
} finally {
|
||||||
if (conn != null) writePool.closeConnection(conn);
|
if (conn != null) writePool.offerConnection(conn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -256,7 +256,7 @@ public class DataSqlJdbcSource extends DataSqlSource<Connection> {
|
|||||||
future.completeExceptionally(e);
|
future.completeExceptionally(e);
|
||||||
return future;
|
return future;
|
||||||
} finally {
|
} finally {
|
||||||
if (conn != null) writePool.closeConnection(conn);
|
if (conn != null) writePool.offerConnection(conn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -289,7 +289,7 @@ public class DataSqlJdbcSource extends DataSqlSource<Connection> {
|
|||||||
future.completeExceptionally(e);
|
future.completeExceptionally(e);
|
||||||
return future;
|
return future;
|
||||||
} finally {
|
} finally {
|
||||||
if (conn != null) writePool.closeConnection(conn);
|
if (conn != null) writePool.offerConnection(conn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -321,7 +321,7 @@ public class DataSqlJdbcSource extends DataSqlSource<Connection> {
|
|||||||
future.completeExceptionally(e);
|
future.completeExceptionally(e);
|
||||||
return future;
|
return future;
|
||||||
} finally {
|
} finally {
|
||||||
if (conn != null) readPool.closeConnection(conn);
|
if (conn != null) readPool.offerConnection(conn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -346,7 +346,7 @@ public class DataSqlJdbcSource extends DataSqlSource<Connection> {
|
|||||||
future.completeExceptionally(e);
|
future.completeExceptionally(e);
|
||||||
return future;
|
return future;
|
||||||
} finally {
|
} finally {
|
||||||
if (conn != null) readPool.closeConnection(conn);
|
if (conn != null) readPool.offerConnection(conn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -372,7 +372,7 @@ public class DataSqlJdbcSource extends DataSqlSource<Connection> {
|
|||||||
future.completeExceptionally(e);
|
future.completeExceptionally(e);
|
||||||
return future;
|
return future;
|
||||||
} finally {
|
} finally {
|
||||||
if (conn != null) readPool.closeConnection(conn);
|
if (conn != null) readPool.offerConnection(conn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -395,7 +395,7 @@ public class DataSqlJdbcSource extends DataSqlSource<Connection> {
|
|||||||
future.completeExceptionally(e);
|
future.completeExceptionally(e);
|
||||||
return future;
|
return future;
|
||||||
} finally {
|
} finally {
|
||||||
if (conn != null) readPool.closeConnection(conn);
|
if (conn != null) readPool.offerConnection(conn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -427,7 +427,7 @@ public class DataSqlJdbcSource extends DataSqlSource<Connection> {
|
|||||||
future.completeExceptionally(e);
|
future.completeExceptionally(e);
|
||||||
return future;
|
return future;
|
||||||
} finally {
|
} finally {
|
||||||
if (conn != null) readPool.closeConnection(conn);
|
if (conn != null) readPool.offerConnection(conn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -450,7 +450,7 @@ public class DataSqlJdbcSource extends DataSqlSource<Connection> {
|
|||||||
future.completeExceptionally(e);
|
future.completeExceptionally(e);
|
||||||
return future;
|
return future;
|
||||||
} finally {
|
} finally {
|
||||||
if (conn != null) readPool.closeConnection(conn);
|
if (conn != null) readPool.offerConnection(conn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -496,7 +496,7 @@ public class DataSqlJdbcSource extends DataSqlSource<Connection> {
|
|||||||
future.completeExceptionally(e);
|
future.completeExceptionally(e);
|
||||||
return future;
|
return future;
|
||||||
} finally {
|
} finally {
|
||||||
if (conn != null) readPool.closeConnection(conn);
|
if (conn != null) readPool.offerConnection(conn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -255,7 +255,7 @@ public class PoolJdbcSource extends PoolSource<Connection> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void closeConnection(final Connection conn) {
|
public void offerConnection(final Connection conn) {
|
||||||
if (conn == null) return;
|
if (conn == null) return;
|
||||||
try {
|
try {
|
||||||
conn.close();
|
conn.close();
|
||||||
|
|||||||
@@ -139,7 +139,7 @@ public abstract class PoolSource<DBChannel> {
|
|||||||
|
|
||||||
public abstract CompletableFuture<DBChannel> pollAsync();
|
public abstract CompletableFuture<DBChannel> pollAsync();
|
||||||
|
|
||||||
public abstract void closeConnection(final DBChannel conn);
|
public abstract void offerConnection(final DBChannel conn);
|
||||||
|
|
||||||
public abstract void close();
|
public abstract void close();
|
||||||
|
|
||||||
|
|||||||
@@ -46,7 +46,7 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void closeConnection(final AsyncConnection conn) {
|
public void offerConnection(final AsyncConnection conn) {
|
||||||
if (conn == null) return;
|
if (conn == null) return;
|
||||||
if (connQueue.offer(conn)) {
|
if (connQueue.offer(conn)) {
|
||||||
saveCounter.incrementAndGet();
|
saveCounter.incrementAndGet();
|
||||||
@@ -167,10 +167,16 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
|
|||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
connQueue.stream().forEach(x -> {
|
connQueue.stream().forEach(x -> {
|
||||||
|
try {
|
||||||
|
sendCloseCommand(x);
|
||||||
|
} catch (Exception e) {
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
x.close();
|
x.close();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected abstract void sendCloseCommand(final AsyncConnection conn);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user