This commit is contained in:
Redkale
2017-05-14 13:07:02 +08:00
parent f15754386b
commit 8e36a7b450
12 changed files with 42 additions and 79 deletions

View File

@@ -43,7 +43,7 @@ public class NodeHttpServer extends NodeServer {
} }
private static Server createServer(Application application, AnyValue serconf) { private static Server createServer(Application application, AnyValue serconf) {
return new HttpServer(application.getStartTime(), application.getWatchFactory()); return new HttpServer(application.getStartTime());
} }
@Override @Override
@@ -114,7 +114,6 @@ public class NodeHttpServer extends NodeServer {
WebServlet ws = clazz.getAnnotation(WebServlet.class); WebServlet ws = clazz.getAnnotation(WebServlet.class);
if (ws == null || ws.value().length == 0) continue; if (ws == null || ws.value().length == 0) continue;
final HttpServlet servlet = clazz.newInstance(); final HttpServlet servlet = clazz.newInstance();
resourceFactory.inject(servlet, this);
final String[] mappings = ws.value(); final String[] mappings = ws.value();
String pref = ws.repair() ? prefix : ""; String pref = ws.repair() ? prefix : "";
DefaultAnyValue servletConf = (DefaultAnyValue) en.getProperty(); DefaultAnyValue servletConf = (DefaultAnyValue) en.getProperty();

View File

@@ -244,6 +244,7 @@ public abstract class NodeServer {
} }
field.set(src, source); field.set(src, source);
rf.inject(source, self); // 给其可能包含@Resource的字段赋值; rf.inject(source, self); // 给其可能包含@Resource的字段赋值;
//NodeServer.this.watchFactory.inject(src);
if (source instanceof Service) ((Service) source).init(null); if (source instanceof Service) ((Service) source).init(null);
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.SEVERE, "DataSource inject error", e); logger.log(Level.SEVERE, "DataSource inject error", e);

View File

@@ -41,7 +41,7 @@ public class NodeSncpServer extends NodeServer {
} }
private static Server createServer(Application application, AnyValue serconf) { private static Server createServer(Application application, AnyValue serconf) {
return new SncpServer(application.getStartTime(), application.getWatchFactory()); return new SncpServer(application.getStartTime());
} }
@Override @Override

View File

