SncpClientRequest.seqno生成规则优化
This commit is contained in:
@@ -5,6 +5,7 @@ package org.redkale.net.sncp;
|
|||||||
|
|
||||||
import java.net.*;
|
import java.net.*;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import org.redkale.net.*;
|
import org.redkale.net.*;
|
||||||
import org.redkale.net.client.*;
|
import org.redkale.net.client.*;
|
||||||
|
|
||||||
@@ -20,11 +21,15 @@ import org.redkale.net.client.*;
|
|||||||
*/
|
*/
|
||||||
public class SncpClient extends Client<SncpClientConnection, SncpClientRequest, SncpClientResult> {
|
public class SncpClient extends Client<SncpClientConnection, SncpClientRequest, SncpClientResult> {
|
||||||
|
|
||||||
|
private final AtomicLong seqno = new AtomicLong();
|
||||||
|
|
||||||
final InetSocketAddress clientSncpAddress;
|
final InetSocketAddress clientSncpAddress;
|
||||||
|
|
||||||
public SncpClient(String name, AsyncGroup group, InetSocketAddress clientSncpAddress, ClientAddress address, String netprotocol, int maxConns, int maxPipelines) {
|
public SncpClient(String name, AsyncGroup group, InetSocketAddress clientSncpAddress, ClientAddress address, String netprotocol, int maxConns, int maxPipelines) {
|
||||||
super(name, group, "TCP".equalsIgnoreCase(netprotocol), address, maxConns, maxPipelines, null, null, null); //maxConns
|
super(name, group, "TCP".equalsIgnoreCase(netprotocol), address, maxConns, maxPipelines, null, null, null); //maxConns
|
||||||
this.clientSncpAddress = clientSncpAddress;
|
this.clientSncpAddress = clientSncpAddress;
|
||||||
|
this.readTimeoutSeconds = 12;
|
||||||
|
this.writeTimeoutSeconds = 12;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -36,6 +41,11 @@ public class SncpClient extends Client<SncpClientConnection, SncpClientRequest,
|
|||||||
return clientSncpAddress;
|
return clientSncpAddress;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected long nextSeqno() {
|
||||||
|
//System.nanoTime()值并发下会出现重复,windows11 jdk17出现过
|
||||||
|
return seqno.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected CompletableFuture<SncpClientConnection> connect(SocketAddress addr) {
|
protected CompletableFuture<SncpClientConnection> connect(SocketAddress addr) {
|
||||||
return super.connect(addr);
|
return super.connect(addr);
|
||||||
|
|||||||
@@ -200,9 +200,25 @@ public class SncpRemoteInfo<T extends Service> {
|
|||||||
//Client模式RPC
|
//Client模式RPC
|
||||||
protected CompletableFuture<byte[]> remoteClient(final SncpRemoteAction action, final String traceid, final Object[] params) {
|
protected CompletableFuture<byte[]> remoteClient(final SncpRemoteAction action, final String traceid, final Object[] params) {
|
||||||
final SncpClient client = this.sncpClient;
|
final SncpClient client = this.sncpClient;
|
||||||
final SncpClientRequest request = createSncpClientRequest(action, client.clientSncpAddress, traceid, params);
|
final Type[] myParamTypes = action.paramTypes;
|
||||||
|
final Class[] myParamClass = action.paramClasses;
|
||||||
|
if (action.paramAddressSourceIndex >= 0) {
|
||||||
|
params[action.paramAddressSourceIndex] = client.clientSncpAddress;
|
||||||
|
}
|
||||||
|
byte[] body = null;
|
||||||
|
if (myParamTypes.length > 0) {
|
||||||
|
Writer writer = convert.pollWriter();
|
||||||
|
for (int i = 0; i < params.length; i++) { //service方法的参数
|
||||||
|
convert.convertTo(writer, CompletionHandler.class.isAssignableFrom(myParamClass[i]) ? CompletionHandler.class : myParamTypes[i], params[i]);
|
||||||
|
}
|
||||||
|
body = ((ByteTuple) writer).toArray();
|
||||||
|
convert.offerWriter(writer);
|
||||||
|
}
|
||||||
|
final SncpClientRequest request = new SncpClientRequest();
|
||||||
|
request.prepare(action.header, client.nextSeqno(), traceid, body);
|
||||||
|
|
||||||
final SocketAddress addr = action.paramAddressTargetIndex >= 0 ? (SocketAddress) params[action.paramAddressTargetIndex] : nextRemoteAddress();
|
final SocketAddress addr = action.paramAddressTargetIndex >= 0 ? (SocketAddress) params[action.paramAddressTargetIndex] : nextRemoteAddress();
|
||||||
return client.connect(addr).thenCompose(conn -> client.writeChannel(conn, request).thenApply(rs -> rs.getBodyContent()));
|
return client.connect(addr).thenCompose(conn -> client.writeChannel(conn, request).thenApply(rs -> rs.getBodyContent()));
|
||||||
}
|
}
|
||||||
|
|
||||||
protected SncpClientRequest createSncpClientRequest(final SncpRemoteAction action, final InetSocketAddress clientSncpAddress, final String traceid, final Object[] params) {
|
protected SncpClientRequest createSncpClientRequest(final SncpRemoteAction action, final InetSocketAddress clientSncpAddress, final String traceid, final Object[] params) {
|
||||||
|
|||||||
@@ -116,7 +116,6 @@ public class SncpTest {
|
|||||||
//service.updateBean(bean);
|
//service.updateBean(bean);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
System.exit(1);
|
|
||||||
} finally {
|
} finally {
|
||||||
long a = ai.incrementAndGet();
|
long a = ai.incrementAndGet();
|
||||||
System.out.println("运行了 " + (a == 100 ? "--------------------------------------------------" : "") + a);
|
System.out.println("运行了 " + (a == 100 ? "--------------------------------------------------" : "") + a);
|
||||||
|
|||||||
Reference in New Issue
Block a user