前言

Websocket是一种在TCP连接上进行全双工网络通信的协议,之所以在有了Http协议之后仍旧诞生了Websocket的原因在于Http的缺陷:通信只能由客户端发起。在Websocket出现之前,为了在Web C/S架构上实现服务端向客户端推送消息,大都是靠Ajax进行轮询来“实现”推送,而因需要不停地打开连接以及Http较长的头部信息,在不断请求的过程中这将产生资源的浪费。Websocket因此应运而生。

由于目前可搜得的资料各式各样大相径庭(因为有的是Java原生API而有的是基于STOMP的应用示例),本文将着重介绍Spring环境下(SpringBoot为例)的Websocket应用。

Websocket基本概念

基本概念原理这里就不细讲了,一查一大把,推荐大佬的这篇博客(偏代码实践一些)以及知乎的这篇高赞回答(偏寓教于乐一些)。需要重点说明的是:

  1. Websocket作为标准的通信协议在Web C/S使用时,需要浏览器和Web服务容器的支持
  2. Websocket依靠Http来完成握手,握手完成后才是完全走Websocket协议
  3. 资源描述符的前缀为ws,加密通道传输则为wss,例如ws://example.com:80/some/path
  4. Websocket没有规定发送内容的格式,支持文本、二进制

Http握手

握手比较重要因此单独拿出来说一下,握手的具体细节可以参考上面推荐的文章以及百度,这里细说下为什么要使用Http来进行握手而不是完全独立采用自有协议,个人认为这一点主要原因有:

  1. Websocket主要还是作为Http的一种补充,与Http紧密结合是合情合理的,并且能够较好地融入Http生态
  2. 提供了良好的兼容性处理,可以通过Http来获取兼容性支持反馈以及使用Http来在不支持websocket的客户端上模拟兼容Websocket

SockJS

SockJS是一个JavaScript库,主要用于应对浏览器缺失websocket支持的情况。它提供了连贯的、跨浏览器的JavaScript API,它首先尝试使用原生Websocket,在失败时能够使用各种浏览器特定的传输协议来模拟Websocket的行为。

Java规范

Java发布提供了Websocket的标准API接口JSR-356,作为Java EE7标准的一部分。大部分标准的Java web容器都已经实现了对Websocket的支持,同时也是兼容这个标准接口的,例如Tomcat 7.0.47+, Jetty 9.1+, GlassFish 4.1+, WebLogic 12.1.3+, Undertow 1.0+ (WildFly 8.0+)等。

何时使用Websocket

这里主要参考Spring文档中的叙述:

Websocket虽然可以使网页变得动态以及更加有交互性,但是在很多情况下Ajax结合Http Streaming或者长轮询可以提供简单高效的解决方案。例如新闻、邮件、社交订阅等需要动态更新,但是在这些情景下每隔几分钟更新一次是完全没有问题的。而另一方面,协作、游戏以及金融应用则需要更加接近实时更新。注意延迟本身并不是决定性因素,如果信息量相对较少(例如监控网络故障),Http Streaming或轮询同样也可以高效地解决。低延迟、高频率以及高信息量的组合情况下,Websocket才是最佳选择。

STOMP协议

这里把STOMP协议提到前面一点的位置,也可以选择先看下面的内容,到介绍STOMP应用时再看这部分。

STOMP是一个简单的互操作协议,它被设计为常用消息传递模式的最小子集,定义了一种基于文本的简单异步消息协议,它最初是为脚本语言(如 Ruby、 Python 和 Perl)创建的,用于连接企业消息代理。STOMP已经广泛使用了好几年,并且得到了很多客户端(如stomp.js、Gozirra、stomp.py、stompngo等)、消息代理端(如ActiveMQ、RabbitMQ等)工具库的支持,目前最新的协议版本为1.2。

STOMP是一种基于’Frame’的协议,Frame基于Http建模,每个Frame由一个命令(Command)、一组头部(Headers)和可选的正文(Body)组成,如下是一个STOMP frame的基本结构示例:

1
2
3
4
5
COMMAND
header1:value1
header2:value2

Body^@

