修复负载均衡无法正确切换节点的BUG

This commit is contained in:
Redkale
2017-07-24 18:29:40 +08:00
parent a5fcb45a88
commit f4a7f1cff6

View File

@@ -11,6 +11,8 @@ import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Supplier;
import org.redkale.convert.*;
import org.redkale.convert.json.JsonConvert;
import org.redkale.util.*;
/**
@@ -52,7 +54,7 @@ public final class Transport {
protected final InetSocketAddress clientAddress;
protected InetSocketAddress[] remoteAddres = new InetSocketAddress[0];
protected TransportAddress[] transportAddres = new TransportAddress[0];
protected final ObjectPool<ByteBuffer> bufferPool;
@@ -82,28 +84,33 @@ public final class Transport {
}
public final InetSocketAddress[] updateRemoteAddresses(final Collection<InetSocketAddress> addresses) {
InetSocketAddress[] oldAddresses = this.remoteAddres;
List<InetSocketAddress> list = new ArrayList<>();
TransportAddress[] oldAddresses = this.transportAddres;
List<TransportAddress> list = new ArrayList<>();
if (addresses != null) {
for (InetSocketAddress addr : addresses) {
if (clientAddress != null && clientAddress.equals(addr)) continue;
list.add(addr);
list.add(new TransportAddress(addr));
}
}
this.remoteAddres = list.toArray(new InetSocketAddress[list.size()]);
return oldAddresses;
this.transportAddres = list.toArray(new TransportAddress[list.size()]);
InetSocketAddress[] rs = new InetSocketAddress[oldAddresses.length];
for (int i = 0; i < rs.length; i++) {
rs[i] = oldAddresses[i].getAddress();
}
return rs;
}
public final boolean addRemoteAddresses(final InetSocketAddress addr) {
if (addr == null) return false;
synchronized (this) {
if (this.remoteAddres == null) {
this.remoteAddres = new InetSocketAddress[]{addr};
if (this.transportAddres == null) {
this.transportAddres = new TransportAddress[]{new TransportAddress(addr)};
} else {
for (InetSocketAddress i : this.remoteAddres) {
if (addr.equals(i)) return false;
for (TransportAddress i : this.transportAddres) {
if (addr.equals(i.address)) return false;
}
this.remoteAddres = Utility.append(remoteAddres, addr);
this.transportAddres = Utility.append(transportAddres, new TransportAddress(addr));
}
return true;
}
@@ -111,9 +118,9 @@ public final class Transport {
public final boolean removeRemoteAddresses(InetSocketAddress addr) {
if (addr == null) return false;
if (this.remoteAddres == null) return false;
if (this.transportAddres == null) return false;
synchronized (this) {
this.remoteAddres = Utility.remove(remoteAddres, addr);
this.transportAddres = Utility.remove(transportAddres, new TransportAddress(addr));
}
return true;
}
@@ -134,8 +141,16 @@ public final class Transport {
return clientAddress;
}
public TransportAddress[] getTransportAddresses() {
return transportAddres;
}
public InetSocketAddress[] getRemoteAddresses() {
return remoteAddres;
InetSocketAddress[] rs = new InetSocketAddress[transportAddres.length];
for (int i = 0; i < rs.length; i++) {
rs[i] = transportAddres[i].getAddress();
}
return rs;
}
public ConcurrentHashMap<SocketAddress, BlockingQueue<AsyncConnection>> getAsyncConnectionPool() {
@@ -144,7 +159,7 @@ public final class Transport {
@Override
public String toString() {
return Transport.class.getSimpleName() + "{name = " + name + ", protocol = " + protocol + ", clientAddress = " + clientAddress + ", remoteAddres = " + Arrays.toString(remoteAddres) + "}";
return Transport.class.getSimpleName() + "{name = " + name + ", protocol = " + protocol + ", clientAddress = " + clientAddress + ", remoteAddres = " + Arrays.toString(transportAddres) + "}";
}
public ByteBuffer pollBuffer() {
@@ -169,32 +184,56 @@ public final class Transport {
public AsyncConnection pollConnection(SocketAddress addr) {
if (this.strategy != null) return strategy.pollConnection(addr, this);
if (addr == null && remoteAddres.length == 1) addr = remoteAddres[0];
if (addr == null && this.transportAddres.length == 1) addr = this.transportAddres[0].address;
final boolean rand = addr == null;
if (rand && remoteAddres.length < 1) throw new RuntimeException("Transport (" + this.name + ") have no remoteAddress list");
if (rand && this.transportAddres.length < 1) throw new RuntimeException("Transport (" + this.name + ") have no remoteAddress list");
try {
if (tcp) {
AsynchronousSocketChannel channel = null;
if (rand) { //取地址
for (int i = 0; i < remoteAddres.length; i++) {
addr = remoteAddres[i];
BlockingQueue<AsyncConnection> queue = connPool.get(addr);
if (queue != null && !queue.isEmpty()) {
TransportAddress transportAddr;
boolean tryed = false;
for (int i = 0; i < transportAddres.length; i++) {
transportAddr = transportAddres[i];
addr = transportAddr.address;
if (!transportAddr.enable) continue;
final BlockingQueue<AsyncConnection> queue = transportAddr.conns;
if (!queue.isEmpty()) {
AsyncConnection conn;
while ((conn = queue.poll()) != null) {
if (conn.isOpen()) return conn;
}
}
tryed = true;
if (channel == null) {
channel = AsynchronousSocketChannel.open(group);
if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
}
try {
channel.connect(addr).get(2, TimeUnit.SECONDS);
transportAddr.enable = true;
break;
} catch (Exception iex) {
iex.printStackTrace();
if (i == remoteAddres.length - 1) channel = null;
transportAddr.enable = false;
channel = null;
}
}
if (channel == null && !tryed) {
for (int i = 0; i < transportAddres.length; i++) {
transportAddr = transportAddres[i];
addr = transportAddr.address;
if (channel == null) {
channel = AsynchronousSocketChannel.open(group);
if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
}
try {
channel.connect(addr).get(2, TimeUnit.SECONDS);
transportAddr.enable = true;
break;
} catch (Exception iex) {
transportAddr.enable = false;
channel = null;
}
}
}
} else {
@@ -205,7 +244,7 @@ public final class Transport {
if (channel == null) return null;
return AsyncConnection.create(channel, addr, 3000, 3000);
} else { // UDP
if (rand) addr = remoteAddres[0];
if (rand) addr = this.transportAddres[0].address;
DatagramChannel channel = DatagramChannel.open();
channel.configureBlocking(true);
channel.connect(addr);
@@ -267,4 +306,54 @@ public final class Transport {
});
}
public static class TransportAddress {
protected InetSocketAddress address;
protected volatile boolean enable;
protected final BlockingQueue<AsyncConnection> conns = new ArrayBlockingQueue<>(MAX_POOL_LIMIT);
public TransportAddress(InetSocketAddress address) {
this.address = address;
this.enable = true;
}
@java.beans.ConstructorProperties({"address", "enable"})
public TransportAddress(InetSocketAddress address, boolean enable) {
this.address = address;
this.enable = enable;
}
public InetSocketAddress getAddress() {
return address;
}
public boolean isEnable() {
return enable;
}
@ConvertColumn(ignore = true)
public BlockingQueue<AsyncConnection> getConns() {
return conns;
}
@Override
public int hashCode() {
return this.address.hashCode();
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
final TransportAddress other = (TransportAddress) obj;
return this.address.equals(other.address);
}
public String toString() {
return JsonConvert.root().convertTo(this);
}
}
}