From de089072fa81a4ef2a9fb7580d089e6a9e13cdee Mon Sep 17 00:00:00 2001 From: Redkale Date: Sat, 31 Dec 2022 09:58:51 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=B4=E6=97=B6=E4=BC=98=E5=8C=96Client=20ru?= =?UTF-8?q?nWork?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/boot/Application.java | 4 +- .../org/redkale/boot/LoggingBaseHandler.java | 19 ++++++++++ .../org/redkale/boot/LoggingFileHandler.java | 18 --------- .../java/org/redkale/net/AsyncConnection.java | 2 + .../java/org/redkale/net/AsyncIOThread.java | 34 ++++++++++++----- .../redkale/net/client/ClientConnection.java | 37 +++++++++---------- .../org/redkale/net/client/ClientFuture.java | 14 +++---- .../org/redkale/source/DataJdbcSource.java | 6 +-- .../java/org/redkale/source/EntityInfo.java | 23 ------------ src/main/java/org/redkale/util/Sheet.java | 14 +++++-- 10 files changed, 86 insertions(+), 85 deletions(-) diff --git a/src/main/java/org/redkale/boot/Application.java b/src/main/java/org/redkale/boot/Application.java index b4ba744dc..6fded3cc5 100644 --- a/src/main/java/org/redkale/boot/Application.java +++ b/src/main/java/org/redkale/boot/Application.java @@ -612,15 +612,17 @@ public final class Application { } //给所有client给一个默认的AsyncGroup + final AtomicReference clientref = new AtomicReference<>(); final AtomicInteger wclientCounter = new AtomicInteger(); final int clientThreads = Math.max(Math.max(2, Utility.cpus()), workThreads / 2); clientExecutor = Executors.newFixedThreadPool(clientThreads, (Runnable r) -> { int i = wclientCounter.get(); int c = wclientCounter.incrementAndGet(); String threadname = "Redkale-Client-WorkThread-" + (c > 9 ? c : ("0" + c)); - Thread t = new WorkThread(threadname, i, clientThreads, workref.get(), r); + Thread t = new WorkThread(threadname, i, clientThreads, clientref.get(), r); return t; }); + clientref.set(clientExecutor); } this.workExecutor = workExecutor0; this.resourceFactory.register(RESNAME_APP_EXECUTOR, Executor.class, this.workExecutor); diff --git a/src/main/java/org/redkale/boot/LoggingBaseHandler.java b/src/main/java/org/redkale/boot/LoggingBaseHandler.java index 87b6efd60..da88cf39a 100644 --- a/src/main/java/org/redkale/boot/LoggingBaseHandler.java +++ b/src/main/java/org/redkale/boot/LoggingBaseHandler.java @@ -113,4 +113,23 @@ public abstract class LoggingBaseHandler extends Handler { log.setParameters(new String[]{Thread.currentThread().getName(), traceid}); } } + + public static void initDebugLogConfig() { + try { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + final PrintStream ps = new PrintStream(out); + final String handlerName = LoggingFileHandler.LoggingConsoleHandler.class.getName(); //java.util.logging.ConsoleHandler + ps.println("handlers = " + handlerName); + ps.println(".level = FINEST"); + ps.println("jdk.level = INFO"); + ps.println("sun.level = INFO"); + ps.println("com.sun.level = INFO"); + ps.println("javax.level = INFO"); + ps.println(handlerName + ".level = FINEST"); + ps.println(handlerName + ".formatter = " + LoggingFormater.class.getName()); + LogManager.getLogManager().readConfiguration(new ByteArrayInputStream(out.toByteArray())); + } catch (Exception e) { + } + } + } diff --git a/src/main/java/org/redkale/boot/LoggingFileHandler.java b/src/main/java/org/redkale/boot/LoggingFileHandler.java index ec1c9ab83..c54075e09 100644 --- a/src/main/java/org/redkale/boot/LoggingFileHandler.java +++ b/src/main/java/org/redkale/boot/LoggingFileHandler.java @@ -70,24 +70,6 @@ public class LoggingFileHandler extends LoggingBaseHandler { } } - public static void initDebugLogConfig() { - try { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - final PrintStream ps = new PrintStream(out); - final String handlerName = LoggingConsoleHandler.class.getName(); //java.util.logging.ConsoleHandler - ps.println("handlers = " + handlerName); - ps.println(".level = FINEST"); - ps.println("jdk.level = INFO"); - ps.println("sun.level = INFO"); - ps.println("com.sun.level = INFO"); - ps.println("javax.level = INFO"); - ps.println(handlerName + ".level = FINEST"); - ps.println(handlerName + ".formatter = " + LoggingFormater.class.getName()); - LogManager.getLogManager().readConfiguration(new ByteArrayInputStream(out.toByteArray())); - } catch (Exception e) { - } - } - protected final LinkedBlockingQueue logqueue = new LinkedBlockingQueue(); protected String pattern; diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index 70545a2b0..a06a852ea 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -81,6 +81,8 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl protected AsyncConnection(boolean client, AsyncGroup ioGroup, AsyncIOThread ioThread, final int bufferCapacity, Supplier bufferSupplier, Consumer bufferConsumer, SSLBuilder sslBuilder, SSLContext sslContext, final LongAdder livingCounter, final LongAdder closedCounter) { + Objects.requireNonNull(ioGroup); + Objects.requireNonNull(ioThread); Objects.requireNonNull(bufferSupplier); Objects.requireNonNull(bufferConsumer); this.client = client; diff --git a/src/main/java/org/redkale/net/AsyncIOThread.java b/src/main/java/org/redkale/net/AsyncIOThread.java index ba87b1d9f..1f893d0b8 100644 --- a/src/main/java/org/redkale/net/AsyncIOThread.java +++ b/src/main/java/org/redkale/net/AsyncIOThread.java @@ -66,32 +66,46 @@ public class AsyncIOThread extends WorkThread { return true; } + /** + * 不可重置, 防止IO操作不在IO线程中执行 + * + * @param command + */ @Override - public void execute(Runnable command) { + public final void execute(Runnable command) { commandQueue.offer(command); selector.wakeup(); } + /** + * 不可重置, 防止IO操作不在IO线程中执行 + * + * @param commands + */ @Override - public void execute(Runnable... commands) { + public final void execute(Runnable... commands) { for (Runnable command : commands) { commandQueue.offer(command); } selector.wakeup(); } + /** + * 不可重置, 防止IO操作不在IO线程中执行 + * + * @param commands + */ @Override - public void execute(Collection commands) { - if (commands == null) { - return; + public final void execute(Collection commands) { + if (commands != null) { + for (Runnable command : commands) { + commandQueue.offer(command); + } + selector.wakeup(); } - for (Runnable command : commands) { - commandQueue.offer(command); - } - selector.wakeup(); } - public void register(Consumer consumer) { + public final void register(Consumer consumer) { registerQueue.offer(consumer); selector.wakeup(); } diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index db02fcac8..22f907b5c 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -332,11 +332,12 @@ public abstract class ClientConnection implements Co protected abstract ClientCodec createCodec(); protected CompletableFuture

