34 Commits

Author SHA1 Message Date
Redkale
8b69e7d02b Redkale 2.0.0.rc3 结束 2019-11-25 16:33:48 +08:00
Redkale
02a10bf014 2019-11-23 10:55:29 +08:00
Redkale
004b83172e 2019-11-23 09:28:18 +08:00
Redkale
fee4555cef 2019-11-23 09:04:08 +08:00
Redkale
758bd7de72 2019-11-23 09:01:53 +08:00
Redkale
b2dd366640 2019-11-19 20:47:16 +08:00
Redkale
934c82eadd 优化JsonWrite.writeSmallString方法 2019-11-18 13:42:55 +08:00
Redkale
c7ed6574cc 优化JsonWrite.writeSmallString方法 2019-11-18 13:34:51 +08:00
Redkale
2ea2667fa7 Utility增加byteArray方法 2019-11-18 13:25:51 +08:00
Redkale
34ae2d38c5 2019-11-15 15:54:34 +08:00
Redkale
a1c95544cb 2019-11-15 15:19:16 +08:00
Redkale
c6dc38c35c PoolTcpSource增加ping接口 2019-11-14 12:04:22 +08:00
Redkale
39203ab598 2019-11-13 10:11:35 +08:00
Redkale
51a95a84aa 去掉AsyncConnection内的ByteBufferPool,HttpResponse合并header和body的Buffer 2019-11-13 10:07:48 +08:00
Redkale
8a8d45e642 DataSource增加判断字符串字段值长度的FilterExpress 2019-11-11 11:10:32 +08:00
Redkale
52eb7dbc0c 2019-11-07 09:55:16 +08:00
Redkale
0e14b60f12 2019-11-05 21:21:57 +08:00
Redkale
d373ab7204 2019-11-05 09:31:38 +08:00
Redkale
4f9a563ba7 2019-11-05 09:26:15 +08:00
Redkale
852da19b1e Redkale 2.0.0.rc2 结束 2019-11-05 09:21:49 +08:00
Redkale
ddfc040a53 2019-11-05 09:16:34 +08:00
Redkale
df3ccb763a 2019-11-04 11:53:11 +08:00
Redkale
f42561ca93 convert支持sql包的几个date类型 2019-11-03 11:47:54 +08:00
Redkale
580e28519a 2019-11-02 15:08:38 +08:00
Redkale
9ecc1d8f19 2019-11-02 14:20:07 +08:00
Redkale
40629ed7b9 2019-10-28 13:14:38 +08:00
Redkale
5790135add 2019-10-28 11:59:23 +08:00
Redkale
fd862ed6c6 Convert兼容java.util.Map.Entry 2019-10-28 11:56:45 +08:00
Redkale
33763af96c 兼容TypeToken.typeToClass 方法 2019-10-26 16:03:36 +08:00
Redkale
7c05df3cfb 2019-10-26 15:32:16 +08:00
Redkale
f471e2d4c5 配置支持远程地址 2019-10-26 09:45:33 +08:00
Redkale
d4fd093521 2019-10-25 11:42:50 +08:00
Redkale
40003c7789 2019-10-24 11:45:51 +08:00
Redkale
b94f99f338 2019-10-22 09:11:37 +08:00
41 changed files with 814 additions and 356 deletions

View File

@@ -77,6 +77,7 @@
<version>2.6</version> <version>2.6</version>
<configuration> <configuration>
<archive> <archive>
<addMavenDescriptor>false</addMavenDescriptor>
<manifest> <manifest>
<mainClass>org.redkale.boot.Application</mainClass> <mainClass>org.redkale.boot.Application</mainClass>
</manifest> </manifest>

View File

@@ -116,7 +116,7 @@
excludelibs: 排除lib.path与excludes中的正则表达式匹配的路径, 多个正则表达式用分号;隔开 excludelibs: 排除lib.path与excludes中的正则表达式匹配的路径, 多个正则表达式用分号;隔开
charset: 文本编码, 默认: UTF-8 charset: 文本编码, 默认: UTF-8
backlog: 默认10K backlog: 默认10K
threads 线程数, 默认: CPU核数*32 threads 线程数, 默认: CPU核数*2最小8个
maxconns最大连接数, 小于1表示无限制 默认: 0 maxconns最大连接数, 小于1表示无限制 默认: 0
maxbody: request.body最大值 默认: 64K maxbody: request.body最大值 默认: 64K
bufferCapacity: ByteBuffer的初始化大小 TCP默认: 32K; (HTTP 2.0、WebSocket必须要16k以上); UDP默认: 1350B bufferCapacity: ByteBuffer的初始化大小 TCP默认: 32K; (HTTP 2.0、WebSocket必须要16k以上); UDP默认: 1350B

View File

@@ -30,5 +30,7 @@ module org.redkale {
exports org.redkale.util; exports org.redkale.util;
exports org.redkale.watch; exports org.redkale.watch;
uses org.redkale.source.SourceLoader;
uses org.redkale.util.ResourceInjectLoader;
} }
*/ */

View File

