AI在线 AI在线

RAG系列:切分优化 - 基于句子余弦距离的语义切分

引言传统的文档切分方法通常采用基于特定字符和固定长度的切分策略,这种方法虽然实现简单,但在实际应用中容易割裂完整的语义单元,导致后续的信息检索与理解受到影响。 相比之下,一种更智能的切分方法是基于句子余弦距离的语义切分。 它不再依据特定字符和固定长度进行机械切分,而是对每个句子进行 embedding,以此来计算相邻句子的余弦距离,再通过算法算出一个相对合理的切分点(某个距离值),最后将不大于该阈值的相邻句子聚合在一起作为一个文档块,从而实现文档语义切分。

RAG系列:切分优化 - 基于句子余弦距离的语义切分

引言

传统的文档切分方法通常采用基于特定字符和固定长度的切分策略,这种方法虽然实现简单,但在实际应用中容易割裂完整的语义单元,导致后续的信息检索与理解受到影响。

相比之下,一种更智能的切分方法是基于句子余弦距离的语义切分。它不再依据特定字符和固定长度进行机械切分,而是对每个句子进行 embedding,以此来计算相邻句子的余弦距离,再通过算法算出一个相对合理的切分点(某个距离值),最后将不大于该阈值的相邻句子聚合在一起作为一个文档块,从而实现文档语义切分。

例如 句子_1、句子_2、句子_3 之间的余弦距离都小于该阈值,而 句子_3 与 句子_4 的余弦距离大于该阈值,则在 句子_3 和 句子_4 之间增加切分点,最终的切分结果就是把 句子_1、句子_2、句子_3 聚合在一个文档块中,句子_4 在其它的文档块中。

实现原理

基于余弦距离的语义切分大致分为以下5个步骤:

图片

langchain-experimental中的 SemanticChunker[1] 实现了基于余弦距离的语义切分,因此本文我将通过 SemanticChunker 的源码来带大家了解语义切分的实现原理。

以下是 SemanticChunker 的初始化参数,后面根据不同步骤所需要的参数来了解这些参数的具体作用。

复制
class SemanticChunker(
    # 向量模型
    embeddings: Embeddings,
    # 向前向后取 buffer_size 个句子一起 embedding
    buffer_size: int = 1, 
    # 是否在元数据添加开始切分的位置(以文档字符长度计算)
    add_start_index: bool = False,
    # 切分点计算方法
    breakpoint_threshold_type: BreakpointThresholdType = "percentile",
    # 切分点计算阈值
    breakpoint_threshold_amount: float | None = None,
    # 切分后的文档块数量
    number_of_chunks: int | None = None,
    # 句子切分规则
    sentence_split_regex: str = r"(?<=[.?!])\s+",
    # 最小文档块大小
    min_chunk_size: int | None = None
)

句子切分

这一步是通过特定规则将文档切分为一个个句子,在 SemanticChunker 中通过参数 sentence_split_regex 来设置规则进行切分,默认值为 r"(?<=[.?!])\s+",这是以英文的句号、问号、感叹号来进行切分的,而且是对比较规范的英文行文,也就是这三种标点后还跟空白字符的。如果要对中文文档切分,那就需要将这个正则表达式替换成能切分中文的,例如:r"(?<=[。?!\n])",也就是以中文的句号、问号、感叹号以及换行符来进行切分。

SemanticChunker的实现源码如下:

复制
import re

def _get_single_sentences_list(self, text: str) -> List[str]:
    return re.split(self.sentence_split_regex, text)

句子 embedding

这一步是将每个句子进行 embedding,理论上接着就以每个句子 embedding 结果来计算相邻句子的距离就可以了。但通过实际操作发现对单个句子处理噪音比较大,后续切分的效果并不理想,因此 SemanticChunker 通过 buffer_size 来控制当前句子前、后各取几个句子组成一组来计算 embedding 并计算余弦距离。例如buffer_size设置为为1(默认值),表示取当前句子前、后各取1个句子组成一组来计算 embedding。

SemanticChunker的实现源码如下:

首先根据buffer_size得到当前句子的组合。

