This commit is contained in:
Redkale
2016-08-30 02:13:56 +08:00
parent 81f386dcdb
commit a206ecd76b
8 changed files with 30 additions and 20 deletions

View File

@@ -182,8 +182,8 @@ public class NodeHttpServer extends NodeServer {
if (!autoload && !includeValues.contains(stypename)) return;
if (!restFilter.accept(stypename)) return;
RestHttpServlet servlet = httpServer.addRestServlet(stype, wrapper.getName(), wrapper.getService(), baseServletClass, prefix, (AnyValue) null);
if (finest) logger.finest("Create RestServlet = " + servlet);
RestHttpServlet servlet = httpServer.addRestServlet(stype, wrapper.getName(), wrapper.getService(), baseServletClass, prefix, sncp, (AnyValue) null);
if (finest) logger.finest("Create RestServlet[resource=" + wrapper.getName() + "] = " + servlet);
if (ss != null) {
String[] mappings = servlet.getClass().getAnnotation(WebServlet.class).value();
for (int i = 0; i < mappings.length; i++) {

View File

@@ -6,7 +6,6 @@
package org.redkale.boot;
import java.io.*;
import static java.lang.Class.forName;
import java.lang.annotation.Annotation;
import java.lang.reflect.*;
import java.net.InetSocketAddress;
@@ -166,7 +165,7 @@ public abstract class NodeServer {
initResource(); //给 DataSource、CacheSource 注册依赖注入时的监听回调事件。
String interceptorClass = this.serverConf.getValue("interceptor", "");
if (!interceptorClass.isEmpty()) {
Class clazz = forName(interceptorClass);
Class clazz = Class.forName(interceptorClass);
this.interceptor = (NodeInterceptor) clazz.newInstance();
}
@@ -247,7 +246,7 @@ public abstract class NodeServer {
NodeSncpServer sncpServer = application.findNodeSncpServer(sncpAddr);
Set<String> gs = application.findSncpGroups(sameGroupTransport, diffGroupTransports);
ServiceWrapper wrapper = new ServiceWrapper(CacheSourceService.class, (Service) source, resourceName, sncpServer.getSncpGroup(), gs, null);
sncpServer.getSncpServer().addService(wrapper);
sncpServer.getSncpServer().addSncpServlet(wrapper);
logger.info("[" + Thread.currentThread().getName() + "] Load Service " + wrapper.getService());
}
logger.info("[" + Thread.currentThread().getName() + "] Load Source " + source);

View File

@@ -27,7 +27,7 @@ public class NodeSncpServer extends NodeServer {
private NodeSncpServer(Application application, AnyValue serconf) {
super(application, createServer(application, serconf));
this.sncpServer = (SncpServer) this.server;
this.consumer = sncpServer == null ? null : x -> sncpServer.addService(x);
this.consumer = sncpServer == null ? null : x -> sncpServer.addSncpServlet(x);
}
public static NodeServer createNodeServer(Application application, AnyValue serconf) {

View File

@@ -42,7 +42,12 @@ public final class HttpServer extends Server<String, HttpContext, HttpRequest, H
}
public <S extends Service, T extends RestHttpServlet> RestHttpServlet addRestServlet(Class<S> serviceType,
final String name, final S service, final Class<T> baseServletClass, final String prefix, AnyValue conf) {
final String name, final S service, final Class<T> baseServletClass, final String prefix) {
return addRestServlet(serviceType, name, service, baseServletClass, prefix, false, null);
}
public <S extends Service, T extends RestHttpServlet> RestHttpServlet addRestServlet(Class<S> serviceType,
final String name, final S service, final Class<T> baseServletClass, final String prefix, final boolean sncp, AnyValue conf) {
RestHttpServlet servlet = null;
for (final HttpServlet item : ((HttpPrepareServlet) this.prepare).getServlets()) {
if (!(item instanceof RestHttpServlet)) continue;
@@ -53,10 +58,9 @@ public final class HttpServer extends Server<String, HttpContext, HttpRequest, H
break;
}
} catch (NoSuchFieldException | SecurityException e) {
continue;
}
}
if (servlet == null) servlet = Rest.createRestServlet(baseServletClass, serviceType, false);
if (servlet == null) servlet = Rest.createRestServlet(baseServletClass, serviceType, sncp);
try { //若提供动态变更Service服务功能则改Rest服务无法做出相应更新
Field field = servlet.getClass().getDeclaredField(Rest.REST_SERVICE_FIELD_NAME);
field.setAccessible(true);

View File

@@ -10,6 +10,7 @@ import java.util.*;
import java.util.concurrent.atomic.*;
import org.redkale.convert.bson.*;
import org.redkale.net.*;
import org.redkale.service.Service;
import org.redkale.util.*;
import org.redkale.watch.*;
@@ -37,13 +38,18 @@ public final class SncpServer extends Server<DLong, SncpContext, SncpRequest, Sn
super.init(config);
}
public void addService(ServiceWrapper entry) {
public void addSncpServlet(ServiceWrapper entry) {
for (Class type : entry.getTypes()) {
SncpDynServlet sds = new SncpDynServlet(BsonFactory.root().getConvert(), entry.getName(), type, entry.getService());
this.prepare.addServlet(sds, null, entry.getConf());
}
}
public <T extends Service> void addSncpServlet(Class<T> serviceType, String name, T service, AnyValue conf) {
SncpDynServlet sds = new SncpDynServlet(BsonFactory.root().getConvert(), name, serviceType, service);
this.prepare.addServlet(sds, null, conf);
}
public List<SncpServlet> getSncpServlets() {
return ((SncpPrepareServlet) this.prepare).getSncpServlets();
}
@@ -56,16 +62,16 @@ public final class SncpServer extends Server<DLong, SncpContext, SncpRequest, Sn
AtomicLong cycleBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Buffer.cycleCounter");
final int rcapacity = Math.max(this.bufferCapacity, 4 * 1024);
ObjectPool<ByteBuffer> bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, this.bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false;
e.clear();
return true;
});
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false;
e.clear();
return true;
});
AtomicLong createResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Response.creatCounter");
AtomicLong cycleResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Response.cycleCounter");
ObjectPool<Response> responsePool = SncpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null);
SncpContext sncpcontext = new SncpContext(this.serverStartTime, this.logger, executor, rcapacity, bufferPool, responsePool,
this.maxbody, this.charset, this.address, this.prepare, this.watch, this.readTimeoutSecond, this.writeTimeoutSecond);
this.maxbody, this.charset, this.address, this.prepare, this.watch, this.readTimeoutSecond, this.writeTimeoutSecond);
responsePool.setCreator((Object... params) -> new SncpResponse(sncpcontext, new SncpRequest(sncpcontext)));
return sncpcontext;
}

