This commit is contained in:
Redkale
2020-06-03 19:46:25 +08:00
parent 6909748bda
commit 4806d6ada0
6 changed files with 106 additions and 23 deletions

View File

@@ -0,0 +1,66 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
import java.util.concurrent.*;
import java.util.logging.*;
/**
* MQ管理器
*
*
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.1.0
*/
public class HttpRespProcessor implements MessageProcessor {
protected final Logger logger;
protected final MessageAgent agent;
protected final ConcurrentHashMap<Long, RespFutureNode> respNodes = new ConcurrentHashMap<>();
public HttpRespProcessor(Logger logger, MessageAgent agent) {
this.logger = logger;
this.agent = agent;
}
@Override
public void process(MessageRecord message) {
RespFutureNode node = respNodes.get(message.getSeqid());
if (node == null) {
logger.log(Level.WARNING, HttpRespProcessor.class.getSimpleName() + " process " + message + " error");
return;
}
node.future.complete(message);
}
public CompletableFuture<MessageRecord> createFuture(long seqid) {
CompletableFuture<MessageRecord> future = new CompletableFuture<>();
RespFutureNode node = new RespFutureNode(seqid, future);
respNodes.put(seqid, node);
return future;
}
protected static class RespFutureNode {
public final long seqid;
public final long createtime;
public final CompletableFuture<MessageRecord> future;
public RespFutureNode(long seqid, CompletableFuture<MessageRecord> future) {
this.seqid = seqid;
this.future = future;
this.createtime = System.currentTimeMillis();
}
}
}

View File

@@ -54,6 +54,10 @@ public abstract class MessageAgent {
public void init(AnyValue config) {
}
public final CompletableFuture<MessageRecord> createSncpRespFuture(MessageRecord message) {
return this.sncpRespProcessor.createFuture(message.getSeqid());
}
public final synchronized void startSncpRespConsumer() {
if (this.sncpRespStartms >= 0) return;
long s = System.currentTimeMillis();

View File

@@ -23,7 +23,7 @@ public abstract class MessageProducer extends Thread {
protected volatile boolean closed;
public abstract CompletableFuture<MessageRecord> apply(MessageRecord message);
public abstract CompletableFuture<Void> apply(MessageRecord message);
protected abstract void waitFor();

View File

@@ -45,12 +45,12 @@ public class SncpMessageResponse extends SncpResponse {
if (out == null) {
final byte[] result = new byte[SncpRequest.HEADER_SIZE];
fillHeader(ByteBuffer.wrap(result), 0, retcode);
producer.apply(new MessageRecord(ConvertType.BSON, message.getResptopic(), null, (byte[]) null));
producer.apply(new MessageRecord(message.getSeqid(), ConvertType.BSON, message.getResptopic(), null, (byte[]) null));
return;
}
final int respBodyLength = out.count(); //body总长度
final byte[] result = out.toArray();
fillHeader(ByteBuffer.wrap(result), respBodyLength - HEADER_SIZE, retcode);
producer.apply(new MessageRecord(ConvertType.BSON, message.getResptopic(), null, result));
producer.apply(new MessageRecord(message.getSeqid(), ConvertType.BSON, message.getResptopic(), null, result));
}
}

View File

@@ -1,19 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
/**
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.1.0
*/
public class SncpRespConsumer {
}

View File

@@ -5,7 +5,8 @@
*/
package org.redkale.mq;
import java.util.logging.Logger;
import java.util.concurrent.*;
import java.util.logging.*;
/**
* MQ管理器
@@ -23,6 +24,8 @@ public class SncpRespProcessor implements MessageProcessor {
protected final MessageAgent agent;
protected final ConcurrentHashMap<Long, RespFutureNode> respNodes = new ConcurrentHashMap<>();
public SncpRespProcessor(Logger logger, MessageAgent agent) {
this.logger = logger;
this.agent = agent;
@@ -30,5 +33,34 @@ public class SncpRespProcessor implements MessageProcessor {
@Override
public void process(MessageRecord message) {
RespFutureNode node = respNodes.get(message.getSeqid());
if (node == null) {
logger.log(Level.WARNING, SncpRespProcessor.class.getSimpleName() + " process " + message + " error");
return;
}
node.future.complete(message);
}
public CompletableFuture<MessageRecord> createFuture(long seqid) {
CompletableFuture<MessageRecord> future = new CompletableFuture<>();
RespFutureNode node = new RespFutureNode(seqid, future);
respNodes.put(seqid, node);
return future;
}
protected static class RespFutureNode {
public final long seqid;
public final long createtime;
public final CompletableFuture<MessageRecord> future;
public RespFutureNode(long seqid, CompletableFuture<MessageRecord> future) {
this.seqid = seqid;
this.future = future;
this.createtime = System.currentTimeMillis();
}
}
}