引言

服务器推送技术背景简介

服务器推送(Server Push)技术允许网站和应用在有新内容可用时主动向用户推送更新,而不需要用户主动去查询。与传统的”拉”模型不同,服务器推送采用”推”的方式主动把信息发给客户端。服务器推送的优点有两个:

用户体验更流畅。用户不需要一直去刷新页面来获取最新内容,系统会在有新的消息出现时自动推送给客户端。
更高效。服务器只在有真正有用的内容时才主动推送,节省了大量不必要的客户端请求。常见的服务器推送技术包括:

长轮询:客户端向服务器发起一个长时间的请求,一直保持打开,直到服务器有新内容推送。效率不高但兼容性好。
SSE(Server Sent Events):服务器可以在需要时一直向客户端推送事件,客户端只需要监听一个事件源。兼容性一般。
WebSocket:基于TCP的双向通信,服务器和客户端建立持久连接,允许双向实时消息传输。兼容性差但效率高。

Spring SseEmitter就是使用SSE技术实现服务器推送。与传统的Http长连接不同,它允许Spring服务能主动向浏览器推送消息。这可以显著提高用户体验。比如在聊天应用中,只有在有新消息时才主动推送,让用户感觉及时接收到信息。
SseEmitter 的功能和用途
SseEmitter 的主要功能就是允许服务器能主动将信息推送给浏览器客户端。它实现了服务器推送功能。 它的主要功能和用途有以下几个:

能主动向单个客户端推送消息。SseEmitter能匹配唯一的客户端请求,并与该客户端保持持久连接。通过此连接,服务器可以随时将事件推送给这个客户端。
能推送重复的消息。SseEmitter允许服务器不停发送相同的消息给客户端,形成一个连续的事件流。客户端只需要监听这个事件流即可。
支持延迟和定时推送。通过@Scheduled注解,服务器可以在指定时间推送指定延迟的事件。
支持推送不同类型的事件。客户端通过事件的名称能区分不同类型的事件,并作出不同的响应。
支持推送基本数据类型和POJO对象。服务器可以推送String、int等基本类型,也可以推送任意的Java对象。
能主动通知客户端关闭。通过调用complete()或error()方法,服务器可以主动告知客户端连接已关闭。
解耦服务器端和客户端。服务器端仅负责推送事件,与具体的客户端无关。

总的来说,SseEmitter的作用就是让服务器端能主动将信息推送给单个浏览器客户端,实现服务器推送的功能。它解耦了服务器端和客户端,给予服务器端主权主动推送事件的能力。这对实时通信、实时消息推送非常有用,能显著提高用户体验。

准备工作

引入maven依赖

SseEmitter 包含在 spring-webmvc 包中,如果是 spring boot 项目,确定已经引入了如下依赖即可

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

使用 SseEmitter

  • Controller 接口代码如下,先同步返回一个建立的 SseEmitter 连接给客户端,然后在异步线程中进行数据推送。为了防止串流以及后续支持客户端主动停止推流,每次请求携带唯一的客户端id。
@GetMapping(value = "test/{clientId}", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})  
@ApiOperation(value = " 建立连接")
public SseEmitter test(@PathVariable("clientId") @ApiParam("客户端 id") String clientId) {  
    final SseEmitter emitter = service.getConn(clientId);  
    CompletableFuture.runAsync(() -> {  
        try {  
            service.send(clientId);  
        } catch (Exception e) {  
            throw new BusinessException("推送数据异常");  
        }  
    });  

    return emitter;  
}

@GetMapping("closeConn/{clientId}")  
@ApiOperation(value = " 关闭连接")  
public Result<String> closeConn(@PathVariable("clientId") @ApiParam("客户端 id") String clientId) {  
    service.closeConn(clientId);  
    return Result.success("连接已关闭");  
}

Sevice 层相关代码如下

private static final Map<String, SseEmitter> SSE_CACHE = new ConcurrentHashMap<>();