View File

@@ -23,8 +23,8 @@ public class _DynHelloRestServlet1 extends SimpleRestServlet {
HelloService service = new HelloService();
HttpServer server = new HttpServer();
server.addRestServlet(HelloService.class, "", service, SimpleRestServlet.class, "/pipes", null);
server.addRestServlet(HelloService.class, "my-res", new HelloService(3), SimpleRestServlet.class, "/pipes", null);
server.addRestServlet(HelloService.class, "", service, SimpleRestServlet.class, "/pipes");
server.addRestServlet(HelloService.class, "my-res", new HelloService(3), SimpleRestServlet.class, "/pipes");
DefaultAnyValue conf = DefaultAnyValue.create("port", "" + port);
server.init(conf);

View File

@@ -154,7 +154,7 @@ public class SncpTest {
final Transport transport = new Transport("", WatchFactory.root(), "", newBufferPool(), newChannelGroup(), null, set);
SncpTestService service = Sncp.createLocalService("", null, ResourceFactory.root(), SncpTestService.class, addr, transport, null);
ResourceFactory.root().inject(service);
server.addService(new ServiceWrapper(service, "", "", new HashSet<>(), null));
server.addSncpServlet(new ServiceWrapper(service, "", "", new HashSet<>(), null));
System.out.println(service);
AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue();
conf.addValue("host", "0.0.0.0");
@@ -187,7 +187,7 @@ public class SncpTest {
//String name, WatchFactory, ObjectPool<ByteBuffer>, AsynchronousChannelGroup, InetSocketAddress clientAddress, Collection<InetSocketAddress>
final Transport transport = new Transport("", WatchFactory.root(), "", newBufferPool(), newChannelGroup(), null, set);
Service service = Sncp.createLocalService("", null, ResourceFactory.root(), SncpTestService.class, addr, transport, null);
server.addService(new ServiceWrapper(service, "", "", new HashSet<>(), null));
server.addSncpServlet(new ServiceWrapper(service, "", "", new HashSet<>(), null));
AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue();
conf.addValue("host", "0.0.0.0");
conf.addValue("port", "" + port2);

View File

@@ -18,6 +18,7 @@ import org.redkale.convert.json.*;
public class SncpTestBean implements FilterBean {
@Id
@GeneratedValue
private long id;
private String content;