陆军指挥官免广告版
74.5MB · 2025-10-31
在 RAG 系统中,即便采用性能卓越的 LLM 并反复打磨 Prompt,问答仍可能出现上下文缺失、事实性错误或拼接不连贯等问题。多数团队会频繁更换检索算法与 Embedding模型,但收益常常有限。真正的瓶颈,往往潜伏在数据入库之前的一个细节——文档分块(chunking)。不当的分块会破坏语义边界,拆散关键线索并与噪声纠缠,使被检索的片段呈现“顺序错乱、信息残缺”的面貌。在这样的输入下,再强大的模型也难以基于支离破碎的知识推理出完整、可靠的答案。某种意义上,分块质量几乎决定了RAG的性能上限——它决定知识是以连贯的上下文呈现,还是退化为无法拼合的碎片。
在实际场景中,最常见的错误是按固定长度生硬切割,忽略文档的结构与语义:定义与信息被切开、表头与数据分离、步骤说明被截断、代码与注释脱节,结果就是召回命中却无法支撑结论,甚至诱发幻觉与错误引用。相反,高质量的分块应尽量贴合自然边界(标题、段落、列表、表格、代码块等),以适度重叠保持上下文连续,并保留必要的来源与章节元数据,确保可追溯与重排可用。当分块尊重文档的叙事与结构时,检索的相关性与答案的事实一致性往往显著提升,远胜于一味更换向量模型或调参;换言之,想要真正改善 RAG 的稳健性与上限,首先要把“知识如何被切开并呈现给模型”这件事做好。
PS:本文主要是针对中文文档类型的嵌入进行实战。
分块是将大块文本分解成较小段落的过程,这使得文本数据更易于管理和处理。通过分块,我们能够更高效地进行内容嵌入(embedding),并显著提升从向量数据库中召回内容的相关性和准确性。
在实际操作中,分块的好处是多方面的。首先,它能够提高模型处理的效率,因为较小的文本段落更容易进行嵌入和检索。
其次,分块后的文本能够更精确地匹配用户查询,从而提供更相关的搜索结果。这对于需要高精度信息检索和内容生成的应用程序尤为重要。
通过优化内容的分块和嵌入策略,我们可以最大化LLM在各种应用场景中的性能。分块技术不仅提高了内容召回的准确性,还提升了整体系统的响应速度和用户体验。
因此,在构建和优化基于LLM的应用程序时,理解和应用分块技术是不可或缺的步骤。
分块过程中主要的两个概念:chunk_size块的大小,chunk_overlap重叠窗口。
总之理想的分块是在“上下文完整性”和“信息密度”之间取得动态平衡:chunk_size决定信息承载量,chunk_overlap 用于弥补边界断裂并维持语义连续。只要边界对齐语义、粒度贴合内容,检索与生成的质量就能提升。
基于固定长度分块
from langchain_text_splitters import CharacterTextSplitter
splitter = CharacterTextSplitter(
    separator="",        # 纯按长度切
    chunk_size=600,      # 依据实验与模型上限调整
    chunk_overlap=90,    # 15% 重叠
)
chunks = splitter.split_text(text)
基于句子的分块
import re
def split_sentences_zh(text: str):
    # 在句末标点(。!?;)后面带可选引号的场景断句
    pattern = re.compile(r'([^。!?;]*[。!?;]+|[^。!?;]+$)')
    sentences = [m.group(0).strip() for m in pattern.finditer(text) if m.group(0).strip()]
    return sentences
def sentence_chunk(text: str, chunk_size=600, overlap=80):
    sents = split_sentences_zh(text)
    chunks, buf = [], ""
    for s in sents:
        if len(buf) + len(s) <= chunk_size:
            buf += s
        else:
            if buf:
                chunks.append(buf)
            # 简单重叠:从当前块尾部截取 overlap 字符与下一句拼接
            buf = (buf[-overlap:] if overlap > 0 and len(buf) > overlap else "") + s
    if buf:
        chunks.append(buf)
    return chunks
