脑洞侦探
48.29MB · 2025-10-05
大家好,我是程序员云喜,向大家分享我的编程成长之路!
最早让我产生兴趣的,是看豆包的“打字机式”对话效果:它不像普通对话那样卡顿几秒后突然冒出一大段文字,而是像真人一样边思考边表达,随时可以打断,说完还会有一个自然的收尾。后来,在学习RAG的过程中我逐渐了解到了它的本质,所以我把这种体验拆解成了四个关键点:传输、生成、控制和可观测性。这篇文章主要聚焦于“流式对话”的工程实现。
流式传输的核心优势可通过三个典型场景体现:
流式的核心逻辑是:把“第一个可用的字”立刻送出去,具体拆解为四层实现:
为降低前后端耦合,将流式对话消息简化为四类核心事件,协议越简单,越不容易踩坑。
// 1. 流式正文(增量内容)
{ "type": "chunk", "chunk": "……当前片段……", "ts": 1710000000000 }
// 2. 正常完成(生成结束)
{ "type": "completion", "status": "finished", "ts": 1710000002345 }
// 3. 用户主动停止(中断响应)
{ "type": "stop", "message": "响应已停止", "ts": 1710000001234 }
// 4. 兜底报错(异常处理)
{ "type": "error", "message": "AI服务暂不可用,请稍后重试", "ts": 1710000001200 }
chunk
:立即append到页面,实现增量渲染。completion/stop/error
:执行收尾操作(如隐藏加载状态、记录埋点)。维度 | 详情 |
---|---|
用途 | 统一前后端事件语义,降低耦合度 |
输入 | 后端向前端推送的事件对象 |
输出 | UI渲染、状态流转、埋点数据 |
关键设计点 | 事件最小集(避免冗余)、时间戳(排序)、可扩展字段(reqId/convId/seq) |
常见坑 | 事件过多导致状态机复杂、字段未版本化(迭代兼容问题) |
扩展方案 | 增加keepalive (心跳)、tool_call (工具调用)、metadata (元数据) |
const resp = await fetch('/api/stream');
const reader = resp.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { value, done } = await reader.read();
if (done) break;
append(decoder.decode(value)); // 增量渲染
}
const ws = new WebSocket('wss://host/ws/stream');
// 接收流式内容
ws.onmessage = (e) => append(JSON.parse(e.data).chunk);
// 发送提问请求
ws.send(JSON.stringify({ type: 'ask', text: '你好' }));
// 主动停止生成
ws.send(JSON.stringify({ type: 'stop' }));
stop
确认;public class ChatGateway {
private final StreamingLlmClient llm;
// 会话级停止标志,线程安全
private final Map<String, Boolean> stopFlags = new ConcurrentHashMap<>();
// 处理“提问”请求
public void onAsk(WebSocketSession session, String question, List<Map<String,String>> history, String context) {
stopFlags.put(session.getId(), false); // 初始化停止标志
llm.stream(
question, context, history,
// 收到LLM片段时,推送chunk事件
chunk -> send(session, Map.of("type","chunk","chunk",chunk,"ts",System.currentTimeMillis())),
// LLM生成完成时,推送completion事件
() -> send(session, Map.of("type","completion","status","finished","ts",System.currentTimeMillis())),
// 发生错误时,推送error事件
err -> send(session, Map.of("type","error","message","AI服务暂不可用","ts",System.currentTimeMillis())),
// 停止条件判断(会话级标志)
() -> Boolean.TRUE.equals(stopFlags.get(session.getId()))
);
}
// 处理“停止”请求
public void onStop(WebSocketSession session){
stopFlags.put(session.getId(), true); // 标记会话停止
// 推送stop事件给前端
send(session, Map.of("type","stop","message","响应已停止","ts",System.currentTimeMillis()));
}
// 安全发送(避免连接异常导致的线程终止)
private void send(WebSocketSession session, Map<String, Object> data) {
if (session.isOpen()) {
try {
session.sendMessage(new TextMessage(new ObjectMapper().writeValueAsString(data)));
} catch (IOException e) {
// 日志记录,避免抛出异常中断流程
log.error("WebSocket send failed: {}", e.getMessage());
}
}
}
}
代码说明:
stopFlags
(支持并发控制)、safeSend
(异常保护);rateLimiter
(限流)、metrics
(监控钩子)、onClose
(资源清理)。public class StreamingLlmClient {
private final WebClient webClient; // 响应式HTTP客户端
/**
* 流式调用LLM
* @param question 用户问题
* @param context 上下文(如RAG检索结果)
* @param history 对话历史
* @param onChunk 片段回调
* @param onComplete 完成回调
* @param onError 错误回调
* @param shouldStop 停止条件
*/
public void stream(String question, String context, List<Map<String,String>> history,
Consumer<String> onChunk, Runnable onComplete,
Consumer<Throwable> onError, BooleanSupplier shouldStop){
// 构建LLM请求体(适配主流模型API格式)
Map<String,Object> body = Map.of(
"model","your-model", // 模型名称
"stream", true, // 开启流式
"messages", List.of(
Map.of("role","system","content", buildSystemPrompt(context)), // 系统提示
// 追加对话历史(省略历史拼接逻辑)
Map.of("role","user","content", question) // 用户问题
)
);
webClient.post().uri("/v1/chat/completions") // LLM API地址
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(body)
.retrieve()
.bodyToFlux(String.class) // 流式接收响应(Flux为响应式流)
.doOnError(onError) // 错误处理
.subscribe(raw -> {
// 检查是否需要停止
if (shouldStop.getAsBoolean()) {
onComplete.run();
return;
}
// 解析LLM返回的增量文本(需适配具体厂商格式,如OpenAI的SSE)
String delta = parseDelta(raw);
if (!delta.isEmpty()) {
onChunk.accept(delta); // 回调传递片段
}
// 检查是否生成完成(如LLM返回finish_reason或[DONE]标志)
if (isFinished(raw)) {
onComplete.run();
}
});
}
// 构建系统提示(拼接上下文)
private String buildSystemPrompt(String context) {
return "基于以下上下文回答问题:n" + context;
}
// 解析增量文本(示例逻辑,需按实际返回格式调整)
private String parseDelta(String raw) {
// 省略具体解析逻辑(如处理SSE的data:前缀、JSON解析delta字段)
return "解析后的增量文本";
}
// 判断是否生成完成(示例逻辑)
private boolean isFinished(String raw) {
return raw.contains("[DONE]") || raw.contains(""finish_reason":"stop"");
}
}
代码说明:
parseDelta/isFinished
可插拔(适配不同厂商)、响应式流(背压支持);onToolCall
(工具调用回调)、onCitation
(引用回调)、多路模型竞速(racing)。模块 | 核心职责 |
---|---|
ChatGateway(网关) | 会话管理、指令接收(ask/stop)、事件推送(chunk/completion等) |
StreamingLlmClient | LLM请求构建、流式响应解析、回调触发(onChunk/onComplete)、停止条件判断 |
两者共同构成流式对话系统:网关负责“协议交互”,LLM客户端负责“模型通信”,分工协作实现端到端流式能力。
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
private final ChatWebSocketHandler handler;
// 构造注入处理器
public WebSocketConfig(ChatWebSocketHandler handler) {
this.handler = handler;
}
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(handler, "/ws/stream") // 注册WebSocket端点
.setAllowedOrigins("https://your-frontend.example.com") // 精确CORS白名单(避免*)
.addInterceptors(new WebSocketHandshakeInterceptor() {
// 握手期鉴权(如Cookie/Token验证)
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
String token = request.getHeaders().getFirst("Authorization");
// 鉴权逻辑(省略,如验证token有效性)
if (token == null || !isValidToken(token)) {
response.setStatusCode(HttpStatus.UNAUTHORIZED);
return false;
}
return true;
}
});
}
// 简单的token验证(示例)
private boolean isValidToken(String token) {
// 实际项目中对接认证服务(如OAuth2、JWT验证)
return "valid-token".equals(token);
}
}
关键要点:
proxy_read_timeout
(延长超时)、开启压缩(减少带宽)。let ws;
let backoff = 500; // 重连退避时间(初始500ms)
const SESSION_ID = localStorage.getItem('sessionId') || generateSessionId(); // 会话ID(持久化)
// 生成唯一会话ID
function generateSessionId() {
const id = `session_${Date.now()}_${Math.random().toString(36).slice(2)}`;
localStorage.setItem('sessionId', id);
return id;
}
// 连接WebSocket
function connect() {
// 构建连接URL(携带会话ID)
const wsUrl = `wss://host/ws/stream?sessionId=${SESSION_ID}`;
ws = new WebSocket(wsUrl);
// 连接成功:重置退避时间
ws.onopen = () => {
backoff = 500;
console.log('WebSocket connected');
};
// 接收消息:处理四类事件
ws.onmessage = (ev) => {
const msg = JSON.parse(ev.data);
switch (msg.type) {
case 'chunk':
appendToUI(msg.chunk); // 增量渲染
break;
case 'completion':
finishUI(); // 收尾(如隐藏加载动画)
recordMetric('completion', msg.ts); // 埋点
break;
case 'stop':
stopUI(); // 显示“已停止”状态
break;
case 'error':
showErrorUI(msg.message); // 显示错误提示
break;
}
};
// 连接关闭:指数退避重连(最大10秒)
ws.onclose = (ev) => {
console.log('WebSocket closed, reconnecting...');
setTimeout(connect, Math.min(backoff *= 2, 10000));
};
// 连接错误:同close逻辑(触发重连)
ws.onerror = (err) => {
console.error('WebSocket error:', err);
ws.close();
};
}
// 发送提问请求
function ask(text) {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({
type: 'ask',
text: text,
sessionId: SESSION_ID
}));
} else {
showErrorUI('连接未就绪,请稍后重试');
}
}
// 发送停止请求
function stop() {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({
type: 'stop',
sessionId: SESSION_ID
}));
}
}
// 初始化连接
connect();
关键要点:
seq
字段(消息排序)、本地缓冲(重连后补传)。type=stop
确认;type=error
文案,避免泄露内部细节;日志里保留堆栈。示例(响应式背压):
webClient.post().uri("/v1/chat/completions")
.retrieve()
.bodyToFlux(String.class)
.onBackpressureBuffer(1024)
.limitRate(64)
.subscribe(raw -> { /* parse + onChunk */ });
conversation:{id}
存历史;用户与会话用 user:{uid}:current_conversation
关联。指标命名建议:
stream_ttfb_ms
、stream_chunks_per_second
、stream_completion_ratio
;stream_stop_ratio
、stream_error_ratio
、stream_disconnect_ratio
;reqId/sessionId/convId/userScopedId
,便于串联。filter
。completion
。stop
;所谓“豆包式”的一句一句,本质是“把第一口热乎的字先端上来”。当传输、生成、检索、控制协同起来,用户不再“等答案”,而是在“看它开口说话”。按上面的最小组合做起,你很快就能把“会说话”的 RAG 放进生产。