This commit is contained in:
Redkale
2018-04-14 13:13:13 +08:00
parent 4a518f1309
commit 7b9c71dcda
2 changed files with 88 additions and 7 deletions

View File

@@ -47,7 +47,7 @@ public abstract class PoolSource<T> {
protected String url;
protected InetSocketAddress addr;
protected InetSocketAddress servaddr;
protected String user;
@@ -118,9 +118,9 @@ public abstract class PoolSource<T> {
}
pos = url0.indexOf(':');
if (pos > 0) {
this.addr = new InetSocketAddress(url0.substring(0, pos), Integer.parseInt(url0.substring(pos + 1)));
this.servaddr = new InetSocketAddress(url0.substring(0, pos), Integer.parseInt(url0.substring(pos + 1)));
} else {
this.addr = new InetSocketAddress(url0, getDefaultPort());
this.servaddr = new InetSocketAddress(url0, getDefaultPort());
}
}
@@ -160,4 +160,37 @@ public abstract class PoolSource<T> {
public final long getSaveCount() {
return saveCounter.longValue();
}
public final int getMaxconns() {
return maxconns;
}
public final int getConnectTimeoutSeconds() {
return connectTimeoutSeconds;
}
public final int getReadTimeoutSeconds() {
return readTimeoutSeconds;
}
public final int getWriteTimeoutSeconds() {
return writeTimeoutSeconds;
}
public final String getUrl() {
return url;
}
public final InetSocketAddress getServaddr() {
return servaddr;
}
public final String getUser() {
return user;
}
public final String getPassword() {
return password;
}
}

View File

@@ -7,7 +7,8 @@ package org.redkale.source;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.*;
import java.sql.SQLException;
import java.util.Properties;
import java.util.concurrent.*;
import java.util.logging.Logger;
@@ -29,8 +30,8 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
//TCP Channel组
protected AsynchronousChannelGroup group;
public PoolTcpSource(String stype, Properties prop, Logger logger, ObjectPool<ByteBuffer> bufferPool,ThreadPoolExecutor executor) {
super(stype, prop, logger);
public PoolTcpSource(String rwtype, Properties prop, Logger logger, ObjectPool<ByteBuffer> bufferPool, ThreadPoolExecutor executor) {
super(rwtype, prop, logger);
this.bufferPool = bufferPool;
this.executor = executor;
try {
@@ -50,8 +51,55 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
return pollAsync().join();
}
protected abstract ByteBuffer reqConnectBuffer();
protected abstract void respConnectBuffer(final ByteBuffer buffer, CompletableFuture<AsyncConnection> future, AsyncConnection conn);
@Override
public CompletableFuture<AsyncConnection> pollAsync() {
return AsyncConnection.createTCP(group, this.addr, this.readTimeoutSeconds, this.writeTimeoutSeconds);
return AsyncConnection.createTCP(group, this.servaddr, this.readTimeoutSeconds, this.writeTimeoutSeconds).thenCompose(conn -> {
CompletableFuture<AsyncConnection> future = new CompletableFuture();
final ByteBuffer buffer = reqConnectBuffer();
conn.write(buffer, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment1) {
if (result < 0) {
failed(new SQLException("Write Buffer Error"), attachment1);
return;
}
if (buffer.hasRemaining()) {
conn.write(buffer, attachment1, this);
return;
}
buffer.clear();
conn.read(buffer, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment2) {
if (result < 0) {
failed(new SQLException("Read Buffer Error"), attachment2);
return;
}
buffer.flip();
respConnectBuffer(buffer, future, conn);
}
@Override
public void failed(Throwable exc, Void attachment2) {
bufferPool.accept(buffer);
future.completeExceptionally(exc);
conn.dispose();
}
});
}
@Override
public void failed(Throwable exc, Void attachment1) {
bufferPool.accept(buffer);
future.completeExceptionally(exc);
conn.dispose();
}
});
return future;
});
}
}