writeChannel(R request) { - ClientFuture respFuture = createClientFuture(request); + ClientFuture respFuture; if (request == client.closeRequest) { - respFuture.request = null; + respFuture = createClientFuture(null); closeFuture = respFuture; } else { + respFuture = createClientFuture(request); int rts = this.channel.getReadTimeoutSeconds(); if (rts > 0 && respFuture.request != null) { respFuture.conn = this; @@ -353,19 +354,18 @@ public abstract class ClientConnection implements Co } private void writeChannelInThread(R request, ClientFuture respFuture) { - { //保证顺序一致 - if (client.closeRequest != null && respFuture.request == client.closeRequest) { - responseQueue.offer(ClientFuture.EMPTY); - } else { - request.respFuture = respFuture; - responseQueue.offer(respFuture); - } - requestQueue.offer(request); - if (isAuthenticated() && client.reqWritedCounter != null) { - client.reqWritedCounter.increment(); - } + //保证顺序一致 + if (client.closeRequest != null && respFuture.request == client.closeRequest) { + responseQueue.offer(ClientFuture.EMPTY); + } else { + request.respFuture = respFuture; + responseQueue.offer(respFuture); } - if (responseQueue.size() < 2 && writePending.compareAndSet(false, true)) {//responseQueue.size() < 2 && 加了这句会存在偶尔不写数据的问题? + requestQueue.offer(request); + if (isAuthenticated() && client.reqWritedCounter != null) { + client.reqWritedCounter.increment(); + } + if (writePending.compareAndSet(false, true)) { continueWrite(true); } } @@ -402,14 +402,13 @@ public abstract class ClientConnection implements Co public void dispose(Throwable exc) { channel.dispose(); - Throwable e = exc; + Throwable e = exc == null ? new ClosedChannelException() : exc; CompletableFuture f; respWaitingCounter.reset(); + WorkThread thread = channel.getAsyncIOThread(); while ((f = responseQueue.poll()) != null) { - if (e == null) { - e = new ClosedChannelException(); - } - f.completeExceptionally(e); + CompletableFuture future = f; + thread.runWork(() -> future.completeExceptionally(e)); } } diff --git a/src/main/java/org/redkale/net/client/ClientFuture.java b/src/main/java/org/redkale/net/client/ClientFuture.java index 94dc89164..70e60ae0f 100644 --- a/src/main/java/org/redkale/net/client/ClientFuture.java +++ b/src/main/java/org/redkale/net/client/ClientFuture.java @@ -16,7 +16,7 @@ import org.redkale.net.*; */ public class ClientFuture extends CompletableFuture implements Runnable { - public static final ClientFuture EMPTY = new ClientFuture() { + public static final ClientFuture EMPTY = new ClientFuture(null) { @Override public boolean complete(Object value) { return true; @@ -28,7 +28,7 @@ public class ClientFuture extends CompletableFuture implements Runnable { } }; - protected ClientRequest request; + protected final ClientRequest request; ScheduledFuture timeout; @@ -36,10 +36,6 @@ public class ClientFuture extends CompletableFuture implements Runnable { ClientConnection conn; - public ClientFuture() { - super(); - } - public ClientFuture(ClientRequest request) { super(); this.request = request; @@ -51,7 +47,11 @@ public class ClientFuture extends CompletableFuture implements Runnable { @Override //JDK9+ public ClientFuture newIncompleteFuture() { - return new ClientFuture<>(); + ClientFuture future = new ClientFuture<>(request); + future.timeout = timeout; + future.mergeCount = mergeCount; + future.conn = conn; + return future; } public R getRequest() { diff --git a/src/main/java/org/redkale/source/DataJdbcSource.java b/src/main/java/org/redkale/source/DataJdbcSource.java index 9d17d7971..16c3ec006 100644 --- a/src/main/java/org/redkale/source/DataJdbcSource.java +++ b/src/main/java/org/redkale/source/DataJdbcSource.java @@ -1652,16 +1652,16 @@ public class DataJdbcSource extends DataSqlSource { String listSubSql; StringBuilder union = new StringBuilder(); if (tables.length == 1) { - listSubSql = "SELECT " + (distinct ? "DISTINCT " : "") + info.getFullQueryColumns("a", selects) + " FROM " + tables[0] + " a" + joinAndWhere; + listSubSql = "SELECT " + (distinct ? "DISTINCT " : "") + info.getQueryColumns("a", selects) + " FROM " + tables[0] + " a" + joinAndWhere; } else { int b = 0; for (String table : tables) { if (!union.isEmpty()) { union.append(" UNION ALL "); } - union.append("SELECT ").append(info.getFullQueryColumns("a", selects)).append(" FROM ").append(table).append(" a").append(joinAndWhere); + union.append("SELECT ").append(info.getQueryColumns("a", selects)).append(" FROM ").append(table).append(" a").append(joinAndWhere); } - listSubSql = "SELECT " + (distinct ? "DISTINCT " : "") + info.getFullQueryColumns("a", selects) + " FROM (" + (union) + ") a"; + listSubSql = "SELECT " + (distinct ? "DISTINCT " : "") + info.getQueryColumns("a", selects) + " FROM (" + (union) + ") a"; } listSql = listSubSql + createSQLOrderby(info, flipper); if (mysqlOrPgsql) { diff --git a/src/main/java/org/redkale/source/EntityInfo.java b/src/main/java/org/redkale/source/EntityInfo.java index 534d7d39f..9ccbefa62 100644 --- a/src/main/java/org/redkale/source/EntityInfo.java +++ b/src/main/java/org/redkale/source/EntityInfo.java @@ -971,29 +971,6 @@ public final class EntityInfo { } if ("a".equals(tabalis)) { return querySqlColumnSequenceA; - } - return tabalis + "." + Utility.joining(querySqlColumns, "," + tabalis + "."); - } - StringBuilder sb = new StringBuilder(); - for (Attribute attr : this.attributes) { - if (!selects.test(attr.field())) { - continue; - } - if (sb.length() > 0) { - sb.append(','); - } - sb.append(getSQLColumn(tabalis, attr.field())); - } - if (sb.length() == 0) { - sb.append('*'); - } - return sb; - } - - public CharSequence getFullQueryColumns(String tabalis, SelectColumn selects) { - if (selects == null) { - if (tabalis == null) { - return querySqlColumnSequence; } else { StringBuilder sb = new StringBuilder(); String s = tabalis + "."; diff --git a/src/main/java/org/redkale/util/Sheet.java b/src/main/java/org/redkale/util/Sheet.java index 6e7fd71a8..7870b35b4 100644 --- a/src/main/java/org/redkale/util/Sheet.java +++ b/src/main/java/org/redkale/util/Sheet.java @@ -7,7 +7,7 @@ package org.redkale.util; import java.util.*; import java.util.function.*; -import java.util.stream.*; +import java.util.stream.Stream; import org.redkale.convert.ConvertColumn; /** @@ -50,7 +50,9 @@ public class Sheet implements java.io.Serializable, Iterable { } public Sheet copyTo(Sheet copy) { - if (copy == null) return copy; + if (copy == null) { + return copy; + } copy.total = this.total; Collection data = this.getRows(); if (data != null) { @@ -95,7 +97,9 @@ public class Sheet implements java.io.Serializable, Iterable { public List list(boolean created) { Collection data = this.rows; - if (data == null) return created ? new ArrayList() : null; + if (data == null) { + return created ? new ArrayList() : null; + } return (data instanceof List) ? (List) data : new ArrayList(data); } @@ -119,7 +123,9 @@ public class Sheet implements java.io.Serializable, Iterable { public Sheet map(Function mapper) { Collection data = this.rows; - if (data == null || data.isEmpty()) return (Sheet) this; + if (data == null || data.isEmpty()) { + return (Sheet) this; + } final List list = new ArrayList<>(); for (T item : data) { list.add(mapper.apply(item));