可以看到STOMP本身的结构是非常简单明了的。STOMP同样有客户端和服务端的概念,服务端被认为是可以接收和发送消息的一组目的地;而客户端则是用户代理,可以进行两种操作:发送消息(SEND)、发送订阅(SUBSCRIBE),为此,STOMP的命令有如下几种。

客户端命令:

  • CONNECT:用于初始化信息流或TCP连接,是客户端第一个需要发送的命令
  • SEND:表示向目的地发送消息,必须要包含一个名为destination的头部
  • SUBSCRIBE:用于注册监听一个目的地,必须包含一个名为destination的头部
  • BEGIN:用于启动事务,必须包含一个名为transaction的头部
  • COMMIT:用于提交事务,必须包含一个名为transaction的头部
  • ABORT:用于回滚事务,必须包含一个名为transaction的头部
  • DISCONNECT:告知服务端关闭连接

服务端命令:

  • CONNECTED:服务器响应客户的段的CONNECT请求,表示连接成功
  • MESSAGE:用于将订阅的消息发送给客户端,头部destination的值应与SEND frame中的相同,且必须包含一个名为message-id的头部用于唯一标识这个消息
  • RECIPT:收据,表示服务器成功处理了一个客户端要求返回收据的消息,必须包含头部message-id表明是哪个消息的收据
  • ERROR:出现异常时,服务端可能会发送该命令,通常在发送ERROR后将关闭连接

可以说STOMP主要就是提供了发送消息、订阅消息的语义,同时还能够支持事务的处理。

Spring应用集成

与其他提供相对单一的最佳实践的Spring整合不同,在Spring中应用Websocket可以有多种形式,且大相径庭,这也造成了初步查阅资料时的困惑,为何两篇文章中的示例会看起来完全不一样。

这里我们选用Springboot(2.x)以及其默认使用的tomcat来作为基础进行各种形式的示例演示说明,其中几个示例没有前端,可以使用在线测试工具来试验。使用Maven搭建项目,在最复杂的应用形式下,POM的依赖将包含如下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
<dependencies>

<!-- websocket基础,主要引入了spring-websocket和spring-messaging -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

<!-- web基础 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- security -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>

<!-- message security -->
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-messaging</artifactId>
</dependency>

<!-- lombok,可选 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>

<!-- test,可选 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- 以下为前端库的支持 -->
<dependency>
<groupId>org.webjars</groupId>
<artifactId>webjars-locator-core</artifactId>
</dependency>
<dependency>
<groupId>org.webjars</groupId>
<artifactId>sockjs-client</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.webjars</groupId>
<artifactId>stomp-websocket</artifactId>
<version>2.3.3</version>
</dependency>
<dependency>
<groupId>org.webjars</groupId>
<artifactId>bootstrap</artifactId>
<version>3.3.7</version>
</dependency>
<dependency>
<groupId>org.webjars</groupId>
<artifactId>jquery</artifactId>
<version>3.1.1-1</version>
</dependency>

</dependencies>

原生Java API

这种情况只利用Spring的基础能力以及辅助工具类,直接以标准的java规范接口(javax.websocket包下的内容)来编写websocket应用,在依赖上仅需要spring-boot-starter-websocket。让程序能够处理websocket请求仅需要编写两个额外的类即可。

其一是配置类,注入一个类型为ServerEndpointExporter的工具Bean,它的作用是扫描注解了@ServerEndpoint这个注解的类并且将其自动注册到Websocket容器以及探测ServerEndpointConfig的配置Bean。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
// @EnableWebSocket // no need
public class OriginWebSocketConfig {

@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}

}

其二是消息处理入口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
@Component
@ServerEndpoint("/myWs") // 对应websocket连接、发送消息的地址
public class OriginWSEndpoint {

// 简单存储客户端对应的session
private ConcurrentHashMap<String, Session> userMap = new ConcurrentHashMap<>();

// 简单的推送消息
public void sendMessage(String username, String message) throws IOException {
Session session = userMap.get(username);
if (null != session) {
session.getBasicRemote().sendText(message);
log.info("消息发送成功");
} else {
log.info("未找到客户Session");
}
}

@OnOpen
public void onOpen(Session session, EndpointConfig config) {
log.info("客户端接入:{}", session);

// you can define your own logic
String username = session.getId();
userMap.put(username, session);

// something more
// session.addMessageHandler( ... );
}

@OnMessage
public void onMessage(Session session, String message) {
log.info("从{}接收到一条消息({})", session, message);

// message handling ...
}

@OnClose
public void onClose(Session session, CloseReason closeReason) {
log.info("{}连接关闭。关闭原因:({})", session, closeReason);

String username = session.getId();
userMap.remove(username);
}

@OnError
public void onError(Session session, Throwable throwable) {
log.info("{}发生异常", session, throwable);

// error handling, maybe close session ...
}

}

