Transport的pollConnection方法改成异步模式

This commit is contained in:
Redkale
2017-11-13 21:27:28 +08:00
parent fc54fc3f24
commit f2963e01e0
5 changed files with 184 additions and 154 deletions

View File

@@ -135,11 +135,34 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
*/ */
public static CompletableFuture<AsyncConnection> createTCP(final AsynchronousChannelGroup group, final SocketAddress address, public static CompletableFuture<AsyncConnection> createTCP(final AsynchronousChannelGroup group, final SocketAddress address,
final int readTimeoutSecond0, final int writeTimeoutSecond0) throws IOException { final int readTimeoutSecond0, final int writeTimeoutSecond0) throws IOException {
return createTCP(group, address, false, readTimeoutSecond0, writeTimeoutSecond0);
}
/**
* 创建TCP协议客户端连接
*
* @param address 连接点子
* @param group 连接AsynchronousChannelGroup
* @param noDelay TcpNoDelay
* @param readTimeoutSecond0 读取超时秒数
* @param writeTimeoutSecond0 写入超时秒数
*
* @return 连接CompletableFuture
* @throws java.io.IOException 异常
*/
public static CompletableFuture<AsyncConnection> createTCP(final AsynchronousChannelGroup group, final SocketAddress address,
final boolean noDelay, final int readTimeoutSecond0, final int writeTimeoutSecond0) throws IOException {
final CompletableFuture future = new CompletableFuture(); final CompletableFuture future = new CompletableFuture();
final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group); final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group);
channel.connect(address, null, new CompletionHandler<Void, Void>() { channel.connect(address, null, new CompletionHandler<Void, Void>() {
@Override @Override
public void completed(Void result, Void attachment) { public void completed(Void result, Void attachment) {
if (noDelay) {
try {
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
} catch (IOException e) {
}
}
future.complete(create(channel, address, readTimeoutSecond0, writeTimeoutSecond0)); future.complete(create(channel, address, readTimeoutSecond0, writeTimeoutSecond0));
} }

View File

@@ -12,6 +12,7 @@ import java.nio.channels.*;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.logging.Level;
import org.redkale.convert.*; import org.redkale.convert.*;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
import org.redkale.util.*; import org.redkale.util.*;
@@ -188,7 +189,7 @@ public final class Transport {
return tcp; return tcp;
} }
public AsyncConnection pollConnection(SocketAddress addr) { public CompletableFuture<AsyncConnection> pollConnection(SocketAddress addr) {
if (this.strategy != null) return strategy.pollConnection(addr, this); if (this.strategy != null) return strategy.pollConnection(addr, this);
if (addr == null && this.transportAddres.length == 1) addr = this.transportAddres[0].address; if (addr == null && this.transportAddres.length == 1) addr = this.transportAddres[0].address;
final boolean rand = addr == null; final boolean rand = addr == null;
@@ -207,7 +208,7 @@ public final class Transport {
if (!queue.isEmpty()) { if (!queue.isEmpty()) {
AsyncConnection conn; AsyncConnection conn;
while ((conn = queue.poll()) != null) { while ((conn = queue.poll()) != null) {
if (conn.isOpen()) return conn; if (conn.isOpen()) return CompletableFuture.completedFuture(conn);
} }
} }
tryed = true; tryed = true;
@@ -247,14 +248,14 @@ public final class Transport {
if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true); if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.connect(addr).get(2, TimeUnit.SECONDS); channel.connect(addr).get(2, TimeUnit.SECONDS);
} }
if (channel == null) return null; if (channel == null) return CompletableFuture.completedFuture(null);
return AsyncConnection.create(channel, addr, 3000, 3000); return CompletableFuture.completedFuture(AsyncConnection.create(channel, addr, 3000, 3000));
} else { // UDP } else { // UDP
if (rand) addr = this.transportAddres[0].address; if (rand) addr = this.transportAddres[0].address;
DatagramChannel channel = DatagramChannel.open(); DatagramChannel channel = DatagramChannel.open();
channel.configureBlocking(true); channel.configureBlocking(true);
channel.connect(addr); channel.connect(addr);
return AsyncConnection.create(channel, addr, true, 3000, 3000); return CompletableFuture.completedFuture(AsyncConnection.create(channel, addr, true, 3000, 3000));
// AsyncDatagramChannel channel = AsyncDatagramChannel.open(group); // AsyncDatagramChannel channel = AsyncDatagramChannel.open(group);
// channel.connect(addr); // channel.connect(addr);
// return AsyncConnection.create(channel, addr, true, 3000, 3000); // return AsyncConnection.create(channel, addr, true, 3000, 3000);
@@ -280,35 +281,40 @@ public final class Transport {
} }
public <A> void async(SocketAddress addr, final ByteBuffer buffer, A att, final CompletionHandler<Integer, A> handler) { public <A> void async(SocketAddress addr, final ByteBuffer buffer, A att, final CompletionHandler<Integer, A> handler) {
final AsyncConnection conn = pollConnection(addr); pollConnection(addr).whenComplete((conn, ex) -> {
conn.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() { if (ex != null) {
factory.getLogger().log(Level.WARNING, Transport.class.getSimpleName() + " async error", ex);
@Override return;
public void completed(Integer result, ByteBuffer attachment) {
buffer.clear();
conn.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
if (handler != null) handler.completed(result, att);
offerBuffer(buffer);
offerConnection(false, conn);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
offerBuffer(buffer);
offerConnection(true, conn);
}
});
} }
conn.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override @Override
public void failed(Throwable exc, ByteBuffer attachment) { public void completed(Integer result, ByteBuffer attachment) {
offerBuffer(buffer); buffer.clear();
offerConnection(true, conn); conn.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
if (handler != null) handler.completed(result, att);
offerBuffer(buffer);
offerConnection(false, conn);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
offerBuffer(buffer);
offerConnection(true, conn);
}
});
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
offerBuffer(buffer);
offerConnection(true, conn);
}
});
}); });
} }

