This commit is contained in:
地平线
2015-03-27 09:58:52 +08:00
parent e68f2170c9
commit 2ca814fab2
13 changed files with 516 additions and 81 deletions

View File

@@ -23,6 +23,7 @@ import java.nio.*;
import java.nio.channels.*;
import java.nio.file.*;
import java.util.*;
import java.util.AbstractMap.SimpleEntry;
import java.util.concurrent.*;
import java.util.logging.*;
import javax.annotation.*;
@@ -31,7 +32,7 @@ import org.w3c.dom.*;
/**
* 编译时需要加入: -XDignore.symbol.file=true
*
* <p>
* 进程启动类程序启动后读取application.xml,进行classpath扫描动态加载Service与Servlet
* 再进行Service、Servlet与其他资源之间的依赖注入。
*
@@ -39,14 +40,28 @@ import org.w3c.dom.*;
*/
public final class Application {
//进程启动的时间, 类型: long
public static final String RESNAME_TIME = "APP_TIME";
//本地进程的根目录, 类型String
public static final String RESNAME_HOME = "APP_HOME";
//本地节点的名称, 类型String
public static final String RESNAME_NODE = "APP_NODE";
//本地节点的所属组, 类型String、Map<String, Set<String>>、Map<String, List<SimpleEntry<String, InetSocketAddress[]>>>
public static final String RESNAME_GROUP = "APP_GROUP";
//本地节点的所属组所有节点名, 类型Set<String> 、List<SimpleEntry<String, InetSocketAddress[]>>包含自身节点名
public static final String RESNAME_INGROUP = "APP_INGROUP";
//除本地节点的所属组外其他所有组的所有节点名, 类型Map<String, Set<String>>、Map<String, List<SimpleEntry<String, InetSocketAddress[]>>>
public static final String RESNAME_OUTGROUP = "APP_OUTGROUP";
//本地节点的IP地址 类型InetAddress、String
public static final String RESNAME_ADDR = "APP_ADDR";
//application.xml 文件中resources节点的内容 类型: AnyValue
public static final String RESNAME_GRES = "APP_GRES";
protected final ResourceFactory factory = ResourceFactory.root();
@@ -61,6 +76,8 @@ public final class Application {
protected final InetAddress localAddress = Utility.localInetAddress();
protected String nodeGroup = "";
protected String nodeName = "";
//--------------------------------------------------------------------------------------------
@@ -92,7 +109,12 @@ public final class Application {
}
final File logconf = new File(root, "conf/logging.properties");
this.nodeName = config.getValue("node", "");
this.nodeGroup = config.getValue("group", "");
this.factory.register(RESNAME_NODE, this.nodeName);
this.factory.register(RESNAME_GROUP, this.nodeGroup);
System.setProperty(RESNAME_NODE, this.nodeName);
System.setProperty(RESNAME_GROUP, this.nodeGroup);
this.factory.register(RESNAME_ADDR, this.localAddress.getHostAddress());
this.factory.register(RESNAME_ADDR, InetAddress.class, this.localAddress);
if (logconf.isFile() && logconf.canRead()) {
@@ -415,14 +437,25 @@ public final class Application {
//------------------------------------------------------------------------
final String host = this.localAddress.getHostAddress();
final Map<String, Set<String>> groups = new HashMap<>();
final Map<String, List<SimpleEntry<String, InetSocketAddress[]>>> groups2 = new HashMap<>();
for (AnyValue conf : resources.getAnyValues("remote")) {
final String name = conf.getValue("name");
final String group = conf.getValue("group", "");
if (name == null) throw new RuntimeException("remote name is null");
String protocol = conf.getValue("protocol", "UDP").toUpperCase();
if (!"TCP".equalsIgnoreCase(protocol) && !"UDP".equalsIgnoreCase(protocol)) {
throw new RuntimeException("Not supported Transport Protocol " + conf.getValue("protocol"));
}
{
Set<String> set = groups.get(group);
if (set == null) {
set = new HashSet<>();
groups.put(group, set);
}
set.add(name);
}
AnyValue[] addrs = conf.getAnyValues("address");
InetSocketAddress[] addresses = new InetSocketAddress[addrs.length];
int i = -1;
@@ -430,17 +463,51 @@ public final class Application {
addresses[++i] = new InetSocketAddress(addr.getValue("addr"), addr.getIntValue("port"));
}
if (addresses.length < 1) throw new RuntimeException("Transport(" + name + ") have no address ");
Transport transport = new Transport(name, protocol, watch, 100, addresses[0]);
{
List<SimpleEntry<String, InetSocketAddress[]>> list = groups2.get(group);
if (list == null) {
list = new ArrayList<>();
groups2.put(group, list);
}
list.add(new SimpleEntry<>(name, addresses));
}
Transport transport = new Transport(name, protocol, watch, 100, addresses);
factory.register(name, Transport.class, transport);
if (this.nodeName.isEmpty() && host.equals(addrs[0].getValue("addr"))) {
this.nodeName = name;
this.nodeGroup = group;
this.factory.register(RESNAME_NODE, this.nodeName);
this.factory.register(RESNAME_GROUP, this.nodeGroup);
System.setProperty(RESNAME_NODE, this.nodeName);
System.setProperty(RESNAME_GROUP, this.nodeGroup);
}
}
this.factory.register(RESNAME_GROUP, new TypeToken<Map<String, Set<String>>>() {
}.getType(), groups);
this.factory.register(RESNAME_GROUP, new TypeToken<Map<String, List<SimpleEntry<String, InetSocketAddress[]>>>>() {
}.getType(), groups2);
final Map<String, List<SimpleEntry<String, InetSocketAddress[]>>> outgroups2 = new HashMap<>();
final Map<String, Set<String>> outgroups = new HashMap<>();
groups.entrySet().stream().filter(x -> !x.getKey().equals(nodeName)).forEach(x -> outgroups.put(x.getKey(), x.getValue()));
groups2.entrySet().stream().filter(x -> !x.getKey().equals(nodeName)).forEach(x -> outgroups2.put(x.getKey(), x.getValue()));
this.factory.register(RESNAME_OUTGROUP, new TypeToken<Map<String, Set<String>>>() {
}.getType(), outgroups);
this.factory.register(RESNAME_OUTGROUP, new TypeToken<Map<String, List<SimpleEntry<String, InetSocketAddress[]>>>>() {
}.getType(), outgroups2);
Set<String> ingroup = groups.get(this.nodeGroup);
if (ingroup != null) this.factory.register(RESNAME_INGROUP, new TypeToken<Set<String>>() {
}.getType(), ingroup);
List<SimpleEntry<String, InetSocketAddress[]>> inengroup = groups2.get(this.nodeGroup);
if (inengroup != null) this.factory.register(RESNAME_INGROUP, new TypeToken<List<SimpleEntry<String, InetSocketAddress[]>>>() {
}.getType(), inengroup);
}
//------------------------------------------------------------------------
logger.info(RESNAME_NODE + "=" + this.nodeName);
logger.info(RESNAME_NODE + "=" + this.nodeName + "; " + RESNAME_GROUP + "=" + this.nodeGroup);
logger.info("datasource.nodeid=" + this.factory.find("property.datasource.nodeid", String.class));
}

View File

@@ -352,7 +352,7 @@ public final class AsyncDatagramChannel implements AsynchronousByteChannel, Mult
if (buffers[index].hasRemaining()) {
implSend(buffers[index], target, attachment, this);
} else if (index == max) {
handler.completed(resultSum, attachment);
if (handler != null) handler.completed(resultSum, attachment);
} else {
implSend(buffers[++index], target, attachment, this);
}
@@ -360,7 +360,7 @@ public final class AsyncDatagramChannel implements AsynchronousByteChannel, Mult
@Override
public void failed(Throwable exc, A attachment) {
handler.failed(exc, attachment);
if (handler != null) handler.failed(exc, attachment);
}
});
}