chunks = sentence_chunk(text, chunk_size=600, overlap=90)
HanLP 分句示例:
from hanlp_common.constant import ROOT
import hanlp
tokenizer = hanlp.load('PKU_NAME_MERGED_SIX_MONTHS_CONVSEG')  # 或句法/句子级管线
# HanLP 高层 API 通常通过句法/语料管线获得句子边界,具体以所用版本 API 为准
# 将句子列表再做聚合为 chunk_size
基于递归字符分块
import re
from langchain_text_splitters import RecursiveCharacterTextSplitter
separators = [
    r"n#{1,6}s",                 # 标题
    r"nd+(?:.d+)*s",          # 数字编号标题 1. / 2.3. 等
    "nn",                        # 段落
    "n",                          # 行
    " ",                           # 空格
    "",                            # 兜底字符级
]
splitter = RecursiveCharacterTextSplitter(
    separators=separators,
    chunk_size=700,
    chunk_overlap=100,
    is_separator_regex=True,       # 告诉分割器上面包含正则
)
chunks = splitter.split_text(text)
总结
利用文档固有结构(标题层级、列表、代码块、表格、对话轮次)作为分块边界,逻辑清晰、可追溯性强,能在保证上下文完整性的同时提升检索信噪比。
结构化文本分块
import re
from typing import List, Dict
heading_pat = re.compile(r'^(#{1,6})s+(.*)$')  # 标题
fence_pat = re.compile(r'^```')                 # fenced code fence
def split_markdown_structure(text: str, chunk_size=900, min_chunk=250, overlap_ratio=0.1) -> List[Dict]:
    lines = text.splitlines()
    sections = []
    in_code = False
    current = {"level": 0, "title": "", "content": [], "path": []}
    
    path_stack = []  # [(level, title)]
    
    for ln in lines:
        if fence_pat.match(ln):
            in_code = not in_code
        m = heading_pat.match(ln) if not in_code else None
        if m:
            if current["content"]:
                sections.append(current)
            level = len(m.group(1))
            title = m.group(2).strip()
            while path_stack and path_stack[-1][0] >= level:
                path_stack.pop()
            path_stack.append((level, title))
            breadcrumbs = [t for _, t in path_stack]
            current = {"level": level, "title": title, "content": [], "path": breadcrumbs}
        else:
            current["content"].append(ln)
    
    if current["content"]:
        sections.append(current)
    
    # 通过二次拆分/合并将部分平铺成块
    chunks = []
    def emit_chunk(text_block: str, path: List[str], level: int):
        chunks.append({
            "text": text_block.strip(),
            "meta": {
                "section_title": path[-1] if path else "",
                "breadcrumbs": path,
                "section_level": level,
            }
        })
    
    for sec in sections:
        raw = "n".join(sec["content"]).strip()
        if not raw:
            continue
        if len(raw) <= chunk_size:
            emit_chunk(raw, sec["path"], sec["level"])
        else:
            paras = [p.strip() for p in raw.split("nn") if p.strip()]
            buf = ""
            for p in paras:
                if len(buf) + len(p) + 2 <= chunk_size:
                    buf += (("nn" + p) if buf else p)
                else:
                    if buf:
                        emit_chunk(buf, sec["path"], sec["level"])
                    buf = p
            if buf:
                emit_chunk(buf, sec["path"], sec["level"])
    
    merged = []
    for ch in chunks:
        if not merged:
            merged.append(ch)
            continue
        if len(ch["text"]) < min_chunk and merged[-1]["meta"]["breadcrumbs"] == ch["meta"]["breadcrumbs"]:
            merged[-1]["text"] += "nn" + ch["text"]
        else:
            merged.append(ch)
    
    overlap = int(chunk_size * overlap_ratio)
    for ch in merged:
        bc = " > ".join(ch["meta"]["breadcrumbs"][-3:])
        prefix = f"[{bc}]n" if bc else ""
        if prefix and not ch["text"].startswith(prefix):
            ch["text"] = prefix + ch["text"]
        # optional character overlap can在检索阶段用邻接聚合替代,这里略
    
    return merged
对话式分块
from typing import List, Dict
def chunk_dialogue(turns: List[Dict], max_turns=10, max_chars=900, overlap_turns=2):
    """
    turns: [{"speaker":"User","text":"..." , "ts_start":123, "ts_end":130}, ...]
    """
    chunks = []
    i = 0
    while i < len(turns):
        j = i
        char_count = 0
        speakers = set()
        while j < len(turns):
            t = turns[j]
            uttr_len = len(t["text"])
            # 若单条超长,允许在句级二次切分(此处略),但不跨 speaker
            if (j - i + 1) > max_turns or (char_count + uttr_len) > max_chars:
                break
            char_count += uttr_len
            speakers.add(t["speaker"])
            j += 1
        
        if j > i:
            window = turns[i:j]
        elif i < len(turns):
            window = [turns[i]]
        else:
            break
        text = "n".join([f'{t["speaker"]}: {t["text"]}' for t in window])
        meta = {
            "speakers": list(speakers),
            "turns_range": (i, j - 1),
            "ts_start": window[0].get("ts_start"),
            "ts_end": window[-1].get("ts_end"),
        }
        chunks.append({"text": text, "meta": meta})
        
        # 按轮次重叠回退
        if j >= len(turns):
            break
        next_start = i + len(window) - overlap_turns
        i = max(next_start, i + 1)  # 确保至少前进1步
    return chunks