View File

@@ -248,6 +248,10 @@ public class TransportFactory {
return new ArrayList<>(this.groupInfos.values()); return new ArrayList<>(this.groupInfos.values());
} }
public Logger getLogger() {
return logger;
}
public void addSncpService(Service service) { public void addSncpService(Service service) {
if (service == null) return; if (service == null) return;
services.add(new WeakReference<>(service)); services.add(new WeakReference<>(service));

View File

@@ -6,6 +6,7 @@
package org.redkale.net; package org.redkale.net;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
/** /**
* 远程请求的负载均衡策略 * 远程请求的负载均衡策略
@@ -17,5 +18,5 @@ import java.net.SocketAddress;
*/ */
public interface TransportStrategy { public interface TransportStrategy {
public AsyncConnection pollConnection(SocketAddress addr, Transport transport); public CompletableFuture<AsyncConnection> pollConnection(SocketAddress addr, Transport transport);
} }

View File

@@ -342,7 +342,7 @@ public final class SncpClient {
final Type[] myparamtypes = action.paramTypes; final Type[] myparamtypes = action.paramTypes;
final Class[] myparamclass = action.paramClass; final Class[] myparamclass = action.paramClass;
if (action.addressSourceParamIndex >= 0) params[action.addressSourceParamIndex] = this.clientAddress; if (action.addressSourceParamIndex >= 0) params[action.addressSourceParamIndex] = this.clientAddress;
final BsonWriter writer = bsonConvert.pollBsonWriter(transport == null ? bufferSupplier : transport.getBufferSupplier()); // 将head写入 final BsonWriter writer = bsonConvert.pollBsonWriter(transport.getBufferSupplier()); // 将head写入
writer.writeTo(DEFAULT_HEADER); writer.writeTo(DEFAULT_HEADER);
for (int i = 0; i < params.length; i++) { //params 可能包含: 3 个 boolean for (int i = 0; i < params.length; i++) { //params 可能包含: 3 个 boolean
bsonConvert.convertTo(writer, AsyncHandler.class.isAssignableFrom(myparamclass[i]) ? AsyncHandler.class : myparamtypes[i], params[i]); bsonConvert.convertTo(writer, AsyncHandler.class.isAssignableFrom(myparamclass[i]) ? AsyncHandler.class : myparamtypes[i], params[i]);
@@ -351,146 +351,142 @@ public final class SncpClient {
final long seqid = System.nanoTime(); final long seqid = System.nanoTime();
final DLong actionid = action.actionid; final DLong actionid = action.actionid;
final SocketAddress addr = addr0 == null ? (action.addressTargetParamIndex >= 0 ? (SocketAddress) params[action.addressTargetParamIndex] : null) : addr0; final SocketAddress addr = addr0 == null ? (action.addressTargetParamIndex >= 0 ? (SocketAddress) params[action.addressTargetParamIndex] : null) : addr0;
final CompletableFuture<byte[]> future = new CompletableFuture(); CompletableFuture<AsyncConnection> connFuture = transport.pollConnection(addr);
AsyncConnection conn0; return connFuture.thenCompose(conn0 -> {
try { final CompletableFuture<byte[]> future = new CompletableFuture();
conn0 = transport.pollConnection(addr); if (conn0 == null || !conn0.isOpen()) {
} catch (Exception e) { future.completeExceptionally(new RuntimeException("sncp " + (conn0 == null ? addr : conn0.getRemoteAddress()) + " cannot connect"));
future.completeExceptionally(e); return future;
return future; }
} final AsyncConnection conn = conn0;
if (conn0 == null || !conn0.isOpen()) { final ByteBuffer[] sendBuffers = writer.toBuffers();
future.completeExceptionally(new RuntimeException("sncp " + (conn0 == null ? addr : conn0.getRemoteAddress()) + " cannot connect")); fillHeader(sendBuffers[0], seqid, actionid, reqBodyLength);
return future;
}
final AsyncConnection conn = conn0;
final ByteBuffer[] sendBuffers = writer.toBuffers();
fillHeader(sendBuffers[0], seqid, actionid, reqBodyLength);
final ByteBuffer buffer = transport.pollBuffer(); final ByteBuffer buffer = transport.pollBuffer();
conn.write(sendBuffers, sendBuffers, new CompletionHandler<Integer, ByteBuffer[]>() { conn.write(sendBuffers, sendBuffers, new CompletionHandler<Integer, ByteBuffer[]>() {
@Override @Override
public void completed(Integer result, ByteBuffer[] attachments) { public void completed(Integer result, ByteBuffer[] attachments) {
int index = -1; int index = -1;
for (int i = 0; i < attachments.length; i++) { for (int i = 0; i < attachments.length; i++) {
if (attachments[i].hasRemaining()) { if (attachments[i].hasRemaining()) {
index = i; index = i;
break; break;
} else { } else {
transport.offerBuffer(attachments[i]); transport.offerBuffer(attachments[i]);
}
} }
} if (index == 0) {
if (index == 0) { conn.write(attachments, attachments, this);
conn.write(attachments, attachments, this); return;
return; } else if (index > 0) {
} else if (index > 0) { ByteBuffer[] newattachs = new ByteBuffer[attachments.length - index];
ByteBuffer[] newattachs = new ByteBuffer[attachments.length - index]; System.arraycopy(attachments, index, newattachs, 0, newattachs.length);
System.arraycopy(attachments, index, newattachs, 0, newattachs.length); conn.write(newattachs, newattachs, this);
conn.write(newattachs, newattachs, this); return;
return; }
} //----------------------- 读取返回结果 -------------------------------------
//----------------------- 读取返回结果 ------------------------------------- buffer.clear();
buffer.clear(); conn.read(buffer, null, new CompletionHandler<Integer, Void>() {
conn.read(buffer, null, new CompletionHandler<Integer, Void>() {
private byte[] body; private byte[] body;
private int received; private int received;
@Override @Override
public void completed(Integer count, Void attachment2) { public void completed(Integer count, Void attachment2) {
if (count < 1 && buffer.remaining() == buffer.limit()) { //没有数据可读 if (count < 1 && buffer.remaining() == buffer.limit()) { //没有数据可读
future.completeExceptionally(new RuntimeException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote no response data")); future.completeExceptionally(new RuntimeException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote no response data"));
transport.offerBuffer(buffer); transport.offerBuffer(buffer);
transport.offerConnection(true, conn); transport.offerConnection(true, conn);
return; return;
} }
if (received < 1 && buffer.limit() < buffer.remaining() + HEADER_SIZE) { //header都没读全 if (received < 1 && buffer.limit() < buffer.remaining() + HEADER_SIZE) { //header都没读全
conn.read(buffer, attachment2, this); conn.read(buffer, attachment2, this);
return; return;
} }
buffer.flip(); buffer.flip();
if (received > 0) { if (received > 0) {
int offset = this.received; int offset = this.received;
this.received += buffer.remaining(); this.received += buffer.remaining();
buffer.get(body, offset, Math.min(buffer.remaining(), this.body.length - offset)); buffer.get(body, offset, Math.min(buffer.remaining(), this.body.length - offset));
if (this.received < this.body.length) {// 数据仍然不全,需要继续读取 if (this.received < this.body.length) {// 数据仍然不全,需要继续读取
buffer.clear();
conn.read(buffer, attachment2, this);
} else {
success();
}
return;
}
checkResult(seqid, action, buffer);
final int respBodyLength = buffer.getInt();
final int retcode = buffer.getInt();
if (retcode != 0) {
logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")");
throw new RuntimeException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")");
}
if (respBodyLength > buffer.remaining()) { // 数据不全,需要继续读取
this.body = new byte[respBodyLength];
this.received = buffer.remaining();
buffer.get(body, 0, this.received);
buffer.clear(); buffer.clear();
conn.read(buffer, attachment2, this); conn.read(buffer, attachment2, this);
} else { } else {
this.body = new byte[respBodyLength];
buffer.get(body, 0, respBodyLength);
success(); success();
} }
return;
}
checkResult(seqid, action, buffer);
final int respBodyLength = buffer.getInt();
final int retcode = buffer.getInt();
if (retcode != 0) {
logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")");
throw new RuntimeException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")");
} }
if (respBodyLength > buffer.remaining()) { // 数据不全,需要继续读取 public void success() {
this.body = new byte[respBodyLength]; future.complete(this.body);
this.received = buffer.remaining(); transport.offerBuffer(buffer);
buffer.get(body, 0, this.received); transport.offerConnection(false, conn);
buffer.clear(); if (handler != null) {
conn.read(buffer, attachment2, this); final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null;
} else { final BsonReader reader = bsonConvert.pollBsonReader();
this.body = new byte[respBodyLength]; try {
buffer.get(body, 0, respBodyLength); reader.setBytes(this.body);
success(); int i;
} while ((i = (reader.readByte() & 0xff)) != 0) {
} final Attribute attr = action.paramAttrs[i];
attr.set(params[i - 1], bsonConvert.convertFrom(attr.type(), reader));
public void success() { }
future.complete(this.body); Object rs = bsonConvert.convertFrom(action.handlerFuncParamIndex >= 0 ? Object.class : action.resultTypes, reader);
transport.offerBuffer(buffer); handler.completed(rs, handlerAttach);
transport.offerConnection(false, conn); } catch (Exception e) {
if (handler != null) { handler.failed(e, handlerAttach);
final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null; } finally {
final BsonReader reader = bsonConvert.pollBsonReader(); bsonConvert.offerBsonReader(reader);
try {
reader.setBytes(this.body);
int i;
while ((i = (reader.readByte() & 0xff)) != 0) {
final Attribute attr = action.paramAttrs[i];
attr.set(params[i - 1], bsonConvert.convertFrom(attr.type(), reader));
} }
Object rs = bsonConvert.convertFrom(action.handlerFuncParamIndex >= 0 ? Object.class : action.resultTypes, reader);
handler.completed(rs, handlerAttach);
} catch (Exception e) {
handler.failed(e, handlerAttach);
} finally {
bsonConvert.offerBsonReader(reader);
} }
} }
}
@Override @Override
public void failed(Throwable exc, Void attachment2) { public void failed(Throwable exc, Void attachment2) {
logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") remote read exec failed", exc); logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") remote read exec failed", exc);
future.completeExceptionally(new RuntimeException(action.method + " sncp remote exec failed")); future.completeExceptionally(new RuntimeException(action.method + " sncp remote exec failed"));
transport.offerBuffer(buffer); transport.offerBuffer(buffer);
transport.offerConnection(true, conn); transport.offerConnection(true, conn);
if (handler != null) { if (handler != null) {
final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null; final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null;
handler.failed(exc, handlerAttach); handler.failed(exc, handlerAttach);
}
} }
} });
}); }
}
@Override @Override
public void failed(Throwable exc, ByteBuffer[] attachment) { public void failed(Throwable exc, ByteBuffer[] attachment) {
logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") remote write exec failed", exc); logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") remote write exec failed", exc);
transport.offerBuffer(buffer); transport.offerBuffer(buffer);
transport.offerConnection(true, conn); transport.offerConnection(true, conn);
} }
});
return future;
}); });
return future;
} }
private void checkResult(long seqid, final SncpAction action, ByteBuffer buffer) { private void checkResult(long seqid, final SncpAction action, ByteBuffer buffer) {