临时优化Client runWork

This commit is contained in:
Redkale
2022-12-31 09:58:51 +08:00
parent 2542eb959a
commit de089072fa
10 changed files with 86 additions and 85 deletions

View File

@@ -612,15 +612,17 @@ public final class Application {
}
//给所有client给一个默认的AsyncGroup
final AtomicReference<ExecutorService> clientref = new AtomicReference<>();
final AtomicInteger wclientCounter = new AtomicInteger();
final int clientThreads = Math.max(Math.max(2, Utility.cpus()), workThreads / 2);
clientExecutor = Executors.newFixedThreadPool(clientThreads, (Runnable r) -> {
int i = wclientCounter.get();
int c = wclientCounter.incrementAndGet();
String threadname = "Redkale-Client-WorkThread-" + (c > 9 ? c : ("0" + c));
Thread t = new WorkThread(threadname, i, clientThreads, workref.get(), r);
Thread t = new WorkThread(threadname, i, clientThreads, clientref.get(), r);
return t;
});
clientref.set(clientExecutor);
}
this.workExecutor = workExecutor0;
this.resourceFactory.register(RESNAME_APP_EXECUTOR, Executor.class, this.workExecutor);

View File

@@ -113,4 +113,23 @@ public abstract class LoggingBaseHandler extends Handler {
log.setParameters(new String[]{Thread.currentThread().getName(), traceid});
}
}
public static void initDebugLogConfig() {
try {
ByteArrayOutputStream out = new ByteArrayOutputStream();
final PrintStream ps = new PrintStream(out);
final String handlerName = LoggingFileHandler.LoggingConsoleHandler.class.getName(); //java.util.logging.ConsoleHandler
ps.println("handlers = " + handlerName);
ps.println(".level = FINEST");
ps.println("jdk.level = INFO");
ps.println("sun.level = INFO");
ps.println("com.sun.level = INFO");
ps.println("javax.level = INFO");
ps.println(handlerName + ".level = FINEST");
ps.println(handlerName + ".formatter = " + LoggingFormater.class.getName());
LogManager.getLogManager().readConfiguration(new ByteArrayInputStream(out.toByteArray()));
} catch (Exception e) {
}
}
}

View File

@@ -70,24 +70,6 @@ public class LoggingFileHandler extends LoggingBaseHandler {
}
}
public static void initDebugLogConfig() {
try {
ByteArrayOutputStream out = new ByteArrayOutputStream();
final PrintStream ps = new PrintStream(out);
final String handlerName = LoggingConsoleHandler.class.getName(); //java.util.logging.ConsoleHandler
ps.println("handlers = " + handlerName);
ps.println(".level = FINEST");
ps.println("jdk.level = INFO");
ps.println("sun.level = INFO");
ps.println("com.sun.level = INFO");
ps.println("javax.level = INFO");
ps.println(handlerName + ".level = FINEST");
ps.println(handlerName + ".formatter = " + LoggingFormater.class.getName());
LogManager.getLogManager().readConfiguration(new ByteArrayInputStream(out.toByteArray()));
} catch (Exception e) {
}
}
protected final LinkedBlockingQueue<LogRecord> logqueue = new LinkedBlockingQueue();
protected String pattern;

View File

