diff --git a/src/com/wentch/redkale/boot/Application.java b/src/com/wentch/redkale/boot/Application.java index bdd2fa872..e727dc754 100644 --- a/src/com/wentch/redkale/boot/Application.java +++ b/src/com/wentch/redkale/boot/Application.java @@ -23,6 +23,7 @@ import java.nio.*; import java.nio.channels.*; import java.nio.file.*; import java.util.*; +import java.util.AbstractMap.SimpleEntry; import java.util.concurrent.*; import java.util.logging.*; import javax.annotation.*; @@ -31,7 +32,7 @@ import org.w3c.dom.*; /** * 编译时需要加入: -XDignore.symbol.file=true - * + *

* 进程启动类,程序启动后读取application.xml,进行classpath扫描动态加载Service与Servlet, * 再进行Service、Servlet与其他资源之间的依赖注入。 * @@ -39,14 +40,28 @@ import org.w3c.dom.*; */ public final class Application { + //进程启动的时间, 类型: long public static final String RESNAME_TIME = "APP_TIME"; + //本地进程的根目录, 类型:String public static final String RESNAME_HOME = "APP_HOME"; + //本地节点的名称, 类型:String public static final String RESNAME_NODE = "APP_NODE"; + //本地节点的所属组, 类型:String、Map>、Map>> + public static final String RESNAME_GROUP = "APP_GROUP"; + + //本地节点的所属组所有节点名, 类型:Set 、List>包含自身节点名 + public static final String RESNAME_INGROUP = "APP_INGROUP"; + + //除本地节点的所属组外其他所有组的所有节点名, 类型:Map>、Map>> + public static final String RESNAME_OUTGROUP = "APP_OUTGROUP"; + + //本地节点的IP地址, 类型:InetAddress、String public static final String RESNAME_ADDR = "APP_ADDR"; - + + //application.xml 文件中resources节点的内容, 类型: AnyValue public static final String RESNAME_GRES = "APP_GRES"; protected final ResourceFactory factory = ResourceFactory.root(); @@ -61,6 +76,8 @@ public final class Application { protected final InetAddress localAddress = Utility.localInetAddress(); + protected String nodeGroup = ""; + protected String nodeName = ""; //-------------------------------------------------------------------------------------------- @@ -92,7 +109,12 @@ public final class Application { } final File logconf = new File(root, "conf/logging.properties"); this.nodeName = config.getValue("node", ""); + this.nodeGroup = config.getValue("group", ""); this.factory.register(RESNAME_NODE, this.nodeName); + this.factory.register(RESNAME_GROUP, this.nodeGroup); + System.setProperty(RESNAME_NODE, this.nodeName); + System.setProperty(RESNAME_GROUP, this.nodeGroup); + this.factory.register(RESNAME_ADDR, this.localAddress.getHostAddress()); this.factory.register(RESNAME_ADDR, InetAddress.class, this.localAddress); if (logconf.isFile() && logconf.canRead()) { @@ -415,14 +437,25 @@ public final class Application { //------------------------------------------------------------------------ final String host = this.localAddress.getHostAddress(); + final Map> groups = new HashMap<>(); + final Map>> groups2 = new HashMap<>(); for (AnyValue conf : resources.getAnyValues("remote")) { final String name = conf.getValue("name"); + final String group = conf.getValue("group", ""); if (name == null) throw new RuntimeException("remote name is null"); String protocol = conf.getValue("protocol", "UDP").toUpperCase(); if (!"TCP".equalsIgnoreCase(protocol) && !"UDP".equalsIgnoreCase(protocol)) { throw new RuntimeException("Not supported Transport Protocol " + conf.getValue("protocol")); } + { + Set set = groups.get(group); + if (set == null) { + set = new HashSet<>(); + groups.put(group, set); + } + set.add(name); + } AnyValue[] addrs = conf.getAnyValues("address"); InetSocketAddress[] addresses = new InetSocketAddress[addrs.length]; int i = -1; @@ -430,17 +463,51 @@ public final class Application { addresses[++i] = new InetSocketAddress(addr.getValue("addr"), addr.getIntValue("port")); } if (addresses.length < 1) throw new RuntimeException("Transport(" + name + ") have no address "); - Transport transport = new Transport(name, protocol, watch, 100, addresses[0]); + { + List> list = groups2.get(group); + if (list == null) { + list = new ArrayList<>(); + groups2.put(group, list); + } + list.add(new SimpleEntry<>(name, addresses)); + } + Transport transport = new Transport(name, protocol, watch, 100, addresses); factory.register(name, Transport.class, transport); if (this.nodeName.isEmpty() && host.equals(addrs[0].getValue("addr"))) { this.nodeName = name; + this.nodeGroup = group; this.factory.register(RESNAME_NODE, this.nodeName); + this.factory.register(RESNAME_GROUP, this.nodeGroup); + System.setProperty(RESNAME_NODE, this.nodeName); + System.setProperty(RESNAME_GROUP, this.nodeGroup); } } + + this.factory.register(RESNAME_GROUP, new TypeToken>>() { + }.getType(), groups); + this.factory.register(RESNAME_GROUP, new TypeToken>>>() { + }.getType(), groups2); + + final Map>> outgroups2 = new HashMap<>(); + final Map> outgroups = new HashMap<>(); + groups.entrySet().stream().filter(x -> !x.getKey().equals(nodeName)).forEach(x -> outgroups.put(x.getKey(), x.getValue())); + groups2.entrySet().stream().filter(x -> !x.getKey().equals(nodeName)).forEach(x -> outgroups2.put(x.getKey(), x.getValue())); + + this.factory.register(RESNAME_OUTGROUP, new TypeToken>>() { + }.getType(), outgroups); + this.factory.register(RESNAME_OUTGROUP, new TypeToken>>>() { + }.getType(), outgroups2); + + Set ingroup = groups.get(this.nodeGroup); + if (ingroup != null) this.factory.register(RESNAME_INGROUP, new TypeToken>() { + }.getType(), ingroup); + List> inengroup = groups2.get(this.nodeGroup); + if (inengroup != null) this.factory.register(RESNAME_INGROUP, new TypeToken>>() { + }.getType(), inengroup); } //------------------------------------------------------------------------ - logger.info(RESNAME_NODE + "=" + this.nodeName); + logger.info(RESNAME_NODE + "=" + this.nodeName + "; " + RESNAME_GROUP + "=" + this.nodeGroup); logger.info("datasource.nodeid=" + this.factory.find("property.datasource.nodeid", String.class)); } diff --git a/src/com/wentch/redkale/net/AsyncDatagramChannel.java b/src/com/wentch/redkale/net/AsyncDatagramChannel.java index 4d50ff544..3b28cb1eb 100644 --- a/src/com/wentch/redkale/net/AsyncDatagramChannel.java +++ b/src/com/wentch/redkale/net/AsyncDatagramChannel.java @@ -352,7 +352,7 @@ public final class AsyncDatagramChannel implements AsynchronousByteChannel, Mult if (buffers[index].hasRemaining()) { implSend(buffers[index], target, attachment, this); } else if (index == max) { - handler.completed(resultSum, attachment); + if (handler != null) handler.completed(resultSum, attachment); } else { implSend(buffers[++index], target, attachment, this); } @@ -360,7 +360,7 @@ public final class AsyncDatagramChannel implements AsynchronousByteChannel, Mult @Override public void failed(Throwable exc, A attachment) { - handler.failed(exc, attachment); + if (handler != null) handler.failed(exc, attachment); } }); } diff --git a/src/com/wentch/redkale/net/Response.java b/src/com/wentch/redkale/net/Response.java index 7588722cc..ce8abfe60 100644 --- a/src/com/wentch/redkale/net/Response.java +++ b/src/com/wentch/redkale/net/Response.java @@ -125,7 +125,7 @@ public abstract class Response { } public void finish(ByteBuffer buffer) { - finish(false, buffer); + this.channel.write(buffer, buffer, finishHandler); } public void finish(boolean kill, ByteBuffer buffer) { @@ -134,7 +134,7 @@ public abstract class Response { } public void finish(ByteBuffer... buffers) { - finish(false, buffers); + this.channel.write(buffers, buffers, finishHandler2); } public void finish(boolean kill, ByteBuffer... buffers) { diff --git a/src/com/wentch/redkale/net/SSLBuilder.java b/src/com/wentch/redkale/net/SSLBuilder.java index 4e13e3ddd..4547c05e5 100644 --- a/src/com/wentch/redkale/net/SSLBuilder.java +++ b/src/com/wentch/redkale/net/SSLBuilder.java @@ -5,6 +5,7 @@ */ package com.wentch.redkale.net; +import java.nio.*; import java.security.*; import javax.net.ssl.*; @@ -35,4 +36,114 @@ public class SSLBuilder { sslEngine.setNeedClientAuth(false); } + + private static final byte CHANGE_CIPHER_SPECT_CONTENT_TYPE = 20; + + private static final byte APPLICATION_DATA_CONTENT_TYPE = 23; + + private static final int SSLV3_RECORD_HEADER_SIZE = 5; // SSLv3 record header + + private static final int SSL20_HELLO_VERSION = 0x0002; + + private static final int MIN_VERSION = 0x0300; + + private static final int MAX_MAJOR_VERSION = 0x03; + + private static int getSSLPacketSize(final ByteBuffer buf) throws SSLException { + + /* + * SSLv2 length field is in bytes 0/1 + * SSLv3/TLS length field is in bytes 3/4 + */ + if (buf.remaining() < 5) return -1; + + final byte byte0; + final byte byte1; + final byte byte2; + final byte byte3; + final byte byte4; + + if (buf.hasArray()) { + final byte[] array = buf.array(); + int pos = buf.arrayOffset() + buf.position(); + byte0 = array[pos++]; + byte1 = array[pos++]; + byte2 = array[pos++]; + byte3 = array[pos++]; + byte4 = array[pos]; + } else { + int pos = buf.position(); + byte0 = buf.get(pos++); + byte1 = buf.get(pos++); + byte2 = buf.get(pos++); + byte3 = buf.get(pos++); + byte4 = buf.get(pos); + } + + int len; + + /* + * If we have already verified previous packets, we can + * ignore the verifications steps, and jump right to the + * determination. Otherwise, try one last hueristic to + * see if it's SSL/TLS. + */ + if (byte0 >= CHANGE_CIPHER_SPECT_CONTENT_TYPE && byte0 <= APPLICATION_DATA_CONTENT_TYPE) { + /* + * Last sanity check that it's not a wild record + */ + final byte major = byte1; + final byte minor = byte2; + final int v = (major << 8) | minor & 0xff; + + // Check if too old (currently not possible) + // or if the major version does not match. + // The actual version negotiation is in the handshaker classes + if ((v < MIN_VERSION) || (major > MAX_MAJOR_VERSION)) { + throw new SSLException("Unsupported record version major=" + major + " minor=" + minor); + } + + /* + * One of the SSLv3/TLS message types. + */ + len = ((byte3 & 0xff) << 8) + (byte4 & 0xff) + SSLV3_RECORD_HEADER_SIZE; + + } else { + /* + * Must be SSLv2 or something unknown. + * Check if it's short (2 bytes) or + * long (3) header. + * + * Internals can warn about unsupported SSLv2 + */ + boolean isShort = ((byte0 & 0x80) != 0); + + if (isShort && ((byte2 == 1) || byte2 == 4)) { + + final byte major = byte3; + final byte minor = byte4; + final int v = (major << 8) | minor & 0xff; + + // Check if too old (currently not possible) + // or if the major version does not match. + // The actual version negotiation is in the handshaker classes + if ((v < MIN_VERSION) || (major > MAX_MAJOR_VERSION)) { + // if it's not SSLv2, we're out of here. + if (v != SSL20_HELLO_VERSION) throw new SSLException("Unsupported record version major=" + major + " minor=" + minor); + + } + + /* + * Client or Server Hello + */ + int mask = 0x7f; + len = ((byte0 & mask) << 8) + (byte1 & 0xff) + (2); + } else { + // Gobblygook! + throw new SSLException("Unrecognized SSL message, plaintext connection?"); + } + } + + return len; + } } diff --git a/src/com/wentch/redkale/net/Transport.java b/src/com/wentch/redkale/net/Transport.java index 9586183e6..e187ca69a 100644 --- a/src/com/wentch/redkale/net/Transport.java +++ b/src/com/wentch/redkale/net/Transport.java @@ -68,6 +68,10 @@ public final class Transport { bufferPool.offer(buffer); } + public void offerBuffer(ByteBuffer... buffers) { + for (ByteBuffer buffer : buffers) offerBuffer(buffer); + } + public AsyncConnection pollConnection() { SocketAddress addr = remoteAddres[0]; try { diff --git a/src/com/wentch/redkale/net/http/HttpPrepareServlet.java b/src/com/wentch/redkale/net/http/HttpPrepareServlet.java index f7641d59e..e060383cf 100644 --- a/src/com/wentch/redkale/net/http/HttpPrepareServlet.java +++ b/src/com/wentch/redkale/net/http/HttpPrepareServlet.java @@ -6,8 +6,8 @@ package com.wentch.redkale.net.http; import com.wentch.redkale.net.*; -import com.wentch.redkale.util.AnyValue.DefaultAnyValue; import com.wentch.redkale.util.*; +import com.wentch.redkale.util.AnyValue.DefaultAnyValue; import com.wentch.redkale.watch.*; import java.io.*; import java.nio.*; @@ -107,7 +107,7 @@ public final class HttpPrepareServlet extends PrepareServlet { private final DefaultAnyValue header = new DefaultAnyValue(); + private final String[][] defaultAddHeaders; + + private final String[][] defaultSetHeaders; + + private final HttpCookie defcookie; + public static ObjectPool createPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator creator) { return new ObjectPool<>(creatCounter, cycleCounter, max, creator, (x) -> ((HttpResponse) x).recycle()); } - protected HttpResponse(HttpContext context, HttpRequest request) { + protected HttpResponse(HttpContext context, HttpRequest request, String[][] defaultAddHeaders, String[][] defaultSetHeaders, HttpCookie defcookie) { super(context, request); + this.defaultAddHeaders = defaultAddHeaders; + this.defaultSetHeaders = defaultSetHeaders; + this.defcookie = defcookie; } @Override @@ -204,6 +213,50 @@ public final class HttpResponse extends Response { super.finish(buffer404.duplicate()); } + @Override + public void finish(ByteBuffer buffer) { + finish(false, buffer); + } + + @Override + public void finish(boolean kill, ByteBuffer buffer) { + if (!this.headsended) { + ByteBuffer headbuf = createHeader(); + headbuf.flip(); + if (buffer == null) { + super.finish(kill, headbuf); + } else { + super.finish(kill, new ByteBuffer[]{headbuf, buffer}); + } + } else { + super.finish(kill, buffer); + } + } + + @Override + public void finish(ByteBuffer... buffers) { + finish(false, buffers); + } + + @Override + public void finish(boolean kill, ByteBuffer... buffers) { + if (kill) refuseAlive(); + if (!this.headsended) { + ByteBuffer headbuf = createHeader(); + headbuf.flip(); + if (buffers == null) { + super.finish(kill, headbuf); + } else { + ByteBuffer[] newbuffers = new ByteBuffer[buffers.length + 1]; + newbuffers[0] = headbuf; + System.arraycopy(buffers, 0, newbuffers, 1, buffers.length); + super.finish(kill, newbuffers); + } + } else { + super.finish(kill, buffers); + } + } + public void sendBody(ByteBuffer buffer, A attachment, CompletionHandler handler) { if (!this.headsended) { ByteBuffer headbuf = createHeader(); @@ -211,7 +264,7 @@ public final class HttpResponse extends Response { if (buffer == null) { super.send(headbuf, attachment, handler); } else { - super.send(new ByteBuffer[]{headbuf, headbuf}, attachment, handler); + super.send(new ByteBuffer[]{headbuf, buffer}, attachment, handler); } } else { super.send(buffer, attachment, handler); @@ -284,19 +337,50 @@ public final class HttpResponse extends Response { if (!this.request.isKeepAlive()) { buffer.put("Connection: close\r\n".getBytes()); } + if (this.defaultAddHeaders != null) { + for (String[] headers : this.defaultAddHeaders) { + if (headers.length > 2) { + String v = request.getHeader(headers[2]); + if (v != null) this.header.addValue(headers[0], v); + } else { + this.header.addValue(headers[0], headers[1]); + } + } + } + if (this.defaultSetHeaders != null) { + for (String[] headers : this.defaultSetHeaders) { + if (headers.length > 2) { + this.header.setValue(headers[0], request.getHeader(headers[2])); + } else { + this.header.setValue(headers[0], headers[1]); + } + } + } for (Entry en : this.header.getStringEntrys()) { buffer.put((en.name + ": " + en.getValue() + "\r\n").getBytes()); } if (request.newsessionid != null) { - if (request.newsessionid.isEmpty()) { - buffer.put(("Set-Cookie: " + HttpRequest.SESSIONID_NAME + "=; path=/; Max-Age=0; HttpOnly\r\n").getBytes()); + String domain = defcookie == null ? null : defcookie.getDomain(); + if (domain == null) { + domain = ""; } else { - buffer.put(("Set-Cookie: " + HttpRequest.SESSIONID_NAME + "=" + request.newsessionid + "; path=/; HttpOnly\r\n").getBytes()); + domain = "Domain=" + domain + "; "; + } + String path = defcookie == null ? null : defcookie.getPath(); + if (path == null) path = "/"; + if (request.newsessionid.isEmpty()) { + buffer.put(("Set-Cookie: " + HttpRequest.SESSIONID_NAME + "=; " + domain + "Path=" + path + "; Max-Age=0; HttpOnly\r\n").getBytes()); + } else { + buffer.put(("Set-Cookie: " + HttpRequest.SESSIONID_NAME + "=" + request.newsessionid + "; " + domain + "Path=" + path + "; HttpOnly\r\n").getBytes()); } } if (this.cookies != null) { for (HttpCookie cookie : this.cookies) { if (cookie == null) continue; + if (defcookie != null) { + if (defcookie.getDomain() != null && cookie.getDomain() == null) cookie.setDomain(defcookie.getDomain()); + if (defcookie.getPath() != null && cookie.getPath() == null) cookie.setPath(defcookie.getPath()); + } buffer.put(("Set-Cookie: " + genString(cookie) + "\r\n").getBytes()); } } @@ -307,8 +391,8 @@ public final class HttpResponse extends Response { private CharSequence genString(HttpCookie cookie) { StringBuilder sb = new StringBuilder(); sb.append(cookie.getName()).append("=\"").append(cookie.getValue()).append('"').append("; Version=1"); - if (cookie.getPath() != null) sb.append("; Path=").append(cookie.getPath()); if (cookie.getDomain() != null) sb.append("; Domain=").append(cookie.getDomain()); + if (cookie.getPath() != null) sb.append("; Path=").append(cookie.getPath()); if (cookie.getPortlist() != null) sb.append("; Port=").append(cookie.getPortlist()); if (cookie.getMaxAge() > 0) { sb.append("; Max-Age=").append(cookie.getMaxAge()); diff --git a/src/com/wentch/redkale/net/http/HttpServer.java b/src/com/wentch/redkale/net/http/HttpServer.java index 7260bbbcf..6d4bf97e1 100644 --- a/src/com/wentch/redkale/net/http/HttpServer.java +++ b/src/com/wentch/redkale/net/http/HttpServer.java @@ -8,6 +8,7 @@ package com.wentch.redkale.net.http; import com.wentch.redkale.net.*; import com.wentch.redkale.util.*; import com.wentch.redkale.watch.*; +import java.net.*; import java.nio.*; import java.util.*; import java.util.AbstractMap.SimpleEntry; @@ -60,12 +61,62 @@ public final class HttpServer extends Server { prepare.addHttpServlet(en.getKey().getKey(), en.getKey().getValue(), en.getValue()); }); this.servlets.clear(); + String[][] defaultAddHeaders = null; + String[][] defaultSetHeaders = null; + HttpCookie defaultCookie = null; + if (config != null) { + AnyValue resps = config == null ? null : config.getAnyValue("response"); + if (resps != null) { + AnyValue[] addHeaders = resps.getAnyValues("addheader"); + if (addHeaders.length > 0) { + defaultAddHeaders = new String[addHeaders.length][]; + for (int i = 0; i < addHeaders.length; i++) { + String val = addHeaders[i].getValue("value"); + if (val == null) continue; + if (val.startsWith("request.headers.")) { + defaultAddHeaders[i] = new String[]{addHeaders[i].getValue("name"), val, val.substring("request.headers.".length())}; + } else if (val.startsWith("system.property.")) { + String v = System.getProperty(val.substring("system.property.".length())); + if (v != null) defaultAddHeaders[i] = new String[]{addHeaders[i].getValue("name"), v}; + } else { + defaultAddHeaders[i] = new String[]{addHeaders[i].getValue("name"), val}; + } + } + } + AnyValue[] setHeaders = resps.getAnyValues("setheader"); + if (setHeaders.length > 0) { + defaultSetHeaders = new String[setHeaders.length][]; + for (int i = 0; i < setHeaders.length; i++) { + String val = setHeaders[i].getValue("value"); + if (val != null && val.startsWith("request.headers.")) { + defaultSetHeaders[i] = new String[]{setHeaders[i].getValue("name"), val, val.substring("request.headers.".length())}; + } else { + defaultSetHeaders[i] = new String[]{setHeaders[i].getValue("name"), val}; + } + } + } + AnyValue defcookieValue = resps.getAnyValue("defcookie"); + if (defcookieValue != null) { + String domain = defcookieValue.getValue("domain"); + String path = defcookieValue.getValue("path"); + if (domain != null || path != null) { + defaultCookie = new HttpCookie("DEFAULTCOOKIE", ""); + defaultCookie.setDomain(domain); + defaultCookie.setPath(path); + } + } + } + } + final String[][] addHeaders = defaultAddHeaders; + final String[][] setHeaders = defaultSetHeaders; + final HttpCookie defCookie = defaultCookie; AtomicLong createResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Response.creatCounter"); AtomicLong cycleResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Response.cycleCounter"); ObjectPool responsePool = HttpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null); HttpContext httpcontext = new HttpContext(this.serverStartTime, this.logger, executor, bufferPool, responsePool, this.maxbody, this.charset, this.address, prepare, this.watch, this.readTimeoutSecond, this.writeTimeoutSecond, contextPath); - responsePool.setCreator((Object... params) -> new HttpResponse(httpcontext, new HttpRequest(httpcontext, httpcontext.jsonFactory))); + responsePool.setCreator((Object... params) + -> new HttpResponse(httpcontext, new HttpRequest(httpcontext, httpcontext.jsonFactory), addHeaders, setHeaders, defCookie)); return httpcontext; } diff --git a/src/com/wentch/redkale/net/sncp/SncpClient.java b/src/com/wentch/redkale/net/sncp/SncpClient.java index cc368ab7b..7434823a6 100644 --- a/src/com/wentch/redkale/net/sncp/SncpClient.java +++ b/src/com/wentch/redkale/net/sncp/SncpClient.java @@ -48,7 +48,7 @@ public final class SncpClient { this.resultTypes = rt == void.class ? null : rt; this.paramTypes = method.getGenericParameterTypes(); this.method = method; - this.async = method.getReturnType() == void.class && method.getAnnotation(Async.class) != null; + this.async = false;// method.getReturnType() == void.class && method.getAnnotation(Async.class) != null; } @Override diff --git a/src/com/wentch/redkale/service/DataCacheListenerService.java b/src/com/wentch/redkale/service/DataCacheListenerService.java index 4840b61ab..f09a384cd 100644 --- a/src/com/wentch/redkale/service/DataCacheListenerService.java +++ b/src/com/wentch/redkale/service/DataCacheListenerService.java @@ -31,24 +31,37 @@ public class DataCacheListenerService implements DataCacheListener, Service { private final ConcurrentHashMap>> deleteQueues = new ConcurrentHashMap<>(); - private boolean finer; + private final boolean finest = logger.isLoggable(Level.FINEST); + + ; @Resource(name = "APP_NODE") private String localNodeName = ""; - @Resource(name = ".*") - HashMap sourcemap; + @Resource(name = "APP_GROUP") + private String localGroupName = ""; + + @Resource(name = "APP_GROUP") + private Map> groups; @Resource(name = ".*") - HashMap nodemap; + private HashMap sourcesmap; + + @Resource(name = ".*") + private HashMap nodesmap; @Override public void init(AnyValue config) { - finer = logger.isLoggable(Level.FINER); + if (finest) { + logger.finest(this.getClass().getSimpleName() + "-localgroup: " + localGroupName); + logger.finest(this.getClass().getSimpleName() + "-groups: " + groups); + logger.finest(this.getClass().getSimpleName() + "-sources: " + sourcesmap); + } } @Override public void insert(String sourceName, Class clazz, T... entitys) { + if (finest) logger.finest("(source:" + sourceName + ") insert " + clazz + " --> " + Arrays.toString(entitys)); BlockingQueue> queue = this.insertQueues.get(sourceName); if (queue == null) { synchronized (this.insertQueues) { @@ -68,7 +81,7 @@ public class DataCacheListenerService implements DataCacheListener, Service { while (true) { try { Map.Entry entry = tq.take(); - sendInsert(sourceName, entry.getKey(), entry.getValue()); + sendInsert(localGroupName, false, sourceName, entry.getKey(), entry.getValue()); } catch (Exception e) { logger.log(Level.SEVERE, this.getName() + " sendInsert occur error", e); } @@ -87,24 +100,48 @@ public class DataCacheListenerService implements DataCacheListener, Service { } @RemoteOn - public void sendInsert(String sourceName, Class clazz, T... entitys) { - if (nodemap == null) return; - nodemap.forEach((x, y) -> { - try { - y.sendInsert(sourceName, clazz, entitys); - } catch (Exception e) { - logger.log(Level.FINE, this.getClass().getSimpleName() + " send insert error (" + x + ", " + sourceName + ", " + clazz + ", " + Arrays.toString(entitys) + ")", e); + public void sendInsert(String group, boolean ignoreRemote, String sourceName, Class clazz, T... entitys) { + if (nodesmap == null || groups == null) return; + if (ignoreRemote && finest) logger.finest(DataSource.class.getSimpleName() + "(" + group + "--" + this.localNodeName + "," + sourceName + ") onGroupSendInsert " + Arrays.toString(entitys)); + for (Map.Entry> en : groups.entrySet()) { + if (group != null && group.equals(en.getKey())) { //同机房 + for (String onode : en.getValue()) { + if (onode.equals(localNodeName)) continue; + DataCacheListenerService service = nodesmap.get(onode); + if (service != null) { + try { + service.sendInsert(group, false, sourceName, clazz, entitys); + } catch (Exception e) { + logger.log(Level.FINE, this.getClass().getSimpleName() + " send insert error (" + group + "--" + onode + ", " + sourceName + ", " + clazz + ", " + Arrays.toString(entitys) + ")", e); + } + } + } + if (ignoreRemote) break; + } else if (!ignoreRemote) { + for (String onode : en.getValue()) { + DataCacheListenerService service = nodesmap.get(onode); + if (service != null) { + try { + service.sendInsert(group, false, sourceName, clazz, entitys); + break; //有一个成功就退出 + } catch (Exception e) { + logger.log(Level.FINE, this.getClass().getSimpleName() + " send insert error (" + group + "--" + onode + ", " + sourceName + ", " + clazz + ", " + Arrays.toString(entitys) + ")", e); + } + } + } } - }); + } } - public final void onSendInsert(String sourceName, Class clazz, T... entitys) { - ((DataJDBCSource) sourcemap.get(sourceName)).insertCache(entitys); - if (finer) logger.finer(DataSource.class.getSimpleName() + "(" + this.localNodeName + "," + sourceName + ") onSendInsert " + Arrays.toString(entitys)); + public final void onSendInsert(String group, boolean ignoreRemote, String sourceName, Class clazz, T... entitys) { + if (finest) logger.finest(DataSource.class.getSimpleName() + "(" + this.localNodeName + "," + sourceName + ") onSendInsert " + Arrays.toString(entitys)); + ((DataJDBCSource) sourcesmap.get(sourceName)).insertCache(entitys); + if (!this.localGroupName.equals(group)) sendInsert(this.localGroupName, true, sourceName, clazz, entitys); //不是同一机房来的资源需要同步到其他同机房的节点上 } @Override public void update(String sourceName, Class clazz, T... values) { + if (finest) logger.finest("(source:" + sourceName + ") update " + clazz + " --> " + Arrays.toString(values)); BlockingQueue> queue = this.updateQueues.get(sourceName); if (queue == null) { synchronized (this.updateQueues) { @@ -124,7 +161,7 @@ public class DataCacheListenerService implements DataCacheListener, Service { while (true) { try { Map.Entry entry = tq.take(); - sendUpdate(sourceName, entry.getKey(), entry.getValue()); + sendUpdate(localGroupName, false, sourceName, entry.getKey(), entry.getValue()); } catch (Exception e) { logger.log(Level.SEVERE, this.getName() + " sendUpdate occur error", e); } @@ -143,24 +180,48 @@ public class DataCacheListenerService implements DataCacheListener, Service { } @RemoteOn - public void sendUpdate(String sourceName, Class clazz, Object... values) { - if (nodemap == null) return; - nodemap.forEach((x, y) -> { - try { - y.sendUpdate(sourceName, clazz, values); - } catch (Exception e) { - logger.log(Level.FINE, this.getClass().getSimpleName() + " send update error (" + x + ", " + sourceName + ", " + clazz + ", " + Arrays.toString(values) + ")", e); + public void sendUpdate(String group, boolean ignoreRemote, String sourceName, Class clazz, T... entitys) { + if (nodesmap == null || groups == null) return; + if (ignoreRemote && finest) logger.finest(DataSource.class.getSimpleName() + "(" + group + "--" + this.localNodeName + "," + sourceName + ") onGroupSendUpdate " + Arrays.toString(entitys)); + for (Map.Entry> en : groups.entrySet()) { + if (group != null && group.equals(en.getKey())) { //同机房 + for (String onode : en.getValue()) { + if (onode.equals(localNodeName)) continue; + DataCacheListenerService service = nodesmap.get(onode); + if (service != null) { + try { + service.sendUpdate(group, false, sourceName, clazz, entitys); + } catch (Exception e) { + logger.log(Level.FINE, this.getClass().getSimpleName() + " send update error (" + group + "--" + onode + ", " + sourceName + ", " + clazz + ", " + Arrays.toString(entitys) + ")", e); + } + } + } + if (ignoreRemote) break; + } else if (!ignoreRemote) { + for (String onode : en.getValue()) { + DataCacheListenerService service = nodesmap.get(onode); + if (service != null) { + try { + service.sendUpdate(group, false, sourceName, clazz, entitys); + break; //有一个成功就退出 + } catch (Exception e) { + logger.log(Level.FINE, this.getClass().getSimpleName() + " send update error (" + group + "--" + onode + ", " + sourceName + ", " + clazz + ", " + Arrays.toString(entitys) + ")", e); + } + } + } } - }); + } } - public final void onSendUpdate(String sourceName, Class clazz, T... entitys) { - ((DataJDBCSource) sourcemap.get(sourceName)).updateCache(clazz, entitys); - if (finer) logger.finer(DataSource.class.getSimpleName() + "(" + this.localNodeName + "," + sourceName + ") onSendUpdate " + Arrays.toString(entitys)); + public final void onSendUpdate(String group, boolean ignoreRemote, String sourceName, Class clazz, T... entitys) { + if (finest) logger.finest(DataSource.class.getSimpleName() + "(" + group + "--" + this.localNodeName + "," + sourceName + ") onSendUpdate " + Arrays.toString(entitys)); + ((DataJDBCSource) sourcesmap.get(sourceName)).updateCache(clazz, entitys); + if (!this.localGroupName.equals(group)) sendUpdate(this.localGroupName, true, sourceName, clazz, entitys); //不是同一机房来的资源需要同步到其他同机房的节点上 } @Override public void delete(String sourceName, Class clazz, Serializable... ids) { + if (finest) logger.finest("(source:" + sourceName + ") delete " + clazz + " --> " + Arrays.toString(ids)); BlockingQueue> queue = this.deleteQueues.get(sourceName); if (queue == null) { synchronized (this.deleteQueues) { @@ -180,7 +241,7 @@ public class DataCacheListenerService implements DataCacheListener, Service { while (true) { try { Map.Entry entry = tq.take(); - sendDelete(sourceName, entry.getKey(), entry.getValue()); + sendDelete(localGroupName, false, sourceName, entry.getKey(), entry.getValue()); } catch (Exception e) { logger.log(Level.SEVERE, this.getName() + " sendDelete occur error", e); } @@ -199,19 +260,42 @@ public class DataCacheListenerService implements DataCacheListener, Service { } @RemoteOn - public void sendDelete(String sourceName, Class clazz, Serializable... ids) { - if (nodemap == null) return; - nodemap.forEach((x, y) -> { - try { - y.sendDelete(sourceName, clazz, ids); - } catch (Exception e) { - logger.log(Level.FINE, this.getClass().getSimpleName() + " send delete error (" + x + ", " + sourceName + ", " + clazz + ", " + Arrays.toString(ids) + ")", e); + public void sendDelete(String group, boolean ignoreRemote, String sourceName, Class clazz, Serializable... ids) { + if (nodesmap == null || groups == null) return; + if (ignoreRemote && finest) logger.finest(DataSource.class.getSimpleName() + "(" + group + "--" + this.localNodeName + "," + sourceName + ") onGroupSendDelete " + Arrays.toString(ids)); + for (Map.Entry> en : groups.entrySet()) { + if (group != null && group.equals(en.getKey())) { //同机房 + for (String onode : en.getValue()) { + if (onode.equals(localNodeName)) continue; + DataCacheListenerService service = nodesmap.get(onode); + if (service != null) { + try { + service.sendDelete(group, false, sourceName, clazz, ids); + } catch (Exception e) { + logger.log(Level.FINE, this.getClass().getSimpleName() + " send delete error (" + group + "--" + onode + ", " + sourceName + ", " + clazz + ", " + Arrays.toString(ids) + ")", e); + } + } + } + if (ignoreRemote) break; + } else if (!ignoreRemote) { + for (String onode : en.getValue()) { + DataCacheListenerService service = nodesmap.get(onode); + if (service != null) { + try { + service.sendDelete(group, false, sourceName, clazz, ids); + break; //有一个成功就退出 + } catch (Exception e) { + logger.log(Level.FINE, this.getClass().getSimpleName() + " send delete error (" + group + "--" + onode + ", " + sourceName + ", " + clazz + ", " + Arrays.toString(ids) + ")", e); + } + } + } } - }); + } } - public final void onSendDelete(String sourceName, Class clazz, Serializable... ids) { - ((DataJDBCSource) sourcemap.get(sourceName)).deleteCache(clazz, ids); - if (finer) logger.finer(DataSource.class.getSimpleName() + "(" + this.localNodeName + "," + sourceName + ") onSendDelete " + clazz.getName() + " " + Arrays.toString(ids)); + public final void onSendDelete(String group, boolean ignoreRemote, String sourceName, Class clazz, Serializable... ids) { + if (finest) logger.finest(DataSource.class.getSimpleName() + "(" + group + "--" + this.localNodeName + "," + sourceName + ") onSendDelete " + clazz.getName() + " " + Arrays.toString(ids)); + ((DataJDBCSource) sourcesmap.get(sourceName)).deleteCache(clazz, ids); + if (!this.localGroupName.equals(group)) sendDelete(this.localGroupName, true, sourceName, clazz, ids); //不是同一机房来的资源需要同步到其他同机房的节点上 } } diff --git a/src/com/wentch/redkale/service/Service.java b/src/com/wentch/redkale/service/Service.java index b2ac63afa..31f8fa860 100644 --- a/src/com/wentch/redkale/service/Service.java +++ b/src/com/wentch/redkale/service/Service.java @@ -5,10 +5,14 @@ */ package com.wentch.redkale.service; -import com.wentch.redkale.util.AnyValue; +import com.wentch.redkale.util.*; /** * 所有Service的实现类不得声明为final, 允许远程模式的public方法不能声明为final。 + * + * @Resource(name = ".*") + * private HashMap nodemap; + * 被注入的多个XXXService实例 但不会包含自身的XXXService。 * * @author zhangjx */ diff --git a/src/com/wentch/redkale/source/DataJDBCSource.java b/src/com/wentch/redkale/source/DataJDBCSource.java index 9ebdffd72..49a918836 100644 --- a/src/com/wentch/redkale/source/DataJDBCSource.java +++ b/src/com/wentch/redkale/source/DataJDBCSource.java @@ -1666,7 +1666,7 @@ public final class DataJDBCSource implements DataSource { public void connectionErrorOccurred(ConnectionEvent event) { usingCounter.decrementAndGet(); if ("08S01".equals(event.getSQLException().getSQLState())) return; //MySQL特性, 长时间连接没使用会抛出com.mysql.jdbc.exceptions.jdbc4.CommunicationsException - dataSource.logger.log(Level.WARNING, "connectionErronOccurred " + event.getSQLException().getSQLState(), event.getSQLException()); + dataSource.logger.log(Level.WARNING, "connectionErronOccurred [" + event.getSQLException().getSQLState() + "]", event.getSQLException()); } }; try { @@ -1797,7 +1797,7 @@ public final class DataJDBCSource implements DataSource { } } catch (SQLException ex) { if (!"08S01".equals(ex.getSQLState())) {//MySQL特性, 长时间连接没使用会抛出com.mysql.jdbc.exceptions.jdbc4.CommunicationsException - dataSource.logger.log(Level.FINER, "result.getConnection from pooled connection abort " + ex.getSQLState(), ex); + dataSource.logger.log(Level.FINER, "result.getConnection from pooled connection abort [" + ex.getSQLState() + "]", ex); } return poll(0, null); } diff --git a/src/com/wentch/redkale/util/ResourceFactory.java b/src/com/wentch/redkale/util/ResourceFactory.java index 7c0f64eb2..df015e927 100644 --- a/src/com/wentch/redkale/util/ResourceFactory.java +++ b/src/com/wentch/redkale/util/ResourceFactory.java @@ -7,10 +7,10 @@ package com.wentch.redkale.util; import java.lang.reflect.*; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.*; import java.util.logging.*; -import java.util.regex.Pattern; -import javax.annotation.Resource; +import java.util.regex.*; +import javax.annotation.*; /** * @@ -29,6 +29,8 @@ public final class ResourceFactory { private final ConcurrentHashMap, ConcurrentHashMap> store = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> gencstore = new ConcurrentHashMap<>(); + private ResourceFactory(ResourceFactory parent) { this.parent = parent; } @@ -73,10 +75,35 @@ public final class ResourceFactory { } } + public void register(final String name, final Type clazz, final A rs) { + if (clazz instanceof Class) { + register(name, (Class) clazz, rs); + return; + } + ConcurrentHashMap map = this.gencstore.get(clazz); + if (map == null) { + ConcurrentHashMap sub = new ConcurrentHashMap<>(); + sub.put(name, rs); + gencstore.put(clazz, sub); + } else { + map.put(name, rs); + } + } + public A find(Class clazz) { return find("", clazz); } + public A find(String name, Type clazz) { + Map map = this.gencstore.get(clazz); + if (map != null) { + A rs = (A) map.get(name); + if (rs != null) return rs; + } + if (parent != null) return parent.find(name, clazz); + return null; + } + public A find(String name, Class clazz) { Map map = this.store.get(clazz); if (map != null) { @@ -125,7 +152,8 @@ public final class ResourceFactory { for (Field field : clazz.getDeclaredFields()) { if (Modifier.isStatic(field.getModifiers())) continue; field.setAccessible(true); - final Class type = field.getType(); + final Class classtype = field.getType(); + final Type genctype = field.getGenericType(); Resource rc = field.getAnnotation(Resource.class); if (rc == null) { boolean flag = true; @@ -142,14 +170,16 @@ public final class ResourceFactory { continue; } if (Modifier.isFinal(field.getModifiers())) continue; - Object rs; - if (Map.class.isAssignableFrom(type)) { - rs = find(Pattern.compile(rc.name().isEmpty() ? ".+" : rc.name()), (Class) ((ParameterizedType) field.getGenericType()).getActualTypeArguments()[1], src); - } else { - if (rc.name().startsWith("property.")) { - rs = find(rc.name(), String.class); + Object rs = genctype == classtype ? null : find(rc.name(), genctype); + if (rs == null) { + if (Map.class.isAssignableFrom(classtype)) { + rs = find(Pattern.compile(rc.name().isEmpty() ? ".+" : rc.name()), (Class) ((ParameterizedType) field.getGenericType()).getActualTypeArguments()[1], src); } else { - rs = find(rc.name(), type); + if (rc.name().startsWith("property.")) { + rs = find(rc.name(), String.class); + } else { + rs = find(rc.name(), classtype); + } } } if (rs == null) { @@ -157,20 +187,20 @@ public final class ResourceFactory { if (it != null) it.invoke(this, src, field); continue; } - if (!rs.getClass().isPrimitive() && type.isPrimitive()) { - if (type == int.class) { + if (!rs.getClass().isPrimitive() && classtype.isPrimitive()) { + if (classtype == int.class) { rs = Integer.decode(rs.toString()); - } else if (type == long.class) { + } else if (classtype == long.class) { rs = Long.decode(rs.toString()); - } else if (type == short.class) { + } else if (classtype == short.class) { rs = Short.decode(rs.toString()); - } else if (type == boolean.class) { + } else if (classtype == boolean.class) { rs = "true".equalsIgnoreCase(rs.toString()); - } else if (type == byte.class) { + } else if (classtype == byte.class) { rs = Byte.decode(rs.toString()); - } else if (type == float.class) { + } else if (classtype == float.class) { rs = Float.parseFloat(rs.toString()); - } else if (type == double.class) { + } else if (classtype == double.class) { rs = Double.parseDouble(rs.toString()); } }