优化sncp
This commit is contained in:
@@ -387,7 +387,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
|
||||
boolean allCompleted = this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, array);
|
||||
if (allCompleted) {
|
||||
request.pipelineCompleted = true;
|
||||
this.channel.writePipeline(buffers, this.finishBuffersIOThreadHandler);
|
||||
this.channel.writeInIOThread(buffers, buffers, this.finishBuffersIOThreadHandler);
|
||||
} else {
|
||||
AsyncConnection conn = removeChannel();
|
||||
if (conn != null) {
|
||||
@@ -397,7 +397,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
|
||||
}
|
||||
} else if (this.channel.hasPipelineData()) {
|
||||
//先将pipeline数据写入完再写入buffers
|
||||
this.channel.writePipeline(null, new CompletionHandler<Integer, Void>() {
|
||||
this.channel.writePipelineInIOThread(new CompletionHandler<Integer, Void>() {
|
||||
|
||||
@Override
|
||||
public void completed(Integer result, Void attachment) {
|
||||
@@ -410,7 +410,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
|
||||
}
|
||||
});
|
||||
} else {
|
||||
this.channel.write(buffers, buffers, finishBuffersIOThreadHandler);
|
||||
this.channel.writeInIOThread(buffers, buffers, finishBuffersIOThreadHandler);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -200,10 +200,16 @@ public class SncpRemoteInfo<T extends Service> {
|
||||
//Client模式RPC
|
||||
protected CompletableFuture<byte[]> remoteClient(final SncpRemoteAction action, final String traceid, final Object[] params) {
|
||||
final SncpClient client = this.sncpClient;
|
||||
final SncpClientRequest request = createSncpClientRequest(action, client.clientSncpAddress, traceid, params);
|
||||
final SocketAddress addr = action.paramAddressTargetIndex >= 0 ? (SocketAddress) params[action.paramAddressTargetIndex] : nextRemoteAddress();
|
||||
return client.connect(addr).thenCompose(conn -> client.writeChannel(conn, request).thenApply(rs -> rs.getBodyContent()));
|
||||
}
|
||||
|
||||
protected SncpClientRequest createSncpClientRequest(SncpRemoteAction action, InetSocketAddress clientSncpAddress, String traceid, Object[] params) {
|
||||
final Type[] myParamTypes = action.paramTypes;
|
||||
final Class[] myParamClass = action.paramClasses;
|
||||
if (action.paramAddressSourceIndex >= 0) {
|
||||
params[action.paramAddressSourceIndex] = client.clientSncpAddress;
|
||||
params[action.paramAddressSourceIndex] = clientSncpAddress;
|
||||
}
|
||||
byte[] body = null;
|
||||
if (myParamTypes.length > 0) {
|
||||
@@ -215,31 +221,8 @@ public class SncpRemoteInfo<T extends Service> {
|
||||
convert.offerWriter(writer);
|
||||
}
|
||||
final SncpClientRequest request = new SncpClientRequest();
|
||||
request.prepare(action.header, client.nextSeqno(), traceid, body);
|
||||
|
||||
final SocketAddress addr = action.paramAddressTargetIndex >= 0 ? (SocketAddress) params[action.paramAddressTargetIndex] : nextRemoteAddress();
|
||||
return client.connect(addr).thenCompose(conn -> client.writeChannel(conn, request).thenApply(rs -> rs.getBodyContent()));
|
||||
}
|
||||
|
||||
protected SncpClientRequest createSncpClientRequest(final SncpRemoteAction action, final InetSocketAddress clientSncpAddress, final String traceid, final Object[] params) {
|
||||
final Type[] myParamTypes = action.paramTypes;
|
||||
final Class[] myParamClass = action.paramClasses;
|
||||
if (action.paramAddressSourceIndex >= 0) {
|
||||
params[action.paramAddressSourceIndex] = clientSncpAddress;
|
||||
}
|
||||
final long seqid = System.nanoTime();
|
||||
byte[] body = null;
|
||||
if (myParamTypes.length > 0) {
|
||||
Writer writer = convert.pollWriter();
|
||||
for (int i = 0; i < params.length; i++) { //service方法的参数
|
||||
convert.convertTo(writer, CompletionHandler.class.isAssignableFrom(myParamClass[i]) ? CompletionHandler.class : myParamTypes[i], params[i]);
|
||||
}
|
||||
body = ((ByteTuple) writer).toArray();
|
||||
convert.offerWriter(writer);
|
||||
}
|
||||
final SncpClientRequest requet = new SncpClientRequest();
|
||||
requet.prepare(action.header, seqid, traceid, body);
|
||||
return requet;
|
||||
request.prepare(action.header, this.sncpClient.nextSeqno(), traceid, body);
|
||||
return request;
|
||||
}
|
||||
|
||||
protected InetSocketAddress nextRemoteAddress() {
|
||||
@@ -259,7 +242,8 @@ public class SncpRemoteInfo<T extends Service> {
|
||||
@Override
|
||||
public String toString() {
|
||||
InetSocketAddress clientSncpAddress = sncpClient == null ? null : sncpClient.getClientSncpAddress();
|
||||
return this.getClass().getSimpleName() + "(service = " + serviceType.getSimpleName() + ", serviceid = " + serviceid + ", serviceVersion = " + serviceVersion + ", name = '" + name
|
||||
return this.getClass().getSimpleName() + "(service = " + serviceType.getSimpleName() + ", serviceid = " + serviceid
|
||||
+ ", serviceVersion = " + serviceVersion + ", name = '" + name
|
||||
+ "', address = " + (clientSncpAddress == null ? "" : (clientSncpAddress.getHostString() + ":" + clientSncpAddress.getPort()))
|
||||
+ ", actions.size = " + actions.length + ")";
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user