@@ -14,7 +14,6 @@ import java.util.logging.*;
import org.redkale.convert.bson.*; import org.redkale.convert.bson.*;
import org.redkale.convert.json.*; import org.redkale.convert.json.*;
import org.redkale.util.*; import org.redkale.util.*;
import org.redkale.watch.*;
/** /**
* 服务器上下文对象 * 服务器上下文对象
@@ -70,12 +69,8 @@ public class Context {
//JSON操作工厂 //JSON操作工厂
protected final JsonFactory jsonFactory; protected final JsonFactory jsonFactory;
//监控对象
protected final WatchFactory watch;
public Context(long serverStartTime, Logger logger, ExecutorService executor, int bufferCapacity, ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool, public Context(long serverStartTime, Logger logger, ExecutorService executor, int bufferCapacity, ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool,
final int maxbody, Charset charset, InetSocketAddress address, final PrepareServlet prepare, final WatchFactory watch, final int maxbody, Charset charset, InetSocketAddress address, final PrepareServlet prepare, final int readTimeoutSecond, final int writeTimeoutSecond) {
final int readTimeoutSecond, final int writeTimeoutSecond) {
this.serverStartTime = serverStartTime; this.serverStartTime = serverStartTime;
this.logger = logger; this.logger = logger;
this.executor = executor; this.executor = executor;
@@ -86,7 +81,6 @@ public class Context {
this.charset = UTF8.equals(charset) ? null : charset; this.charset = UTF8.equals(charset) ? null : charset;
this.address = address; this.address = address;
this.prepare = prepare; this.prepare = prepare;
this.watch = watch;
this.readTimeoutSecond = readTimeoutSecond; this.readTimeoutSecond = readTimeoutSecond;
this.writeTimeoutSecond = writeTimeoutSecond; this.writeTimeoutSecond = writeTimeoutSecond;
this.jsonFactory = JsonFactory.root(); this.jsonFactory = JsonFactory.root();

View File

@@ -15,7 +15,6 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger; import java.util.logging.Logger;
import org.redkale.util.AnyValue; import org.redkale.util.AnyValue;
import org.redkale.watch.WatchFactory;
/** /**
* *
@@ -39,9 +38,6 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
//服务的启动时间 //服务的启动时间
protected final long serverStartTime; protected final long serverStartTime;
//监控对象
protected final WatchFactory watch;
//服务的名称 //服务的名称
protected String name; protected String name;
@@ -93,11 +89,10 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
//IO写入 的超时秒数小于1视为不设置 //IO写入 的超时秒数小于1视为不设置
protected int writeTimeoutSecond; protected int writeTimeoutSecond;
protected Server(long serverStartTime, String protocol, PrepareServlet<K, C, R, P, S> servlet, final WatchFactory watch) { protected Server(long serverStartTime, String protocol, PrepareServlet<K, C, R, P, S> servlet) {
this.serverStartTime = serverStartTime; this.serverStartTime = serverStartTime;
this.protocol = protocol; this.protocol = protocol;
this.prepare = servlet; this.prepare = servlet;
this.watch = watch;
} }
public void init(final AnyValue config) throws Exception { public void init(final AnyValue config) throws Exception {
@@ -161,7 +156,6 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
public void start() throws IOException { public void start() throws IOException {
this.context = this.createContext(); this.context = this.createContext();
this.prepare.init(this.context, config); this.prepare.init(this.context, config);
if (this.watch != null) this.watch.inject(this.prepare);
this.serverChannel = ProtocolServer.create(this.protocol, context); this.serverChannel = ProtocolServer.create(this.protocol, context);
this.serverChannel.open(); this.serverChannel.open();
if (this.serverChannel.supportedOptions().contains(StandardSocketOptions.TCP_NODELAY)) { if (this.serverChannel.supportedOptions().contains(StandardSocketOptions.TCP_NODELAY)) {

View File

@@ -15,7 +15,6 @@ import jdk.internal.org.objectweb.asm.*;
import static jdk.internal.org.objectweb.asm.Opcodes.*; import static jdk.internal.org.objectweb.asm.Opcodes.*;
import org.redkale.net.*; import org.redkale.net.*;
import org.redkale.util.*; import org.redkale.util.*;
import org.redkale.watch.*;
/** /**
* HTTP服务的上下文对象 * HTTP服务的上下文对象
@@ -33,9 +32,9 @@ public class HttpContext extends Context {
public HttpContext(long serverStartTime, Logger logger, ExecutorService executor, int bufferCapacity, ObjectPool<ByteBuffer> bufferPool, public HttpContext(long serverStartTime, Logger logger, ExecutorService executor, int bufferCapacity, ObjectPool<ByteBuffer> bufferPool,
ObjectPool<Response> responsePool, int maxbody, Charset charset, InetSocketAddress address, PrepareServlet prepare, ObjectPool<Response> responsePool, int maxbody, Charset charset, InetSocketAddress address, PrepareServlet prepare,
WatchFactory watch, int readTimeoutSecond, int writeTimeoutSecond) { int readTimeoutSecond, int writeTimeoutSecond) {
super(serverStartTime, logger, executor, bufferCapacity, bufferPool, responsePool, maxbody, charset, super(serverStartTime, logger, executor, bufferCapacity, bufferPool, responsePool, maxbody, charset,
address, prepare, watch, readTimeoutSecond, writeTimeoutSecond); address, prepare, readTimeoutSecond, writeTimeoutSecond);
random.setSeed(Math.abs(System.nanoTime())); random.setSeed(Math.abs(System.nanoTime()));
} }
@@ -46,10 +45,6 @@ public class HttpContext extends Context {
return new String(Utility.binToHex(bytes)); return new String(Utility.binToHex(bytes));
} }
protected WatchFactory getWatchFactory() {
return watch;
}
protected ExecutorService getExecutor() { protected ExecutorService getExecutor() {
return executor; return executor;
} }

View File

@@ -14,7 +14,6 @@ import java.util.logging.*;
import java.util.regex.*; import java.util.regex.*;
import org.redkale.net.*; import org.redkale.net.*;
import org.redkale.util.*; import org.redkale.util.*;
import org.redkale.watch.*;
/** /**
* HTTP Servlet的总入口请求在HttpPrepareServlet中进行分流。 <br> * HTTP Servlet的总入口请求在HttpPrepareServlet中进行分流。 <br>
@@ -103,12 +102,6 @@ public class HttpPrepareServlet extends PrepareServlet<String, HttpContext, Http
s.preInit(context, getServletConf(s)); s.preInit(context, getServletConf(s));
s.init(context, getServletConf(s)); s.init(context, getServletConf(s));
}); });
final WatchFactory watch = context.getWatchFactory();
if (watch != null) {
servlets.forEach(s -> {
watch.inject(s);
});
}
AnyValue resConfig = config.getAnyValue("resource-servlet"); AnyValue resConfig = config.getAnyValue("resource-servlet");
if ((resConfig instanceof DefaultAnyValue) && resConfig.getValue("webroot", "").isEmpty()) { if ((resConfig instanceof DefaultAnyValue) && resConfig.getValue("webroot", "").isEmpty()) {
((DefaultAnyValue) resConfig).addValue("webroot", config.getValue("root")); ((DefaultAnyValue) resConfig).addValue("webroot", config.getValue("root"));

View File

@@ -13,7 +13,6 @@ import java.util.concurrent.atomic.AtomicLong;
import org.redkale.net.*; import org.redkale.net.*;
import org.redkale.service.Service; import org.redkale.service.Service;
import org.redkale.util.*; import org.redkale.util.*;
import org.redkale.watch.WatchFactory;
/** /**
* Http服务器 * Http服务器
@@ -26,11 +25,11 @@ import org.redkale.watch.WatchFactory;
public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpResponse, HttpServlet> { public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpResponse, HttpServlet> {
public HttpServer() { public HttpServer() {
this(System.currentTimeMillis(), null); this(System.currentTimeMillis());
} }
public HttpServer(long serverStartTime, final WatchFactory watch) { public HttpServer(long serverStartTime) {
super(serverStartTime, "TCP", new HttpPrepareServlet(), watch); super(serverStartTime, "TCP", new HttpPrepareServlet());
} }
@Override @Override
@@ -170,8 +169,8 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected HttpContext createContext() { protected HttpContext createContext() {
final int port = this.address.getPort(); final int port = this.address.getPort();
AtomicLong createBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Buffer.creatCounter"); AtomicLong createBufferCounter = new AtomicLong();
AtomicLong cycleBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Buffer.cycleCounter"); AtomicLong cycleBufferCounter = new AtomicLong();
this.bufferCapacity = Math.max(this.bufferCapacity, 16 * 1024 + 16); //兼容 HTTP 2.0; this.bufferCapacity = Math.max(this.bufferCapacity, 16 * 1024 + 16); //兼容 HTTP 2.0;
final int rcapacity = this.bufferCapacity; final int rcapacity = this.bufferCapacity;
ObjectPool<ByteBuffer> bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, this.bufferPoolSize, ObjectPool<ByteBuffer> bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, this.bufferPoolSize,
@@ -250,11 +249,11 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
final String[][] setHeaders = defaultSetHeaders.isEmpty() ? null : defaultSetHeaders.toArray(new String[defaultSetHeaders.size()][]); final String[][] setHeaders = defaultSetHeaders.isEmpty() ? null : defaultSetHeaders.toArray(new String[defaultSetHeaders.size()][]);
final HttpCookie defCookie = defaultCookie; final HttpCookie defCookie = defaultCookie;
final String addrHeader = remoteAddrHeader; final String addrHeader = remoteAddrHeader;
AtomicLong createResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Response.creatCounter"); AtomicLong createResponseCounter = new AtomicLong();
AtomicLong cycleResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Response.cycleCounter"); AtomicLong cycleResponseCounter = new AtomicLong();
ObjectPool<Response> responsePool = HttpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null); ObjectPool<Response> responsePool = HttpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null);
HttpContext httpcontext = new HttpContext(this.serverStartTime, this.logger, executor, rcapacity, bufferPool, responsePool, HttpContext httpcontext = new HttpContext(this.serverStartTime, this.logger, executor, rcapacity, bufferPool, responsePool,
this.maxbody, this.charset, this.address, this.prepare, this.watch, this.readTimeoutSecond, this.writeTimeoutSecond); this.maxbody, this.charset, this.address, this.prepare, this.readTimeoutSecond, this.writeTimeoutSecond);
responsePool.setCreator((Object... params) -> new HttpResponse(httpcontext, new HttpRequest(httpcontext, addrHeader), addHeaders, setHeaders, defCookie)); responsePool.setCreator((Object... params) -> new HttpResponse(httpcontext, new HttpRequest(httpcontext, addrHeader), addHeaders, setHeaders, defCookie));
return httpcontext; return httpcontext;
} }

View File

@@ -12,7 +12,6 @@ import java.util.concurrent.ExecutorService;
import java.util.logging.Logger; import java.util.logging.Logger;
import org.redkale.net.*; import org.redkale.net.*;
import org.redkale.util.ObjectPool; import org.redkale.util.ObjectPool;
import org.redkale.watch.WatchFactory;
/** /**
* <p> * <p>
@@ -23,9 +22,9 @@ import org.redkale.watch.WatchFactory;
public class SncpContext extends Context { public class SncpContext extends Context {
public SncpContext(long serverStartTime, Logger logger, ExecutorService executor, int bufferCapacity, ObjectPool<ByteBuffer> bufferPool, public SncpContext(long serverStartTime, Logger logger, ExecutorService executor, int bufferCapacity, ObjectPool<ByteBuffer> bufferPool,
ObjectPool<Response> responsePool, int maxbody, Charset charset, InetSocketAddress address, PrepareServlet prepare, ObjectPool<Response> responsePool, int maxbody, Charset charset, InetSocketAddress address, PrepareServlet prepare,
WatchFactory watch, int readTimeoutSecond, int writeTimeoutSecond) { int readTimeoutSecond, int writeTimeoutSecond) {
super(serverStartTime, logger, executor, bufferCapacity, bufferPool, responsePool, maxbody, charset, super(serverStartTime, logger, executor, bufferCapacity, bufferPool, responsePool, maxbody, charset,
address, prepare, watch, readTimeoutSecond, writeTimeoutSecond); address, prepare, readTimeoutSecond, writeTimeoutSecond);
} }
} }

View File

@@ -12,7 +12,6 @@ import org.redkale.convert.bson.*;
import org.redkale.net.*; import org.redkale.net.*;
import org.redkale.service.Service; import org.redkale.service.Service;
import org.redkale.util.*; import org.redkale.util.*;
import org.redkale.watch.*;
/** /**
* Service Node Communicate Protocol * Service Node Communicate Protocol
@@ -26,11 +25,11 @@ import org.redkale.watch.*;
public class SncpServer extends Server<DLong, SncpContext, SncpRequest, SncpResponse, SncpServlet> { public class SncpServer extends Server<DLong, SncpContext, SncpRequest, SncpResponse, SncpServlet> {
public SncpServer() { public SncpServer() {
this(System.currentTimeMillis(), null); this(System.currentTimeMillis());
} }
public SncpServer(long serverStartTime, final WatchFactory watch) { public SncpServer(long serverStartTime) {
super(serverStartTime, "TCP", new SncpPrepareServlet(), watch); super(serverStartTime, "TCP", new SncpPrepareServlet());
} }
@Override @Override
@@ -58,8 +57,8 @@ public class SncpServer extends Server<DLong, SncpContext, SncpRequest, SncpResp
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected SncpContext createContext() { protected SncpContext createContext() {
final int port = this.address.getPort(); final int port = this.address.getPort();
AtomicLong createBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Buffer.creatCounter"); AtomicLong createBufferCounter = new AtomicLong();
AtomicLong cycleBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Buffer.cycleCounter"); AtomicLong cycleBufferCounter = new AtomicLong();
final int rcapacity = Math.max(this.bufferCapacity, 4 * 1024); final int rcapacity = Math.max(this.bufferCapacity, 4 * 1024);
ObjectPool<ByteBuffer> bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, this.bufferPoolSize, ObjectPool<ByteBuffer> bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, this.bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> { (Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
@@ -67,11 +66,11 @@ public class SncpServer extends Server<DLong, SncpContext, SncpRequest, SncpResp
e.clear(); e.clear();
return true; return true;
}); });
AtomicLong createResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Response.creatCounter"); AtomicLong createResponseCounter = new AtomicLong();
AtomicLong cycleResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Response.cycleCounter"); AtomicLong cycleResponseCounter = new AtomicLong();
ObjectPool<Response> responsePool = SncpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null); ObjectPool<Response> responsePool = SncpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null);
SncpContext sncpcontext = new SncpContext(this.serverStartTime, this.logger, executor, rcapacity, bufferPool, responsePool, SncpContext sncpcontext = new SncpContext(this.serverStartTime, this.logger, executor, rcapacity, bufferPool, responsePool,
this.maxbody, this.charset, this.address, this.prepare, this.watch, this.readTimeoutSecond, this.writeTimeoutSecond); this.maxbody, this.charset, this.address, this.prepare, this.readTimeoutSecond, this.writeTimeoutSecond);
responsePool.setCreator((Object... params) -> new SncpResponse(sncpcontext, new SncpRequest(sncpcontext))); responsePool.setCreator((Object... params) -> new SncpResponse(sncpcontext, new SncpRequest(sncpcontext)));
return sncpcontext; return sncpcontext;
} }

View File

@@ -11,7 +11,6 @@ import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.logging.*; import java.util.logging.*;
import java.util.regex.Pattern;
import javax.annotation.Resource; import javax.annotation.Resource;
/** /**
@@ -279,20 +278,6 @@ public final class ResourceFactory {
return null; return null;
} }
private <A> void load(final Pattern reg, Class<? extends A> clazz, final A exclude, final Map<String, A> result) {
ConcurrentHashMap<String, ResourceEntry> map = this.store.get(clazz);
if (map != null) {
for (Map.Entry<String, ResourceEntry> en : map.entrySet()) { // 不用forEach为兼容JDK 6
String x = en.getKey();
ResourceEntry re = en.getValue();
if (re == null) continue;
Object y = re.value;
if (y != exclude && reg.matcher(x).find() && result.get(x) == null) result.put(x, (A) y);
}
}
if (parent != null) parent.load(reg, clazz, exclude, result);
}
public <T> boolean inject(final Object src) { public <T> boolean inject(final Object src) {
return inject(src, null); return inject(src, null);
} }

View File

@@ -20,15 +20,18 @@ import java.util.function.LongSupplier;
*/ */
public final class WatchFactory { public final class WatchFactory {
private static final WatchFactory instance = new WatchFactory(null); private static final WatchFactory instance = new WatchFactory("", null);
private final List<WeakReference<WatchFactory>> chidren = new CopyOnWriteArrayList<>(); private final List<WeakReference<WatchFactory>> chidren = new CopyOnWriteArrayList<>();
private final List<WeakReference<WatchNode>> beans = new CopyOnWriteArrayList<>(); private final List<WeakReference<WatchNode>> beans = new CopyOnWriteArrayList<>();
private final String name;
private final WatchFactory parent; private final WatchFactory parent;
private WatchFactory(WatchFactory parent) { private WatchFactory(String name, WatchFactory parent) {
this.name = name;
this.parent = parent; this.parent = parent;
} }
@@ -42,8 +45,8 @@ public final class WatchFactory {
return instance; return instance;
} }
public WatchFactory createChild() { public WatchFactory createChild(final String name) {
WatchFactory child = new WatchFactory(this); WatchFactory child = new WatchFactory(name, this);
this.chidren.add(new WeakReference<>(child)); this.chidren.add(new WeakReference<>(child));
return child; return child;
} }
@@ -87,9 +90,7 @@ public final class WatchFactory {
} }
public WatchNumber createWatchNumber(String name, String description, boolean interval, long v) { public WatchNumber createWatchNumber(String name, String description, boolean interval, long v) {
WatchNumber bean = new WatchNumber(name, description, interval, v); return new WatchNumber(name, description, interval, v);
register(bean);
return bean;
} }
public void register(String name, LongSupplier supplier) { public void register(String name, LongSupplier supplier) {
@@ -106,11 +107,15 @@ public final class WatchFactory {
} }
} }
public boolean inject(final Object src) { protected <T> boolean inject(final Object src) {
return inject(src, new ArrayList<>()); return inject(src, null);
} }
private boolean inject(final Object src, final List<Object> list) { protected <T> boolean inject(final Object src, final T attachment) {
return inject(src, attachment, new ArrayList<>());
}
private <T> boolean inject(final Object src, final T attachment, final List<Object> list) {
if (src == null) return false; if (src == null) return false;
try { try {
list.add(src); list.add(src);
@@ -121,7 +126,7 @@ public final class WatchFactory {
field.setAccessible(true); field.setAccessible(true);
final Class type = field.getType(); final Class type = field.getType();
Watchable wo = field.getAnnotation(Watchable.class); Watchable wo = field.getAnnotation(Watchable.class);
if (wo == null && !WatchNode.class.isAssignableFrom(type)) continue;
} }
} while ((clazz = clazz.getSuperclass()) != Object.class); } while ((clazz = clazz.getSuperclass()) != Object.class);
return true; return true;