55 Commits
1.9.0 ... 1.9.2

Author SHA1 Message Date
Redkale
bfbcab4009 2018-04-03 08:29:42 +08:00
Redkale
742c352080 2018-04-02 20:53:23 +08:00
Redkale
643827354c 2018-04-02 19:29:55 +08:00
Redkale
2f7bebfc17 2018-04-02 18:48:56 +08:00
Redkale
7bf8d60ddf 2018-04-02 16:28:21 +08:00
Redkale
09c51b6b4e 2018-04-02 16:26:35 +08:00
Redkale
84f6ce58a0 2018-04-02 14:48:23 +08:00
Redkale
9e90ae5285 2018-04-02 14:08:20 +08:00
Redkale
93b1c9f0d9 2018-04-02 13:22:45 +08:00
Redkale
dab8ed8ceb 2018-03-31 11:27:11 +08:00
Redkale
f016e49111 2018-03-31 11:15:58 +08:00
Redkale
b403f22284 2018-03-31 10:46:21 +08:00
Redkale
ccb9ac93f7 ConvertFactory增加registerIgnoreAll配置 2018-03-31 10:33:57 +08:00
Redkale
72935f1ebe 2018-03-31 10:28:33 +08:00
Redkale
1f7d46219a net和WebSokcet模块增加Cryptor功能 2018-03-30 20:38:29 +08:00
Redkale
9094f76de2 Utility增加contains系列方法 2018-03-30 19:53:20 +08:00
Redkale
d67cbeb3f2 2018-03-30 17:42:18 +08:00
Redkale
90562ebd04 2018-03-30 17:25:57 +08:00
Redkale
b14e14659c 2018-03-30 08:56:00 +08:00
Redkale
84a15afc9a 2018-03-30 08:39:15 +08:00
Redkale
8c1aba5608 Transport.pollConnection的负载均衡策略改成轮询 2018-03-29 20:21:31 +08:00
Redkale
a72c689f07 2018-03-29 19:14:20 +08:00
Redkale
1d74f34575 2018-03-29 19:10:50 +08:00
Redkale
a24092d391 TransportStrategy增加offerConnection方法 2018-03-29 19:02:16 +08:00
Redkale
097ae701c1 2018-03-29 11:18:22 +08:00
Redkale
e250a593a7 完善Transport.pollConnection中连接池功能 2018-03-29 11:14:13 +08:00
Redkale
1547e6b714 Reproduce支持public field和setter、getter混合 2018-03-29 08:33:55 +08:00
Redkale
9f14224269 Reproduce增加不同字段名可以赋值的功能 2018-03-28 14:55:40 +08:00
Redkale
ecc54f9bd5 2018-03-28 09:32:21 +08:00
Redkale
521ee56d31 2018-03-28 08:59:47 +08:00
Redkale
f9da063532 2018-03-24 11:37:28 +08:00
Redkale
5070c27f6a 2018-03-24 11:34:24 +08:00
Redkale
c0664bb0a9 2018-03-24 11:28:25 +08:00
Redkale
f8c92f1ec4 2018-03-24 11:18:48 +08:00
Redkale
01930cfdc8 屏蔽sun.misc.Unsafe 2018-03-24 11:15:38 +08:00
Redkale
e76e321765 删掉过期的AsyncHandler 2018-03-21 14:14:30 +08:00
Redkale
99d2db31f7 2018-03-20 19:58:24 +08:00
Redkale
fa9015447b Redkale 1.9.2 开始 2018-03-20 19:57:19 +08:00
Redkale
735ad0908b 2018-03-17 10:44:03 +08:00
Redkale
a080a6a8cc 2018-03-16 08:45:42 +08:00
Redkale
2cbc51cfdf 2018-03-14 21:21:23 +08:00
Redkale
8631b4bdf5 Entity数据库实体类支持AtomicInteger、AtomicLong字段类型 2018-03-14 17:36:29 +08:00
Redkale
285cb86891 RetResult增加successFuture方法 2018-03-14 14:56:42 +08:00
Redkale
bf535a7161 2018-03-13 09:28:13 +08:00
Redkale
34e37471b8 2018-03-12 21:50:17 +08:00
Redkale
e223548b23 2018-03-12 21:41:36 +08:00
Redkale
3a13d242f6 2018-03-12 20:54:24 +08:00
Redkale
204e6ec99f 2018-03-12 10:04:53 +08:00
Redkale
7e348782e4 2018-03-09 19:05:37 +08:00
Redkale
a4cbe7db17 2018-03-09 09:33:42 +08:00
Redkale
f8101acb4b 2018-03-07 19:55:02 +08:00
Redkale
9ed2d59317 2018-03-07 12:50:26 +08:00
Redkale
0329ad7832 2018-03-07 12:13:06 +08:00
Redkale
53df45456f 删掉对sun.misc.Unsafe的依赖和替换过期方法Class.newInstance() 2018-03-07 11:31:11 +08:00
Redkale
93698bacff Redkale 1.9.1 开始 2018-03-07 08:30:15 +08:00
63 changed files with 1193 additions and 424 deletions

View File

@@ -85,7 +85,9 @@
如果name是mimetype.property.开头的值将会在进程启动时进行MimeType.add("yyyy", "YYYYYY")操作。
load: 加载文件,多个用;隔开。
默认置入的system.property.的有:
System.setProperty("net.transport.poolmaxconns", "100");
System.setProperty("net.transport.pinginterval", "30");
System.setProperty("net.transport.checkinterval", "30");
System.setProperty("convert.json.tiny", "true");
System.setProperty("convert.bson.tiny", "true");
System.setProperty("convert.json.pool.size", "128");
@@ -126,6 +128,17 @@
-->
<server protocol="HTTP" host="127.0.0.1" port="6060" root="root" lib="">
<!--
【节点在<server>中唯一】
value: 创建SSLContext的实现类, 可自定义必须是org.redkale.net.SSLCreator的子类
clientauth: true/false/want
keystorepass: KEY密码
keystorefile: KEY文件
truststorepass: TRUST密码
truststorefile: TRUST文件
-->
<ssl creator=""/>
<!--
加载所有的Service服务;
在同一个进程中同一个name同一类型的Service将共用同一个实例
@@ -234,8 +247,7 @@
当Server为HTTP协议时render才有效. 指定输出引擎的实现类
value: 输出引擎的实现类, 必须是org.redkale.net.http.HttpRender的子类
-->
<render value="org.redkalex.htel.HttpTemplateRender">
</render>
<render value="org.redkalex.htel.HttpTemplateRender"/>
<!--
【节点在<server>中唯一】
当Server为HTTP协议时ResourceServlet才有效. 默认存在一个有默认属性的resource-servlet节点

View File

@@ -32,7 +32,7 @@ public class MethodDebugVisitor {
System.out.println();
}
private final Map<Label, Integer> labels = new LinkedHashMap();
private final Map<Label, Integer> labels = new LinkedHashMap<>();
private static final String[] opcodes = new String[200]; //0 -18

View File