@@ -196,7 +196,7 @@ public final class ApiDocsService {
final FileOutputStream out = new FileOutputStream(new File(app.getHome(), "apidoc.json")); final FileOutputStream out = new FileOutputStream(new File(app.getHome(), "apidoc.json"));
out.write(json.getBytes("UTF-8")); out.write(json.getBytes("UTF-8"));
out.close(); out.close();
File doctemplate = new File(app.getConfPath(), "apidoc-template.html"); File doctemplate = new File(app.getConfPath().toString(), "apidoc-template.html");
InputStream in = null; InputStream in = null;
if (doctemplate.isFile() && doctemplate.canRead()) { if (doctemplate.isFile() && doctemplate.canRead()) {
in = new FileInputStream(doctemplate); in = new FileInputStream(doctemplate);

View File

@@ -57,12 +57,12 @@ public final class Application {
public static final String RESNAME_APP_TIME = "APP_TIME"; public static final String RESNAME_APP_TIME = "APP_TIME";
/** /**
* 当前进程的根目录, 类型String、File、Path * 当前进程的根目录, 类型String、File、Path、URI
*/ */
public static final String RESNAME_APP_HOME = "APP_HOME"; public static final String RESNAME_APP_HOME = "APP_HOME";
/** /**
* 当前进程的配置目录如果不是绝对路径则视为HOME目录下的相对路径 类型String、File、Path * 当前进程的配置目录如果不是绝对路径则视为HOME目录下的相对路径 类型String、File、Path、URI
*/ */
public static final String RESNAME_APP_CONF = "APP_CONF"; public static final String RESNAME_APP_CONF = "APP_CONF";
@@ -143,7 +143,7 @@ public final class Application {
private final File home; private final File home;
//配置文件目录 //配置文件目录
private final File confPath; private final URI confPath;
//日志 //日志
private final Logger logger; private final Logger logger;
@@ -176,16 +176,19 @@ public final class Application {
this.resourceFactory.register(RESNAME_APP_TIME, long.class, this.startTime); this.resourceFactory.register(RESNAME_APP_TIME, long.class, this.startTime);
this.resourceFactory.register(RESNAME_APP_HOME, Path.class, root.toPath()); this.resourceFactory.register(RESNAME_APP_HOME, Path.class, root.toPath());
this.resourceFactory.register(RESNAME_APP_HOME, File.class, root); this.resourceFactory.register(RESNAME_APP_HOME, File.class, root);
this.resourceFactory.register(RESNAME_APP_HOME, URI.class, root.toURI());
try { try {
this.resourceFactory.register(RESNAME_APP_HOME, root.getCanonicalPath()); this.resourceFactory.register(RESNAME_APP_HOME, root.getCanonicalPath());
this.home = root.getCanonicalFile(); this.home = root.getCanonicalFile();
String confsubpath = System.getProperty(RESNAME_APP_CONF, "conf"); String confsubpath = System.getProperty(RESNAME_APP_CONF, "conf");
if (confsubpath.charAt(0) == '/' || confsubpath.indexOf(':') > 0) { if (confsubpath.contains("://")) {
this.confPath = new File(confsubpath).getCanonicalFile(); this.confPath = new URI(confsubpath);
} else if (confsubpath.charAt(0) == '/' || confsubpath.indexOf(':') > 0) {
this.confPath = new File(confsubpath).getCanonicalFile().toURI();
} else { } else {
this.confPath = new File(this.home, confsubpath).getCanonicalFile(); this.confPath = new File(this.home, confsubpath).getCanonicalFile().toURI();
} }
} catch (IOException e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
String localaddr = config.getValue("address", "").trim(); String localaddr = config.getValue("address", "").trim();
@@ -209,11 +212,12 @@ public final class Application {
System.setProperty(RESNAME_APP_NODE, node); System.setProperty(RESNAME_APP_NODE, node);
} }
//以下是初始化日志配置 //以下是初始化日志配置
final File logconf = new File(confPath, "logging.properties"); final URI logConfURI = "file".equals(confPath.getScheme()) ? new File(new File(confPath), "logging.properties").toURI()
if (logconf.isFile() && logconf.canRead()) { : URI.create(confPath.toString() + (confPath.toString().endsWith("/") ? "" : "/") + "logging.properties");
if (!"file".equals(confPath.getScheme()) || (new File(logConfURI).isFile() && new File(logConfURI).canRead())) {
try { try {
final String rootpath = root.getCanonicalPath().replace('\\', '/'); final String rootpath = root.getCanonicalPath().replace('\\', '/');
FileInputStream fin = new FileInputStream(logconf); InputStream fin = logConfURI.toURL().openStream();
Properties properties = new Properties(); Properties properties = new Properties();
properties.load(fin); properties.load(fin);
fin.close(); fin.close();
@@ -301,7 +305,7 @@ public final class Application {
transportExec = Executors.newFixedThreadPool(threads, (Runnable r) -> { transportExec = Executors.newFixedThreadPool(threads, (Runnable r) -> {
Thread t = new Thread(r); Thread t = new Thread(r);
t.setDaemon(true); t.setDaemon(true);
t.setName("Transport-Thread-" + counter.incrementAndGet()); t.setName("Redkale-Transport-Thread-" + counter.incrementAndGet());
return t; return t;
}); });
transportGroup = AsynchronousChannelGroup.withCachedThreadPool(transportExec, 1); transportGroup = AsynchronousChannelGroup.withCachedThreadPool(transportExec, 1);
@@ -316,7 +320,7 @@ public final class Application {
transportExec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 8, (Runnable r) -> { transportExec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 8, (Runnable r) -> {
Thread t = new Thread(r); Thread t = new Thread(r);
t.setDaemon(true); t.setDaemon(true);
t.setName("Transport-Thread-" + counter.incrementAndGet()); t.setName("Redkale-Transport-Thread-" + counter.incrementAndGet());
return t; return t;
}); });
try { try {
@@ -375,7 +379,7 @@ public final class Application {
return home; return home;
} }
public File getConfPath() { public URI getConfPath() {
return confPath; return confPath;
} }
@@ -398,10 +402,14 @@ public final class Application {
System.setProperty("convert.bson.writer.buffer.defsize", "4096"); System.setProperty("convert.bson.writer.buffer.defsize", "4096");
System.setProperty("convert.json.writer.buffer.defsize", "4096"); System.setProperty("convert.json.writer.buffer.defsize", "4096");
File persist = new File(this.confPath, "persistence.xml"); final String confpath = this.confPath.toString();
final String homepath = this.home.getCanonicalPath(); final String homepath = this.home.getCanonicalPath();
final String confpath = this.confPath.getCanonicalPath(); if ("file".equals(this.confPath.getScheme())) {
File persist = new File(new File(confPath), "persistence.xml");
if (persist.isFile()) System.setProperty(DataSources.DATASOURCE_CONFPATH, persist.getCanonicalPath()); if (persist.isFile()) System.setProperty(DataSources.DATASOURCE_CONFPATH, persist.getCanonicalPath());
} else {
System.setProperty(DataSources.DATASOURCE_CONFPATH, confpath + (confpath.endsWith("/") ? "" : "/") + "persistence.xml");
}
String pidstr = ""; String pidstr = "";
try { //JDK 9+ try { //JDK 9+
Class phclass = Class.forName("java.lang.ProcessHandle"); Class phclass = Class.forName("java.lang.ProcessHandle");
@@ -425,13 +433,17 @@ public final class Application {
if (dfloads != null) { if (dfloads != null) {
for (String dfload : dfloads.split(";")) { for (String dfload : dfloads.split(";")) {
if (dfload.trim().isEmpty()) continue; if (dfload.trim().isEmpty()) continue;
final File df = (dfload.indexOf('/') < 0) ? new File(confPath, "/" + dfload) : new File(dfload); final URI df = (dfload.indexOf('/') < 0) ? URI.create(confpath + (confpath.endsWith("/") ? "" : "/") + dfload) : new File(dfload).toURI();
if (df.isFile()) { if (!"file".equals(df.getScheme()) || new File(df).isFile()) {
Properties ps = new Properties(); Properties ps = new Properties();
InputStream in = new FileInputStream(df); try {
InputStream in = df.toURL().openStream();
ps.load(in); ps.load(in);
in.close(); in.close();
ps.forEach((x, y) -> resourceFactory.register("property." + x, y.toString().replace("${APP_HOME}", homepath))); ps.forEach((x, y) -> resourceFactory.register("property." + x, y.toString().replace("${APP_HOME}", homepath)));
} catch (Exception e) {
logger.log(Level.WARNING, "load properties(" + dfload + ") error", e);
}
} }
} }
} }
@@ -558,9 +570,10 @@ public final class Application {
} }
public void restoreConfig() throws IOException { public void restoreConfig() throws IOException {
if (!"file".equals(this.confPath.getScheme())) return;
synchronized (this) { synchronized (this) {
File confFile = new File(this.confPath, "application.xml"); File confFile = new File(this.confPath.toString(), "application.xml");
confFile.renameTo(new File(this.confPath, "application_" + String.format("%1$tY%1$tm%1$td%1$tH%1$tM%1$tS", System.currentTimeMillis()) + ".xml")); confFile.renameTo(new File(this.confPath.toString(), "application_" + String.format("%1$tY%1$tm%1$td%1$tH%1$tM%1$tS", System.currentTimeMillis()) + ".xml"));
final PrintStream ps = new PrintStream(new FileOutputStream(confFile)); final PrintStream ps = new PrintStream(new FileOutputStream(confFile));
ps.append(config.toXML("application")); ps.append(config.toXML("application"));
ps.close(); ps.close();
@@ -571,7 +584,7 @@ public final class Application {
final Application application = this; final Application application = this;
new Thread() { new Thread() {
{ {
setName("Application-Control-Thread"); setName("Redkale-Application-SelfServer-Thread");
} }
@Override @Override
@@ -742,7 +755,7 @@ public final class Application {
Thread thread = new Thread() { Thread thread = new Thread() {
{ {
String host = serconf.getValue("host", "0.0.0.0").replace("0.0.0.0", "*"); String host = serconf.getValue("host", "0.0.0.0").replace("0.0.0.0", "*");
setName(serconf.getValue("protocol", "Server").toUpperCase() + "-" + host + ":" + serconf.getIntValue("port") + "-Thread"); setName("Redkale-" + serconf.getValue("protocol", "Server").toUpperCase() + "-" + host + ":" + serconf.getIntValue("port") + "-Thread");
this.setDaemon(true); this.setDaemon(true);
} }
@@ -843,18 +856,20 @@ public final class Application {
final String home = new File(System.getProperty(RESNAME_APP_HOME, "")).getCanonicalPath().replace('\\', '/'); final String home = new File(System.getProperty(RESNAME_APP_HOME, "")).getCanonicalPath().replace('\\', '/');
System.setProperty(RESNAME_APP_HOME, home); System.setProperty(RESNAME_APP_HOME, home);
String confsubpath = System.getProperty(RESNAME_APP_CONF, "conf"); String confsubpath = System.getProperty(RESNAME_APP_CONF, "conf");
File appfile; URI appconf;
if (confsubpath.charAt(0) == '/' || confsubpath.indexOf(':') > 0) { if (confsubpath.contains("://")) {
appfile = new File(confsubpath).getCanonicalFile(); appconf = URI.create(confsubpath + (confsubpath.endsWith("/") ? "" : "/") + "application.xml");
} else if (confsubpath.charAt(0) == '/' || confsubpath.indexOf(':') > 0) {
appconf = new File(confsubpath, "application.xml").toURI();
} else { } else {
appfile = new File(new File(home), confsubpath); appconf = new File(new File(home, confsubpath), "application.xml").toURI();
} }
File appconf = new File(appfile, "application.xml"); return new Application(singleton, load(appconf.toURL().openStream()));
return new Application(singleton, load(new FileInputStream(appconf)));
} }
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
Utility.midnight(); //先初始化一下Utility Utility.midnight(); //先初始化一下Utility
Thread.currentThread().setName("Redkale-Application-Main-Thread");
//运行主程序 //运行主程序
final Application application = Application.create(false); final Application application = Application.create(false);
if (System.getProperty("CMD") != null) { if (System.getProperty("CMD") != null) {

View File

@@ -133,7 +133,7 @@ public class LogFileHandler extends Handler {
} }
private void open() { private void open() {
final String name = "Logging-" + getClass().getSimpleName() + "-Thread"; final String name = "Redkale-Logging-" + getClass().getSimpleName() + "-Thread";
new Thread() { new Thread() {
{ {
setName(name); setName(name);

View File

@@ -35,7 +35,7 @@ public class FilterWatchService extends AbstractWatchService {
@Resource @Resource
protected Application application; protected Application application;
@RestMapping(name = "addfilter", auth = false, comment = "动态增加Filter") @RestMapping(name = "addFilter", auth = false, comment = "动态增加Filter")
public RetResult addFilter(@RestUploadFile(maxLength = 10 * 1024 * 1024, fileNameReg = "\\.jar$") byte[] jar, public RetResult addFilter(@RestUploadFile(maxLength = 10 * 1024 * 1024, fileNameReg = "\\.jar$") byte[] jar,
@RestParam(name = "server", comment = "Server节点名") final String serverName, @RestParam(name = "server", comment = "Server节点名") final String serverName,
@RestParam(name = "type", comment = "Filter类名") final String filterType) throws IOException { @RestParam(name = "type", comment = "Filter类名") final String filterType) throws IOException {

View File

@@ -50,7 +50,7 @@ public class ServerWatchService extends AbstractWatchService {
return new RetResult(rs); return new RetResult(rs);
} }
@RestMapping(name = "changeaddress", comment = "更改Server的监听地址和端口") @RestMapping(name = "changeAddress", comment = "更改Server的监听地址和端口")
public RetResult changeAddress(@RestParam(name = "#port:") final int oldport, public RetResult changeAddress(@RestParam(name = "#port:") final int oldport,
@RestParam(name = "#newhost:") final String newhost, @RestParam(name = "#newport:") final int newport) { @RestParam(name = "#newhost:") final String newhost, @RestParam(name = "#newport:") final int newport) {
if (oldport < 1) return new RetResult(RET_WATCH_PARAMS_ILLEGAL, "not found param `oldport`"); if (oldport < 1) return new RetResult(RET_WATCH_PARAMS_ILLEGAL, "not found param `oldport`");

View File

@@ -30,8 +30,8 @@ public class ServiceWatchService extends AbstractWatchService {
protected Application application; protected Application application;
@RestConvert(type = void.class) @RestConvert(type = void.class)
@RestMapping(name = "setfield", auth = false, comment = "设置Service中指定字段的内容") @RestMapping(name = "setField", auth = false, comment = "设置Service中指定字段的内容")
public RetResult setfield(@RestParam(name = "name", comment = "Service的资源名") String name, public RetResult setField(@RestParam(name = "name", comment = "Service的资源名") String name,
@RestParam(name = "type", comment = "Service的类名") String type, @RestParam(name = "type", comment = "Service的类名") String type,
@RestParam(name = "field", comment = "字段名") String field, @RestParam(name = "field", comment = "字段名") String field,
@RestParam(name = "value", comment = "字段值") String value) { @RestParam(name = "value", comment = "字段值") String value) {
@@ -65,8 +65,8 @@ public class ServiceWatchService extends AbstractWatchService {
} }
@RestConvert(type = void.class) @RestConvert(type = void.class)
@RestMapping(name = "getfield", auth = false, comment = "查询Service中指定字段的内容") @RestMapping(name = "getField", auth = false, comment = "查询Service中指定字段的内容")
public RetResult getfield(@RestParam(name = "name", comment = "Service的资源名") String name, public RetResult getField(@RestParam(name = "name", comment = "Service的资源名") String name,
@RestParam(name = "type", comment = "Service的类名") String type, @RestParam(name = "type", comment = "Service的类名") String type,
@RestParam(name = "field", comment = "字段名") String field) { @RestParam(name = "field", comment = "字段名") String field) {
if (name == null) name = ""; if (name == null) name = "";
@@ -98,8 +98,8 @@ public class ServiceWatchService extends AbstractWatchService {
} }
@RestConvert(type = void.class) @RestConvert(type = void.class)
@RestMapping(name = "runmethod", auth = false, comment = "调用Service中指定方法") @RestMapping(name = "runMethod", auth = false, comment = "调用Service中指定方法")
public RetResult runmethod(@RestParam(name = "name", comment = "Service的资源名") String name, public RetResult runMethod(@RestParam(name = "name", comment = "Service的资源名") String name,
@RestParam(name = "type", comment = "Service的类名") String type, @RestParam(name = "type", comment = "Service的类名") String type,
@RestParam(name = "method", comment = "Service的方法名") String method, @RestParam(name = "method", comment = "Service的方法名") String method,
@RestParam(name = "params", comment = "方法的参数值") List<String> params, @RestParam(name = "params", comment = "方法的参数值") List<String> params,
@@ -169,28 +169,28 @@ public class ServiceWatchService extends AbstractWatchService {
return dest; return dest;
} }
@RestMapping(name = "load", auth = false, comment = "动态增加Service") @RestMapping(name = "loadService", auth = false, comment = "动态增加Service")
public RetResult loadService(@RestParam(name = "type", comment = "Service的类名") String type, public RetResult loadService(@RestParam(name = "type", comment = "Service的类名") String type,
@RestUploadFile(maxLength = 10 * 1024 * 1024, fileNameReg = "\\.jar$") byte[] jar) { @RestUploadFile(maxLength = 10 * 1024 * 1024, fileNameReg = "\\.jar$") byte[] jar) {
//待开发 //待开发
return RetResult.success(); return RetResult.success();
} }
@RestMapping(name = "reload", auth = false, comment = "重新加载Service") @RestMapping(name = "reloadService", auth = false, comment = "重新加载Service")
public RetResult reloadService(@RestParam(name = "name", comment = "Service的资源名") String name, public RetResult reloadService(@RestParam(name = "name", comment = "Service的资源名") String name,
@RestParam(name = "type", comment = "Service的类名") String type) { @RestParam(name = "type", comment = "Service的类名") String type) {
//待开发 //待开发
return RetResult.success(); return RetResult.success();
} }
@RestMapping(name = "stop", auth = false, comment = "动态停止Service") @RestMapping(name = "stopService", auth = false, comment = "动态停止Service")
public RetResult stopService(@RestParam(name = "name", comment = "Service的资源名") String name, public RetResult stopService(@RestParam(name = "name", comment = "Service的资源名") String name,
@RestParam(name = "type", comment = "Service的类名") String type) { @RestParam(name = "type", comment = "Service的类名") String type) {
//待开发 //待开发
return RetResult.success(); return RetResult.success();
} }
@RestMapping(name = "find", auth = false, comment = "查找Service") @RestMapping(name = "findService", auth = false, comment = "查找Service")
public RetResult find(@RestParam(name = "name", comment = "Service的资源名") String name, public RetResult find(@RestParam(name = "name", comment = "Service的资源名") String name,
@RestParam(name = "type", comment = "Service的类名") String type) { @RestParam(name = "type", comment = "Service的类名") String type) {
//待开发 //待开发

View File

@@ -25,13 +25,13 @@ public class ServletWatchService extends AbstractWatchService {
@Resource @Resource
protected TransportFactory transportFactory; protected TransportFactory transportFactory;
// //
// @RestMapping(name = "load", auth = false, comment = "动态增加Servlet") // @RestMapping(name = "loadServlet", auth = false, comment = "动态增加Servlet")
// public RetResult loadServlet(String type, @RestUploadFile(maxLength = 10 * 1024 * 1024, fileNameReg = "\\.jar$") byte[] jar) { // public RetResult loadServlet(String type, @RestUploadFile(maxLength = 10 * 1024 * 1024, fileNameReg = "\\.jar$") byte[] jar) {
// //待开发 // //待开发
// return RetResult.success(); // return RetResult.success();
// } // }
// //
// @RestMapping(name = "stop", auth = false, comment = "动态停止Servlet") // @RestMapping(name = "stopServlet", auth = false, comment = "动态停止Servlet")
// public RetResult stopServlet(String type) { // public RetResult stopServlet(String type) {
// //待开发 // //待开发
// return RetResult.success(); // return RetResult.success();

View File

@@ -133,6 +133,54 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
} }
}); });
try {
Class sqldateClass = Class.forName("java.sql.Date");
this.register(sqldateClass, new SimpledCoder<R, W, java.sql.Date>() {
@Override
public void convertTo(W out, java.sql.Date value) {
out.writeSmallString(value == null ? null : value.toString());
}
@Override
public java.sql.Date convertFrom(R in) {
String t = in.readSmallString();
return t == null ? null : java.sql.Date.valueOf(t);
}
});
Class sqltimeClass = Class.forName("java.sql.Time");
this.register(sqltimeClass, new SimpledCoder<R, W, java.sql.Time>() {
@Override
public void convertTo(W out, java.sql.Time value) {
out.writeSmallString(value == null ? null : value.toString());
}
@Override
public java.sql.Time convertFrom(R in) {
String t = in.readSmallString();
return t == null ? null : java.sql.Time.valueOf(t);
}
});
Class timestampClass = Class.forName("java.sql.Timestamp");
this.register(timestampClass, new SimpledCoder<R, W, java.sql.Timestamp>() {
@Override
public void convertTo(W out, java.sql.Timestamp value) {
out.writeSmallString(value == null ? null : value.toString());
}
@Override
public java.sql.Timestamp convertFrom(R in) {
String t = in.readSmallString();
return t == null ? null : java.sql.Timestamp.valueOf(t);
}
});
} catch (Throwable t) {
}
} }
} }
@@ -711,7 +759,8 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
encoder = new OptionalCoder(this, type); encoder = new OptionalCoder(this, type);
} else if (clazz == Object.class) { } else if (clazz == Object.class) {
return (Encodeable<W, E>) this.anyEncoder; return (Encodeable<W, E>) this.anyEncoder;
} else if (!clazz.getName().startsWith("java.") || java.net.HttpCookie.class == clazz || java.util.AbstractMap.SimpleEntry.class == clazz) { } else if (!clazz.getName().startsWith("java.") || java.net.HttpCookie.class == clazz
|| java.util.Map.Entry.class == clazz || java.util.AbstractMap.SimpleEntry.class == clazz) {
Encodeable simpleCoder = null; Encodeable simpleCoder = null;
for (final Method method : clazz.getDeclaredMethods()) { for (final Method method : clazz.getDeclaredMethods()) {
if (!Modifier.isStatic(method.getModifiers())) continue; if (!Modifier.isStatic(method.getModifiers())) continue;

View File

@@ -241,19 +241,39 @@ public class JsonByteBufferWriter extends JsonWriter {
* @param value String值 * @param value String值
*/ */
@Override @Override
public void writeTo(final boolean quote, final String value) { public void writeLatin1To(final boolean quote, final String value) {
char[] chs = Utility.charArray(value); byte[] bs = Utility.byteArray(value);
writeTo(-1, quote, chs, 0, chs.length); int expandsize = expand(bs.length + (quote ? 2 : 0));
if (expandsize == 0) {// 只需要一个buffer
final ByteBuffer buffer = this.buffers[index];
if (quote) buffer.put((byte) '"');
buffer.put(bs);
if (quote) buffer.put((byte) '"');
} else {
ByteBuffer buffer = this.buffers[index];
if (quote) {
if (!buffer.hasRemaining()) buffer = nextByteBuffer();
buffer.put((byte) '"');
}
for (byte b : bs) {
if (!buffer.hasRemaining()) buffer = nextByteBuffer();
buffer.put(b);
}
if (quote) {
if (!buffer.hasRemaining()) buffer = nextByteBuffer();
buffer.put((byte) '"');
}
}
} }
@Override @Override
public void writeInt(int value) { public void writeInt(int value) {
writeTo(false, String.valueOf(value)); writeLatin1To(false, String.valueOf(value));
} }
@Override @Override
public void writeLong(long value) { public void writeLong(long value) {
writeTo(false, String.valueOf(value)); writeLatin1To(false, String.valueOf(value));
} }
@Override @Override

View File

@@ -26,10 +26,7 @@ public final class JsonFactory extends ConvertFactory<JsonReader, JsonWriter> {
private static final JsonFactory instance = new JsonFactory(null, Boolean.getBoolean("convert.json.tiny")); private static final JsonFactory instance = new JsonFactory(null, Boolean.getBoolean("convert.json.tiny"));
static { static {
instance.register(InetAddress.class, InetAddressSimpledCoder.InetAddressJsonSimpledCoder.instance);
instance.register(InetSocketAddress.class, InetAddressSimpledCoder.InetSocketAddressJsonSimpledCoder.instance);
instance.register(DLong.class, DLongSimpledCoder.DLongJsonSimpledCoder.instance);
instance.register(BigInteger.class, BigIntegerSimpledCoder.BigIntegerJsonSimpledCoder.instance);
instance.register(Serializable.class, instance.loadEncoder(Object.class)); instance.register(Serializable.class, instance.loadEncoder(Object.class));
instance.register(AnyValue.class, instance.loadDecoder(AnyValue.DefaultAnyValue.class)); instance.register(AnyValue.class, instance.loadDecoder(AnyValue.DefaultAnyValue.class));
@@ -38,6 +35,12 @@ public final class JsonFactory extends ConvertFactory<JsonReader, JsonWriter> {
private JsonFactory(JsonFactory parent, boolean tiny) { private JsonFactory(JsonFactory parent, boolean tiny) {
super(parent, tiny); super(parent, tiny);
if (parent == null) {
this.register(InetAddress.class, InetAddressSimpledCoder.InetAddressJsonSimpledCoder.instance);
this.register(InetSocketAddress.class, InetAddressSimpledCoder.InetSocketAddressJsonSimpledCoder.instance);
this.register(DLong.class, DLongSimpledCoder.DLongJsonSimpledCoder.instance);
this.register(BigInteger.class, BigIntegerSimpledCoder.BigIntegerJsonSimpledCoder.instance);
}
} }
@Override @Override

View File

@@ -87,19 +87,19 @@ class JsonStreamWriter extends JsonByteBufferWriter {
* @param value String值 * @param value String值
*/ */
@Override @Override
public void writeTo(final boolean quote, final String value) { public void writeLatin1To(final boolean quote, final String value) {
char[] chs = Utility.charArray(value); char[] chs = Utility.charArray(value);
writeTo(quote, chs, 0, chs.length); writeTo(quote, chs, 0, chs.length);
} }
@Override @Override
public void writeInt(int value) { public void writeInt(int value) {
writeTo(false, String.valueOf(value)); writeLatin1To(false, String.valueOf(value));
} }
@Override @Override
public void writeLong(long value) { public void writeLong(long value) {
writeTo(false, String.valueOf(value)); writeLatin1To(false, String.valueOf(value));
} }
@Override @Override

View File

@@ -89,7 +89,7 @@ public class JsonWriter extends Writer {
* @param quote 是否加双引号 * @param quote 是否加双引号
* @param value 非null且不含需要转义的字符的String值 * @param value 非null且不含需要转义的字符的String值
*/ */
public void writeTo(final boolean quote, final String value) { public void writeLatin1To(final boolean quote, final String value) {
int len = value.length(); int len = value.length();
expand(len + (quote ? 2 : 0)); expand(len + (quote ? 2 : 0));
if (quote) content[count++] = '"'; if (quote) content[count++] = '"';
@@ -162,13 +162,13 @@ public class JsonWriter extends Writer {
@Override @Override
public final void writeFieldName(String fieldName, Type fieldType, int fieldPos) { public final void writeFieldName(String fieldName, Type fieldType, int fieldPos) {
if (this.comma) writeTo(','); if (this.comma) writeTo(',');
writeTo(true, fieldName); writeLatin1To(true, fieldName);
writeTo(':'); writeTo(':');
} }
@Override @Override
public final void writeSmallString(String value) { public final void writeSmallString(String value) {
writeTo(true, value); writeLatin1To(true, value);
} }
@Override @Override
@@ -314,17 +314,17 @@ public class JsonWriter extends Writer {
@Override @Override
public final void writeFloat(float value) { public final void writeFloat(float value) {
writeTo(false, String.valueOf(value)); writeLatin1To(false, String.valueOf(value));
} }
@Override @Override
public final void writeDouble(double value) { public final void writeDouble(double value) {
writeTo(false, String.valueOf(value)); writeLatin1To(false, String.valueOf(value));
} }
@Override @Override
public final void writeWrapper(StringWrapper value) { public final void writeWrapper(StringWrapper value) {
writeTo(false, String.valueOf(value)); writeLatin1To(false, String.valueOf(value));
} }
@Override @Override

View File

@@ -14,7 +14,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
import java.util.function.*; import java.util.function.*;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import org.redkale.util.ObjectPool; import org.redkale.util.*;
/** /**
* *
@@ -118,7 +118,6 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
public abstract void read(CompletionHandler<Integer, ByteBuffer> handler); public abstract void read(CompletionHandler<Integer, ByteBuffer> handler);
@Override @Override
public abstract int write(ByteBuffer src) throws IOException; public abstract int write(ByteBuffer src) throws IOException;
@@ -141,22 +140,40 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
this.readBuffer = null; this.readBuffer = null;
return rs; return rs;
} }
// Thread thread = Thread.currentThread();
// if (thread instanceof IOThread) {
// return ((IOThread) thread).getBufferPool().get();
// }
return bufferSupplier.get(); return bufferSupplier.get();
} }
public void offerBuffer(Buffer buffer) { public void offerBuffer(Buffer buffer) {
if (buffer == null) return; if (buffer == null) return;
// Thread thread = Thread.currentThread();
// if (thread instanceof IOThread) {
// ((IOThread) thread).getBufferPool().accept((ByteBuffer) buffer);
// return;
// }
bufferConsumer.accept((ByteBuffer) buffer); bufferConsumer.accept((ByteBuffer) buffer);
} }
public void offerBuffer(Buffer... buffers) { public void offerBuffer(Buffer... buffers) {
if (buffers == null) return; if (buffers == null) return;
Consumer<ByteBuffer> consumer = this.bufferConsumer;
// Thread thread = Thread.currentThread();
// if (thread instanceof IOThread) {
// consumer = ((IOThread) thread).getBufferPool();
// }
for (Buffer buffer : buffers) { for (Buffer buffer : buffers) {
bufferConsumer.accept((ByteBuffer) buffer); consumer.accept((ByteBuffer) buffer);
} }
} }
public ByteBuffer pollWriteBuffer() { public ByteBuffer pollWriteBuffer() {
// Thread thread = Thread.currentThread();
// if (thread instanceof IOThread) {
// return ((IOThread) thread).getBufferPool().get();
// }
return bufferSupplier.get(); return bufferSupplier.get();
} }
@@ -189,7 +206,12 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
} }
} }
if (this.readBuffer != null) { if (this.readBuffer != null) {
bufferConsumer.accept(this.readBuffer); Consumer<ByteBuffer> consumer = this.bufferConsumer;
// Thread thread = Thread.currentThread();
// if (thread instanceof IOThread) {
// consumer = ((IOThread) thread).getBufferPool();
// }
consumer.accept(this.readBuffer);
} }
if (attributes == null) return; if (attributes == null) return;
try { try {

View File

@@ -0,0 +1,61 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import org.redkale.util.*;
/**
* 协议处理的IO线程类
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public class IOThread extends Thread {
protected Thread localThread;
protected final ExecutorService executor;
protected ObjectPool<ByteBuffer> bufferPool;
public IOThread(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, Runnable runner) {
super(runner);
this.executor = executor;
this.bufferPool = bufferPool;
this.setDaemon(true);
}
public void runAsync(Runnable runner) {
executor.execute(runner);
}
public ExecutorService getExecutor() {
return executor;
}
public ObjectPool<ByteBuffer> getBufferPool() {
return bufferPool;
}
@Override
public void run() {
this.localThread = Thread.currentThread();
super.run();
}
public boolean inSameThread() {
return this.localThread == Thread.currentThread();
}
public boolean inSameThread(Thread thread) {
return this.localThread == thread;
}
}

View File

@@ -60,7 +60,8 @@ public class PrepareRunner implements Runnable {
@Override @Override
public void completed(Integer count, ByteBuffer buffer) { public void completed(Integer count, ByteBuffer buffer) {
if (count < 1) { if (count < 1) {
response.request.offerReadBuffer(buffer); buffer.clear();
channel.setReadBuffer(buffer);
channel.dispose();// response.init(channel); 在调用之前异常 channel.dispose();// response.init(channel); 在调用之前异常
response.removeChannel(); response.removeChannel();
response.finish(true); response.finish(true);
@@ -84,7 +85,8 @@ public class PrepareRunner implements Runnable {
@Override @Override
public void failed(Throwable exc, ByteBuffer buffer) { public void failed(Throwable exc, ByteBuffer buffer) {
response.request.offerReadBuffer(buffer); buffer.clear();
channel.setReadBuffer(buffer);
channel.dispose();// response.init(channel); 在调用之前异常 channel.dispose();// response.init(channel); 在调用之前异常
response.removeChannel(); response.removeChannel();
response.finish(true); response.finish(true);
@@ -97,7 +99,7 @@ public class PrepareRunner implements Runnable {
channel.dispose();// response.init(channel); 在调用之前异常 channel.dispose();// response.init(channel); 在调用之前异常
response.removeChannel(); response.removeChannel();
response.finish(true); response.finish(true);
if (te != null && context.logger.isLoggable(Level.FINEST)) { if (context.logger.isLoggable(Level.FINEST)) {
context.logger.log(Level.FINEST, "Servlet read channel erroneous, force to close channel ", te); context.logger.log(Level.FINEST, "Servlet read channel erroneous, force to close channel ", te);
} }
} }
@@ -116,7 +118,8 @@ public class PrepareRunner implements Runnable {
if (buffer.hasRemaining()) { if (buffer.hasRemaining()) {
request.setMoredata(buffer); request.setMoredata(buffer);
} else { } else {
response.request.offerReadBuffer(buffer); buffer.clear();
channel.setReadBuffer(buffer);
} }
preparer.prepare(request, response); preparer.prepare(request, response);
} else { } else {
@@ -137,7 +140,8 @@ public class PrepareRunner implements Runnable {
if (attachment.hasRemaining()) { if (attachment.hasRemaining()) {
request.setMoredata(attachment); request.setMoredata(attachment);
} else { } else {
response.request.offerReadBuffer(attachment); attachment.clear();
channel.setReadBuffer(attachment);
} }
try { try {
preparer.prepare(request, response); preparer.prepare(request, response);
@@ -151,7 +155,8 @@ public class PrepareRunner implements Runnable {
@Override @Override
public void failed(Throwable exc, ByteBuffer attachment) { public void failed(Throwable exc, ByteBuffer attachment) {
preparer.illRequestCounter.incrementAndGet(); preparer.illRequestCounter.incrementAndGet();
response.request.offerReadBuffer(attachment); attachment.clear();
channel.setReadBuffer(attachment);
response.finish(true); response.finish(true);
if (exc != null) request.context.logger.log(Level.FINER, "Servlet read channel erroneous, force to close channel ", exc); if (exc != null) request.context.logger.log(Level.FINER, "Servlet read channel erroneous, force to close channel ", exc);
} }
@@ -175,19 +180,4 @@ public class PrepareRunner implements Runnable {
return response.removeChannel(); return response.removeChannel();
} }
protected ByteBuffer pollReadBuffer(Request request) {
return request.pollReadBuffer();
}
protected ByteBuffer pollReadBuffer(Response response) {
return response.request.pollReadBuffer();
}
protected void offerReadBuffer(Request request, ByteBuffer buffer) {
request.offerReadBuffer(buffer);
}
protected void offerReadBuffer(Response response, ByteBuffer buffer) {
response.request.offerReadBuffer(buffer);
}
} }

View File

@@ -40,8 +40,6 @@ public abstract class Request<C extends Context> {
protected AsyncConnection channel; protected AsyncConnection channel;
protected ByteBuffer readBuffer;
/** /**
* properties 与 attributes 的区别在于调用recycle时 attributes会被清空而properties会保留; * properties 与 attributes 的区别在于调用recycle时 attributes会被清空而properties会保留;
* properties 通常存放需要永久绑定在request里的一些对象 * properties 通常存放需要永久绑定在request里的一些对象
@@ -67,23 +65,6 @@ public abstract class Request<C extends Context> {
return rs; return rs;
} }
protected ByteBuffer pollReadBuffer() {
ByteBuffer buffer = this.readBuffer;
this.readBuffer = null;
if (buffer == null) buffer = bufferPool.get();
return buffer;
}
protected void offerReadBuffer(ByteBuffer buffer) {
if (buffer == null) return;
if (this.readBuffer == null) {
buffer.clear();
this.readBuffer = buffer;
} else {
bufferPool.accept(buffer);
}
}
/** /**
* 返回值Integer.MIN_VALUE: 帧数据; -1数据不合法 0解析完毕 &gt;0: 需再读取的字节数。 * 返回值Integer.MIN_VALUE: 帧数据; -1数据不合法 0解析完毕 &gt;0: 需再读取的字节数。
* *

View File

@@ -27,18 +27,12 @@ public abstract class Response<C extends Context, R extends Request<C>> {
protected final C context; protected final C context;
protected final ObjectPool<ByteBuffer> bufferPool;
protected final ObjectPool<Response> responsePool; protected final ObjectPool<Response> responsePool;
protected final R request; protected final R request;
protected AsyncConnection channel; protected AsyncConnection channel;
protected ByteBuffer writeHeadBuffer;
protected ByteBuffer writeBodyBuffer;
private volatile boolean inited = true; private volatile boolean inited = true;
protected Object output; //输出的结果对象 protected Object output; //输出的结果对象
@@ -49,8 +43,6 @@ public abstract class Response<C extends Context, R extends Request<C>> {
protected Servlet<C, R, ? extends Response<C, R>> servlet; protected Servlet<C, R, ? extends Response<C, R>> servlet;
private Supplier<ByteBuffer> bodyBufferSupplier;
private final CompletionHandler finishHandler = new CompletionHandler<Integer, ByteBuffer>() { private final CompletionHandler finishHandler = new CompletionHandler<Integer, ByteBuffer>() {
@Override @Override
@@ -58,31 +50,21 @@ public abstract class Response<C extends Context, R extends Request<C>> {
if (attachment.hasRemaining()) { if (attachment.hasRemaining()) {
channel.write(attachment, attachment, this); channel.write(attachment, attachment, this);
} else { } else {
offerResponseBuffer(attachment); channel.offerBuffer(attachment);
ByteBuffer data = request.removeMoredata();
final boolean more = data != null && request.keepAlive;
request.more = more;
finish(); finish();
if (more) new PrepareRunner(context, responsePool, request.channel, null, Response.this).run();
} }
} }
@Override @Override
public void failed(Throwable exc, ByteBuffer attachment) { public void failed(Throwable exc, ByteBuffer attachment) {
offerResponseBuffer(attachment); channel.offerBuffer(attachment);
finish(true); finish(true);
} }
private void offerResponseBuffer(ByteBuffer attachment) {
if (writeHeadBuffer == null) {
if (bufferPool.getRecyclerPredicate().test(attachment)) {
writeHeadBuffer = attachment;
}
} else if (writeBodyBuffer == null) {
if (bufferPool.getRecyclerPredicate().test(attachment)) {
writeBodyBuffer = attachment;
}
} else {
bufferPool.accept(attachment);
}
}
}; };
private final CompletionHandler finishHandler2 = new CompletionHandler<Integer, ByteBuffer[]>() { private final CompletionHandler finishHandler2 = new CompletionHandler<Integer, ByteBuffer[]>() {
@@ -99,73 +81,36 @@ public abstract class Response<C extends Context, R extends Request<C>> {
if (index >= 0) { if (index >= 0) {
channel.write(attachments, index, attachments.length - index, attachments, this); channel.write(attachments, index, attachments.length - index, attachments, this);
} else { } else {
offerResponseBuffer(attachments); for (ByteBuffer attachment : attachments) {
channel.offerBuffer(attachment);
}
ByteBuffer data = request.removeMoredata();
final boolean more = data != null && request.keepAlive;
request.more = more;
finish(); finish();
if (more) new PrepareRunner(context, responsePool, request.channel, null, Response.this).run();
} }
} }
@Override @Override
public void failed(Throwable exc, final ByteBuffer[] attachments) { public void failed(Throwable exc, final ByteBuffer[] attachments) {
offerResponseBuffer(attachments); for (ByteBuffer attachment : attachments) {
channel.offerBuffer(attachment);
}
finish(true); finish(true);
} }
private void offerResponseBuffer(ByteBuffer[] attachments) {
int start = 0;
if (writeHeadBuffer == null && attachments.length > start) {
if (bufferPool.getRecyclerPredicate().test(attachments[start])) {
writeHeadBuffer = attachments[start];
start++;
}
}
if (writeBodyBuffer == null && attachments.length > start) {
if (bufferPool.getRecyclerPredicate().test(attachments[start])) {
writeBodyBuffer = attachments[start];
start++;
}
}
for (int i = start; i < attachments.length; i++) {
bufferPool.accept(attachments[i]);
}
}
}; };
protected Response(C context, final R request, ObjectPool<Response> responsePool) { protected Response(C context, final R request, ObjectPool<Response> responsePool) {
this.context = context; this.context = context;
this.request = request; this.request = request;
this.bufferPool = request.bufferPool;
this.responsePool = responsePool; this.responsePool = responsePool;
this.writeHeadBuffer = bufferPool.get();
this.writeBodyBuffer = bufferPool.get();
this.bodyBufferSupplier = () -> {
ByteBuffer buffer = writeBodyBuffer;
if (buffer == null) return bufferPool.get();
writeBodyBuffer = null;
return buffer;
};
}
protected ByteBuffer pollWriteReadBuffer() {
ByteBuffer buffer = this.writeHeadBuffer;
this.writeHeadBuffer = null;
if (buffer == null) buffer = bufferPool.get();
return buffer;
}
protected ByteBuffer pollWriteBodyBuffer() {
ByteBuffer buffer = this.writeBodyBuffer;
this.writeBodyBuffer = null;
if (buffer == null) buffer = bufferPool.get();
return buffer;
}
protected Supplier<ByteBuffer> getBodyBufferSupplier() {
return bodyBufferSupplier;
} }
protected void offerBuffer(ByteBuffer... buffers) { protected void offerBuffer(ByteBuffer... buffers) {
for (ByteBuffer buffer : buffers) { for (ByteBuffer buffer : buffers) {
bufferPool.accept(buffer); channel.offerBuffer(buffer);
} }
} }
@@ -278,7 +223,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
public void finish(final byte[] bs) { public void finish(final byte[] bs) {
if (!this.inited) return; //避免重复关闭 if (!this.inited) return; //避免重复关闭
if (this.context.bufferCapacity == bs.length) { if (this.context.bufferCapacity == bs.length) {
ByteBuffer buffer = this.bufferPool.get(); ByteBuffer buffer = channel.bufferSupplier.get();
buffer.put(bs); buffer.put(bs);
buffer.flip(); buffer.flip();
this.finish(buffer); this.finish(buffer);
@@ -289,33 +234,33 @@ public abstract class Response<C extends Context, R extends Request<C>> {
public void finish(ByteBuffer buffer) { public void finish(ByteBuffer buffer) {
if (!this.inited) return; //避免重复关闭 if (!this.inited) return; //避免重复关闭
ByteBuffer data = this.request.removeMoredata();
final AsyncConnection conn = this.channel; final AsyncConnection conn = this.channel;
final boolean more = data != null && this.request.keepAlive; // ByteBuffer data = this.request.removeMoredata();
this.request.more = more; // final boolean more = data != null && this.request.keepAlive;
// this.request.more = more;
conn.write(buffer, buffer, finishHandler); conn.write(buffer, buffer, finishHandler);
if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run(); // if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run();
} }
public void finish(boolean kill, ByteBuffer buffer) { public void finish(boolean kill, ByteBuffer buffer) {
if (!this.inited) return; //避免重复关闭 if (!this.inited) return; //避免重复关闭
if (kill) refuseAlive(); if (kill) refuseAlive();
ByteBuffer data = this.request.removeMoredata();
final AsyncConnection conn = this.channel; final AsyncConnection conn = this.channel;
final boolean more = data != null && this.request.keepAlive; // ByteBuffer data = this.request.removeMoredata();
this.request.more = more; // final boolean more = data != null && this.request.keepAlive;
// this.request.more = more;
conn.write(buffer, buffer, finishHandler); conn.write(buffer, buffer, finishHandler);
if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run(); // if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run();
} }
public void finish(ByteBuffer... buffers) { public void finish(ByteBuffer... buffers) {
if (!this.inited) return; //避免重复关闭 if (!this.inited) return; //避免重复关闭
final AsyncConnection conn = this.channel; final AsyncConnection conn = this.channel;
ByteBuffer data = this.request.removeMoredata(); // ByteBuffer data = this.request.removeMoredata();
final boolean more = data != null && this.request.keepAlive; // final boolean more = data != null && this.request.keepAlive;
this.request.more = more; // this.request.more = more;
conn.write(buffers, buffers, finishHandler2); conn.write(buffers, buffers, finishHandler2);
if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run(); // if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run();
} }
public void finish(boolean kill, ByteBuffer... buffers) { public void finish(boolean kill, ByteBuffer... buffers) {
@@ -337,14 +282,14 @@ public abstract class Response<C extends Context, R extends Request<C>> {
if (buffer.hasRemaining()) { if (buffer.hasRemaining()) {
channel.write(buffer, attachment, this); channel.write(buffer, attachment, this);
} else { } else {
bufferPool.accept(buffer); channel.offerBuffer(buffer);
if (handler != null) handler.completed(result, attachment); if (handler != null) handler.completed(result, attachment);
} }
} }
@Override @Override
public void failed(Throwable exc, A attachment) { public void failed(Throwable exc, A attachment) {
bufferPool.accept(buffer); channel.offerBuffer(buffer);
if (handler != null) handler.failed(exc, attachment); if (handler != null) handler.failed(exc, attachment);
} }
@@ -362,7 +307,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
index = i; index = i;
break; break;
} }
bufferPool.accept(buffers[i]); channel.offerBuffer(buffers[i]);
} }
if (index == 0) { if (index == 0) {
channel.write(buffers, attachment, this); channel.write(buffers, attachment, this);
@@ -376,7 +321,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
@Override @Override
public void failed(Throwable exc, A attachment) { public void failed(Throwable exc, A attachment) {
for (ByteBuffer buffer : buffers) { for (ByteBuffer buffer : buffers) {
bufferPool.accept(buffer); channel.offerBuffer(buffer);
} }
if (handler != null) handler.failed(exc, attachment); if (handler != null) handler.failed(exc, attachment);
} }

View File

@@ -129,7 +129,7 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
this.maxbody = parseLenth(config.getValue("maxbody"), 64 * 1024); this.maxbody = parseLenth(config.getValue("maxbody"), 64 * 1024);
int bufCapacity = parseLenth(config.getValue("bufferCapacity"), "UDP".equalsIgnoreCase(protocol) ? 1350 : 32 * 1024); int bufCapacity = parseLenth(config.getValue("bufferCapacity"), "UDP".equalsIgnoreCase(protocol) ? 1350 : 32 * 1024);
this.bufferCapacity = "UDP".equalsIgnoreCase(protocol) ? bufCapacity : (bufCapacity < 8 * 1024 ? 8 * 1024 : bufCapacity); this.bufferCapacity = "UDP".equalsIgnoreCase(protocol) ? bufCapacity : (bufCapacity < 8 * 1024 ? 8 * 1024 : bufCapacity);
this.threads = config.getIntValue("threads", Runtime.getRuntime().availableProcessors() * 32); this.threads = config.getIntValue("threads", Math.max(8, Runtime.getRuntime().availableProcessors() * 2));
this.bufferPoolSize = config.getIntValue("bufferPoolSize", this.threads * 4); this.bufferPoolSize = config.getIntValue("bufferPoolSize", this.threads * 4);
this.responsePoolSize = config.getIntValue("responsePoolSize", this.threads * 2); this.responsePoolSize = config.getIntValue("responsePoolSize", this.threads * 2);
this.name = config.getValue("name", "Server-" + protocol + "-" + this.address.getPort()); this.name = config.getValue("name", "Server-" + protocol + "-" + this.address.getPort());
@@ -153,7 +153,7 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
final String n = name; final String n = name;
this.executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(threads, (Runnable r) -> { this.executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(threads, (Runnable r) -> {
Thread t = new WorkThread(executor, r); Thread t = new WorkThread(executor, r);
t.setName(n + "-ServletThread-" + f.format(counter.incrementAndGet())); t.setName("Redkale-" + n + "-ServletThread-" + f.format(counter.incrementAndGet()));
return t; return t;
}); });
} }

View File

@@ -104,7 +104,8 @@ public class TcpAioProtocolServer extends ProtocolServer {
AsyncConnection conn = new TcpAioAsyncConnection(bufferPool, bufferPool, channel, AsyncConnection conn = new TcpAioAsyncConnection(bufferPool, bufferPool, channel,
context.getSSLContext(), null, context.readTimeoutSeconds, context.writeTimeoutSeconds, livingCounter, closedCounter); context.getSSLContext(), null, context.readTimeoutSeconds, context.writeTimeoutSeconds, livingCounter, closedCounter);
context.runAsync(new PrepareRunner(context, responsePool, conn, null, null)); //context.runAsync(new PrepareRunner(context, responsePool, conn, null, null));
new PrepareRunner(context, responsePool, conn, null, null).run();
} catch (Throwable e) { } catch (Throwable e) {
context.logger.log(Level.INFO, channel + " accept error", e); context.logger.log(Level.INFO, channel + " accept error", e);
} }

View File

@@ -119,7 +119,7 @@ public class TransportFactory {
if (this.checkinterval < 2) this.checkinterval = 2; if (this.checkinterval < 2) this.checkinterval = 2;
} }
this.scheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> { this.scheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> {
final Thread t = new Thread(r, this.getClass().getSimpleName() + "-TransportFactoryTask-Thread"); final Thread t = new Thread(r, "Redkale-" + this.getClass().getSimpleName() + "-Schedule-Thread");
t.setDaemon(true); t.setDaemon(true);
return t; return t;
}); });
@@ -162,7 +162,7 @@ public class TransportFactory {
ExecutorService transportExec = Executors.newFixedThreadPool(threads, (Runnable r) -> { ExecutorService transportExec = Executors.newFixedThreadPool(threads, (Runnable r) -> {
Thread t = new Thread(r); Thread t = new Thread(r);
t.setDaemon(true); t.setDaemon(true);
t.setName("Transport-Thread-" + counter.incrementAndGet()); t.setName("Redkale-Transport-Thread-" + counter.incrementAndGet());
return t; return t;
}); });
AsynchronousChannelGroup transportGroup = null; AsynchronousChannelGroup transportGroup = null;

View File

@@ -37,7 +37,7 @@ public class HttpResourceServlet extends HttpServlet {
public WatchThread(File root) throws IOException { public WatchThread(File root) throws IOException {
this.root = root; this.root = root;
this.setName("HttpResourceServlet-Watch-Thread"); this.setName("Redkale-HttpResourceServlet-Watch-Thread");
this.setDaemon(true); this.setDaemon(true);
this.watcher = this.root.toPath().getFileSystem().newWatchService(); this.watcher = this.root.toPath().getFileSystem().newWatchService();
} }

View File

@@ -50,6 +50,10 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
protected static final byte[] connectAliveBytes = "Connection: keep-alive\r\n".getBytes(); protected static final byte[] connectAliveBytes = "Connection: keep-alive\r\n".getBytes();
private static final byte[] fillContentLengthBytes = ("Content-Length: \r\n").getBytes();
private static final ZoneId ZONE_GMT = ZoneId.of("GMT");
private static final Set<OpenOption> options = new HashSet<>(); private static final Set<OpenOption> options = new HashSet<>();
private static final Map<Integer, String> httpCodes = new HashMap<>(); private static final Map<Integer, String> httpCodes = new HashMap<>();
@@ -103,8 +107,6 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
httpCodes.put(505, "HTTP Version Not Supported"); httpCodes.put(505, "HTTP Version Not Supported");
} }
private static final ZoneId ZONE_GMT = ZoneId.of("GMT");
private int status = 200; private int status = 200;
private String contentType = ""; private String contentType = "";
@@ -113,9 +115,15 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
private HttpCookie[] cookies; private HttpCookie[] cookies;
private boolean headsended = false; private int headWritedSize = -1; //0表示跳过header正数表示header的字节长度。
private ByteBuffer headBuffer;
private int headLenPos = -1;
private BiFunction<HttpResponse, ByteBuffer[], ByteBuffer[]> bufferHandler; private BiFunction<HttpResponse, ByteBuffer[], ByteBuffer[]> bufferHandler;
private Supplier<ByteBuffer> bodyBufferSupplier;
//------------------------------------------------ //------------------------------------------------
private final String plainContentType; private final String plainContentType;
@@ -163,6 +171,11 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
this.hasRender = renders != null && !renders.isEmpty(); this.hasRender = renders != null && !renders.isEmpty();
this.onlyoneHttpRender = renders != null && renders.size() == 1 ? renders.get(0) : null; this.onlyoneHttpRender = renders != null && renders.size() == 1 ? renders.get(0) : null;
this.contentType = this.plainContentType; this.contentType = this.plainContentType;
this.bodyBufferSupplier = () -> {
if (headWritedSize >= 0 || bufferHandler != null) return channel.pollWriteBuffer(); //bufferHandler 需要cached的请求不能带上header
if (contentLength < 0) contentLength = -2;
return createHeader();
};
} }
@Override @Override
@@ -185,12 +198,18 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
this.contentLength = -1; this.contentLength = -1;
this.contentType = null; this.contentType = null;
this.cookies = null; this.cookies = null;
this.headsended = false; this.headWritedSize = -1;
this.headBuffer = null;
this.headLenPos = -1;
this.header.clear(); this.header.clear();
this.bufferHandler = null; this.bufferHandler = null;
return super.recycle(); return super.recycle();
} }
protected Supplier<ByteBuffer> getBodyBufferSupplier() {
return bodyBufferSupplier;
}
@Override @Override
protected void init(AsyncConnection channel) { protected void init(AsyncConnection channel) {
super.init(channel); super.init(channel);
@@ -285,15 +304,6 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
return context.loadAsyncHandlerCreator(handlerClass).create(createAsyncHandler()); return context.loadAsyncHandlerCreator(handlerClass).create(createAsyncHandler());
} }
/**
* 获取ByteBuffer生成器
*
* @return ByteBuffer生成器
*/
public Supplier<ByteBuffer> getBufferSupplier() {
return getBodyBufferSupplier();
}
/** /**
* 将对象以JSON格式输出 * 将对象以JSON格式输出
* *
@@ -637,7 +647,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
public void finish(final String contentType, final byte[] bs) { public void finish(final String contentType, final byte[] bs) {
if (isClosed()) return; //避免重复关闭 if (isClosed()) return; //避免重复关闭
final byte[] content = bs == null ? new byte[0] : bs; final byte[] content = bs == null ? new byte[0] : bs;
if (!this.headsended) { if (this.headWritedSize < 0) {
this.contentType = contentType; this.contentType = contentType;
this.contentLength = content.length; this.contentLength = content.length;
ByteBuffer headbuf = createHeader(); ByteBuffer headbuf = createHeader();
@@ -681,7 +691,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
@Override @Override
public void finish(boolean kill, ByteBuffer buffer) { public void finish(boolean kill, ByteBuffer buffer) {
if (isClosed()) return; //避免重复关闭 if (isClosed()) return; //避免重复关闭
if (!this.headsended) { if (this.headWritedSize < 0) {
this.contentLength = buffer == null ? 0 : buffer.remaining(); this.contentLength = buffer == null ? 0 : buffer.remaining();
ByteBuffer headbuf = createHeader(); ByteBuffer headbuf = createHeader();
headbuf.flip(); headbuf.flip();
@@ -719,7 +729,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
if (bufs != null) buffers = bufs; if (bufs != null) buffers = bufs;
} }
if (kill) refuseAlive(); if (kill) refuseAlive();
if (!this.headsended) { if (this.headWritedSize < 0) {
long len = 0; long len = 0;
for (ByteBuffer buf : buffers) { for (ByteBuffer buf : buffers) {
len += buf.remaining(); len += buf.remaining();
@@ -736,6 +746,17 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
super.finish(kill, newbuffers); super.finish(kill, newbuffers);
} }
} else { } else {
if (this.headLenPos > 0 && buffers[0] == headBuffer) {
long contentlen = -this.headWritedSize;
for (ByteBuffer buf : buffers) {
contentlen += buf.remaining();
}
byte[] lenBytes = String.valueOf(contentlen).getBytes();
int start = this.headLenPos - lenBytes.length;
for (int i = 0; i < lenBytes.length; i++) {
headBuffer.put(start + i, lenBytes[i]);
}
}
super.finish(kill, buffers); super.finish(kill, buffers);
} }
} }
@@ -749,7 +770,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
* @param handler 异步回调函数 * @param handler 异步回调函数
*/ */
public <A> void sendBody(ByteBuffer buffer, A attachment, CompletionHandler<Integer, A> handler) { public <A> void sendBody(ByteBuffer buffer, A attachment, CompletionHandler<Integer, A> handler) {
if (!this.headsended) { if (this.headWritedSize < 0) {
if (this.contentLength < 0) this.contentLength = buffer == null ? 0 : buffer.remaining(); if (this.contentLength < 0) this.contentLength = buffer == null ? 0 : buffer.remaining();
ByteBuffer headbuf = createHeader(); ByteBuffer headbuf = createHeader();
headbuf.flip(); headbuf.flip();
@@ -772,7 +793,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
* @param handler 异步回调函数 * @param handler 异步回调函数
*/ */
public <A> void sendBody(ByteBuffer[] buffers, A attachment, CompletionHandler<Integer, A> handler) { public <A> void sendBody(ByteBuffer[] buffers, A attachment, CompletionHandler<Integer, A> handler) {
if (!this.headsended) { if (this.headWritedSize < 0) {
if (this.contentLength < 0) { if (this.contentLength < 0) {
int len = 0; int len = 0;
if (buffers != null && buffers.length > 0) { if (buffers != null && buffers.length > 0) {
@@ -855,8 +876,10 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
} }
this.contentLength = length; this.contentLength = length;
if (filename != null && !filename.isEmpty() && file != null) { if (filename != null && !filename.isEmpty() && file != null) {
if (this.header.getValue("Content-Disposition") == null) {
addHeader("Content-Disposition", "attachment;filename=" + URLEncoder.encode(filename, "UTF-8")); addHeader("Content-Disposition", "attachment;filename=" + URLEncoder.encode(filename, "UTF-8"));
} }
}
this.contentType = MimeType.getByFilename(filename == null || filename.isEmpty() ? file.getName() : filename); this.contentType = MimeType.getByFilename(filename == null || filename.isEmpty() ? file.getName() : filename);
if (this.contentType == null) this.contentType = "application/octet-stream"; if (this.contentType == null) this.contentType = "application/octet-stream";
String range = request.getHeader("Range"); String range = request.getHeader("Range");
@@ -897,14 +920,19 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
//Header大小不能超过一个ByteBuffer的容量 //Header大小不能超过一个ByteBuffer的容量
protected ByteBuffer createHeader() { protected ByteBuffer createHeader() {
this.headsended = true; ByteBuffer buffer = this.channel.pollWriteBuffer();
ByteBuffer buffer = this.pollWriteReadBuffer(); int oldpos = buffer.position();
if (this.status == 200) { if (this.status == 200) {
buffer.put(status200Bytes); buffer.put(status200Bytes);
} else { } else {
buffer.put(("HTTP/1.1 " + this.status + " " + httpCodes.get(this.status) + "\r\n").getBytes()); buffer.put(("HTTP/1.1 " + this.status + " " + httpCodes.get(this.status) + "\r\n").getBytes());
} }
if (this.contentLength >= 0) buffer.put(("Content-Length: " + this.contentLength + "\r\n").getBytes()); if (this.contentLength >= 0) {
buffer.put(("Content-Length: " + this.contentLength + "\r\n").getBytes());
} else if (this.contentLength == -2) {
buffer.put(fillContentLengthBytes);
this.headLenPos = buffer.position() - 2; //去掉\r\n
}
if (!this.request.isWebSocket()) { if (!this.request.isWebSocket()) {
if (this.contentType == this.jsonContentType) { if (this.contentType == this.jsonContentType) {
buffer.put(this.jsonContentTypeBytes); buffer.put(this.jsonContentTypeBytes);
@@ -976,6 +1004,8 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
} }
} }
buffer.put(LINE); buffer.put(LINE);
this.headWritedSize = buffer.position() - oldpos;
this.headBuffer = buffer;
return buffer; return buffer;
} }
@@ -1001,7 +1031,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
* @return HttpResponse * @return HttpResponse
*/ */
public HttpResponse skipHeader() { public HttpResponse skipHeader() {
this.headsended = true; this.headWritedSize = 0;
return this; return this;
} }
@@ -1208,7 +1238,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
@Override @Override
public void failed(Throwable exc, ByteBuffer attachment) { public void failed(Throwable exc, ByteBuffer attachment) {
bufferPool.accept(attachment); channel.offerBuffer(attachment);
finish(true); finish(true);
try { try {
filechannel.close(); filechannel.close();

View File

@@ -59,7 +59,7 @@ public final class SncpResponse extends Response<SncpContext, SncpRequest> {
public void finish(final int retcode, final BsonWriter out) { public void finish(final int retcode, final BsonWriter out) {
if (out == null) { if (out == null) {
final ByteBuffer buffer = pollWriteReadBuffer(); final ByteBuffer buffer = channel.pollWriteBuffer();
fillHeader(buffer, 0, retcode); fillHeader(buffer, 0, retcode);
finish(buffer); finish(buffer);
return; return;

View File

@@ -70,18 +70,17 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
} }
c = c1; c = c1;
} catch (SQLException se) { } catch (SQLException se) {
if (info.tableStrategy == null || !info.isTableNotExist(se)) throw se; if (info.getTableStrategy() == null || !info.isTableNotExist(se)) throw se;
synchronized (info.tables) { synchronized (info.disTableLock()) {
final String oldTable = info.table;
final String catalog = conn.getCatalog(); final String catalog = conn.getCatalog();
final String newTable = info.getTable(entitys[0]); final String newTable = info.getTable(entitys[0]);
final String tablekey = newTable.indexOf('.') > 0 ? newTable : (catalog + '.' + newTable); final String tablekey = newTable.indexOf('.') > 0 ? newTable : (catalog + '.' + newTable);
if (!info.tables.contains(tablekey)) { if (!info.containsDisTable(tablekey)) {
try { try {
Statement st = conn.createStatement(); Statement st = conn.createStatement();
st.execute(info.tablecopySQL.replace("${newtable}", newTable).replace("${oldtable}", oldTable)); st.execute(info.getTableCopySQL(newTable));
st.close(); st.close();
info.tables.add(tablekey); info.addDisTable(tablekey);
} catch (SQLException sqle) { //多进程并发时可能会出现重复建表 } catch (SQLException sqle) { //多进程并发时可能会出现重复建表
if (newTable.indexOf('.') > 0 && info.isTableNotExist(se)) { if (newTable.indexOf('.') > 0 && info.isTableNotExist(se)) {
Statement st; Statement st;
@@ -94,14 +93,14 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
} }
try { try {
st = conn.createStatement(); st = conn.createStatement();
st.execute(info.tablecopySQL.replace("${newtable}", newTable).replace("${oldtable}", oldTable)); st.execute(info.getTableCopySQL(newTable));
st.close(); st.close();
info.tables.add(tablekey); info.addDisTable(tablekey);
} catch (SQLException sqle2) { } catch (SQLException sqle2) {
logger.log(Level.SEVERE, "create table2(" + info.tablecopySQL.replace("${newtable}", newTable).replace("${oldtable}", oldTable) + ") error", sqle2); logger.log(Level.SEVERE, "create table2(" + info.getTableCopySQL(newTable) + ") error", sqle2);
} }
} else { } else {
logger.log(Level.SEVERE, "create table(" + info.tablecopySQL.replace("${newtable}", newTable).replace("${oldtable}", oldTable) + ") error", sqle); logger.log(Level.SEVERE, "create table(" + info.getTableCopySQL(newTable) + ") error", sqle);
} }
} }
} }
@@ -143,7 +142,7 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
} catch (SQLException e) { } catch (SQLException e) {
CompletableFuture future = new CompletableFuture(); CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e); future.completeExceptionally(e);
return future; return future;//return CompletableFuture.failedFuture(e);
} finally { } finally {
if (conn != null) writePool.offerConnection(conn); if (conn != null) writePool.offerConnection(conn);
} }
@@ -201,7 +200,7 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
} catch (SQLException e) { } catch (SQLException e) {
CompletableFuture future = new CompletableFuture(); CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e); future.completeExceptionally(e);
return future; return future;//return CompletableFuture.failedFuture(e);
} finally { } finally {
if (conn != null) writePool.offerConnection(conn); if (conn != null) writePool.offerConnection(conn);
} }
@@ -222,7 +221,7 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
if (info.isTableNotExist(e)) return CompletableFuture.completedFuture(-1); if (info.isTableNotExist(e)) return CompletableFuture.completedFuture(-1);
CompletableFuture future = new CompletableFuture(); CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e); future.completeExceptionally(e);
return future; return future;//return CompletableFuture.failedFuture(e);
} finally { } finally {
if (conn != null) writePool.offerConnection(conn); if (conn != null) writePool.offerConnection(conn);
} }
@@ -243,7 +242,7 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
if (info.isTableNotExist(e)) return CompletableFuture.completedFuture(-1); if (info.isTableNotExist(e)) return CompletableFuture.completedFuture(-1);
CompletableFuture future = new CompletableFuture(); CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e); future.completeExceptionally(e);
return future; return future;//return CompletableFuture.failedFuture(e);
} finally { } finally {
if (conn != null) writePool.offerConnection(conn); if (conn != null) writePool.offerConnection(conn);
} }
@@ -296,7 +295,7 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
} catch (SQLException e) { } catch (SQLException e) {
CompletableFuture future = new CompletableFuture(); CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e); future.completeExceptionally(e);
return future; return future;//return CompletableFuture.failedFuture(e);
} finally { } finally {
if (conn != null) writePool.offerConnection(conn); if (conn != null) writePool.offerConnection(conn);
} }
@@ -330,7 +329,7 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
} catch (SQLException e) { } catch (SQLException e) {
CompletableFuture future = new CompletableFuture(); CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e); future.completeExceptionally(e);
return future; return future;//return CompletableFuture.failedFuture(e);
} finally { } finally {
if (conn != null) writePool.offerConnection(conn); if (conn != null) writePool.offerConnection(conn);
} }
@@ -339,12 +338,12 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
@Override @Override
protected <T, N extends Number> CompletableFuture<Map<String, N>> getNumberMapDB(EntityInfo<T> info, String sql, FilterFuncColumn... columns) { protected <T, N extends Number> CompletableFuture<Map<String, N>> getNumberMapDB(EntityInfo<T> info, String sql, FilterFuncColumn... columns) {
Connection conn = null; Connection conn = null;
final Map map = new HashMap<>();
try { try {
conn = readPool.poll(); conn = readPool.poll();
//conn.setReadOnly(true); //conn.setReadOnly(true);
final Statement stmt = conn.createStatement(); final Statement stmt = conn.createStatement();
ResultSet set = stmt.executeQuery(sql); ResultSet set = stmt.executeQuery(sql);
final Map map = new HashMap<>();
if (set.next()) { if (set.next()) {
int index = 0; int index = 0;
for (FilterFuncColumn ffc : columns) { for (FilterFuncColumn ffc : columns) {
@@ -360,9 +359,10 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
stmt.close(); stmt.close();
return CompletableFuture.completedFuture(map); return CompletableFuture.completedFuture(map);
} catch (SQLException e) { } catch (SQLException e) {
if (info.getTableStrategy() != null && info.isTableNotExist(e)) return CompletableFuture.completedFuture(map);
CompletableFuture future = new CompletableFuture(); CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e); future.completeExceptionally(e);
return future; return future;//return CompletableFuture.failedFuture(e);
} finally { } finally {
if (conn != null) readPool.offerConnection(conn); if (conn != null) readPool.offerConnection(conn);
} }
@@ -385,9 +385,10 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
stmt.close(); stmt.close();
return CompletableFuture.completedFuture(rs); return CompletableFuture.completedFuture(rs);
} catch (SQLException e) { } catch (SQLException e) {
if (info.getTableStrategy() != null && info.isTableNotExist(e)) return CompletableFuture.completedFuture(defVal);
CompletableFuture future = new CompletableFuture(); CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e); future.completeExceptionally(e);
return future; return future;//return CompletableFuture.failedFuture(e);
} finally { } finally {
if (conn != null) readPool.offerConnection(conn); if (conn != null) readPool.offerConnection(conn);
} }
@@ -396,11 +397,11 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
@Override @Override
protected <T, K extends Serializable, N extends Number> CompletableFuture<Map<K, N>> queryColumnMapDB(EntityInfo<T> info, String sql, String keyColumn) { protected <T, K extends Serializable, N extends Number> CompletableFuture<Map<K, N>> queryColumnMapDB(EntityInfo<T> info, String sql, String keyColumn) {
Connection conn = null; Connection conn = null;
Map<K, N> rs = new LinkedHashMap<>();
try { try {
conn = readPool.poll(); conn = readPool.poll();
//conn.setReadOnly(true); //conn.setReadOnly(true);
final Statement stmt = conn.createStatement(); final Statement stmt = conn.createStatement();
Map<K, N> rs = new LinkedHashMap<>();
ResultSet set = stmt.executeQuery(sql); ResultSet set = stmt.executeQuery(sql);
ResultSetMetaData rsd = set.getMetaData(); ResultSetMetaData rsd = set.getMetaData();
boolean smallint = rsd == null ? false : rsd.getColumnType(1) == Types.SMALLINT; boolean smallint = rsd == null ? false : rsd.getColumnType(1) == Types.SMALLINT;
@@ -411,9 +412,10 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
stmt.close(); stmt.close();
return CompletableFuture.completedFuture(rs); return CompletableFuture.completedFuture(rs);
} catch (SQLException e) { } catch (SQLException e) {
if (info.getTableStrategy() != null && info.isTableNotExist(e)) return CompletableFuture.completedFuture(rs);
CompletableFuture future = new CompletableFuture(); CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e); future.completeExceptionally(e);
return future; return future;//return CompletableFuture.failedFuture(e);
} finally { } finally {
if (conn != null) readPool.offerConnection(conn); if (conn != null) readPool.offerConnection(conn);
} }
@@ -433,10 +435,10 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
ps.close(); ps.close();
return CompletableFuture.completedFuture(rs); return CompletableFuture.completedFuture(rs);
} catch (SQLException e) { } catch (SQLException e) {
if (info.tableStrategy != null && info.isTableNotExist(e)) return CompletableFuture.completedFuture(null); if (info.getTableStrategy() != null && info.isTableNotExist(e)) return CompletableFuture.completedFuture(null);
CompletableFuture future = new CompletableFuture(); CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e); future.completeExceptionally(e);
return future; return future;//return CompletableFuture.failedFuture(e);
} finally { } finally {
if (conn != null) readPool.offerConnection(conn); if (conn != null) readPool.offerConnection(conn);
} }
@@ -460,10 +462,10 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
ps.close(); ps.close();
return CompletableFuture.completedFuture(val == null ? defValue : val); return CompletableFuture.completedFuture(val == null ? defValue : val);
} catch (SQLException e) { } catch (SQLException e) {
if (info.tableStrategy != null && info.isTableNotExist(e)) return CompletableFuture.completedFuture(defValue); if (info.getTableStrategy() != null && info.isTableNotExist(e)) return CompletableFuture.completedFuture(defValue);
CompletableFuture future = new CompletableFuture(); CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e); future.completeExceptionally(e);
return future; return future;//return CompletableFuture.failedFuture(e);
} finally { } finally {
if (conn != null) readPool.offerConnection(conn); if (conn != null) readPool.offerConnection(conn);
} }
@@ -483,10 +485,10 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
if (info.isLoggable(logger, Level.FINEST, sql)) logger.finest(info.getType().getSimpleName() + " exists (" + rs + ") sql=" + sql); if (info.isLoggable(logger, Level.FINEST, sql)) logger.finest(info.getType().getSimpleName() + " exists (" + rs + ") sql=" + sql);
return CompletableFuture.completedFuture(rs); return CompletableFuture.completedFuture(rs);
} catch (SQLException e) { } catch (SQLException e) {
if (info.tableStrategy != null && info.isTableNotExist(e)) return CompletableFuture.completedFuture(false); if (info.getTableStrategy() != null && info.isTableNotExist(e)) return CompletableFuture.completedFuture(false);
CompletableFuture future = new CompletableFuture(); CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e); future.completeExceptionally(e);
return future; return future;//return CompletableFuture.failedFuture(e);
} finally { } finally {
if (conn != null) readPool.offerConnection(conn); if (conn != null) readPool.offerConnection(conn);
} }
@@ -557,10 +559,10 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
ps.close(); ps.close();
return CompletableFuture.completedFuture(new Sheet<>(total, list)); return CompletableFuture.completedFuture(new Sheet<>(total, list));
} catch (SQLException e) { } catch (SQLException e) {
if (info.tableStrategy != null && info.isTableNotExist(e)) return CompletableFuture.completedFuture(new Sheet<>()); if (info.getTableStrategy() != null && info.isTableNotExist(e)) return CompletableFuture.completedFuture(new Sheet<>());
CompletableFuture future = new CompletableFuture(); CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e); future.completeExceptionally(e);
return future; return future;//return CompletableFuture.failedFuture(e);
} finally { } finally {
if (conn != null) readPool.offerConnection(conn); if (conn != null) readPool.offerConnection(conn);
} }

View File

@@ -7,7 +7,7 @@ package org.redkale.source;
import java.io.*; import java.io.*;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.net.URL; import java.net.*;
import java.util.*; import java.util.*;
import javax.xml.stream.*; import javax.xml.stream.*;
import org.redkale.util.AnyValue; import org.redkale.util.AnyValue;
@@ -139,7 +139,7 @@ public final class DataSources {
public static DataSource createDataSource(final String unitName) throws IOException { public static DataSource createDataSource(final String unitName) throws IOException {
return createDataSource(unitName, System.getProperty(DATASOURCE_CONFPATH) == null return createDataSource(unitName, System.getProperty(DATASOURCE_CONFPATH) == null
? DataJdbcSource.class.getResource("/META-INF/persistence.xml") ? DataJdbcSource.class.getResource("/META-INF/persistence.xml")
: new File(System.getProperty(DATASOURCE_CONFPATH)).toURI().toURL()); : (System.getProperty(DATASOURCE_CONFPATH, "").contains("://") ? URI.create(System.getProperty(DATASOURCE_CONFPATH)).toURL() : new File(System.getProperty(DATASOURCE_CONFPATH)).toURI().toURL()));
} }
public static DataSource createDataSource(final String unitName, URL persistxml) throws IOException { public static DataSource createDataSource(final String unitName, URL persistxml) throws IOException {

View File

@@ -91,7 +91,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
} else if (s.length() == 2) { } else if (s.length() == 2) {
s = "0" + s; s = "0" + s;
} }
t.setName(cname + "-Thread-" + s); t.setName("Redkale-"+cname + "-Thread-" + s);
t.setUncaughtExceptionHandler(ueh); t.setUncaughtExceptionHandler(ueh);
return t; return t;
}); });
@@ -209,6 +209,8 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
@Override @Override
public void destroy(AnyValue config) { public void destroy(AnyValue config) {
if (this.executor != null) this.executor.shutdownNow(); if (this.executor != null) this.executor.shutdownNow();
if (readPool != null) readPool.close();
if (writePool != null) writePool.close();
} }
@Local @Local
@@ -466,7 +468,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
public <T> int delete(Class<T> clazz, final Flipper flipper, FilterNode node) { public <T> int delete(Class<T> clazz, final Flipper flipper, FilterNode node) {
final EntityInfo<T> info = loadEntityInfo(clazz); final EntityInfo<T> info = loadEntityInfo(clazz);
if (isOnlyCache(info)) return deleteCache(info, -1, flipper, node); if (isOnlyCache(info)) return deleteCache(info, -1, flipper, node);
return DataSqlSource.this.deleteCompose(info, flipper, node).whenComplete((rs, t) -> { return this.deleteCompose(info, flipper, node).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -481,14 +483,14 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (isOnlyCache(info)) { if (isOnlyCache(info)) {
return CompletableFuture.supplyAsync(() -> deleteCache(info, -1, flipper, node), getExecutor()); return CompletableFuture.supplyAsync(() -> deleteCache(info, -1, flipper, node), getExecutor());
} }
if (isAsync()) return DataSqlSource.this.deleteCompose(info, flipper, node).whenComplete((rs, t) -> { if (isAsync()) return this.deleteCompose(info, flipper, node).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
deleteCache(info, rs, flipper, node); deleteCache(info, rs, flipper, node);
} }
}); });
return CompletableFuture.supplyAsync(() -> DataSqlSource.this.deleteCompose(info, flipper, node).join(), getExecutor()).whenComplete((rs, t) -> { return CompletableFuture.supplyAsync(() -> this.deleteCompose(info, flipper, node).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -542,7 +544,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
public <T> int clearTable(Class<T> clazz, FilterNode node) { public <T> int clearTable(Class<T> clazz, FilterNode node) {
final EntityInfo<T> info = loadEntityInfo(clazz); final EntityInfo<T> info = loadEntityInfo(clazz);
if (isOnlyCache(info)) return clearTableCache(info, node); if (isOnlyCache(info)) return clearTableCache(info, node);
return DataSqlSource.this.clearTableCompose(info, node).whenComplete((rs, t) -> { return this.clearTableCompose(info, node).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -562,14 +564,14 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (isOnlyCache(info)) { if (isOnlyCache(info)) {
return CompletableFuture.supplyAsync(() -> clearTableCache(info, node), getExecutor()); return CompletableFuture.supplyAsync(() -> clearTableCache(info, node), getExecutor());
} }
if (isAsync()) return DataSqlSource.this.clearTableCompose(info, node).whenComplete((rs, t) -> { if (isAsync()) return this.clearTableCompose(info, node).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
clearTableCache(info, node); clearTableCache(info, node);
} }
}); });
return CompletableFuture.supplyAsync(() -> DataSqlSource.this.clearTableCompose(info, node).join(), getExecutor()).whenComplete((rs, t) -> { return CompletableFuture.supplyAsync(() -> this.clearTableCompose(info, node).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -594,7 +596,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
public <T> int dropTable(Class<T> clazz, FilterNode node) { public <T> int dropTable(Class<T> clazz, FilterNode node) {
final EntityInfo<T> info = loadEntityInfo(clazz); final EntityInfo<T> info = loadEntityInfo(clazz);
if (isOnlyCache(info)) return dropTableCache(info, node); if (isOnlyCache(info)) return dropTableCache(info, node);
return DataSqlSource.this.dropTableCompose(info, node).whenComplete((rs, t) -> { return this.dropTableCompose(info, node).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -614,14 +616,14 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (isOnlyCache(info)) { if (isOnlyCache(info)) {
return CompletableFuture.supplyAsync(() -> dropTableCache(info, node), getExecutor()); return CompletableFuture.supplyAsync(() -> dropTableCache(info, node), getExecutor());
} }
if (isAsync()) return DataSqlSource.this.dropTableCompose(info, node).whenComplete((rs, t) -> { if (isAsync()) return this.dropTableCompose(info, node).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
dropTableCache(info, node); dropTableCache(info, node);
} }
}); });
return CompletableFuture.supplyAsync(() -> DataSqlSource.this.dropTableCompose(info, node).join(), getExecutor()).whenComplete((rs, t) -> { return CompletableFuture.supplyAsync(() -> this.dropTableCompose(info, node).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -812,7 +814,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
public <T> int updateColumn(Class<T> clazz, String column, Serializable colval, FilterNode node) { public <T> int updateColumn(Class<T> clazz, String column, Serializable colval, FilterNode node) {
final EntityInfo<T> info = loadEntityInfo(clazz); final EntityInfo<T> info = loadEntityInfo(clazz);
if (isOnlyCache(info)) return updateCache(info, -1, column, colval, node); if (isOnlyCache(info)) return updateCache(info, -1, column, colval, node);
return DataSqlSource.this.updateColumnCompose(info, column, colval, node).whenComplete((rs, t) -> { return this.updateColumnCompose(info, column, colval, node).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -827,14 +829,14 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (isOnlyCache(info)) { if (isOnlyCache(info)) {
return CompletableFuture.supplyAsync(() -> updateCache(info, -1, column, colval, node), getExecutor()); return CompletableFuture.supplyAsync(() -> updateCache(info, -1, column, colval, node), getExecutor());
} }
if (isAsync()) return DataSqlSource.this.updateColumnCompose(info, column, colval, node).whenComplete((rs, t) -> { if (isAsync()) return this.updateColumnCompose(info, column, colval, node).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
updateCache(info, rs, column, colval, node); updateCache(info, rs, column, colval, node);
} }
}); });
return CompletableFuture.supplyAsync(() -> DataSqlSource.this.updateColumnCompose(info, column, colval, node).join(), getExecutor()).whenComplete((rs, t) -> { return CompletableFuture.supplyAsync(() -> this.updateColumnCompose(info, column, colval, node).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -886,7 +888,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (values == null || values.length < 1) return -1; if (values == null || values.length < 1) return -1;
final EntityInfo<T> info = loadEntityInfo(clazz); final EntityInfo<T> info = loadEntityInfo(clazz);
if (isOnlyCache(info)) return updateCache(info, -1, pk, values); if (isOnlyCache(info)) return updateCache(info, -1, pk, values);
return DataSqlSource.this.updateColumnCompose(info, pk, values).whenComplete((rs, t) -> { return this.updateColumnCompose(info, pk, values).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -902,14 +904,14 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (isOnlyCache(info)) { if (isOnlyCache(info)) {
return CompletableFuture.supplyAsync(() -> updateCache(info, -1, pk, values), getExecutor()); return CompletableFuture.supplyAsync(() -> updateCache(info, -1, pk, values), getExecutor());
} }
if (isAsync()) return DataSqlSource.this.updateColumnCompose(info, pk, values).whenComplete((rs, t) -> { if (isAsync()) return this.updateColumnCompose(info, pk, values).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
updateCache(info, rs, pk, values); updateCache(info, rs, pk, values);
} }
}); });
return CompletableFuture.supplyAsync(() -> DataSqlSource.this.updateColumnCompose(info, pk, values).join(), getExecutor()).whenComplete((rs, t) -> { return CompletableFuture.supplyAsync(() -> this.updateColumnCompose(info, pk, values).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -967,7 +969,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (values == null || values.length < 1) return -1; if (values == null || values.length < 1) return -1;
final EntityInfo<T> info = loadEntityInfo(clazz); final EntityInfo<T> info = loadEntityInfo(clazz);
if (isOnlyCache(info)) return updateCache(info, -1, node, flipper, values); if (isOnlyCache(info)) return updateCache(info, -1, node, flipper, values);
return DataSqlSource.this.updateColumnCompose(info, node, flipper, values).whenComplete((rs, t) -> { return this.updateColumnCompose(info, node, flipper, values).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -983,14 +985,14 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (isOnlyCache(info)) { if (isOnlyCache(info)) {
return CompletableFuture.supplyAsync(() -> updateCache(info, -1, node, flipper, values), getExecutor()); return CompletableFuture.supplyAsync(() -> updateCache(info, -1, node, flipper, values), getExecutor());
} }
if (isAsync()) return DataSqlSource.this.updateColumnCompose(info, node, flipper, values).whenComplete((rs, t) -> { if (isAsync()) return this.updateColumnCompose(info, node, flipper, values).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
updateCache(info, rs, node, flipper, values); updateCache(info, rs, node, flipper, values);
} }
}); });
return CompletableFuture.supplyAsync(() -> DataSqlSource.this.updateColumnCompose(info, node, flipper, values).join(), getExecutor()).whenComplete((rs, t) -> { return CompletableFuture.supplyAsync(() -> this.updateColumnCompose(info, node, flipper, values).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -1063,7 +1065,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
Class<T> clazz = (Class) entity.getClass(); Class<T> clazz = (Class) entity.getClass();
final EntityInfo<T> info = loadEntityInfo(clazz); final EntityInfo<T> info = loadEntityInfo(clazz);
if (isOnlyCache(info)) return updateCache(info, -1, false, entity, null, selects); if (isOnlyCache(info)) return updateCache(info, -1, false, entity, null, selects);
return DataSqlSource.this.updateColumnCompose(info, false, entity, null, selects).whenComplete((rs, t) -> { return this.updateColumnCompose(info, false, entity, null, selects).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -1080,14 +1082,14 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (isOnlyCache(info)) { if (isOnlyCache(info)) {
return CompletableFuture.supplyAsync(() -> updateCache(info, -1, false, entity, null, selects), getExecutor()); return CompletableFuture.supplyAsync(() -> updateCache(info, -1, false, entity, null, selects), getExecutor());
} }
if (isAsync()) return DataSqlSource.this.updateColumnCompose(info, false, entity, null, selects).whenComplete((rs, t) -> { if (isAsync()) return this.updateColumnCompose(info, false, entity, null, selects).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
updateCache(info, rs, false, entity, null, selects); updateCache(info, rs, false, entity, null, selects);
} }
}); });
return CompletableFuture.supplyAsync(() -> DataSqlSource.this.updateColumnCompose(info, false, entity, null, selects).join(), getExecutor()).whenComplete((rs, t) -> { return CompletableFuture.supplyAsync(() -> this.updateColumnCompose(info, false, entity, null, selects).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -1102,7 +1104,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
Class<T> clazz = (Class) entity.getClass(); Class<T> clazz = (Class) entity.getClass();
final EntityInfo<T> info = loadEntityInfo(clazz); final EntityInfo<T> info = loadEntityInfo(clazz);
if (isOnlyCache(info)) return updateCache(info, -1, true, entity, node, selects); if (isOnlyCache(info)) return updateCache(info, -1, true, entity, node, selects);
return DataSqlSource.this.updateColumnCompose(info, true, entity, node, selects).whenComplete((rs, t) -> { return this.updateColumnCompose(info, true, entity, node, selects).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -1119,14 +1121,14 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (isOnlyCache(info)) { if (isOnlyCache(info)) {
return CompletableFuture.supplyAsync(() -> updateCache(info, -1, true, entity, node, selects), getExecutor()); return CompletableFuture.supplyAsync(() -> updateCache(info, -1, true, entity, node, selects), getExecutor());
} }
if (isAsync()) return DataSqlSource.this.updateColumnCompose(info, true, entity, node, selects).whenComplete((rs, t) -> { if (isAsync()) return this.updateColumnCompose(info, true, entity, node, selects).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
updateCache(info, rs, true, entity, node, selects); updateCache(info, rs, true, entity, node, selects);
} }
}); });
return CompletableFuture.supplyAsync(() -> DataSqlSource.this.updateColumnCompose(info, true, entity, node, selects).join(), getExecutor()).whenComplete((rs, t) -> { return CompletableFuture.supplyAsync(() -> this.updateColumnCompose(info, true, entity, node, selects).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -1601,7 +1603,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
final EntityInfo<T> info = loadEntityInfo(clazz); final EntityInfo<T> info = loadEntityInfo(clazz);
final EntityCache<T> cache = info.getCache(); final EntityCache<T> cache = info.getCache();
if (cache != null && cache.isFullLoaded() && (node == null || node.isCacheUseable(this))) return cache.find(selects, node); if (cache != null && cache.isFullLoaded() && (node == null || node.isCacheUseable(this))) return cache.find(selects, node);
return DataSqlSource.this.findCompose(info, selects, node).join(); return this.findCompose(info, selects, node).join();
} }
@Override @Override
@@ -1611,8 +1613,8 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (cache != null && cache.isFullLoaded() && (node == null || node.isCacheUseable(this))) { if (cache != null && cache.isFullLoaded() && (node == null || node.isCacheUseable(this))) {
return CompletableFuture.completedFuture(cache.find(selects, node)); return CompletableFuture.completedFuture(cache.find(selects, node));
} }
if (isAsync()) return DataSqlSource.this.findCompose(info, selects, node); if (isAsync()) return this.findCompose(info, selects, node);
return CompletableFuture.supplyAsync(() -> DataSqlSource.this.findCompose(info, selects, node).join(), getExecutor()); return CompletableFuture.supplyAsync(() -> this.findCompose(info, selects, node).join(), getExecutor());
} }
protected <T> CompletableFuture<T> findCompose(final EntityInfo<T> info, final SelectColumn selects, final FilterNode node) { protected <T> CompletableFuture<T> findCompose(final EntityInfo<T> info, final SelectColumn selects, final FilterNode node) {
@@ -1701,7 +1703,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
Serializable val = cache.findColumn(column, defValue, node); Serializable val = cache.findColumn(column, defValue, node);
if (cache.isFullLoaded() || val != null) return val; if (cache.isFullLoaded() || val != null) return val;
} }
return DataSqlSource.this.findColumnCompose(info, column, defValue, node).join(); return this.findColumnCompose(info, column, defValue, node).join();
} }
@Override @Override
@@ -1712,8 +1714,8 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
Serializable val = cache.findColumn(column, defValue, node); Serializable val = cache.findColumn(column, defValue, node);
if (cache.isFullLoaded() || val != null) return CompletableFuture.completedFuture(val); if (cache.isFullLoaded() || val != null) return CompletableFuture.completedFuture(val);
} }
if (isAsync()) return DataSqlSource.this.findColumnCompose(info, column, defValue, node); if (isAsync()) return this.findColumnCompose(info, column, defValue, node);
return CompletableFuture.supplyAsync(() -> DataSqlSource.this.findColumnCompose(info, column, defValue, node).join(), getExecutor()); return CompletableFuture.supplyAsync(() -> this.findColumnCompose(info, column, defValue, node).join(), getExecutor());
} }
protected <T> CompletableFuture<Serializable> findColumnCompose(final EntityInfo<T> info, String column, final Serializable defValue, final FilterNode node) { protected <T> CompletableFuture<Serializable> findColumnCompose(final EntityInfo<T> info, String column, final Serializable defValue, final FilterNode node) {
@@ -1773,7 +1775,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
boolean rs = cache.exists(node); boolean rs = cache.exists(node);
if (rs || cache.isFullLoaded()) return rs; if (rs || cache.isFullLoaded()) return rs;
} }
return DataSqlSource.this.existsCompose(info, node).join(); return this.existsCompose(info, node).join();
} }
@Override @Override
@@ -1784,8 +1786,8 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
boolean rs = cache.exists(node); boolean rs = cache.exists(node);
if (rs || cache.isFullLoaded()) return CompletableFuture.completedFuture(rs); if (rs || cache.isFullLoaded()) return CompletableFuture.completedFuture(rs);
} }
if (isAsync()) return DataSqlSource.this.existsCompose(info, node); if (isAsync()) return this.existsCompose(info, node);
return CompletableFuture.supplyAsync(() -> DataSqlSource.this.existsCompose(info, node).join(), getExecutor()); return CompletableFuture.supplyAsync(() -> this.existsCompose(info, node).join(), getExecutor());
} }
protected <T> CompletableFuture<Boolean> existsCompose(final EntityInfo<T> info, FilterNode node) { protected <T> CompletableFuture<Boolean> existsCompose(final EntityInfo<T> info, FilterNode node) {

View File

@@ -89,16 +89,16 @@ public final class EntityInfo<T> {
final String notcontainSQL; final String notcontainSQL;
//用于判断表不存在的使用, 多个SQLState用;隔开 //用于判断表不存在的使用, 多个SQLState用;隔开
final String tablenotexistSqlstates; private final String tablenotexistSqlstates;
//用于复制表结构使用 //用于复制表结构使用
final String tablecopySQL; private final String tablecopySQL;
//用于存在database.table_20160202类似这种分布式表 //用于存在database.table_20160202类似这种分布式表
final Set<String> tables = new HashSet<>(); private final Set<String> tables = new HashSet<>();
//分表 策略 //分表 策略
final DistributeTableStrategy<T> tableStrategy; private final DistributeTableStrategy<T> tableStrategy;
//根据主键查找单个对象的SQL //根据主键查找单个对象的SQL
private final String queryPrepareSQL; private final String queryPrepareSQL;
@@ -239,7 +239,7 @@ public final class EntityInfo<T> {
} }
//--------------------------------------------- //---------------------------------------------
Table t = type.getAnnotation(Table.class); Table t = type.getAnnotation(Table.class);
if (type.getAnnotation(VirtualEntity.class) != null || "memory".equalsIgnoreCase(source.getType())) { if (type.getAnnotation(VirtualEntity.class) != null || (source == null || "memory".equalsIgnoreCase(source.getType()))) {
this.table = null; this.table = null;
BiFunction<DataSource, Class, List> loader = null; BiFunction<DataSource, Class, List> loader = null;
try { try {
@@ -510,6 +510,18 @@ public final class EntityInfo<T> {
return tableStrategy; return tableStrategy;
} }
public Object disTableLock() {
return tables;
}
public boolean containsDisTable(String tablekey) {
return tables.contains(tablekey);
}
public void addDisTable(String tablekey) {
tables.add(tablekey);
}
public String getTableNotExistSqlStates2() { public String getTableNotExistSqlStates2() {
return tablenotexistSqlstates; return tablenotexistSqlstates;
} }

View File

@@ -31,6 +31,11 @@ public enum FilterExpress {
NOTLIKE("NOT LIKE"), NOTLIKE("NOT LIKE"),
IGNORECASELIKE("LIKE"), //不区分大小写的 LIKE IGNORECASELIKE("LIKE"), //不区分大小写的 LIKE
IGNORECASENOTLIKE("NOT LIKE"), //不区分大小写的 NOT LIKE IGNORECASENOTLIKE("NOT LIKE"), //不区分大小写的 NOT LIKE
LENGTH_EQUAL("="), //字符串值的长度
LENGTH_LESSTHAN("<"), //字符串值的长度 <
LENGTH_LESSTHANOREQUALTO("<="), //字符串值的长度 <=
LENGTH_GREATERTHAN(">"), //字符串值的长度 >
LENGTH_GREATERTHANOREQUALTO(">="), //字符串值的长度 >=
CONTAIN("CONTAIN"), //包含, 相当于反向LIKE CONTAIN("CONTAIN"), //包含, 相当于反向LIKE
NOTCONTAIN("NOT CONTAIN"), //不包含, 相当于反向LIKE NOTCONTAIN("NOT CONTAIN"), //不包含, 相当于反向LIKE

View File

@@ -397,7 +397,10 @@ public class FilterNode { //FilterNode 不能实现Serializable接口 否则
if (express == NOTCONTAIN) return info.notcontainSQL.replace("${column}", info.getSQLColumn(talis, column)).replace("${keystr}", val); if (express == NOTCONTAIN) return info.notcontainSQL.replace("${column}", info.getSQLColumn(talis, column)).replace("${keystr}", val);
if (express == IGNORECASENOTCONTAIN) return info.notcontainSQL.replace("${column}", "LOWER(" + info.getSQLColumn(talis, column) + ")").replace("${keystr}", val); if (express == IGNORECASENOTCONTAIN) return info.notcontainSQL.replace("${column}", "LOWER(" + info.getSQLColumn(talis, column) + ")").replace("${keystr}", val);
if (express == IGNORECASEEQUAL || express == IGNORECASENOTEQUAL || express == IGNORECASELIKE || express == IGNORECASENOTLIKE) { if (express == LENGTH_EQUAL || express == LENGTH_LESSTHAN || express == LENGTH_LESSTHANOREQUALTO
|| express == LENGTH_GREATERTHAN || express == LENGTH_GREATERTHANOREQUALTO) {
sb.append("LENGTH(").append(info.getSQLColumn(talis, column)).append(')');
} else if (express == IGNORECASEEQUAL || express == IGNORECASENOTEQUAL || express == IGNORECASELIKE || express == IGNORECASENOTLIKE) {
sb.append("LOWER(").append(info.getSQLColumn(talis, column)).append(')'); sb.append("LOWER(").append(info.getSQLColumn(talis, column)).append(')');
if (fk) val = "LOWER(" + info.getSQLColumn(talis, ((FilterKey) val0).getColumn()) + ')'; if (fk) val = "LOWER(" + info.getSQLColumn(talis, ((FilterKey) val0).getColumn()) + ')';
} else { } else {
@@ -1406,6 +1409,81 @@ public class FilterNode { //FilterNode 不能实现Serializable接口 否则
return "LOWER(" + field + ") " + express.value() + ' ' + formatToString(valstr2); return "LOWER(" + field + ") " + express.value() + ' ' + formatToString(valstr2);
} }
}; };
case LENGTH_EQUAL:
final int intval = ((Number) val).intValue();
return new Predicate<T>() {
@Override
public boolean test(T t) {
Object rs = attr.get(t);
return (rs == null && 0 == intval) || (rs != null && rs.toString().length() == intval);
}
@Override
public String toString() {
return "LENGTH(" + field + ") " + express.value() + ' ' + intval;
}
};
case LENGTH_LESSTHAN:
final int intval2 = ((Number) val).intValue();
return new Predicate<T>() {
@Override
public boolean test(T t) {
Object rs = attr.get(t);
return (rs == null && 0 < intval2) || (rs != null && rs.toString().length() < intval2);
}
@Override
public String toString() {
return "LENGTH(" + field + ") " + express.value() + ' ' + intval2;
}
};
case LENGTH_LESSTHANOREQUALTO:
final int intval3 = ((Number) val).intValue();
return new Predicate<T>() {
@Override
public boolean test(T t) {
Object rs = attr.get(t);
return (rs == null && 0 <= intval3) || (rs != null && rs.toString().length() <= intval3);
}
@Override
public String toString() {
return "LENGTH(" + field + ") " + express.value() + ' ' + intval3;
}
};
case LENGTH_GREATERTHAN:
final int intval4 = ((Number) val).intValue();
return new Predicate<T>() {
@Override
public boolean test(T t) {
Object rs = attr.get(t);
return (rs == null && 0 > intval4) || (rs != null && rs.toString().length() > intval4);
}
@Override
public String toString() {
return "LENGTH(" + field + ") " + express.value() + ' ' + intval4;
}
};
case LENGTH_GREATERTHANOREQUALTO:
final int intval5 = ((Number) val).intValue();
return new Predicate<T>() {
@Override
public boolean test(T t) {
Object rs = attr.get(t);
return (rs == null && 0 >= intval5) || (rs != null && rs.toString().length() >= intval5);
}
@Override
public String toString() {
return "LENGTH(" + field + ") " + express.value() + ' ' + intval5;
}
};
case CONTAIN: case CONTAIN:
return fk ? new Predicate<T>() { return fk ? new Predicate<T>() {

View File

@@ -363,6 +363,9 @@ public final class FilterNodeBean<T extends FilterBean> implements Comparable<Fi
sb.append(col).append(' ').append(express.value()); sb.append(col).append(' ').append(express.value());
} else if (express == ISEMPTY || express == ISNOTEMPTY) { } else if (express == ISEMPTY || express == ISNOTEMPTY) {
sb.append(col).append(' ').append(express.value()).append(" ''"); sb.append(col).append(' ').append(express.value()).append(" ''");
} else if (express == LENGTH_EQUAL || express == LENGTH_LESSTHAN || express == LENGTH_LESSTHANOREQUALTO
|| express == LENGTH_GREATERTHAN || express == LENGTH_GREATERTHANOREQUALTO) {
sb.append("LENGTH(").append(col).append(") ").append(express.value()).append(" ?");
} else { } else {
boolean lower = (express == IGNORECASEEQUAL || express == IGNORECASENOTEQUAL || express == IGNORECASELIKE boolean lower = (express == IGNORECASEEQUAL || express == IGNORECASENOTEQUAL || express == IGNORECASELIKE
|| express == IGNORECASENOTLIKE || express == IGNORECASECONTAIN || express == IGNORECASENOTCONTAIN); || express == IGNORECASENOTLIKE || express == IGNORECASECONTAIN || express == IGNORECASENOTCONTAIN);

View File

@@ -38,6 +38,8 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
//TCP Channel组 //TCP Channel组
protected AsynchronousChannelGroup group; protected AsynchronousChannelGroup group;
protected ScheduledThreadPoolExecutor scheduler;
protected final ArrayBlockingQueue<AsyncConnection> connQueue; protected final ArrayBlockingQueue<AsyncConnection> connQueue;
public PoolTcpSource(String rwtype, ArrayBlockingQueue queue, Semaphore semaphore, Properties prop, Logger logger, ObjectPool<ByteBuffer> bufferPool, ThreadPoolExecutor executor) { public PoolTcpSource(String rwtype, ArrayBlockingQueue queue, Semaphore semaphore, Properties prop, Logger logger, ObjectPool<ByteBuffer> bufferPool, ThreadPoolExecutor executor) {
@@ -50,6 +52,42 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
this.connQueue = queue == null ? new ArrayBlockingQueue<>(this.maxconns) : queue; this.connQueue = queue == null ? new ArrayBlockingQueue<>(this.maxconns) : queue;
this.scheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> {
final Thread t = new Thread(r, "PoolSource-Scheduled-Thread");
t.setDaemon(true);
return t;
});
this.scheduler.scheduleAtFixedRate(() -> {
runPingTask();
}, 60, 30, TimeUnit.SECONDS);
}
private void runPingTask() {
try {
if (connQueue.isEmpty()) return;
long time = System.currentTimeMillis() - 30 * 1000;
AsyncConnection first = connQueue.peek();
if (first == null || first.getLastReadTime() >= time || first.getLastWriteTime() >= time) return;
pollAsync().whenComplete((conn, e) -> {
if (e != null) return;
if (conn.getLastReadTime() >= time || conn.getLastWriteTime() >= time) {//半分钟内已经用过
offerConnection(conn);
return;
}
CompletableFuture<AsyncConnection> future = sendPingCommand(conn);
if (future == null) { //不支持ping
offerConnection(conn);
return;
}
future.whenComplete((conn2, e2) -> {
if (e2 != null) return;
offerConnection(conn2);
runPingTask();
});
});
} catch (Exception e) {
logger.log(Level.FINEST, "PoolSource task ping failed", e);
}
} }
@Override @Override
@@ -216,6 +254,7 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
@Override @Override
public void close() { public void close() {
this.scheduler.shutdownNow();
connQueue.stream().forEach(x -> { connQueue.stream().forEach(x -> {
CompletableFuture<AsyncConnection> future = null; CompletableFuture<AsyncConnection> future = null;
try { try {
@@ -232,5 +271,7 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
}); });
} }
protected abstract CompletableFuture<AsyncConnection> sendPingCommand(final AsyncConnection conn);
protected abstract CompletableFuture<AsyncConnection> sendCloseCommand(final AsyncConnection conn); protected abstract CompletableFuture<AsyncConnection> sendCloseCommand(final AsyncConnection conn);
} }

View File

@@ -51,6 +51,14 @@ public class ByteBufferReader {
this.bigEndian = this.currBuffer.order() == ByteOrder.BIG_ENDIAN; this.bigEndian = this.currBuffer.order() == ByteOrder.BIG_ENDIAN;
} }
public ByteBufferReader append(ByteBuffer... buffs) {
for (ByteBuffer buf : buffs) {
Objects.requireNonNull(buf);
}
this.buffers = Utility.append(this.buffers, buffs);
return this;
}
public static ByteBufferReader create(ByteBuffer buffer) { public static ByteBufferReader create(ByteBuffer buffer) {
return new ByteBufferReader(buffer); return new ByteBufferReader(buffer);
} }
@@ -80,7 +88,30 @@ public class ByteBufferReader {
} }
public boolean hasRemaining() { public boolean hasRemaining() {
return this.currBuffer.hasRemaining(); boolean v = this.currBuffer.hasRemaining();
if (v) return v;
if (this.currIndex == this.buffers.length - 1) return false;
for (int i = this.currIndex + 1; i < this.buffers.length; i++) {
if (this.buffers[i].hasRemaining()) return true;
}
return false;
}
public int remaining() {
int v = this.currBuffer.remaining();
for (int i = this.currIndex + 1; i < this.buffers.length; i++) {
v += this.buffers[i].remaining();
}
return v;
}
//提前预读一个字节
public byte preget() {
ByteBuffer buf = this.currBuffer;
if (!buf.hasRemaining()) {
buf = this.buffers[this.currIndex + 1];
}
return buf.get(buf.position());
} }
public byte get() { public byte get() {

View File

@@ -85,6 +85,13 @@ public interface Creator<T> {
creatorCacheMap.put(Stream.class, (params) -> new ArrayList<>().stream()); creatorCacheMap.put(Stream.class, (params) -> new ArrayList<>().stream());
creatorCacheMap.put(ConcurrentHashMap.class, (params) -> new ConcurrentHashMap<>()); creatorCacheMap.put(ConcurrentHashMap.class, (params) -> new ConcurrentHashMap<>());
creatorCacheMap.put(CompletableFuture.class, (params) -> new CompletableFuture<>()); creatorCacheMap.put(CompletableFuture.class, (params) -> new CompletableFuture<>());
creatorCacheMap.put(Map.Entry.class, new Creator<Map.Entry>() {
@Override
@ConstructorParameters({"key", "value"})
public Map.Entry create(Object... params) {
return new AbstractMap.SimpleEntry(params[0], params[1]);
}
});
creatorCacheMap.put(AbstractMap.SimpleEntry.class, new Creator<AbstractMap.SimpleEntry>() { creatorCacheMap.put(AbstractMap.SimpleEntry.class, new Creator<AbstractMap.SimpleEntry>() {
@Override @Override
@ConstructorParameters({"key", "value"}) @ConstructorParameters({"key", "value"})
@@ -232,6 +239,8 @@ public interface Creator<T> {
clazz = (Class<T>) ConcurrentHashMap.class; clazz = (Class<T>) ConcurrentHashMap.class;
} else if (Collection.class.isAssignableFrom(clazz) && clazz.isAssignableFrom(ArrayList.class)) { } else if (Collection.class.isAssignableFrom(clazz) && clazz.isAssignableFrom(ArrayList.class)) {
clazz = (Class<T>) ArrayList.class; clazz = (Class<T>) ArrayList.class;
} else if (Map.Entry.class.isAssignableFrom(clazz) && (Modifier.isInterface(clazz.getModifiers()) || Modifier.isAbstract(clazz.getModifiers()) || !Modifier.isPublic(clazz.getModifiers()))) {
clazz = (Class<T>) AbstractMap.SimpleEntry.class;
} }
Creator creator = CreatorInner.creatorCacheMap.get(clazz); Creator creator = CreatorInner.creatorCacheMap.get(clazz);
if (creator != null) return creator; if (creator != null) return creator;

View File

@@ -19,23 +19,25 @@ import java.util.logging.*;
* @author zhangjx * @author zhangjx
* @param <T> 对象池元素的数据类型 * @param <T> 对象池元素的数据类型
*/ */
public final class ObjectPool<T> implements Supplier<T>, Consumer<T> { public class ObjectPool<T> implements Supplier<T>, Consumer<T> {
private static final Logger logger = Logger.getLogger(ObjectPool.class.getSimpleName()); protected static final Logger logger = Logger.getLogger(ObjectPool.class.getSimpleName());
private final boolean debug; protected final boolean debug;
private final Queue<T> queue; protected Creator<T> creator;
private Creator<T> creator; protected int max;
private final Consumer<T> prepare; protected final Consumer<T> prepare;
private final Predicate<T> recycler; protected final Predicate<T> recycler;
private final AtomicLong creatCounter; protected final AtomicLong creatCounter;
private final AtomicLong cycleCounter; protected final AtomicLong cycleCounter;
protected final Queue<T> queue;
public ObjectPool(Class<T> clazz, Consumer<T> prepare, Predicate<T> recycler) { public ObjectPool(Class<T> clazz, Consumer<T> prepare, Predicate<T> recycler) {
this(2, clazz, prepare, recycler); this(2, clazz, prepare, recycler);
@@ -62,12 +64,18 @@ public final class ObjectPool<T> implements Supplier<T>, Consumer<T> {
} }
public ObjectPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler) { public ObjectPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
this(creatCounter, cycleCounter, Math.max(Runtime.getRuntime().availableProcessors() * 2, max),
creator, prepare, recycler, new LinkedBlockingQueue<>(Math.max(Runtime.getRuntime().availableProcessors() * 2, max)));
}
protected ObjectPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler, Queue<T> queue) {
this.creatCounter = creatCounter; this.creatCounter = creatCounter;
this.cycleCounter = cycleCounter; this.cycleCounter = cycleCounter;
this.creator = creator; this.creator = creator;
this.prepare = prepare; this.prepare = prepare;
this.recycler = recycler; this.recycler = recycler;
this.queue = new LinkedBlockingQueue<>(Math.max(Runtime.getRuntime().availableProcessors() * 2, max)); this.queue = queue;
this.max = max;
this.debug = logger.isLoggable(Level.FINEST); this.debug = logger.isLoggable(Level.FINEST);
} }

View File

@@ -17,7 +17,7 @@ public final class Redkale {
} }
public static String getDotedVersion() { public static String getDotedVersion() {
return "2.0.0-rc1"; return "2.0.0-rc3";
} }
public static int getMajorVersion() { public static int getMajorVersion() {

View File

@@ -0,0 +1,70 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.util;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.*;
/**
* 对象池
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
* @param <T> 对象池元素的数据类型
*/
public class ThreadLocalObjectPool<T> extends ObjectPool<T> {
public ThreadLocalObjectPool(Class<T> clazz, Consumer<T> prepare, Predicate<T> recycler) {
this(2, clazz, prepare, recycler);
}
public ThreadLocalObjectPool(int max, Class<T> clazz, Consumer<T> prepare, Predicate<T> recycler) {
this(max, Creator.create(clazz), prepare, recycler);
}
public ThreadLocalObjectPool(Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
this(2, creator, prepare, recycler);
}
public ThreadLocalObjectPool(int max, Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
this(null, null, max, creator, prepare, recycler);
}
public ThreadLocalObjectPool(int max, Supplier<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
this(null, null, max, creator, prepare, recycler);
}
public ThreadLocalObjectPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Supplier<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
this(creatCounter, cycleCounter, max, c -> creator.get(), prepare, recycler);
}
public ThreadLocalObjectPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
super(creatCounter, cycleCounter, max, creator, prepare, recycler, new LinkedList<>());
}
@Override
public T get() {
T result = queue.poll();
if (result == null) {
if (creatCounter != null) creatCounter.incrementAndGet();
result = this.creator.create();
}
if (prepare != null) prepare.accept(result);
return result;
}
@Override
public void accept(final T e) {
if (e != null && recycler.test(e) && this.queue.size() < this.max) {
if (cycleCounter != null) cycleCounter.incrementAndGet();
queue.offer(e);
}
}
}

View File

@@ -83,7 +83,8 @@ public abstract class TypeToken<T> {
if (type instanceof TypeVariable) return null; if (type instanceof TypeVariable) return null;
if (type instanceof GenericArrayType) return Array.newInstance(typeToClass(((GenericArrayType) type).getGenericComponentType()), 0).getClass(); if (type instanceof GenericArrayType) return Array.newInstance(typeToClass(((GenericArrayType) type).getGenericComponentType()), 0).getClass();
if (!(type instanceof ParameterizedType)) return null; //只能是null了 if (!(type instanceof ParameterizedType)) return null; //只能是null了
return typeToClass(((ParameterizedType) type).getOwnerType()); Type owner = ((ParameterizedType) type).getOwnerType();
return typeToClass(owner == null ? ((ParameterizedType) type).getRawType() : owner);
} }
public static Type[] getGenericType(final Type[] types, final Type declaringClass) { public static Type[] getGenericType(final Type[] types, final Type declaringClass) {

View File

@@ -37,13 +37,13 @@ public final class Utility {
/** /**
* <blockquote><pre> * <blockquote><pre>
* public final class AnonymousArrayFunction implements java.util.function.Function&lt;Object, char[]&gt; { * public final class AnonymousCharArrayFunction implements java.util.function.Function&lt;Object, char[]&gt; {
* *
* final sun.misc.Unsafe unsafe; * final sun.misc.Unsafe unsafe;
* *
* final long fd; * final long fd;
* *
* public AnonymousArrayFunction(Object obj, long fd) { * public AnonymousCharArrayFunction(Object obj, long fd) {
* this.unsafe = (sun.misc.Unsafe) obj; * this.unsafe = (sun.misc.Unsafe) obj;
* this.fd = fd; * this.fd = fd;
* } * }
@@ -56,37 +56,80 @@ public final class Utility {
* } * }
* </pre></blockquote> * </pre></blockquote>
*/ */
private static final String functionClassBinary = "cafebabe00000034002f0a00090022070023090008002409000800250a000200260700" private static final String functionCharClassBinary = "cafebabe00000034002d0a00090020070021090008002209000800230a00020024070025"
+ "270a0008002807002907002a07002b010006756e736166650100114c73756e2f6d6973632f556e736166653b01000266640100014a0100063c69" + "0a00080026070027070028070029010006756e736166650100114c73756e2f6d6973632f556e736166653b01000266640100014a0100063c696e69743e0"
+ "6e69743e010016284c6a6176612f6c616e672f4f626a6563743b4a2956010004436f646501000f4c696e654e756d6265725461626c650100124c" + "10016284c6a6176612f6c616e672f4f626a6563743b4a2956010004436f646501000f4c696e654e756d6265725461626c650100124c6f63616c56617269"
+ "6f63616c5661726961626c655461626c65010004746869730100294c6f72672f7265646b616c652f7574696c2f416e6f6e796d6f757341727261" + "61626c655461626c650100047468697301002d4c6f72672f7265646b616c652f7574696c2f416e6f6e796d6f757343686172417272617946756e6374696"
+ "7946756e6374696f6e3b0100036f626a0100124c6a6176612f6c616e672f4f626a6563743b0100056170706c79010016284c6a6176612f6c616e" + "f6e3b0100036f626a0100124c6a6176612f6c616e672f4f626a6563743b0100056170706c79010016284c6a6176612f6c616e672f4f626a6563743b295b"
+ "672f4f626a6563743b295b43010001740100236f72672e6e65746265616e732e536f757263654c6576656c416e6e6f746174696f6e730100144c" + "4301000174010026284c6a6176612f6c616e672f4f626a6563743b294c6a6176612f6c616e672f4f626a6563743b0100095369676e61747572650100454"
+ "6a6176612f6c616e672f4f766572726964653b010026284c6a6176612f6c616e672f4f626a6563743b294c6a6176612f6c616e672f4f626a6563" + "c6a6176612f6c616e672f4f626a6563743b4c6a6176612f7574696c2f66756e6374696f6e2f46756e6374696f6e3c4c6a6176612f6c616e672f4f626a65"
+ "743b0100095369676e61747572650100454c6a6176612f6c616e672f4f626a6563743b4c6a6176612f7574696c2f66756e6374696f6e2f46756e" + "63743b5b433e3b01000a536f7572636546696c6501001f416e6f6e796d6f757343686172417272617946756e6374696f6e2e6a6176610c000f002a01000"
+ "6374696f6e3c4c6a6176612f6c616e672f4f626a6563743b5b433e3b01000a536f7572636546696c6501001b416e6f6e796d6f75734172726179" + "f73756e2f6d6973632f556e736166650c000b000c0c000d000e0c002b002c0100025b430c0018001901002b6f72672f7265646b616c652f7574696c2f41"
+ "46756e6374696f6e2e6a6176610c000f002c01000f73756e2f6d6973632f556e736166650c000b000c0c000d000e0c002d002e0100025b430c00" + "6e6f6e796d6f757343686172417272617946756e6374696f6e0100106a6176612f6c616e672f4f626a65637401001b6a6176612f7574696c2f66756e637"
+ "1800190100276f72672f7265646b616c652f7574696c2f416e6f6e796d6f7573417272617946756e6374696f6e0100106a6176612f6c616e672f" + "4696f6e2f46756e6374696f6e0100032829560100096765744f626a656374010027284c6a6176612f6c616e672f4f626a6563743b4a294c6a6176612f6c"
+ "4f626a65637401001b6a6176612f7574696c2f66756e6374696f6e2f46756e6374696f6e0100032829560100096765744f626a65637401002728" + "616e672f4f626a6563743b0021000800090001000a00020010000b000c00000010000d000e000000030001000f0010000100110000005c0003000400000"
+ "4c6a6176612f6c616e672f4f626a6563743b4a294c6a6176612f6c616e672f4f626a6563743b0031000800090001000a00020010000b000c0000" + "0122ab700012a2bc00002b500032a20b50004b10000000200120000001200040000001200040013000c0014001100150013000000200003000000120014"
+ "0010000d000e000000030001000f0010000100110000005c00030004000000122ab700012a2bc00002b500032a20b50004b10000000200120000" + "001500000000001200160017000100000012000d000e0002000100180019000100110000004700040002000000132ab400032b2ab40004b60005c00006c"
+ "001200040000000e0004000f000c0010001100110013000000200003000000120014001500000000001200160017000100000012000d000e0002" + "00006b00000000200120000000600010000001900130000001600020000001300140015000000000013001a0017000110410018001b0001001100000030"
+ "000100180019000200110000004400040002000000102ab400032b2ab40004b60005c00006b00000000200120000000600010000001500130000" + "00020002000000062a2bb60007b00000000200120000000600010000000c00130000000c0001000000060014001500000002001c00000002001d001e000"
+ "001600020000001000140015000000000010001a00170001001b000000060001001c000010410018001d00020011000000300002000200000006" + "00002001f";
+ "2a2bb60007b00000000200120000000600010000000800130000000c000100000006001400150000001b000000060001001c00000002001e0000"
+ "0002001f0020000000020021";
private static final Function<Object, char[]> strFunction; private static final Function<Object, char[]> strCharFunction;
private static final Function<Object, char[]> sbFunction; private static final Function<Object, char[]> sbCharFunction;
/**
* <blockquote><pre>
* public final class AnonymousByteArrayFunction implements java.util.function.Function&lt;Object, byte[]&gt; {
*
* final sun.misc.Unsafe unsafe;
*
* final long fd;
*
* public AnonymousByteArrayFunction(Object obj, long fd) {
* this.unsafe = (sun.misc.Unsafe) obj;
* this.fd = fd;
* }
*
* &#64;Override
* public byte[] apply(Object t) {
* return (byte[]) unsafe.getObject(t, fd);
* }
*
* }
* </pre></blockquote>
*/
private static final String functionByteClassBinary = "cafebabe00000034002d0a00090020070021090008002209000800230a00020024070025"
+ "0a00080026070027070028070029010006756e736166650100114c73756e2f6d6973632f556e736166653b01000266640100014a0100063c696e69743e0"
+ "10016284c6a6176612f6c616e672f4f626a6563743b4a2956010004436f646501000f4c696e654e756d6265725461626c650100124c6f63616c56617269"
+ "61626c655461626c650100047468697301002d4c6f72672f7265646b616c652f7574696c2f416e6f6e796d6f757342797465417272617946756e6374696"
+ "f6e3b0100036f626a0100124c6a6176612f6c616e672f4f626a6563743b0100056170706c79010016284c6a6176612f6c616e672f4f626a6563743b295b"
+ "4201000174010026284c6a6176612f6c616e672f4f626a6563743b294c6a6176612f6c616e672f4f626a6563743b0100095369676e61747572650100454"
+ "c6a6176612f6c616e672f4f626a6563743b4c6a6176612f7574696c2f66756e6374696f6e2f46756e6374696f6e3c4c6a6176612f6c616e672f4f626a65"
+ "63743b5b423e3b01000a536f7572636546696c6501001f416e6f6e796d6f757342797465417272617946756e6374696f6e2e6a6176610c000f002a01000"
+ "f73756e2f6d6973632f556e736166650c000b000c0c000d000e0c002b002c0100025b420c0018001901002b6f72672f7265646b616c652f7574696c2f41"
+ "6e6f6e796d6f757342797465417272617946756e6374696f6e0100106a6176612f6c616e672f4f626a65637401001b6a6176612f7574696c2f66756e637"
+ "4696f6e2f46756e6374696f6e0100032829560100096765744f626a656374010027284c6a6176612f6c616e672f4f626a6563743b4a294c6a6176612f6c"
+ "616e672f4f626a6563743b0021000800090001000a00020010000b000c00000010000d000e000000030001000f0010000100110000005c0003000400000"
+ "0122ab700012a2bc00002b500032a20b50004b10000000200120000001200040000001200040013000c0014001100150013000000200003000000120014"
+ "001500000000001200160017000100000012000d000e0002000100180019000100110000004700040002000000132ab400032b2ab40004b60005c00006c"
+ "00006b00000000200120000000600010000001900130000001600020000001300140015000000000013001a0017000110410018001b0001001100000030"
+ "00020002000000062a2bb60007b00000000200120000000600010000000c00130000000c0001000000060014001500000002001c00000002001d001e000"
+ "00002001f";
private static final Function<Object, byte[]> strByteFunction;
private static final Function<Object, byte[]> sbByteFunction;
private static final javax.net.ssl.SSLContext DEFAULTSSL_CONTEXT; private static final javax.net.ssl.SSLContext DEFAULTSSL_CONTEXT;
private static final javax.net.ssl.HostnameVerifier defaultVerifier = (s, ss) -> true; private static final javax.net.ssl.HostnameVerifier defaultVerifier = (s, ss) -> true;
static { static {
Function<Object, char[]> strFunction0 = null; Function<Object, char[]> strCharFunction0 = null;
Function<Object, char[]> sbFunction0 = null; Function<Object, char[]> sbCharFunction0 = null;
Function<Object, byte[]> strByteFunction0 = null;
Function<Object, byte[]> sbByteFunction0 = null;
try { try {
Field f = String.class.getDeclaredField("value"); Field f = String.class.getDeclaredField("value");
if (f.getType() == char[].class) { //JDK9及以上不再是char[] if (f.getType() == char[].class) { //JDK9及以上不再是char[]
@@ -97,21 +140,40 @@ public final class Utility {
final Method fm = usafe.getClass().getMethod("objectFieldOffset", Field.class); final Method fm = usafe.getClass().getMethod("objectFieldOffset", Field.class);
final long fd1 = (Long) fm.invoke(usafe, f); final long fd1 = (Long) fm.invoke(usafe, f);
final long fd2 = (Long) fm.invoke(usafe, StringBuilder.class.getSuperclass().getDeclaredField("value")); final long fd2 = (Long) fm.invoke(usafe, StringBuilder.class.getSuperclass().getDeclaredField("value"));
byte[] bytes = hexToBin(functionClassBinary); byte[] bytes = hexToBin(functionCharClassBinary);
Class<Attribute> creatorClazz = (Class<Attribute>) new ClassLoader() { Class<Attribute> creatorClazz = (Class<Attribute>) new ClassLoader() {
public final Class<?> loadClass(String name, byte[] b) { public final Class<?> loadClass(String name, byte[] b) {
return defineClass(name, b, 0, b.length); return defineClass(name, b, 0, b.length);
} }
}.loadClass("org.re" + "dkale.util.AnonymousArrayFunction", bytes); }.loadClass("org.re" + "dkale.util.AnonymousCharArrayFunction", bytes);
strFunction0 = (Function<Object, char[]>) creatorClazz.getDeclaredConstructor(Object.class, long.class).newInstance(usafe, fd1); strCharFunction0 = (Function<Object, char[]>) creatorClazz.getDeclaredConstructor(Object.class, long.class).newInstance(usafe, fd1);
sbFunction0 = (Function<Object, char[]>) creatorClazz.getDeclaredConstructor(Object.class, long.class).newInstance(usafe, fd2); sbCharFunction0 = (Function<Object, char[]>) creatorClazz.getDeclaredConstructor(Object.class, long.class).newInstance(usafe, fd2);
} else {
Class unsafeClass = Class.forName("sun.misc.Unsafe");
Field safeField = unsafeClass.getDeclaredField("theUnsafe");
safeField.setAccessible(true);
final Object usafe = safeField.get(null);
final Method fm = usafe.getClass().getMethod("objectFieldOffset", Field.class);
final long fd1 = (Long) fm.invoke(usafe, f);
final long fd2 = (Long) fm.invoke(usafe, StringBuilder.class.getSuperclass().getDeclaredField("value"));
byte[] bytes = hexToBin(functionByteClassBinary);
Class<Attribute> creatorClazz = (Class<Attribute>) new ClassLoader() {
public final Class<?> loadClass(String name, byte[] b) {
return defineClass(name, b, 0, b.length);
}
}.loadClass("org.re" + "dkale.util.AnonymousByteArrayFunction", bytes);
strByteFunction0 = (Function<Object, byte[]>) creatorClazz.getDeclaredConstructor(Object.class, long.class).newInstance(usafe, fd1);
sbByteFunction0 = (Function<Object, byte[]>) creatorClazz.getDeclaredConstructor(Object.class, long.class).newInstance(usafe, fd2);
} }
} catch (Throwable e) { //不会发生 } catch (Throwable e) { //不会发生
//e.printStackTrace(); //e.printStackTrace();
} }
strFunction = strFunction0; strCharFunction = strCharFunction0;
sbFunction = sbFunction0; sbCharFunction = sbCharFunction0;
strByteFunction = strByteFunction0;
sbByteFunction = sbByteFunction0;
try { try {
DEFAULTSSL_CONTEXT = javax.net.ssl.SSLContext.getInstance("SSL"); DEFAULTSSL_CONTEXT = javax.net.ssl.SSLContext.getInstance("SSL");
@@ -1861,8 +1923,8 @@ public final class Utility {
public static byte[] encodeUTF8(final String value) { public static byte[] encodeUTF8(final String value) {
if (value == null) return new byte[0]; if (value == null) return new byte[0];
if (strFunction == null) return encodeUTF8(value.toCharArray()); if (strCharFunction == null) return encodeUTF8(value.toCharArray());
return encodeUTF8((char[]) strFunction.apply(value)); return encodeUTF8((char[]) strCharFunction.apply(value));
} }
public static byte[] encodeUTF8(final char[] array) { public static byte[] encodeUTF8(final char[] array) {
@@ -1913,14 +1975,28 @@ public final class Utility {
public static char[] charArray(String value) { public static char[] charArray(String value) {
if (value == null) return null; if (value == null) return null;
if (strFunction == null) return value.toCharArray(); if (strCharFunction == null) return value.toCharArray();
return strFunction.apply(value); return strCharFunction.apply(value);
} }
public static char[] charArray(StringBuilder value) { public static char[] charArray(StringBuilder value) {
if (value == null) return null; if (value == null) return null;
if (sbFunction == null) return value.toString().toCharArray(); if (sbCharFunction == null) return value.toString().toCharArray();
return sbFunction.apply(value); return sbCharFunction.apply(value);
}
//只能是单字节字符串
public static byte[] byteArray(String latin1Value) {
if (latin1Value == null) return null;
if (strByteFunction == null) return latin1Value.getBytes();
return strByteFunction.apply(latin1Value);
}
//只能是单字节字符串
public static byte[] byteArray(StringBuilder latin1Value) {
if (latin1Value == null) return null;
if (sbByteFunction == null) return latin1Value.toString().getBytes();
return sbByteFunction.apply(latin1Value);
} }
public static ByteBuffer encodeUTF8(final ByteBuffer buffer, final char[] array) { public static ByteBuffer encodeUTF8(final ByteBuffer buffer, final char[] array) {
@@ -1933,8 +2009,8 @@ public final class Utility {
public static int encodeUTF8Length(String value) { public static int encodeUTF8Length(String value) {
if (value == null) return -1; if (value == null) return -1;
if (strFunction == null) return encodeUTF8Length(value.toCharArray()); if (strCharFunction == null) return encodeUTF8Length(value.toCharArray());
return encodeUTF8Length(strFunction.apply(value)); return encodeUTF8Length(strCharFunction.apply(value));
} }
public static int encodeUTF8Length(final char[] text) { public static int encodeUTF8Length(final char[] text) {