This commit is contained in:
地平线
2015-11-05 10:06:35 +08:00
parent 15273faa48
commit fe5d934dd7
14 changed files with 502 additions and 310 deletions

View File

@@ -89,26 +89,38 @@ public final class BsonWriter implements Writer {
*
* @param position
* @param chs
* @return
*/
public void rewriteTo(int position, byte... chs) {
public int rewriteTo(int position, byte... chs) {
System.arraycopy(chs, 0, content, position, chs.length);
return position + chs.length;
}
public void rewriteTo(int position, short value) {
public int rewriteTo(int position, short value) {
rewriteTo(position, (byte) (value >> 8), (byte) value);
return position + 2;
}
public void rewriteTo(int position, char value) {
public int rewriteTo(int position, char value) {
rewriteTo(position, (byte) ((value & 0xFF00) >> 8), (byte) (value & 0xFF));
return position + 2;
}
public void rewriteTo(int position, int value) {
public int rewriteTo(int position, int value) {
rewriteTo(position, (byte) (value >> 24), (byte) (value >> 16), (byte) (value >> 8), (byte) value);
return position + 4;
}
public void rewriteTo(int position, long value) {
public int rewriteTo(int position, long value) {
rewriteTo(position, (byte) (value >> 56), (byte) (value >> 48), (byte) (value >> 40), (byte) (value >> 32),
(byte) (value >> 24), (byte) (value >> 16), (byte) (value >> 8), (byte) value);
return position + 8;
}
public BsonWriter fillRange(final int len) {
expand(len);
count += len;
return this;
}
public void writeTo(final byte ch) {

View File

@@ -24,6 +24,8 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
public abstract SocketAddress getRemoteAddress();
public abstract SocketAddress getLocalAddress();
public abstract int getReadTimeoutSecond();
public abstract int getWriteTimeoutSecond();
@@ -104,7 +106,8 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
}
return create(channel, address, readTimeoutSecond0, writeTimeoutSecond0);
} else if ("UDP".equalsIgnoreCase(protocol)) {
AsyncDatagramChannel channel = AsyncDatagramChannel.open(null);
DatagramChannel channel = DatagramChannel.open();
channel.configureBlocking(true);
channel.connect(address);
return create(channel, address, true, readTimeoutSecond0, writeTimeoutSecond0);
} else {
@@ -112,100 +115,6 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
}
}
private static class AIOUDPAsyncConnection extends AsyncConnection {
private int readTimeoutSecond;
private int writeTimeoutSecond;
private final AsyncDatagramChannel channel;
private final SocketAddress remoteAddress;
private final boolean client;
public AIOUDPAsyncConnection(final AsyncDatagramChannel ch, SocketAddress addr,
final boolean client0, final int readTimeoutSecond0, final int writeTimeoutSecond0) {
this.channel = ch;
this.client = client0;
this.readTimeoutSecond = readTimeoutSecond0;
this.writeTimeoutSecond = writeTimeoutSecond0;
this.remoteAddress = addr;
}
@Override
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (readTimeoutSecond > 0) {
channel.read(dst, readTimeoutSecond, TimeUnit.SECONDS, attachment, handler);
} else {
channel.read(dst, attachment, handler);
}
}
@Override
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
channel.send(src, remoteAddress, attachment, handler);
}
@Override
public <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
channel.send(srcs, offset, length, remoteAddress, attachment, handler);
}
@Override
public void setReadTimeoutSecond(int readTimeoutSecond) {
this.readTimeoutSecond = readTimeoutSecond;
}
@Override
public void setWriteTimeoutSecond(int writeTimeoutSecond) {
this.writeTimeoutSecond = writeTimeoutSecond;
}
@Override
public int getReadTimeoutSecond() {
return this.readTimeoutSecond;
}
@Override
public int getWriteTimeoutSecond() {
return this.writeTimeoutSecond;
}
@Override
public final SocketAddress getRemoteAddress() {
return remoteAddress;
}
@Override
public final Future<Integer> read(ByteBuffer dst) {
return channel.read(dst);
}
@Override
public final Future<Integer> write(ByteBuffer src) {
return channel.write(src);
}
@Override
public final void close() throws IOException {
super.close();
if (client) {
channel.close();
}
}
@Override
public final boolean isOpen() {
return channel.isOpen();
}
@Override
public final boolean isTCP() {
return false;
}
}
private static class BIOUDPAsyncConnection extends AsyncConnection {
private int readTimeoutSecond;
@@ -252,14 +161,22 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
return remoteAddress;
}
@Override
public SocketAddress getLocalAddress() {
try {
return channel.getLocalAddress();
} catch (IOException e) {
return null;
}
}
@Override
protected <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
try {
int rs = 0;
int end = offset + length - 1;
for (int i = offset; i < offset + length; i++) {
rs += channel.send(srcs[i], remoteAddress);
if (i != end) Thread.sleep(1);
if(i != offset) Thread.sleep(10);
}
if (handler != null) handler.completed(rs, attachment);
} catch (Exception e) {
@@ -326,15 +243,6 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
}
}
public static AsyncConnection create(final AsyncDatagramChannel ch, SocketAddress addr, final boolean client0) {
return create(ch, addr, client0, 0, 0);
}
public static AsyncConnection create(final AsyncDatagramChannel ch, SocketAddress addr,
final boolean client0, final int readTimeoutSecond0, final int writeTimeoutSecond0) {
return new AIOUDPAsyncConnection(ch, addr, client0, readTimeoutSecond0, writeTimeoutSecond0);
}
public static AsyncConnection create(final DatagramChannel ch, SocketAddress addr,
final boolean client0, final int readTimeoutSecond0, final int writeTimeoutSecond0) {
return new BIOUDPAsyncConnection(ch, addr, client0, readTimeoutSecond0, writeTimeoutSecond0);
@@ -425,6 +333,11 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
return remoteAddress;
}
@Override
public SocketAddress getLocalAddress() {
return socket.getLocalSocketAddress();
}
@Override
public int getReadTimeoutSecond() {
return readTimeoutSecond;
@@ -510,6 +423,11 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
}
}
/**
* 通常用于 ssl socket
* @param socket
* @return
*/
public static AsyncConnection create(final Socket socket) {
return create(socket, null, 0, 0);
}
@@ -604,6 +522,15 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
return remoteAddress;
}
@Override
public SocketAddress getLocalAddress() {
try {
return channel.getLocalAddress();
} catch (IOException e) {
return null;
}
}
@Override
public final Future<Integer> read(ByteBuffer dst) {
return channel.read(dst);

View File

@@ -25,6 +25,8 @@ public class Context {
protected final ExecutorService executor;
protected final int bufferCapacity;
protected final ObjectPool<ByteBuffer> bufferPool;
protected final ObjectPool<Response> responsePool;
@@ -45,12 +47,13 @@ public class Context {
protected final WatchFactory watch;
public Context(long serverStartTime, Logger logger, ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool,
public Context(long serverStartTime, Logger logger, ExecutorService executor, int bufferCapacity, ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool,
final int maxbody, Charset charset, InetSocketAddress address, final PrepareServlet prepare, final WatchFactory watch,
final int readTimeoutSecond, final int writeTimeoutSecond) {
this.serverStartTime = serverStartTime;
this.logger = logger;
this.executor = executor;
this.bufferCapacity = bufferCapacity;
this.bufferPool = bufferPool;
this.responsePool = responsePool;
this.maxbody = maxbody;
@@ -82,6 +85,10 @@ public class Context {
executor.submit(r);
}
public int getBufferCapacity() {
return bufferCapacity;
}
public ByteBuffer pollBuffer() {
return bufferPool.poll();
}

View File

@@ -40,76 +40,13 @@ public abstract class ProtocolServer {
private static final class ProtocolUDPServer extends ProtocolServer {
private final Context context;
private AsynchronousChannelGroup group;
private AsyncDatagramChannel serverChannel;
public ProtocolUDPServer(Context context) {
this.context = context;
}
@Override
public void open() throws IOException {
this.group = AsynchronousChannelGroup.withCachedThreadPool(context.executor, 1);
this.serverChannel = AsyncDatagramChannel.open(group);
}
@Override
public void bind(SocketAddress local, int backlog) throws IOException {
this.serverChannel.bind(local);
}
@Override
public <T> void setOption(SocketOption<T> name, T value) throws IOException {
this.serverChannel.setOption(name, value);
}
@Override
public void accept() {
final AsyncDatagramChannel serchannel = this.serverChannel;
final ByteBuffer buffer = this.context.pollBuffer();
serchannel.receive(buffer, buffer, new CompletionHandler<SocketAddress, ByteBuffer>() {
@Override
public void completed(final SocketAddress address, ByteBuffer attachment) {
final ByteBuffer buffer2 = context.pollBuffer();
serchannel.receive(buffer2, buffer2, this);
attachment.flip();
AsyncConnection conn = AsyncConnection.create(serchannel, address, false, context.readTimeoutSecond, context.writeTimeoutSecond);
context.submit(new PrepareRunner(context, conn, attachment));
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
context.offerBuffer(attachment);
//if (exc != null) context.logger.log(Level.FINEST, AsyncDatagramChannel.class.getSimpleName() + " accept erroneous", exc);
}
});
}
@Override
public void close() throws IOException {
this.serverChannel.close();
}
@Override
public AsynchronousChannelGroup getChannelGroup() {
return this.group;
}
}
private static final class ProtocolUDPWinServer extends ProtocolServer {
private boolean running;
private final Context context;
private DatagramChannel serverChannel;
public ProtocolUDPWinServer(Context context) {
public ProtocolUDPServer(Context context) {
this.context = context;
}

View File

@@ -23,8 +23,6 @@ public final class Transport {
protected static final int MAX_POOL_LIMIT = 16;
protected final boolean aio;
protected final String name;
protected final int bufferPoolSize;
@@ -44,17 +42,12 @@ public final class Transport {
protected final ConcurrentHashMap<SocketAddress, BlockingQueue<AsyncConnection>> connPool = new ConcurrentHashMap<>();
public Transport(Transport transport, InetSocketAddress localAddress, Collection<Transport> transports) {
this(transport.name, transport.protocol, transport.aio, null, transport.bufferPoolSize, parse(localAddress, transports));
this(transport.name, transport.protocol, null, transport.bufferPoolSize, parse(localAddress, transports));
}
public Transport(String name, String protocol, WatchFactory watch, int bufferPoolSize, Collection<InetSocketAddress> addresses) {
this(name, protocol, false, watch, bufferPoolSize, addresses);
}
public Transport(String name, String protocol, boolean aio, WatchFactory watch, int bufferPoolSize, Collection<InetSocketAddress> addresses) {
this.name = name;
this.protocol = protocol;
this.aio = aio;
this.bufferPoolSize = bufferPoolSize;
this.bufferCapacity = 8192;
AsynchronousChannelGroup g = null;
@@ -114,6 +107,10 @@ public final class Transport {
return Transport.class.getSimpleName() + "{name=" + name + ",protocol=" + protocol + ",remoteAddres=" + Arrays.toString(remoteAddres) + "}";
}
public int getBufferCapacity() {
return bufferCapacity;
}
public ByteBuffer pollBuffer() {
return bufferPool.poll();
}
@@ -126,6 +123,10 @@ public final class Transport {
for (ByteBuffer buffer : buffers) offerBuffer(buffer);
}
public boolean isTCP() {
return "TCP".equalsIgnoreCase(protocol);
}
public AsyncConnection pollConnection(SocketAddress addr) {
final boolean rand = addr == null;
try {
@@ -144,19 +145,13 @@ public final class Transport {
if (conn.isOpen()) return conn;
}
}
if (aio) {
if (channel == null) channel = AsynchronousSocketChannel.open(group);
} else {
if (socket == null) socket = new Socket();
}
if (channel == null) channel = AsynchronousSocketChannel.open(group);
try {
if (aio) {
channel.connect(addr).get(1, TimeUnit.SECONDS);
} else {
socket.connect(addr, 1000);
}
channel.connect(addr).get(1, TimeUnit.SECONDS);
break;
} catch (Exception iex) {
iex.printStackTrace();
if (i == remoteAddres.length - 1) {
p = 0;
socket = null;
@@ -166,29 +161,20 @@ public final class Transport {
}
index.set(p);
} else {
if (aio) {
channel = AsynchronousSocketChannel.open(group);
channel.connect(addr).get(1, TimeUnit.SECONDS);
} else {
socket = new Socket();
socket.connect(addr, 1000);
}
channel = AsynchronousSocketChannel.open(group);
channel.connect(addr).get(1, TimeUnit.SECONDS);
}
if (aio && channel == null) return null;
if (!aio && socket == null) return null;
return aio ? AsyncConnection.create(channel, addr, 3000, 3000) : AsyncConnection.create(socket, addr, 3000, 3000);
if (channel == null) return null;
return AsyncConnection.create(channel, addr, 3000, 3000);
} else { // UDP
if (rand) addr = remoteAddres[0];
if (aio) {
AsyncDatagramChannel channel = AsyncDatagramChannel.open(group);
channel.connect(addr);
return AsyncConnection.create(channel, addr, true, 3000, 3000);
} else {
DatagramChannel socket = DatagramChannel.open();
socket.configureBlocking(true);
socket.connect(addr);
return AsyncConnection.create(socket, addr, true, 3000, 3000);
}
DatagramChannel channel = DatagramChannel.open();
channel.configureBlocking(true);
channel.connect(addr);
return AsyncConnection.create(channel, addr, true, 3000, 3000);
// AsyncDatagramChannel channel = AsyncDatagramChannel.open(group);
// channel.connect(addr);
// return AsyncConnection.create(channel, addr, true, 3000, 3000);
}
} catch (Exception ex) {
throw new RuntimeException("transport address = " + addr, ex);
@@ -196,7 +182,7 @@ public final class Transport {
}
public void offerConnection(AsyncConnection conn) {
if (conn.isTCP() && false) { //暂时每次都关闭
if (false && conn.isTCP()) { //暂时每次都关闭
if (conn.isOpen()) {
BlockingQueue<AsyncConnection> queue = connPool.get(conn.getRemoteAddress());
if (queue == null) {

View File

@@ -28,10 +28,10 @@ public final class HttpContext extends Context {
protected final SecureRandom random = new SecureRandom();
public HttpContext(long serverStartTime, Logger logger, ExecutorService executor, ObjectPool<ByteBuffer> bufferPool,
public HttpContext(long serverStartTime, Logger logger, ExecutorService executor, int bufferCapacity, ObjectPool<ByteBuffer> bufferPool,
ObjectPool<Response> responsePool, int maxbody, Charset charset, InetSocketAddress address, PrepareServlet prepare,
WatchFactory watch, int readTimeoutSecond, int writeTimeoutSecond, String contextPath) {
super(serverStartTime, logger, executor, bufferPool, responsePool, maxbody, charset,
super(serverStartTime, logger, executor, bufferCapacity, bufferPool, responsePool, maxbody, charset,
address, prepare, watch, readTimeoutSecond, writeTimeoutSecond);
this.contextPath = contextPath;
this.jsonFactory = JsonFactory.root();

View File

@@ -118,7 +118,7 @@ public final class HttpServer extends Server {
AtomicLong createResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Response.creatCounter");
AtomicLong cycleResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Response.cycleCounter");
ObjectPool<Response> responsePool = HttpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null);
HttpContext httpcontext = new HttpContext(this.serverStartTime, this.logger, executor, bufferPool, responsePool,
HttpContext httpcontext = new HttpContext(this.serverStartTime, this.logger, executor, rcapacity, bufferPool, responsePool,
this.maxbody, this.charset, this.address, this.prepare, this.watch, this.readTimeoutSecond, this.writeTimeoutSecond, contextPath);
responsePool.setCreator((Object... params) -> new HttpResponse(httpcontext, new HttpRequest(httpcontext, httpcontext.jsonFactory, addrHeader), addHeaders, setHeaders, defCookie));
return httpcontext;

View File

@@ -7,12 +7,13 @@ package com.wentch.redkale.net.sncp;
import com.wentch.redkale.convert.bson.*;
import com.wentch.redkale.net.*;
import static com.wentch.redkale.net.sncp.SncpRequest.HEADER_SIZE;
import static com.wentch.redkale.net.sncp.SncpRequest.*;
import com.wentch.redkale.util.*;
import java.lang.annotation.*;
import java.lang.reflect.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.*;
@@ -177,34 +178,237 @@ public final class SncpClient {
}
public <T> T remote(final BsonConvert convert, Transport transport, final int index, final Object... params) {
return convert.convertFrom(actions[index].resultTypes, send(convert, transport, actions[index], params));
Future<byte[]> future = transport.isTCP() ? remoteTCP(convert, transport, actions[index], params) : remoteUDP(convert, transport, actions[index], params);
try {
return convert.convertFrom(actions[index].resultTypes, future.get(5, TimeUnit.SECONDS));
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new RuntimeException(actions[index].method + " sncp remote error", e);
}
}
public <T> void remote(final BsonConvert convert, Transport[] transports, boolean run, final int index, final Object... params) {
if (!run) return;
this.remote(false, convert, transports, run, index, params);
for (Transport transport : transports) {
if (transport.isTCP()) {
remoteTCP(convert, transport, actions[index], params);
} else {
remoteUDP(convert, transport, actions[index], params);
}
}
}
public <T> void asyncRemote(final BsonConvert convert, Transport[] transports, boolean run, final int index, final Object... params) {
if (!run) return;
this.remote(true, convert, transports, run, index, params);
}
private <T> void remote(final boolean async, final BsonConvert convert, final Transport[] transports, final boolean run, final int index, final Object... params) {
if (!run) return;
if (async && executor != null) {
if (executor != null) {
executor.accept(() -> {
for (Transport transport : transports) {
convert.convertFrom(actions[index].resultTypes, send(convert, transport, actions[index], params));
if (transport.isTCP()) {
remoteTCP(convert, transport, actions[index], params);
} else {
remoteUDP(convert, transport, actions[index], params);
}
}
});
} else {
for (Transport transport : transports) {
convert.convertFrom(actions[index].resultTypes, send(convert, transport, actions[index], params));
if (transport.isTCP()) {
remoteTCP(convert, transport, actions[index], params);
} else {
remoteUDP(convert, transport, actions[index], params);
}
}
}
}
private Future<byte[]> remoteUDP(final BsonConvert convert, final Transport transport, final SncpAction action, final Object... params) {
Type[] myparamtypes = action.paramTypes;
final BsonWriter bw = convert.pollBsonWriter().fillRange(HEADER_SIZE); // 将head写入
for (int i = 0; i < params.length; i++) {
convert.convertTo(bw, myparamtypes[i], params[i]);
}
final SocketAddress addr = action.addressParamIndex >= 0 ? (SocketAddress) params[action.addressParamIndex] : null;
final AsyncConnection conn = transport.pollConnection(addr);
if (conn == null || !conn.isOpen()) throw new RuntimeException("sncp " + (conn == null ? addr : conn.getRemoteAddress()) + " cannot connect");
final int reqBodyLength = bw.count() - HEADER_SIZE; //body总长度
final long seqid = System.nanoTime();
final DLong actionid = action.actionid;
final int readto = conn.getReadTimeoutSecond();
final int writeto = conn.getWriteTimeoutSecond();
final ByteBuffer buffer = transport.pollBuffer();
try {
//------------------------------ 发送请求 ---------------------------------------------------
if (transport.getBufferCapacity() >= bw.count()) { //只有一帧数据
fillHeader(bw, seqid, actionid, reqBodyLength, 0, reqBodyLength);
conn.write(bw.toBuffer()).get(writeto > 0 ? writeto : 3, TimeUnit.SECONDS);
} else {
final int bufsize = transport.getBufferCapacity() - HEADER_SIZE;
final int frames = (reqBodyLength / bufsize) + (reqBodyLength % bufsize > 0 ? 1 : 0);
int pos = 0;
for (int i = 0; i < frames; i++) {
int len = Math.min(bufsize, reqBodyLength - pos);
fillHeader(buffer, seqid, actionid, reqBodyLength, pos, len);
bw.toBuffer(pos + HEADER_SIZE, buffer);
pos += len;
buffer.flip();
if (i != 0) Thread.sleep(10);
conn.write(buffer).get(writeto > 0 ? writeto : 3, TimeUnit.SECONDS);
buffer.clear();
}
}
//------------------------------ 接收响应 ---------------------------------------------------
int received = 0;
int respBodyLength = 1;
byte[] respBody = null;
while (received < respBodyLength) {
buffer.clear();
conn.read(buffer).get(readto > 0 ? readto : 3, TimeUnit.SECONDS);
buffer.flip();
checkResult(seqid, action, buffer);
int respbodylen = buffer.getInt();
if (respBody == null) {
respBodyLength = respbodylen;
respBody = new byte[respBodyLength];
}
int bodyOffset = buffer.getInt(); //
int frameLength = buffer.getInt(); //
final int retcode = buffer.getInt();
if (retcode != 0) throw new RuntimeException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")");
int len = Math.min(buffer.remaining(), frameLength);
buffer.get(respBody, bodyOffset, len);
received += len;
}
return new SncpFuture<>(respBody);
} catch (RuntimeException e) {
throw e;
} catch (Exception ex) {
throw new RuntimeException(ex);
} finally {
transport.offerBuffer(buffer);
transport.offerConnection(conn);
}
}
private Future<byte[]> remoteTCP(final BsonConvert convert, final Transport transport, final SncpAction action, final Object... params) {
Type[] myparamtypes = action.paramTypes;
final BsonWriter bw = convert.pollBsonWriter().fillRange(HEADER_SIZE); // 将head写入
for (int i = 0; i < params.length; i++) {
convert.convertTo(bw, myparamtypes[i], params[i]);
}
final int reqBodyLength = bw.count() - HEADER_SIZE; //body总长度
final long seqid = System.nanoTime();
final DLong actionid = action.actionid;
final SocketAddress addr = action.addressParamIndex >= 0 ? (SocketAddress) params[action.addressParamIndex] : null;
final AsyncConnection conn = transport.pollConnection(addr);
if (conn == null || !conn.isOpen()) throw new RuntimeException("sncp " + (conn == null ? addr : conn.getRemoteAddress()) + " cannot connect");
fillHeader(bw, seqid, actionid, reqBodyLength, 0, reqBodyLength);
final ByteBuffer buffer = transport.pollBuffer();
final ByteBuffer sendbuf = bw.toBuffer();
final SncpFuture<byte[]> future = new SncpFuture();
conn.write(sendbuf, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
if (sendbuf.hasRemaining()) { //buffer没有传输完
conn.write(sendbuf, attachment, this);
return;
}
//----------------------- 读取返回结果 -------------------------------------
buffer.clear();
conn.read(buffer, null, new CompletionHandler<Integer, Void>() {
private byte[] body;
private int received;
@Override
public void completed(Integer count, Void attachment) {
if (count < 1 && buffer.remaining() == buffer.limit()) { //没有数据可读
future.set(new RuntimeException(action.method + " sncp remote no response data"));
transport.offerBuffer(buffer);
transport.offerConnection(conn);
return;
}
if (received < 1 && buffer.limit() < buffer.remaining() + HEADER_SIZE) { //header都没读全
conn.read(buffer, attachment, this);
return;
}
buffer.flip();
if (received > 0) {
int offset = this.received;
this.received += buffer.remaining();
buffer.get(body, offset, Math.min(buffer.remaining(), this.body.length - offset));
if (this.received < this.body.length) {// 数据仍然不全,需要继续读取
buffer.clear();
conn.read(buffer, attachment, this);
} else {
success();
}
return;
}
checkResult(seqid, action, buffer);
final int respBodyLength = buffer.getInt();
buffer.getInt(); // bodyOffset
buffer.getInt(); // frameLength
final int retcode = buffer.getInt();
if (retcode != 0) throw new RuntimeException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")");
if (respBodyLength > buffer.remaining()) { // 数据不全,需要继续读取
this.body = new byte[respBodyLength];
this.received = buffer.remaining();
buffer.get(body, 0, this.received);
buffer.clear();
conn.read(buffer, attachment, this);
} else {
this.body = new byte[respBodyLength];
buffer.get(body, 0, respBodyLength);
success();
}
}
public void success() {
future.set(this.body);
transport.offerBuffer(buffer);
transport.offerConnection(conn);
}
@Override
public void failed(Throwable exc, Void attachment) {
future.set(new RuntimeException(action.method + " sncp remote exec failed"));
transport.offerBuffer(buffer);
transport.offerConnection(conn);
}
});
}
@Override
public void failed(Throwable exc, Void attachment) {
exc.printStackTrace();
transport.offerBuffer(buffer);
transport.offerConnection(conn);
}
});
return future;
}
private void checkResult(long seqid, final SncpAction action, ByteBuffer buffer) {
long rseqid = buffer.getLong();
if (rseqid != seqid) throw new RuntimeException("sncp(" + action.method + ") response.seqid = " + seqid + ", but request.seqid =" + rseqid);
if (buffer.getChar() != HEADER_SIZE) throw new RuntimeException("sncp(" + action.method + ") buffer receive header.length not " + HEADER_SIZE);
long rserviceid = buffer.getLong();
if (rserviceid != serviceid) throw new RuntimeException("sncp(" + action.method + ") response.serviceid = " + serviceid + ", but request.serviceid =" + rserviceid);
long rnameid = buffer.getLong();
if (rnameid != nameid) throw new RuntimeException("sncp(" + action.method + ") response.nameid = " + nameid + ", but receive nameid =" + rnameid);
long ractionid1 = buffer.getLong();
long ractionid2 = buffer.getLong();
if (!action.actionid.compare(ractionid1, ractionid2)) throw new RuntimeException("sncp(" + action.method + ") response.actionid = " + action.actionid + ", but request.actionid =(" + ractionid1 + "_" + ractionid2 + ")");
buffer.getInt(); //地址
buffer.getChar(); //端口
}
private byte[] send(final BsonConvert convert, Transport transport, final SncpAction action, Object... params) {
Type[] myparamtypes = action.paramTypes;
final BsonWriter bw = convert.pollBsonWriter();
@@ -222,7 +426,7 @@ public final class SncpClient {
final int writeto = conn.getWriteTimeoutSecond();
try {
if ((HEADER_SIZE + bodyLength) > buffer.limit()) {
//if (debug) logger.finest(this.serviceid + "," + this.nameid + "," + action + " sncp length : " + (HEADER_SIZE + bodyLength));
//if (debug) logger.finest(this.serviceid + "," + this.nameid + "," + action + " sncp length : " + (HEADER_SIZE + reqBodyLength));
final int frames = bodyLength / (buffer.capacity() - HEADER_SIZE) + (bodyLength % (buffer.capacity() - HEADER_SIZE) > 0 ? 1 : 0);
int pos = 0;
for (int i = frames - 1; i >= 0; i--) { //填充每一帧的数据
@@ -230,7 +434,7 @@ public final class SncpClient {
fillHeader(buffer, seqid, actionid, bodyLength, pos, len);
pos += bw.toBuffer(pos, buffer);
buffer.flip();
conn.write(buffer).get(writeto > 0 ? writeto : 5, TimeUnit.SECONDS);
conn.write(buffer).get(writeto > 0 ? writeto : 3, TimeUnit.SECONDS);
buffer.clear();
}
convert.offerBsonWriter(bw);
@@ -241,7 +445,7 @@ public final class SncpClient {
bw.toBuffer(buffer);
convert.offerBsonWriter(bw);
buffer.flip();
conn.write(buffer).get(writeto > 0 ? writeto : 5, TimeUnit.SECONDS);
conn.write(buffer).get(writeto > 0 ? writeto : 3, TimeUnit.SECONDS);
buffer.clear();
}
conn.read(buffer).get(readto > 0 ? readto : 5, TimeUnit.SECONDS); //读取第一帧的结果数据
@@ -328,6 +532,23 @@ public final class SncpClient {
}
}
private void fillHeader(BsonWriter writer, long seqid, DLong actionid, int bodyLength, int bodyOffset, int frameLength) {
//---------------------head----------------------------------
int pos = 0;
pos = writer.rewriteTo(pos, seqid); //序列号
pos = writer.rewriteTo(pos, (char) HEADER_SIZE); //header长度
pos = writer.rewriteTo(pos, this.serviceid);
pos = writer.rewriteTo(pos, this.nameid);
pos = writer.rewriteTo(pos, actionid.getFirst());
pos = writer.rewriteTo(pos, actionid.getSecond());
pos = writer.rewriteTo(pos, addrBytes);
pos = writer.rewriteTo(pos, (char) this.addrPort);
pos = writer.rewriteTo(pos, bodyLength); //body长度
pos = writer.rewriteTo(pos, bodyOffset);
pos = writer.rewriteTo(pos, frameLength); //一帧数据的长度
writer.rewriteTo(pos, 0); //结果码, 请求方固定传0
}
private void fillHeader(ByteBuffer buffer, long seqid, DLong actionid, int bodyLength, int bodyOffset, int frameLength) {
//---------------------head----------------------------------
buffer.putLong(seqid); //序列号

View File

@@ -38,8 +38,9 @@ public final class SncpContext extends Context {
}
public void add(ByteBuffer buffer, int pos) {
this.received += buffer.remaining();
buffer.get(body, pos, buffer.remaining());
int len = Math.min(buffer.remaining(), this.body.length - this.received);
this.received += len;
buffer.get(body, pos, len);
}
public boolean isCompleted() {
@@ -52,10 +53,10 @@ public final class SncpContext extends Context {
protected final BsonFactory bsonFactory;
public SncpContext(long serverStartTime, Logger logger, ExecutorService executor, ObjectPool<ByteBuffer> bufferPool,
public SncpContext(long serverStartTime, Logger logger, ExecutorService executor, int bufferCapacity, ObjectPool<ByteBuffer> bufferPool,
ObjectPool<Response> responsePool, int maxbody, Charset charset, InetSocketAddress address, PrepareServlet prepare,
WatchFactory watch, int readTimeoutSecond, int writeTimeoutSecond) {
super(serverStartTime, logger, executor, bufferPool, responsePool, maxbody, charset,
super(serverStartTime, logger, executor, bufferCapacity, bufferPool, responsePool, maxbody, charset,
address, prepare, watch, readTimeoutSecond, writeTimeoutSecond);
this.bsonFactory = BsonFactory.root();
}

View File

@@ -6,6 +6,7 @@
package com.wentch.redkale.net.sncp;
import com.wentch.redkale.convert.bson.*;
import static com.wentch.redkale.net.sncp.SncpRequest.HEADER_SIZE;
import com.wentch.redkale.service.*;
import com.wentch.redkale.util.*;
import java.io.*;
@@ -85,17 +86,18 @@ public final class SncpDynServlet extends SncpServlet {
if (action == null) {
response.finish(SncpResponse.RETCODE_ILLACTIONID, null); //无效actionid
} else {
BsonWriter out = action.convert.pollBsonWriter().fillRange(HEADER_SIZE);
BsonReader in = action.convert.pollBsonReader();
try {
in.setBytes(request.getBody());
BsonWriter bw = action.action(in);
response.finish(0, bw);
if (bw != null) action.convert.offerBsonWriter(bw);
action.action(in, out);
response.finish(0, out);
} catch (Throwable t) {
response.getContext().getLogger().log(Level.INFO, "sncp execute error(" + request + ")", t);
response.finish(SncpResponse.RETCODE_THROWEXCEPTION, null);
} finally {
action.convert.offerBsonReader(in);
action.convert.offerBsonWriter(out);
}
}
}
@@ -109,7 +111,7 @@ public final class SncpDynServlet extends SncpServlet {
protected java.lang.reflect.Type[] paramTypes; //index=0表示返回参数的type void的返回参数类型为null
public abstract BsonWriter action(final BsonReader in) throws Throwable;
public abstract void action(final BsonReader in, final BsonWriter out) throws Throwable;
/*
*
@@ -124,12 +126,12 @@ public final class SncpDynServlet extends SncpServlet {
* public TestService service;
*
* @Override
* public BsonWriter action(final BsonReader in) throws Throwable {
* public void action(final BsonReader in, final BsonWriter out) throws Throwable {
* TestBean arg1 = convert.convertFrom(in, paramTypes[1]);
* String arg2 = convert.convertFrom(in, paramTypes[2]);
* int arg3 = convert.convertFrom(in, paramTypes[3]);
* Object rs = service.change(arg1, arg2, arg3);
* return convert.convertTo(paramTypes[0], rs);
* convert.convertTo(out, paramTypes[0], rs);
* }
* }
*/
@@ -188,11 +190,11 @@ public final class SncpDynServlet extends SncpServlet {
throw new RuntimeException(ex); //不可能会发生
}
{ // action方法
mv = new DebugMethodVisitor(cw.visitMethod(ACC_PUBLIC, "action", "(" + convertReaderDesc + ")" + convertWriterDesc, null, new String[]{"java/lang/Throwable"}));
mv = new DebugMethodVisitor(cw.visitMethod(ACC_PUBLIC, "action", "(" + convertReaderDesc + convertWriterDesc + ")V", null, new String[]{"java/lang/Throwable"}));
//mv.setDebug(true);
int iconst = ICONST_1;
int intconst = 1;
int store = 2;
int store = 3; //action的参数个数+1
final Class[] paramClasses = method.getParameterTypes();
int[][] codes = new int[paramClasses.length][2];
for (int i = 0; i < paramClasses.length; i++) { //参数
@@ -259,8 +261,7 @@ public final class SncpDynServlet extends SncpServlet {
int maxStack = codes.length > 0 ? codes[codes.length - 1][1] : 1;
Class returnClass = method.getReturnType();
if (method.getReturnType() == void.class) { //返回
mv.visitInsn(ACONST_NULL);
mv.visitInsn(ARETURN);
mv.visitInsn(RETURN);
maxStack = 8;
} else {
if (returnClass.isPrimitive()) {
@@ -275,13 +276,14 @@ public final class SncpDynServlet extends SncpServlet {
mv.visitVarInsn(ASTORE, store); //11
mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynName, "convert", Type.getDescriptor(BsonConvert.class));
mv.visitVarInsn(ALOAD, 2);
mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynName, "paramTypes", "[Ljava/lang/reflect/Type;");
mv.visitInsn(ICONST_0);
mv.visitInsn(AALOAD);
mv.visitVarInsn(ALOAD, store);
mv.visitMethodInsn(INVOKEVIRTUAL, convertName, "convertToWriter", "(Ljava/lang/reflect/Type;Ljava/lang/Object;)" + convertWriterDesc, false);
mv.visitInsn(ARETURN);
mv.visitMethodInsn(INVOKEVIRTUAL, convertName, "convertTo", "(" + convertWriterDesc + "Ljava/lang/reflect/Type;Ljava/lang/Object;)V", false);
mv.visitInsn(RETURN);
store++;
if (maxStack < 10) maxStack = 10;
}

View File

@@ -0,0 +1,94 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package com.wentch.redkale.net.sncp;
import java.util.concurrent.*;
/**
* 简单的Future实现 set、get方法均只能一个线程调用
*
* @author zhangjx
* @param <T>
*/
public class SncpFuture<T> implements Future<T> {
private volatile boolean done;
private T result;
private RuntimeException ex;
public SncpFuture() {
}
public SncpFuture(T result) {
this.result = result;
this.done = true;
}
public void set(T result) {
this.result = result;
this.done = true;
synchronized (this) {
notifyAll();
}
}
public void set(RuntimeException ex) {
this.ex = ex;
this.done = true;
synchronized (this) {
notifyAll();
}
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return done;
}
@Override
public T get() throws InterruptedException, ExecutionException {
if (done) {
if (ex != null) throw ex;
return result;
}
synchronized (this) {
if (!done) wait(10_000);
}
if (done) {
if (ex != null) throw ex;
return result;
}
throw new InterruptedException();
}
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (done) {
if (ex != null) throw ex;
return result;
}
synchronized (this) {
if (!done) wait(unit.toMillis(timeout));
}
if (done) {
if (ex != null) throw ex;
return result;
}
throw new TimeoutException();
}
}

View File

@@ -36,14 +36,11 @@ public final class SncpRequest extends Request {
private int framelength;
//private byte[][] paramBytes;
private boolean ping;
private byte[] body;
private byte[] bufferbytes = new byte[4];
private InetSocketAddress remoteAddress;
private byte[] bufferbytes = new byte[6];
protected SncpRequest(SncpContext context, BsonFactory factory) {
super(context);
@@ -66,10 +63,6 @@ public final class SncpRequest extends Request {
this.nameid = buffer.getLong();
this.actionid = new DLong(buffer.getLong(), buffer.getLong());
buffer.get(bufferbytes);
int port = buffer.getChar();
if (bufferbytes[0] > 0 && port > 0) {
this.remoteAddress = new InetSocketAddress((0xff & bufferbytes[0]) + "." + (0xff & bufferbytes[1]) + "." + (0xff & bufferbytes[2]) + "." + (0xff & bufferbytes[3]), port);
}
this.bodylength = buffer.getInt();
this.bodyoffset = buffer.getInt();
this.framelength = buffer.getInt();
@@ -79,7 +72,18 @@ public final class SncpRequest extends Request {
return -1;
}
//---------------------body----------------------------------
if (this.channel.isTCP()) { // TCP模式 不管数据包大小 只传一帧数据
this.body = new byte[this.bodylength];
int len = Math.min(this.bodylength, buffer.remaining());
buffer.get(body, 0, len);
this.bodyoffset = len;
return bodylength - len;
}
//--------------------- UDP 模式 ----------------------------
if (this.bodylength == this.framelength) { //只有一帧的数据
if (this.framelength > buffer.remaining()) { //缺失一部分数据
throw new RuntimeException(SncpRequest.class.getSimpleName() + " data need " + this.framelength + " bytes, but only " + buffer.remaining() + " bytes");
}
this.body = new byte[this.framelength];
buffer.get(body);
return 0;
@@ -102,33 +106,10 @@ public final class SncpRequest extends Request {
}
@Override
protected int readBody(ByteBuffer buffer) { // TCP 模式会调用此方法
long rseqid = buffer.getLong();
if (rseqid != this.seqid) throw new RuntimeException("sncp frame receive seqid = " + seqid + ", but first receive seqid =" + rseqid);
if (buffer.getChar() != HEADER_SIZE) throw new RuntimeException("sncp buffer receive header.length not " + HEADER_SIZE);
long rserviceid = buffer.getLong();
if (rserviceid != this.serviceid) throw new RuntimeException("sncp frame receive serviceid = " + serviceid + ", but first receive serviceid =" + rserviceid);
long rnameid = buffer.getLong();
if (rnameid != this.nameid) throw new RuntimeException("sncp frame receive nameid = " + nameid + ", but first receive nameid =" + rnameid);
long ractionid1 = buffer.getLong();
long ractionid2 = buffer.getLong();
if (!this.actionid.compare(ractionid1, ractionid2)) throw new RuntimeException("sncp frame receive actionid = " + actionid + ", but first receive actionid =(" + ractionid1 + "_" + ractionid2 + ")");
buffer.getInt(); //地址
buffer.getChar(); //端口
final int bodylen = buffer.getInt();
if (bodylen != this.bodylength) throw new RuntimeException("sncp frame receive bodylength = " + bodylen + ", but first bodylength =" + bodylength);
final int bodyOffset = buffer.getInt();
final int framelen = buffer.getInt();
final int retcode = buffer.getInt();
if (retcode != 0) throw new RuntimeException("sncp frame receive retcode error (retcode=" + retcode + ")");
final SncpContext scontext = (SncpContext) this.context;
RequestEntry entry = scontext.getRequestEntity(this.seqid);
if (entry == null) entry = scontext.addRequestEntity(this.seqid, new byte[this.bodylength]);
entry.add(buffer, bodyOffset);
if (entry.isCompleted()) { //数据读取完毕
this.body = entry.body;
scontext.removeRequestEntity(this.seqid);
}
protected int readBody(ByteBuffer buffer) { // 只有 TCP 模式会调用此方法
final int framelen = buffer.remaining();
buffer.get(this.body, this.bodyoffset, framelen);
this.bodyoffset += framelen;
return framelen;
}
@@ -137,12 +118,16 @@ public final class SncpRequest extends Request {
this.keepAlive = this.channel.isTCP();
}
protected boolean isTCP() {
return this.channel.isTCP();
}
@Override
public String toString() {
return SncpRequest.class.getSimpleName() + "{seqid=" + this.seqid
+ ",serviceid=" + this.serviceid + ",actionid=" + this.actionid
+ ",bodylength=" + this.bodylength + ",bodyoffset=" + this.bodyoffset
+ ",framelength=" + this.framelength + ",remoteAddress=" + remoteAddress + "}";
+ ",framelength=" + this.framelength + ",remoteAddress=" + getRemoteAddress() + "}";
}
@Override
@@ -155,7 +140,6 @@ public final class SncpRequest extends Request {
this.bodyoffset = 0;
this.body = null;
this.ping = false;
this.remoteAddress = null;
this.bufferbytes[0] = 0;
super.recycle();
}
@@ -185,7 +169,9 @@ public final class SncpRequest extends Request {
}
public InetSocketAddress getRemoteAddress() {
return remoteAddress;
if (bufferbytes[0] == 0) return null;
return new InetSocketAddress((0xff & bufferbytes[0]) + "." + (0xff & bufferbytes[1]) + "." + (0xff & bufferbytes[2]) + "." + (0xff & bufferbytes[3]),
((0xff00 & (bufferbytes[4] << 8)) | (0xff & bufferbytes[5])));
}
}

View File

@@ -49,31 +49,50 @@ public final class SncpResponse extends Response<SncpRequest> {
}
public void finish(final int retcode, final BsonWriter out) {
ByteBuffer buffer = context.pollBuffer();
final int bodyLength = (out == null ? 0 : out.count());
final int bufsize = buffer.capacity() - HEADER_SIZE;
if (bufsize > bodyLength) { //只需一帧
//---------------------head----------------------------------
fillHeader(buffer, bodyLength, 0, bodyLength, retcode);
//---------------------body----------------------------------
out.toBuffer(buffer);
buffer.flip();
if (out == null) {
final ByteBuffer buffer = context.pollBuffer();
fillHeader(buffer, 0, 0, 0, retcode);
finish(buffer);
} else {
final int frames = (bodyLength / bufsize) + (bodyLength % bufsize > 0 ? 1 : 0);
final ByteBuffer[] buffers = new ByteBuffer[frames];
int pos = 0;
for (int i = 0; i < frames; i++) {
if (i != 0) buffer = context.pollBuffer();
int len = Math.min(bufsize, bodyLength - pos);
fillHeader(buffer, bodyLength, pos, len, retcode);
buffers[i] = buffer;
out.toBuffer(pos, buffer);
pos += len;
buffer.flip();
}
finish(buffers);
return;
}
final int respBodyLength = out.count() - HEADER_SIZE; //body总长度
if (this.channel.isTCP() || out.count() <= context.getBufferCapacity()) { //TCP模式 或者 一帧数据
fillHeader(out, respBodyLength, 0, respBodyLength, retcode);
finish(out.toBuffer());
return;
}
final int bufsize = context.getBufferCapacity() - HEADER_SIZE;
final int frames = (respBodyLength / bufsize) + (respBodyLength % bufsize > 0 ? 1 : 0);
final ByteBuffer[] buffers = new ByteBuffer[frames];
int pos = 0;
for (int i = 0; i < frames; i++) {
final ByteBuffer buffer = context.pollBuffer();
int len = Math.min(bufsize, respBodyLength - pos);
fillHeader(buffer, respBodyLength, pos, len, retcode);
buffers[i] = buffer;
out.toBuffer(pos + HEADER_SIZE, buffer);
pos += len;
buffer.flip();
}
finish(buffers);
}
private void fillHeader(BsonWriter writer, int bodyLength, int bodyOffset, int framelength, int retcode) {
//---------------------head----------------------------------
int pos = 0;
pos = writer.rewriteTo(pos, request.getSeqid());
pos = writer.rewriteTo(pos, (char) SncpRequest.HEADER_SIZE);
pos = writer.rewriteTo(pos, request.getServiceid());
pos = writer.rewriteTo(pos, request.getNameid());
DLong actionid = request.getActionid();
pos = writer.rewriteTo(pos, actionid.getFirst());
pos = writer.rewriteTo(pos, actionid.getSecond());
pos = writer.rewriteTo(pos, addrBytes);
pos = writer.rewriteTo(pos, (char) this.addrPort);
pos = writer.rewriteTo(pos, bodyLength);
pos = writer.rewriteTo(pos, bodyOffset);
pos = writer.rewriteTo(pos, framelength);
writer.rewriteTo(pos, retcode);
}
private void fillHeader(ByteBuffer buffer, int bodyLength, int bodyOffset, int framelength, int retcode) {

View File

@@ -57,7 +57,7 @@ public final class SncpServer extends Server {
AtomicLong createResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Response.creatCounter");
AtomicLong cycleResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Response.cycleCounter");
ObjectPool<Response> responsePool = SncpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null);
SncpContext sncpcontext = new SncpContext(this.serverStartTime, this.logger, executor, bufferPool, responsePool,
SncpContext sncpcontext = new SncpContext(this.serverStartTime, this.logger, executor, rcapacity, bufferPool, responsePool,
this.maxbody, this.charset, this.address, this.prepare, this.watch, this.readTimeoutSecond, this.writeTimeoutSecond);
responsePool.setCreator((Object... params) -> new SncpResponse(sncpcontext, new SncpRequest(sncpcontext, sncpcontext.bsonFactory)));
return sncpcontext;