@Override  
public SseEmitter getConn(@NotBlank String clientId) {  
    final SseEmitter sseEmitter = SSE_CACHE.get(clientId);  

    if (sseEmitter != null) {  
        return sseEmitter;  
    } else {  
        // 设置连接超时时间,需要配合配置项 spring.mvc.async.request-timeout: 600000 一起使用  
        final SseEmitter emitter = new SseEmitter(600_000L);  
        // 注册超时回调,超时后触发
        emitter.onTimeout(() -> {  
            logger.info("连接已超时,正准备关闭,clientId = {}", clientId);  
            SSE_CACHE.remove(clientId);  
        });  
        // 注册完成回调,调用 emitter.complete() 触发
        emitter.onCompletion(() -> {  
            logger.info("连接已关闭,正准备释放,clientId = {}", clientId);  
            SSE_CACHE.remove(clientId);  
            logger.info("连接已释放,clientId = {}", clientId);  
        });  
        // 注册异常回调,调用 emitter.completeWithError() 触发
        emitter.onError(throwable -> {  
            logger.error("连接已异常,正准备关闭,clientId = {}", clientId, throwable);  
            SSE_CACHE.remove(clientId);  
        });  

        SSE_CACHE.put(clientId, emitter);  

        return emitter;  
    }  
}

/**  
* 模拟类似于 chatGPT 的流式推送回答  
*  
* @param clientId 客户端 id  
* @throws IOException 异常  
*/  
@Override  
public void send(@NotBlank String clientId) throws IOException {  
    final SseEmitter emitter = SSE_CACHE.get(clientId);  
    // 推流内容到客户端  
    emitter.send("此去经年", org.springframework.http.MediaType.APPLICATION_JSON);  
    emitter.send("此去经年,应是良辰好景虚设");  
    emitter.send("此去经年,应是良辰好景虚设,便纵有千种风情");  
    emitter.send("此去经年,应是良辰好景虚设,便纵有千种风情,更与何人说");  
    // 结束推流  
    emitter.complete();  
}

@Override  
public void closeConn(@NotBlank String clientId) {  
    final SseEmitter sseEmitter = SSE_CACHE.get(clientId);   
    if (sseEmitter != null) {  
        sseEmitter.complete();  
    }  
}

接口调试

如果在推送数据过程中由客户端主动停止推送数据,可以直接调用关闭连接的接口。

注意事项

推送数据结束后,不要在 finally 块中调用 emitter.complete() 来关闭连接,否则会触发一个很诡异的BUG,如果此时在很短的时间内请求别的接口,可能会收到一个502 bad Gateway 的异常信息,原因就是和这个帖子 记一次springboot应用偶发502错误的排查过程_帅帅兔子的博客-CSDN博客 差不多。

与 WebSocket 对比

SSE(SseEmitter)与WebSocket的主要区别:

  • 建立连接的方式不同:
    • SSE:客户端发送一个长连接请求,然后服务端将事件通过 HTTP 响应推送给客户端。
    • WebSocket:采用双工通信,客户端和服务器建立实时的双向通信信道。
  • 传输效率不同:
    • SSE:需要经常建立和关闭连接,效率不如 WebSocket。但支持 HTTP 缓存。
    • WebSocket:建立后保持连接不断,效率高于SSE。
  • 兼容性不同:
    • SSE:原生支持的浏览器相对较少。需要Polyfill。
    • WebSocket:现代浏览器基本全面支持。
  • 传输内容不同:
    • SSE:只允许推送文本,不支持传输二进制数据。
    • WebSocket:支持传输文本以及二进制数据。
  • 功能不同:
    • SSE:只支持服务器主动推送,客户端只能被动接收。
    • WebSocket:支持双向全 duplex 通信,客户端和服务器都可以主动发送消息。
  • 使用场景不同:
    • SSE:适用于需要一对一推送事件的场景。客户端只需监听,服务器主动推送。
    • WebSocket:适用于需要实时双向交互的场景。例如聊天应用。

总的来说:

  • SSE 适用于服务器单向推送文本事件的场景,兼容性稍差但效率高。
  • WebSocket 适用于实时双向通信的场景,效率更高但兼容性要求高。

参考资料

作者:Jeebiz  创建时间:2024-03-06 15:12
最后编辑:Jeebiz  更新时间:2024-10-05 00:01