跳到主要内容

WebSocket模块

服务端推送Web方案

短轮询(Short Polling)

实现原理:短轮询是一种定期向服务器发送请求的方式。客户端周期性地向服务器发送HTTP请求,服务器接收到请求后立即返回数据,无论数据是否发生变化。

优点:1.实现简单,适用于任何支持HTTP的环境。2. 服务器无状态,处理每个请求都独立。

缺点:1.不高效,频繁的HTTP请求会增加网络和服务器负载。2.实时性差,数据更新的延迟取决于轮询的频率。

适用场景:1.数据更新频率低且对实时性要求不高的应用。2.简单的状态检查或不需要频繁更新的后台任务。

长轮询(Long Polling)

实现原理:长轮询类似于短轮询,但区别在于服务器在接收到请求后,如果没有新数据则保持连接一段时间(通常是几秒或几十秒),直到有新数据或超时才返回响应。客户端收到响应后立即发起新的请求,形成一个持续的循环。

优点:1.较短轮询更高效,减少了无效的请求。2.提高了数据更新的实时性。

缺点:1.实现较短轮询复杂,服务器需要保持连接状态。2.长时间保持连接可能会增加服务器的负载。

适用场景:1.对实时性要求较高但无法使用WebSocket的应用。2.需要在数据变化时立即通知客户端的场景,例如聊天应用或通知系统。

WebSocket

实现原理:WebSocket是HTML5引入的一种持久化双向通信协议。客户端和服务器只需进行一次握手,然后就可以在单个TCP连接上进行双向通信。服务器可以主动向客户端推送消息,客户端也可以随时发送请求。

优点:1.实时性好,双向通信支持即时数据更新。2.高效,减少了频繁的HTTP请求和响应开销。

缺点:1.实现复杂,需要支持WebSocket协议的服务器和客户端。2.长时间保持连接会增加服务器资源消耗,需要管理好连接的生命周期。

适用场景:1.需要高实时性的数据更新的应用,如在线游戏、实时聊天、股票行情。2.双向通信需求的场景,例如实时协作工具、物联网设备通信。

对比

特性短轮询长轮询WebSocket
实时性较好最好
服务器资源消耗低(单连接管理复杂)
实现复杂度
网络带宽消耗较低
适用场景数据更新频率低的场景中等实时性要求的场景高实时性、双向通信需求的场景

总结

  • 短轮询适合简单的状态检查或低频数据更新,不适用于高实时性需求的场景。
  • 长轮询适用于中等实时性需求的场景,可以在不能使用WebSocket时作为替代方案。
  • WebSocket是高实时性和双向通信的最佳选择,适用于在线游戏、实时聊天和实时协作等场景。

WebSocket实现方案

1.tomcat实现websocket

https://juejin.cn/post/7095918534210879519

2.Netty实现Websocket

https://xxgblog.com/2021/04/14/netty-websocket/

选用Netty的原因:

  1. NIO(new I/O)基于事件驱动的多路复用框架,新版tomcat也支持了,不是重点
  2. 丰富的组件和功能

WebSocket升级过程

握手阶段(handshake)

在客户端和服务器建立 WebSocket 连接之前,客户端首先要发送一个 HTTP 协议的握手请求:

GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Origin: http://example.com
Sec-WebSocket-Protocol: chat, superchat
Sec-WebSocket-Version: 13

其中请求头 Connection: UpgradeUpgrade: websocket 表示客户端想要升级协议为 WebSocket。服务器进行如下响应完成握手:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Protocol: chat

完成握手后,接下来就是双向的数据传输的过程。

数据传输阶段( data transfer )

数据传输阶段传输的内容以帧( frame )为单位,其中分为控制帧(Control Frame)和数据帧(Data Frame):

  • 控制帧(Control Frame):包括 ClosePingPong 帧,Close 用于关闭 WebSocket 连接,PingPong 用于心跳检测
  • 数据帧(Data Frame):包括 TextBinary 帧,分别用于传输文本和二进制数据

Netty实现WebSocket

详细的代码如下:

@Slf4j
@Configuration
public class NettyWebSocketServer {
public static final int WEB_SOCKET_PORT = 8090;
public static final NettyWebSocketServerHandler NETTY_WEB_SOCKET_SERVER_HANDLER = new NettyWebSocketServerHandler();
// 创建线程池执行器
private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private EventLoopGroup workerGroup = new NioEventLoopGroup(NettyRuntime.availableProcessors());

/**
* 启动 ws server springboot 启动时自动执行
*
* @return
* @throws InterruptedException
*/
@PostConstruct
public void start() throws InterruptedException {
run();
}

/**
* 销毁
*/
@PreDestroy
public void destroy() {
Future<?> future = bossGroup.shutdownGracefully();
Future<?> future1 = workerGroup.shutdownGracefully();
future.syncUninterruptibly();
future1.syncUninterruptibly();
log.info("关闭 ws server 成功");
}

public void run() throws InterruptedException {
// 服务器启动引导对象
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new LoggingHandler(LogLevel.INFO)) // 为 bossGroup 添加 日志处理器
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// todo 测试不开 30秒客户端没有向服务器发送心跳则关闭连接
// pipeline.addLast(new IdleStateHandler(30, 0, 0));
// 因为使用http协议,所以需要使用http的编码器,解码器
pipeline.addLast(new HttpServerCodec());
// 以块方式写,添加 chunkedWriter 处理器
pipeline.addLast(new ChunkedWriteHandler());
/**
* 说明:
* 1. http数据在传输过程中是分段的,HttpObjectAggregator可以把多个段聚合起来;
* 2. 这就是为什么当浏览器发送大量数据时,就会发出多次 http请求的原因
*/
pipeline.addLast(new HttpObjectAggregator(8192));
// 保存用户ip
// pipeline.addLast(new HttpHeadersHandler());
/**
* 说明:
* 1. 对于 WebSocket,它的数据是以帧frame 的形式传递的;
* 2. 可以看到 WebSocketFrame 下面有6个子类
* 3. 浏览器发送请求时: ws://localhost:7000/hello 表示请求的uri
* 4. WebSocketServerProtocolHandler 核心功能是把 http协议升级为 ws 协议,保持长连接;
* 是通过一个状态码 101 来切换的
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/"));
pipeline.addLast(new MyHeaderollectHandler());
// 自定义handler ,处理业务逻辑
pipeline.addLast(NETTY_WEB_SOCKET_SERVER_HANDLER);
}
});
// 启动服务器,监听端口,阻塞直到启动成功
serverBootstrap.bind(WEB_SOCKET_PORT).sync();
}
}

代码解释

  1. 生命周期管理:使用Spring的@PostConstruct@PreDestroy注解在Bean初始化和销毁时启动和关闭Netty服务器。
  2. NettyWebSocketServerHandler:这是我们自己写的Websocket处理器,后面再说
  3. bossGroup和workerGroup:这种分工模式可以提高服务器的吞吐量和性能。bossGroup 负责接受新的连接请求,而 workerGroup 则专注于处理已经建立的连接,避免了资源的浪费和竞争。具体表现如下:
  • bossGroup :负责接受新的客户端连接请求。当客户端发起连接时,这些连接首先会被 bossGroup 处理。bossGroup 会将接受的新连接注册到 workerGroup 中进行后续的处理。bossGroup 通常只需要很少的线程(例如 1 个)就可以胜任,因为它的工作相对比较简单。

  • workerGroup(Worker Thread Group):负责处理已经被 bossGroup 接受的客户端连接请求。它负责读写数据、执行业务逻辑等操作。workerGroup 通常需要更多的线程,以提高并发处理能力。线程数通常设置为 CPU 核心数。

  1. run() 方法是实际启动 WebSocket 服务器的逻辑,主要包括:

    • 创建 ServerBootstrap 对象,并设置 boss 和 worker 线程组、socket channel 类型、backlog 大小、是否保持 TCP 连接活跃等。

    • 为 boss 线程组添加日志处理器。

    • 为每个连接的 socket channel 添加一系列处理器:

      • IdleStateHandler:心跳处理器

      • HttpServerCodec: HTTP 编解码器
      • ChunkedWriteHandler: 以块方式写数据
      • HttpObjectAggregator: 将 HTTP 消息进行聚合
      • WebSocketServerProtocolHandler: WebSocket 协议处理器,负责 HTTP 协议升级为 WebSocket 协议
      • MyHeaderollectHandler: 自定义的处理器,用于处理业务逻辑
      • NettyWebSocketServerHandler: 自定义的 WebSocket 服务器处理器

这里需要为每一个都new对应的处理器,因为这些channel是有状态的,而NETTY_WEB_SOCKET_SERVER_HANDLER是无状态的,全局共用即可。

.channel(NioServerSocketChannel.class):NioServerSocketChannel 就是基于 Java 的 java.nio.channels.ServerSocketChannel 实现的 Netty 中的服务器 Channel。它可以高效地接受和处理大量的客户端连接请求。这一行代码就是告诉 Netty 服务器使用基于 NIO 的 NioServerSocketChannel 作为底层的网络传输实现

.option(ChannelOption.SO_BACKLOG, 128):这个选项设置了服务器 Socket 的 backlog 队列大小。 backlog 队列用于存放已经完成三次握手但未被 accept 的连接请求。当新的连接请求到来时,如果 backlog 队列已满,后续的连接请求将被拒绝。设置 backlog 队列大小为 128 是一个比较常见的值,表示最多可以有 128 个连接请求在等待被 accept。

.option(ChannelOption.SO_KEEPALIVE, true):这个选项开启了 TCP 的 keep-alive 机制。keep-alive 机制可以检测客户端连接是否处于活跃状态。当客户端长时间没有发送数据时,服务器会主动向客户端发送 keep-alive 探测包,如果客户端没有响应则会认为连接已经断开。开启 keep-alive 可以帮助服务器及时发现和处理已经断开的连接,避免资源浪费。

Netty自定义处理器

具体的代码如下

@Slf4j
@Sharable
public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {


private WebSocketService webSocketService;

/**
* 建立连接,需要在这里保存channel
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
webSocketService = SpringUtil.getBean(WebSocketService.class);
webSocketService.connect(ctx.channel());
super.channelActive(ctx);
}

/**
* 处理消息
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
String text = textWebSocketFrame.text();
log.info("接收到消息: {}", text);
WSBaseReq request = JSONUtil.toBean(text, WSBaseReq.class);
switch (WSReqTypeEnum.of(request.getType())) {
case LOGIN:
webSocketService.handleLoginRequest(channelHandlerContext.channel());
break;
case AUTHORIZE:
log.info("登录认证");
webSocketService.authorize(channelHandlerContext.channel(), request.getData());
break;
case HEARTBEAT:
log.info("心跳包");
break;
default:
log.info("未知请求类型");
break;
}
}

/**
* 握手
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
log.info("握手成功");
String token = NettyUtils.getAttr(ctx.channel(), NettyUtils.TOKEN);
if (StrUtil.isNotBlank(token)) {
webSocketService.authorize(ctx.channel(), token);
}
} else if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
log.info("读空闲");
//用户下线
userOffline(ctx.channel());
}
}
}

/**
* 用户下线统一处理
*/
private void userOffline(Channel channel) {
webSocketService.remove(channel);
channel.close();
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
userOffline(ctx.channel());
}
}

