This commit is contained in:
Redkale
2021-01-20 13:20:03 +08:00
parent eace16d7fd
commit 4e689855f4
11 changed files with 14 additions and 40 deletions

View File

@@ -26,7 +26,7 @@ public class SncpMessageRequest extends SncpRequest {
@SuppressWarnings("OverridableMethodCallInConstructor") @SuppressWarnings("OverridableMethodCallInConstructor")
public SncpMessageRequest(SncpContext context, MessageRecord message) { public SncpMessageRequest(SncpContext context, MessageRecord message) {
super(context, null); super(context);
this.message = message; this.message = message;
readHeader(ByteBuffer.wrap(message.getContent())); readHeader(ByteBuffer.wrap(message.getContent()));
} }

View File

@@ -79,7 +79,6 @@ public class AioTcpProtocolServer extends ProtocolServer {
AtomicLong createResponseCounter = new AtomicLong(); AtomicLong createResponseCounter = new AtomicLong();
AtomicLong cycleResponseCounter = new AtomicLong(); AtomicLong cycleResponseCounter = new AtomicLong();
ObjectPool<Response> responsePool = server.createResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize); ObjectPool<Response> responsePool = server.createResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize);
responsePool.setCreator(server.createResponseCreator(bufferPool, responsePool));
final AsynchronousServerSocketChannel serchannel = this.serverChannel; final AsynchronousServerSocketChannel serchannel = this.serverChannel;
serchannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() { serchannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {

View File

@@ -78,7 +78,6 @@ public class BioUdpProtocolServer extends ProtocolServer {
AtomicLong createResponseCounter = new AtomicLong(); AtomicLong createResponseCounter = new AtomicLong();
AtomicLong cycleResponseCounter = new AtomicLong(); AtomicLong cycleResponseCounter = new AtomicLong();
ObjectPool<Response> responsePool = server.createResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize); ObjectPool<Response> responsePool = server.createResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize);
responsePool.setCreator(server.createResponseCreator(bufferPool, responsePool));
final DatagramChannel serchannel = this.serverChannel; final DatagramChannel serchannel = this.serverChannel;
final int readTimeoutSeconds = this.context.readTimeoutSeconds; final int readTimeoutSeconds = this.context.readTimeoutSeconds;
final int writeTimeoutSeconds = this.context.writeTimeoutSeconds; final int writeTimeoutSeconds = this.context.writeTimeoutSeconds;

View File

@@ -91,7 +91,6 @@ public class NioTcpProtocolServer extends ProtocolServer {
AtomicLong cycleResponseCounter = new AtomicLong(); AtomicLong cycleResponseCounter = new AtomicLong();
this.responsePool = server.createResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize); this.responsePool = server.createResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize);
this.responsePool.setCreator(server.createResponseCreator(bufferPool, responsePool));
this.ioGroup = new NioThreadGroup(server.name, null, Runtime.getRuntime().availableProcessors(), bufferPool, responsePool); this.ioGroup = new NioThreadGroup(server.name, null, Runtime.getRuntime().availableProcessors(), bufferPool, responsePool);
this.ioGroup.start(); this.ioGroup.start();

View File