这样就完成了一个最基本的处理流程,当然原生api不仅限于此,更多的原生api处理本篇暂不做讨论。

单独使用Spring Websocket

Spring-websocket对原生api进行了一些封装,添加了更多的特性支持,使得处理时更加便捷,提供了更加丰富的编程接口。首先添加一个配置类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import com.example.ws.controller.MyHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;

@Configuration
@EnableWebSocket // 开启spring-websocket支持
public class PureWebSocketConfig implements WebSocketConfigurer {

@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myHandler(), "/myHandler") // 注册handler
.addInterceptors(new HttpSessionHandshakeInterceptor()) // 添加拦截器
.setAllowedOrigins("*") // 允许非同源访问
.withSockJS(); // 使用sockJS
}

@Bean
public WebSocketHandler myHandler() {
return new MyHandler();
}

}

详细说明如上的代码:

  • @EnableWebSocket

    典型的Spring Enable系列注解,查看注解的内容就是引入了一个配置类DelegatingWebSocketConfiguration,它干的事情并不多,主要就如下几点:

    1. 收集WebSocketHandlerRegistry的实现类Bean,并以此通过ServletWebSocketHandlerRegistry为Websocke创建一个HandlerMapping
    2. 注入一个在默认情况下为SockJS所使用的TaskScheduler
  • registry.addHandler

    类似Spring http的handlermapping,注册URL Path对应的处理器,处理器的基接口为WebSocketHandler

  • registry.addInterceptors

    很明显的,添加拦截器,不过这是对Websocket连接握手阶段的拦截,拦截器的基接口为HandshakeInterceptor,Spring提供了一些默认的拦截工具,如引入Http握手时的HttpSession、CsrfToken处理、同源处理等。

  • withSockJS

    提供SockJS的支持,值得注意的是虽然doc上写的是fallback,但这经实验验证并不是同时支持websocket连接和SockJS连接,例如前端直接使用Websocket api来连接使用了withSockJS的URL Path会发生报错,客户端和服务端应该保持一致,同时使用SockJS或者同时不用,或者服务端注册两个Handler分别处理SockJS和原始Websocket。

然后就是创建一个WebSocketHandler,Handler基本上对应了原生API中的ServerEndPoint:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler;

public class MyHandler extends TextWebSocketHandler {

@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
System.out.println("Receive a text message:"+ message.getPayload());

session.sendMessage(new TextMessage("What the Fuck ?"));
}
}

这里为了方便直接继承了TextWebSocketHandler,此外Spring还提供了BinaryWebSocketHandler用于应对二进制数据处理。对于实用性来说,还是应该直接实现WebSocketHandler接口或者继承AbstractWebSocketHandler,通常情况下需要在连接建立后进行额外的操作(afterConnectionEstablished)以及错误处理(handleTransportError)。为了主动推送消息往往需要提供获取对应Session的方法,比如将用户和对应的Session保存在Map(ConcurrentHashMap)中。

采用Spring的Websocket封装后另外一个比较大的不同是针对org.springframework.web.socket.WebSocketSession来进行操作,屏蔽了底层的差异(Websocket Session、SockJS等),同时能够携带额外的数据,包括Attribute(属性map)、Principal(身份)、HandshakeHeader等。

STOMP On Spring Websocket

