Net 组件介绍
Net 组件是基于AIO(NIO.2)的一套TCP/UDP的网络框架,且只提供异步接口。org.redkale.net 是所有网络协议服务的基础包。RedKale内置HTTP和远程模式Service依赖的SNCP(Service Node Communicate Protocol)协议的实现包。RedKale启动的<server>节点服务都是基于Net组件实现的协议。下面详细介绍 HTTP服务 和 SNCP协议。
HTTP 服务
RedKale自实现的HTTP服务接口并不遵循Java EE规范JSR 340(Servlet 3.1),RedKale提倡的是HTTP+JSON的服务接口方式因此没有实现JSP规范,HTTP+JSON服务接口几乎适合所有类型的客户端(PC应用程序、PC Web、微信H5、移动APP、移动Web)开发。其与JSR 340(Servlet 3.1)的主要区别如下:
1、内置参数的JSON反序列化和响应结果的序列化接口。
2、对数值型的参数和header值提供简易接口。
3、不支持JSP,提倡的是HTTP+JSON接口方式。
4、无HttpSession对象,session可通过配置CacheSource实现。
5、内置文件上传接口,且可自实现断点续传。
6、对响应结果内容可以进行短期缓存从而减少数据源操作的压力。
7、内置WebSocket的集群与组功能,且提供伪WebSocket连接功能。
8、HttpResponse只能异步输出。
编写RedKale的HttpServlet与 JSR 340中的javax.servlet.http.HttpServlet 基本相同,只需继承 org.redkale.net.http.HttpServlet, Redkale也提供了更友好的基类 org.redkale.net.http.BasedHttpServlet, 比较好的习惯是一个项目先定义一个项目级的BaseServlet类,这样方便以后加入类似javax.servlet.Filter的功能。
一个典型的BaseSerlvet实现:
public class BaseSerlvet extends org.redkale.net.http.BasedHttpServlet {
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
protected final boolean fine = logger.isLoggable(Level.FINE);
@Resource //[RedKale内置资源] 进程的启动时间
protected long serverCreateTime;
@Resource //[RedKale内置资源]
protected JsonConvert jsonConvert;
@Resource //[RedKale内置资源]
protected JsonFactory jsonFactory;
//[RedKale内置资源], 当前进程的根目录,字段类型可以是 String、java.io.File、java.nio.file.Path
@Resource(name = "APP_HOME")
protected File home;
//[RedKale内置资源], 当前Http Server的web页面的根目录,字段类型可以是 String、java.io.File、java.nio.file.Path
@Resource(name = "SERVER_ROOT")
protected File webroot;
@Resource
private UserService service;
//在调用authenticate之前调用, 返回false表示请求不合法
//该方法可以用于判断请求源是否合法或加入一些全局的拦截操作
@Override
public boolean preExecute(final HttpRequest request, final HttpResponse response) throws IOException {
if (!request.getHeader("User-Agent", "").contains("RedKale-Agent")) { //只用移动APP的接口可以判断User-Agent是否正确
response.addHeader("retcode", "10001");
response.addHeader("retmessage", "User-Agent error");
response.finish(201, "{'success':false, 'message':'User-Agent error, must be RedKale-Agent'}");
return false;
}
//可以加上一些统计操作
if (fine) response.setRecycleListener((req, resp) -> { //记录处理时间太长的请求操作
long e = System.currentTimeMillis() - request.getCreatetime();
if (e > 500) logger.fine("耗时居然用了 " + e + " 毫秒. 请求为: " + req);
});
return true;
}
//标记为@AuthIgnore 的方法将不会调用authenticate方法
//一般用于判断用户的登录态, 返回false表示鉴权失败
//moduleid值来自 @WebServlet.moduleid() 用于定义模块ID; actionid值自来@WebAction.actionid() 用于定义操作ID; 需要系统化的鉴权需要定义这两个值
@Override
public boolean authenticate(int moduleid, int actionid, HttpRequest request, HttpResponse response) throws IOException {
UserInfo user = (UserInfo) request.getAttribute("_current_userinfo");
if (user != null) return true; //已经判断过了
String sessionid = request.getSessionid(false);
if (sessionid == null) return false; //没有sessionid表示没有登录
user = service.current(sessionid);
if (user != null) request.setAttribute("_current_userinfo", user);
return user != null; //存在用户表示登录态正常
}
}继承BasedHttpServlet的子类可以使用其自带的鉴权、请求分支、缓存等功能, 一个典型的操作用户HttpServlet:
@WebServlet({"/user/*"}) //拦截所有 /user/ 开头的请求
public class UserServlet extends BaseSerlvet {
@Resource
private UserService service;
//登录操作
@AuthIgnore //登录操作不要判断登录态,所以需要标记为@AuthIgnore,不会调用 BaseSerlvet.authenticate 方法
//因为WebAction的判断规则用的是String.startsWith,所以WebAction.url不能用正则表达式,只能是getRequestURI的前缀
//且同一个HttpServlet类内的所有WebAction不能存在包含关系, 如 /user/myinfo 和 /user/myinforecord 不能存在同一HttpServlet中。
@WebAction(url = "/user/login")
public void login(HttpRequest req, HttpResponse resp) throws IOException {
LoginBean bean = req.getJsonParameter(LoginBean.class, "bean"); //获取参数
RetResult<UserInfo> result = service.login(bean); //登录操作, service内部判断bean的合法性
resp.finishJson(result); //输出结果
}
//获取当前用户信息
//未登录的请求会被BaseSerlvet.authenticate方法拦截,因此能进入该方法说明用户态存在
@WebAction(url = "/user/myinfo")
public void myinfo(HttpRequest req, HttpResponse resp) throws IOException {
UserInfo user = service.current(req.getSessionid(false));
//或者使用 UserInfo user = req.getAttribute("_current_userinfo"); 因为BaseSerlvet.authenticate方法已经将UserInfo注入到_current_userinfo属性中
resp.finishJson(user); //输出用户信息
}
//获取指定用户ID的用户信息, 请求如: /user/username/43565443
@AuthIgnore
// 默认缓存时间是15秒,BasedHttpServlet会将每个进入该方法的请求的响应结果缓存15秒,缓存命中时不会再进入该方法,过期会清空。
// @HttpCacheable 必须配合 @AuthIgnore 使用, 因为跟当前用户有关的请求一般不适合所有用户请求。
// 翻页查询想缓存就需要将翻页信息带进url: /user/query/page:2/size:50 。
@HttpCacheable(timeout = 30) //有效期30秒
@WebAction(url = "/user/userinfo/")
public void userinfo(HttpRequest req, HttpResponse resp) throws IOException {
UserInfo user = service.findUserInfo(Integer.parseInt(req.getRequstURILastPath()));
resp.finishJson(user); //输出用户信息
}
//更新个人头像
@WebAction(url = "/user/updateface")
public void updateface(HttpRequest req, HttpResponse resp) throws IOException {
UserInfo user = service.current(req.getSessionid(false));
for (MultiPart part : req.multiParts()) { //遍历上传文件列表
byte[] byts = part.getContentBytes(5 * 1024 * 1024L); //图片大小不能超过5M,超过5M返回null
if (byts == null) {
resp.finishJson(new RetResult(102, "上传的文件过大"));
} else {
BufferedImage img = ImageIO.read(new ByteArrayInputStream(byts));
//... 此处处理并存储头像图片
resp.finishJson(RetResult.SUCCESS); //输出成功信息
}
return; //头像图片仅会传一个, 只需要读取一个即可返回
}
resp.finishJson(new RetResult(103, "没有上传图片"));
}
}如上,所有/user/前缀的请求都会进入UserServlet,若没匹配的则返回505错误,为了方便以后编写前方静动分离服务器转发规则,比较好的习惯是将项目中所有动态Servlet加一个固定前缀,在 application.xml 里设置path即可。
<server protocol="HTTP" port="6060" root="root">
<services autoload="true" />
<servlets path="/pipes" autoload="true"/>
</server>如上, 配置了/pipes 前缀后,客户端发送Servlet请求需带上前缀,请求当前用户信息的url就变成:/pipes/user/myinfo 。
@WebServlet("/ws/chat")
public class ChatWebSocketServlet extends WebSocketServlet {
@Resource //[RedKale内置资源]
protected JsonConvert jsonConvert;
@Resource
private UserService userService;
@Override
protected WebSocket createWebSocket() {
return new WebSocket() {
private UserInfo user;
@Override
public void onMessage(String text) { // text 接收的格式: {"receiveid":200000001, "content":"Hi Redkale!"}
final ChatMessage message = jsonConvert.convertFrom(ChatMessage.class, text); //获取给对方的消息体信息
message.sendid = user.getUserid(); //将当前用户设为消息的发送方
message.sendtime = System.currentTimeMillis(); //设置消息发送时间
//给接收方发送消息, 即使接收方在其他WebSocket进程节点上有链接,RedKale也会自动发送到其他链接进程节点上。
super.sendEachMessage(message.receiveid, jsonConvert.convertTo(message));
}
@Override
protected Serializable createGroupid() { //以用户ID作为WebSocketGroup的groupid
this.user = userService.current(String.valueOf(super.getSessionid()));
return this.user == null ? null : this.user.getUserid();
}
@Override
public Serializable onOpen(HttpRequest request) {
return request.getSessionid(false); //以request中的sessionid字符串作为WebSocket的sessionid
}
};
}
public static class ChatMessage {
public int sendid; //发送方用户ID
public int receiveid; //接收方用户ID
public String content; //文本消息内容
public long sendtime; //消息发送时间
}
}. HttpRequest 对象
public class HttpRequest {
//获取请求方法 GET、POST等
public String getMethod();
//获取协议名 http、https、ws、wss等
public String getProtocol();
//获取Host的Header值
public String getHost();
//获取请求内容的长度, 为-1表示内容长度不确定
public long getContentLength();
//获取Content-Type的header值
public String getContentType();
//获取Connection的Header值
public String getConnection();
//获取客户端地址IP
public SocketAddress getRemoteAddress();
//获取客户端地址IP, 与getRemoteAddres() 的区别在于:本方法优先取header中指定为RemoteAddress名的值,没有则返回getRemoteAddres()的getHostAddress()。
//本方法适用于服务前端有如nginx的代理服务器进行中转,通过getRemoteAddres()是获取不到客户端的真实IP。
public String getRemoteAddr();
//获取请求内容指定的编码字符串
public String getBody(Charset charset);
//获取请求内容的UTF-8编码字符串
public String getBodyUTF8();
//获取文件上传对象
public MultiContext getMultiContext();
//获取文件上传信息列表 等价于 getMultiContext().parts();
public Iterable<MultiPart> multiParts() throws IOException;
//获取sessionid
public String getSessionid(boolean autoCreate);
//更新sessionid
public String changeSessionid();
//使sessionid失效
public void invalidateSession();
//获取所有Cookie对象
public java.net.HttpCookie[] getCookies();
//获取Cookie值, 没有返回默认值
public String getCookie(String name, String defaultValue);
//获取Cookie值
public String getCookie(String name);
//获取请求的URL
public String getRequestURI();
//截取getRequestURI最后的一个/后面的部分
public String getRequstURILastPath();
//从prefix之后截取getRequestURI再对"/"进行分隔
public String[] getRequstURIPaths(String prefix);
//获取请求URL分段中含prefix段的long值
// 例如请求URL /pipes/record/query/time:1453104341363/id:40
// 获取time参数: long time = request.getRequstURIPath("time:", 0L);
public long getRequstURIPath(String prefix, long defaultValue);
//获取请求URL分段中含prefix段的int值
// 例如请求URL /pipes/record/query/page:2/size:50
// 获取page参数: int page = request.getRequstURIPath("page:", 1);
// 获取size参数: int size = request.getRequstURIPath("size:", 20);
public int getRequstURIPath(String prefix, int defaultValue);
//获取请求URL分段中含prefix段的值
//例如请求URL /pipes/record/query/name:hello
//获取name参数: String name = request.getRequstURIPath("name:", "none");
public String getRequstURIPath(String prefix, String defaultValue);
// 获取请求URL分段中含prefix段的short值
// 例如请求URL /pipes/record/query/type:10
// 获取type参数: short type = request.getRequstURIPath("type:", (short)0);
public short getRequstURIPath(String prefix, short defaultValue);
//获取所有的header名
public String[] getHeaderNames();
// 获取指定的header值
public String getHeader(String name);
//获取指定的header值, 没有返回默认值
public String getHeader(String name, String defaultValue);
//获取指定的header的json值
public <T> T getJsonHeader(JsonConvert convert, Class<T> clazz, String name);
//获取指定的header的json值
public <T> T getJsonHeader(Class<T> clazz, String name);
//获取指定的header的boolean值, 没有返回默认boolean值
public boolean getBooleanHeader(String name, boolean defaultValue);
// 获取指定的header的short值, 没有返回默认short值
public short getShortHeader(String name, short defaultValue);
//获取指定的header的int值, 没有返回默认int值
public int getIntHeader(String name, int defaultValue);
// 获取指定的header的float值, 没有返回默认float值
public float getFloatHeader(String name, float defaultValue);
// 获取指定的header的long值, 没有返回默认long值
public long getLongHeader(String name, long defaultValue);
//获取指定的header的double值, 没有返回默认double值
public double getDoubleHeader(String name, double defaultValue);
//获取所有参数名
public String[] getParameterNames();
//获取指定的参数值
public String getParameter(String name);
//获取指定的参数值, 没有返回默认值
public String getParameter(String name, String defaultValue);
//获取指定的参数json值
public <T> T getJsonParameter(JsonConvert convert, Class<T> clazz, String name);
//获取指定的参数json值
public <T> T getJsonParameter(Class<T> clazz, String name);
//获取指定的参数boolean值, 没有返回默认boolean值
public boolean getBooleanParameter(String name, boolean defaultValue);
//获取指定的参数short值, 没有返回默认short值
public short getShortParameter(String name, short defaultValue);
//获取指定的参数int值, 没有返回默认int值
public int getIntParameter(String name, int defaultValue);
//获取指定的参数float值, 没有返回默认float值
public float getFloatParameter(String name, float defaultValue);
//获取指定的参数long值, 没有返回默认long值
public long getLongParameter(String name, long defaultValue);
//获取指定的参数double值, 没有返回默认double值
public double getDoubleParameter(String name, double defaultValue);
//获取HTTP上下文对象
public HttpContext getContext();
//获取所有属性值, servlet执行完后会被清空
public Map<String, Object> getAttributes();
//获取指定属性值
public <T> T getAttribute(String name);
//删除指定属性
public void removeAttribute(String name);
//设置属性值
public void setAttribute(String name, Object value);
//获取request创建时间
public long getCreatetime();
}. HttpResponse 对象
public class HttpResponse {
//设置状态码
public void setStatus(int status);
//获取状态码
public int getStatus();
//获取 ContentType
public String getContentType();
//设置 ContentType
public void setContentType(String contentType);
//获取内容长度
public long getContentLength();
//设置内容长度
public void setContentLength(long contentLength);
//设置Header值
public void setHeader(String name, Object value);
//添加Header值
public void addHeader(String name, Object value);
//跳过header的输出
//通常应用场景是,调用者的输出内容里已经包含了HTTP的响应头信息,因此需要调用此方法避免重复输出HTTP响应头信息。
public void skipHeader();
//增加Cookie值
public void addCookie(HttpCookie... cookies);
//异步输出指定内容
public <A> void sendBody(ByteBuffer buffer, A attachment, CompletionHandler<Integer, A> handler);
//关闭HTTP连接,如果是keep-alive则不强制关闭
public void finish();
//强制关闭HTTP连接
public void finish(boolean kill);
//将对象以JSON格式输出
public void finishJson(Object obj);
//将对象以JSON格式输出
public void finishJson(JsonConvert convert, Object obj);
//将对象以JSON格式输出
public void finishJson(Type type, Object obj);
//将对象以JSON格式输出
public void finishJson(final JsonConvert convert, final Type type, final Object obj);
//将对象以JSON格式输出
public void finishJson(final Object... objs);
//将指定字符串以响应结果输出
public void finish(String obj);
//以指定响应码附带内容输出, message 可以为null
public void finish(int status, String message);
//以304状态码输出
public void finish304();
//以404状态码输出
public void finish404();
//将指定ByteBuffer按响应结果输出
public void finish(ByteBuffer buffer);
//将指定ByteBuffer按响应结果输出
//kill 输出后是否强制关闭连接
public void finish(boolean kill, ByteBuffer buffer);
//将指定ByteBuffer数组按响应结果输出
public void finish(ByteBuffer... buffers);
//将指定ByteBuffer数组按响应结果输出
//kill 输出后是否强制关闭连接
public void finish(boolean kill, ByteBuffer... buffers);
//将指定文件按响应结果输出
public void finish(File file) throws IOException;
}. WebSocket 对象
public abstract class WebSocket {
//发送消息体, 包含二进制/文本 返回结果0表示成功,非0表示错误码
public int send(WebSocketPacket packet);
//发送单一的文本消息 返回结果0表示成功,非0表示错误码
public int send(String text);
//发送文本消息 返回结果0表示成功,非0表示错误码
public int send(String text, boolean last);
//发送单一的二进制消息 返回结果0表示成功,非0表示错误码
public int send(byte[] data);
//发送单一的二进制消息 返回结果0表示成功,非0表示错误码
public int send(byte[] data, boolean last);
//发送消息, 消息类型是String或byte[] 返回结果0表示成功,非0表示错误码
public int send(Serializable message, boolean last);
//给指定groupid的WebSocketGroup下所有WebSocket节点发送文本消息
public int sendEachMessage(Serializable groupid, String text);
//给指定groupid的WebSocketGroup下所有WebSocket节点发送文本消息
public int sendEachMessage(Serializable groupid, String text, boolean last);
//给指定groupid的WebSocketGroup下所有WebSocket节点发送二进制消息
public int sendEachMessage(Serializable groupid, byte[] data);
//给指定groupid的WebSocketGroup下所有WebSocket节点发送二进制消息
public int sendEachMessage(Serializable groupid, byte[] data, boolean last);
//给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送文本消息
public int sendRecentMessage(Serializable groupid, String text);
//给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送文本消息
public int sendRecentMessage(Serializable groupid, String text, boolean last);
//给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送二进制消息
public int sendRecentMessage(Serializable groupid, byte[] data);
//给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送二进制消息
public int sendRecentMessage(Serializable groupid, byte[] data, boolean last);
//发送PING消息 返回结果0表示成功,非0表示错误码
public int sendPing();
//发送PING消息,附带其他信息 返回结果0表示成功,非0表示错误码
public int sendPing(byte[] data);
//发送PONG消息,附带其他信息 返回结果0表示成功,非0表示错误码
public int sendPong(byte[] data);
//获取当前WebSocket下的属性
public <T> T getAttribute(String name);
//移出当前WebSocket下的属性
public <T> T removeAttribute(String name);
//给当前WebSocket下的增加属性
public void setAttribute(String name, Object value);
//获取当前WebSocket所属的groupid
public Serializable getGroupid();
//获取当前WebSocket的会话ID, 不会为null
public Serializable getSessionid();
//获取客户端直接地址, 当WebSocket连接是由代理服务器转发的,则该值固定为代理服务器的IP地址
public SocketAddress getRemoteAddress();
//获取客户端真实地址 同 HttpRequest.getRemoteAddr()
public String getRemoteAddr();
//获取WebSocket创建时间
public long getCreatetime();
//显式地关闭WebSocket
public void close();
//获取当前WebSocket所属的WebSocketGroup, 不会为null
protected WebSocketGroup getWebSocketGroup();
//获取指定groupid的WebSocketGroup, 没有返回null
protected WebSocketGroup getWebSocketGroup(Serializable groupid);
//获取当前进程节点所有在线的WebSocketGroup
protected Collection<WebSocketGroup> getWebSocketGroups();
//获取在线用户的节点地址列表
protected Collection<InetSocketAddress> getOnlineNodes(Serializable groupid);
//获取在线用户的详细连接信息
protected Map<InetSocketAddress, List<String>> getOnlineRemoteAddress(Serializable groupid);
//返回sessionid, null表示连接不合法或异常,默认实现是request.getSessionid(false),通常需要重写该方法
public Serializable onOpen(final HttpRequest request);
//创建groupid, null表示异常, 必须实现该方法, 通常为用户ID为groupid
protected abstract Serializable createGroupid();
//标记为@WebSocketBinary才需要重写此方法
public void onRead(AsyncConnection channel) {
}
public void onConnected() {
}
public void onMessage(String text) {
}
public void onPing(byte[] bytes) {
}
public void onPong(byte[] bytes) {
}
public void onMessage(byte[] bytes) {
}
public void onFragment(String text, boolean last) {
}
public void onFragment(byte[] bytes, boolean last) {
}
public void onClose(int code, String reason) {
}
}SNCP 协议
自定义协议
协议的网络框架包含五种对象:
Context : 协议上下文对象
Request : 服务请求对象
Response : 服务响应对象
Servlet : 服务逻辑处理对象
Server : 服务监听对象
通常自定义协议需要继承上面五种对象类,同时为了让RedKale能识别和加载自定义协议服务需要继承 org.redkale.boot.NodeServer 并指明 @NodeProtocol,实现可以参考 基于SOCKS5协议的反向代理服务器