@@ -275,7 +275,7 @@ public final class Application {
try {
final String strategyClass = transportConf.getValue("strategy");
if (strategyClass != null && !strategyClass.isEmpty()) {
strategy = (TransportStrategy) classLoader.loadClass(strategyClass).newInstance();
strategy = (TransportStrategy) classLoader.loadClass(strategyClass).getDeclaredConstructor().newInstance();
}
final AtomicInteger counter = new AtomicInteger();
transportExec = Executors.newFixedThreadPool(threads, (Runnable r) -> {
@@ -315,7 +315,9 @@ public final class Application {
});
}
this.sncpTransportFactory = TransportFactory.create(transportExec, transportPool, transportGroup, (SSLContext) null, readTimeoutSecond, writeTimeoutSecond, strategy);
DefaultAnyValue tarnsportConf = DefaultAnyValue.create(TransportFactory.NAME_PINGINTERVAL, System.getProperty("net.transport.pinginterval", "30"));
DefaultAnyValue tarnsportConf = DefaultAnyValue.create(TransportFactory.NAME_POOLMAXCONNS, System.getProperty("net.transport.poolmaxconns", "100"))
.addValue(TransportFactory.NAME_PINGINTERVAL, System.getProperty("net.transport.pinginterval", "30"))
.addValue(TransportFactory.NAME_CHECKINTERVAL, System.getProperty("net.transport.checkinterval", "30"));
this.sncpTransportFactory.init(tarnsportConf, Sncp.PING_BUFFER, Sncp.PONG_BUFFER.remaining());
Thread.currentThread().setContextClassLoader(this.classLoader);
this.serverClassLoader = new RedkaleClassLoader(this.classLoader);
@@ -355,7 +357,9 @@ public final class Application {
public void init() throws Exception {
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "" + Runtime.getRuntime().availableProcessors() * 4);
System.setProperty("net.transport.poolmaxconns", "100");
System.setProperty("net.transport.pinginterval", "30");
System.setProperty("net.transport.checkinterval", "30");
System.setProperty("convert.bson.tiny", "true");
System.setProperty("convert.json.tiny", "true");
System.setProperty("convert.bson.pool.size", "128");
@@ -502,7 +506,8 @@ public final class Application {
if (listenClass.isEmpty()) continue;
Class clazz = classLoader.loadClass(listenClass);
if (!ApplicationListener.class.isAssignableFrom(clazz)) continue;
ApplicationListener listener = (ApplicationListener) clazz.newInstance();
@SuppressWarnings("unchecked")
ApplicationListener listener = (ApplicationListener) clazz.getDeclaredConstructor().newInstance();
listener.init(config);
this.listeners.add(listener);
}

View File

@@ -266,7 +266,7 @@ public class LogFileHandler extends Handler {
try {
if (filterstr != null) {
Class<?> clz = ClassLoader.getSystemClassLoader().loadClass(filterstr);
setFilter((Filter) clz.newInstance());
setFilter((Filter) clz.getDeclaredConstructor().newInstance());
}
} catch (Exception e) {
}
@@ -274,7 +274,7 @@ public class LogFileHandler extends Handler {
try {
if (formatterstr != null) {
Class<?> clz = ClassLoader.getSystemClassLoader().loadClass(formatterstr);
setFormatter((Formatter) clz.newInstance());
setFormatter((Formatter) clz.getDeclaredConstructor().newInstance());
}
} catch (Exception e) {
}

View File

@@ -56,16 +56,19 @@ public class NodeHttpServer extends NodeServer {
}
@Override
@SuppressWarnings("unchecked")
protected ClassFilter<Service> createServiceClassFilter() {
return createClassFilter(this.sncpGroup, null, Service.class, new Class[]{org.redkale.watch.WatchService.class}, Annotation.class, "services", "service");
}
@Override
@SuppressWarnings("unchecked")
protected ClassFilter<Filter> createFilterClassFilter() {
return createClassFilter(null, null, HttpFilter.class, new Class[]{WatchFilter.class}, null, "filters", "filter");
}
@Override
@SuppressWarnings("unchecked")
protected ClassFilter<Servlet> createServletClassFilter() {
return createClassFilter(null, WebServlet.class, HttpServlet.class, new Class[]{WatchServlet.class}, null, "servlets", "servlet");
}
@@ -87,6 +90,7 @@ public class NodeHttpServer extends NodeServer {
}
@Override
@SuppressWarnings("unchecked")
protected void loadServlet(ClassFilter<? extends Servlet> servletFilter, ClassFilter otherFilter) throws Exception {
if (httpServer != null) loadHttpServlet(servletFilter, otherFilter);
}
@@ -136,7 +140,7 @@ public class NodeHttpServer extends NodeServer {
for (FilterEntry<? extends Filter> en : list) {
Class<HttpFilter> clazz = (Class<HttpFilter>) en.getType();
if (Modifier.isAbstract(clazz.getModifiers())) continue;
final HttpFilter filter = clazz.newInstance();
final HttpFilter filter = clazz.getDeclaredConstructor().newInstance();
resourceFactory.inject(filter, this);
DefaultAnyValue filterConf = (DefaultAnyValue) en.getProperty();
this.httpServer.addHttpFilter(filter, filterConf);
@@ -172,7 +176,7 @@ public class NodeHttpServer extends NodeServer {
if (Modifier.isAbstract(clazz.getModifiers())) continue;
WebServlet ws = clazz.getAnnotation(WebServlet.class);
if (ws == null || ws.value().length == 0) continue;
final HttpServlet servlet = clazz.newInstance();
final HttpServlet servlet = clazz.getDeclaredConstructor().newInstance();
resourceFactory.inject(servlet, this);
final String[] mappings = ws.value();
String pref = ws.repair() ? prefix : "";
@@ -312,7 +316,7 @@ public class NodeHttpServer extends NodeServer {
WebServlet ws = servlet.getClass().getAnnotation(WebServlet.class);
if (ws != null && !ws.repair()) prefix2 = "";
resourceFactory.inject(servlet, NodeHttpServer.this);
if (finest) logger.finest(threadName + " " + stype.getName() + " create RestWebSocketServlet " + servlet);
if (finest) logger.finest(threadName + " " + stype.getName() + " create a RestWebSocketServlet");
if (ss != null) {
String[] mappings = servlet.getClass().getAnnotation(WebServlet.class).value();
for (int i = 0; i < mappings.length; i++) {

View File

@@ -149,7 +149,7 @@ public abstract class NodeServer {
String interceptorClass = this.serverConf.getValue("interceptor", "");
if (!interceptorClass.isEmpty()) {
Class clazz = serverClassLoader.loadClass(interceptorClass);
this.interceptor = (NodeInterceptor) clazz.newInstance();
this.interceptor = (NodeInterceptor) clazz.getDeclaredConstructor().newInstance();
}
ClassFilter<Service> serviceFilter = createServiceClassFilter();

View File

@@ -93,7 +93,7 @@ public class NodeSncpServer extends NodeServer {
for (FilterEntry<? extends Filter> en : list) {
Class<SncpFilter> clazz = (Class<SncpFilter>) en.getType();
if (Modifier.isAbstract(clazz.getModifiers())) continue;
final SncpFilter filter = clazz.newInstance();
final SncpFilter filter = clazz.getDeclaredConstructor().newInstance();
resourceFactory.inject(filter, this);
DefaultAnyValue filterConf = (DefaultAnyValue) en.getProperty();
this.sncpServer.addSncpFilter(filter, filterConf);

View File

@@ -24,16 +24,19 @@ public class NodeWatchServer extends NodeHttpServer {
}
@Override
@SuppressWarnings("unchecked")
protected ClassFilter<Service> createServiceClassFilter() {
return createClassFilter(this.sncpGroup, null, WatchService.class, null, Annotation.class, "services", "service");
}
@Override
@SuppressWarnings("unchecked")
protected ClassFilter<Filter> createFilterClassFilter() {
return createClassFilter(null, null, WatchFilter.class, null, null, "filters", "filter");
}
@Override
@SuppressWarnings("unchecked")
protected ClassFilter<Servlet> createServletClassFilter() {
return createClassFilter(null, WebServlet.class, WatchServlet.class, null, null, "servlets", "servlet");
}

View File

@@ -53,6 +53,9 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
private final Set<Class> skipIgnores = new HashSet();
//key:需要屏蔽的字段value排除的字段名
private final ConcurrentHashMap<Class, Set<String>> ignoreAlls = new ConcurrentHashMap();
private boolean skipAllIgnore = false;
protected ConvertFactory(ConvertFactory<R, W> parent, boolean tiny) {
@@ -171,12 +174,19 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
public ConvertColumnEntry findRef(AccessibleObject element) {
if (element == null) return null;
ConvertColumnEntry en = this.columnEntrys.get(element);
if (en != null) return en;
Set<String> onlyColumns = null;
if (element instanceof Method) {
onlyColumns = ignoreAlls.get(((Method) element).getDeclaringClass());
} else if (element instanceof Field) {
onlyColumns = ignoreAlls.get(((Field) element).getDeclaringClass());
}
if (en != null && onlyColumns == null) return en;
final ConvertType ct = this.getConvertType();
ConvertColumn[] ccs = element.getAnnotationsByType(ConvertColumn.class);
String fieldName = null;
if (ccs.length == 0 && element instanceof Method) {
final Method method = (Method) element;
String fieldName = readGetSetFieldName(method);
fieldName = readGetSetFieldName(method);
if (fieldName != null) {
try {
ccs = method.getDeclaringClass().getDeclaredField(fieldName).getAnnotationsByType(ConvertColumn.class);
@@ -184,8 +194,22 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
}
}
}
if (onlyColumns != null && fieldName == null) {
if (element instanceof Method) {
fieldName = readGetSetFieldName((Method) element);
} else if (element instanceof Field) {
fieldName = ((Field) element).getName();
}
}
if (ccs.length == 0 && onlyColumns != null && fieldName != null) {
if (!onlyColumns.contains(fieldName)) return new ConvertColumnEntry(fieldName, true);
}
for (ConvertColumn ref : ccs) {
if (ref.type().contains(ct)) {
if (onlyColumns != null && fieldName != null) {
String realName = ref.name().isEmpty() ? fieldName : ref.name();
if (!onlyColumns.contains(realName)) return new ConvertColumnEntry(realName, true);
}
ConvertColumnEntry entry = new ConvertColumnEntry(ref);
if (skipAllIgnore) {
entry.setIgnore(false);
@@ -318,6 +342,22 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
skipIgnores.add(type);
}
/**
* 屏蔽指定类所有字段,仅仅保留指定字段 <br>
* <b>注意: 该配置优先级高于skipAllIgnore和ConvertColumnEntry配置</b>
*
* @param type 指定的类
* @param excludeColumns 需要排除的字段名
*/
public final void registerIgnoreAll(final Class type, String... excludeColumns) {
Set<String> set = ignoreAlls.get(type);
if (set == null) {
ignoreAlls.put(type, new HashSet<>(Arrays.asList(excludeColumns)));
} else {
set.addAll(Arrays.asList(excludeColumns));
}
}
public final void register(final Class type, boolean ignore, String... columns) {
for (String column : columns) {
register(type, column, new ConvertColumnEntry(column, ignore));

View File

@@ -34,6 +34,7 @@ public class OptionalCoder<R extends Reader, W extends Writer, T> extends Simple
private final Object lock = new Object();
@SuppressWarnings("unchecked")
public OptionalCoder(final ConvertFactory factory, final Type type) {
this.type = type;
try {

View File

@@ -25,6 +25,7 @@ public final class BigIntegerSimpledCoder<R extends Reader, W extends Writer> ex
public static final BigIntegerSimpledCoder instance = new BigIntegerSimpledCoder();
@Override
@SuppressWarnings("unchecked")
public void convertTo(W out, BigInteger value) {
if (value == null) {
out.writeNull();
@@ -34,6 +35,7 @@ public final class BigIntegerSimpledCoder<R extends Reader, W extends Writer> ex
}
@Override
@SuppressWarnings("unchecked")
public BigInteger convertFrom(R in) {
byte[] bytes = ByteArraySimpledCoder.instance.convertFrom(in);
return bytes == null ? null : new BigInteger(bytes);

View File

@@ -27,6 +27,7 @@ public final class DLongSimpledCoder<R extends Reader, W extends Writer> extends
public static final DLongSimpledCoder instance = new DLongSimpledCoder();
@Override
@SuppressWarnings("unchecked")
public void convertTo(final W out, final DLong value) {
if (value == null) {
out.writeNull();
@@ -36,6 +37,7 @@ public final class DLongSimpledCoder<R extends Reader, W extends Writer> extends
}
@Override
@SuppressWarnings("unchecked")
public DLong convertFrom(R in) {
byte[] bs = bsSimpledCoder.convertFrom(in);
if (bs == null) return null;

View File

@@ -74,6 +74,7 @@ public final class DoubleArraySimpledCoder<R extends Reader, W extends Writer> e
public static final DoubleStreamSimpledCoder instance = new DoubleStreamSimpledCoder();
@Override
@SuppressWarnings("unchecked")
public void convertTo(W out, DoubleStream values) {
if (values == null) {
out.writeNull();
@@ -83,6 +84,7 @@ public final class DoubleArraySimpledCoder<R extends Reader, W extends Writer> e
}
@Override
@SuppressWarnings("unchecked")
public DoubleStream convertFrom(R in) {
double[] value = DoubleArraySimpledCoder.instance.convertFrom(in);
return value == null ? null : DoubleStream.of(value);

View File

@@ -74,6 +74,7 @@ public final class IntArraySimpledCoder<R extends Reader, W extends Writer> exte
public static final IntStreamSimpledCoder instance = new IntStreamSimpledCoder();
@Override
@SuppressWarnings("unchecked")
public void convertTo(W out, IntStream values) {
if (values == null) {
out.writeNull();
@@ -83,6 +84,7 @@ public final class IntArraySimpledCoder<R extends Reader, W extends Writer> exte
}
@Override
@SuppressWarnings("unchecked")
public IntStream convertFrom(R in) {
int[] value = IntArraySimpledCoder.instance.convertFrom(in);
return value == null ? null : IntStream.of(value);

View File

@@ -74,6 +74,7 @@ public final class LongArraySimpledCoder<R extends Reader, W extends Writer> ext
public static final LongStreamSimpledCoder instance = new LongStreamSimpledCoder();
@Override
@SuppressWarnings("unchecked")
public void convertTo(W out, LongStream values) {
if (values == null) {
out.writeNull();
@@ -83,6 +84,7 @@ public final class LongArraySimpledCoder<R extends Reader, W extends Writer> ext
}
@Override
@SuppressWarnings("unchecked")
public LongStream convertFrom(R in) {
long[] value = LongArraySimpledCoder.instance.convertFrom(in);
return value == null ? null : LongStream.of(value);

View File

@@ -170,7 +170,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
*/
public static CompletableFuture<AsyncConnection> createTCP(final AsynchronousChannelGroup group, final SSLContext sslContext,
final SocketAddress address, final boolean noDelay, final int readTimeoutSecond, final int writeTimeoutSecond) {
final CompletableFuture future = new CompletableFuture();
final CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
try {
final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group);
channel.connect(address, null, new CompletionHandler<Void, Void>() {

View File

@@ -144,6 +144,10 @@ public class Context {
return bufferPool;
}
public Consumer<ByteBuffer> getBufferConsumer() {
return bufferPool;
}
public ByteBuffer pollBuffer() {
return bufferPool.get();
}

View File

@@ -0,0 +1,42 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net;
import java.nio.ByteBuffer;
import java.util.function.*;
/**
* 加密解密接口
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public interface Cryptor {
/**
* 加密
*
* @param buffers 待加密数据
* @param supplier ByteBuffer生成器
* @param consumer ByteBuffer回收器
*
* @return 加密后数据
*/
public ByteBuffer[] encrypt(ByteBuffer[] buffers, final Supplier<ByteBuffer> supplier, final Consumer<ByteBuffer> consumer);
/**
* 解密
*
* @param buffers 待解密数据
* @param supplier ByteBuffer生成器
* @param consumer ByteBuffer回收器
*
* @return 解密后数据
*/
public ByteBuffer[] decrypt(ByteBuffer[] buffers, final Supplier<ByteBuffer> supplier, final Consumer<ByteBuffer> consumer);
}

View File

@@ -0,0 +1,20 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net;
/**
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public enum SSLClientAuth {
NONE,
NEED,
WANT,
CLIENT;
}

View File

@@ -0,0 +1,65 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net;
import java.io.*;
import java.security.*;
import java.security.cert.*;
import javax.net.ssl.*;
import org.redkale.util.*;
/**
* 根据配置生成SSLContext
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public interface SSLCreator {
default SSLContext create(Server server, AnyValue sslConf) throws Exception {
String keyfile = sslConf.getValue("keystorefile");
String keypass = sslConf.getValue("keystorepass", "");
KeyManager[] keyManagers = null;
if (keyfile != null) {
KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
KeyStore ks = KeyStore.getInstance("JKS");
ks.load(new FileInputStream(keyfile), keypass.toCharArray());
kmf.init(ks, keypass.toCharArray());
keyManagers = kmf.getKeyManagers();
}
String trustfile = sslConf.getValue("truststorefile");
String trustpass = sslConf.getValue("truststorepass", "");
TrustManager[] trustManagers;
if (trustfile != null) {
KeyStore ts = KeyStore.getInstance("JKS");
ts.load(new FileInputStream(trustfile), trustpass.toCharArray());
TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
tmf.init(ts);
trustManagers = tmf.getTrustManagers();
} else {
trustManagers = new TrustManager[]{new X509TrustManager() {
@Override
public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
}
@Override
public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
}};
}
SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(keyManagers, trustManagers, new SecureRandom());
return sslContext;
}
}

View File

@@ -124,6 +124,20 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
this.responsePoolSize = config.getIntValue("responsePoolSize", this.threads * 2);
this.name = config.getValue("name", "Server-" + protocol + "-" + this.address.getPort());
if (!this.name.matches("^[a-zA-Z][\\w_-]{1,64}$")) throw new RuntimeException("server.name (" + this.name + ") is illegal");
AnyValue sslConf = config.getAnyValue("ssl");
if (sslConf != null) {
String creatorClass = sslConf.getValue("creator", SSLCreator.class.getName());
SSLCreator creator = null;
if (SSLCreator.class.getName().equals(creatorClass) || creatorClass.isEmpty()) {
creator = new SSLCreator() {
};
} else {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
creator = ((SSLCreator) classLoader.loadClass(creatorClass).getDeclaredConstructor().newInstance());
}
this.resourceFactory.inject(creator);
this.sslContext = creator.create(this, sslConf);
}
final AtomicInteger counter = new AtomicInteger();
final Format f = createFormat();
final String n = name;
@@ -135,12 +149,7 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
}
protected static int parseLenth(String value, int defValue) {
if (value == null) return defValue;
value = value.toUpperCase().replace("B", "");
if (value.endsWith("G")) return Integer.decode(value.replace("G", "")) * 1024 * 1024 * 1024;
if (value.endsWith("M")) return Integer.decode(value.replace("M", "")) * 1024 * 1024;
if (value.endsWith("K")) return Integer.decode(value.replace("K", "")) * 1024;
return Integer.decode(value);
return (int) parseLenth(value, defValue + 0L);
}
protected static long parseLenth(String value, long defValue) {
@@ -152,6 +161,14 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
return Long.decode(value);
}
protected static String formatLenth(long value) {
if (value < 1) return "" + value;
if (value % (1024 * 1024 * 1024) == 0) return value / (1024 * 1024 * 1024) + "G";
if (value % (1024 * 1024) == 0) return value / (1024 * 1024) + "M";
if (value % 1024 == 0) return value / (1024) + "K";
return value + "B";
}
public void destroy(final AnyValue config) throws Exception {
this.prepare.destroy(context, config);
}
@@ -213,7 +230,7 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
serverChannel.accept();
final String threadName = "[" + Thread.currentThread().getName() + "] ";
logger.info(threadName + this.getClass().getSimpleName() + ("TCP".equalsIgnoreCase(protocol) ? "" : ("." + protocol)) + " listen: " + address
+ ", threads: " + threads + ", bufferCapacity: " + bufferCapacity / 1024 + "K, bufferPoolSize: " + bufferPoolSize + ", responsePoolSize: " + responsePoolSize
+ ", threads: " + threads + ", maxbody: " + formatLenth(context.maxbody) + ", bufferCapacity: " + formatLenth(bufferCapacity) + ", bufferPoolSize: " + bufferPoolSize + ", responsePoolSize: " + responsePoolSize
+ ", started in " + (System.currentTimeMillis() - context.getServerStartTime()) + " ms");
}

View File

@@ -5,12 +5,14 @@
*/
package org.redkale.net;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.function.Supplier;
import java.util.logging.Level;
import javax.net.ssl.SSLContext;
@@ -30,8 +32,6 @@ public final class Transport {
public static final String DEFAULT_PROTOCOL = "TCP";
protected static final int MAX_POOL_LIMIT = Runtime.getRuntime().availableProcessors() * 16;
protected static final boolean supportTcpNoDelay;
static {
@@ -45,6 +45,8 @@ public final class Transport {
supportTcpNoDelay = tcpNoDelay;
}
protected final AtomicInteger seq = new AtomicInteger(-1);
protected final TransportFactory factory;
protected final String name; //即<group>的name属性
@@ -59,7 +61,8 @@ public final class Transport {
protected final InetSocketAddress clientAddress;
protected TransportAddress[] transportAddres = new TransportAddress[0];
//不可能为null
protected TransportNode[] transportNodes = new TransportNode[0];
protected final ObjectPool<ByteBuffer> bufferPool;
@@ -68,8 +71,6 @@ public final class Transport {
//负载均衡策略
protected final TransportStrategy strategy;
protected final ConcurrentHashMap<SocketAddress, BlockingQueue<AsyncConnection>> connPool = new ConcurrentHashMap<>();
protected Transport(String name, String subprotocol, TransportFactory factory, final ObjectPool<ByteBuffer> transportBufferPool,
final AsynchronousChannelGroup transportChannelGroup, final SSLContext sslContext, final InetSocketAddress clientAddress,
final Collection<InetSocketAddress> addresses, final TransportStrategy strategy) {
@@ -95,33 +96,44 @@ public final class Transport {
}
public final InetSocketAddress[] updateRemoteAddresses(final Collection<InetSocketAddress> addresses) {
TransportAddress[] oldAddresses = this.transportAddres;
List<TransportAddress> list = new ArrayList<>();
if (addresses != null) {
for (InetSocketAddress addr : addresses) {
if (clientAddress != null && clientAddress.equals(addr)) continue;
list.add(new TransportAddress(addr));
final TransportNode[] oldNodes = this.transportNodes;
synchronized (this) {
List<TransportNode> list = new ArrayList<>();
if (addresses != null) {
for (InetSocketAddress addr : addresses) {
if (clientAddress != null && clientAddress.equals(addr)) continue;
boolean hasold = false;
for (TransportNode oldAddr : oldNodes) {
if (oldAddr.getAddress().equals(addr)) {
list.add(oldAddr);
hasold = true;
break;
}
}
if (hasold) continue;
list.add(new TransportNode(factory.poolmaxconns, addr));
}
}
this.transportNodes = list.toArray(new TransportNode[list.size()]);
}
this.transportAddres = list.toArray(new TransportAddress[list.size()]);
InetSocketAddress[] rs = new InetSocketAddress[oldAddresses.length];
InetSocketAddress[] rs = new InetSocketAddress[oldNodes.length];
for (int i = 0; i < rs.length; i++) {
rs[i] = oldAddresses[i].getAddress();
rs[i] = oldNodes[i].getAddress();
}
return rs;
}
public final boolean addRemoteAddresses(final InetSocketAddress addr) {
if (addr == null) return false;
if (clientAddress != null && clientAddress.equals(addr)) return false;
synchronized (this) {
if (this.transportAddres == null) {
this.transportAddres = new TransportAddress[]{new TransportAddress(addr)};
if (this.transportNodes.length == 0) {
this.transportNodes = new TransportNode[]{new TransportNode(factory.poolmaxconns, addr)};
} else {
for (TransportAddress i : this.transportAddres) {
for (TransportNode i : this.transportNodes) {
if (addr.equals(i.address)) return false;
}
this.transportAddres = Utility.append(transportAddres, new TransportAddress(addr));
this.transportNodes = Utility.append(transportNodes, new TransportNode(factory.poolmaxconns, addr));
}
return true;
}
@@ -129,9 +141,8 @@ public final class Transport {
public final boolean removeRemoteAddresses(InetSocketAddress addr) {
if (addr == null) return false;
if (this.transportAddres == null) return false;
synchronized (this) {
this.transportAddres = Utility.remove(transportAddres, new TransportAddress(addr));
this.transportNodes = Utility.remove(transportNodes, new TransportNode(factory.poolmaxconns, addr));
}
return true;
}
@@ -145,32 +156,39 @@ public final class Transport {
}
public void close() {
connPool.forEach((k, v) -> v.forEach(c -> c.dispose()));
TransportNode[] nodes = this.transportNodes;
if (nodes == null) return;
for (TransportNode node : nodes) {
if (node != null) node.dispose();
}
}
public InetSocketAddress getClientAddress() {
return clientAddress;
}
public TransportAddress[] getTransportAddresses() {
return transportAddres;
public TransportNode[] getTransportNodes() {
return transportNodes;
}
public TransportNode findTransportNode(SocketAddress addr) {
for (TransportNode node : this.transportNodes) {
if (node.address.equals(addr)) return node;
}
return null;
}
public InetSocketAddress[] getRemoteAddresses() {
InetSocketAddress[] rs = new InetSocketAddress[transportAddres.length];
InetSocketAddress[] rs = new InetSocketAddress[transportNodes.length];
for (int i = 0; i < rs.length; i++) {
rs[i] = transportAddres[i].getAddress();
rs[i] = transportNodes[i].getAddress();
}
return rs;
}
public ConcurrentHashMap<SocketAddress, BlockingQueue<AsyncConnection>> getAsyncConnectionPool() {
return connPool;
}
@Override
public String toString() {
return Transport.class.getSimpleName() + "{name = " + name + ", protocol = " + protocol + ", clientAddress = " + clientAddress + ", remoteAddres = " + Arrays.toString(transportAddres) + "}";
return Transport.class.getSimpleName() + "{name = " + name + ", protocol = " + protocol + ", clientAddress = " + clientAddress + ", remoteNodes = " + Arrays.toString(transportNodes) + "}";
}
public ByteBuffer pollBuffer() {
@@ -189,90 +207,148 @@ public final class Transport {
for (ByteBuffer buffer : buffers) offerBuffer(buffer);
}
public AsynchronousChannelGroup getTransportChannelGroup() {
return group;
}
public boolean isTCP() {
return tcp;
}
public CompletableFuture<AsyncConnection> pollConnection(SocketAddress addr) {
if (this.strategy != null) return strategy.pollConnection(addr, this);
if (addr == null && this.transportAddres.length == 1) addr = this.transportAddres[0].address;
final boolean rand = addr == null;
if (rand && this.transportAddres.length < 1) throw new RuntimeException("Transport (" + this.name + ") have no remoteAddress list");
public CompletableFuture<AsyncConnection> pollConnection(SocketAddress addr0) {
if (this.strategy != null) return strategy.pollConnection(addr0, this);
final TransportNode[] nodes = this.transportNodes;
if (addr0 == null && nodes.length == 1) addr0 = nodes[0].address;
final SocketAddress addr = addr0;
final boolean rand = addr == null; //是否随机取地址
if (rand && nodes.length < 1) throw new RuntimeException("Transport (" + this.name + ") have no remoteAddress list");
try {
if (tcp) {
AsynchronousSocketChannel channel = null;
if (rand) { //取地址
TransportAddress transportAddr;
boolean tryed = false;
for (int i = 0; i < transportAddres.length; i++) {
transportAddr = transportAddres[i];
addr = transportAddr.address;
if (!transportAddr.enable) continue;
final BlockingQueue<AsyncConnection> queue = transportAddr.conns;
if (!queue.isEmpty()) {
AsyncConnection conn;
while ((conn = queue.poll()) != null) {
if (conn.isOpen()) return CompletableFuture.completedFuture(conn);
}
}
tryed = true;
if (channel == null) {
channel = AsynchronousSocketChannel.open(group);
if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
}
try {
channel.connect(addr).get(2, TimeUnit.SECONDS);
transportAddr.enable = true;
break;
} catch (Exception iex) {
transportAddr.enable = false;
channel = null;
}
}
if (channel == null && !tryed) {
for (int i = 0; i < transportAddres.length; i++) {
transportAddr = transportAddres[i];
addr = transportAddr.address;
if (channel == null) {
channel = AsynchronousSocketChannel.open(group);
if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
}
try {
channel.connect(addr).get(2, TimeUnit.SECONDS);
transportAddr.enable = true;
break;
} catch (Exception iex) {
transportAddr.enable = false;
channel = null;
}
}
}
} else {
return AsyncConnection.createTCP(group, sslContext, addr, supportTcpNoDelay, 6, 6);
}
if (channel == null) return CompletableFuture.completedFuture(null);
return CompletableFuture.completedFuture(AsyncConnection.create(channel, addr, 6, 6));
} else { // UDP
if (rand) addr = this.transportAddres[0].address;
if (!tcp) { // UDP
SocketAddress udpaddr = rand ? nodes[0].address : addr;
DatagramChannel channel = DatagramChannel.open();
channel.configureBlocking(true);
channel.connect(addr);
return CompletableFuture.completedFuture(AsyncConnection.create(channel, addr, true, 6, 6));
channel.connect(udpaddr);
return CompletableFuture.completedFuture(AsyncConnection.create(channel, udpaddr, true, factory.readTimeoutSecond, factory.writeTimeoutSecond));
}
if (!rand) { //指定地址
TransportNode node = findTransportNode(addr);
if (node == null) {
return AsyncConnection.createTCP(group, sslContext, addr, supportTcpNoDelay, factory.readTimeoutSecond, factory.writeTimeoutSecond);
}
final BlockingQueue<AsyncConnection> queue = node.conns;
if (!queue.isEmpty()) {
AsyncConnection conn;
while ((conn = queue.poll()) != null) {
if (conn.isOpen()) return CompletableFuture.completedFuture(conn);
}
}
return AsyncConnection.createTCP(group, sslContext, addr, supportTcpNoDelay, factory.readTimeoutSecond, factory.writeTimeoutSecond);
}
//---------------------随机取地址------------------------
int enablecount = 0;
final TransportNode[] newnodes = new TransportNode[nodes.length];
for (final TransportNode node : nodes) {
if (node.disabletime > 0) continue;
newnodes[enablecount++] = node;
}
final long now = System.currentTimeMillis();
if (enablecount > 0) { //存在可用的地址
final TransportNode one = newnodes[Math.abs(seq.incrementAndGet()) % enablecount];
final BlockingQueue<AsyncConnection> queue = one.conns;
if (!queue.isEmpty()) {
AsyncConnection conn;
while ((conn = queue.poll()) != null) {
if (conn.isOpen()) return CompletableFuture.completedFuture(conn);
}
}
CompletableFuture future = new CompletableFuture();
final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group);
if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.connect(one.address, one, new CompletionHandler<Void, TransportNode>() {
@Override
public void completed(Void result, TransportNode attachment) {
attachment.disabletime = 0;
AsyncConnection asyncConn = AsyncConnection.create(channel, attachment.address, factory.readTimeoutSecond, factory.writeTimeoutSecond);
if (future.isDone()) {
if (!attachment.conns.offer(asyncConn)) asyncConn.dispose();
} else {
future.complete(asyncConn);
}
}
@Override
public void failed(Throwable exc, TransportNode attachment) {
attachment.disabletime = now;
try {
channel.close();
} catch (Exception e) {
}
try {
pollConnection0(nodes, one, now).whenComplete((r, t) -> {
if (t != null) {
future.completeExceptionally(t);
} else {
future.complete(r);
}
});
} catch (Exception e) {
future.completeExceptionally(e);
}
}
});
return future;
}
return pollConnection0(nodes, null, now);
} catch (Exception ex) {
throw new RuntimeException("transport address = " + addr, ex);
}
}
private CompletableFuture<AsyncConnection> pollConnection0(TransportNode[] nodes, TransportNode exclude, long now) throws IOException {
//从可用/不可用的地址列表中创建连接
AtomicInteger count = new AtomicInteger(nodes.length);
CompletableFuture future = new CompletableFuture();
for (final TransportNode node : nodes) {
if (node == exclude) continue;
if (future.isDone()) return future;
final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group);
if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.connect(node.address, node, new CompletionHandler<Void, TransportNode>() {
@Override
public void completed(Void result, TransportNode attachment) {
attachment.disabletime = 0;
AsyncConnection asyncConn = AsyncConnection.create(channel, attachment.address, factory.readTimeoutSecond, factory.writeTimeoutSecond);
if (future.isDone()) {
if (!attachment.conns.offer(asyncConn)) asyncConn.dispose();
} else {
future.complete(asyncConn);
}
}
@Override
public void failed(Throwable exc, TransportNode attachment) {
attachment.disabletime = now;
if (count.decrementAndGet() < 1) {
future.completeExceptionally(exc);
}
try {
channel.close();
} catch (Exception e) {
}
}
});
}
return future;
}
public void offerConnection(final boolean forceClose, AsyncConnection conn) {
if (this.strategy != null && strategy.offerConnection(forceClose, conn)) return;
if (!forceClose && conn.isTCP()) {
if (conn.isOpen()) {
BlockingQueue<AsyncConnection> queue = connPool.get(conn.getRemoteAddress());
if (queue == null) {
queue = new ArrayBlockingQueue<>(MAX_POOL_LIMIT);
connPool.put(conn.getRemoteAddress(), queue);
}
if (!queue.offer(conn)) conn.dispose();
TransportNode node = findTransportNode(conn.getRemoteAddress());
if (node == null || !node.conns.offer(conn)) conn.dispose();
}
} else {
conn.dispose();
@@ -317,38 +393,82 @@ public final class Transport {
});
}
public static class TransportAddress {
public static class TransportNode {
protected InetSocketAddress address;
protected volatile boolean enable;
protected volatile long disabletime; //不可用时的时间, 为0表示可用
protected final BlockingQueue<AsyncConnection> conns = new ArrayBlockingQueue<>(MAX_POOL_LIMIT);
protected final BlockingQueue<AsyncConnection> conns;
public TransportAddress(InetSocketAddress address) {
protected final ConcurrentHashMap<String, Object> attributes = new ConcurrentHashMap<>();
public TransportNode(int poolmaxconns, InetSocketAddress address) {
this.address = address;
this.enable = true;
this.disabletime = 0;
this.conns = new ArrayBlockingQueue<>(poolmaxconns);
}
@ConstructorParameters({"address", "enable"})
public TransportAddress(InetSocketAddress address, boolean enable) {
@ConstructorParameters({"poolmaxconns", "address", "disabletime"})
public TransportNode(int poolmaxconns, InetSocketAddress address, long disabletime) {
this.address = address;
this.enable = enable;
this.disabletime = disabletime;
this.conns = new ArrayBlockingQueue<>(poolmaxconns);
}
public int getPoolmaxconns() {
return this.conns.remainingCapacity() + this.conns.size();
}
public <T> T setAttribute(String name, T value) {
attributes.put(name, value);
return value;
}
@SuppressWarnings("unchecked")
public <T> T getAttribute(String name) {
return (T) attributes.get(name);
}
@SuppressWarnings("unchecked")
public <T> T removeAttribute(String name) {
return (T) attributes.remove(name);
}
public TransportNode clearAttributes() {
attributes.clear();
return this;
}
public ConcurrentHashMap<String, Object> getAttributes() {
return attributes;
}
public void setAttributes(ConcurrentHashMap<String, Object> map) {
attributes.clear();
if (map != null) attributes.putAll(map);
}
public InetSocketAddress getAddress() {
return address;
}
public boolean isEnable() {
return enable;
public long getDisabletime() {
return disabletime;
}
@ConvertColumn(ignore = true)
@ConvertDisabled
public BlockingQueue<AsyncConnection> getConns() {
return conns;
}
public void dispose() {
AsyncConnection conn;
while ((conn = conns.poll()) != null) {
conn.dispose();
}
}
@Override
public int hashCode() {
return this.address.hashCode();
@@ -359,10 +479,11 @@ public final class Transport {
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
final TransportAddress other = (TransportAddress) obj;
final TransportNode other = (TransportNode) obj;
return this.address.equals(other.address);
}
@Override
public String toString() {
return JsonConvert.root().convertTo(this);
}

View File

@@ -7,7 +7,7 @@ package org.redkale.net;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.net.InetSocketAddress;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
@@ -22,6 +22,7 @@ import org.redkale.util.*;
/**
* System.getProperty("net.transport.pinginterval", "30") 心跳周期默认30秒
* System.getProperty("net.transport.checkinterval", "30") 检查不可用地址周期默认30秒
*
* <p>
* 详情见: https://redkale.org
@@ -36,8 +37,12 @@ public class TransportFactory {
@Comment("默认TCP写入超时秒数")
public static int DEFAULT_WRITETIMEOUTSECOND = 6;
public static final String NAME_POOLMAXCONNS = "poolmaxconns";
public static final String NAME_PINGINTERVAL = "pinginterval";
public static final String NAME_CHECKINTERVAL = "checkinterval";
protected static final Logger logger = Logger.getLogger(TransportFactory.class.getSimpleName());
//传输端的线程池
@@ -59,6 +64,12 @@ public class TransportFactory {
protected final List<WeakReference<Transport>> transportReferences = new CopyOnWriteArrayList<>();
//连接池大小
protected int poolmaxconns = Integer.getInteger("net.transport.poolmaxconns", 100);
//检查不可用地址周期, 单位:秒
protected int checkinterval = Integer.getInteger("net.transport.checkinterval", 30);
//心跳周期, 单位:秒
protected int pinginterval;
@@ -68,8 +79,8 @@ public class TransportFactory {
//TCP写入超时秒数
protected int writeTimeoutSecond;
//ping的定时器
private ScheduledThreadPoolExecutor pingScheduler;
//ping和检查的定时器
private ScheduledThreadPoolExecutor scheduler;
protected SSLContext sslContext;
@@ -100,18 +111,28 @@ public class TransportFactory {
public void init(AnyValue conf, ByteBuffer pingBuffer, int pongLength) {
if (conf != null) {
this.pinginterval = conf.getIntValue(NAME_PINGINTERVAL, 0);
this.poolmaxconns = conf.getIntValue(NAME_POOLMAXCONNS, this.poolmaxconns);
this.pinginterval = conf.getIntValue(NAME_PINGINTERVAL, this.pinginterval);
this.checkinterval = conf.getIntValue(NAME_CHECKINTERVAL, this.checkinterval);
if (this.poolmaxconns < 2) this.poolmaxconns = 2;
if (this.pinginterval < 2) this.pinginterval = 2;
if (this.checkinterval < 2) this.checkinterval = 2;
}
this.scheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> {
final Thread t = new Thread(r, this.getClass().getSimpleName() + "-TransportFactoryTask-Thread");
t.setDaemon(true);
return t;
});
this.scheduler.scheduleAtFixedRate(() -> {
checks();
}, checkinterval, checkinterval, TimeUnit.SECONDS);
if (this.pinginterval > 0) {
if (this.pingScheduler == null && pingBuffer != null) {
if (pingBuffer != null) {
this.pingBuffer = pingBuffer.asReadOnlyBuffer();
this.pongLength = pongLength;
this.pingScheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> {
final Thread t = new Thread(r, this.getClass().getSimpleName() + "-TransportFactoryPingTask-Thread");
t.setDaemon(true);
return t;
});
pingScheduler.scheduleAtFixedRate(() -> {
scheduler.scheduleAtFixedRate(() -> {
pings();
}, pinginterval, pinginterval, TimeUnit.SECONDS);
}
@@ -309,7 +330,7 @@ public class TransportFactory {
}
public void shutdownNow() {
if (this.pingScheduler != null) this.pingScheduler.shutdownNow();
if (this.scheduler != null) this.scheduler.shutdownNow();
try {
this.channelGroup.shutdownNow();
} catch (Exception e) {
@@ -317,8 +338,7 @@ public class TransportFactory {
}
}
private void pings() {
long timex = System.currentTimeMillis() - (this.pinginterval < 15 ? this.pinginterval : (this.pinginterval - 3)) * 1000;
private void checks() {
List<WeakReference> nulllist = new ArrayList<>();
for (WeakReference<Transport> ref : transportReferences) {
Transport transport = ref.get();
@@ -326,8 +346,39 @@ public class TransportFactory {
nulllist.add(ref);
continue;
}
List<BlockingQueue<AsyncConnection>> list = new ArrayList<>(transport.getAsyncConnectionPool().values());
for (final BlockingQueue<AsyncConnection> queue : list) {
Transport.TransportNode[] nodes = transport.getTransportNodes();
for (final Transport.TransportNode node : nodes) {
if (node.disabletime < 1) continue; //可用
try {
final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(transport.group);
channel.connect(node.address, node, new CompletionHandler<Void, Transport.TransportNode>() {
@Override
public void completed(Void result, Transport.TransportNode attachment) {
attachment.disabletime = 0;
}
@Override
public void failed(Throwable exc, Transport.TransportNode attachment) {
attachment.disabletime = System.currentTimeMillis();
}
});
} catch (Exception e) {
}
}
}
for (WeakReference ref : nulllist) {
transportReferences.remove(ref);
}
}
private void pings() {
long timex = System.currentTimeMillis() - (this.pinginterval < 15 ? this.pinginterval : (this.pinginterval - 3)) * 1000;
for (WeakReference<Transport> ref : transportReferences) {
Transport transport = ref.get();
if (transport == null) continue;
Transport.TransportNode[] nodes = transport.getTransportNodes();
for (final Transport.TransportNode node : nodes) {
final BlockingQueue<AsyncConnection> queue = node.conns;
AsyncConnection conn;
while ((conn = queue.poll()) != null) {
if (conn.getLastWriteTime() > timex && false) { //最近几秒内已经进行过IO操作
@@ -379,9 +430,6 @@ public class TransportFactory {
}
}
}
for (WeakReference ref : nulllist) {
transportReferences.remove(ref);
}
}
private static boolean checkName(String name) { //不能含特殊字符

View File

@@ -18,5 +18,25 @@ import java.util.concurrent.CompletableFuture;
*/
public interface TransportStrategy {
/**
* 创建AsyncConnection
*
* @param addr 服务器地址
* @param transport Transport
*
* @return AsyncConnection
*/
public CompletableFuture<AsyncConnection> pollConnection(SocketAddress addr, Transport transport);
/**
* 回收AsyncConnection返回false表示使用Transport默认的回收实现 返回true表示自定义回收实现
*
* @param forceClose 是否强制关闭
* @param conn AsyncConnection
*
* @return boolean
*/
default boolean offerConnection(final boolean forceClose, AsyncConnection conn) {
return false;
}
}

View File

@@ -72,6 +72,7 @@ public class HttpContext extends Context {
//生成规则与SncpAsyncHandler.Factory 很类似
//-------------------------------------------------------------
final boolean handlerinterface = handlerClass.isInterface();
final String cpDesc = Type.getDescriptor(ConstructorParameters.class);
final String handlerClassName = handlerClass.getName().replace('.', '/');
final String handlerName = CompletionHandler.class.getName().replace('.', '/');
final String handlerDesc = Type.getDescriptor(CompletionHandler.class);
@@ -91,7 +92,7 @@ public class HttpContext extends Context {
mv = new MethodDebugVisitor(cw.visitMethod(ACC_PUBLIC, "<init>", "(" + handlerDesc + ")V", null, null));
//mv.setDebug(true);
{
av0 = mv.visitAnnotation("Lorg/redkale/util/ConstructorParameters;", true);
av0 = mv.visitAnnotation(cpDesc, true);
{
AnnotationVisitor av1 = av0.visitArray("value");
av1.visit(null, "handler");

View File

@@ -126,7 +126,7 @@ public class HttpPrepareServlet extends PrepareServlet<String, HttpContext, Http
List<HttpServlet> list = removeHttpServlet(predicateEntry, predicateFilter);
return list == null || list.isEmpty() ? null : list.get(0);
}
@SuppressWarnings("unchecked")
public <T extends WebSocket> HttpServlet removeHttpServlet(Class<T> websocketOrServletType) {
Predicate<MappingEntry> predicateEntry = (t) -> {
Class type = t.servlet.getClass();
@@ -144,6 +144,7 @@ public class HttpPrepareServlet extends PrepareServlet<String, HttpContext, Http
return list == null || list.isEmpty() ? null : list.get(0);
}
@SuppressWarnings("unchecked")
public boolean addForbidURIReg(final String urlreg) {
if (urlreg == null || urlreg.isEmpty()) return false;
synchronized (excludeLock) {
@@ -169,6 +170,7 @@ public class HttpPrepareServlet extends PrepareServlet<String, HttpContext, Http
}
}
@SuppressWarnings("unchecked")
public boolean removeForbidURIReg(final String urlreg) {
if (urlreg == null || urlreg.isEmpty()) return false;
synchronized (excludeLock) {
@@ -198,6 +200,7 @@ public class HttpPrepareServlet extends PrepareServlet<String, HttpContext, Http
}
@Override
@SuppressWarnings("unchecked")
public void init(HttpContext context, AnyValue config) {
super.init(context, config); //必须要执行
Collection<HttpServlet> servlets = getServlets();
@@ -225,7 +228,7 @@ public class HttpPrepareServlet extends PrepareServlet<String, HttpContext, Http
}
String resServlet = resConfig.getValue("servlet", HttpResourceServlet.class.getName());
try {
this.resourceHttpServlet = (HttpServlet) Thread.currentThread().getContextClassLoader().loadClass(resServlet).newInstance();
this.resourceHttpServlet = (HttpServlet) Thread.currentThread().getContextClassLoader().loadClass(resServlet).getDeclaredConstructor().newInstance();
} catch (Throwable e) {
this.resourceHttpServlet = new HttpResourceServlet();
logger.log(Level.WARNING, "init HttpResourceSerlvet(" + resServlet + ") error", e);
@@ -239,7 +242,7 @@ public class HttpPrepareServlet extends PrepareServlet<String, HttpContext, Http
for (AnyValue renderConfig : renderConfigs) {
String renderType = renderConfig.getValue("value");
try {
HttpRender render = (HttpRender) Thread.currentThread().getContextClassLoader().loadClass(renderType).newInstance();
HttpRender render = (HttpRender) Thread.currentThread().getContextClassLoader().loadClass(renderType).getDeclaredConstructor().newInstance();
for (HttpRender one : renders) {
if (one.getType().equals(render.getType())) throw new RuntimeException("HttpRender(" + renderType + ") repeat");
}

View File

@@ -134,15 +134,19 @@ public class HttpRequest extends Request<HttpContext> {
String value = array.toString(index, array.size() - index, charset).trim();
switch (name) {
case "Content-Type":
case "content-type":
this.contentType = value;
break;
case "Content-Length":
case "content-length":
this.contentLength = Long.decode(value);
break;
case "Host":
case "host":
this.host = value;
break;
case "Cookie":
case "cookie":
if (this.cookie == null || this.cookie.isEmpty()) {
this.cookie = value;
} else {
@@ -150,9 +154,13 @@ public class HttpRequest extends Request<HttpContext> {
}
break;
case "Connection":
case "connection":
this.connection = value;
this.setKeepAlive(!"close".equalsIgnoreCase(value));
break;
case "user-agent":
header.addValue("User-Agent", value);
break;
default:
header.addValue(name, value);
}

View File

@@ -59,7 +59,8 @@ public class HttpResourceServlet extends HttpServlet {
final String uri = path.toString().substring(rootstr.length()).replace('\\', '/');
//logger.log(Level.FINEST, "file(" + uri + ") happen " + event.kind() + " event");
if (event.kind() == ENTRY_DELETE) {
files.remove(uri);
FileEntry en = files.remove(uri);
if (en != null) en.remove();
} else if (event.kind() == ENTRY_MODIFY) {
FileEntry en = files.get(uri);
if (en != null && en.file != null) {
@@ -317,10 +318,8 @@ public class HttpResourceServlet extends HttpServlet {
}
}
@Override
protected void finalize() throws Throwable {
public void remove() {
if (this.content != null) this.servlet.cachedLength.add(0L - this.content.remaining());
super.finalize();
}
public long getCachedLength() {

View File

@@ -45,13 +45,10 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
private static final Set<OpenOption> options = new HashSet<>();
private static final DateFormat GMT_DATE_FORMAT = new SimpleDateFormat("EEE, dd-MMM-yyyy HH:mm:ss z", Locale.ENGLISH);
private static final Map<Integer, String> httpCodes = new HashMap<>();
static {
options.add(StandardOpenOption.READ);
GMT_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("GMT"));
httpCodes.put(100, "Continue");
httpCodes.put(101, "Switching Protocols");
@@ -99,6 +96,8 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
httpCodes.put(505, "HTTP Version Not Supported");
}
private final DateFormat gmtDateFormat = new SimpleDateFormat("EEE, dd-MMM-yyyy HH:mm:ss z", Locale.ENGLISH);
private int status = 200;
private String contentType = "text/plain; charset=utf-8";
@@ -142,6 +141,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
this.renders = renders;
this.hasRender = renders != null && !renders.isEmpty();
this.onlyoneHttpRender = renders != null && renders.size() == 1 ? renders.get(0) : null;
gmtDateFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
}
@Override
@@ -270,7 +270,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
* @param obj 输出对象
*/
public void finishJson(final Object obj) {
this.contentType = "text/plain; charset=utf-8";
this.contentType = "application/json; charset=utf-8";
if (this.recycleListener != null) this.output = obj;
finish(request.getJsonConvert().convertTo(getBodyBufferSupplier(), obj));
}
@@ -282,7 +282,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
* @param objs 输出对象
*/
public void finishMapJson(final Object... objs) {
this.contentType = "text/plain; charset=utf-8";
this.contentType = "application/json; charset=utf-8";
if (this.recycleListener != null) this.output = objs;
finish(request.getJsonConvert().convertMapTo(getBodyBufferSupplier(), objs));
}
@@ -294,7 +294,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
* @param obj 输出对象
*/
public void finishJson(final JsonConvert convert, final Object obj) {
this.contentType = "text/plain; charset=utf-8";
this.contentType = "application/json; charset=utf-8";
if (this.recycleListener != null) this.output = obj;
finish(convert.convertTo(getBodyBufferSupplier(), obj));
}
@@ -307,7 +307,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
* @param objs 输出对象
*/
public void finishMapJson(final JsonConvert convert, final Object... objs) {
this.contentType = "text/plain; charset=utf-8";
this.contentType = "application/json; charset=utf-8";
if (this.recycleListener != null) this.output = objs;
finish(convert.convertMapTo(getBodyBufferSupplier(), objs));
}
@@ -319,7 +319,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
* @param obj 输出对象
*/
public void finishJson(final Type type, final Object obj) {
this.contentType = "text/plain; charset=utf-8";
this.contentType = "application/json; charset=utf-8";
this.output = obj;
finish(request.getJsonConvert().convertTo(getBodyBufferSupplier(), type, obj));
}
@@ -332,7 +332,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
* @param obj 输出对象
*/
public void finishJson(final JsonConvert convert, final Type type, final Object obj) {
this.contentType = "text/plain; charset=utf-8";
this.contentType = "application/json; charset=utf-8";
if (this.recycleListener != null) this.output = obj;
finish(convert.convertTo(getBodyBufferSupplier(), type, obj));
}
@@ -343,7 +343,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
* @param objs 输出对象
*/
public void finishJson(final Object... objs) {
this.contentType = "text/plain; charset=utf-8";
this.contentType = "application/json; charset=utf-8";
if (this.recycleListener != null) this.output = objs;
finish(request.getJsonConvert().convertTo(getBodyBufferSupplier(), objs));
}
@@ -354,7 +354,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
* @param ret RetResult输出对象
*/
public void finishJson(final org.redkale.service.RetResult ret) {
this.contentType = "text/plain; charset=utf-8";
this.contentType = "application/json; charset=utf-8";
if (this.recycleListener != null) this.output = ret;
if (ret != null && !ret.isSuccess()) {
this.header.addValue("retcode", String.valueOf(ret.getRetcode()));
@@ -370,7 +370,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
* @param ret RetResult输出对象
*/
public void finishJson(final JsonConvert convert, final org.redkale.service.RetResult ret) {
this.contentType = "text/plain; charset=utf-8";
this.contentType = "application/json; charset=utf-8";
if (this.recycleListener != null) this.output = ret;
if (ret != null && !ret.isSuccess()) {
this.header.addValue("retcode", String.valueOf(ret.getRetcode()));
@@ -493,8 +493,11 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
}
}
}
if (convert instanceof TextConvert) this.contentType = "text/plain; charset=utf-8";
if (convert instanceof JsonConvert) {
this.contentType = "application/json; charset=utf-8";
} else if (convert instanceof TextConvert) {
this.contentType = "text/plain; charset=utf-8";
}
if (this.recycleListener != null) this.output = obj;
if (obj instanceof org.redkale.service.RetResult) {
org.redkale.service.RetResult ret = (org.redkale.service.RetResult) obj;
@@ -842,6 +845,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
if (!this.request.isKeepAlive()) {
buffer.put("Connection: close\r\n".getBytes());
}
buffer.put(("Date: " + gmtDateFormat.format(new Date()) + "\r\n").getBytes());
buffer.put(serverNameBytes);
if (this.defaultAddHeaders != null) {
@@ -910,9 +914,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
if (cookie.getPortlist() != null) sb.append("; Port=").append(cookie.getPortlist());
if (cookie.getMaxAge() > 0) {
sb.append("; Max-Age=").append(cookie.getMaxAge());
synchronized (GMT_DATE_FORMAT) {
sb.append("; Expires=").append(GMT_DATE_FORMAT.format(new Date(System.currentTimeMillis() + cookie.getMaxAge() * 1000)));
}
sb.append("; Expires=").append(gmtDateFormat.format(new Date(System.currentTimeMillis() + cookie.getMaxAge() * 1000)));
}
if (cookie.getSecure()) sb.append("; Secure");
if (cookie.isHttpOnly()) sb.append("; HttpOnly");

View File

@@ -59,10 +59,12 @@ public class HttpScope {
return this;
}
@SuppressWarnings("unchecked")
public <T> T find(String name) {
return this.attributes == null ? null : (T) this.attributes.get(name);
}
@SuppressWarnings("unchecked")
public <T> T find(HttpScope parent, String name) {
T rs = this.attributes == null ? null : (T) this.attributes.get(name);
if (rs != null) return rs;

View File

@@ -274,7 +274,7 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
}
}.loadClass(newDynName.replace('/', '.'), bytes);
try {
HttpServlet instance = (HttpServlet) newClazz.newInstance();
HttpServlet instance = (HttpServlet) newClazz.getDeclaredConstructor().newInstance();
instance.getClass().getField(factfield).set(instance, this);
return instance;
} catch (Exception ex) {
@@ -294,7 +294,7 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
HttpMapping mapping = method.getAnnotation(HttpMapping.class);
this.ignore = mapping == null || !mapping.auth();
this.cacheseconds = mapping == null ? 0 : mapping.cacheseconds();
this.cache = cacheseconds > 0 ? new ConcurrentHashMap() : null;
this.cache = cacheseconds > 0 ? new ConcurrentHashMap<>() : null;
this.cacheHandler = cacheseconds > 0 ? (HttpResponse response, ByteBuffer[] buffers) -> {
int status = response.getStatus();
if (status != 200) return null;

View File

@@ -70,6 +70,7 @@ public class MimeType {
contentTypes.put("jpeg", "image/jpeg");
contentTypes.put("jpg", "image/jpeg");
contentTypes.put("js", "text/javascript");
contentTypes.put("json", "application/json");
contentTypes.put("kar", "audio/x-midi");
contentTypes.put("latex", "application/x-latex");
contentTypes.put("log", "text/plain");

View File

@@ -21,6 +21,7 @@ import static org.redkale.asm.Opcodes.*;
import org.redkale.asm.Type;
import org.redkale.convert.*;
import org.redkale.convert.json.*;
import org.redkale.net.Cryptor;
import org.redkale.service.*;
import org.redkale.util.*;
import org.redkale.source.Flipper;
@@ -129,7 +130,7 @@ public final class Rest {
static JsonConvert createJsonConvert(RestConvert[] converts) {
if (converts == null || converts.length < 1) return JsonConvert.root();
final JsonFactory childFactory = JsonFactory.root().createChild();
final JsonFactory childFactory = JsonFactory.create();
List<Class> types = new ArrayList<>();
for (RestConvert rc : converts) {
if (types.contains(rc.type())) throw new RuntimeException("@RestConvert type(" + rc.type() + ") repeat");
@@ -202,8 +203,8 @@ public final class Rest {
}
if (!valid) throw new RuntimeException("Rest WebSocket Class(" + webSocketType + ") must have public or protected Constructor on createRestWebSocketServlet");
final String rwsname = ResourceFactory.formatResourceName(rws.name());
if (!checkName(rws.catalog())) throw new RuntimeException(webSocketType.getName() + " have illeal " + RestWebSocket.class.getSimpleName() + ".catalog, only 0-9 a-z A-Z _ cannot begin 0-9");
if (!checkName(rwsname)) throw new RuntimeException(webSocketType.getName() + " have illeal " + RestWebSocket.class.getSimpleName() + ".name, only 0-9 a-z A-Z _ cannot begin 0-9");
if (!checkName(rws.catalog())) throw new RuntimeException(webSocketType.getName() + " have illegal " + RestWebSocket.class.getSimpleName() + ".catalog, only 0-9 a-z A-Z _ cannot begin 0-9");
if (!checkName(rwsname)) throw new RuntimeException(webSocketType.getName() + " have illegal " + RestWebSocket.class.getSimpleName() + ".name, only 0-9 a-z A-Z _ cannot begin 0-9");
//----------------------------------------------------------------------------------------
final Set<Field> resourcesFieldSet = new LinkedHashSet<>();
@@ -253,6 +254,10 @@ public final class Rest {
messageMethods.add(method);
}
//----------------------------------------------------------------------------------------
final String resDesc = Type.getDescriptor(Resource.class);
final String wsDesc = Type.getDescriptor(WebSocket.class);
final String wsParamDesc = Type.getDescriptor(WebSocketParam.class);
final String jsonConvertDesc = Type.getDescriptor(JsonConvert.class);
final String convertDisabledDesc = Type.getDescriptor(ConvertDisabled.class);
final String webSocketParamName = Type.getInternalName(WebSocketParam.class);
final String supDynName = WebSocketServlet.class.getName().replace('.', '/');
@@ -315,7 +320,7 @@ public final class Rest {
java.lang.reflect.Type fieldType = field.getGenericType();
fv = cw.visitField(ACC_PRIVATE, "_redkale_resource_" + i, Type.getDescriptor(field.getType()), fieldType == field.getType() ? null : Utility.getTypeDescriptor(fieldType), null);
{
av0 = fv.visitAnnotation("Ljavax/annotation/Resource;", true);
av0 = fv.visitAnnotation(resDesc, true);
av0.visit("name", res.name());
av0.visitEnd();
}
@@ -355,7 +360,7 @@ public final class Rest {
mv.visitEnd();
}
{ //createWebSocket 方法
mv = new MethodDebugVisitor(cw.visitMethod(ACC_PROTECTED, "createWebSocket", "()Lorg/redkale/net/http/WebSocket;", "<G::Ljava/io/Serializable;T:Ljava/lang/Object;>()Lorg/redkale/net/http/WebSocket<TG;TT;>;", null));
mv = new MethodDebugVisitor(cw.visitMethod(ACC_PROTECTED, "createWebSocket", "()" + wsDesc, "<G::Ljava/io/Serializable;T:Ljava/lang/Object;>()L" + WebSocket.class.getName().replace('.', '/') + "<TG;TT;>;", null));
mv.visitTypeInsn(NEW, newDynName + "$" + newDynWebSokcetSimpleName);
mv.visitInsn(DUP);
for (int i = 0; i < resourcesFields.size(); i++) {
@@ -368,7 +373,7 @@ public final class Rest {
mv.visitEnd();
}
{ //createRestOnMessageConsumer
mv = new MethodDebugVisitor(cw.visitMethod(ACC_PROTECTED, "createRestOnMessageConsumer", "()Ljava/util/function/BiConsumer;", "()Ljava/util/function/BiConsumer<Lorg/redkale/net/http/WebSocket;Ljava/lang/Object;>;", null));
mv = new MethodDebugVisitor(cw.visitMethod(ACC_PROTECTED, "createRestOnMessageConsumer", "()Ljava/util/function/BiConsumer;", "()Ljava/util/function/BiConsumer<" + wsDesc + "Ljava/lang/Object;>;", null));
mv.visitTypeInsn(NEW, newDynConsumerFullName);
mv.visitInsn(DUP);
mv.visitMethodInsn(INVOKESPECIAL, newDynConsumerFullName, "<init>", "()V", false);
@@ -475,7 +480,7 @@ public final class Rest {
mv.visitLdcInsn(method.getAnnotation(RestOnMessage.class).name());
mv.visitVarInsn(ALOAD, 0);
mv.visitVarInsn(ALOAD, 0);
mv.visitMethodInsn(INVOKEVIRTUAL, newDynWebSokcetFullName, "preOnMessage", "(Ljava/lang/String;Lorg/redkale/net/http/WebSocketParam;Ljava/lang/Runnable;)V", false);
mv.visitMethodInsn(INVOKEVIRTUAL, newDynWebSokcetFullName, "preOnMessage", "(Ljava/lang/String;" + wsParamDesc + "Ljava/lang/Runnable;)V", false);
mv.visitInsn(RETURN);
mv.visitMaxs(4, 2);
mv.visitEnd();
@@ -497,9 +502,9 @@ public final class Rest {
}
{ //toString
mv = new MethodDebugVisitor(cw2.visitMethod(ACC_PUBLIC, "toString", "()Ljava/lang/String;", null, null));
mv.visitMethodInsn(INVOKESTATIC, "org/redkale/convert/json/JsonConvert", "root", "()Lorg/redkale/convert/json/JsonConvert;", false);
mv.visitMethodInsn(INVOKESTATIC, JsonConvert.class.getName().replace('.', '/'), "root", "()" + jsonConvertDesc, false);
mv.visitVarInsn(ALOAD, 0);
mv.visitMethodInsn(INVOKEVIRTUAL, "org/redkale/convert/json/JsonConvert", "convertTo", "(Ljava/lang/Object;)Ljava/lang/String;", false);
mv.visitMethodInsn(INVOKEVIRTUAL, JsonConvert.class.getName().replace('.', '/'), "convertTo", "(Ljava/lang/Object;)Ljava/lang/String;", false);
mv.visitInsn(ARETURN);
mv.visitMaxs(2, 1);
mv.visitEnd();
@@ -532,9 +537,9 @@ public final class Rest {
}
{ //toString
mv = new MethodDebugVisitor(cw2.visitMethod(ACC_PUBLIC, "toString", "()Ljava/lang/String;", null, null));
mv.visitMethodInsn(INVOKESTATIC, "org/redkale/convert/json/JsonConvert", "root", "()Lorg/redkale/convert/json/JsonConvert;", false);
mv.visitMethodInsn(INVOKESTATIC, JsonConvert.class.getName().replace('.', '/'), "root", "()" + jsonConvertDesc, false);
mv.visitVarInsn(ALOAD, 0);
mv.visitMethodInsn(INVOKEVIRTUAL, "org/redkale/convert/json/JsonConvert", "convertTo", "(Ljava/lang/Object;)Ljava/lang/String;", false);
mv.visitMethodInsn(INVOKEVIRTUAL, JsonConvert.class.getName().replace('.', '/'), "convertTo", "(Ljava/lang/Object;)Ljava/lang/String;", false);
mv.visitInsn(ARETURN);
mv.visitMaxs(2, 1);
mv.visitEnd();
@@ -569,7 +574,7 @@ public final class Rest {
{ //_DynRestOnMessageConsumer class
ClassWriter cw2 = new ClassWriter(COMPUTE_FRAMES);
cw2.visit(V1_8, ACC_PUBLIC + ACC_FINAL + ACC_SUPER, newDynConsumerFullName, "Ljava/lang/Object;Ljava/util/function/BiConsumer<Lorg/redkale/net/http/WebSocket;Ljava/lang/Object;>;", "java/lang/Object", new String[]{"java/util/function/BiConsumer"});
cw2.visit(V1_8, ACC_PUBLIC + ACC_FINAL + ACC_SUPER, newDynConsumerFullName, "Ljava/lang/Object;Ljava/util/function/BiConsumer<" + wsDesc + "Ljava/lang/Object;>;", "java/lang/Object", new String[]{"java/util/function/BiConsumer"});
cw2.visitInnerClass(newDynConsumerFullName, newDynName, newDynConsumerSimpleName, ACC_PUBLIC + ACC_STATIC);
cw2.visitInnerClass(newDynMessageFullName, newDynName, newDynMessageSimpleName, ACC_PUBLIC + ACC_STATIC);
@@ -589,7 +594,7 @@ public final class Rest {
}
{ //accept函数
mv = new MethodDebugVisitor(cw2.visitMethod(ACC_PUBLIC, "accept", "(Lorg/redkale/net/http/WebSocket;Ljava/lang/Object;)V", null, null));
mv = new MethodDebugVisitor(cw2.visitMethod(ACC_PUBLIC, "accept", "(" + wsDesc + "Ljava/lang/Object;)V", null, null));
mv.visitVarInsn(ALOAD, 1);
mv.visitTypeInsn(CHECKCAST, newDynWebSokcetFullName);
mv.visitVarInsn(ASTORE, 3);
@@ -623,10 +628,10 @@ public final class Rest {
mv = new MethodDebugVisitor(cw2.visitMethod(ACC_PUBLIC + ACC_BRIDGE + ACC_SYNTHETIC, "accept", "(Ljava/lang/Object;Ljava/lang/Object;)V", null, null));
mv.visitVarInsn(ALOAD, 0);
mv.visitVarInsn(ALOAD, 1);
mv.visitTypeInsn(CHECKCAST, "org/redkale/net/http/WebSocket");
mv.visitTypeInsn(CHECKCAST, WebSocket.class.getName().replace('.', '/'));
mv.visitVarInsn(ALOAD, 2);
mv.visitTypeInsn(CHECKCAST, "java/lang/Object");
mv.visitMethodInsn(INVOKEVIRTUAL, newDynConsumerFullName, "accept", "(Lorg/redkale/net/http/WebSocket;Ljava/lang/Object;)V", false);
mv.visitMethodInsn(INVOKEVIRTUAL, newDynConsumerFullName, "accept", "(" + wsDesc + "Ljava/lang/Object;)V", false);
mv.visitInsn(RETURN);
mv.visitMaxs(3, 3);
mv.visitEnd();
@@ -637,7 +642,14 @@ public final class Rest {
cw.visitEnd();
Class<?> newClazz = newLoader.loadClass(newDynName.replace('/', '.'), cw.toByteArray());
try {
return (T) newClazz.newInstance();
T servlet = (T) newClazz.getDeclaredConstructor().newInstance();
if (rws.cryptor() != Cryptor.class) {
Cryptor cryptor = rws.cryptor().getDeclaredConstructor().newInstance();
Field cryptorField = newClazz.getSuperclass().getDeclaredField("cryptor"); //WebSocketServlet
cryptorField.setAccessible(true);
cryptorField.set(servlet, cryptor);
}
return servlet;
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -652,6 +664,7 @@ public final class Rest {
final String serviceDesc = Type.getDescriptor(serviceType);
final String webServletDesc = Type.getDescriptor(WebServlet.class);
final String resDesc = Type.getDescriptor(Resource.class);
final String reqDesc = Type.getDescriptor(HttpRequest.class);
final String respDesc = Type.getDescriptor(HttpResponse.class);
final String convertDesc = Type.getDescriptor(Convert.class);
@@ -662,6 +675,8 @@ public final class Rest {
final String flipperDesc = Type.getDescriptor(Flipper.class);
final String httprsDesc = Type.getDescriptor(HttpResult.class);
final String attrDesc = Type.getDescriptor(org.redkale.util.Attribute.class);
final String multiContextDesc = Type.getDescriptor(MultiContext.class);
final String multiContextName = MultiContext.class.getName().replace('.', '/');
final String mappingDesc = Type.getDescriptor(HttpMapping.class);
final String webparamDesc = Type.getDescriptor(HttpParam.class);
final String webparamsDesc = Type.getDescriptor(HttpParam.HttpParams.class);
@@ -686,8 +701,8 @@ public final class Rest {
final String defmodulename = getWebModuleNameLowerCase(serviceType);
final String bigmodulename = getWebModuleName(serviceType);
final String catalog = controller == null ? "" : controller.catalog();
if (!checkName(catalog)) throw new RuntimeException(serviceType.getName() + " have illeal " + RestService.class.getSimpleName() + ".catalog, only 0-9 a-z A-Z _ cannot begin 0-9");
if (!checkName(defmodulename)) throw new RuntimeException(serviceType.getName() + " have illeal " + RestService.class.getSimpleName() + ".value, only 0-9 a-z A-Z _ cannot begin 0-9");
if (!checkName(catalog)) throw new RuntimeException(serviceType.getName() + " have illegal " + RestService.class.getSimpleName() + ".catalog, only 0-9 a-z A-Z _ cannot begin 0-9");
if (!checkName(defmodulename)) throw new RuntimeException(serviceType.getName() + " have illegal " + RestService.class.getSimpleName() + ".value, only 0-9 a-z A-Z _ cannot begin 0-9");
ClassWriter cw = new ClassWriter(COMPUTE_FRAMES);
FieldVisitor fv;
MethodDebugVisitor mv;
@@ -729,14 +744,14 @@ public final class Rest {
{ //注入 @Resource private XXXService _service;
fv = cw.visitField(ACC_PRIVATE, REST_SERVICE_FIELD_NAME, serviceDesc, null, null);
av0 = fv.visitAnnotation("Ljavax/annotation/Resource;", true);
av0 = fv.visitAnnotation(resDesc, true);
av0.visit("name", "");
av0.visitEnd();
fv.visitEnd();
}
{ //注入 @Resource(name = "APP_HOME") private File _redkale_home;
fv = cw.visitField(ACC_PRIVATE, "_redkale_home", Type.getDescriptor(File.class), null, null);
av0 = fv.visitAnnotation("Ljavax/annotation/Resource;", true);
av0 = fv.visitAnnotation(resDesc, true);
av0.visit("name", "APP_HOME");
av0.visitEnd();
fv.visitEnd();
@@ -1028,33 +1043,33 @@ public final class Rest {
if (mupload != null) { //存在文件上传
if (muploadType == byte[].class) {
mv.visitVarInsn(ALOAD, 1);
mv.visitMethodInsn(INVOKEVIRTUAL, reqInternalName, "getMultiContext", "()Lorg/redkale/net/http/MultiContext;", false);
mv.visitMethodInsn(INVOKEVIRTUAL, reqInternalName, "getMultiContext", "()" + multiContextDesc, false);
mv.visitLdcInsn(mupload.maxLength());
mv.visitLdcInsn(mupload.fileNameReg());
mv.visitLdcInsn(mupload.contentTypeReg());
mv.visitMethodInsn(INVOKEVIRTUAL, "org/redkale/net/http/MultiContext", "partsFirstBytes", "(JLjava/lang/String;Ljava/lang/String;)[B", false);
mv.visitMethodInsn(INVOKEVIRTUAL, multiContextName, "partsFirstBytes", "(JLjava/lang/String;Ljava/lang/String;)[B", false);
mv.visitVarInsn(ASTORE, maxLocals);
uploadLocal = maxLocals;
} else if (muploadType == File.class) {
mv.visitVarInsn(ALOAD, 1);
mv.visitMethodInsn(INVOKEVIRTUAL, reqInternalName, "getMultiContext", "()Lorg/redkale/net/http/MultiContext;", false);
mv.visitMethodInsn(INVOKEVIRTUAL, reqInternalName, "getMultiContext", "()" + multiContextDesc, false);
mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynName, "_redkale_home", "Ljava/io/File;");
mv.visitLdcInsn(mupload.maxLength());
mv.visitLdcInsn(mupload.fileNameReg());
mv.visitLdcInsn(mupload.contentTypeReg());
mv.visitMethodInsn(INVOKEVIRTUAL, "org/redkale/net/http/MultiContext", "partsFirstFile", "(Ljava/io/File;JLjava/lang/String;Ljava/lang/String;)Ljava/io/File;", false);
mv.visitMethodInsn(INVOKEVIRTUAL, multiContextName, "partsFirstFile", "(Ljava/io/File;JLjava/lang/String;Ljava/lang/String;)Ljava/io/File;", false);
mv.visitVarInsn(ASTORE, maxLocals);
uploadLocal = maxLocals;
} else if (muploadType == File[].class) { //File[]
mv.visitVarInsn(ALOAD, 1);
mv.visitMethodInsn(INVOKEVIRTUAL, reqInternalName, "getMultiContext", "()Lorg/redkale/net/http/MultiContext;", false);
mv.visitMethodInsn(INVOKEVIRTUAL, reqInternalName, "getMultiContext", "()" + multiContextDesc, false);
mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynName, "_redkale_home", "Ljava/io/File;");
mv.visitLdcInsn(mupload.maxLength());
mv.visitLdcInsn(mupload.fileNameReg());
mv.visitLdcInsn(mupload.contentTypeReg());
mv.visitMethodInsn(INVOKEVIRTUAL, "org/redkale/net/http/MultiContext", "partsFiles", "(Ljava/io/File;JLjava/lang/String;Ljava/lang/String;)[Ljava/io/File;", false);
mv.visitMethodInsn(INVOKEVIRTUAL, multiContextName, "partsFiles", "(Ljava/io/File;JLjava/lang/String;Ljava/lang/String;)[Ljava/io/File;", false);
mv.visitVarInsn(ASTORE, maxLocals);
uploadLocal = maxLocals;
}
@@ -1684,7 +1699,7 @@ public final class Rest {
cw.visitEnd();
Class<?> newClazz = new RestClassLoader(loader).loadClass(newDynName.replace('/', '.'), cw.toByteArray());
try {
T obj = ((Class<T>) newClazz).newInstance();
T obj = ((Class<T>) newClazz).getDeclaredConstructor().newInstance();
for (Map.Entry<String, org.redkale.util.Attribute> en : restAttributes.entrySet()) {
Field attrField = newClazz.getDeclaredField(en.getKey());
attrField.setAccessible(true);

View File

@@ -8,6 +8,7 @@ package org.redkale.net.http;
import java.lang.annotation.*;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import org.redkale.net.Cryptor;
/**
* 只能依附在WebSocket类上name默认为Service的类名小写并去掉Service字样及后面的字符串 (如HelloWebSocket/HelloWebSocketImpl的默认路径为 hello)。 <br>
@@ -66,6 +67,13 @@ public @interface RestWebSocket {
*/
int liveinterval() default WebSocketServlet.DEFAILT_LIVEINTERVAL;
/**
* 加密解密器
*
* @return Cryptor
*/
Class<? extends Cryptor> cryptor() default Cryptor.class;
/**
* 最大连接数, 小于1表示无限制
*

View File

@@ -14,6 +14,7 @@ import java.util.function.*;
import java.util.logging.*;
import java.util.stream.*;
import org.redkale.convert.Convert;
import org.redkale.net.Cryptor;
import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY;
import static org.redkale.net.http.WebSocketServlet.*;
import org.redkale.util.*;
@@ -72,8 +73,11 @@ public class WebSocketEngine {
@Comment("最大消息体长度, 小于1表示无限制")
protected int wsmaxbody;
@Comment("加密解密器")
protected Cryptor cryptor;
protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval,
int wsmaxconns, int wsmaxbody, WebSocketNode node, Convert sendConvert, Logger logger) {
int wsmaxconns, int wsmaxbody, Cryptor cryptor, WebSocketNode node, Convert sendConvert, Logger logger) {
this.engineid = engineid;
this.single = single;
this.context = context;
@@ -82,6 +86,7 @@ public class WebSocketEngine {
this.liveinterval = liveinterval;
this.wsmaxconns = wsmaxconns;
this.wsmaxbody = wsmaxbody;
this.cryptor = cryptor;
this.logger = logger;
this.index = sequence.getAndIncrement();
}
@@ -213,7 +218,7 @@ public class WebSocketEngine {
final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message
: ((message == null || message instanceof CharSequence || message instanceof byte[])
? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.sendConvert, false, message, last));
packet.setSendBuffers(packet.encode(context.getBufferSupplier()));
packet.setSendBuffers(packet.encode(context.getBufferSupplier(), context.getBufferConsumer(), cryptor));
CompletableFuture<Integer> future = null;
if (single) {
for (WebSocket websocket : websockets.values()) {
@@ -270,7 +275,7 @@ public class WebSocketEngine {
final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message
: ((message == null || message instanceof CharSequence || message instanceof byte[])
? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.sendConvert, false, message, last));
packet.setSendBuffers(packet.encode(context.getBufferSupplier()));
packet.setSendBuffers(packet.encode(context.getBufferSupplier(), context.getBufferConsumer(), cryptor));
CompletableFuture<Integer> future = null;
if (single) {
for (Serializable userid : userids) {

View File

@@ -354,7 +354,7 @@ public abstract class WebSocketNode {
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
return this.localEngine.sendMessage(message, last, userids);
}
final Object remoteMessage = formatRemoteMessage(message0);
final Object remoteMessage = formatRemoteMessage(message);
CompletableFuture<Integer> future = null;
for (Serializable userid : userids) {
future = future == null ? sendOneMessage(remoteMessage, last, userid)
@@ -484,7 +484,9 @@ public abstract class WebSocketNode {
private CompletableFuture<Integer> sendOneMessage(final Object message, final boolean last, final Serializable userid) {
if (message instanceof CompletableFuture) return ((CompletableFuture) message).thenApply(msg -> sendOneMessage(msg, last, userid));
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket want send message {userid:" + userid + ", content:'" + JsonConvert.root().convertTo(message) + "'} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine");
if (logger.isLoggable(Level.FINEST)) {
logger.finest("websocket want send message {userid:" + userid + ", content:'" + (message instanceof WebSocketPacket ? ((WebSocketPacket) message).toSimpleString() : JsonConvert.root().convertTo(message)) + "'} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine");
}
CompletableFuture<Integer> localFuture = null;
if (this.localEngine != null) localFuture = localEngine.sendMessage(message, last, userid);
if (this.sncpNodeAddresses == null || this.remoteNode == null) {

View File

@@ -10,9 +10,10 @@ import java.io.*;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.AbstractMap;
import java.util.function.Supplier;
import java.util.function.*;
import java.util.logging.*;
import org.redkale.convert.*;
import org.redkale.net.Cryptor;
/**
*
@@ -201,6 +202,12 @@ public final class WebSocketPacket {
this.last = last;
}
public String toSimpleString() {
if (payload != null) return payload;
if (bytes != null) return "byte[" + bytes.length + "]";
return this.toString();
}
@Override
public String toString() {
return this.getClass().getSimpleName() + "[type=" + type + ", last=" + last + (payload != null ? (", payload=" + payload) : "") + (bytes != null ? (", bytes=[" + bytes.length + ']') : (receiveLength > 0 ? (", receivemsg=" + (receiveMessage instanceof byte[] ? ("byte[" + ((byte[]) receiveMessage).length + "]") : receiveMessage)) : "")) + (sendJson != null ? (", json=" + (sendMapconvable ? Utility.ofMap((Object[]) sendJson) : sendJson)) : "") + "]";
@@ -210,10 +217,11 @@ public final class WebSocketPacket {
* 消息编码
*
* @param supplier Supplier
* @param cryptor Cryptor
*
* @return ByteBuffer[]
*/
ByteBuffer[] encode(final Supplier<ByteBuffer> supplier) {
ByteBuffer[] encode(final Supplier<ByteBuffer> supplier, final Consumer<ByteBuffer> consumer, final Cryptor cryptor) {
final byte opcode = (byte) (this.type.getValue() | 0x80);
if (this.sendConvert != null) {
Supplier<ByteBuffer> newsupplier = new Supplier<ByteBuffer>() {
@@ -232,6 +240,7 @@ public final class WebSocketPacket {
}
};
ByteBuffer[] buffers = this.sendMapconvable ? this.sendConvert.convertMapTo(newsupplier, (Object[]) sendJson) : this.sendConvert.convertTo(newsupplier, sendJson);
if (cryptor != null) buffers = cryptor.encrypt(buffers, supplier, consumer);
int len = 0;
for (ByteBuffer buf : buffers) {
len += buf.remaining();
@@ -256,7 +265,27 @@ public final class WebSocketPacket {
}
ByteBuffer buffer = supplier.get(); //确保ByteBuffer的capacity不能小于128
final byte[] content = content();
byte[] content = content();
if (cryptor != null) {
ByteBuffer[] ss = new ByteBuffer[]{ByteBuffer.wrap(content)};
ByteBuffer[] bs = cryptor.encrypt(ss, supplier, consumer);
if (bs != ss) {
int r = 0;
for (ByteBuffer bb : bs) {
r += bb.remaining();
}
content = new byte[r];
int index = 0;
for (ByteBuffer bb : bs) {
int re = bb.remaining();
bb.get(content, index, re);
index += re;
}
for (ByteBuffer bb : bs) {
consumer.accept(bb);
}
}
}
final int len = content.length;
if (len <= 0x7D) { //125
buffer.put(opcode);
@@ -444,6 +473,10 @@ public final class WebSocketPacket {
}
void parseReceiveMessage(WebSocket webSocket, ByteBuffer... buffers) {
if (webSocket._engine.cryptor != null) {
HttpContext context = webSocket._engine.context;
buffers = webSocket._engine.cryptor.decrypt(buffers, context.getBufferSupplier(), context.getBufferConsumer());
}
if (this.type == FrameType.TEXT) {
Convert textConvert = webSocket.getTextConvert();
if (textConvert == null) {

View File

@@ -226,7 +226,7 @@ class WebSocketRunner implements Runnable {
return futureResult;
}
}
ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(this.context.getBufferSupplier());
ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(this.context.getBufferSupplier(), this.context.getBufferConsumer(), webSocket._engine.cryptor);
if (debug) context.getLogger().log(Level.FINEST, "sending websocket message: " + packet);
try {
this.lastSendTime = System.currentTimeMillis();
@@ -276,7 +276,7 @@ class WebSocketRunner implements Runnable {
}
if (entry != null) {
future = entry.future;
ByteBuffer[] buffers = entry.packet.sendBuffers != null ? entry.packet.duplicateSendBuffers() : entry.packet.encode(context.getBufferSupplier());
ByteBuffer[] buffers = entry.packet.sendBuffers != null ? entry.packet.duplicateSendBuffers() : entry.packet.encode(context.getBufferSupplier(), context.getBufferConsumer(), webSocket._engine.cryptor);
lastSendTime = System.currentTimeMillis();
if (debug) context.getLogger().log(Level.FINEST, "sending websocket message: " + entry.packet);
channel.write(buffers, buffers, this);

View File

@@ -17,6 +17,7 @@ import java.util.function.BiConsumer;
import java.util.logging.*;
import javax.annotation.*;
import org.redkale.convert.Convert;
import org.redkale.net.Cryptor;
import org.redkale.service.*;
import org.redkale.util.*;
@@ -52,6 +53,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
@Comment("最大消息体长度, 小于1表示无限制")
public static final String WEBPARAM__WSMAXBODY = "wsmaxbody";
@Comment("加密解密器")
public static final String WEBPARAM__CRYPTOR = "cryptor";
@Comment("WebScoket服务器给客户端进行ping操作的默认间隔时间, 单位: 秒")
public static final int DEFAILT_LIVEINTERVAL = 15;
@@ -78,6 +82,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
//同RestWebSocket.anyuser
protected boolean anyuser = false;
//同RestWebSocket.cryptor, 变量名不可改, 被Rest.createRestWebSocketServlet用到
protected Cryptor cryptor;
@Resource(name = "jsonconvert")
protected Convert jsonConvert;
@@ -124,12 +131,26 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
if (logger.isLoggable(Level.WARNING)) logger.warning("Not found WebSocketNode, create a default value for " + getClass().getName());
}
if (this.node.sendConvert == null) this.node.sendConvert = this.sendConvert;
{
AnyValue props = conf;
if (conf != null && conf.getAnyValue("properties") != null) props = conf.getAnyValue("properties");
if (props != null) {
String cryptorClass = props.getValue(WEBPARAM__CRYPTOR);
if (cryptorClass != null && !cryptorClass.isEmpty()) {
try {
this.cryptor = (Cryptor) Thread.currentThread().getContextClassLoader().loadClass(cryptorClass).getDeclaredConstructor().newInstance();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
//存在WebSocketServlet则此WebSocketNode必须是本地模式Service
this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]",
this.single, context, liveinterval, wsmaxconns, wsmaxbody, this.node, this.sendConvert, logger);
this.single, context, liveinterval, wsmaxconns, wsmaxbody, this.cryptor, this.node, this.sendConvert, logger);
this.node.init(conf);
this.node.localEngine.init(conf);
}
@Override

View File

@@ -280,6 +280,7 @@ public abstract class Sncp {
final List<Method> methods = SncpClient.parseMethod(serviceImplClass);
final String supDynName = serviceImplClass.getName().replace('.', '/');
final String clientName = SncpClient.class.getName().replace('.', '/');
final String resDesc = Type.getDescriptor(Resource.class);
final String clientDesc = Type.getDescriptor(SncpClient.class);
final String anyValueDesc = Type.getDescriptor(AnyValue.class);
final String sncpDynDesc = Type.getDescriptor(SncpDyn.class);
@@ -305,7 +306,7 @@ public abstract class Sncp {
cw.visit(V1_8, ACC_PUBLIC + ACC_FINAL + ACC_SUPER, newDynName, null, supDynName, null);
{
av0 = cw.visitAnnotation("Ljavax/annotation/Resource;", true);
av0 = cw.visitAnnotation(resDesc, true);
av0.visit("name", name);
av0.visitEnd();
}
@@ -754,7 +755,7 @@ public abstract class Sncp {
final AnyValue conf) {
try {
final Class newClazz = createLocalServiceClass(classLoader, name, serviceImplClass);
T rs = (T) newClazz.newInstance();
T rs = (T) newClazz.getDeclaredConstructor().newInstance();
//--------------------------------------
Service remoteService = null;
{
@@ -881,6 +882,7 @@ public abstract class Sncp {
if (!java.lang.reflect.Modifier.isPublic(mod)) return null;
final String supDynName = serviceTypeOrImplClass.getName().replace('.', '/');
final String clientName = SncpClient.class.getName().replace('.', '/');
final String resDesc = Type.getDescriptor(Resource.class);
final String clientDesc = Type.getDescriptor(SncpClient.class);
final String sncpDynDesc = Type.getDescriptor(SncpDyn.class);
final String anyValueDesc = Type.getDescriptor(AnyValue.class);
@@ -888,7 +890,7 @@ public abstract class Sncp {
String newDynName = supDynName.substring(0, supDynName.lastIndexOf('/') + 1) + REMOTEPREFIX + serviceTypeOrImplClass.getSimpleName();
try {
Class newClazz = loader.loadClass(newDynName.replace('/', '.'));
T rs = (T) newClazz.newInstance();
T rs = (T) newClazz.getDeclaredConstructor().newInstance();
SncpClient client = new SncpClient(name, serviceTypeOrImplClass, rs, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress);
client.setRemoteGroups(groups);
client.setRemoteGroupTransport(transportFactory.loadRemoteTransport(clientAddress, groups));
@@ -907,7 +909,7 @@ public abstract class Sncp {
cw.visit(V1_8, ACC_PUBLIC + ACC_FINAL + ACC_SUPER, newDynName, null, serviceTypeOrImplClass.isInterface() ? "java/lang/Object" : supDynName, serviceTypeOrImplClass.isInterface() ? new String[]{supDynName} : null);
{
av0 = cw.visitAnnotation("Ljavax/annotation/Resource;", true);
av0 = cw.visitAnnotation(resDesc, true);
av0.visit("name", name);
av0.visitEnd();
}
@@ -1069,7 +1071,7 @@ public abstract class Sncp {
}
}.loadClass(newDynName.replace('/', '.'), bytes);
try {
T rs = (T) newClazz.newInstance();
T rs = (T) newClazz.getDeclaredConstructor().newInstance();
SncpClient client = new SncpClient(name, serviceTypeOrImplClass, rs, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress);
client.setRemoteGroups(groups);
client.setRemoteGroupTransport(transportFactory.loadRemoteTransport(clientAddress, groups));

View File

@@ -102,6 +102,7 @@ public interface SncpAsyncHandler<V, A> extends CompletionHandler<V, A> {
final boolean handlerinterface = handlerClass.isInterface();
final String handlerClassName = handlerClass.getName().replace('.', '/');
final String sncpHandlerName = SncpAsyncHandler.class.getName().replace('.', '/');
final String cpDesc = Type.getDescriptor(ConstructorParameters.class);
final String sncpHandlerDesc = Type.getDescriptor(SncpAsyncHandler.class);
final String sncpFutureDesc = Type.getDescriptor(CompletableFuture.class);
final String newDynName = handlerClass.getName().replace('.', '/') + "_Dync" + SncpAsyncHandler.class.getSimpleName() + "_" + (System.currentTimeMillis() % 10000);
@@ -124,7 +125,7 @@ public interface SncpAsyncHandler<V, A> extends CompletionHandler<V, A> {
mv = new MethodDebugVisitor(cw.visitMethod(ACC_PUBLIC, "<init>", "(" + sncpHandlerDesc + ")V", null, null));
//mv.setDebug(true);
{
av0 = mv.visitAnnotation("org/redkale/util/ConstructorParameters;", true);
av0 = mv.visitAnnotation(cpDesc, true);
{
AnnotationVisitor av1 = av0.visitArray("value");
av1.visit(null, "sncphandler");

View File

@@ -441,6 +441,7 @@ public final class SncpClient {
}
}
@SuppressWarnings("unchecked")
public void success() {
future.complete(this.body);
transport.offerBuffer(buffer);
@@ -547,6 +548,7 @@ public final class SncpClient {
protected final Creator<? extends CompletableFuture> futureCreator;
@SuppressWarnings("unchecked")
public SncpAction(final Class clazz, Method method, DLong actionid) {
this.actionid = actionid == null ? Sncp.hash(method) : actionid;
Type rt = TypeToken.getGenericType(method.getGenericReturnType(), clazz);
@@ -595,7 +597,7 @@ public final class SncpClient {
for (Annotation ann : anns[i]) {
if (ann.annotationType() == RpcCall.class) {
try {
atts[i + 1] = ((RpcCall) ann).value().newInstance();
atts[i + 1] = ((RpcCall) ann).value().getDeclaredConstructor().newInstance();
hasattr = true;
} catch (Exception e) {
logger.log(Level.SEVERE, RpcCall.class.getSimpleName() + ".attribute cannot a newInstance for" + method, e);

View File

@@ -105,6 +105,7 @@ public final class SncpDynServlet extends SncpServlet {
}
@Override
@SuppressWarnings("unchecked")
public void execute(SncpRequest request, SncpResponse response) throws IOException {
if (bufferSupplier == null) {
bufferSupplier = request.getContext().getBufferSupplier();
@@ -586,7 +587,7 @@ public final class SncpDynServlet extends SncpServlet {
}
}.loadClass(newDynName.replace('/', '.'), bytes);
try {
SncpServletAction instance = (SncpServletAction) newClazz.newInstance();
SncpServletAction instance = (SncpServletAction) newClazz.getDeclaredConstructor().newInstance();
instance.method = method;
java.lang.reflect.Type[] ptypes = TypeToken.getGenericType(method.getGenericParameterTypes(), serviceClass);
java.lang.reflect.Type[] types = new java.lang.reflect.Type[ptypes.length + 1];
@@ -606,7 +607,7 @@ public final class SncpDynServlet extends SncpServlet {
for (Annotation ann : anns[i]) {
if (ann.annotationType() == RpcCall.class) {
try {
atts[i + 1] = ((RpcCall) ann).value().newInstance();
atts[i + 1] = ((RpcCall) ann).value().getDeclaredConstructor().newInstance();
hasattr = true;
} catch (Exception e) {
logger.log(Level.SEVERE, RpcCall.class.getSimpleName() + ".attribute cannot a newInstance for" + method, e);

View File

@@ -6,6 +6,7 @@
package org.redkale.service;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import org.redkale.convert.json.*;
/**
@@ -57,6 +58,10 @@ public class RetResult<T> {
return new RetResult();
}
public static <T> CompletableFuture<RetResult<T>> successFuture() {
return CompletableFuture.completedFuture(new RetResult());
}
/**
* 判断结果是否成功返回, retcode = 0 视为成功, 否则视为错误码
*

View File

@@ -9,8 +9,9 @@ import static org.redkale.net.http.WebSocket.*;
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.*;
import java.util.logging.Level;
import org.redkale.net.WorkThread;
import org.redkale.net.http.*;
import org.redkale.util.*;
@@ -39,11 +40,19 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
public CompletableFuture<List<String>> getWebSocketAddresses(final @RpcTargetAddress InetSocketAddress targetAddress, final Serializable groupid) {
if (localSncpAddress == null || !localSncpAddress.equals(targetAddress)) return remoteWebSocketAddresses(targetAddress, groupid);
if (this.localEngine == null) return CompletableFuture.completedFuture(new ArrayList<>());
ExecutorService executor = null;
Thread thread = Thread.currentThread();
if (thread instanceof WorkThread) {
executor = ((WorkThread) thread).getExecutor();
}
if (executor == null) executor = ForkJoinPool.commonPool();
return CompletableFuture.supplyAsync(() -> {
final List<String> rs = new ArrayList<>();
this.localEngine.getLocalWebSockets(groupid).forEach(x -> rs.add(x.getRemoteAddr()));
return rs;
});
}, executor);
}
@Override

View File

@@ -27,6 +27,7 @@ import org.redkale.util.*;
*
* @author zhangjx
*/
@SuppressWarnings("unchecked")
@Local
@AutoLoad(false)
@ResourceType(CacheSource.class)
@@ -85,6 +86,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
@SuppressWarnings("unchecked")
public void init(AnyValue conf) {
if (this.convert == null) this.convert = this.defaultConvert;
if (this.convert == null) this.convert = JsonConvert.root();
@@ -104,7 +106,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
String expireHandlerClass = prop == null ? null : prop.getValue("expirehandler");
if (expireHandlerClass != null) {
try {
this.expireHandler = (Consumer<CacheEntry>) Thread.currentThread().getContextClassLoader().loadClass(expireHandlerClass).newInstance();
this.expireHandler = (Consumer<CacheEntry>) Thread.currentThread().getContextClassLoader().loadClass(expireHandlerClass).getDeclaredConstructor().newInstance();
} catch (Throwable e) {
logger.log(Level.SEVERE, self.getClass().getSimpleName() + " new expirehandler class (" + expireHandlerClass + ") instance error", e);
}
@@ -320,6 +322,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
@SuppressWarnings("unchecked")
public V get(String key) {
if (key == null) return null;
CacheEntry entry = container.get(key);
@@ -362,6 +365,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
@Override
@RpcMultiRun
@SuppressWarnings("unchecked")
public V getAndRefresh(String key, final int expireSeconds) {
if (key == null) return null;
CacheEntry entry = container.get(key);
@@ -375,6 +379,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
@Override
@RpcMultiRun
@SuppressWarnings("unchecked")
public String getStringAndRefresh(String key, final int expireSeconds) {
if (key == null) return null;
CacheEntry entry = container.get(key);

View File

@@ -14,6 +14,11 @@ import org.redkale.convert.json.JsonFactory;
import org.redkale.util.ConstructorParameters;
/**
* Redkale中缓存数据源的核心类。 主要供业务开发者使用, 技术开发者提供CacheSource的实现。<br>
* CacheSource提供三种数据类型操作: String、Long和泛型指定的数据类型。<br>
* String统一用setString、getString等系列方法。<br>
* Long统一用setLong、getLong、incr等系列方法。<br>
* 其他则供自定义数据类型使用。
*
* @param <V> value的类型
* <p>

View File

@@ -18,6 +18,7 @@ import org.redkale.util.*;
* @param <T> Entity类的类型
* @param <F> 字段的类型
*/
@SuppressWarnings("unchecked")
public final class DataCallArrayAttribute<T, F> implements Attribute<T[], F> {
public static final DataCallArrayAttribute instance = new DataCallArrayAttribute();

View File

@@ -9,7 +9,8 @@ import java.io.*;
import java.net.URL;
import java.sql.*;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.function.*;
import java.util.logging.*;
import java.util.stream.Stream;
@@ -40,6 +41,10 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
protected URL conf;
protected int threads;
protected ExecutorService executor;
protected boolean cacheForbidden;
protected PoolJdbcSource readPool;
@@ -54,6 +59,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
public DataJdbcSource(String unitName, Properties readprop, Properties writeprop) {
this.preConstruct(unitName, readprop, writeprop);
this.initByProperties(unitName, readprop, writeprop);
}
public DataJdbcSource() {
@@ -85,8 +91,36 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
this.initByProperties(unitName, readprop, writeprop);
}
@Override
protected ExecutorService getExecutor() {
return executor;
}
@Override
public void destroy(AnyValue config) {
if (this.executor != null) this.executor.shutdownNow();
}
//构造前调用
protected void preConstruct(String unitName, Properties readprop, Properties writeprop) {
final AtomicInteger counter = new AtomicInteger();
this.threads = Integer.decode(readprop.getProperty(JDBC_CONNECTIONSMAX, "" + Runtime.getRuntime().availableProcessors() * 16));
final Thread.UncaughtExceptionHandler ueh = (t, e) -> {
logger.log(Level.SEVERE, "DataJdbcSource error", e);
};
this.executor = Executors.newFixedThreadPool(threads, (Runnable r) -> {
Thread t = new Thread(r);
t.setDaemon(true);
String s = "" + counter.incrementAndGet();
if (s.length() == 1) {
s = "00" + s;
} else if (s.length() == 2) {
s = "0" + s;
}
t.setName("DataJdbcSource-Thread-" + s);
t.setUncaughtExceptionHandler(ueh);
return t;
});
}
protected void initByProperties(String unitName, Properties readprop, Properties writeprop) {
@@ -323,6 +357,10 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
Blob blob = conn.createBlob();
blob.setBytes(1, (byte[]) val);
prestmt.setObject(++i, blob);
} else if (val instanceof AtomicInteger) {
prestmt.setObject(++i, ((AtomicInteger) val).get());
} else if (val instanceof AtomicLong) {
prestmt.setObject(++i, ((AtomicLong) val).get());
} else {
prestmt.setObject(++i, val);
}
@@ -599,6 +637,10 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
Blob blob = conn.createBlob();
blob.setBytes(1, (byte[]) val);
prestmt.setObject(++k, blob);
} else if (val instanceof AtomicInteger) {
prestmt.setObject(++k, ((AtomicInteger) val).get());
} else if (val instanceof AtomicLong) {
prestmt.setObject(++k, ((AtomicLong) val).get());
} else {
prestmt.setObject(++k, val);
}

View File

@@ -9,6 +9,7 @@ import java.io.Serializable;
import java.lang.reflect.Array;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.function.*;
import java.util.logging.*;
import java.util.stream.*;
@@ -226,6 +227,10 @@ public final class EntityCache<T> {
id = ((Number) id).byteValue();
} else if (atype == double.class || atype == Double.class) {
id = ((Number) id).doubleValue();
} else if (atype == AtomicInteger.class) {
id = new AtomicInteger(((Number) id).intValue());
} else if (atype == AtomicLong.class) {
id = new AtomicLong(((Number) id).longValue());
}
}
return this.map.containsKey(id);
@@ -299,11 +304,11 @@ public final class EntityCache<T> {
if (filter != null) stream = stream.filter(filter);
switch (func) {
case AVG:
if (attr.type() == int.class || attr.type() == Integer.class) {
OptionalDouble rs = stream.mapToInt(x -> (Integer) attr.get(x)).average();
if (attr.type() == int.class || attr.type() == Integer.class || attr.type() == AtomicInteger.class) {
OptionalDouble rs = stream.mapToInt(x -> ((Number) attr.get(x)).intValue()).average();
return rs.isPresent() ? (int) rs.getAsDouble() : defResult;
} else if (attr.type() == long.class || attr.type() == Long.class) {
OptionalDouble rs = stream.mapToLong(x -> (Long) attr.get(x)).average();
} else if (attr.type() == long.class || attr.type() == Long.class || attr.type() == AtomicLong.class) {
OptionalDouble rs = stream.mapToLong(x -> ((Number) attr.get(x)).longValue()).average();
return rs.isPresent() ? (long) rs.getAsDouble() : defResult;
} else if (attr.type() == short.class || attr.type() == Short.class) {
OptionalDouble rs = stream.mapToInt(x -> ((Short) attr.get(x)).intValue()).average();
@@ -322,11 +327,11 @@ public final class EntityCache<T> {
return stream.map(x -> attr.get(x)).distinct().count();
case MAX:
if (attr.type() == int.class || attr.type() == Integer.class) {
OptionalInt rs = stream.mapToInt(x -> (Integer) attr.get(x)).max();
if (attr.type() == int.class || attr.type() == Integer.class || attr.type() == AtomicInteger.class) {
OptionalInt rs = stream.mapToInt(x -> ((Number) attr.get(x)).intValue()).max();
return rs.isPresent() ? rs.getAsInt() : defResult;
} else if (attr.type() == long.class || attr.type() == Long.class) {
OptionalLong rs = stream.mapToLong(x -> (Long) attr.get(x)).max();
} else if (attr.type() == long.class || attr.type() == Long.class || attr.type() == AtomicLong.class) {
OptionalLong rs = stream.mapToLong(x -> ((Number) attr.get(x)).longValue()).max();
return rs.isPresent() ? rs.getAsLong() : defResult;
} else if (attr.type() == short.class || attr.type() == Short.class) {
OptionalInt rs = stream.mapToInt(x -> ((Short) attr.get(x)).intValue()).max();
@@ -341,11 +346,11 @@ public final class EntityCache<T> {
throw new RuntimeException("getNumberResult error(type:" + type + ", attr.declaringClass: " + attr.declaringClass() + ", attr.field: " + attr.field() + ", attr.type: " + attr.type());
case MIN:
if (attr.type() == int.class || attr.type() == Integer.class) {
OptionalInt rs = stream.mapToInt(x -> (Integer) attr.get(x)).min();
if (attr.type() == int.class || attr.type() == Integer.class || attr.type() == AtomicInteger.class) {
OptionalInt rs = stream.mapToInt(x -> ((Number) attr.get(x)).intValue()).min();
return rs.isPresent() ? rs.getAsInt() : defResult;
} else if (attr.type() == long.class || attr.type() == Long.class) {
OptionalLong rs = stream.mapToLong(x -> (Long) attr.get(x)).min();
} else if (attr.type() == long.class || attr.type() == Long.class || attr.type() == AtomicLong.class) {
OptionalLong rs = stream.mapToLong(x -> ((Number) attr.get(x)).longValue()).min();
return rs.isPresent() ? rs.getAsLong() : defResult;
} else if (attr.type() == short.class || attr.type() == Short.class) {
OptionalInt rs = stream.mapToInt(x -> ((Short) attr.get(x)).intValue()).min();
@@ -360,10 +365,10 @@ public final class EntityCache<T> {
throw new RuntimeException("getNumberResult error(type:" + type + ", attr.declaringClass: " + attr.declaringClass() + ", attr.field: " + attr.field() + ", attr.type: " + attr.type());
case SUM:
if (attr.type() == int.class || attr.type() == Integer.class) {
return stream.mapToInt(x -> (Integer) attr.get(x)).sum();
} else if (attr.type() == long.class || attr.type() == Long.class) {
return stream.mapToLong(x -> (Long) attr.get(x)).sum();
if (attr.type() == int.class || attr.type() == Integer.class || attr.type() == AtomicInteger.class) {
return stream.mapToInt(x -> ((Number) attr.get(x)).intValue()).sum();
} else if (attr.type() == long.class || attr.type() == Long.class || attr.type() == AtomicLong.class) {
return stream.mapToLong(x -> ((Number) attr.get(x)).longValue()).sum();
} else if (attr.type() == short.class || attr.type() == Short.class) {
return (short) stream.mapToInt(x -> ((Short) attr.get(x)).intValue()).sum();
} else if (attr.type() == float.class || attr.type() == Float.class) {
@@ -628,6 +633,16 @@ public final class EntityCache<T> {
newval = numb.doubleValue();
} else if (ft == byte.class || ft == Byte.class) {
newval = numb.byteValue();
} else if (ft == AtomicInteger.class) {
newval = new AtomicInteger(numb.intValue());
} else if (ft == AtomicLong.class) {
newval = new AtomicLong(numb.longValue());
}
} else {
if (ft == AtomicInteger.class && newval != null && newval.getClass() != AtomicInteger.class) {
newval = new AtomicInteger(((Number) newval).intValue());
} else if (ft == AtomicLong.class && newval != null && newval.getClass() != AtomicLong.class) {
newval = new AtomicLong(((Number) newval).longValue());
}
}
attr.set(rs, (V) newval);
@@ -657,9 +672,9 @@ public final class EntityCache<T> {
final String func = sub[0].substring(0, pos);
if ("ABS".equalsIgnoreCase(func)) {
Function getter = null;
if (pattr.type() == int.class || pattr.type() == Integer.class) {
if (pattr.type() == int.class || pattr.type() == Integer.class || pattr.type() == AtomicInteger.class) {
getter = x -> Math.abs(((Number) pattr.get((T) x)).intValue());
} else if (pattr.type() == long.class || pattr.type() == Long.class) {
} else if (pattr.type() == long.class || pattr.type() == Long.class || pattr.type() == AtomicLong.class) {
getter = x -> Math.abs(((Number) pattr.get((T) x)).longValue());
} else if (pattr.type() == float.class || pattr.type() == Float.class) {
getter = x -> Math.abs(((Number) pattr.get((T) x)).floatValue());

View File

@@ -10,6 +10,7 @@ import java.lang.reflect.*;
import java.sql.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.*;
import java.util.function.*;
import java.util.logging.*;
import javax.persistence.*;
@@ -141,7 +142,7 @@ public final class EntityInfo<T> {
static <T> EntityInfo<T> load(Class<T> clazz, final boolean cacheForbidden, final Properties conf,
DataSource source, BiFunction<DataSource, Class, List> fullloader) {
EntityInfo rs = entityInfos.get(clazz);
if (rs != null) return rs;
if (rs != null && (rs.cache == null || rs.cache.isFullLoaded())) return rs;
synchronized (entityInfos) {
rs = entityInfos.get(clazz);
if (rs == null) {
@@ -191,7 +192,7 @@ public final class EntityInfo<T> {
this.table = null;
BiFunction<DataSource, Class, List> loader = null;
try {
loader = type.getAnnotation(VirtualEntity.class).loader().newInstance();
loader = type.getAnnotation(VirtualEntity.class).loader().getDeclaredConstructor().newInstance();
} catch (Exception e) {
logger.log(Level.SEVERE, type + " init @VirtualEntity.loader error", e);
}
@@ -204,7 +205,7 @@ public final class EntityInfo<T> {
DistributeTable dt = type.getAnnotation(DistributeTable.class);
DistributeTableStrategy dts = null;
try {
dts = (dt == null) ? null : dt.strategy().newInstance();
dts = (dt == null) ? null : dt.strategy().getDeclaredConstructor().newInstance();
} catch (Exception e) {
logger.log(Level.SEVERE, type + " init DistributeTableStrategy error", e);
}
@@ -760,6 +761,18 @@ public final class EntityInfo<T> {
} else if (t == char.class) {
o = (char) 0;
}
} else if (t == AtomicInteger.class) {
if (o != null) {
o = new AtomicInteger(((Number) o).intValue());
} else {
o = new AtomicInteger();
}
} else if (t == AtomicLong.class) {
if (o != null) {
o = new AtomicLong(((Number) o).longValue());
} else {
o = new AtomicLong();
}
}
}
return o;

View File

@@ -47,7 +47,7 @@ public class PoolJdbcSource {
private final String stype; // "" 或 "read" 或 "write"
private final int max;
private final int maxconns;
private String url;
@@ -65,8 +65,8 @@ public class PoolJdbcSource {
this.url = prop.getProperty(JDBC_URL);
this.user = prop.getProperty(JDBC_USER);
this.password = prop.getProperty(JDBC_PWD);
this.max = Integer.decode(prop.getProperty(JDBC_CONNECTIONSMAX, "" + Runtime.getRuntime().availableProcessors() * 16));
this.queue = new ArrayBlockingQueue<>(this.max);
this.maxconns = Integer.decode(prop.getProperty(JDBC_CONNECTIONSMAX, "" + Runtime.getRuntime().availableProcessors() * 16));
this.queue = new ArrayBlockingQueue<>(this.maxconns);
this.listener = new ConnectionEventListener() {
@Override
@@ -154,7 +154,7 @@ public class PoolJdbcSource {
}
}
final Class clazz = Thread.currentThread().getContextClassLoader().loadClass(source);
Object pdsource = clazz.newInstance();
Object pdsource = clazz.getDeclaredConstructor().newInstance();
if (source.contains(".postgresql.")) {
Class driver = Thread.currentThread().getContextClassLoader().loadClass("org.postgresql.Driver");
Properties properties = (Properties) driver.getMethod("parseURL", String.class, Properties.class).invoke(null, url, new Properties());
@@ -287,7 +287,7 @@ public class PoolJdbcSource {
}
PooledConnection result = queue.poll();
if (result == null) {
if (usingCounter.get() >= max) {
if (usingCounter.get() >= maxconns) {
try {
result = queue.poll(6, TimeUnit.SECONDS);
} catch (Exception t) {

View File

@@ -1,97 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.util;
import java.util.function.BiConsumer;
import java.nio.channels.CompletionHandler;
import java.util.function.*;
/**
* 异步回调函数
*
*
* <p>
* 详情见: https://redkale.org
*
* @deprecated 使用 java.nio.channels.CompletionHandler 代替
* @author zhangjx
* @param <V> 结果对象的泛型
* @param <A> 附件对象的泛型
*/
@Deprecated
public interface AsyncHandler<V, A> extends CompletionHandler<V, A> {
/**
* 创建 AsyncHandler 对象
*
* @param <V> 结果对象的泛型
* @param <A> 附件对象的泛型
* @param success 成功的回调函数
* @param fail 失败的回调函数
*
* @return AsyncHandler
*/
public static <V, A> AsyncHandler<V, A> create(final BiConsumer<V, A> success, final BiConsumer<Throwable, A> fail) {
return new AsyncHandler<V, A>() {
@Override
public void completed(V result, A attachment) {
if (success != null) success.accept(result, attachment);
}
@Override
public void failed(Throwable exc, A attachment) {
if (fail != null) fail.accept(exc, attachment);
}
};
}
/**
* 创建没有返回结果的 AsyncHandler 对象
*
* @param <A> 附件对象的泛型
* @param success 成功的回调函数
* @param fail 失败的回调函数
*
* @return AsyncHandler
*/
public static <A> AsyncHandler<Void, A> create(final Consumer<A> success, final BiConsumer<Throwable, A> fail) {
return new AsyncHandler<Void, A>() {
@Override
public void completed(Void result, A attachment) {
if (success != null) success.accept(attachment);
}
@Override
public void failed(Throwable exc, A attachment) {
if (fail != null) fail.accept(exc, attachment);
}
};
}
/**
* 创建没有附件对象的 AsyncNoResultHandler 对象
*
* @param <V> 结果对象的泛型
* @param success 成功的回调函数
* @param fail 失败的回调函数
*
* @return AsyncHandler
*/
public static <V> AsyncHandler<V, Void> create(final Consumer<V> success, final Consumer<Throwable> fail) {
return new AsyncHandler<V, Void>() {
@Override
public void completed(V result, Void attachment) {
if (success != null) success.accept(result);
}
@Override
public void failed(Throwable exc, Void attachment) {
if (fail != null) fail.accept(exc);
}
};
}
}

View File

@@ -441,7 +441,7 @@ public interface Attribute<T, F> {
+ fieldname.substring(fieldname.indexOf('.') + 1) + "_" + pcolumn.getSimpleName().replace("[]", "Array");
}
try {
return (Attribute) loader.loadClass(newDynName.replace('/', '.')).newInstance();
return (Attribute) loader.loadClass(newDynName.replace('/', '.')).getDeclaredConstructor().newInstance();
} catch (Throwable ex) {
}
//---------------------------------------------------
@@ -593,7 +593,7 @@ public interface Attribute<T, F> {
}
}.loadClass(newDynName.replace('/', '.'), bytes);
try {
return creatorClazz.newInstance();
return creatorClazz.getDeclaredConstructor().newInstance();
} catch (Exception ex) {
throw new RuntimeException(ex);
}

View File

@@ -246,7 +246,7 @@ public interface Creator<T> {
newDynName = interName + "_Dyn" + Creator.class.getSimpleName();
}
try {
return (Creator) loader.loadClass(newDynName.replace('/', '.')).newInstance();
return (Creator) loader.loadClass(newDynName.replace('/', '.')).getDeclaredConstructor().newInstance();
} catch (Throwable ex) {
}
@@ -494,7 +494,7 @@ public interface Creator<T> {
return defineClass(name, b, 0, b.length);
}
}.loadClass(newDynName.replace('/', '.'), bytes);
return (Creator) resultClazz.newInstance();
return (Creator) resultClazz.getDeclaredConstructor().newInstance();
} catch (Exception ex) {
throw new RuntimeException(ex);
}

View File

@@ -98,11 +98,6 @@ public final class ObjectPool<T> implements Supplier<T>, Consumer<T> {
}
}
@Deprecated
public void offer(final T e) {
accept(e);
}
public long getCreatCount() {
return creatCounter.longValue();
}

View File

@@ -17,7 +17,7 @@ public final class Redkale {
}
public static String getDotedVersion() {
return "1.9.0";
return "1.9.2";
}
public static int getMajorVersion() {

View File

@@ -5,7 +5,9 @@
*/
package org.redkale.util;
import java.lang.reflect.*;
import java.net.*;
import java.nio.file.Paths;
import java.util.HashSet;
/**
@@ -35,6 +37,15 @@ public class RedkaleClassLoader extends URLClassLoader {
public URL[] getAllURLs() {
ClassLoader loader = this;
HashSet<URL> set = new HashSet<>();
String appPath = System.getProperty("java.class.path");
if (appPath != null && !appPath.isEmpty()) {
for (String path : appPath.replace(":/", "&&").replace(":\\", "##").replace(':', ';').split(";")) {
try {
set.add(Paths.get(path.replace("&&", ":/").replace("##", ":\\")).toRealPath().toFile().toURI().toURL());
} catch (Exception e) {
}
}
}
do {
String loaderName = loader.getClass().getName();
if (loaderName.startsWith("sun.") && loaderName.contains("ExtClassLoader")) continue;
@@ -42,6 +53,33 @@ public class RedkaleClassLoader extends URLClassLoader {
for (URL url : ((URLClassLoader) loader).getURLs()) {
set.add(url);
}
} else { //可能JDK9及以上
loader.getResource("org.redkale"); //必须要运行一次确保URLClassPath的值被填充完毕
Class loaderClazz = loader.getClass();
Object ucp = null;
do { //读取 java.base/jdk.internal.loader.BuiltinClassLoader的URLClassPath ucp值
try {
//需要在命令行里加入: --add-opens java.base/jdk.internal.loader=ALL-UNNAMED
Field field = loaderClazz.getDeclaredField("ucp");
field.setAccessible(true);
ucp = field.get(loader);
break;
} catch (Throwable e) {
}
} while ((loaderClazz = loaderClazz.getSuperclass()) != Object.class);
if (ucp != null) { //URLClassPath
URL[] urls = null;
try { //读取 java.base/jdk.internal.loader.URLClassPath的urls值
Method method = ucp.getClass().getMethod("getURLs");
urls = (URL[]) method.invoke(ucp);
} catch (Exception e) {
}
if (urls != null) {
for (URL url : urls) {
set.add(url);
}
}
}
}
} while ((loader = loader.getParent()) != null);
return set.toArray(new URL[set.size()]);

View File

@@ -1,6 +1,7 @@
package org.redkale.util;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.function.*;
import static org.redkale.asm.Opcodes.*;
import org.redkale.asm.*;
@@ -8,7 +9,6 @@ import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES;
/**
* JavaBean类对象的拷贝相同的字段名会被拷贝 <br>
* <b>注意</b>: 拷贝类与被拷贝类的字段可见模式必须一致, 要么都是public field, 要么都是getter、setter模式。 <br>
*
* <p>
* 详情见: https://redkale.org
@@ -23,25 +23,44 @@ public interface Reproduce<D, S> extends BiFunction<D, S, D> {
public D apply(D dest, S src);
public static <D, S> Reproduce<D, S> create(final Class<D> destClass, final Class<S> srcClass) {
return create(destClass, srcClass, null);
return create(destClass, srcClass, (BiPredicate) null, (Map<String, String>) null);
}
public static <D, S> Reproduce<D, S> create(final Class<D> destClass, final Class<S> srcClass, final Map<String, String> names) {
return create(destClass, srcClass, (BiPredicate) null, names);
}
@SuppressWarnings("unchecked")
public static <D, S> Reproduce<D, S> create(final Class<D> destClass, final Class<S> srcClass, final Predicate<String> columnPredicate) {
public static <D, S> Reproduce<D, S> create(final Class<D> destClass, final Class<S> srcClass, final Predicate<String> srcColumnPredicate) {
return create(destClass, srcClass, (sc, m) -> srcColumnPredicate.test(m), (Map<String, String>) null);
}
@SuppressWarnings("unchecked")
public static <D, S> Reproduce<D, S> create(final Class<D> destClass, final Class<S> srcClass, final Predicate<String> srcColumnPredicate, final Map<String, String> names) {
return create(destClass, srcClass, (sc, m) -> srcColumnPredicate.test(m), names);
}
@SuppressWarnings("unchecked")
public static <D, S> Reproduce<D, S> create(final Class<D> destClass, final Class<S> srcClass, final BiPredicate<Class<S>, String> srcColumnPredicate) {
return create(destClass, srcClass, srcColumnPredicate, (Map<String, String>) null);
}
@SuppressWarnings("unchecked")
public static <D, S> Reproduce<D, S> create(final Class<D> destClass, final Class<S> srcClass, final BiPredicate<Class<S>, String> srcColumnPredicate, final Map<String, String> names) {
// ------------------------------------------------------------------------------
final String supDynName = Reproduce.class.getName().replace('.', '/');
final String destName = destClass.getName().replace('.', '/');
final String srcName = srcClass.getName().replace('.', '/');
final String destClassName = destClass.getName().replace('.', '/');
final String srcClassName = srcClass.getName().replace('.', '/');
final String destDesc = Type.getDescriptor(destClass);
final String srcDesc = Type.getDescriptor(srcClass);
String newDynName = supDynName + "Dyn_" + destClass.getSimpleName() + "_" + srcClass.getSimpleName();
ClassLoader loader = Thread.currentThread().getContextClassLoader();
if (String.class.getClassLoader() != destClass.getClassLoader()) {
loader = destClass.getClassLoader();
newDynName = destName + "_Dyn" + Reproduce.class.getSimpleName() + "_" + srcClass.getSimpleName();
newDynName = destClassName + "_Dyn" + Reproduce.class.getSimpleName() + "_" + srcClass.getSimpleName();
}
try {
return (Reproduce) loader.loadClass(newDynName.replace('/', '.')).newInstance();
return (Reproduce) loader.loadClass(newDynName.replace('/', '.')).getDeclaredConstructor().newInstance();
} catch (Throwable ex) {
}
// ------------------------------------------------------------------------------
@@ -69,45 +88,72 @@ public interface Reproduce<D, S> extends BiFunction<D, S, D> {
if (Modifier.isStatic(field.getModifiers())) continue;
if (Modifier.isFinal(field.getModifiers())) continue;
if (!Modifier.isPublic(field.getModifiers())) continue;
final String fname = field.getName();
final String sfname = field.getName();
if (srcColumnPredicate != null && !srcColumnPredicate.test(srcClass, sfname)) continue;
final String dfname = names == null ? sfname : names.getOrDefault(sfname, sfname);
java.lang.reflect.Method setter = null;
try {
if (!field.getType().equals(destClass.getField(fname).getType())) continue;
if (!columnPredicate.test(fname)) continue;
if (!field.getType().equals(destClass.getField(dfname).getType())) continue;
} catch (Exception e) {
continue;
try {
char[] cs = dfname.toCharArray();
cs[0] = Character.toUpperCase(cs[0]);
String dfname2 = new String(cs);
setter = destClass.getMethod("set" + dfname2, field.getType());
} catch (Exception e2) {
continue;
}
}
mv.visitVarInsn(ALOAD, 1);
mv.visitVarInsn(ALOAD, 2);
String td = Type.getDescriptor(field.getType());
mv.visitFieldInsn(GETFIELD, srcName, fname, td);
mv.visitFieldInsn(PUTFIELD, destName, fname, td);
mv.visitFieldInsn(GETFIELD, srcClassName, sfname, td);
if (setter == null) {
mv.visitFieldInsn(PUTFIELD, destClassName, dfname, td);
} else {
mv.visitMethodInsn(INVOKEVIRTUAL, destClassName, setter.getName(), Type.getMethodDescriptor(setter), false);
}
}
for (java.lang.reflect.Method getter : srcClass.getMethods()) {
if (Modifier.isStatic(getter.getModifiers())) continue;
if (getter.getParameterTypes().length > 0) continue; //为了兼容android 而不使用 getParameterCount()
if (getter.getParameterTypes().length > 0) continue;
if ("getClass".equals(getter.getName())) continue;
if (!getter.getName().startsWith("get") && !getter.getName().startsWith("is")) continue;
java.lang.reflect.Method setter;
boolean is = getter.getName().startsWith("is");
final boolean is = getter.getName().startsWith("is");
String sfname = getter.getName().substring(is ? 2 : 3);
if (sfname.length() < 2 || Character.isLowerCase(sfname.charAt(1))) {
char[] cs = sfname.toCharArray();
cs[0] = Character.toLowerCase(cs[0]);
sfname = new String(cs);
}
if (srcColumnPredicate != null && !srcColumnPredicate.test(srcClass, sfname)) continue;
final String dfname = names == null ? sfname : names.getOrDefault(sfname, sfname);
java.lang.reflect.Method setter = null;
java.lang.reflect.Field srcField = null;
char[] cs = dfname.toCharArray();
cs[0] = Character.toUpperCase(cs[0]);
String dfname2 = new String(cs);
try {
setter = destClass.getMethod(getter.getName().replaceFirst(is ? "is" : "get", "set"), getter.getReturnType());
if (columnPredicate != null) {
String col = setter.getName().substring(3);
if (col.length() < 2 || Character.isLowerCase(col.charAt(1))) {
char[] cs = col.toCharArray();
cs[0] = Character.toLowerCase(cs[0]);
col = new String(cs);
}
if (!columnPredicate.test(col)) continue;
}
setter = destClass.getMethod("set" + dfname2, getter.getReturnType());
} catch (Exception e) {
continue;
try {
srcField = destClass.getField(dfname);
if (!getter.getReturnType().equals(srcField.getType())) continue;
} catch (Exception e2) {
continue;
}
}
mv.visitVarInsn(ALOAD, 1);
mv.visitVarInsn(ALOAD, 2);
mv.visitMethodInsn(INVOKEVIRTUAL, srcName, getter.getName(), Type.getMethodDescriptor(getter), false);
mv.visitMethodInsn(INVOKEVIRTUAL, destName, setter.getName(), Type.getMethodDescriptor(setter), false);
mv.visitMethodInsn(INVOKEVIRTUAL, srcClassName, getter.getName(), Type.getMethodDescriptor(getter), false);
if (srcField == null) {
mv.visitMethodInsn(INVOKEVIRTUAL, destClassName, setter.getName(), Type.getMethodDescriptor(setter), false);
} else {
mv.visitFieldInsn(PUTFIELD, destClassName, dfname, Type.getDescriptor(getter.getReturnType()));
}
}
mv.visitVarInsn(ALOAD, 1);
mv.visitInsn(ARETURN);
@@ -119,9 +165,9 @@ public interface Reproduce<D, S> extends BiFunction<D, S, D> {
//mv.setDebug(true);
mv.visitVarInsn(ALOAD, 0);
mv.visitVarInsn(ALOAD, 1);
mv.visitTypeInsn(CHECKCAST, destName);
mv.visitTypeInsn(CHECKCAST, destClassName);
mv.visitVarInsn(ALOAD, 2);
mv.visitTypeInsn(CHECKCAST, srcName);
mv.visitTypeInsn(CHECKCAST, srcClassName);
mv.visitMethodInsn(INVOKEVIRTUAL, newDynName, "apply", "(" + destDesc + srcDesc + ")" + destDesc, false);
mv.visitInsn(ARETURN);
mv.visitMaxs(3, 3);
@@ -136,7 +182,7 @@ public interface Reproduce<D, S> extends BiFunction<D, S, D> {
}
}.loadClass(newDynName.replace('/', '.'), bytes);
try {
return (Reproduce) creatorClazz.newInstance();
return (Reproduce) creatorClazz.getDeclaredConstructor().newInstance();
} catch (Exception ex) {
throw new RuntimeException(ex);
}

View File

@@ -567,7 +567,11 @@ public final class ResourceFactory {
}
}
if (ns == null) continue;
if (ns.getClass().isPrimitive() || ns.getClass().isArray() || ns.getClass().getName().startsWith("java")) continue;
if (ns.getClass().isPrimitive() || ns.getClass().isArray()
|| ns.getClass().getName().startsWith("java.")
|| ns.getClass().getName().startsWith("javax.")
|| ns.getClass().getName().startsWith("jdk.")
|| ns.getClass().getName().startsWith("sun.")) continue;
if (flag) this.inject(ns, attachment, consumer, list);
continue;
}

View File

@@ -35,35 +35,83 @@ public final class Utility {
private static final char hex[] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
private static final sun.misc.Unsafe UNSAFE;
/**
* <blockquote><pre>
* public final class AnonymousArrayFunction implements java.util.function.Function&lt;Object, char[]&gt; {
*
* final sun.misc.Unsafe unsafe;
*
* final long fd;
*
* public AnonymousArrayFunction(Object obj, long fd) {
* this.unsafe = (sun.misc.Unsafe) obj;
* this.fd = fd;
* }
*
* &#64;Override
* public char[] apply(Object t) {
* return (char[]) unsafe.getObject(t, fd);
* }
*
* }
* </pre></blockquote>
*/
private static final String functionClassBinary = "cafebabe00000034002f0a00090022070023090008002409000800250a000200260700"
+ "270a0008002807002907002a07002b010006756e736166650100114c73756e2f6d6973632f556e736166653b01000266640100014a0100063c69"
+ "6e69743e010016284c6a6176612f6c616e672f4f626a6563743b4a2956010004436f646501000f4c696e654e756d6265725461626c650100124c"
+ "6f63616c5661726961626c655461626c65010004746869730100294c6f72672f7265646b616c652f7574696c2f416e6f6e796d6f757341727261"
+ "7946756e6374696f6e3b0100036f626a0100124c6a6176612f6c616e672f4f626a6563743b0100056170706c79010016284c6a6176612f6c616e"
+ "672f4f626a6563743b295b43010001740100236f72672e6e65746265616e732e536f757263654c6576656c416e6e6f746174696f6e730100144c"
+ "6a6176612f6c616e672f4f766572726964653b010026284c6a6176612f6c616e672f4f626a6563743b294c6a6176612f6c616e672f4f626a6563"
+ "743b0100095369676e61747572650100454c6a6176612f6c616e672f4f626a6563743b4c6a6176612f7574696c2f66756e6374696f6e2f46756e"
+ "6374696f6e3c4c6a6176612f6c616e672f4f626a6563743b5b433e3b01000a536f7572636546696c6501001b416e6f6e796d6f75734172726179"
+ "46756e6374696f6e2e6a6176610c000f002c01000f73756e2f6d6973632f556e736166650c000b000c0c000d000e0c002d002e0100025b430c00"
+ "1800190100276f72672f7265646b616c652f7574696c2f416e6f6e796d6f7573417272617946756e6374696f6e0100106a6176612f6c616e672f"
+ "4f626a65637401001b6a6176612f7574696c2f66756e6374696f6e2f46756e6374696f6e0100032829560100096765744f626a65637401002728"
+ "4c6a6176612f6c616e672f4f626a6563743b4a294c6a6176612f6c616e672f4f626a6563743b0031000800090001000a00020010000b000c0000"
+ "0010000d000e000000030001000f0010000100110000005c00030004000000122ab700012a2bc00002b500032a20b50004b10000000200120000"
+ "001200040000000e0004000f000c0010001100110013000000200003000000120014001500000000001200160017000100000012000d000e0002"
+ "000100180019000200110000004400040002000000102ab400032b2ab40004b60005c00006b00000000200120000000600010000001500130000"
+ "001600020000001000140015000000000010001a00170001001b000000060001001c000010410018001d00020011000000300002000200000006"
+ "2a2bb60007b00000000200120000000600010000000800130000000c000100000006001400150000001b000000060001001c00000002001e0000"
+ "0002001f0020000000020021";
private static final long strvaloffset;
private static final Function<Object, char[]> strFunction;
private static final long sbvaloffset;
private static final Function<Object, char[]> sbFunction;
private static final javax.net.ssl.SSLContext DEFAULTSSL_CONTEXT;
private static final javax.net.ssl.HostnameVerifier defaultVerifier = (s, ss) -> true;
static {
sun.misc.Unsafe usafe = null;
long fd1 = 0L;
long fd2 = 0L;
Function<Object, char[]> strFunction0 = null;
Function<Object, char[]> sbFunction0 = null;
try {
Field f = String.class.getDeclaredField("value");
if (f.getType() == char[].class) { //JDK9及以上不再是char[]
Field safeField = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
Class unsafeClass = Class.forName("sun.misc.Unsafe");
Field safeField = unsafeClass.getDeclaredField("theUnsafe");
safeField.setAccessible(true);
usafe = (sun.misc.Unsafe) safeField.get(null);
fd1 = usafe.objectFieldOffset(f);
fd2 = usafe.objectFieldOffset(StringBuilder.class.getSuperclass().getDeclaredField("value"));
final Object usafe = safeField.get(null);
final Method fm = usafe.getClass().getMethod("objectFieldOffset", Field.class);
final long fd1 = (Long) fm.invoke(usafe, f);
final long fd2 = (Long) fm.invoke(usafe, StringBuilder.class.getSuperclass().getDeclaredField("value"));
byte[] bytes = hexToBin(functionClassBinary);
Class<Attribute> creatorClazz = (Class<Attribute>) new ClassLoader() {
public final Class<?> loadClass(String name, byte[] b) {
return defineClass(name, b, 0, b.length);
}
}.loadClass("org.re" + "dkale.util.AnonymousArrayFunction", bytes);
strFunction0 = (Function<Object, char[]>) creatorClazz.getDeclaredConstructor(Object.class, long.class).newInstance(usafe, fd1);
sbFunction0 = (Function<Object, char[]>) creatorClazz.getDeclaredConstructor(Object.class, long.class).newInstance(usafe, fd2);
}
} catch (Exception e) {
throw new RuntimeException(e); //不可能会发生
} catch (Throwable e) { //不会发生
//e.printStackTrace();
}
UNSAFE = usafe;
strvaloffset = fd1;
sbvaloffset = fd2;
strFunction = strFunction0;
sbFunction = sbFunction0;
try {
DEFAULTSSL_CONTEXT = javax.net.ssl.SSLContext.getInstance("SSL");
@@ -82,7 +130,7 @@ public final class Utility {
}
}}, null);
} catch (Exception e) {
throw new RuntimeException(e); //不可能会发生
throw new RuntimeException(e); //不会发生
}
}
@@ -518,6 +566,56 @@ public final class Utility {
return false;
}
/**
* 判断指定值是否包含指定的数组中包含返回true
*
* @param values 集合
* @param value 单值
*
* @return boolean
*/
public static boolean contains(int[] values, int value) {
if (values == null) return false;
for (int v : values) {
if (v == value) return true;
}
return false;
}
/**
* 判断指定值是否包含指定的数组中包含返回true
*
* @param values 集合
* @param value 单值
*
* @return boolean
*/
public static boolean contains(long[] values, long value) {
if (values == null) return false;
for (long v : values) {
if (v == value) return true;
}
return false;
}
/**
* 判断指定值是否包含指定的数组中包含返回true
*
* @param <T> 泛型
* @param values 集合
* @param value 单值
*
* @return boolean
*/
public static <T> boolean contains(T[] values, T value) {
if (values == null) return false;
for (T v : values) {
if (v == null && value == null) return true;
if (v != null && v.equals(value)) return true;
}
return false;
}
/**
* 删除掉字符串数组中包含指定的字符串
*
@@ -1048,8 +1146,8 @@ public final class Utility {
public static byte[] encodeUTF8(final String value) {
if (value == null) return new byte[0];
if (UNSAFE == null) return encodeUTF8(value.toCharArray());
return encodeUTF8((char[]) UNSAFE.getObject(value, strvaloffset));
if (strFunction == null) return encodeUTF8(value.toCharArray());
return encodeUTF8((char[]) strFunction.apply(value));
}
public static byte[] encodeUTF8(final char[] array) {
@@ -1091,14 +1189,14 @@ public final class Utility {
public static char[] charArray(String value) {
if (value == null) return null;
if (UNSAFE == null) return value.toCharArray();
return (char[]) UNSAFE.getObject(value, strvaloffset);
if (strFunction == null) return value.toCharArray();
return strFunction.apply(value);
}
public static char[] charArray(StringBuilder value) {
if (value == null) return null;
if (UNSAFE == null) return value.toString().toCharArray();
return (char[]) UNSAFE.getObject(value, sbvaloffset);
if (sbFunction == null) return value.toString().toCharArray();
return sbFunction.apply(value);
}
public static ByteBuffer encodeUTF8(final ByteBuffer buffer, final char[] array) {
@@ -1111,8 +1209,8 @@ public final class Utility {
public static int encodeUTF8Length(String value) {
if (value == null) return -1;
if (UNSAFE == null) return encodeUTF8Length(value.toCharArray());
return encodeUTF8Length((char[]) UNSAFE.getObject(value, strvaloffset));
if (strFunction == null) return encodeUTF8Length(value.toCharArray());
return encodeUTF8Length(strFunction.apply(value));
}
public static int encodeUTF8Length(final char[] text) {

View File

@@ -3,7 +3,7 @@
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.test.net;
package org.redkale.test.http;
import org.redkale.net.http.HttpServlet;
import org.redkale.net.http.MultiPart;

View File

@@ -0,0 +1,72 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.test.net;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import org.redkale.net.*;
import org.redkale.net.http.HttpServer;
import org.redkale.net.sncp.Sncp;
import org.redkale.util.AnyValue.DefaultAnyValue;
/**
*
* @author zhangjx
*/
public class TransportTest {
private static final String format = "%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%tL";
public static void main(String[] args) throws Throwable {
System.setProperty("net.transport.checkinterval", "2");
List<InetSocketAddress> addrs = new ArrayList<>();
addrs.add(new InetSocketAddress("127.0.0.1", 22001));
addrs.add(new InetSocketAddress("127.0.0.1", 22002));
addrs.add(new InetSocketAddress("127.0.0.1", 22003));
addrs.add(new InetSocketAddress("127.0.0.1", 22004));
for (InetSocketAddress servaddr : addrs) {
//if (servaddr.getPort() % 100 == 4) continue;
HttpServer server = new HttpServer();
DefaultAnyValue servconf = DefaultAnyValue.create("port", servaddr.getPort());
server.init(servconf);
server.start();
}
addrs.add(new InetSocketAddress("127.0.0.1", 22005));
Thread.sleep(1000);
TransportFactory factory = TransportFactory.create(10);
DefaultAnyValue conf = DefaultAnyValue.create(TransportFactory.NAME_PINGINTERVAL, 5);
factory.init(conf, Sncp.PING_BUFFER, Sncp.PONG_BUFFER.remaining());
Transport transport = factory.createTransportTCP("", null, addrs);
System.out.println(String.format(format, System.currentTimeMillis()));
try {
CountDownLatch cdl = new CountDownLatch(20);
for (int i = 0; i < 20; i++) {
transport.pollConnection(null).whenComplete((r, t) -> {
cdl.countDown();
System.out.println("连接: " + r.getRemoteAddress());
});
}
cdl.await();
HttpServer server = new HttpServer();
DefaultAnyValue servconf = DefaultAnyValue.create("port", 22005);
server.init(servconf);
server.start();
Thread.sleep(4000);
CountDownLatch cdl2 = new CountDownLatch(20);
for (int i = 0; i < 20; i++) {
transport.pollConnection(null).whenComplete((r, t) -> {
cdl2.countDown();
System.out.println("连接: " + r.getRemoteAddress());
});
}
cdl2.await();
} finally {
System.out.println(String.format(format, System.currentTimeMillis()));
}
}
}