不同于Http各种各样的头部、请求体等拥有各种规范标准,Websocket握手结束建立连接后并没有提供更加具体的交互协议,只是文本和二进制都支持,这对于一个非常简单的应用程序来说会显得比较低级、过于靠近底层。处于此原因Websocket RFC定义了子协议的使用,在握手阶段可以使用头部Sec-WebSocket-Protocol来就使用何种子协议达成一致,但这并不是必须的。Spring提供了以STOMP作为Websocket子协议的支持,选用STOMP的原因可能包括:

  1. Spring认为Websocket持续在同一个TCP连接上以事件为驱动异步传递消息的形式更加趋近于消息传递应用程序如JMS、AMQP,而STOMP属于消息传递协议
  2. STOMP的设计哲学是简洁性、互操作性,STOMP很轻巧同时应用广泛较为成熟,通用性比较强

启用STOMP后整个信息流就会复杂很多,但是也减少了很多额外的人工配置,从Spring文档的篇幅、提供的应用样例以及spring-boot-starter-websocket直接引入了spring-messaging模块(包含了STOMP等相关内容)等各种情况不难看出这种方式是spring主推的websocket解决方案。使用这种方式的优点有:

  1. 使得Spring能够提供更加丰富的编程模型
  2. 无需自定义消息交互协议和消息格式,无需手动管理Session
  3. 提供了许多现成的工具,如Spring框架中包含的STOMP Java客户端
  4. 能够使用消息队列来代为管理消息
  5. 与Spring Security集成良好
基础配置

演示此种模式时,参考了Spring官方的引导样例代码,它提供了前端的内容,在试验时可以选择直接pull它的代码,然后修改。同样首先需要一个配置类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Configuration
@EnableWebSocketMessageBroker
public class StompWebSocketConfig implements WebSocketMessageBrokerConfigurer {

@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic", "/queue"); // 1
config.setApplicationDestinationPrefixes("/app"); // 2
config.setUserDestinationPrefix("/user"); // 默认就是'/user' 3
}

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/gs-guide-sockjs") // 4
.withSockJS(); // 5

registry.addEndpoint("/gs-guid-websocket");
}

}

可以看到使用的是@EnableWebSocketMessageBroker而不是@EnableWebsocket,同时实现的接口是WebSocketMessageBrokerConfigurer。在这种应用方式下,首先要注意Spring提供好了发送消息、订阅消息的处理模型,而不是简单的客户端和服务端互发消息,整体更加趋向于一个消息队列服务器(Message Broker)。客户端可以订阅某个广播地址(广播)或传输通道地址(单播),订阅之后将会接收发布在该地址上的消息,同时客户端也可以主动给某个地址发送消息。在该配置类中:

①配置用于传输消息的地址前缀,使得用户可以订阅地址/topic/xxx或者向/topic/xxx发送消息,用于端到端的情况,即客户端之间直接通信
②配置用于直接发送给服务器执行处理的地址前缀,这是为了应对直接发送请求给服务器的情况,此时对于发送到/app/xxx的消息将被路由到@Controller注解的类中注解了@MessageMapping的方法上,与@RequestMapping非常相似,注意发送到/app/abc时将由@MessageMapping("/abc")接收处理
③配置用户地址前缀,该模式下支持给特定的用户发送消息,为了方便处理这种情况,提供了用户地址前缀。在默认的/user前缀情况下,客户端可以订阅/user/queue/xxx表明监听一个只会发给自己消息的地址/queue/xxx(注意/queue的配置),在服务端可以发送到形如/user/{username}/queue/xxx的目的地或者通过调用SimpMessagingTemplateconvertAndSendToUser('{username}', '/queue/xxx')来给目标用户发送消息,spring将自动解析转化为用户会话唯一的目的地(如/queue/xxx-user {session-id}),保证与其他用户不冲突
④添加websocket握手http地址,该地址仅用于握手
⑤开启SockJS

服务端消息处理