详细解释

  1. 类上面加了@Sharable:解释:

    @Sharable 注解在 Netty 中有一个特殊的作用。在 Netty 中,ChannelHandler 是负责处理网络事件和实现业务逻辑的组件。当一个 ChannelHandler 被添加到 ChannelPipeline 中时,Netty 会为该 ChannelHandler 创建一个实例。

    • 通常情况下,每个 Channel 都会有一个独立的 ChannelPipeline,因此每个 ChannelHandler 也会有一个独立的实例。这种做法可以确保 ChannelHandler 是线程安全的,因为每个 Channel 都有自己的 ChannelHandler 实例。

    • 但是,如果一个 ChannelHandler 是无状态的,也就是说它不保存任何与特定 Channel 相关的数据,那么就可以将其标记为 @Sharable。这样,Netty 就会为这个 ChannelHandler 创建一个单例实例,并在所有 Channel 的 ChannelPipeline 中共享使用。

    使用 @Sharable 注解的好处有:

    1. 减少内存占用:由于 ChannelHandler 实例是共享的,因此可以减少内存的使用。

    2. 提高性能:共享 ChannelHandler 实例可以减少创建和销毁实例的开销,从而提高性能。

    3. 简化代码:当 ChannelHandler 是无状态的时候,可以将其标记为 @Sharable,这样就不需要在每个 Channel 中单独创建实例。

    在上面的代码中,NettyWebSocketServerHandler 被标记为 @Sharable。这意味着 Netty 会为这个 ChannelHandler 创建一个单例实例,并在所有 Channel 的 ChannelPipeline 中共享使用。这可以减少内存占用和提高性能,同时也简化了代码的编写。

  2. channelRead0函数:

    处理客户端发送的 WebSocket 文本帧消息。接收到消息后,将其转换为 WSBaseReq 对象。根据请求类型(WSReqTypeEnum)进行不同的处理,包括登录、授权、心跳等。详细见下文【前后端交互设计】

  3. userEventTriggered函数:

    这个方法主要用于处理一些与连接生命周期相关的事件,比如 WebSocket 握手完成、读写空闲等。

    1. WebSocket 握手完成事件:
      • 当 WebSocket 握手完成时,会触发 WebSocketServerProtocolHandler.HandshakeComplete 事件。
      • 在这个事件处理中,如果客户端携带了 token 信息,则进行授权操作。
    2. 读空闲事件:
      • 当客户端长时间没有向服务器发送任何数据时,会触发 IdleStateEvent 事件,也就是心跳检测。
      • 在这个事件处理中,判断是否为读空闲事件(即客户端长时间没有发送数据)。
      • 如果是读空闲,则执行用户下线的统一处理逻辑(userOffline)。