@@ -9,7 +9,6 @@ import java.nio.ByteBuffer;
import java.util.*; import java.util.*;
import org.redkale.convert.bson.BsonConvert; import org.redkale.convert.bson.BsonConvert;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
import org.redkale.util.ObjectPool;
/** /**
* 协议请求对象 * 协议请求对象
@@ -24,8 +23,6 @@ public abstract class Request<C extends Context> {
protected final C context; protected final C context;
protected final ObjectPool<ByteBuffer> bufferPool;
protected final BsonConvert bsonConvert; protected final BsonConvert bsonConvert;
protected final JsonConvert jsonConvert; protected final JsonConvert jsonConvert;
@@ -48,9 +45,8 @@ public abstract class Request<C extends Context> {
protected final Map<String, Object> attributes = new HashMap<>(); protected final Map<String, Object> attributes = new HashMap<>();
protected Request(C context, ObjectPool<ByteBuffer> bufferPool) { protected Request(C context) {
this.context = context; this.context = context;
this.bufferPool = bufferPool;
this.bsonConvert = context.getBsonConvert(); this.bsonConvert = context.getBsonConvert();
this.jsonConvert = context.getJsonConvert(); this.jsonConvert = context.getJsonConvert();
} }

View File

@@ -368,9 +368,6 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
//必须在 createContext()之后调用 //必须在 createContext()之后调用
protected abstract ObjectPool<Response> createResponsePool(AtomicLong createCounter, AtomicLong cycleCounter, int responsePoolSize); protected abstract ObjectPool<Response> createResponsePool(AtomicLong createCounter, AtomicLong cycleCounter, int responsePoolSize);
//必须在 createResponsePool()之后调用
protected abstract Creator<Response> createResponseCreator(ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool);
public void shutdown() throws IOException { public void shutdown() throws IOException {
long s = System.currentTimeMillis(); long s = System.currentTimeMillis();
logger.info(this.getClass().getSimpleName() + "-" + this.protocol + " shutdowning"); logger.info(this.getClass().getSimpleName() + "-" + this.protocol + " shutdowning");

View File

@@ -103,13 +103,13 @@ public class HttpRequest extends Request<HttpContext> {
Object attachment; //仅供HttpServlet传递Entry使用 Object attachment; //仅供HttpServlet传递Entry使用
public HttpRequest(HttpContext context, ObjectPool<ByteBuffer> bufferPool) { public HttpRequest(HttpContext context) {
super(context, bufferPool); super(context);
this.remoteAddrHeader = context.remoteAddrHeader; this.remoteAddrHeader = context.remoteAddrHeader;
} }
public HttpRequest(HttpContext context, HttpSimpleRequest req) { public HttpRequest(HttpContext context, HttpSimpleRequest req) {
super(context, null); super(context);
this.remoteAddrHeader = null; this.remoteAddrHeader = null;
if (req != null) { if (req != null) {
this.rpc = req.rpc; this.rpc = req.rpc;

View File

@@ -469,11 +469,8 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
@Override @Override
protected ObjectPool<Response> createResponsePool(AtomicLong createCounter, AtomicLong cycleCounter, int responsePoolSize) { protected ObjectPool<Response> createResponsePool(AtomicLong createCounter, AtomicLong cycleCounter, int responsePoolSize) {
return HttpResponse.createPool(createCounter, cycleCounter, responsePoolSize, null); ObjectPool<Response> pool = HttpResponse.createPool(createCounter, cycleCounter, responsePoolSize, null);
} pool.setCreator((Object... params) -> new HttpResponse(this.context, new HttpRequest(this.context), pool, this.respConfig));
return pool;
@Override
protected Creator<Response> createResponseCreator(ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool) {
return (Object... params) -> new HttpResponse(this.context, new HttpRequest(this.context, bufferPool), responsePool, this.respConfig);
} }
} }

View File

@@ -10,12 +10,10 @@ import static org.redkale.net.sncp.SncpRequest.DEFAULT_HEADER;
import java.io.*; import java.io.*;
import java.lang.annotation.*; import java.lang.annotation.*;
import java.lang.reflect.*; import java.lang.reflect.*;
import java.nio.*;
import java.nio.channels.CompletionHandler; import java.nio.channels.CompletionHandler;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.*;
import java.util.logging.*; import java.util.logging.*;
import javax.annotation.*; import javax.annotation.*;
import org.redkale.asm.*; import org.redkale.asm.*;
@@ -47,7 +45,6 @@ public final class SncpDynServlet extends SncpServlet {
private final HashMap<DLong, SncpServletAction> actions = new HashMap<>(); private final HashMap<DLong, SncpServletAction> actions = new HashMap<>();
private Supplier<ByteBuffer> bufferSupplier;
public SncpDynServlet(final BsonConvert convert, final String serviceName, final Class serviceOrSourceType, final Service service, public SncpDynServlet(final BsonConvert convert, final String serviceName, final Class serviceOrSourceType, final Service service,
final AtomicInteger maxClassNameLength, AtomicInteger maxNameLength) { final AtomicInteger maxClassNameLength, AtomicInteger maxNameLength) {
@@ -113,15 +110,12 @@ public final class SncpDynServlet extends SncpServlet {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void execute(SncpRequest request, SncpResponse response) throws IOException { public void execute(SncpRequest request, SncpResponse response) throws IOException {
if (bufferSupplier == null) {
bufferSupplier = request.getBufferPool();
}
final SncpServletAction action = actions.get(request.getActionid()); final SncpServletAction action = actions.get(request.getActionid());
//logger.log(Level.FINEST, "sncpdyn.execute: " + request + ", " + (action == null ? "null" : action.method)); //logger.log(Level.FINEST, "sncpdyn.execute: " + request + ", " + (action == null ? "null" : action.method));
if (action == null) { if (action == null) {
response.finish(SncpResponse.RETCODE_ILLACTIONID, null); //无效actionid response.finish(SncpResponse.RETCODE_ILLACTIONID, null); //无效actionid
} else { } else {
BsonWriter out = bufferSupplier == null ? action.convert.pollBsonWriter() : action.convert.pollBsonWriter(bufferSupplier); BsonWriter out = action.convert.pollBsonWriter();
out.writeTo(DEFAULT_HEADER); out.writeTo(DEFAULT_HEADER);
BsonReader in = action.convert.pollBsonReader(); BsonReader in = action.convert.pollBsonReader();
SncpAsyncHandler handler = null; SncpAsyncHandler handler = null;

View File

@@ -46,15 +46,11 @@ public class SncpRequest extends Request<SncpContext> {
private byte[] addrbytes = new byte[6]; private byte[] addrbytes = new byte[6];
protected SncpRequest(SncpContext context, ObjectPool<ByteBuffer> bufferPool) { protected SncpRequest(SncpContext context) {
super(context, bufferPool); super(context);
this.convert = context.getBsonConvert(); this.convert = context.getBsonConvert();
} }
protected ObjectPool<ByteBuffer> getBufferPool() {
return this.bufferPool;
}
@Override @Override
protected int readHeader(ByteBuffer buffer) { protected int readHeader(ByteBuffer buffer) {
if (buffer.remaining() < HEADER_SIZE) { if (buffer.remaining() < HEADER_SIZE) {

View File

@@ -145,12 +145,9 @@ public class SncpServer extends Server<DLong, SncpContext, SncpRequest, SncpResp
@Override @Override
protected ObjectPool<Response> createResponsePool(AtomicLong createCounter, AtomicLong cycleCounter, int responsePoolSize) { protected ObjectPool<Response> createResponsePool(AtomicLong createCounter, AtomicLong cycleCounter, int responsePoolSize) {
return SncpResponse.createPool(createCounter, cycleCounter, responsePoolSize, null); ObjectPool<Response> pool = SncpResponse.createPool(createCounter, cycleCounter, responsePoolSize, null);
} pool.setCreator((Object... params) -> new SncpResponse(this.context, new SncpRequest(this.context), pool));
return pool;
@Override
protected Creator<Response> createResponseCreator(ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool) {
return (Object... params) -> new SncpResponse(this.context, new SncpRequest(this.context, bufferPool), responsePool);
} }
} }