复制
def combine_sentences(sentences: List[dict], buffer_size: int = 1) -> List[dict]:
    for i inrange(len(sentences)):
        # 创建一个字符串变量来保存连接的句子
        combined_sentence = ""

        # 添加当前句子之前 buffer_size 个句子
        for j inrange(i - buffer_size, i):
            if j >= 0:
                combined_sentence += sentences[j]["sentence"] + " "

        # 添加当前句子
        combined_sentence += sentences[i]["sentence"]

        # 添加当前句子之后 buffer_size 个句子
        for j inrange(i + 1, i + 1 + buffer_size):
            if j < len(sentences):
                combined_sentence += " " + sentences[j]["sentence"]

        # 将合并好的句子存储在当前的句子 combined_sentence 中
        sentences[i]["combined_sentence"] = combined_sentence

    return sentences

然后根据通过参数 embeddings传入的向量模型对句子组合进行 embedding。

复制
def _calculate_sentence_distances(self, single_sentences_list: List[str]) -> Tuple[List[float], List[dict]]:
    _sentences = [
        {"sentence": x, "index": i} for i, x in enumerate(single_sentences_list)
    ]
    sentences = combine_sentences(_sentences, self.buffer_size)
    embeddings = self.embeddings.embed_documents(
        [x["combined_sentence"] for x in sentences]
    )
    for i, sentence in enumerate(sentences):
        sentence["combined_sentence_embedding"] = embeddings[i]

    return calculate_cosine_distances(sentences)

计算相邻句子(组)余弦距离

这一步就是通过计算相邻句子(组) 的余弦相似度来得到相邻句子(组) 的余弦距离。

将横轴记为句子(组)的序号,纵轴为相邻句子(组) 的余弦距离,就可得到下面类似的图:

图片

SemanticChunker的实现源码如下:

复制
from langchain_community.utils.math import cosine_similarity

defcalculate_cosine_distances(sentences: List[dict]) -> Tuple[List[float], List[dict]]:
    distances = []
    for i inrange(len(sentences) - 1):
        embedding_current = sentences[i]["combined_sentence_embedding"]
        embedding_next = sentences[i + 1]["combined_sentence_embedding"]

        # 计算余弦相似度
        similarity = cosine_similarity([embedding_current], [embedding_next])[0][0]

        # 转换成余弦距离
        distance = 1 - similarity
        distances.append(distance)

        # 保存余弦距离
        sentences[i]["distance_to_next"] = distance

    # 【可选】最后一个句子的处理
    # sentences[-1]['distance_to_next'] = None  # 或其它默认值

    return distances, sentences

计算切分点

如何计算切分点,SemanticChunker给出了4种方法:

  • percentile: 分位法,默认方法。将所有余弦距离在第 X 分位数的值作为阈值,并在那些余弦距离超过该阈值的位置进行切分。第 X 分位数可通过breakpoint_threshold_amount 设置,默认为 95。还可以通过 number_of_chunks 指定切分后的文档块总数量,采用线性插值的方式反向推导出该分位数;

SemanticChunker的实现源码如下:

复制
import numpy as np

def_calculate_breakpoint_threshold(self, distances: List[float]) -> Tuple[float, List[float]]:
# 第一种方式:指定分位数
return cast(
      float,
      np.percentile(distances, self.breakpoint_threshold_amount),
  ), distances

# 第二种方式:通过 number_of_chunks 反向推导分位数
  x1, y1 = len(distances), 0.0
  x2, y2 = 1.0, 100.0
  x = max(min(self.number_of_chunks, x1), x2)
if x2 == x1:
      y = y2
else:
      y = y1 + ((y2 - y1) / (x2 - x1)) * (x - x1) # 线性插值
  y = min(max(y, 0), 100)
    return cast(
      float,
      np.percentile(distances, y),), 
      distances
  • standard_deviation: 标准差偏离法,是统计学中表示偏离的常规方法,这种方法比较适合正态分布。将所有余弦距离的平均值加上 X 倍的所有余弦距离标准差的值作为阈值,并在那些余弦距离超过该阈值的位置进行切分。X 倍可通过breakpoint_threshold_amount 设置,默认为 3,这是最常用的值;

SemanticChunker的实现源码如下:

复制
import numpy as np

def _calculate_breakpoint_threshold(self, distances: List[float]) -> Tuple[float, List[float]]:
  return cast(
    float,
    np.mean(distances) + 
      self.breakpoint_threshold_amount * np.std(distances),), 
    distances
  • interquartile: 四分位距法,是统计学中表示偏离的另一种常规方法,这种方法计算分位数,所以数据分布不那么正态问题也不大。将所有余弦距离的平均值加上 X 倍的所有余弦距离四分位距的值作为阈值,并在那些余弦距离超过该阈值的位置进行切分。X 倍可通过breakpoint_threshold_amount 设置,默认为 1.5,也是最常用的值;