不同于单独使用Spring Websocket,配置中并没有出现Handler相关的配置,因为语义发生了变化(包含了端到端、端到服务),同时Spring提供了以注解的方式来提供Handler的描述和定义。示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Controller
public class StompController {

@Autowired
private SimpMessagingTemplate messagingTemplate;

// HelloMessage是一个仅包含name字段的pojo
@MessageMapping("/hello")
@SendTo("/topic/greetings")
public Greeting greeting(Message<HelloMessage> message) throws Exception {
System.out.println("Receive a Message with header: " + message.getHeaders());
Thread.sleep(1000); // simulated delay
return new Greeting("Hello, " + HtmlUtils.htmlEscape(message.getPayload().getName()) + "!");
}

@MessageMapping("/hehe")
@SendToUser("/queue/xxx")
public Message<String> toUser(HelloMessage message) throws Exception {
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
accessor.setUser(message::getName); // set user name
return MessageBuilder.createMessage("Somebody say hello to you", accessor.getMessageHeaders());
}

@MessageMapping("/haha")
public void useTemplate(HelloMessage message) throws Exception {
messagingTemplate.convertAndSendToUser(message.getName(),
"/queue/xxx", "Hello from messaging template");
}

}

与SpringMVC非常相近地用@Controller修饰class,这也使得可以在同一个Controller类中同时处理Http和websocket,只不过这里websocket用的是@MessageMapping。上面的三个方法,分别使用了@SendTo@SendToUserSimpMessagingTemplate来将消息发送至指定的目的地,由于我们指定了ApplicationDestinationPrefix为/app,因此客户端在向/app/hello/app/hehe/app/haha来发送消息时将分别被对应的方法执行处理,而不是以/app开头的目的地将不会被@MessageMapping捕捉处理。常用的相关注解和类总结如下:

名称位置描述
@MessageMappingclass / method地址映射注解,注解在类上时对类内所有方法产生影响,注意applicationDestinationPrefix的配置
@SubscribeMappingmethod类似于@MessageMapping,但是仅限于订阅消息并且返回值直接发送给clientOutboundChannel
@MessageExceptionHandlermethod类似于mvc的@ExceptionHandler,用于错误处理
Message-消息的基类,包含Header和Payload
MessageHeaderAccessor-正如其名称所述,用于读取、设置Header内容,常用的子类有SimpMessageHeaderAccessor和StompHeaderAccessor,包含了标准头部值的处理方法
@Headermethod argument用于快速将消息的头部值并将其赋值到方法入参上
@Headersmethod argument用户将所有头部信息赋值到入参,可以分配给java.util.Map
@DestinationVariablemethod argument从映射地址中获取模板变量,与mvc的@PathVariable相似
Pricipal-作为方法参数时,将赋值为发送消息的用户信息
@SendToclass / method指定将方法返回值发送到指定的目的地
@SendToUserclass / method指定将方法返回值发送到指定的用户目的地,用户信息将从Message头部读取
SimpMessagingTemplate-发送消息的工具类,默认已经配置好了一个Bean可以直接注入,@SendTo和@SendToUser本质上也是调用这个工具
用户认证

在原生Java API和单独使用Spring Websocket的场景中,通常都需要手动管理Session以及用户与Session的映射,而在STOMP模式下这些都由框架处理完成。在Http中判断一个连接属于哪个用户通常有两种方式:Cookie-Session或者Auth Token,websocket也是类似,不同的是由于websocket是异步双工通信,用户的确认需要在握手连接阶段完成。带认证的配置文件如下(包含多种认证方法,择一即可):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@Configuration
@EnableWebSocketMessageBroker
public class StompWebSocketConfig implements WebSocketMessageBrokerConfigurer {

@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic", "/queue");
config.setApplicationDestinationPrefixes("/app");
config.setUserDestinationPrefix("/user"); // 默认就是/user
}

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/gs-guide-sockjs")
.setHandshakeHandler(handshakeHandler())
.withSockJS();

registry.addEndpoint("/gs-guide-websocket")
.setHandshakeHandler(handshakeHandler());
}

@Bean
public UserAuthHandshakeHandler handshakeHandler() {
return new UserAuthHandshakeHandler();
}

public static class UserAuthHandshakeHandler extends DefaultHandshakeHandler {
@Override
protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
// return request.getPrincipal(); // 1 默认认证方式

// 2 握手阶段URL参数认证
ServletServerHttpRequest servletServerHttpRequest = (ServletServerHttpRequest) request;
HttpServletRequest httpRequest = servletServerHttpRequest.getServletRequest();
// /gs-guide-websocket?token=xxxx
String token = httpRequest.getParameter("token");

String user = ...; // extract from token
return () -> user;
}
}