View File

@@ -125,7 +125,7 @@ public abstract class Response<R extends Request> {
}
public void finish(ByteBuffer buffer) {
finish(false, buffer);
this.channel.write(buffer, buffer, finishHandler);
}
public void finish(boolean kill, ByteBuffer buffer) {
@@ -134,7 +134,7 @@ public abstract class Response<R extends Request> {
}
public void finish(ByteBuffer... buffers) {
finish(false, buffers);
this.channel.write(buffers, buffers, finishHandler2);
}
public void finish(boolean kill, ByteBuffer... buffers) {

View File

@@ -5,6 +5,7 @@
*/
package com.wentch.redkale.net;
import java.nio.*;
import java.security.*;
import javax.net.ssl.*;
@@ -35,4 +36,114 @@ public class SSLBuilder {
sslEngine.setNeedClientAuth(false);
}
private static final byte CHANGE_CIPHER_SPECT_CONTENT_TYPE = 20;
private static final byte APPLICATION_DATA_CONTENT_TYPE = 23;
private static final int SSLV3_RECORD_HEADER_SIZE = 5; // SSLv3 record header
private static final int SSL20_HELLO_VERSION = 0x0002;
private static final int MIN_VERSION = 0x0300;
private static final int MAX_MAJOR_VERSION = 0x03;
private static int getSSLPacketSize(final ByteBuffer buf) throws SSLException {
/*
* SSLv2 length field is in bytes 0/1
* SSLv3/TLS length field is in bytes 3/4
*/
if (buf.remaining() < 5) return -1;
final byte byte0;
final byte byte1;
final byte byte2;
final byte byte3;
final byte byte4;
if (buf.hasArray()) {
final byte[] array = buf.array();
int pos = buf.arrayOffset() + buf.position();
byte0 = array[pos++];
byte1 = array[pos++];
byte2 = array[pos++];
byte3 = array[pos++];
byte4 = array[pos];
} else {
int pos = buf.position();
byte0 = buf.get(pos++);
byte1 = buf.get(pos++);
byte2 = buf.get(pos++);
byte3 = buf.get(pos++);
byte4 = buf.get(pos);
}
int len;
/*
* If we have already verified previous packets, we can
* ignore the verifications steps, and jump right to the
* determination. Otherwise, try one last hueristic to
* see if it's SSL/TLS.
*/
if (byte0 >= CHANGE_CIPHER_SPECT_CONTENT_TYPE && byte0 <= APPLICATION_DATA_CONTENT_TYPE) {
/*
* Last sanity check that it's not a wild record
*/
final byte major = byte1;
final byte minor = byte2;
final int v = (major << 8) | minor & 0xff;
// Check if too old (currently not possible)
// or if the major version does not match.
// The actual version negotiation is in the handshaker classes
if ((v < MIN_VERSION) || (major > MAX_MAJOR_VERSION)) {
throw new SSLException("Unsupported record version major=" + major + " minor=" + minor);
}
/*
* One of the SSLv3/TLS message types.
*/
len = ((byte3 & 0xff) << 8) + (byte4 & 0xff) + SSLV3_RECORD_HEADER_SIZE;
} else {
/*
* Must be SSLv2 or something unknown.
* Check if it's short (2 bytes) or
* long (3) header.
*
* Internals can warn about unsupported SSLv2
*/
boolean isShort = ((byte0 & 0x80) != 0);
if (isShort && ((byte2 == 1) || byte2 == 4)) {
final byte major = byte3;
final byte minor = byte4;
final int v = (major << 8) | minor & 0xff;
// Check if too old (currently not possible)
// or if the major version does not match.
// The actual version negotiation is in the handshaker classes
if ((v < MIN_VERSION) || (major > MAX_MAJOR_VERSION)) {
// if it's not SSLv2, we're out of here.
if (v != SSL20_HELLO_VERSION) throw new SSLException("Unsupported record version major=" + major + " minor=" + minor);
}
/*
* Client or Server Hello
*/
int mask = 0x7f;
len = ((byte0 & mask) << 8) + (byte1 & 0xff) + (2);
} else {
// Gobblygook!
throw new SSLException("Unrecognized SSL message, plaintext connection?");
}
}
return len;
}
}

View File

@@ -68,6 +68,10 @@ public final class Transport {
bufferPool.offer(buffer);
}
public void offerBuffer(ByteBuffer... buffers) {
for (ByteBuffer buffer : buffers) offerBuffer(buffer);
}
public AsyncConnection pollConnection() {
SocketAddress addr = remoteAddres[0];
try {

View File

@@ -6,8 +6,8 @@
package com.wentch.redkale.net.http;
import com.wentch.redkale.net.*;
import com.wentch.redkale.util.AnyValue.DefaultAnyValue;
import com.wentch.redkale.util.*;
import com.wentch.redkale.util.AnyValue.DefaultAnyValue;
import com.wentch.redkale.watch.*;
import java.io.*;
import java.nio.*;
@@ -107,7 +107,7 @@ public final class HttpPrepareServlet extends PrepareServlet<HttpRequest, HttpRe
servlet.execute(request, response);
} catch (Exception e) {
request.getContext().getLogger().log(Level.WARNING, "Servlet occur, forece to close channel ", e);
response.finish(505, null);
response.finish(500, null);
}
}

View File

@@ -99,12 +99,21 @@ public final class HttpResponse extends Response<HttpRequest> {
private final DefaultAnyValue header = new DefaultAnyValue();
private final String[][] defaultAddHeaders;
private final String[][] defaultSetHeaders;
private final HttpCookie defcookie;
public static ObjectPool<Response> createPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator<Response> creator) {
return new ObjectPool<>(creatCounter, cycleCounter, max, creator, (x) -> ((HttpResponse) x).recycle());
}
protected HttpResponse(HttpContext context, HttpRequest request) {
protected HttpResponse(HttpContext context, HttpRequest request, String[][] defaultAddHeaders, String[][] defaultSetHeaders, HttpCookie defcookie) {
super(context, request);
this.defaultAddHeaders = defaultAddHeaders;
this.defaultSetHeaders = defaultSetHeaders;
this.defcookie = defcookie;
}
@Override
@@ -204,6 +213,50 @@ public final class HttpResponse extends Response<HttpRequest> {
super.finish(buffer404.duplicate());
}
@Override
public void finish(ByteBuffer buffer) {
finish(false, buffer);
}
@Override
public void finish(boolean kill, ByteBuffer buffer) {
if (!this.headsended) {
ByteBuffer headbuf = createHeader();
headbuf.flip();
if (buffer == null) {
super.finish(kill, headbuf);
} else {
super.finish(kill, new ByteBuffer[]{headbuf, buffer});
}
} else {
super.finish(kill, buffer);
}
}
@Override
public void finish(ByteBuffer... buffers) {
finish(false, buffers);
}
@Override
public void finish(boolean kill, ByteBuffer... buffers) {
if (kill) refuseAlive();
if (!this.headsended) {
ByteBuffer headbuf = createHeader();
headbuf.flip();
if (buffers == null) {
super.finish(kill, headbuf);
} else {
ByteBuffer[] newbuffers = new ByteBuffer[buffers.length + 1];
newbuffers[0] = headbuf;
System.arraycopy(buffers, 0, newbuffers, 1, buffers.length);
super.finish(kill, newbuffers);
}
} else {
super.finish(kill, buffers);
}
}
public <A> void sendBody(ByteBuffer buffer, A attachment, CompletionHandler<Integer, A> handler) {
if (!this.headsended) {
ByteBuffer headbuf = createHeader();
@@ -211,7 +264,7 @@ public final class HttpResponse extends Response<HttpRequest> {
if (buffer == null) {
super.send(headbuf, attachment, handler);
} else {
super.send(new ByteBuffer[]{headbuf, headbuf}, attachment, handler);
super.send(new ByteBuffer[]{headbuf, buffer}, attachment, handler);
}
} else {
super.send(buffer, attachment, handler);
@@ -284,19 +337,50 @@ public final class HttpResponse extends Response<HttpRequest> {
if (!this.request.isKeepAlive()) {
buffer.put("Connection: close\r\n".getBytes());
}
if (this.defaultAddHeaders != null) {
for (String[] headers : this.defaultAddHeaders) {
if (headers.length > 2) {
String v = request.getHeader(headers[2]);
if (v != null) this.header.addValue(headers[0], v);
} else {
this.header.addValue(headers[0], headers[1]);
}
}
}
if (this.defaultSetHeaders != null) {
for (String[] headers : this.defaultSetHeaders) {
if (headers.length > 2) {
this.header.setValue(headers[0], request.getHeader(headers[2]));
} else {
this.header.setValue(headers[0], headers[1]);
}
}
}
for (Entry<String> en : this.header.getStringEntrys()) {
buffer.put((en.name + ": " + en.getValue() + "\r\n").getBytes());
}
if (request.newsessionid != null) {
if (request.newsessionid.isEmpty()) {
buffer.put(("Set-Cookie: " + HttpRequest.SESSIONID_NAME + "=; path=/; Max-Age=0; HttpOnly\r\n").getBytes());
String domain = defcookie == null ? null : defcookie.getDomain();
if (domain == null) {
domain = "";
} else {
buffer.put(("Set-Cookie: " + HttpRequest.SESSIONID_NAME + "=" + request.newsessionid + "; path=/; HttpOnly\r\n").getBytes());
domain = "Domain=" + domain + "; ";
}
String path = defcookie == null ? null : defcookie.getPath();
if (path == null) path = "/";
if (request.newsessionid.isEmpty()) {
buffer.put(("Set-Cookie: " + HttpRequest.SESSIONID_NAME + "=; " + domain + "Path=" + path + "; Max-Age=0; HttpOnly\r\n").getBytes());
} else {
buffer.put(("Set-Cookie: " + HttpRequest.SESSIONID_NAME + "=" + request.newsessionid + "; " + domain + "Path=" + path + "; HttpOnly\r\n").getBytes());
}
}
if (this.cookies != null) {
for (HttpCookie cookie : this.cookies) {
if (cookie == null) continue;
if (defcookie != null) {
if (defcookie.getDomain() != null && cookie.getDomain() == null) cookie.setDomain(defcookie.getDomain());
if (defcookie.getPath() != null && cookie.getPath() == null) cookie.setPath(defcookie.getPath());
}
buffer.put(("Set-Cookie: " + genString(cookie) + "\r\n").getBytes());
}
}
@@ -307,8 +391,8 @@ public final class HttpResponse extends Response<HttpRequest> {
private CharSequence genString(HttpCookie cookie) {
StringBuilder sb = new StringBuilder();
sb.append(cookie.getName()).append("=\"").append(cookie.getValue()).append('"').append("; Version=1");
if (cookie.getPath() != null) sb.append("; Path=").append(cookie.getPath());
if (cookie.getDomain() != null) sb.append("; Domain=").append(cookie.getDomain());
if (cookie.getPath() != null) sb.append("; Path=").append(cookie.getPath());
if (cookie.getPortlist() != null) sb.append("; Port=").append(cookie.getPortlist());
if (cookie.getMaxAge() > 0) {
sb.append("; Max-Age=").append(cookie.getMaxAge());

View File

@@ -8,6 +8,7 @@ package com.wentch.redkale.net.http;
import com.wentch.redkale.net.*;
import com.wentch.redkale.util.*;
import com.wentch.redkale.watch.*;
import java.net.*;
import java.nio.*;
import java.util.*;
import java.util.AbstractMap.SimpleEntry;
@@ -60,12 +61,62 @@ public final class HttpServer extends Server {
prepare.addHttpServlet(en.getKey().getKey(), en.getKey().getValue(), en.getValue());
});
this.servlets.clear();
String[][] defaultAddHeaders = null;
String[][] defaultSetHeaders = null;
HttpCookie defaultCookie = null;
if (config != null) {
AnyValue resps = config == null ? null : config.getAnyValue("response");
if (resps != null) {
AnyValue[] addHeaders = resps.getAnyValues("addheader");
if (addHeaders.length > 0) {
defaultAddHeaders = new String[addHeaders.length][];
for (int i = 0; i < addHeaders.length; i++) {
String val = addHeaders[i].getValue("value");
if (val == null) continue;
if (val.startsWith("request.headers.")) {
defaultAddHeaders[i] = new String[]{addHeaders[i].getValue("name"), val, val.substring("request.headers.".length())};
} else if (val.startsWith("system.property.")) {
String v = System.getProperty(val.substring("system.property.".length()));
if (v != null) defaultAddHeaders[i] = new String[]{addHeaders[i].getValue("name"), v};
} else {
defaultAddHeaders[i] = new String[]{addHeaders[i].getValue("name"), val};
}
}
}
AnyValue[] setHeaders = resps.getAnyValues("setheader");
if (setHeaders.length > 0) {
defaultSetHeaders = new String[setHeaders.length][];
for (int i = 0; i < setHeaders.length; i++) {
String val = setHeaders[i].getValue("value");
if (val != null && val.startsWith("request.headers.")) {
defaultSetHeaders[i] = new String[]{setHeaders[i].getValue("name"), val, val.substring("request.headers.".length())};
} else {
defaultSetHeaders[i] = new String[]{setHeaders[i].getValue("name"), val};
}
}
}
AnyValue defcookieValue = resps.getAnyValue("defcookie");
if (defcookieValue != null) {
String domain = defcookieValue.getValue("domain");
String path = defcookieValue.getValue("path");
if (domain != null || path != null) {
defaultCookie = new HttpCookie("DEFAULTCOOKIE", "");
defaultCookie.setDomain(domain);
defaultCookie.setPath(path);
}
}
}
}
final String[][] addHeaders = defaultAddHeaders;
final String[][] setHeaders = defaultSetHeaders;
final HttpCookie defCookie = defaultCookie;
AtomicLong createResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Response.creatCounter");
AtomicLong cycleResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Response.cycleCounter");
ObjectPool<Response> responsePool = HttpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null);
HttpContext httpcontext = new HttpContext(this.serverStartTime, this.logger, executor, bufferPool, responsePool,
this.maxbody, this.charset, this.address, prepare, this.watch, this.readTimeoutSecond, this.writeTimeoutSecond, contextPath);
responsePool.setCreator((Object... params) -> new HttpResponse(httpcontext, new HttpRequest(httpcontext, httpcontext.jsonFactory)));
responsePool.setCreator((Object... params)
-> new HttpResponse(httpcontext, new HttpRequest(httpcontext, httpcontext.jsonFactory), addHeaders, setHeaders, defCookie));
return httpcontext;
}

View File

@@ -48,7 +48,7 @@ public final class SncpClient {
this.resultTypes = rt == void.class ? null : rt;
this.paramTypes = method.getGenericParameterTypes();
this.method = method;
this.async = method.getReturnType() == void.class && method.getAnnotation(Async.class) != null;
this.async = false;// method.getReturnType() == void.class && method.getAnnotation(Async.class) != null;
}
@Override

View File

@@ -31,24 +31,37 @@ public class DataCacheListenerService implements DataCacheListener, Service {
private final ConcurrentHashMap<String, BlockingQueue<Map.Entry<Class, Serializable[]>>> deleteQueues = new ConcurrentHashMap<>();
private boolean finer;
private final boolean finest = logger.isLoggable(Level.FINEST);
;
@Resource(name = "APP_NODE")
private String localNodeName = "";
@Resource(name = ".*")
HashMap<String, DataSource> sourcemap;
@Resource(name = "APP_GROUP")
private String localGroupName = "";
@Resource(name = "APP_GROUP")
private Map<String, Set<String>> groups;
@Resource(name = ".*")
HashMap<String, DataCacheListenerService> nodemap;
private HashMap<String, DataSource> sourcesmap;
@Resource(name = ".*")
private HashMap<String, DataCacheListenerService> nodesmap;
@Override
public void init(AnyValue config) {
finer = logger.isLoggable(Level.FINER);
if (finest) {
logger.finest(this.getClass().getSimpleName() + "-localgroup: " + localGroupName);
logger.finest(this.getClass().getSimpleName() + "-groups: " + groups);
logger.finest(this.getClass().getSimpleName() + "-sources: " + sourcesmap);
}
}
@Override
public <T> void insert(String sourceName, Class<T> clazz, T... entitys) {
if (finest) logger.finest("(source:" + sourceName + ") insert " + clazz + " --> " + Arrays.toString(entitys));
BlockingQueue<Map.Entry<Class, Object[]>> queue = this.insertQueues.get(sourceName);
if (queue == null) {
synchronized (this.insertQueues) {
@@ -68,7 +81,7 @@ public class DataCacheListenerService implements DataCacheListener, Service {
while (true) {
try {
Map.Entry<Class, Object[]> entry = tq.take();
sendInsert(sourceName, entry.getKey(), entry.getValue());
sendInsert(localGroupName, false, sourceName, entry.getKey(), entry.getValue());
} catch (Exception e) {
logger.log(Level.SEVERE, this.getName() + " sendInsert occur error", e);
}
@@ -87,24 +100,48 @@ public class DataCacheListenerService implements DataCacheListener, Service {
}
@RemoteOn
public <T> void sendInsert(String sourceName, Class<T> clazz, T... entitys) {
if (nodemap == null) return;
nodemap.forEach((x, y) -> {
try {
y.sendInsert(sourceName, clazz, entitys);
} catch (Exception e) {
logger.log(Level.FINE, this.getClass().getSimpleName() + " send insert error (" + x + ", " + sourceName + ", " + clazz + ", " + Arrays.toString(entitys) + ")", e);
public <T> void sendInsert(String group, boolean ignoreRemote, String sourceName, Class<T> clazz, T... entitys) {
if (nodesmap == null || groups == null) return;
if (ignoreRemote && finest) logger.finest(DataSource.class.getSimpleName() + "(" + group + "--" + this.localNodeName + "," + sourceName + ") onGroupSendInsert " + Arrays.toString(entitys));
for (Map.Entry<String, Set<String>> en : groups.entrySet()) {
if (group != null && group.equals(en.getKey())) { //同机房
for (String onode : en.getValue()) {
if (onode.equals(localNodeName)) continue;
DataCacheListenerService service = nodesmap.get(onode);
if (service != null) {
try {
service.sendInsert(group, false, sourceName, clazz, entitys);
} catch (Exception e) {
logger.log(Level.FINE, this.getClass().getSimpleName() + " send insert error (" + group + "--" + onode + ", " + sourceName + ", " + clazz + ", " + Arrays.toString(entitys) + ")", e);
}
}
}
if (ignoreRemote) break;
} else if (!ignoreRemote) {
for (String onode : en.getValue()) {
DataCacheListenerService service = nodesmap.get(onode);
if (service != null) {
try {
service.sendInsert(group, false, sourceName, clazz, entitys);
break; //有一个成功就退出
} catch (Exception e) {
logger.log(Level.FINE, this.getClass().getSimpleName() + " send insert error (" + group + "--" + onode + ", " + sourceName + ", " + clazz + ", " + Arrays.toString(entitys) + ")", e);
}
}
}
}
});
}
}
public final <T> void onSendInsert(String sourceName, Class<T> clazz, T... entitys) {
((DataJDBCSource) sourcemap.get(sourceName)).insertCache(entitys);
if (finer) logger.finer(DataSource.class.getSimpleName() + "(" + this.localNodeName + "," + sourceName + ") onSendInsert " + Arrays.toString(entitys));
public final <T> void onSendInsert(String group, boolean ignoreRemote, String sourceName, Class<T> clazz, T... entitys) {
if (finest) logger.finest(DataSource.class.getSimpleName() + "(" + this.localNodeName + "," + sourceName + ") onSendInsert " + Arrays.toString(entitys));
((DataJDBCSource) sourcesmap.get(sourceName)).insertCache(entitys);
if (!this.localGroupName.equals(group)) sendInsert(this.localGroupName, true, sourceName, clazz, entitys); //不是同一机房来的资源需要同步到其他同机房的节点上
}
@Override
public <T> void update(String sourceName, Class<T> clazz, T... values) {
if (finest) logger.finest("(source:" + sourceName + ") update " + clazz + " --> " + Arrays.toString(values));
BlockingQueue<Map.Entry<Class, Object[]>> queue = this.updateQueues.get(sourceName);
if (queue == null) {
synchronized (this.updateQueues) {
@@ -124,7 +161,7 @@ public class DataCacheListenerService implements DataCacheListener, Service {
while (true) {
try {
Map.Entry<Class, Object[]> entry = tq.take();
sendUpdate(sourceName, entry.getKey(), entry.getValue());
sendUpdate(localGroupName, false, sourceName, entry.getKey(), entry.getValue());
} catch (Exception e) {
logger.log(Level.SEVERE, this.getName() + " sendUpdate occur error", e);
}
@@ -143,24 +180,48 @@ public class DataCacheListenerService implements DataCacheListener, Service {
}
@RemoteOn
public <T> void sendUpdate(String sourceName, Class<T> clazz, Object... values) {
if (nodemap == null) return;
nodemap.forEach((x, y) -> {
try {
y.sendUpdate(sourceName, clazz, values);
} catch (Exception e) {
logger.log(Level.FINE, this.getClass().getSimpleName() + " send update error (" + x + ", " + sourceName + ", " + clazz + ", " + Arrays.toString(values) + ")", e);
public <T> void sendUpdate(String group, boolean ignoreRemote, String sourceName, Class<T> clazz, T... entitys) {
if (nodesmap == null || groups == null) return;
if (ignoreRemote && finest) logger.finest(DataSource.class.getSimpleName() + "(" + group + "--" + this.localNodeName + "," + sourceName + ") onGroupSendUpdate " + Arrays.toString(entitys));
for (Map.Entry<String, Set<String>> en : groups.entrySet()) {
if (group != null && group.equals(en.getKey())) { //同机房
for (String onode : en.getValue()) {
if (onode.equals(localNodeName)) continue;
DataCacheListenerService service = nodesmap.get(onode);
if (service != null) {
try {
service.sendUpdate(group, false, sourceName, clazz, entitys);
} catch (Exception e) {
logger.log(Level.FINE, this.getClass().getSimpleName() + " send update error (" + group + "--" + onode + ", " + sourceName + ", " + clazz + ", " + Arrays.toString(entitys) + ")", e);
}
}
}
if (ignoreRemote) break;
} else if (!ignoreRemote) {
for (String onode : en.getValue()) {
DataCacheListenerService service = nodesmap.get(onode);
if (service != null) {
try {
service.sendUpdate(group, false, sourceName, clazz, entitys);
break; //有一个成功就退出
} catch (Exception e) {
logger.log(Level.FINE, this.getClass().getSimpleName() + " send update error (" + group + "--" + onode + ", " + sourceName + ", " + clazz + ", " + Arrays.toString(entitys) + ")", e);
}
}
}
}
});
}
}
public final <T> void onSendUpdate(String sourceName, Class<T> clazz, T... entitys) {
((DataJDBCSource) sourcemap.get(sourceName)).updateCache(clazz, entitys);
if (finer) logger.finer(DataSource.class.getSimpleName() + "(" + this.localNodeName + "," + sourceName + ") onSendUpdate " + Arrays.toString(entitys));
public final <T> void onSendUpdate(String group, boolean ignoreRemote, String sourceName, Class<T> clazz, T... entitys) {
if (finest) logger.finest(DataSource.class.getSimpleName() + "(" + group + "--" + this.localNodeName + "," + sourceName + ") onSendUpdate " + Arrays.toString(entitys));
((DataJDBCSource) sourcesmap.get(sourceName)).updateCache(clazz, entitys);
if (!this.localGroupName.equals(group)) sendUpdate(this.localGroupName, true, sourceName, clazz, entitys); //不是同一机房来的资源需要同步到其他同机房的节点上
}
@Override
public <T> void delete(String sourceName, Class<T> clazz, Serializable... ids) {
if (finest) logger.finest("(source:" + sourceName + ") delete " + clazz + " --> " + Arrays.toString(ids));
BlockingQueue<Map.Entry<Class, Serializable[]>> queue = this.deleteQueues.get(sourceName);
if (queue == null) {
synchronized (this.deleteQueues) {
@@ -180,7 +241,7 @@ public class DataCacheListenerService implements DataCacheListener, Service {
while (true) {
try {
Map.Entry<Class, Serializable[]> entry = tq.take();
sendDelete(sourceName, entry.getKey(), entry.getValue());
sendDelete(localGroupName, false, sourceName, entry.getKey(), entry.getValue());
} catch (Exception e) {
logger.log(Level.SEVERE, this.getName() + " sendDelete occur error", e);
}
@@ -199,19 +260,42 @@ public class DataCacheListenerService implements DataCacheListener, Service {
}
@RemoteOn
public <T> void sendDelete(String sourceName, Class<T> clazz, Serializable... ids) {
if (nodemap == null) return;
nodemap.forEach((x, y) -> {
try {
y.sendDelete(sourceName, clazz, ids);
} catch (Exception e) {
logger.log(Level.FINE, this.getClass().getSimpleName() + " send delete error (" + x + ", " + sourceName + ", " + clazz + ", " + Arrays.toString(ids) + ")", e);
public <T> void sendDelete(String group, boolean ignoreRemote, String sourceName, Class<T> clazz, Serializable... ids) {
if (nodesmap == null || groups == null) return;
if (ignoreRemote && finest) logger.finest(DataSource.class.getSimpleName() + "(" + group + "--" + this.localNodeName + "," + sourceName + ") onGroupSendDelete " + Arrays.toString(ids));
for (Map.Entry<String, Set<String>> en : groups.entrySet()) {
if (group != null && group.equals(en.getKey())) { //同机房
for (String onode : en.getValue()) {
if (onode.equals(localNodeName)) continue;
DataCacheListenerService service = nodesmap.get(onode);
if (service != null) {
try {
service.sendDelete(group, false, sourceName, clazz, ids);
} catch (Exception e) {
logger.log(Level.FINE, this.getClass().getSimpleName() + " send delete error (" + group + "--" + onode + ", " + sourceName + ", " + clazz + ", " + Arrays.toString(ids) + ")", e);
}
}
}
if (ignoreRemote) break;
} else if (!ignoreRemote) {
for (String onode : en.getValue()) {
DataCacheListenerService service = nodesmap.get(onode);
if (service != null) {
try {
service.sendDelete(group, false, sourceName, clazz, ids);
break; //有一个成功就退出
} catch (Exception e) {
logger.log(Level.FINE, this.getClass().getSimpleName() + " send delete error (" + group + "--" + onode + ", " + sourceName + ", " + clazz + ", " + Arrays.toString(ids) + ")", e);
}
}
}
}
});
}
}
public final <T> void onSendDelete(String sourceName, Class<T> clazz, Serializable... ids) {
((DataJDBCSource) sourcemap.get(sourceName)).deleteCache(clazz, ids);
if (finer) logger.finer(DataSource.class.getSimpleName() + "(" + this.localNodeName + "," + sourceName + ") onSendDelete " + clazz.getName() + " " + Arrays.toString(ids));
public final <T> void onSendDelete(String group, boolean ignoreRemote, String sourceName, Class<T> clazz, Serializable... ids) {
if (finest) logger.finest(DataSource.class.getSimpleName() + "(" + group + "--" + this.localNodeName + "," + sourceName + ") onSendDelete " + clazz.getName() + " " + Arrays.toString(ids));
((DataJDBCSource) sourcesmap.get(sourceName)).deleteCache(clazz, ids);
if (!this.localGroupName.equals(group)) sendDelete(this.localGroupName, true, sourceName, clazz, ids); //不是同一机房来的资源需要同步到其他同机房的节点上
}
}

View File

@@ -5,10 +5,14 @@
*/
package com.wentch.redkale.service;
import com.wentch.redkale.util.AnyValue;
import com.wentch.redkale.util.*;
/**
* 所有Service的实现类不得声明为final 允许远程模式的public方法不能声明为final。
*
* @Resource(name = ".*")
* private HashMap<String, XXXService> nodemap;
* 被注入的多个XXXService实例 但不会包含自身的XXXService。
*
* @author zhangjx
*/

View File

@@ -1666,7 +1666,7 @@ public final class DataJDBCSource implements DataSource {
public void connectionErrorOccurred(ConnectionEvent event) {
usingCounter.decrementAndGet();
if ("08S01".equals(event.getSQLException().getSQLState())) return; //MySQL特性 长时间连接没使用会抛出com.mysql.jdbc.exceptions.jdbc4.CommunicationsException
dataSource.logger.log(Level.WARNING, "connectionErronOccurred " + event.getSQLException().getSQLState(), event.getSQLException());
dataSource.logger.log(Level.WARNING, "connectionErronOccurred [" + event.getSQLException().getSQLState() + "]", event.getSQLException());
}
};
try {
@@ -1797,7 +1797,7 @@ public final class DataJDBCSource implements DataSource {
}
} catch (SQLException ex) {
if (!"08S01".equals(ex.getSQLState())) {//MySQL特性 长时间连接没使用会抛出com.mysql.jdbc.exceptions.jdbc4.CommunicationsException
dataSource.logger.log(Level.FINER, "result.getConnection from pooled connection abort " + ex.getSQLState(), ex);
dataSource.logger.log(Level.FINER, "result.getConnection from pooled connection abort [" + ex.getSQLState() + "]", ex);
}
return poll(0, null);
}

View File

@@ -7,10 +7,10 @@ package com.wentch.redkale.util;
import java.lang.reflect.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.*;
import java.util.logging.*;
import java.util.regex.Pattern;
import javax.annotation.Resource;
import java.util.regex.*;
import javax.annotation.*;
/**
*
@@ -29,6 +29,8 @@ public final class ResourceFactory {
private final ConcurrentHashMap<Class<?>, ConcurrentHashMap<String, ?>> store = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Type, ConcurrentHashMap<String, ?>> gencstore = new ConcurrentHashMap<>();
private ResourceFactory(ResourceFactory parent) {
this.parent = parent;
}
@@ -73,10 +75,35 @@ public final class ResourceFactory {
}
}
public <A> void register(final String name, final Type clazz, final A rs) {
if (clazz instanceof Class) {
register(name, (Class) clazz, rs);
return;
}
ConcurrentHashMap map = this.gencstore.get(clazz);
if (map == null) {
ConcurrentHashMap<String, A> sub = new ConcurrentHashMap<>();
sub.put(name, rs);
gencstore.put(clazz, sub);
} else {
map.put(name, rs);
}
}
public <A> A find(Class<? extends A> clazz) {
return find("", clazz);
}
public <A> A find(String name, Type clazz) {
Map<String, ?> map = this.gencstore.get(clazz);
if (map != null) {
A rs = (A) map.get(name);
if (rs != null) return rs;
}
if (parent != null) return parent.find(name, clazz);
return null;
}
public <A> A find(String name, Class<? extends A> clazz) {
Map<String, ?> map = this.store.get(clazz);
if (map != null) {
@@ -125,7 +152,8 @@ public final class ResourceFactory {
for (Field field : clazz.getDeclaredFields()) {
if (Modifier.isStatic(field.getModifiers())) continue;
field.setAccessible(true);
final Class type = field.getType();
final Class classtype = field.getType();
final Type genctype = field.getGenericType();
Resource rc = field.getAnnotation(Resource.class);
if (rc == null) {
boolean flag = true;
@@ -142,14 +170,16 @@ public final class ResourceFactory {
continue;
}
if (Modifier.isFinal(field.getModifiers())) continue;
Object rs;
if (Map.class.isAssignableFrom(type)) {
rs = find(Pattern.compile(rc.name().isEmpty() ? ".+" : rc.name()), (Class) ((ParameterizedType) field.getGenericType()).getActualTypeArguments()[1], src);
} else {
if (rc.name().startsWith("property.")) {
rs = find(rc.name(), String.class);
Object rs = genctype == classtype ? null : find(rc.name(), genctype);
if (rs == null) {
if (Map.class.isAssignableFrom(classtype)) {
rs = find(Pattern.compile(rc.name().isEmpty() ? ".+" : rc.name()), (Class) ((ParameterizedType) field.getGenericType()).getActualTypeArguments()[1], src);
} else {
rs = find(rc.name(), type);
if (rc.name().startsWith("property.")) {
rs = find(rc.name(), String.class);
} else {
rs = find(rc.name(), classtype);
}
}
}
if (rs == null) {
@@ -157,20 +187,20 @@ public final class ResourceFactory {
if (it != null) it.invoke(this, src, field);
continue;
}
if (!rs.getClass().isPrimitive() && type.isPrimitive()) {
if (type == int.class) {
if (!rs.getClass().isPrimitive() && classtype.isPrimitive()) {
if (classtype == int.class) {
rs = Integer.decode(rs.toString());
} else if (type == long.class) {
} else if (classtype == long.class) {
rs = Long.decode(rs.toString());
} else if (type == short.class) {
} else if (classtype == short.class) {
rs = Short.decode(rs.toString());
} else if (type == boolean.class) {
} else if (classtype == boolean.class) {
rs = "true".equalsIgnoreCase(rs.toString());
} else if (type == byte.class) {
} else if (classtype == byte.class) {
rs = Byte.decode(rs.toString());
} else if (type == float.class) {
} else if (classtype == float.class) {
rs = Float.parseFloat(rs.toString());
} else if (type == double.class) {
} else if (classtype == double.class) {
rs = Double.parseDouble(rs.toString());
}
}