From 70c5123f7a06c09d3daed4b763c353183d26b3c9 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Wed, 15 Jul 2020 18:03:33 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=BC=BA=E5=A4=A7=E7=9A=84Ht?= =?UTF-8?q?tpMessageClusterClient=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/boot/Application.java | 14 ++- src/org/redkale/cluster/ClusterAgent.java | 10 +- .../redkale/mq/HttpMessageClusterClient.java | 100 ++++++++++++++++++ .../redkale/mq/HttpSimpleRequestCoder.java | 6 +- src/org/redkale/net/http/HttpRequest.java | 47 +++++++- .../redkale/net/http/HttpSimpleRequest.java | 59 +++++++++-- src/org/redkale/net/http/Rest.java | 13 ++- 7 files changed, 230 insertions(+), 19 deletions(-) create mode 100644 src/org/redkale/mq/HttpMessageClusterClient.java diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index 673057583..2eda5d6d4 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -695,7 +695,19 @@ public final class Application { logger.info("MessageAgent init in " + (System.currentTimeMillis() - s) + " ms"); } - + //------------------------------------- 注册 DataSource -------------------------------------------------------- + resourceFactory.register((ResourceFactory rf, final Object src, String resourceName, Field field, final Object attachment) -> { + try { + if (field.getAnnotation(Resource.class) == null) return; + if (clusterAgent == null) return; + HttpMessageClient messageClient = new HttpMessageClusterClient(clusterAgent); + field.set(src, messageClient); + rf.inject(messageClient, null); // 给其可能包含@Resource的字段赋值; + rf.register(resourceName, HttpMessageClient.class, messageClient); + } catch (Exception e) { + logger.log(Level.SEVERE, "[" + Thread.currentThread().getName() + "] DataSource inject error", e); + } + }, HttpMessageClient.class); initResources(); } diff --git a/src/org/redkale/cluster/ClusterAgent.java b/src/org/redkale/cluster/ClusterAgent.java index 37977f31a..e95a31f14 100644 --- a/src/org/redkale/cluster/ClusterAgent.java +++ b/src/org/redkale/cluster/ClusterAgent.java @@ -156,11 +156,12 @@ public abstract class ClusterAgent { public int intervalCheckSeconds() { return 10; } + //获取HTTP远程服务的可用ip列表 - public abstract Collection queryHttpAddress(String protocol, String module, String resname); + public abstract CompletableFuture> queryHttpAddress(String protocol, String module, String resname); //获取远程服务的可用ip列表 - protected abstract Collection queryAddress(ClusterEntry entry); + protected abstract CompletableFuture> queryAddress(ClusterEntry entry); //注册服务 protected abstract void register(NodeServer ns, String protocol, Service service); @@ -172,7 +173,7 @@ public abstract class ClusterAgent { protected void updateSncpTransport(ClusterEntry entry) { Service service = entry.serviceref.get(); if (service == null) return; - Collection addrs = queryAddress(entry); + Collection addrs = queryAddress(entry).join(); Sncp.updateTransport(service, transportFactory, Sncp.getResourceType(service).getName() + "-" + Sncp.getResourceName(service), entry.netprotocol, entry.address, null, addrs); } @@ -192,7 +193,8 @@ public abstract class ClusterAgent { return "check-" + generateApplicationServiceId(); } - protected String generateHttpServiceName(String protocol, String module, String resname) { + //也会提供给HttpMessageClusterAgent适用 + public String generateHttpServiceName(String protocol, String module, String resname) { return protocol.toLowerCase() + ":" + module + (resname == null || resname.isEmpty() ? "" : ("-" + resname)); } diff --git a/src/org/redkale/mq/HttpMessageClusterClient.java b/src/org/redkale/mq/HttpMessageClusterClient.java new file mode 100644 index 000000000..cad29cced --- /dev/null +++ b/src/org/redkale/mq/HttpMessageClusterClient.java @@ -0,0 +1,100 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.mq; + +import java.net.*; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; +import org.redkale.cluster.ClusterAgent; +import org.redkale.convert.ConvertType; +import org.redkale.net.http.*; + +/** + * 没有配置MQ的情况下依赖ClusterAgent实现的默认HttpMessageClient实例 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.1.0 + */ +public class HttpMessageClusterClient extends HttpMessageClient { + + //jdk.internal.net.http.common.Utils.DISALLOWED_HEADERS_SET + private static final Set DISALLOWED_HEADERS_SET = Set.of("connection", "content-length", + "date", "expect", "from", "host", "origin", + "referer", "upgrade", "via", "warning"); + + protected ClusterAgent clusterAgent; + + protected java.net.http.HttpClient httpClient; + + public HttpMessageClusterClient(ClusterAgent clusterAgent) { + super(null); + Objects.requireNonNull(clusterAgent); + this.clusterAgent = clusterAgent; + this.httpClient = java.net.http.HttpClient.newHttpClient(); + } + + @Override + public CompletableFuture> sendMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { + return httpAsync(userid, request); + } + + @Override + public void produceMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { + httpAsync(userid, request); + } + + protected CompletableFuture> httpAsync(int userid, HttpSimpleRequest req) { + String module = req.getRequestURI(); + module = module.substring(1); //去掉/ + module = module.substring(0, module.indexOf('/')); + Map headers = req.getHeaders(); + String resname = headers == null ? "" : headers.getOrDefault(Rest.REST_HEADER_RESOURCE_NAME, ""); + return clusterAgent.queryHttpAddress("http", module, resname).thenCompose(addrs -> { + if (addrs == null || addrs.isEmpty()) return new HttpResult().status(404).toAnyFuture(); + java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder().timeout(Duration.ofMillis(6000)); + if (req.isRpc()) builder.header(Rest.REST_HEADER_RPC_NAME, "true"); + if (userid != 0) builder.header(Rest.REST_HEADER_CURRUSERID_NAME, "" + userid); + if (headers != null) headers.forEach((n, v) -> { + if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase())) builder.header(n, v); + }); + builder.header("Content-Type", "x-www-form-urlencoded"); + String paramstr = req.getParametersToString(); + if (paramstr != null) builder.POST(java.net.http.HttpRequest.BodyPublishers.ofString(paramstr)); + return forEachCollectionFuture(userid, req, builder, addrs.iterator()); + }); + } + + private CompletableFuture> forEachCollectionFuture(int userid, HttpSimpleRequest req, java.net.http.HttpRequest.Builder builder, Iterator it) { + if (!it.hasNext()) return CompletableFuture.completedFuture(null); + InetSocketAddress addr = it.next(); + String url = "http://" + addr.getHostString() + ":" + addr.getPort() + (req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + req.getRequestURI(); + builder.uri(URI.create(url)); + return httpClient.sendAsync(builder.build(), java.net.http.HttpResponse.BodyHandlers.ofByteArray()).thenCompose(resp -> { + if (resp.statusCode() != 200) return forEachCollectionFuture(userid, req, builder, it); + HttpResult rs = new HttpResult(); + java.net.http.HttpHeaders hs = resp.headers(); + if (hs != null) { + Map> hm = hs.map(); + if (hm != null) { + for (Map.Entry> en : hm.entrySet()) { + List val = en.getValue(); + if (val != null && val.size() == 1) { + rs.header(en.getKey(), val.get(0)); + } + } + } + } + rs.setResult(resp.body()); + return CompletableFuture.completedFuture(rs); + }); + } +} diff --git a/src/org/redkale/mq/HttpSimpleRequestCoder.java b/src/org/redkale/mq/HttpSimpleRequestCoder.java index cefca77d9..ac7139290 100644 --- a/src/org/redkale/mq/HttpSimpleRequestCoder.java +++ b/src/org/redkale/mq/HttpSimpleRequestCoder.java @@ -30,19 +30,22 @@ public class HttpSimpleRequestCoder implements MessageCoder { @Override public byte[] encode(HttpSimpleRequest data) { byte[] requestURI = MessageCoder.getBytes(data.getRequestURI()); //long-string + byte[] path = MessageCoder.getBytes(data.getRequestURI()); //short-string byte[] remoteAddr = MessageCoder.getBytes(data.getRemoteAddr());//short-string byte[] sessionid = MessageCoder.getBytes(data.getSessionid());//short-string byte[] contentType = MessageCoder.getBytes(data.getContentType());//short-string byte[] headers = MessageCoder.getBytes(data.getHeaders()); byte[] params = MessageCoder.getBytes(data.getParams()); byte[] body = MessageCoder.getBytes(data.getBody()); - int count = 1 + 4 + requestURI.length + 2 + remoteAddr.length + 2 + sessionid.length + int count = 1 + 4 + requestURI.length + 2 + path.length + 2 + remoteAddr.length + 2 + sessionid.length + 2 + contentType.length + 4 + headers.length + params.length + 4 + body.length; byte[] bs = new byte[count]; ByteBuffer buffer = ByteBuffer.wrap(bs); buffer.put((byte) (data.isRpc() ? 'T' : 'F')); buffer.putInt(requestURI.length); if (requestURI.length > 0) buffer.put(requestURI); + buffer.putChar((char) path.length); + if (remoteAddr.length > 0) buffer.put(path); buffer.putChar((char) remoteAddr.length); if (remoteAddr.length > 0) buffer.put(remoteAddr); buffer.putChar((char) sessionid.length); @@ -64,6 +67,7 @@ public class HttpSimpleRequestCoder implements MessageCoder { HttpSimpleRequest req = new HttpSimpleRequest(); req.setRpc(buffer.get() == 'T'); req.setRequestURI(MessageCoder.getLongString(buffer)); + req.setPath(MessageCoder.getShortString(buffer)); req.setRemoteAddr(MessageCoder.getShortString(buffer)); req.setSessionid(MessageCoder.getShortString(buffer)); req.setContentType(MessageCoder.getShortString(buffer)); diff --git a/src/org/redkale/net/http/HttpRequest.java b/src/org/redkale/net/http/HttpRequest.java index bdbd8d1f4..c8835b75a 100644 --- a/src/org/redkale/net/http/HttpRequest.java +++ b/src/org/redkale/net/http/HttpRequest.java @@ -34,6 +34,9 @@ import org.redkale.util.*; */ public class HttpRequest extends Request { + protected static final Serializable CURRUSERID_NIL = new Serializable() { + }; + public static final String SESSIONID_NAME = "JSESSIONID"; protected boolean rpc; @@ -76,7 +79,7 @@ public class HttpRequest extends Request { protected Annotation[] annotations; // @since 2.1.0 - protected Serializable currentUserid; + protected Serializable currentUserid = CURRUSERID_NIL; protected Object currentUser; @@ -116,10 +119,20 @@ public class HttpRequest extends Request { public HttpSimpleRequest createSimpleRequest(String prefix) { HttpSimpleRequest req = new HttpSimpleRequest(); req.setBody(array.size() == 0 ? null : array.getBytes()); - req.setHeaders(headers.isEmpty() ? null : headers); + if (!headers.isEmpty()) { + if (headers.containsKey(Rest.REST_HEADER_RPC_NAME) + || headers.containsKey(Rest.REST_HEADER_CURRUSERID_NAME)) { //外部request不能包含RPC的header信息 + req.setHeaders(new HashMap<>(headers)); + req.removeHeader(Rest.REST_HEADER_RPC_NAME); + req.removeHeader(Rest.REST_HEADER_CURRUSERID_NAME); + } else { + req.setHeaders(headers); + } + } req.setParams(params.isEmpty() ? null : params); req.setRemoteAddr(getRemoteAddr()); req.setContentType(getContentType()); + req.setPath(prefix); String uri = this.requestURI; if (prefix != null && !prefix.isEmpty() && uri.startsWith(prefix)) { uri = uri.substring(prefix.length()); @@ -221,6 +234,14 @@ public class HttpRequest extends Request { case "user-agent": headers.put("User-Agent", value); break; + case Rest.REST_HEADER_RPC_NAME: + this.rpc = "true".equalsIgnoreCase(value); + headers.put(name, value); + break; + case Rest.REST_HEADER_CURRUSERID_NAME: + this.currentUserid = value; + headers.put(name, value); + break; default: headers.put(name, value); } @@ -334,14 +355,30 @@ public class HttpRequest extends Request { /** * 获取当前用户ID
* - * @param 数据类型只能是int、long、String、JavaBean + * @param 数据类型只能是int、long、String、JavaBean + * @param type 类型 * * @return 用户ID * * @since 2.1.0 */ @SuppressWarnings("unchecked") - public T currentUserid() { + public T currentUserid(Class type) { + if (currentUserid == CURRUSERID_NIL || currentUserid == null) { + if (type == int.class) return (T) (Integer) (int) 0; + if (type == long.class) return (T) (Long) (long) 0; + return null; + } + if (type == int.class) { + if (this.currentUserid instanceof Number) return (T) (Integer) ((Number) this.currentUserid).intValue(); + return (T) (Integer) Integer.parseInt(this.currentUserid.toString()); + } + if (type == long.class) { + if (this.currentUserid instanceof Number) return (T) (Long) ((Number) this.currentUserid).longValue(); + return (T) (Long) Long.parseLong(this.currentUserid.toString()); + } + if (type == String.class) return (T) this.currentUserid.toString(); + if (this.currentUserid instanceof CharSequence) return JsonConvert.root().convertFrom(type, this.currentUserid.toString()); return (T) this.currentUserid; } @@ -621,7 +658,7 @@ public class HttpRequest extends Request { this.moduleid = 0; this.actionid = 0; this.annotations = null; - this.currentUserid = null; + this.currentUserid = CURRUSERID_NIL; this.currentUser = null; this.remoteAddr = null; diff --git a/src/org/redkale/net/http/HttpSimpleRequest.java b/src/org/redkale/net/http/HttpSimpleRequest.java index fbf865e43..0551cdba6 100644 --- a/src/org/redkale/net/http/HttpSimpleRequest.java +++ b/src/org/redkale/net/http/HttpSimpleRequest.java @@ -5,8 +5,11 @@ */ package org.redkale.net.http; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; import java.util.*; -import org.redkale.convert.ConvertColumn; +import java.util.concurrent.atomic.AtomicBoolean; +import org.redkale.convert.*; import org.redkale.convert.json.JsonConvert; import org.redkale.util.*; @@ -31,29 +34,33 @@ public class HttpSimpleRequest implements java.io.Serializable { protected String requestURI; @ConvertColumn(index = 3) + @Comment("请求的前缀") + protected String path; + + @ConvertColumn(index = 4) @Comment("客户端IP") protected String remoteAddr; - @ConvertColumn(index = 4) + @ConvertColumn(index = 5) @Comment("会话ID") protected String sessionid; - @ConvertColumn(index = 5) + @ConvertColumn(index = 6) @Comment("Content-Type") protected String contentType; - @ConvertColumn(index = 6) + @ConvertColumn(index = 7) protected int currentUserid; - @ConvertColumn(index = 7) + @ConvertColumn(index = 8) @Comment("http header信息") protected Map headers; - @ConvertColumn(index = 8) + @ConvertColumn(index = 9) @Comment("参数信息") protected Map params; - @ConvertColumn(index = 9) + @ConvertColumn(index = 10) @Comment("http body信息") protected byte[] body; //对应HttpRequest.array @@ -70,6 +77,19 @@ public class HttpSimpleRequest implements java.io.Serializable { return req; } + @ConvertDisabled + public String getParametersToString() { + if (this.params == null || this.params.isEmpty()) return null; + final StringBuilder sb = new StringBuilder(); + AtomicBoolean no2 = new AtomicBoolean(false); + this.params.forEach((n, v) -> { + if (no2.get()) sb.append('&'); + sb.append(n).append('=').append(URLEncoder.encode(v, StandardCharsets.UTF_8)); + no2.set(true); + }); + return sb.toString(); + } + public HttpSimpleRequest rpc(boolean rpc) { this.rpc = rpc; return this; @@ -80,6 +100,11 @@ public class HttpSimpleRequest implements java.io.Serializable { return this; } + public HttpSimpleRequest path(String path) { + this.path = path; + return this; + } + public HttpSimpleRequest remoteAddr(String remoteAddr) { this.remoteAddr = remoteAddr; return this; @@ -141,6 +166,18 @@ public class HttpSimpleRequest implements java.io.Serializable { return this; } + public HttpSimpleRequest header(String key, int value) { + if (this.headers == null) this.headers = new HashMap<>(); + this.headers.put(key, String.valueOf(value)); + return this; + } + + public HttpSimpleRequest header(String key, long value) { + if (this.headers == null) this.headers = new HashMap<>(); + this.headers.put(key, String.valueOf(value)); + return this; + } + public HttpSimpleRequest param(String key, String value) { if (this.params == null) this.params = new HashMap<>(); this.params.put(key, value); @@ -208,6 +245,14 @@ public class HttpSimpleRequest implements java.io.Serializable { this.requestURI = requestURI; } + public String getPath() { + return path; + } + + public void setPath(String path) { + this.path = path; + } + public String getSessionid() { return sessionid; } diff --git a/src/org/redkale/net/http/Rest.java b/src/org/redkale/net/http/Rest.java index 1491a7294..690a303f3 100644 --- a/src/org/redkale/net/http/Rest.java +++ b/src/org/redkale/net/http/Rest.java @@ -40,6 +40,10 @@ public final class Rest { public static final String REST_HEADER_RESOURCE_NAME = "rest-resource-name"; + public static final String REST_HEADER_RPC_NAME = "rest-rpc-name"; + + public static final String REST_HEADER_CURRUSERID_NAME = "rest-curruserid-name"; + static final String REST_SERVICE_FIELD_NAME = "_redkale_service"; static final String REST_TOSTRINGOBJ_FIELD_NAME = "_redkale_tostringsupplier"; @@ -1349,7 +1353,14 @@ public final class Rest { varInsns.add(new int[]{ALOAD, maxLocals}); } else if (userid != null) { //HttpRequest.currentUserid mv.visitVarInsn(ALOAD, 1); - mv.visitMethodInsn(INVOKEVIRTUAL, reqInternalName, "currentUserid", "()Ljava/io/Serializable;", false); + if (ptype == int.class) { + mv.visitFieldInsn(GETSTATIC, "java/lang/Integer", "TYPE", "Ljava/lang/Class;"); + } else if (ptype == long.class) { + mv.visitFieldInsn(GETSTATIC, "java/lang/Long", "TYPE", "Ljava/lang/Class;"); + } else { + mv.visitLdcInsn(Type.getType(Type.getInternalName(ptype))); + } + mv.visitMethodInsn(INVOKEVIRTUAL, reqInternalName, "currentUserid", "(Ljava/lang/Class;)Ljava/io/Serializable;", false); if (ptype == int.class) { mv.visitTypeInsn(CHECKCAST, "java/lang/Integer"); mv.visitInsn(ICONST_0);