// 3 添加channel拦截器,在STOMP CONNECT阶段认证
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
System.out.println("Activate the inbound interceptor now! Message header: " + message.getHeaders());
StompHeaderAccessor accessor =
MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
String auth = accessor.getFirstNativeHeader("Authentication");
if (null != auth) {
String user = ... ; // extract from token
accessor.setUser(() -> user);
} else {
throw new IllegalStateException("token illegal");
}
}

return message;
}
});
}
}
  1. 握手阶段通过Http Session认证:这是默认支持的验证方法,直接通过Http握手时Http Session获取用户信息,获取的方式是HttpServletRequest#getUserPrincipal()
  2. 握手阶段通过URL请求参数认证:属于Token认证的一种方法,由于Websocket和SockJS均不支持在握手阶段添加额外的Http头部(理由是安全问题,参考SockJS的一个issue),那么只好通过请求参数来实现,将token信息附带在握手URL中
  3. 在STOMP子协议的CONNECT阶段进行认证,spring提供了在CONNECT时通过对message调拥setUser方法触发回调函数来执行认证,常用于Token认证

值得注意的是在子协议阶段认证无法阻止大量恶意连接,客户端可以只进行连接而不发送CONNECT消息。

消息流和原理分析

消息分发、路由通过spring-messaging这个由Spring Integration抽象发展而来的模块来处理,因此要理解它的运作机制必须了解该模块的内容,本博正好有一篇Spring Integration的介绍,可供参考。这里简单介绍一下其中的几个最基本的概念:

  • Message:包括消息头部和负载,是对消息的抽象封装
  • MessageHandler:消息处理器的封装,接收消息执行操作
  • MessageChannel:消息通道,用于传递消息,解耦生产者和消费者。在应用层面,MessageChannel包含了1个或多个MessageHandler,给channel发消息即是请求其中的Handlers处理该消息,不同的Channel有不同的处理方式,可能是同步调用也可能是异步处理

了解概念后来看一下整个消息流的过程(不包含集成外部消息队列的情况):

客户端消息的流入:虽然经过了层层封装但是源头依然是基于原生Java API从web容器获取请求,Spring的接入点为StandardWebSocketHandlerAdapter,该类继承了javax.websocket.Endpoint。数据被封装为WebSocketMessage的子类后被发送给SubProtocolWebSocketHandler,对于传入的消息SubProtocolWebSocketHandler将获取到消息对应的子协议处理器,即StompSubProtocolHandler,将WebSocketMessage交由其处理。StompSubProtocolHandler解析消息原始数据,并将其封装为org.springframework.messaging.Message(也可能解析为多条Message),同时设置对应的Header信息包括User、SessionId等等,封装完成后将消息发送给ClientInboundChannel。ClientInboundChannel默认情况下有三个Handler:WebSocketAnnotationMethodMessageHandler(负责处理@MessageMapping)、UserDestinationMessageHandler(负责解析转换用户地址)、SimpleBrokerMessageHandler(负责发送响应消息以及记录订阅状况)。其中UserDestinationMessageHandler仅负责地址转换,转换完成后会重新将消息发送到ClientInboundChannel,SimpleBrokerMessageHandler将消息发送给ClientOutboundChannel。

响应消息的流出:消息的统一出口是ClientOutboundChannel,其对应的Handler为SubProtocolWebSocketHandler,它保存了SessionId到Session的映射,根据Message头部的SessionId信息获取到对应的Session后交由StompSubProtocolHandler最终执行消息的发送。注意可以直接通过SimpMessageTemplate在其他环境中直接给用户推送消息。

集成Spring Security

Spring Security 4添加了对Spring websocket的支持,不过值得注意的是Spring Security并不提供对原生Java API以及单独使用Spring websocket的支持,原因是数据格式不明确,Spring Security对未知格式的数据能做的事情比较少。我们需要在依赖中引入spring-boot-starter-securityspring-security-messaging,添加完成后主配置文件更改为如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
@Configuration
@EnableWebSocketMessageBroker
// 继承Spring Security提供的基类
public class SecurityWebSocketConfig extends AbstractSecurityWebSocketMessageBrokerConfigurer {

@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/queue", "/topic");
config.setApplicationDestinationPrefixes("/app");

/**
* 保证对同一个session发送的消息有序,这会带来一定的性能开销
* @see org.springframework.messaging.simp.broker.OrderedMessageSender
*/
// config.setPreservePublishOrder(true);
}


