增加强大的HttpMessageClusterClient功能

This commit is contained in:
Redkale
2020-07-15 18:03:33 +08:00
parent 7cfbaee199
commit 70c5123f7a
7 changed files with 230 additions and 19 deletions

View File

@@ -695,7 +695,19 @@ public final class Application {
logger.info("MessageAgent init in " + (System.currentTimeMillis() - s) + " ms");
}
//------------------------------------- 注册 DataSource --------------------------------------------------------
resourceFactory.register((ResourceFactory rf, final Object src, String resourceName, Field field, final Object attachment) -> {
try {
if (field.getAnnotation(Resource.class) == null) return;
if (clusterAgent == null) return;
HttpMessageClient messageClient = new HttpMessageClusterClient(clusterAgent);
field.set(src, messageClient);
rf.inject(messageClient, null); // 给其可能包含@Resource的字段赋值;
rf.register(resourceName, HttpMessageClient.class, messageClient);
} catch (Exception e) {
logger.log(Level.SEVERE, "[" + Thread.currentThread().getName() + "] DataSource inject error", e);
}
}, HttpMessageClient.class);
initResources();
}

View File

@@ -156,11 +156,12 @@ public abstract class ClusterAgent {
public int intervalCheckSeconds() {
return 10;
}
//获取HTTP远程服务的可用ip列表
public abstract Collection<InetSocketAddress> queryHttpAddress(String protocol, String module, String resname);
public abstract CompletableFuture<Collection<InetSocketAddress>> queryHttpAddress(String protocol, String module, String resname);
//获取远程服务的可用ip列表
protected abstract Collection<InetSocketAddress> queryAddress(ClusterEntry entry);
protected abstract CompletableFuture<Collection<InetSocketAddress>> queryAddress(ClusterEntry entry);
//注册服务
protected abstract void register(NodeServer ns, String protocol, Service service);
@@ -172,7 +173,7 @@ public abstract class ClusterAgent {
protected void updateSncpTransport(ClusterEntry entry) {
Service service = entry.serviceref.get();
if (service == null) return;
Collection<InetSocketAddress> addrs = queryAddress(entry);
Collection<InetSocketAddress> addrs = queryAddress(entry).join();
Sncp.updateTransport(service, transportFactory, Sncp.getResourceType(service).getName() + "-" + Sncp.getResourceName(service), entry.netprotocol, entry.address, null, addrs);
}
@@ -192,7 +193,8 @@ public abstract class ClusterAgent {
return "check-" + generateApplicationServiceId();
}
protected String generateHttpServiceName(String protocol, String module, String resname) {
//也会提供给HttpMessageClusterAgent适用
public String generateHttpServiceName(String protocol, String module, String resname) {
return protocol.toLowerCase() + ":" + module + (resname == null || resname.isEmpty() ? "" : ("-" + resname));
}

View File

@@ -0,0 +1,100 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
import java.net.*;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.redkale.cluster.ClusterAgent;
import org.redkale.convert.ConvertType;
import org.redkale.net.http.*;
/**
* 没有配置MQ的情况下依赖ClusterAgent实现的默认HttpMessageClient实例
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.1.0
*/
public class HttpMessageClusterClient extends HttpMessageClient {
//jdk.internal.net.http.common.Utils.DISALLOWED_HEADERS_SET
private static final Set<String> DISALLOWED_HEADERS_SET = Set.of("connection", "content-length",
"date", "expect", "from", "host", "origin",
"referer", "upgrade", "via", "warning");
protected ClusterAgent clusterAgent;
protected java.net.http.HttpClient httpClient;
public HttpMessageClusterClient(ClusterAgent clusterAgent) {
super(null);
Objects.requireNonNull(clusterAgent);
this.clusterAgent = clusterAgent;
this.httpClient = java.net.http.HttpClient.newHttpClient();
}
@Override
public CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
return httpAsync(userid, request);
}
@Override
public void produceMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
httpAsync(userid, request);
}
protected CompletableFuture<HttpResult<byte[]>> httpAsync(int userid, HttpSimpleRequest req) {
String module = req.getRequestURI();
module = module.substring(1); //去掉/
module = module.substring(0, module.indexOf('/'));
Map<String, String> headers = req.getHeaders();
String resname = headers == null ? "" : headers.getOrDefault(Rest.REST_HEADER_RESOURCE_NAME, "");
return clusterAgent.queryHttpAddress("http", module, resname).thenCompose(addrs -> {
if (addrs == null || addrs.isEmpty()) return new HttpResult().status(404).toAnyFuture();
java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder().timeout(Duration.ofMillis(6000));
if (req.isRpc()) builder.header(Rest.REST_HEADER_RPC_NAME, "true");
if (userid != 0) builder.header(Rest.REST_HEADER_CURRUSERID_NAME, "" + userid);
if (headers != null) headers.forEach((n, v) -> {
if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase())) builder.header(n, v);
});
builder.header("Content-Type", "x-www-form-urlencoded");
String paramstr = req.getParametersToString();
if (paramstr != null) builder.POST(java.net.http.HttpRequest.BodyPublishers.ofString(paramstr));
return forEachCollectionFuture(userid, req, builder, addrs.iterator());
});
}
private CompletableFuture<HttpResult<byte[]>> forEachCollectionFuture(int userid, HttpSimpleRequest req, java.net.http.HttpRequest.Builder builder, Iterator<InetSocketAddress> it) {
if (!it.hasNext()) return CompletableFuture.completedFuture(null);
InetSocketAddress addr = it.next();
String url = "http://" + addr.getHostString() + ":" + addr.getPort() + (req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + req.getRequestURI();
builder.uri(URI.create(url));
return httpClient.sendAsync(builder.build(), java.net.http.HttpResponse.BodyHandlers.ofByteArray()).thenCompose(resp -> {
if (resp.statusCode() != 200) return forEachCollectionFuture(userid, req, builder, it);
HttpResult rs = new HttpResult();
java.net.http.HttpHeaders hs = resp.headers();
if (hs != null) {
Map<String, List<String>> hm = hs.map();
if (hm != null) {
for (Map.Entry<String, List<String>> en : hm.entrySet()) {
List<String> val = en.getValue();
if (val != null && val.size() == 1) {
rs.header(en.getKey(), val.get(0));
}
}
}
}
rs.setResult(resp.body());
return CompletableFuture.completedFuture(rs);
});
}
}

