From 1982c984ee44807581302c192266cead9a63fa4e Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Sat, 9 Jan 2021 14:42:45 +0800 Subject: [PATCH] --- src/org/redkale/util/ThreadHashExecutor.java | 30 +++++++++++++++----- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/src/org/redkale/util/ThreadHashExecutor.java b/src/org/redkale/util/ThreadHashExecutor.java index 8e956c8ff..3f874ac89 100644 --- a/src/org/redkale/util/ThreadHashExecutor.java +++ b/src/org/redkale/util/ThreadHashExecutor.java @@ -23,25 +23,41 @@ public class ThreadHashExecutor { private final AtomicInteger index = new AtomicInteger(); + private final LinkedBlockingQueue[] queues; + private final ExecutorService[] executors; public ThreadHashExecutor(int size) { ExecutorService[] array = new ExecutorService[size]; + LinkedBlockingQueue[] ques = new LinkedBlockingQueue[size]; final AtomicInteger counter = new AtomicInteger(); for (int i = 0; i < array.length; i++) { - array[i] = Executors.newSingleThreadExecutor((Runnable r) -> { - Thread t = new Thread(r); - t.setDaemon(true); - t.setName("Redkale-HashThread-" + counter.incrementAndGet()); - return t; - }); + LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); + ques[i] = queue; + array[i] = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, queue, + (Runnable r) -> { + Thread t = new Thread(r); + t.setDaemon(true); + t.setName("Redkale-HashThread-" + counter.incrementAndGet()); + return t; + }); } + this.queues = ques; this.executors = array; } public void execute(int hash, Runnable command) { if (hash < 1) { - this.executors[index.incrementAndGet() % this.executors.length].execute(command); + int k = 0; + int minsize = queues[0].size(); + for (int i = 1; i < queues.length; i++) { + int size = queues[i].size(); + if (size < minsize) { + minsize = size; + k = i; + } + } + this.executors[k].execute(command); } else { this.executors[hash % this.executors.length].execute(command); }