/**
* 发起websocket连接时要使用http,在进行连接时的请求也会被security拦截,注意放行
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/gs-guide-websocket")
.setHandshakeHandler(handshakeHandler()) // 认证方式选择一种即可
.setAllowedOrigins("*");

registry.addEndpoint("/gs-guide-sockjs")
.setHandshakeHandler(handshakeHandler())
.setAllowedOrigins("*")
.withSockJS();

// 错误处理
// registry.setErrorHandler()
}

@Override
protected void configureInbound(MessageSecurityMetadataSourceRegistry messages) {
messages
// 允许任何CONNECT、DISCONNECT消息
.simpTypeMatchers(CONNECT, DISCONNECT).permitAll()
// 无地址消息处理
.nullDestMatcher().authenticated()
// 按照路径的处理
.simpDestMatchers("/app/**").hasRole("USER")
.simpDestMatchers("/topic/**").authenticated()
// 按照订阅路径的处理
.simpSubscribeDestMatchers("/topic/**", "/user/**").authenticated()
// 其他消息全部禁止
.anyMessage().denyAll();
}

// 是否允许非同源访问,默认为false不允许,会添加CSRF处理
@Override
protected boolean sameOriginDisabled() {
return true;
}

@Bean
public UserAuthHandshakeHandler handshakeHandler() {
return new UserAuthHandshakeHandler();
}

// 认证方式选择一种即可
public static class UserAuthHandshakeHandler extends DefaultHandshakeHandler {
@Override
protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {

// 握手阶段URL参数认证
ServletServerHttpRequest servletServerHttpRequest = (ServletServerHttpRequest) request;
HttpServletRequest httpRequest = servletServerHttpRequest.getServletRequest();
// /gs-guide-websocket?token=xxxx
String token = httpRequest.getParameter("token");

String user = ...; // extract from token

// 用户名、权限的处理
UsernamePasswordAuthenticationToken authenticationToken = new UsernamePasswordAuthenticationToken(
new UserPrincipal(user), null, Collections.singleton(new SimpleGrantedAuthority("ROLE_USER")));

// 需要返回Spring Security适用的Authentication子类
return authenticationToken;
}
}

}

此外还需要Spring Security自身的配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Configuration
@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {

@Override
protected void configure(HttpSecurity http) throws Exception {
http.cors().and().csrf().disable()
.authorizeRequests()
// 处理websocket握手地址,token认证模式下需要直接放行
.antMatchers("/gs-guide-websocket", "/gs-guide-sockjs").permitAll()
// ... more configuration
}

@Override
public void configure(WebSecurity web) throws Exception {
web.ignoring().antMatchers("*.html", "*.css", "*.js");
}

}

配置过程与普通的Spring Security风格保持一致,还是非常方便的。同样的,基本原理也差不多,也是添加了Spring Security的拦截器(ChannelSecurityInterceptor),在Message进入时创建SecurityContext然后执行后续校验,详细情况可参考官方文档

小结

从原生Java API到单独使用Spring Websocket,最后是STOMP on Spring Websocket,是一个封装程度逐渐提高的过程,最后的代码外观天差地别,可见代码结构的威力,不得不佩服Spring的抽象能力。虽然STOMP on Spring Websocket是功能最齐全、语义最丰富的一种应用方式,也是Spring主推的解决方案,但是个人觉得稍微复杂了那么一些,而且有额外的基础知识门槛,如果只是为了实现服务端推送一些消息,并不一定需要这种应用方式。

在此情况下服务端和消息队列非常相似(集成外部消息队列工具直接推送到页面倒是不错,不过本文没有介绍这种情况),但是需要注意客户端发送过来的消息有可能被异步处理n次(最多的情况达到4次),每次处理都是线程池调用,也就意味着要被系统调度选中n次,那么响应的延时可能会比较高,调优也是集中在Channel的线程池配置上。

Janusgraph数据处理
Micrometer使用介绍