前后端交互设计

建立Websocket连接之后,前端向channel发送消息的格式为json:

{
"type":1,
"data":xxx
}

后端会根据type的类型作出相应的处理,主要有下面三种type:

LOGIN(1, "请求登录二维码"),
HEARTBEAT(2, "心跳包"),
AUTHORIZE(3, "登录认证"),

后端接收到请求后,也会发送给前端同样的json格式,主要为下面11种type:

LOGIN_URL(1, "登录二维码返回", WSLoginUrl.class),
LOGIN_SCAN_SUCCESS(2, "用户扫描成功等待授权", null),
LOGIN_SUCCESS(3, "用户登录成功返回用户信息", WSLoginSuccess.class),
MESSAGE(4, "新消息", WSMessage.class),
ONLINE_OFFLINE_NOTIFY(5, "上下线通知", WSOnlineOfflineNotify.class),
INVALIDATE_TOKEN(6, "使前端的token失效,意味着前端需要重新登录", null),
BLACK(7, "拉黑用户", WSBlack.class),
MARK(8, "消息标记", WSMsgMark.class),
RECALL(9, "消息撤回", WSMsgRecall.class),
APPLY(10, "好友申请", WSFriendApply.class),
MEMBER_CHANGE(11, "成员变动", WSMemberChange.class),

Netty心跳检测

代码

pipeline.addLast(new IdleStateHandler(30, 0, 0));

解释

在这段代码中,pipeline.addLast(new IdleStateHandler(30, 0, 0)) 的作用是在 Netty 的 ChannelPipeline 中添加一个 IdleStateHandler

IdleStateHandler 是 Netty 提供的一个用于检测连接空闲状态的处理器,它可以检测以下三种类型的空闲状态:

  1. 读空闲 (READER_IDLE):在指定的时间内,Channel 没有读取到任何数据。
  2. 写空闲 (WRITER_IDLE):在指定的时间内,Channel 没有写出任何数据。
  3. 全局空闲 (ALL_IDLE):在指定的时间内,Channel 既没有读取到数据,也没有写出数据。

在这个例子中,构造 IdleStateHandler 的参数分别是:

  • 读空闲时间: 30 秒
  • 写空闲时间: 0 秒
  • 全局空闲时间: 0 秒

这意味着,如果客户端在 30 秒内没有向服务器发送任何数据,就会触发读空闲事件。而写空闲和全局空闲事件在这里并没有被使用。

当读空闲事件被触发时,会调用 userEventTriggered 方法,在那里执行用户下线的逻辑。这样可以及时发现和处理那些已经断开连接但没有主动通知服务器的客户端,避免资源浪费。