View File

@@ -30,19 +30,22 @@ public class HttpSimpleRequestCoder implements MessageCoder<HttpSimpleRequest> {
@Override
public byte[] encode(HttpSimpleRequest data) {
byte[] requestURI = MessageCoder.getBytes(data.getRequestURI()); //long-string
byte[] path = MessageCoder.getBytes(data.getRequestURI()); //short-string
byte[] remoteAddr = MessageCoder.getBytes(data.getRemoteAddr());//short-string
byte[] sessionid = MessageCoder.getBytes(data.getSessionid());//short-string
byte[] contentType = MessageCoder.getBytes(data.getContentType());//short-string
byte[] headers = MessageCoder.getBytes(data.getHeaders());
byte[] params = MessageCoder.getBytes(data.getParams());
byte[] body = MessageCoder.getBytes(data.getBody());
int count = 1 + 4 + requestURI.length + 2 + remoteAddr.length + 2 + sessionid.length
int count = 1 + 4 + requestURI.length + 2 + path.length + 2 + remoteAddr.length + 2 + sessionid.length
+ 2 + contentType.length + 4 + headers.length + params.length + 4 + body.length;
byte[] bs = new byte[count];
ByteBuffer buffer = ByteBuffer.wrap(bs);
buffer.put((byte) (data.isRpc() ? 'T' : 'F'));
buffer.putInt(requestURI.length);
if (requestURI.length > 0) buffer.put(requestURI);
buffer.putChar((char) path.length);
if (remoteAddr.length > 0) buffer.put(path);
buffer.putChar((char) remoteAddr.length);
if (remoteAddr.length > 0) buffer.put(remoteAddr);
buffer.putChar((char) sessionid.length);
@@ -64,6 +67,7 @@ public class HttpSimpleRequestCoder implements MessageCoder<HttpSimpleRequest> {
HttpSimpleRequest req = new HttpSimpleRequest();
req.setRpc(buffer.get() == 'T');
req.setRequestURI(MessageCoder.getLongString(buffer));
req.setPath(MessageCoder.getShortString(buffer));
req.setRemoteAddr(MessageCoder.getShortString(buffer));
req.setSessionid(MessageCoder.getShortString(buffer));
req.setContentType(MessageCoder.getShortString(buffer));

View File

@@ -34,6 +34,9 @@ import org.redkale.util.*;
*/
public class HttpRequest extends Request<HttpContext> {
protected static final Serializable CURRUSERID_NIL = new Serializable() {
};
public static final String SESSIONID_NAME = "JSESSIONID";
protected boolean rpc;
@@ -76,7 +79,7 @@ public class HttpRequest extends Request<HttpContext> {
protected Annotation[] annotations;
// @since 2.1.0
protected Serializable currentUserid;
protected Serializable currentUserid = CURRUSERID_NIL;
protected Object currentUser;
@@ -116,10 +119,20 @@ public class HttpRequest extends Request<HttpContext> {
public HttpSimpleRequest createSimpleRequest(String prefix) {
HttpSimpleRequest req = new HttpSimpleRequest();
req.setBody(array.size() == 0 ? null : array.getBytes());
req.setHeaders(headers.isEmpty() ? null : headers);
if (!headers.isEmpty()) {
if (headers.containsKey(Rest.REST_HEADER_RPC_NAME)
|| headers.containsKey(Rest.REST_HEADER_CURRUSERID_NAME)) { //外部request不能包含RPC的header信息
req.setHeaders(new HashMap<>(headers));
req.removeHeader(Rest.REST_HEADER_RPC_NAME);
req.removeHeader(Rest.REST_HEADER_CURRUSERID_NAME);
} else {
req.setHeaders(headers);
}
}
req.setParams(params.isEmpty() ? null : params);
req.setRemoteAddr(getRemoteAddr());
req.setContentType(getContentType());
req.setPath(prefix);
String uri = this.requestURI;
if (prefix != null && !prefix.isEmpty() && uri.startsWith(prefix)) {
uri = uri.substring(prefix.length());
@@ -221,6 +234,14 @@ public class HttpRequest extends Request<HttpContext> {
case "user-agent":
headers.put("User-Agent", value);
break;
case Rest.REST_HEADER_RPC_NAME:
this.rpc = "true".equalsIgnoreCase(value);
headers.put(name, value);
break;
case Rest.REST_HEADER_CURRUSERID_NAME:
this.currentUserid = value;
headers.put(name, value);
break;
default:
headers.put(name, value);
}
@@ -334,14 +355,30 @@ public class HttpRequest extends Request<HttpContext> {
/**
* 获取当前用户ID<br>
*
* @param <T> 数据类型只能是int、long、String、JavaBean
* @param <T> 数据类型只能是int、long、String、JavaBean
* @param type 类型
*
* @return 用户ID
*
* @since 2.1.0
*/
@SuppressWarnings("unchecked")
public <T extends Serializable> T currentUserid() {
public <T extends Serializable> T currentUserid(Class<T> type) {
if (currentUserid == CURRUSERID_NIL || currentUserid == null) {
if (type == int.class) return (T) (Integer) (int) 0;
if (type == long.class) return (T) (Long) (long) 0;
return null;
}
if (type == int.class) {
if (this.currentUserid instanceof Number) return (T) (Integer) ((Number) this.currentUserid).intValue();
return (T) (Integer) Integer.parseInt(this.currentUserid.toString());
}
if (type == long.class) {
if (this.currentUserid instanceof Number) return (T) (Long) ((Number) this.currentUserid).longValue();
return (T) (Long) Long.parseLong(this.currentUserid.toString());
}
if (type == String.class) return (T) this.currentUserid.toString();
if (this.currentUserid instanceof CharSequence) return JsonConvert.root().convertFrom(type, this.currentUserid.toString());
return (T) this.currentUserid;
}
@@ -621,7 +658,7 @@ public class HttpRequest extends Request<HttpContext> {
this.moduleid = 0;
this.actionid = 0;
this.annotations = null;
this.currentUserid = null;
this.currentUserid = CURRUSERID_NIL;
this.currentUser = null;
this.remoteAddr = null;

View File

@@ -5,8 +5,11 @@
*/
package org.redkale.net.http;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.*;
import org.redkale.convert.ConvertColumn;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redkale.convert.*;
import org.redkale.convert.json.JsonConvert;
import org.redkale.util.*;
@@ -31,29 +34,33 @@ public class HttpSimpleRequest implements java.io.Serializable {
protected String requestURI;
@ConvertColumn(index = 3)
@Comment("请求的前缀")
protected String path;
@ConvertColumn(index = 4)
@Comment("客户端IP")
protected String remoteAddr;
@ConvertColumn(index = 4)
@ConvertColumn(index = 5)
@Comment("会话ID")
protected String sessionid;
@ConvertColumn(index = 5)
@ConvertColumn(index = 6)
@Comment("Content-Type")
protected String contentType;
@ConvertColumn(index = 6)
@ConvertColumn(index = 7)
protected int currentUserid;
@ConvertColumn(index = 7)
@ConvertColumn(index = 8)
@Comment("http header信息")
protected Map<String, String> headers;
@ConvertColumn(index = 8)
@ConvertColumn(index = 9)
@Comment("参数信息")
protected Map<String, String> params;
@ConvertColumn(index = 9)
@ConvertColumn(index = 10)
@Comment("http body信息")
protected byte[] body; //对应HttpRequest.array
@@ -70,6 +77,19 @@ public class HttpSimpleRequest implements java.io.Serializable {
return req;
}
@ConvertDisabled
public String getParametersToString() {
if (this.params == null || this.params.isEmpty()) return null;
final StringBuilder sb = new StringBuilder();
AtomicBoolean no2 = new AtomicBoolean(false);
this.params.forEach((n, v) -> {
if (no2.get()) sb.append('&');
sb.append(n).append('=').append(URLEncoder.encode(v, StandardCharsets.UTF_8));
no2.set(true);
});
return sb.toString();
}
public HttpSimpleRequest rpc(boolean rpc) {
this.rpc = rpc;
return this;
@@ -80,6 +100,11 @@ public class HttpSimpleRequest implements java.io.Serializable {
return this;
}
public HttpSimpleRequest path(String path) {
this.path = path;
return this;
}
public HttpSimpleRequest remoteAddr(String remoteAddr) {
this.remoteAddr = remoteAddr;
return this;
@@ -141,6 +166,18 @@ public class HttpSimpleRequest implements java.io.Serializable {
return this;
}
public HttpSimpleRequest header(String key, int value) {
if (this.headers == null) this.headers = new HashMap<>();
this.headers.put(key, String.valueOf(value));
return this;
}
public HttpSimpleRequest header(String key, long value) {
if (this.headers == null) this.headers = new HashMap<>();
this.headers.put(key, String.valueOf(value));
return this;
}
public HttpSimpleRequest param(String key, String value) {
if (this.params == null) this.params = new HashMap<>();
this.params.put(key, value);
@@ -208,6 +245,14 @@ public class HttpSimpleRequest implements java.io.Serializable {
this.requestURI = requestURI;
}
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = path;
}
public String getSessionid() {
return sessionid;
}

View File

@@ -40,6 +40,10 @@ public final class Rest {
public static final String REST_HEADER_RESOURCE_NAME = "rest-resource-name";
public static final String REST_HEADER_RPC_NAME = "rest-rpc-name";
public static final String REST_HEADER_CURRUSERID_NAME = "rest-curruserid-name";
static final String REST_SERVICE_FIELD_NAME = "_redkale_service";
static final String REST_TOSTRINGOBJ_FIELD_NAME = "_redkale_tostringsupplier";
@@ -1349,7 +1353,14 @@ public final class Rest {
varInsns.add(new int[]{ALOAD, maxLocals});
} else if (userid != null) { //HttpRequest.currentUserid
mv.visitVarInsn(ALOAD, 1);
mv.visitMethodInsn(INVOKEVIRTUAL, reqInternalName, "currentUserid", "()Ljava/io/Serializable;", false);
if (ptype == int.class) {
mv.visitFieldInsn(GETSTATIC, "java/lang/Integer", "TYPE", "Ljava/lang/Class;");
} else if (ptype == long.class) {
mv.visitFieldInsn(GETSTATIC, "java/lang/Long", "TYPE", "Ljava/lang/Class;");
} else {
mv.visitLdcInsn(Type.getType(Type.getInternalName(ptype)));
}
mv.visitMethodInsn(INVOKEVIRTUAL, reqInternalName, "currentUserid", "(Ljava/lang/Class;)Ljava/io/Serializable;", false);
if (ptype == int.class) {
mv.visitTypeInsn(CHECKCAST, "java/lang/Integer");
mv.visitInsn(ICONST_0);