Server优化
This commit is contained in:
@@ -92,8 +92,8 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
|
|||||||
LongAdder createResponseCounter = new LongAdder();
|
LongAdder createResponseCounter = new LongAdder();
|
||||||
LongAdder cycleResponseCounter = new LongAdder();
|
LongAdder cycleResponseCounter = new LongAdder();
|
||||||
|
|
||||||
ObjectPool<ByteBuffer> bufferPool = server.createBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize);
|
ObjectPool<ByteBuffer> safeBufferPool = server.createBufferSafePool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize);
|
||||||
ObjectPool<Response> safeResponsePool = server.createResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize);
|
ObjectPool<Response> safeResponsePool = server.createResponseSafePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize);
|
||||||
final int respPoolMax = server.getResponsePoolSize();
|
final int respPoolMax = server.getResponsePoolSize();
|
||||||
ThreadLocal<ObjectPool<Response>> localResponsePool = ThreadLocal.withInitial(() -> {
|
ThreadLocal<ObjectPool<Response>> localResponsePool = ThreadLocal.withInitial(() -> {
|
||||||
if (!(Thread.currentThread() instanceof WorkThread)) {
|
if (!(Thread.currentThread() instanceof WorkThread)) {
|
||||||
@@ -119,7 +119,7 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
|
|||||||
(pool == null ? safeResponsePool : pool).accept(v);
|
(pool == null ? safeResponsePool : pool).accept(v);
|
||||||
};
|
};
|
||||||
final String threadNameFormat = server.name == null || server.name.isEmpty() ? "Redkale-IOServletThread-%s" : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s");
|
final String threadNameFormat = server.name == null || server.name.isEmpty() ? "Redkale-IOServletThread-%s" : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s");
|
||||||
this.ioGroup = new AsyncIOGroup(false, threadNameFormat, null, server.bufferCapacity, bufferPool);
|
this.ioGroup = new AsyncIOGroup(false, threadNameFormat, null, server.bufferCapacity, safeBufferPool);
|
||||||
this.ioGroup.start();
|
this.ioGroup.start();
|
||||||
|
|
||||||
this.acceptThread = new Thread() {
|
this.acceptThread = new Thread() {
|
||||||
|
|||||||
@@ -89,8 +89,8 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
|
|||||||
LongAdder createResponseCounter = new LongAdder();
|
LongAdder createResponseCounter = new LongAdder();
|
||||||
LongAdder cycleResponseCounter = new LongAdder();
|
LongAdder cycleResponseCounter = new LongAdder();
|
||||||
|
|
||||||
ObjectPool<ByteBuffer> safeBufferPool = server.createBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize);
|
ObjectPool<ByteBuffer> safeBufferPool = server.createBufferSafePool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize);
|
||||||
ObjectPool<Response> safeResponsePool = server.createResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize);
|
ObjectPool<Response> safeResponsePool = server.createResponseSafePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize);
|
||||||
ThreadLocal<ObjectPool<Response>> localResponsePool = ThreadLocal.withInitial(() -> {
|
ThreadLocal<ObjectPool<Response>> localResponsePool = ThreadLocal.withInitial(() -> {
|
||||||
if (!(Thread.currentThread() instanceof WorkThread)) {
|
if (!(Thread.currentThread() instanceof WorkThread)) {
|
||||||
return null;
|
return null;
|
||||||
|
|||||||
@@ -423,10 +423,10 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
|
|||||||
}
|
}
|
||||||
|
|
||||||
//必须在 createContext()之后调用
|
//必须在 createContext()之后调用
|
||||||
protected abstract ObjectPool<ByteBuffer> createBufferPool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize);
|
protected abstract ObjectPool<ByteBuffer> createBufferSafePool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize);
|
||||||
|
|
||||||
//必须在 createContext()之后调用
|
//必须在 createContext()之后调用
|
||||||
protected abstract ObjectPool<Response> createResponsePool(LongAdder createCounter, LongAdder cycleCounter, int responsePoolSize);
|
protected abstract ObjectPool<P> createResponseSafePool(LongAdder createCounter, LongAdder cycleCounter, int responsePoolSize);
|
||||||
|
|
||||||
public void shutdown() throws IOException {
|
public void shutdown() throws IOException {
|
||||||
long s = System.currentTimeMillis();
|
long s = System.currentTimeMillis();
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ import java.util.function.Supplier;
|
|||||||
import java.util.logging.Level;
|
import java.util.logging.Level;
|
||||||
import org.redkale.boot.Application;
|
import org.redkale.boot.Application;
|
||||||
import org.redkale.mq.*;
|
import org.redkale.mq.*;
|
||||||
import org.redkale.net.*;
|
import org.redkale.net.Server;
|
||||||
import org.redkale.net.http.HttpContext.HttpContextConfig;
|
import org.redkale.net.http.HttpContext.HttpContextConfig;
|
||||||
import org.redkale.net.http.HttpResponse.HttpResponseConfig;
|
import org.redkale.net.http.HttpResponse.HttpResponseConfig;
|
||||||
import org.redkale.net.sncp.Sncp;
|
import org.redkale.net.sncp.Sncp;
|
||||||
@@ -541,7 +541,7 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ObjectPool<ByteBuffer> createBufferPool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize) {
|
protected ObjectPool<ByteBuffer> createBufferSafePool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize) {
|
||||||
final int rcapacity = this.bufferCapacity;
|
final int rcapacity = this.bufferCapacity;
|
||||||
ObjectPool<ByteBuffer> bufferPool = ObjectPool.createSafePool(createCounter, cycleCounter, bufferPoolSize,
|
ObjectPool<ByteBuffer> bufferPool = ObjectPool.createSafePool(createCounter, cycleCounter, bufferPoolSize,
|
||||||
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
|
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
|
||||||
@@ -555,9 +555,9 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ObjectPool<Response> createResponsePool(LongAdder createCounter, LongAdder cycleCounter, int responsePoolSize) {
|
protected ObjectPool<HttpResponse> createResponseSafePool(LongAdder createCounter, LongAdder cycleCounter, int responsePoolSize) {
|
||||||
Creator<Response> creator = (Object... params) -> new HttpResponse(this.context, new HttpRequest(this.context), this.respConfig);
|
Creator<HttpResponse> creator = (Object... params) -> new HttpResponse(this.context, new HttpRequest(this.context), this.respConfig);
|
||||||
ObjectPool<Response> pool = ObjectPool.createSafePool(createCounter, cycleCounter, responsePoolSize, creator, (x) -> ((HttpResponse) x).prepare(), (x) -> ((HttpResponse) x).recycle());
|
ObjectPool<HttpResponse> pool = ObjectPool.createSafePool(createCounter, cycleCounter, responsePoolSize, creator, HttpResponse::prepare, HttpResponse::recycle);
|
||||||
return pool;
|
return pool;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ import java.util.List;
|
|||||||
import java.util.concurrent.atomic.*;
|
import java.util.concurrent.atomic.*;
|
||||||
import org.redkale.boot.Application;
|
import org.redkale.boot.Application;
|
||||||
import org.redkale.convert.bson.BsonFactory;
|
import org.redkale.convert.bson.BsonFactory;
|
||||||
import org.redkale.net.*;
|
import org.redkale.net.Server;
|
||||||
import org.redkale.net.sncp.SncpContext.SncpContextConfig;
|
import org.redkale.net.sncp.SncpContext.SncpContextConfig;
|
||||||
import org.redkale.service.Service;
|
import org.redkale.service.Service;
|
||||||
import org.redkale.util.*;
|
import org.redkale.util.*;
|
||||||
@@ -127,7 +127,7 @@ public class SncpServer extends Server<Uint128, SncpContext, SncpRequest, SncpRe
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ObjectPool<ByteBuffer> createBufferPool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize) {
|
protected ObjectPool<ByteBuffer> createBufferSafePool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize) {
|
||||||
final int rcapacity = this.bufferCapacity;
|
final int rcapacity = this.bufferCapacity;
|
||||||
ObjectPool<ByteBuffer> bufferPool = ObjectPool.createSafePool(createCounter, cycleCounter, bufferPoolSize,
|
ObjectPool<ByteBuffer> bufferPool = ObjectPool.createSafePool(createCounter, cycleCounter, bufferPoolSize,
|
||||||
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
|
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
|
||||||
@@ -141,9 +141,9 @@ public class SncpServer extends Server<Uint128, SncpContext, SncpRequest, SncpRe
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ObjectPool<Response> createResponsePool(LongAdder createCounter, LongAdder cycleCounter, int responsePoolSize) {
|
protected ObjectPool<SncpResponse> createResponseSafePool(LongAdder createCounter, LongAdder cycleCounter, int responsePoolSize) {
|
||||||
Creator<Response> creator = (Object... params) -> new SncpResponse(this.context, new SncpRequest(this.context));
|
Creator<SncpResponse> creator = (Object... params) -> new SncpResponse(this.context, new SncpRequest(this.context));
|
||||||
ObjectPool<Response> pool = ObjectPool.createSafePool(createCounter, cycleCounter, responsePoolSize, creator, (x) -> ((SncpResponse) x).prepare(), (x) -> ((SncpResponse) x).recycle());
|
ObjectPool<SncpResponse> pool = ObjectPool.createSafePool(createCounter, cycleCounter, responsePoolSize, creator, SncpResponse::prepare, SncpResponse::recycle);
|
||||||
return pool;
|
return pool;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user