总结
该方法不依赖文档的物理结构,而是依据语义连续性与话题转移来决定切分点,尤其适合希望“块内高度内聚、块间清晰分界”的知识库与研究类文本。
语义分块
from typing import List, Dict, Tuple
import numpy as np
from sentence_transformers import SentenceTransformer
import re
def split_sentences_zh(text: str) -> List[str]:
    # 简易中文分句,可替换为 HanLP/Stanza 更稳健的实现
    pattern = re.compile(r'([^。!?;]*[。!?;]+|[^。!?;]+$)')
    return [m.group(0).strip() for m in pattern.finditer(text) if m.group(0).strip()]
def rolling_mean(vecs: np.ndarray, i: int, w: int) -> np.ndarray:
    s = max(0, i - w)
    e = min(len(vecs), i + w + 1)
    return vecs[s:e].mean(axis=0)
def semantic_chunk(
    text: str,
    model_name: str = "BAAI/bge-m3",
    window_size: int = 2,
    min_chars: int = 350,
    max_chars: int = 1100,
    lambda_std: float = 0.8,
    overlap_chars: int = 80,
) -> List[Dict]:
    sents = split_sentences_zh(text)
    if not sents:
        return []
    
    model = SentenceTransformer(model_name)
    emb = model.encode(sents, normalize_embeddings=True, batch_size=64, show_progress_bar=False)
    emb = np.asarray(emb)
    
    # 基于窗口均值的“新颖度”分数
    novelties = []
    for i in range(len(sents)):
        ref = rolling_mean(emb, i-1, window_size) if i > 0 else emb[0]
        ref = ref / (np.linalg.norm(ref) + 1e-8)
        novelty = 1.0 - float(np.dot(emb[i], ref))
        novelties.append(novelty)
    novelties = np.array(novelties)
    
    # 相对阈值:μ + λσ
    mu, sigma = float(novelties.mean()), float(novelties.std() + 1e-8)
    threshold = mu + lambda_std * sigma
    
    chunks, buf, start_idx = [], "", 0
    def flush(end_idx: int):
        nonlocal buf, start_idx
        if buf.strip():
            chunks.append({
                "text": buf.strip(),
                "meta": {"start_sent": start_idx, "end_sent": end_idx-1}
            })
        buf, start_idx = "", end_idx
    
    for i, s in enumerate(sents):
        # 若超长则先冲洗
        if len(buf) + len(s) > max_chars and len(buf) >= min_chars:
            flush(i)
            # 结构化重叠:附加上一个块的尾部
            if overlap_chars > 0 and len(s) < overlap_chars:
                buf = s
                continue
        
        buf += s
        
        # 达到最小长度后遇到突变则切分
        if len(buf) >= min_chars and novelties[i] > threshold:
            flush(i + 1)
    
    if buf:
        flush(len(sents))
    
    return chunks
主题的分块
from typing import List, Dict
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.cluster import KMeans
import re
def split_sentences_zh(text: str) -> List[str]:
    pattern = re.compile(r'([^。!?;]*[。!?;]+|[^。!?;]+$)')
    return [m.group(0).strip() for m in pattern.finditer(text) if m.group(0).strip()]
