This commit is contained in:
kamhung
2015-11-23 16:58:00 +08:00
parent 0cbb4ed0e2
commit 2c0ad21ef3
12 changed files with 116 additions and 300 deletions

View File

@@ -62,7 +62,7 @@
</properties> </properties>
</resources> </resources>
<!-- <!--
protocol: required server所启动的协议有HTTP、SNCP 目前只支持HTTP、SNCP。SNCP分TCP、UDP实现默认使用TCP实现UDP实现则使用SNCP.UDP值; protocol: required server所启动的协议有HTTP、SNCP 目前只支持HTTP、SNCP。SNCP使用TCP实现;
host: 服务所占address 默认: 0.0.0.0 host: 服务所占address 默认: 0.0.0.0
port: required 服务所占端口 port: required 服务所占端口
root: 如果是web类型服务则包含页面 默认:{APP_HOME}/root root: 如果是web类型服务则包含页面 默认:{APP_HOME}/root

View File

@@ -269,7 +269,7 @@ public final class Application {
for (AnyValue conf : resources.getAnyValues("group")) { for (AnyValue conf : resources.getAnyValues("group")) {
final String group = conf.getValue("name", ""); final String group = conf.getValue("name", "");
String protocol = conf.getValue("protocol", Sncp.DEFAULT_PROTOCOL).toUpperCase(); String protocol = conf.getValue("protocol", Transport.DEFAULT_PROTOCOL).toUpperCase();
if (!"TCP".equalsIgnoreCase(protocol) && !"UDP".equalsIgnoreCase(protocol)) { if (!"TCP".equalsIgnoreCase(protocol) && !"UDP".equalsIgnoreCase(protocol)) {
throw new RuntimeException("Not supported Transport Protocol " + conf.getValue("protocol")); throw new RuntimeException("Not supported Transport Protocol " + conf.getValue("protocol"));
} }

View File

@@ -47,7 +47,7 @@ public abstract class NodeServer {
private String sncpGroup = null; //当前Server的SNCP协议的组 private String sncpGroup = null; //当前Server的SNCP协议的组
private String nodeProtocol = Sncp.DEFAULT_PROTOCOL; private String nodeProtocol = Transport.DEFAULT_PROTOCOL;
private InetSocketAddress sncpAddress; //HttpServer中的sncpAddress 为所属group对应的SncpServer, 为null表示只是单节点没有分布式结构 private InetSocketAddress sncpAddress; //HttpServer中的sncpAddress 为所属group对应的SncpServer, 为null表示只是单节点没有分布式结构

View File

@@ -27,13 +27,7 @@ public final class NodeSncpServer extends NodeServer {
} }
private static Server createServer(Application application, AnyValue serconf) { private static Server createServer(Application application, AnyValue serconf) {
String proto = serconf.getValue("protocol", ""); return new SncpServer(application.getStartTime(), application.getWatchFactory());
String subprotocol = Sncp.DEFAULT_PROTOCOL;
int pos = proto.indexOf('.');
if (pos > 0) {
subprotocol = proto.substring(pos + 1);
}
return new SncpServer(application.getStartTime(), subprotocol, application.getWatchFactory());
} }
@Override @Override

View File

@@ -22,6 +22,8 @@ import java.util.function.*;
*/ */
public final class Transport { public final class Transport {
public static final String DEFAULT_PROTOCOL = "TCP";
protected static final int MAX_POOL_LIMIT = Runtime.getRuntime().availableProcessors() * 16; protected static final int MAX_POOL_LIMIT = Runtime.getRuntime().availableProcessors() * 16;
protected final String name; protected final String name;
@@ -46,6 +48,10 @@ public final class Transport {
this(transport.name, transport.protocol, null, transport.bufferPoolSize, parse(localAddress, transports)); this(transport.name, transport.protocol, null, transport.bufferPoolSize, parse(localAddress, transports));
} }
public Transport(String name, WatchFactory watch, int bufferPoolSize, Collection<InetSocketAddress> addresses) {
this(name, DEFAULT_PROTOCOL, watch, bufferPoolSize, addresses);
}
public Transport(String name, String protocol, WatchFactory watch, int bufferPoolSize, Collection<InetSocketAddress> addresses) { public Transport(String name, String protocol, WatchFactory watch, int bufferPoolSize, Collection<InetSocketAddress> addresses) {
this.name = name; this.name = name;
this.protocol = protocol; this.protocol = protocol;

View File

@@ -38,8 +38,6 @@ public abstract class Sncp {
private static final java.lang.reflect.Type GROUPS_TYPE2 = new TypeToken<String[]>() { private static final java.lang.reflect.Type GROUPS_TYPE2 = new TypeToken<String[]>() {
}.getType(); }.getType();
public static final String DEFAULT_PROTOCOL = "TCP";
static final String LOCALPREFIX = "_DynLocal"; static final String LOCALPREFIX = "_DynLocal";
static final String REMOTEPREFIX = "_DynRemote"; static final String REMOTEPREFIX = "_DynRemote";
@@ -611,6 +609,7 @@ public abstract class Sncp {
/** /**
* *
* 创建本地模式Service实例 * 创建本地模式Service实例
*
* @param <T> * @param <T>
* @param name * @param name
* @param executor * @param executor

View File

@@ -181,7 +181,7 @@ public final class SncpClient {
} }
public <T> T remote(final BsonConvert convert, Transport transport, final int index, final Object... params) { public <T> T remote(final BsonConvert convert, Transport transport, final int index, final Object... params) {
Future<byte[]> future = transport.isTCP() ? remoteTCP(convert, transport, actions[index], params) : remoteUDP(convert, transport, actions[index], params); Future<byte[]> future = remote(convert, transport, actions[index], params);
try { try {
return convert.convertFrom(actions[index].resultTypes, future.get(5, TimeUnit.SECONDS)); return convert.convertFrom(actions[index].resultTypes, future.get(5, TimeUnit.SECONDS));
} catch (InterruptedException | ExecutionException | TimeoutException e) { } catch (InterruptedException | ExecutionException | TimeoutException e) {
@@ -193,11 +193,7 @@ public final class SncpClient {
public <T> void remote(final BsonConvert convert, Transport[] transports, boolean run, final int index, final Object... params) { public <T> void remote(final BsonConvert convert, Transport[] transports, boolean run, final int index, final Object... params) {
if (!run) return; if (!run) return;
for (Transport transport : transports) { for (Transport transport : transports) {
if (transport.isTCP()) { remote(convert, transport, actions[index], params);
remoteTCP(convert, transport, actions[index], params);
} else {
remoteUDP(convert, transport, actions[index], params);
}
} }
} }
@@ -206,94 +202,17 @@ public final class SncpClient {
if (executor != null) { if (executor != null) {
executor.accept(() -> { executor.accept(() -> {
for (Transport transport : transports) { for (Transport transport : transports) {
if (transport.isTCP()) { remote(convert, transport, actions[index], params);
remoteTCP(convert, transport, actions[index], params);
} else {
remoteUDP(convert, transport, actions[index], params);
}
} }
}); });
} else { } else {
for (Transport transport : transports) { for (Transport transport : transports) {
if (transport.isTCP()) { remote(convert, transport, actions[index], params);
remoteTCP(convert, transport, actions[index], params);
} else {
remoteUDP(convert, transport, actions[index], params);
}
} }
} }
} }
private Future<byte[]> remoteUDP(final BsonConvert convert, final Transport transport, final SncpAction action, final Object... params) { private Future<byte[]> remote(final BsonConvert convert, final Transport transport, final SncpAction action, final Object... params) {
Type[] myparamtypes = action.paramTypes;
final Supplier<ByteBuffer> supplier = transport.getBufferSupplier();
final BsonWriter bw = convert.pollBsonWriter(() -> supplier.get().put(DEFAULT_HEADER)); // 将head写入
for (int i = 0; i < params.length; i++) {
convert.convertTo(bw, myparamtypes[i], params[i]);
}
final SocketAddress addr = action.addressParamIndex >= 0 ? (SocketAddress) params[action.addressParamIndex] : null;
final AsyncConnection conn = transport.pollConnection(addr);
if (conn == null || !conn.isOpen()) {
logger.log(Level.SEVERE, action.method + " sncp (params: " + jsonConvert.convertTo(params) + ") cannot connect " + (conn == null ? addr : conn.getRemoteAddress()));
throw new RuntimeException("sncp " + (conn == null ? addr : conn.getRemoteAddress()) + " cannot connect");
}
final int reqBodyLength = bw.count(); //body总长度
final long seqid = System.nanoTime();
final DLong actionid = action.actionid;
final int readto = conn.getReadTimeoutSecond();
final int writeto = conn.getWriteTimeoutSecond();
final ByteBuffer buffer = transport.pollBuffer();
try {
//------------------------------ 发送请求 ---------------------------------------------------
int pos = 0;
for (ByteBuffer buf : bw.toBuffers()) {
int len = buf.remaining() - HEADER_SIZE;
fillHeader(buf, seqid, actionid, reqBodyLength, pos, len);
pos += len;
Thread.sleep(20);
conn.write(buf).get(writeto > 0 ? writeto : 3, TimeUnit.SECONDS);
transport.offerBuffer(buf);
}
//------------------------------ 接收响应 ---------------------------------------------------
int received = 0;
int respBodyLength = 1;
byte[] respBody = null;
while (received < respBodyLength) {
buffer.clear();
conn.read(buffer).get(readto > 0 ? readto : 3, TimeUnit.SECONDS);
buffer.flip();
checkResult(seqid, action, buffer);
int respbodylen = buffer.getInt();
if (respBody == null) {
respBodyLength = respbodylen;
respBody = new byte[respBodyLength];
}
int bodyOffset = buffer.getInt(); //
int frameLength = buffer.getInt(); //
final int retcode = buffer.getInt();
if (retcode != 0) {
logger.log(Level.SEVERE, action.method + " sncp (params: " + jsonConvert.convertTo(params) + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")");
throw new RuntimeException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")");
}
int len = Math.min(buffer.remaining(), frameLength);
buffer.get(respBody, bodyOffset, len);
received += len;
}
return new SncpFuture<>(respBody);
} catch (RuntimeException e) {
throw e;
} catch (Exception ex) {
logger.log(Level.SEVERE, action.method + " sncp (params: " + jsonConvert.convertTo(params) + ") udp remote error", ex);
throw new RuntimeException(ex);
} finally {
transport.offerBuffer(buffer);
transport.offerConnection(true, conn);
}
}
private Future<byte[]> remoteTCP(final BsonConvert convert, final Transport transport, final SncpAction action, final Object... params) {
Type[] myparamtypes = action.paramTypes; Type[] myparamtypes = action.paramTypes;
final BsonWriter writer = convert.pollBsonWriter(transport.getBufferSupplier()); // 将head写入 final BsonWriter writer = convert.pollBsonWriter(transport.getBufferSupplier()); // 将head写入
writer.writeTo(DEFAULT_HEADER); writer.writeTo(DEFAULT_HEADER);

View File

@@ -12,7 +12,6 @@ import com.wentch.redkale.watch.*;
import java.net.*; import java.net.*;
import java.nio.*; import java.nio.*;
import java.nio.charset.*; import java.nio.charset.*;
import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.logging.*; import java.util.logging.*;
@@ -22,35 +21,6 @@ import java.util.logging.*;
*/ */
public final class SncpContext extends Context { public final class SncpContext extends Context {
protected static class RequestEntry {
protected final long seqid;
protected final byte[] body;
protected final long time = System.currentTimeMillis();
private int received;
public RequestEntry(long seqid, byte[] body) {
this.seqid = seqid;
this.body = body;
}
public void add(ByteBuffer buffer, int pos) {
int len = Math.min(buffer.remaining(), this.body.length - this.received);
this.received += len;
buffer.get(body, pos, len);
}
public boolean isCompleted() {
return this.body.length <= this.received;
}
}
private final ConcurrentHashMap<Long, RequestEntry> requests = new ConcurrentHashMap<>();
protected final BsonFactory bsonFactory; protected final BsonFactory bsonFactory;
public SncpContext(long serverStartTime, Logger logger, ExecutorService executor, int bufferCapacity, ObjectPool<ByteBuffer> bufferPool, public SncpContext(long serverStartTime, Logger logger, ExecutorService executor, int bufferCapacity, ObjectPool<ByteBuffer> bufferPool,
@@ -61,32 +31,6 @@ public final class SncpContext extends Context {
this.bsonFactory = BsonFactory.root(); this.bsonFactory = BsonFactory.root();
} }
protected RequestEntry addRequestEntity(long seqid, byte[] bodys) {
RequestEntry entry = new RequestEntry(seqid, bodys);
requests.put(seqid, entry);
return entry;
}
protected void expireRequestEntry(long milliSecond) {
if (requests.size() < 32) return;
List<Long> seqids = new ArrayList<>();
long t = System.currentTimeMillis() - milliSecond;
requests.forEach((x, y) -> {
if (y.time < t) seqids.add(x);
});
for (long seqid : seqids) {
requests.remove(seqid);
}
}
protected void removeRequestEntity(long seqid) {
requests.remove(seqid);
}
protected RequestEntry getRequestEntity(long seqid) {
return requests.get(seqid);
}
protected WatchFactory getWatchFactory() { protected WatchFactory getWatchFactory() {
return watch; return watch;
} }

View File

@@ -85,14 +85,8 @@ public final class SncpDynServlet extends SncpServlet {
@Override @Override
public void execute(SncpRequest request, SncpResponse response) throws IOException { public void execute(SncpRequest request, SncpResponse response) throws IOException {
final boolean tcp = request.isTCP();
if (bufferSupplier == null) { if (bufferSupplier == null) {
if (tcp) {
bufferSupplier = request.getContext().getBufferSupplier(); bufferSupplier = request.getContext().getBufferSupplier();
} else { //UDP 需要分包
final Supplier<ByteBuffer> supplier = request.getContext().getBufferSupplier();
bufferSupplier = () -> supplier.get().put(DEFAULT_HEADER);
}
} }
SncpServletAction action = actions.get(request.getActionid()); SncpServletAction action = actions.get(request.getActionid());
//if (finest) logger.log(Level.FINEST, "sncpdyn.execute: " + request + ", " + (action == null ? "null" : action.method)); //if (finest) logger.log(Level.FINEST, "sncpdyn.execute: " + request + ", " + (action == null ? "null" : action.method));
@@ -100,7 +94,7 @@ public final class SncpDynServlet extends SncpServlet {
response.finish(SncpResponse.RETCODE_ILLACTIONID, null); //无效actionid response.finish(SncpResponse.RETCODE_ILLACTIONID, null); //无效actionid
} else { } else {
BsonWriter out = action.convert.pollBsonWriter(bufferSupplier); BsonWriter out = action.convert.pollBsonWriter(bufferSupplier);
if (tcp) out.writeTo(DEFAULT_HEADER); out.writeTo(DEFAULT_HEADER);
BsonReader in = action.convert.pollBsonReader(); BsonReader in = action.convert.pollBsonReader();
try { try {
in.setBytes(request.getBody()); in.setBytes(request.getBody());

View File

@@ -7,7 +7,6 @@ package com.wentch.redkale.net.sncp;
import com.wentch.redkale.convert.bson.*; import com.wentch.redkale.convert.bson.*;
import com.wentch.redkale.net.*; import com.wentch.redkale.net.*;
import com.wentch.redkale.net.sncp.SncpContext.RequestEntry;
import com.wentch.redkale.util.*; import com.wentch.redkale.util.*;
import java.net.*; import java.net.*;
import java.nio.*; import java.nio.*;
@@ -74,41 +73,15 @@ public final class SncpRequest extends Request {
return -1; return -1;
} }
//---------------------body---------------------------------- //---------------------body----------------------------------
if (this.channel.isTCP()) { // TCP模式 不管数据包大小 只传一帧数据
this.body = new byte[this.bodylength]; this.body = new byte[this.bodylength];
int len = Math.min(this.bodylength, buffer.remaining()); int len = Math.min(this.bodylength, buffer.remaining());
buffer.get(body, 0, len); buffer.get(body, 0, len);
this.bodyoffset = len; this.bodyoffset = len;
return bodylength - len; return bodylength - len;
} }
//--------------------- UDP 模式 ----------------------------
if (this.bodylength == this.framelength) { //只有一帧的数据
if (this.framelength > buffer.remaining()) { //缺失一部分数据
throw new RuntimeException(SncpRequest.class.getSimpleName() + " data need " + this.framelength + " bytes, but only " + buffer.remaining() + " bytes");
}
this.body = new byte[this.framelength];
buffer.get(body);
return 0;
}
//多帧数据
final SncpContext scontext = (SncpContext) this.context;
RequestEntry entry = scontext.getRequestEntity(this.seqid);
if (entry == null) entry = scontext.addRequestEntity(this.seqid, new byte[this.bodylength]);
entry.add(buffer, this.bodyoffset);
if (entry.isCompleted()) { //数据读取完毕
this.body = entry.body;
scontext.removeRequestEntity(this.seqid);
return 0;
} else {
scontext.expireRequestEntry(10 * 1000); //10秒过期
}
if (this.channel.isTCP()) return this.bodylength - this.framelength;
return Integer.MIN_VALUE; //多帧数据返回 Integer.MIN_VALUE
}
@Override @Override
protected int readBody(ByteBuffer buffer) { // 只有 TCP 模式会调用此方法 protected int readBody(ByteBuffer buffer) {
final int framelen = buffer.remaining(); final int framelen = buffer.remaining();
buffer.get(this.body, this.bodyoffset, framelen); buffer.get(this.body, this.bodyoffset, framelen);
this.bodyoffset += framelen; this.bodyoffset += framelen;
@@ -117,11 +90,7 @@ public final class SncpRequest extends Request {
@Override @Override
protected void prepare() { protected void prepare() {
this.keepAlive = this.channel.isTCP(); this.keepAlive = true;
}
protected boolean isTCP() {
return this.channel.isTCP();
} }
@Override @Override

View File

@@ -57,16 +57,7 @@ public final class SncpResponse extends Response<SncpRequest> {
} }
final int respBodyLength = out.count(); //body总长度 final int respBodyLength = out.count(); //body总长度
final ByteBuffer[] buffers = out.toBuffers(); final ByteBuffer[] buffers = out.toBuffers();
if (this.channel.isTCP()) { //TCP模式 TCP的总长度需要减去第一个buffer的header长度
fillHeader(buffers[0], respBodyLength - HEADER_SIZE, 0, respBodyLength - HEADER_SIZE, retcode); fillHeader(buffers[0], respBodyLength - HEADER_SIZE, 0, respBodyLength - HEADER_SIZE, retcode);
} else {
int pos = 0;
for (ByteBuffer buffer : buffers) {
int len = buffer.remaining() - HEADER_SIZE;
fillHeader(buffer, respBodyLength, pos, len, retcode);
pos += len;
}
}
finish(buffers); finish(buffers);
} }

View File

@@ -20,12 +20,12 @@ import java.util.concurrent.atomic.*;
*/ */
public final class SncpServer extends Server { public final class SncpServer extends Server {
public SncpServer(String protocol) { public SncpServer() {
this(System.currentTimeMillis(), protocol, null); this(System.currentTimeMillis(), null);
} }
public SncpServer(long serverStartTime, String protocol, final WatchFactory watch) { public SncpServer(long serverStartTime, final WatchFactory watch) {
super(serverStartTime, protocol, new SncpPrepareServlet(), watch); super(serverStartTime, "TCP", new SncpPrepareServlet(), watch);
} }
@Override @Override