复真书法app
106.21MB · 2025-09-11
在大模型问答(LLM Chat)等实时场景中,用户期待能像 ChatGPT 一样边生成边输出,而不是等待完整结果。 实现这一效果的关键技术之一,就是 SSE(Server-Sent Events)流式输出。
我们知道,在前端开发中,常见的实时通信方案有:
SSE 的优势在于:
EventSource
API 开箱即用。SSE基于http长连接,服务端的响应头通常如下:
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
之后服务端会持续不断地推送数据,每条数据的格式如下:
浏览器提供了 EventSource
对象来直接消费 SSE:
const eventSource = new EventSource('/sse/stream');
// 监听 message 事件
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log('收到数据:', data);
};
// 自定义事件类型
eventSource.addEventListener('thinking', (event) => {
console.log('AI 正在思考:', event.data);
});
// 错误处理
eventSource.onerror = (err) => {
console.error('SSE 连接出错', err);
eventSource.close();
};
特点:
在大模型场景中,很多接口是 POST
请求(带上下文参数),而 EventSource
只支持 GET
。
这时我们常用 fetch + ReadableStream:
async function startStream() {
const response = await fetch('/chat/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message: '你好' }),
});
const reader = response.body!.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value, { stream: true });
console.log('流数据:', chunk);
}
}
这种方式需要自己解析 event:
/ data:
行,适合更灵活的场景。
用户提问后,模型逐步返回回答内容:
async function askLLM(question: string) {
const response = await fetch('/llm/answer', { ... });
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let answer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
if (chunk.startsWith('data:')) {
const data = JSON.parse(chunk.slice(5));
answer += data.content;
render(answer);
}
}
}
很多大模型服务会把不同阶段拆成事件:
event: thinking
→ 模型推理过程event: mainbody
→ 主体回答event: result
→ 结构化结果前端可以根据事件类型决定展示形式(如灰色小字显示 "AI 正在思考...")。
技术对比:
event
字段,可以区分不同 Agent 的输出,特别适合多 Agent 协作场景。前端可以累积接收到的数据,逐步渲染,大幅降低延迟感。多 Agent 场景拓展:
event: agentA
、event: agentB
。思路:为每个 Agent 设置一个定时器,如果超过指定时间还没有返回数据,就标记该 Agent 超时。
interface AgentState {
buffer: string[];
timeoutId?: ReturnType<typeof setTimeout>;
completed: boolean;
error?: string;
}
// 初始化 Agent 状态
const agents: Record<string, AgentState> = {
agentA: { buffer: [], completed: false },
agentB: { buffer: [], completed: false },
};
// 设置超时时间 5s
const TIMEOUT = 5000;
function startAgentTimeout(agentName: string) {
agents[agentName].timeoutId = setTimeout(() => {
agents[agentName].completed = true;
agents[agentName].error = '超时';
renderCombined();
}, TIMEOUT);
}
// 收到 Agent 数据时取消定时器
function handleAgentData(agentName: string, chunk: string) {
clearTimeout(agents[agentName].timeoutId);
agents[agentName].buffer.push(chunk);
renderCombined();
}
效果:如果 Agent 很久没返回数据,前端会自动标记失败,但不影响其他 Agent 的输出。
思路:对失败的 Agent 触发单独请求,保持其他 Agent 输出不受影响。
async function retryAgent(agentName: string, retries = 2) {
for (let i = 0; i < retries; i++) {
try {
const chunks = await fetchAgentStream(agentName); // 单独请求
chunks.forEach(chunk => handleAgentData(agentName, chunk));
agents[agentName].completed = true;
return;
} catch (err) {
console.warn(`Agent ${agentName} 第${i+1}次重试失败`);
}
}
agents[agentName].error = '重试失败';
renderCombined();
}
// 触发失败 Agent 重试
function handleAgentError(agentName: string) {
retryAgent(agentName);
}
效果:某个 Agent 出现错误或超时,可以单独重试,其他 Agent 的输出继续。
思路:维护一个优先级数组,高优先级 Agent 的输出立即渲染,低优先级 Agent 继续累积。
const priorityOrder = ['agentA', 'agentB', 'agentC'];
function renderCombined() {
let combined = '';
for (const agent of priorityOrder) {
const state = agents[agent];
if (state.buffer.length) {
combined += state.buffer.join('');
}
if (state.error) {
combined += `[${agent} 错误: ${state.error}]`;
}
}
display(combined);
}
combined
buffer
中,但不会阻塞高优先级渲染106.21MB · 2025-09-11
22.55MB · 2025-09-11
63.0MB · 2025-09-11