@@ -81,6 +81,8 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
protected AsyncConnection(boolean client, AsyncGroup ioGroup, AsyncIOThread ioThread, final int bufferCapacity, Supplier<ByteBuffer> bufferSupplier,
Consumer<ByteBuffer> bufferConsumer, SSLBuilder sslBuilder, SSLContext sslContext, final LongAdder livingCounter, final LongAdder closedCounter) {
Objects.requireNonNull(ioGroup);
Objects.requireNonNull(ioThread);
Objects.requireNonNull(bufferSupplier);
Objects.requireNonNull(bufferConsumer);
this.client = client;

View File

@@ -66,32 +66,46 @@ public class AsyncIOThread extends WorkThread {
return true;
}
/**
* 不可重置, 防止IO操作不在IO线程中执行
*
* @param command
*/
@Override
public void execute(Runnable command) {
public final void execute(Runnable command) {
commandQueue.offer(command);
selector.wakeup();
}
/**
* 不可重置, 防止IO操作不在IO线程中执行
*
* @param commands
*/
@Override
public void execute(Runnable... commands) {
public final void execute(Runnable... commands) {
for (Runnable command : commands) {
commandQueue.offer(command);
}
selector.wakeup();
}
/**
* 不可重置, 防止IO操作不在IO线程中执行
*
* @param commands
*/
@Override
public void execute(Collection<Runnable> commands) {
if (commands == null) {
return;
public final void execute(Collection<Runnable> commands) {
if (commands != null) {
for (Runnable command : commands) {
commandQueue.offer(command);
}
selector.wakeup();
}
for (Runnable command : commands) {
commandQueue.offer(command);
}
selector.wakeup();
}
public void register(Consumer<Selector> consumer) {
public final void register(Consumer<Selector> consumer) {
registerQueue.offer(consumer);
selector.wakeup();
}

View File

@@ -332,11 +332,12 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
protected abstract ClientCodec createCodec();
protected CompletableFuture<P> writeChannel(R request) {
ClientFuture respFuture = createClientFuture(request);
ClientFuture respFuture;
if (request == client.closeRequest) {
respFuture.request = null;
respFuture = createClientFuture(null);
closeFuture = respFuture;
} else {
respFuture = createClientFuture(request);
int rts = this.channel.getReadTimeoutSeconds();
if (rts > 0 && respFuture.request != null) {
respFuture.conn = this;
@@ -353,19 +354,18 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
}
private void writeChannelInThread(R request, ClientFuture respFuture) {
{ //保证顺序一致
if (client.closeRequest != null && respFuture.request == client.closeRequest) {
responseQueue.offer(ClientFuture.EMPTY);
} else {
request.respFuture = respFuture;
responseQueue.offer(respFuture);
}
requestQueue.offer(request);
if (isAuthenticated() && client.reqWritedCounter != null) {
client.reqWritedCounter.increment();
}
//保证顺序一致
if (client.closeRequest != null && respFuture.request == client.closeRequest) {
responseQueue.offer(ClientFuture.EMPTY);
} else {
request.respFuture = respFuture;
responseQueue.offer(respFuture);
}
if (responseQueue.size() < 2 && writePending.compareAndSet(false, true)) {//responseQueue.size() < 2 && 加了这句会存在偶尔不写数据的问题?
requestQueue.offer(request);
if (isAuthenticated() && client.reqWritedCounter != null) {
client.reqWritedCounter.increment();
}
if (writePending.compareAndSet(false, true)) {
continueWrite(true);
}
}
@@ -402,14 +402,13 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
public void dispose(Throwable exc) {
channel.dispose();
Throwable e = exc;
Throwable e = exc == null ? new ClosedChannelException() : exc;
CompletableFuture f;
respWaitingCounter.reset();
WorkThread thread = channel.getAsyncIOThread();
while ((f = responseQueue.poll()) != null) {
if (e == null) {
e = new ClosedChannelException();
}
f.completeExceptionally(e);
CompletableFuture future = f;
thread.runWork(() -> future.completeExceptionally(e));
}
}

View File

@@ -16,7 +16,7 @@ import org.redkale.net.*;
*/
public class ClientFuture<T> extends CompletableFuture<T> implements Runnable {
public static final ClientFuture EMPTY = new ClientFuture() {
public static final ClientFuture EMPTY = new ClientFuture(null) {
@Override
public boolean complete(Object value) {
return true;
@@ -28,7 +28,7 @@ public class ClientFuture<T> extends CompletableFuture<T> implements Runnable {
}
};
protected ClientRequest request;
protected final ClientRequest request;
ScheduledFuture timeout;
@@ -36,10 +36,6 @@ public class ClientFuture<T> extends CompletableFuture<T> implements Runnable {
ClientConnection conn;
public ClientFuture() {
super();
}
public ClientFuture(ClientRequest request) {
super();
this.request = request;
@@ -51,7 +47,11 @@ public class ClientFuture<T> extends CompletableFuture<T> implements Runnable {
@Override //JDK9+
public <U> ClientFuture<U> newIncompleteFuture() {
return new ClientFuture<>();
ClientFuture future = new ClientFuture<>(request);
future.timeout = timeout;
future.mergeCount = mergeCount;
future.conn = conn;
return future;
}
public <R extends ClientRequest> R getRequest() {

View File

@@ -1652,16 +1652,16 @@ public class DataJdbcSource extends DataSqlSource {
String listSubSql;
StringBuilder union = new StringBuilder();
if (tables.length == 1) {
listSubSql = "SELECT " + (distinct ? "DISTINCT " : "") + info.getFullQueryColumns("a", selects) + " FROM " + tables[0] + " a" + joinAndWhere;
listSubSql = "SELECT " + (distinct ? "DISTINCT " : "") + info.getQueryColumns("a", selects) + " FROM " + tables[0] + " a" + joinAndWhere;
} else {
int b = 0;
for (String table : tables) {
if (!union.isEmpty()) {
union.append(" UNION ALL ");
}
union.append("SELECT ").append(info.getFullQueryColumns("a", selects)).append(" FROM ").append(table).append(" a").append(joinAndWhere);
union.append("SELECT ").append(info.getQueryColumns("a", selects)).append(" FROM ").append(table).append(" a").append(joinAndWhere);
}
listSubSql = "SELECT " + (distinct ? "DISTINCT " : "") + info.getFullQueryColumns("a", selects) + " FROM (" + (union) + ") a";
listSubSql = "SELECT " + (distinct ? "DISTINCT " : "") + info.getQueryColumns("a", selects) + " FROM (" + (union) + ") a";
}
listSql = listSubSql + createSQLOrderby(info, flipper);
if (mysqlOrPgsql) {

View File

@@ -971,29 +971,6 @@ public final class EntityInfo<T> {
}
if ("a".equals(tabalis)) {
return querySqlColumnSequenceA;
}
return tabalis + "." + Utility.joining(querySqlColumns, "," + tabalis + ".");
}
StringBuilder sb = new StringBuilder();
for (Attribute attr : this.attributes) {
if (!selects.test(attr.field())) {
continue;
}
if (sb.length() > 0) {
sb.append(',');
}
sb.append(getSQLColumn(tabalis, attr.field()));
}
if (sb.length() == 0) {
sb.append('*');
}
return sb;
}
public CharSequence getFullQueryColumns(String tabalis, SelectColumn selects) {
if (selects == null) {
if (tabalis == null) {
return querySqlColumnSequence;
} else {
StringBuilder sb = new StringBuilder();
String s = tabalis + ".";

View File

@@ -7,7 +7,7 @@ package org.redkale.util;
import java.util.*;
import java.util.function.*;
import java.util.stream.*;
import java.util.stream.Stream;
import org.redkale.convert.ConvertColumn;
/**
@@ -50,7 +50,9 @@ public class Sheet<T> implements java.io.Serializable, Iterable<T> {
}
public Sheet<T> copyTo(Sheet<T> copy) {
if (copy == null) return copy;
if (copy == null) {
return copy;
}
copy.total = this.total;
Collection<T> data = this.getRows();
if (data != null) {
@@ -95,7 +97,9 @@ public class Sheet<T> implements java.io.Serializable, Iterable<T> {
public List<T> list(boolean created) {
Collection<T> data = this.rows;
if (data == null) return created ? new ArrayList() : null;
if (data == null) {
return created ? new ArrayList() : null;
}
return (data instanceof List) ? (List<T>) data : new ArrayList(data);
}
@@ -119,7 +123,9 @@ public class Sheet<T> implements java.io.Serializable, Iterable<T> {
public <R> Sheet<R> map(Function<T, R> mapper) {
Collection<T> data = this.rows;
if (data == null || data.isEmpty()) return (Sheet) this;
if (data == null || data.isEmpty()) {
return (Sheet) this;
}
final List<R> list = new ArrayList<>();
for (T item : data) {
list.add(mapper.apply(item));