SemanticChunker的实现源码如下:

复制
import numpy as np

def _calculate_breakpoint_threshold(self, distances: List[float]) -> Tuple[float, List[float]]:
  # 取出25分位(下四分位)和75分位(上四分位)的数值
  q1, q3 = np.percentile(distances, [25, 75])

  # 计算两个分位的差值(四分位距)
  iqr = q3 - q1

  return np.mean(distances) + 
            self.breakpoint_threshold_amount * iqr, distances
  • gradient: 梯度法。首先计算所有余弦距离的变化梯度,变化梯度计算出来后,就可以知道哪个地方余弦距离变化得快,然后将所有变化梯度在第 X 分位数的值作为阈值,并在那些余弦距离超过该阈值的位置进行切分。第 X 分位数可通过breakpoint_threshold_amount 设置,默认为 95。

SemanticChunker的实现源码如下:

复制
import numpy as np

def _calculate_breakpoint_threshold(self, distances: List[float]) -> Tuple[float, List[float]]:
  # 计算所有余弦距离的变化梯度
  distance_gradient = np.gradient(distances, range(0, len(distances)))
  return cast(
      float,
      np.percentile(distance_gradient,
                    self.breakpoint_threshold_amount)),
      distance_gradient

按切分点切分

通过第4步各方法得到切分点后,就可以按切分点对文档进行切分(通过设置min_chunk_size控制合并较小的块),就可得到下面类似的图(包括切分位置以及切片):

图片

SemanticChunker的实现源码如下:

复制
def split_text(self, text: str,) -> List[str]:
      # 计算相邻句子的余弦距离
      distances, sentences = self._calculate_sentence_distances(single_sentences_list)

      # 计算切分点
      breakpoint_distance_threshold, breakpoint_array = self._calculate_breakpoint_threshold(distances)

      indices_above_thresh = [
          i
          for i, x inenumerate(breakpoint_array)
          if x > breakpoint_distance_threshold
      ]

      chunks = []
      start_index = 0

      # 遍历切分点来分割句子
      for index in indices_above_thresh:
          end_index = index
          group = sentences[start_index : end_index + 1]
          combined_text = " ".join([d["sentence"] for d in group])
        
          # 通过设置 min_chunk_size 来合并较小的文档块
          if (
              self.min_chunk_size isnotNone
              andlen(combined_text) < self.min_chunk_size
          ):
              continue
          chunks.append(combined_text)

          start_index = index + 1

      if start_index < len(sentences):
          combined_text = " ".join([d["sentence"] for d in sentences[start_index:]])
          chunks.append(combined_text)
      return chunks

代码实践

原 TypeScript 项目已使用 Python 进行了重构,后续将优先使用 Python 进行代码实践和讲解。

其中:RAG.libs 中是封装好的各种不同作用的模块,如 RAG/libs/text_splitter.py 是封装好的文档切分器,RAG/libs/evaluator.py 是封装好的评估器,因此文中不再贴具体的代码,如需查看具体代码实现,请移步到 github 代码仓库中查看。

本文完整代码地址[2]:https://github.com/laixiangran/ai-learn-python/blob/main/RAG/examples/06_semantic_splitting.py

先看下基于句子余弦距离的语义切分的评估结果:

图片

从评估结果来看,相较于 RecursiveCharacterTextSplitter 的切分方法,在上下文召回率、上下文相关性以及答案准确性都有不同程度的提升。

加载文件

复制
from RAG.libs.file_loader import FileLoader

file_loader = FileLoader(
    file_path="RAG/datas/2024少儿编程教育行业发展趋势报告.md",
    provider="customMarkdownLoader",
)
documents = file_loader.load()

语义切分

因为我们的文档只要是中文,因此需要将 sentence_split_regex修改成可对中文切分的规则,如:r"(?<=[。?!\n])"。

复制
from langchain_experimental.text_splitter import SemanticChunker
from RAG.libs.embedding import Embedding

# 向量模型
embeddings = Embedding(model="nomic-embed-text", provider="ollama")

# 使用 SemanticChunker 切分
text_splitter = SemanticChunker(
    embeddings=embeddings,
    breakpoint_threshold_type="percentile",
    sentence_split_regex=r"(?<=[。?!\n])",
)
documents = text_splitter.split_documents(documents=documents)