def topic_chunk(
    text: str,
    k_topics: int = 5,
    min_chars: int = 500,
    max_chars: int = 1400,
    smooth_window: int = 2,
    model_name: str = "BAAI/bge-m3"
) -> List[Dict]:
    sents = split_sentences_zh(text)
    if not sents:
        return []
    
    model = SentenceTransformer(model_name)
    emb = model.encode(sents, normalize_embeddings=True, batch_size=64, show_progress_bar=False)
    emb = np.asarray(emb)
    
    km = KMeans(n_clusters=k_topics, n_init="auto", random_state=42)
    labels = km.fit_predict(emb)
    
    # 简单序列平滑:滑窗多数投票
    smoothed = labels.copy()
    for i in range(len(labels)):
        s = max(0, i - smooth_window)
        e = min(len(labels), i + smooth_window + 1)
        window = labels[s:e]
        vals, counts = np.unique(window, return_counts=True)
        smoothed[i] = int(vals[np.argmax(counts)])
    
    chunks, buf, start_idx, cur_label = [], "", 0, smoothed[0]
    def flush(end_idx: int):
        nonlocal buf, start_idx
        if buf.strip():
            chunks.append({
                "text": buf.strip(),
                "meta": {"start_sent": start_idx, "end_sent": end_idx-1, "topic": int(cur_label)}
            })
        buf, start_idx = "", end_idx
    
    for i, s in enumerate(sents):
        switched = smoothed[i] != cur_label
        over_max = len(buf) + len(s) > max_chars
        under_min = len(buf) < min_chars
        
        # 尝试延后切分,保证最小块长
        if switched and not under_min:
            flush(i)
            cur_label = smoothed[i]
        
        if over_max and not under_min:
            flush(i)
        
        buf += s
    
    if buf:
        flush(len(sents))
    
    return chunks
小-大分块
# 离线:构建小块索引,并保存 parent_id -> 大块文本 的映射
# 在线检索:
small_hits = small_index.search(embed(query), top_k=30)
groups = group_by_parent(small_hits)
scored_parents = score_groups(groups, agg="max")
candidates = top_m(scored_parents, m=3)
# 交叉编码重排
rerank_inputs = [(query, parent_text(pid)) for pid in candidates]
reranked = cross_encoder_rerank(rerank_inputs)
# 组装上下文:对每个父块,仅保留命中句及其邻近窗口,并加上标题路径
contexts = []
for pid, _ in reranked:
    hits = groups[pid]
    context = build_local_window(parent_text(pid), hits, window_sents=1)
    contexts.append(prefix_with_breadcrumbs(pid) + context)
final_context = pack_under_budget(contexts, token_budget=3000)    # 留出回答空间
父子段分块
from typing import List, Dict, Tuple
import numpy as np
from sentence_transformers import SentenceTransformer
embedder = SentenceTransformer("BAAI/bge-m3")
def search_parent_child(query: str, top_k_child=40, top_m_parent=3, window_chars=180):
    q = embedder.encode([query], normalize_embeddings=True)[0]
    hits = small_index.search(q, top_k=top_k_child)  # 返回 [(child_id, score), ...]
    # 分组
    groups: Dict[str, List[Tuple[str, float]]] = {}
    for cid, score in hits:
        p = child_parent_id[cid]
        groups.setdefault(p, []).append((cid, float(score)))
    
    # 聚合打分(max + coverage)
    scored = []
    for pid, items in groups.items():
        scores = np.array([s for _, s in items])
        agg = 0.7 * scores.max() + 0.3 * (len(items) / (len(parents[pid]["sent_spans"]) + 1e-6))
        scored.append((pid, float(agg)))
    scored.sort(key=lambda x: x[1], reverse=True)
    candidates = [pid for pid, _ in scored[:top_m_parent]]
    
    # 为每个父块构造“命中窗口”
    contexts = []
    for pid in candidates:
        ptext = parents[pid]["text"]
        # 找到子块命中区间并合并窗口
        spans = sorted([(children[cid]["start"], children[cid]["end"]) for cid, _ in groups[pid]])
        merged = []
        for s, e in spans:
            s = max(0, s - window_chars)
            e = min(len(ptext), e + window_chars)
            if not merged or s > merged[-1][1] + 50:
                merged.append([s, e])
            else:
                merged[-1][1] = max(merged[-1][1], e)
        windows = [ptext[s:e] for s, e in merged]
        prefix = " > ".join(parents[pid]["meta"].get("breadcrumbs", [])[-3:])
        contexts.append((pid, f"[{prefix}]n" + "n...n".join(windows)))
    
    # 交叉编码重排(此处用占位函数)
    reranked = cross_encoder_rerank(query, [c[1] for c in contexts])  # 返回 indices 顺序
    ordered = [contexts[i] for i in reranked]
    return ordered  # [(parent_id, context_text), ...]
