驾驶俱乐部无限金币版
121.35MB · 2025-10-30
在传统的Spring MVC控制器中,我们通常返回一个完整的对象(如@ResponseBody String或ResponseEntity<User>)。框架会一次性将整个响应体序列化并写入HTTP响应。这种方式简单直观,但在某些场景下存在严重瓶颈:
OutOfMemoryError。异步流式响应就是为了解决这些问题而生。它的核心思想是:服务器端在处理数据的同时,逐步地将数据块(Chunks)发送给客户端。这实现了服务器资源的有效利用和客户端更快的感知速度。
ResponseBodyEmitter 是Spring 4.2引入的用于异步生成响应体的核心类。它本身是一个持有器,并不直接处理IO,而是将实际的数据写入工作委托给一个HttpMessageConverter。
工作流程:
ResponseBodyEmitter:Spring MVC识别到这个返回值后,会立即释放请求线程(Tomcat线程),但保持HTTP响应连接处于打开状态。TaskExecutor、@Async方法或其他任何线程)来进行业务计算。ResponseBodyEmitter对象的send()方法,多次发送数据块。这些数据块可以是String、Object(会被转换器序列化)、甚至是HttpMessage。complete()或completeWithError()来最终关闭连接。背后的技术:HTTP分块传输编码(Chunked Transfer Encoding)
服务器通过Transfer-Encoding: chunked响应头告知客户端:“我将分块发送数据,每个块都包含自身的大小”。这使得服务器可以在不知道整体内容长度的情况下开始传输响应。
让我们通过一个“实时数据报表生成”的场景来演示。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@RestController
@RequestMapping("/api/reports")
public class ReportController {
// 创建一个专用的线程池来处理异步任务,避免使用Tomcat的工作线程
private final ExecutorService nonBlockingService = Executors.newCachedThreadPool();
@GetMapping("/streaming")
public ResponseBodyEmitter getStreamingReport() {
// 1. 创建ResponseBodyEmitter实例,可以设置超时时间(毫秒)
ResponseBodyEmitter emitter = new ResponseBodyEmitter(60_000L); // 60秒超时
// 2. 将耗时的计算任务提交到后台线程池
nonBlockingService.execute(() -> {
try {
// 模拟生成报告的不同阶段
for (int i = 1; i <= 10; i++) {
// 3. 模拟每一部分计算耗时
Thread.sleep(1000);
// 4. 发送一个数据块:这里可以发送任何对象,由Jackson等转换器序列化
ReportDataChunk chunk = new ReportDataChunk("Phase " + i, "Data for phase " + i, i * 10);
emitter.send(chunk);
// 也可以发送预序列化的字符串或JSON
// emitter.send("{"phase": "Phase " + i + ""}n");
}
// 5. 处理完成,关闭连接
emitter.complete();
} catch (IOException | InterruptedException e) {
// 6. 发生错误,终止并发送错误信息
emitter.completeWithError(e);
}
});
// 7. 立即返回emitter对象给Spring MVC框架
return emitter;
}
// 静态内部类,代表一个数据块
static class ReportDataChunk {
private String phase;
private String data;
private Integer progress;
// 构造方法、Getters和Setters省略...
public ReportDataChunk(String phase, String data, Integer progress) {
this.phase = phase;
this.data = data;
this.progress = progress;
}
}
}
ResponseBodyEmitter时务必设置合理的超时时间,防止连接长期挂起。Thread.sleep)。必须使用自定义线程池(如示例)或Spring的@Async。try-catch中调用send()和complete(),并使用completeWithError()通知客户端。前端可以使用多种方式消费这种流式API,EventSource和Fetch API是最常见的两种。
async function fetchStreamingReport() {
const response = await fetch('/api/reports/streaming');
// 检查响应是否成功
if (!response.ok) {
console.error('Server error:', response.status);
return;
}
// 重要:确保响应是分块的
// response.body 是一个 ReadableStream
const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8'); // 用于将Uint8Array解码为字符串
try {
while (true) {
// read() 返回一个Promise,解析为下一个数据块
const { value, done } = await reader.read();
if (done) {
console.log('Stream completed');
break;
}
// 解码并处理块(假设服务器发送的是JSON字符串)
const chunkString = decoder.decode(value);
try {
// 如果每个块是一个完整的JSON对象
const dataChunk = JSON.parse(chunkString);
console.log('Received chunk:', dataChunk);
// 更新UI:更新进度条、填充表格等
updateProgress(dataChunk.progress);
appendToTable(dataChunk);
} catch (e) {
console.error('Error parsing chunk JSON:', e, chunkString);
}
}
} catch (error) {
console.error('Stream reading failed:', error);
} finally {
reader.releaseLock();
}
}
function updateProgress(percent) {
document.getElementById('progressBar').style.width = percent + '%';
}
function appendToTable(chunk) {
const table = document.getElementById('reportTable');
// ... 将chunk数据插入表格的行逻辑
}
如果你的服务端每个发送的数据块都是遵循SSE格式的字符串(如data: {...}nn),前端可以使用更简单的EventSource。但ResponseBodyEmitter默认不强制SSE格式,需要手动构建格式。
// 服务端发送SSE格式
emitter.send("data: " + JSON.stringify(chunk) + "nn");
前端EventSource代码:
// 前端使用EventSource
const eventSource = new EventSource('/api/reports/streaming');
eventSource.onmessage = function(event) {
const data = JSON.parse(event.data);
console.log('Received:', data);
// 更新UI
};
eventSource.onerror = function(err) {
console.error('EventSource failed:', err);
eventSource.close();
};
资源管理:
emitter.onTimeout()和emitter.onCompletion()注册回调),及时取消后台任务,节省资源。错误处理:
try-catch:确保所有可能的异常都能被捕获并通过completeWithError()通知客户端。{“status”: “error”, “message”: “...”})。可观察性(Observability) :
使用场景建议:
ResponseBodyEmitter:当你需要分步发送结构化的数据(如JSON对象序列)时。SseEmitter:当你需要向浏览器客户端推送实时事件流时。StreamingResponseBody:当你需要高效流式传输原始字节(如文件)时。ResponseBodyEmitter及其衍生的SseEmitter是Spring武器库中用于构建高效、可扩展、响应式Web应用的强大工具。它们将服务器从同步阻塞的枷锁中解放出来,极大地提升了处理长时间任务和大数据量响应的能力。正确地在项目中应用它们,可以显著改善用户体验和系统资源利用率。