更新:2016/01/14,补充基于HTTP流实现comet在服务端一个利用阻塞队列的例子。
1. 为什么需要WebSocket:
回答这个首先需要知道一些历史,我们知道HTTP是客户端向服务器请求获取数据的普遍方式,但是它是一种被动性的通信机制。request=response,服务器只有在接受到客户端请求的时候才可能向客户端发送数据。但有时候服务器需要告诉客户端有新的数据应该接受,比如消息,即时聊天等功能,也就是说我们应该需要一种全双工的通信方式,而HTTP的被动性使得这一问题很难解决。
你可能会问webSocket很新啊,在这之前是如果实现类似的功能的呢?下面介绍几种技术用来在在“单连接的HTTP中模拟全双工”的:
解决方案一:频繁轮询:
客户端以较短的时间间隔向服务器请求新的数据,比如1秒:
那么请求过程可能是这样的:
client:
GET/ajaxEndponit
有没有新数据
server:
200 OK Content-Length:123
有,给你
1s...
client:
GET/ajaxEndponit
有没有新数据
server:
200 OK Content-Length:0
没有。。。
1s...
client:
GET/ajaxEndponit
有没有新数据
server:
200 OK Content-Length:0
没有。。。
1s...
...n秒后
client:
GET/ajaxEndponit
有没有新数据
server:
200 OK Content-Length:123
有,给你
显然这种方案产生了大量的请求,其中很多请求是无意义的,造成了大量的浪费。这时你可能会想:要是可以减少连接数就好了,这就是长轮询。
解决方案二:长轮询:
HTTP是
被动性的,因此想要减少连接数量只能增长连接的时间否则你无法及时返回新的数据。客户端发起一个超时时间较长(比如20秒)的请求,服务器在没有数据的时候并不立即返回,而是以某种方式阻塞(使用阻塞队列神马的),当有数据的时候在返回,或者在时间超时时返回无数据。
那么请求过程可能是这样的:
clinet:GET/longPollEndpoint
server:200 OK Content-Length:123
有,给你
client:GET/longPollEndpoint
server检查没有数据,阻塞等待
.
. 20秒
.
server:200 OK Content-Length:0
没有数据。。。
client:GET/longPollEndpoint
server:200 OK Content-Length:123有,给你
可以看到,连接的数量大大减少了,但是存在连接几乎一直存在啊,这样会不会有什么问题?有,首先,HTTP/1.1规范中对同一主机名的访问是有数量限制的:不超过2个,也就是我们少了一半可以请求其他数据的连接资源。。。再有,如果在服务器没有数据阻塞的时候客户端又有新数据要请求时怎么办,只能另起一个并行的请求去做了。
补充:HTTP流实现
除了长轮询,还可以使用基于HTTP流的方式,服务端采用阻塞队列的时间:使用AsyncContext异步处理写个例子:
使用一个简单的生产者-消费者模式:
<span style="white-space:pre"> </span>AsyncContext asyncContext = req.startAsync();
asyncContext.setTimeout(0); //默认是Servlet的超时时间,默认30,设为0或负数表示notimeout
/* 略,可以设置AsyncListener等等 */
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
asyncContext.start(new Runnable() {
@Override
public void run() {
try {
while (true) {
String msg = queue.take();
PrintWriter out;
(out = asyncContext.getResponse().getWriter()).println(msg);
out.flush();
}
} catch (Exception e) {
new RuntimeException(e);
}
asyncContext.complete(); //任务完成调用onComplete,通知,调用回调函数
}
});
new Thread(new DataHandler(queue)).start();
生产者代码:
private static class DataHandler implements Runnable {
private final BlockingQueue<String> queue;
public DataHandler(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
queue.put("new msg!");
TimeUnit.SECONDS.sleep(3);
} catch (Exception e) {
logger.error(e);
}
}
}
}
在浏览器(chrome)地址栏请求,一般默认是notimeout,因此超时时间就是我在服务端设置的时间(如果不设置,AsyncContext的默认时间是ServletContext的超时时间,如果设置为0或负数,就是notimeout)。
这就是一个长时间保持的HTTP连接,在服务器方法超时或方法返回之前HTTP流不会关闭,通过HTTP流向客户端周期性地发送数据,客户端响应数据送达事件(而不是完成事件)来做出处理。
解决方案三:分块编码:
这个方案可能是为了解决浏览器长时间等待而创建的,一直等待响应对浏览器来说并不友好,但我觉得这个方案有些奇葩,分块的思想大致是这样的,既然我发现有很多时间都是在等待,那我有数据的时候就不一次性返回给客户端了,把它分割分几次返回给客户端,这样看上去浏览不就是一直在请求和获取数据吗。。。这个方法并没有什么本质上的改观,如果某些时候需要返回的数据大量产生呢,那我们是不是需要动态维护块的大小适应数据流,显然这是一件不简单的事情,又需要额外的成本。
解决方案四:Applet和Adobe Flash:
Java applet是一个“久远”的概念,它是一种内嵌在浏览器中的Java 小程序和Flash一样,它们不再使用HTTP,而是TCP套接字来实现全双工的通信,但是它们并不安全,没有构建什么安全协议,而写资源消耗,你懂的。。。尤其是移动互联网出现后很多移动端的浏览器并不支持它们,所以。。。
2. WekSocket简介:
说了这么多,你应当发现这是个难搞的问题啊,是的,HTTP的被动性本质使得在它的框架内我们无法实现真正的全双工通信。WebSocket正是解决这一问题的方案。
那么如何从一个HTTP连接机制中的通信环境发起一个WebSocket连接呢?
利用HTTP/1.1的升级特性迁移到WebSocket协议:
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==
Sec-WebSocket-Protocol: chat, superchat
Sec-WebSocket-Version: 13
Origin: http://example.com
Connection: Upgrade表示我要升级成其他的协议;
Upgrade:websocket表示我要升级层websocket,服务器你支持不?
Sec-WebSocket-Key,客户端随机生成的,服务器之后会拿这个进行加密作为标识之一。
Sec-WebSocket-Protocol:区分同一URL下需要使用的不同协议。
Sec-WebSokcet-Version:版本号。
服务器会返回:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: HSmrc0sMlYUkAGmm5OPpG2HaGWk=
Sec-WebSocket-Protocol: chat
Sec-WebSocket-Accept:看我加密了你发来的随机码,你认识我了吧!
Sec-WebSocket-Protocol:我支持chat协议;
注意,HTTP的状态码是101, Switch Protocols,ok,到这里,我们从原来的HTTP协议已经建立了
一个持久的、全双工的TCP套接字协议,
PS:有些时候,服务器的特定资源只接受HTTP升级请求,这时如果请求为发起升级,会返回426 Upgrade Required;如果客户端不支持就返回400咯。
那WebSocket到底有什么好处呢?
在握手之后,信道已经建立起来了,ws和wss分别对应与http和https(看我们已经可以使用新的协议了)。
这时已经是一个全双工,持久的信道了,它有这些优点:
(1)使用HTTP来进行握手,可以很方便的继承与浏览器和HTTP服务器中;
(2)ws和wss与http和https一样同样对应与80和443端口,一般防火墙是不会阻止的;
(3)心跳机制,一般用来保持连接的,两边都不说话时,也通过心跳包来保持连接~;
(4)不再收到2个连接的限制了;
(5)可以安全进行跨域连接了,HTTP的origin会限制ajax和XMLHttpRequest跨域;
值得一提的是使用webSocket提供高清视频流更为强大;
3. Java中的WebSocket:
当然使用H5中的js的webSocket api是浏览器的方法,这里为了演示我就只介绍Java的WebSocket API了,同样分为客户端和服务端两部分。
maven依赖:
<dependency>
<groupId>javax.websocket</groupId>
<artifactId>javax.websocket-api</artifactId>
<version>1.1</version>
<scope>provided</scope>
</dependency>
这里使用的是provided,显然和其他J2EE规范一样我们在开发时也只需要使用api面向接口开发,tomcat8.0中提供了websocket的实现。
3.1 WebSocket API:
客户端API:
有几个关键的接口:
ContainerProvider:这是服务提供者接口,不同的实现者可以通过实现该接口来构建服务(webSocket)的提供者,通过提供者注册接口来提供具体的服务,如果你不清楚这种模式可以参看《Effective Java》中的第一条;
WebSocketContainer:这就我们需要的webSocket“服务”了,通过ContainerProvider.getContainer获得,通过他的connectToServer方法可以返回Session;
Session:使用session我们一发送数据,关闭会话;
RemoteEndPoint:我的理解是一个回调接口,它代表的实例由WebContainer管理,就像Servlet和Servlet容器一样;
服务器API:
serverContainer继承了WebSocketContainer,你可以通过ServletContext.getArribute("javax.websocket.ServerContainer")来获取它,但你并不需要这么做,你只需要在POJO上添加一个@ServerEndpoint,webSocket会扫面它创建实例,默认是每一个连接都会创建一个EndPoint实例,理解这一点很重要,因为在和Spring的自动注入结合后会产生问题(如果你不是用Spring WebSocket配置的话)。
EndPoint有四个回调方法:@OnOpen,@OnMessage,@OnClose,@OnError,你可以通过注解在POJO中标注。
4. 基于Spring webSocket开发消息系统:
4.1 服务端实现:
@ServerEndpoint(value = "/message/{userId}", configurator = SpringConfigurator.class)
public class MessageServer implements DisposableBean {
private static Map<Long, Session> userSessions = new ConcurrentHashMap<>();
private static ObjectMapper mapper = new ObjectMapper();
private static Logger logger = LogManager.getLogger();
@Inject
private MessageService messageService;
@Override
public void destroy() throws Exception {
userSessions.clear();
userSessions = null;
}
/**
* Get unread messages by userId when session opened.
*
* @param session session get from WebSocketContain.connectToServer
* @param userId user's ID
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") long userId) {
logger.debug("open session: userId:" + userId + " sessionId" + session.getId());
List<BMessageEntity> unReadMessages =
this.messageService.getMessageByIsRead(userId, BMessageState.UNREAD);
if(unReadMessages.size() > 0) {
this.sendJsonMessage(session, userId, unReadMessages);
}
userSessions.put(userId, session);
}
/**
* client should send new BMessageEntity to this server, and opMessage() save data to dataBase
*
* @param session session get from WebSocketContain.connectToServer
* @param byteBuffer accept messages
*/
@OnMessage
public void onMessage(Session session, ByteBuffer byteBuffer,
@PathParam("userId") long userId) {
CharBuffer charBuffer = Charset.forName("utf8").decode(byteBuffer);
String message = charBuffer.toString();
try {
MessageJson messageJson = mapper.readValue(message, MessageJson.class);
long replyUserId = messageJson.getReplyUserId();
//save new message
BMessageEntity messageEntity = this.messageService.saveMessage(userId, replyUserId, messageJson.getMessage());
if(messageEntity != null) {
//send
Session replySession = userSessions.get(replyUserId);
if(replySession != null) {
this.sendJsonMessage(replySession, userId, messageEntity);
}
}
} catch (IOException e) {
logger.error(e);
}
}
@OnError
public void onError(Session session, Throwable e) {
logger.error("sessionId:" + session.getId() + " " + e);
}
@OnClose
public void onClose(Session session, @PathParam("userId") long userId) {
Session session1 = userSessions.get(userId);
if(session1 != null) {
try {
if(session1.isOpen())
session1.close();
} catch (IOException e) {
logger.error(e);
} finally {
userSessions.remove(userId);
}
}
}
private void sendJsonMessage(Session session, long userId, Object object) {
try {
session.getBasicRemote()
.sendText(MessageServer.mapper.writeValueAsString(object));
} catch (IOException e) {
this.handleException(e, userId);
}
}
private void handleException(Throwable throwable, long userId) {
try(Session session = userSessions.get(userId)) {
session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, throwable.toString()));
} catch (IOException e) {
logger.error(e);
} finally {
userSessions.remove(userId);
}
}
/**
* custom configurator, TODO collect some require, then use this configurator
*/
public static class EndpointConfigurator extends SpringConfigurator {
}
}
首先,使用了configurator = SpringConfigurator.class,SpringConfigurator取代了默认的Configurator,它将使得我们可以正确使用Spring进行实例化和注入,如果没有这个设置,你会得到NullPointException,因为它是有WebSocket实现实例化,它并不会帮我们注入哦,这也是上面提到的问题。
@ServerEndpoint(value = "/message/{userId}", configurator = SpringConfigurator.class)
定义一个HashMap存放session,用来向指定的user发送消息,ObjectMapper是用来转换json格式的。
private static Map<Long, Session> userSessions = new ConcurrentHashMap<>();
private static ObjectMapper mapper = new ObjectMapper();
打开连接是,我从数据库中取出未读消息,返回给客户端,并将session存到map中:
@OnOpen
public void onOpen(Session session, @PathParam("userId") long userId) {
logger.debug("open session: userId:" + userId + " sessionId" + session.getId());
List<BMessageEntity> unReadMessages =
this.messageService.getMessageByIsRead(userId, BMessageState.UNREAD);
if(unReadMessages.size() > 0) {
this.sendJsonMessage(session, userId, unReadMessages);
}
userSessions.put(userId, session);
}
接受到消息后,保存消息到数据库,然后在map中查找对方的session,如果存在的话(对方有可能不在线哦),将这条消息发送给他。
@OnMessage
public void onMessage(Session session, ByteBuffer byteBuffer,
@PathParam("userId") long userId) {
CharBuffer charBuffer = Charset.forName("utf8").decode(byteBuffer);
String message = charBuffer.toString();
try {
MessageJson messageJson = mapper.readValue(message, MessageJson.class);
long replyUserId = messageJson.getReplyUserId();
//save new message
BMessageEntity messageEntity = this.messageService.saveMessage(userId, replyUserId, messageJson.getMessage());
if(messageEntity != null) {
//send
Session replySession = userSessions.get(replyUserId);
if(replySession != null) {
this.sendJsonMessage(replySession, userId, messageEntity);
}
}
} catch (IOException e) {
logger.error(e);
}
}
PS:这时,还是有WebSocket实现来创建它的,依然是一个连接一个实例,它也并不在Spring的单例bean注册表中,因此我们也无法将它注入到其他bean中,但是你可以在@Configuration配置中添加一个单例:
@Bean
public MessageServer messageServer() {
return new MessageServer();
}
这样也很省资源呢,但是注意线程安全。
4.2 客户端实现:
我这里使用了两个Servlet实例来模拟两个客户端,实现@ClientEndpoint,注意@ClientEndpoint并不会有websocket实例化,因此我们可以放心的在servlet上直接添加:
@ClientEndpoint
public class ClientServlet extends HttpServlet {
private static Logger logger = LogManager.getLogger();
private static ObjectMapper mapper = new ObjectMapper();
private Session session;
private long userId;
private static ObjectMapper objectMapper = new ObjectMapper();
@Override
public void init() throws ServletException {
userId = Long.valueOf(this.getInitParameter("userId"));
String path = this.getServletContext().getContextPath() + "/message/" +
userId;
logger.debug(path);
try {
URI uri = new URI("ws", "localhost:8080", path, null, null);
this.session = ContainerProvider.getWebSocketContainer()
.connectToServer(this, uri);
logger.debug(session.getId());
} catch (IOException | URISyntaxException | DeploymentException e) {
throw new ServletException("Cannot connect to " + path + "." + e);
}
}
@Override
public void destroy() {
try {
this.session.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
MessageJson messageJson = new MessageJson();
messageJson.setMessage(req.getParameter("message"));
messageJson.setReplyUserId(Long.valueOf(req.getParameter("replyUserId")));
try(OutputStream outputStream = this.session.getBasicRemote().getSendStream()) {
mapper.writeValue(outputStream, messageJson);
outputStream.flush();
}
resp.getWriter().append("OK");
}
@OnMessage
public void onMessage(String message) {
System.out.println(message);
}
@OnClose
public void onClose(CloseReason reason) {
CloseReason.CloseCode code = reason.getCloseCode();
}
}
在部署描述符中,部署两个实例,它们分别有初始化参数,userId,代表两个用户~:
<servlet>
<servlet-name>Client1</servlet-name>
<servlet-class>com.yjh.cg.site.server.ClientServlet</servlet-class>
<init-param>
<param-name>userId</param-name>
<param-value>1</param-value>
</init-param>
</servlet>
<servlet-mapping>
<servlet-name>Client1</servlet-name>
<url-pattern>/client1</url-pattern>
</servlet-mapping>
<servlet>
<servlet-name>Client2</servlet-name>
<servlet-class>com.yjh.cg.site.server.ClientServlet</servlet-class>
<init-param>
<param-name>userId</param-name>
<param-value>2</param-value>
</init-param>
</servlet>
<servlet-mapping>
<servlet-name>Client2</servlet-name>
<url-pattern>/client2</url-pattern>
</servlet-mapping>
在servlet初始化的时候,我们向服务器请求建立连接,获取session对象:
<span style="white-space:pre"> </span> URI uri = new URI("ws", "localhost:8080", path, null, null);
this.session = ContainerProvider.getWebSocketContainer()
.connectToServer(this, uri);
在doGet方法中我将http请求“包装”成websocket消息发送到服务器,当然这只是为了模拟:
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
MessageJson messageJson = new MessageJson();
messageJson.setMessage(req.getParameter("message"));
messageJson.setReplyUserId(Long.valueOf(req.getParameter("replyUserId")));
try(OutputStream outputStream = this.session.getBasicRemote().getSendStream()) {
mapper.writeValue(outputStream, messageJson);
outputStream.flush();
}
resp.getWriter().append("OK");
}
5. 模拟:
user2发送消息给user1:
http://localhost:8080/client2?message=我是谁&replyUserId=2
user1收到消息:我是谁
user1回复:Zerohuan
http://localhost:8080/client1
?message=Zerohuan
&replyUserId=1
好了,控制台的log我就不一一贴了,碎觉去了。。。
|