代理式分块
系统:你是分块器。目标:为RAG检索创建高内聚、可追溯的块。规则:
1) 不得在代码/表格/公式中间切分;
2) 每块400-1000字;
3) 保持标题路径完整;
4) 尽量让“定义+解释”在同一块;
5) 输出JSON,含 start_offset/end_offset/title_path。
用户:<文档片段文本>
助手(示例输出):
{
  "segments": [
    {"start": 0, "end": 812, "title_path": ["指南","安装"], "reason": "完整步骤+注意事项"},
    {"start": 813, "end": 1620, "title_path": ["指南","配置"], "reason": "参数表与示例紧密相关"}
  ]
}
单一策略难覆盖所有文档与场景。混合分块通过“先粗后细、按需细化”,在效率、可追溯性与答案质量之间取得稳健平衡。
from typing import List, Dict
def hybrid_chunk(
    doc_text: str,
    parse_structure,          # 函数:返回 [{'type': 'text|code|table|dialogue', 'text': str, 'breadcrumbs': [...], 'anchor': str}]
    recursive_splitter,       # 函数:text -> [{'text': str}]
    sentence_splitter,        # 函数:text -> [{'text': str}]
    semantic_splitter,        # 函数:text -> [{'text': str}]
    dialogue_splitter,        # 函数:turns(list) -> [{'text': str}],若无对话则忽略
    max_coarse_len: int = 1100,
    min_chunk_len: int = 320,
    target_len: int = 750,
    overlap_ratio: float = 0.1,
) -> List[Dict]:
    """
    返回格式: [{'text': str, 'meta': {...}}]
    """
    blocks = parse_structure(doc_text)  # 先拿到结构块
    chunks: List[Dict] = []
    
    def emit(t: str, meta_base: Dict):
        t = t.strip()
        if not t:
            return
        # 结构重叠前缀(标题路径)
        bc = " > ".join(meta_base.get("breadcrumbs", [])[-3:])
        prefix = f"[{bc}]n" if bc else ""
        chunks.append({
            "text": (prefix + t) if not t.startswith(prefix) else t,
            "meta": meta_base
        })
    
    for b in blocks:
        t = b["text"]
        btype = b.get("type", "text")
        
        # 原子块:代码/表格
        if btype in {"code", "table", "formula"}:
            emit(t, {**b, "splitter": "atomic"})
            continue
        
        # 对话块
        if btype == "dialogue":
            for ck in dialogue_splitter(b.get("turns", [])):
                emit(ck["text"], {**b, "splitter": "dialogue"})
            continue
        
        # 普通文本:依据长度与“可读性”启用不同细分器
        if len(t) <= max_coarse_len:
            # 中短文本:递归 or 句子
            sub = recursive_splitter(t)
            # 合并过短子块
            buf = ""
            for s in sub:
                txt = s["text"]
                if len(buf) + len(txt) < min_chunk_len:
                    buf += txt
                else:
                    emit(buf or txt, {**b, "splitter": "recursive"})
                    buf = "" if buf else ""
            if buf:
                emit(buf, {**b, "splitter": "recursive"})
        else:
            # 超长文本:语义分块优先
            for ck in semantic_splitter(t):
                emit(ck["text"], {**b, "splitter": "semantic"})
    
    # 轻量字符重叠(可选)
    if overlap_ratio > 0:
        overlapped = []
        for i, ch in enumerate(chunks):
            overlapped.append(ch)
            if i + 1 < len(chunks) and ch["meta"].get("breadcrumbs") == chunks[i+1]["meta"].get("breadcrumbs"):
                # 在相邻同章节块间引入小比例重叠
                ov = int(len(ch["text"]) * overlap_ratio)
                if ov > 0:
                    head = ch["text"][-ov:]
                    chunks[i+1]["text"] = head + chunks[i+1]["text"]
        chunks = overlapped
    
    return chunks
告别数据无序:得物数据研发与管理平台的破局之路
从一次启动失败深入剖析:Spring循环依赖的真相|得物技术
Apex AI辅助编码助手的设计和实践|得物技术
从 JSON 字符串到 Java 对象:Fastjson 1.2.83 全程解析|得物技术
用好 TTL Agent 不踩雷:避开内存泄露与CPU 100%两大核心坑|得物技术
关注得物技术,每周更新技术干货
要是觉得文章对你有帮助的话,欢迎评论转发点赞~
未经得物技术许可严禁转载,否则依法追究法律责任。
 
                    