切分后处理

使用SemanticChunker进行切分,会出现较短或者较长的切片。比如通过percentile进行切分后的结果可以看到,最小的文档块大小只有 1,最大的文档块大小有 3346。因此,为了更好的检索效果,我们一般需要=对较长的文档做二次切分、对较短的文档进行合并和添加标题等其它切分后处理。

复制
count 75 // 总数
mean 731 // 平均值
std 685 // 标准差
min 1 // 最小值
25% 218 // 25分位值
50% 568 // 50分位值
75% 990 // 75分位值
90% 1535 // 90分位值
97% 2577 // 97分位值
99% 2876 // 99分位值
max 3346 // 最大值

将较大的文档块进行二次切分、合并较小的块和添加标题:

复制
from RAG.libs.text_splitter import (
    TextSplitter,
    merge_small_documents,
    add_headers_to_documents,
)

# 将较大的文档块进行二次切分
text_splitter = TextSplitter(
    provider="recursiveCharacter",
    chunk_size=500,
    chunk_overlap=0,
)
documents = text_splitter.split_documents(documents)

# 合并较小的块和添加标题
documents = merge_small_documents(documents, merge_max_length=100)
documents = add_headers_to_documents(documents)

效果评估

复制
from RAG.libs.evaluator import BatchEvaluator

eval_result = BatchEvaluator(
    chat_model=chat_model,
    vector_store=vector_store,
    qa_data=qa_data,
    top_k=3,
    filter=filter,
    output_path=output_path,
)

结语

本文介绍了一种更智能的切分方法 - 基于句子余弦距离的语义切分,并通过 langchain-experimental 中的 SemanticChunker 的源码来带大家了解了语义切分的实现原理。

从最后评估结果来看,相较于 RecursiveCharacterTextSplitter 的切分方法,在上下文召回率、上下文相关性以及答案准确性都有不同程度的提升,这说明通过基于句子余弦距离的语义切分方法对文档切分优化具有一定的可行性,大家可以根据自己的实际情况进一步验证,欢迎大家留言交流。

引用链接

[1] SemanticChunker: https://github.com/langchain-ai/langchain-experimental/blob/main/libs/experimental/langchain_experimental/text_splitter.py#L99

[2] 本文完整代码地址: https://github.com/laixiangran/ai-learn-python/blob/main/RAG/examples/06_semantic_splitting.py

相关资讯

RAG系列:切分优化 - 基于 Markdown 语法的文档切分

引言在RAG系列:解析优化 - 不同文件类型统一转换成Markdown一文中我们介绍了将不同文件类型统一解析转换成 Markdown 文件的好处。 本文我们接着这篇文章解析转换后的 Markdown 文件,介绍下基于 Markdown 语法的文档切分方法。 关于指标在RAG系列:系统评估 - 五个主流评估指标详解一文中我们介绍了评估 RAG 系统的五个主流指标,从本文开始,我会根据不同优化阶段来选择要重点关注的指标,不必要每次都关注五个指标的表现,这样可以让我们的优化更聚焦,通过优化每个阶段的重点指标,从而逐步优化系统的各个环节。
6/10/2025 4:30:00 AM
燃哥讲AI

从RAG到QA-RAG:整合生成式AI以用于药品监管合规流程

图片引言聊天机器人的进步近期生成式AI的进展显著增强了聊天机器人的能力。 这些由生成式人工智能驱动的聊天机器人在各个行业中的应用正在被探索[Bahrini等人,2023年;Castelvecchi,2023年;Badini等人,2023年],其中制药行业是一个显著的关注领域。 在药物发现领域,最近的研究表明,由生成式人工智能驱动的聊天机器人在推进药物发现方面可以发挥重要作用[Wang等人,2023年;Savage,2023年;Bran等人,2023年]。
5/8/2025 2:22:00 AM
Wolfgang

FastRAG:半结构化数据的检索增强生成

本文介绍了FastRAG,一种针对半结构化数据的新型RAG方法。 FastRAG采用模式学习和脚本学习来提取和结构化数据,而无需将整个数据源提交给LLM。 它将文本搜索与知识图谱(KG)查询相结合,以提高在问答任务中检索上下文丰富信息的准确性。
1/23/2025 4:23:30 PM
  • 1