引言
传统的文档切分方法通常采用基于特定字符和固定长度的切分策略,这种方法虽然实现简单,但在实际应用中容易割裂完整的语义单元,导致后续的信息检索与理解受到影响。
相比之下,一种更智能的切分方法是基于句子余弦距离的语义切分。它不再依据特定字符和固定长度进行机械切分,而是对每个句子进行 embedding,以此来计算相邻句子的余弦距离,再通过算法算出一个相对合理的切分点(某个距离值),最后将不大于该阈值的相邻句子聚合在一起作为一个文档块,从而实现文档语义切分。
例如 句子_1、句子_2、句子_3 之间的余弦距离都小于该阈值,而 句子_3 与 句子_4 的余弦距离大于该阈值,则在 句子_3 和 句子_4 之间增加切分点,最终的切分结果就是把 句子_1、句子_2、句子_3 聚合在一个文档块中,句子_4 在其它的文档块中。
实现原理
基于余弦距离的语义切分大致分为以下5个步骤:
langchain-experimental中的 SemanticChunker[1] 实现了基于余弦距离的语义切分,因此本文我将通过 SemanticChunker 的源码来带大家了解语义切分的实现原理。
以下是 SemanticChunker 的初始化参数,后面根据不同步骤所需要的参数来了解这些参数的具体作用。
复制class SemanticChunker( # 向量模型 embeddings: Embeddings, # 向前向后取 buffer_size 个句子一起 embedding buffer_size: int = 1, # 是否在元数据添加开始切分的位置(以文档字符长度计算) add_start_index: bool = False, # 切分点计算方法 breakpoint_threshold_type: BreakpointThresholdType = "percentile", # 切分点计算阈值 breakpoint_threshold_amount: float | None = None, # 切分后的文档块数量 number_of_chunks: int | None = None, # 句子切分规则 sentence_split_regex: str = r"(?<=[.?!])\s+", # 最小文档块大小 min_chunk_size: int | None = None )
句子切分
这一步是通过特定规则将文档切分为一个个句子,在 SemanticChunker 中通过参数 sentence_split_regex 来设置规则进行切分,默认值为 r"(?<=[.?!])\s+",这是以英文的句号、问号、感叹号来进行切分的,而且是对比较规范的英文行文,也就是这三种标点后还跟空白字符的。如果要对中文文档切分,那就需要将这个正则表达式替换成能切分中文的,例如:r"(?<=[。?!\n])",也就是以中文的句号、问号、感叹号以及换行符来进行切分。
SemanticChunker的实现源码如下:
复制import re def _get_single_sentences_list(self, text: str) -> List[str]: return re.split(self.sentence_split_regex, text)
句子 embedding
这一步是将每个句子进行 embedding,理论上接着就以每个句子 embedding 结果来计算相邻句子的距离就可以了。但通过实际操作发现对单个句子处理噪音比较大,后续切分的效果并不理想,因此 SemanticChunker 通过 buffer_size 来控制当前句子前、后各取几个句子组成一组来计算 embedding 并计算余弦距离。例如buffer_size设置为为1(默认值),表示取当前句子前、后各取1个句子组成一组来计算 embedding。
SemanticChunker的实现源码如下:
首先根据buffer_size得到当前句子的组合。
复制def combine_sentences(sentences: List[dict], buffer_size: int = 1) -> List[dict]: for i inrange(len(sentences)): # 创建一个字符串变量来保存连接的句子 combined_sentence = "" # 添加当前句子之前 buffer_size 个句子 for j inrange(i - buffer_size, i): if j >= 0: combined_sentence += sentences[j]["sentence"] + " " # 添加当前句子 combined_sentence += sentences[i]["sentence"] # 添加当前句子之后 buffer_size 个句子 for j inrange(i + 1, i + 1 + buffer_size): if j < len(sentences): combined_sentence += " " + sentences[j]["sentence"] # 将合并好的句子存储在当前的句子 combined_sentence 中 sentences[i]["combined_sentence"] = combined_sentence return sentences
然后根据通过参数 embeddings传入的向量模型对句子组合进行 embedding。
复制def _calculate_sentence_distances(self, single_sentences_list: List[str]) -> Tuple[List[float], List[dict]]: _sentences = [ {"sentence": x, "index": i} for i, x in enumerate(single_sentences_list) ] sentences = combine_sentences(_sentences, self.buffer_size) embeddings = self.embeddings.embed_documents( [x["combined_sentence"] for x in sentences] ) for i, sentence in enumerate(sentences): sentence["combined_sentence_embedding"] = embeddings[i] return calculate_cosine_distances(sentences)
计算相邻句子(组)余弦距离
这一步就是通过计算相邻句子(组) 的余弦相似度来得到相邻句子(组) 的余弦距离。
将横轴记为句子(组)的序号,纵轴为相邻句子(组) 的余弦距离,就可得到下面类似的图:
SemanticChunker的实现源码如下:
复制from langchain_community.utils.math import cosine_similarity defcalculate_cosine_distances(sentences: List[dict]) -> Tuple[List[float], List[dict]]: distances = [] for i inrange(len(sentences) - 1): embedding_current = sentences[i]["combined_sentence_embedding"] embedding_next = sentences[i + 1]["combined_sentence_embedding"] # 计算余弦相似度 similarity = cosine_similarity([embedding_current], [embedding_next])[0][0] # 转换成余弦距离 distance = 1 - similarity distances.append(distance) # 保存余弦距离 sentences[i]["distance_to_next"] = distance # 【可选】最后一个句子的处理 # sentences[-1]['distance_to_next'] = None # 或其它默认值 return distances, sentences
计算切分点
如何计算切分点,SemanticChunker给出了4种方法:
- percentile: 分位法,默认方法。将所有余弦距离在第 X 分位数的值作为阈值,并在那些余弦距离超过该阈值的位置进行切分。第 X 分位数可通过breakpoint_threshold_amount 设置,默认为 95。还可以通过 number_of_chunks 指定切分后的文档块总数量,采用线性插值的方式反向推导出该分位数;
SemanticChunker的实现源码如下:
复制import numpy as np def_calculate_breakpoint_threshold(self, distances: List[float]) -> Tuple[float, List[float]]: # 第一种方式:指定分位数 return cast( float, np.percentile(distances, self.breakpoint_threshold_amount), ), distances # 第二种方式:通过 number_of_chunks 反向推导分位数 x1, y1 = len(distances), 0.0 x2, y2 = 1.0, 100.0 x = max(min(self.number_of_chunks, x1), x2) if x2 == x1: y = y2 else: y = y1 + ((y2 - y1) / (x2 - x1)) * (x - x1) # 线性插值 y = min(max(y, 0), 100) return cast( float, np.percentile(distances, y),), distances
- standard_deviation: 标准差偏离法,是统计学中表示偏离的常规方法,这种方法比较适合正态分布。将所有余弦距离的平均值加上 X 倍的所有余弦距离标准差的值作为阈值,并在那些余弦距离超过该阈值的位置进行切分。X 倍可通过breakpoint_threshold_amount 设置,默认为 3,这是最常用的值;
SemanticChunker的实现源码如下:
复制import numpy as np def _calculate_breakpoint_threshold(self, distances: List[float]) -> Tuple[float, List[float]]: return cast( float, np.mean(distances) + self.breakpoint_threshold_amount * np.std(distances),), distances
- interquartile: 四分位距法,是统计学中表示偏离的另一种常规方法,这种方法计算分位数,所以数据分布不那么正态问题也不大。将所有余弦距离的平均值加上 X 倍的所有余弦距离四分位距的值作为阈值,并在那些余弦距离超过该阈值的位置进行切分。X 倍可通过breakpoint_threshold_amount 设置,默认为 1.5,也是最常用的值;
SemanticChunker的实现源码如下:
复制import numpy as np def _calculate_breakpoint_threshold(self, distances: List[float]) -> Tuple[float, List[float]]: # 取出25分位(下四分位)和75分位(上四分位)的数值 q1, q3 = np.percentile(distances, [25, 75]) # 计算两个分位的差值(四分位距) iqr = q3 - q1 return np.mean(distances) + self.breakpoint_threshold_amount * iqr, distances
- gradient: 梯度法。首先计算所有余弦距离的变化梯度,变化梯度计算出来后,就可以知道哪个地方余弦距离变化得快,然后将所有变化梯度在第 X 分位数的值作为阈值,并在那些余弦距离超过该阈值的位置进行切分。第 X 分位数可通过breakpoint_threshold_amount 设置,默认为 95。
SemanticChunker的实现源码如下:
复制import numpy as np def _calculate_breakpoint_threshold(self, distances: List[float]) -> Tuple[float, List[float]]: # 计算所有余弦距离的变化梯度 distance_gradient = np.gradient(distances, range(0, len(distances))) return cast( float, np.percentile(distance_gradient, self.breakpoint_threshold_amount)), distance_gradient
按切分点切分
通过第4步各方法得到切分点后,就可以按切分点对文档进行切分(通过设置min_chunk_size控制合并较小的块),就可得到下面类似的图(包括切分位置以及切片):
SemanticChunker的实现源码如下:
复制def split_text(self, text: str,) -> List[str]: # 计算相邻句子的余弦距离 distances, sentences = self._calculate_sentence_distances(single_sentences_list) # 计算切分点 breakpoint_distance_threshold, breakpoint_array = self._calculate_breakpoint_threshold(distances) indices_above_thresh = [ i for i, x inenumerate(breakpoint_array) if x > breakpoint_distance_threshold ] chunks = [] start_index = 0 # 遍历切分点来分割句子 for index in indices_above_thresh: end_index = index group = sentences[start_index : end_index + 1] combined_text = " ".join([d["sentence"] for d in group]) # 通过设置 min_chunk_size 来合并较小的文档块 if ( self.min_chunk_size isnotNone andlen(combined_text) < self.min_chunk_size ): continue chunks.append(combined_text) start_index = index + 1 if start_index < len(sentences): combined_text = " ".join([d["sentence"] for d in sentences[start_index:]]) chunks.append(combined_text) return chunks
代码实践
原 TypeScript 项目已使用 Python 进行了重构,后续将优先使用 Python 进行代码实践和讲解。
其中:RAG.libs 中是封装好的各种不同作用的模块,如 RAG/libs/text_splitter.py 是封装好的文档切分器,RAG/libs/evaluator.py 是封装好的评估器,因此文中不再贴具体的代码,如需查看具体代码实现,请移步到 github 代码仓库中查看。
本文完整代码地址[2]:https://github.com/laixiangran/ai-learn-python/blob/main/RAG/examples/06_semantic_splitting.py
先看下基于句子余弦距离的语义切分的评估结果:
从评估结果来看,相较于 RecursiveCharacterTextSplitter 的切分方法,在上下文召回率、上下文相关性以及答案准确性都有不同程度的提升。
加载文件
复制from RAG.libs.file_loader import FileLoader file_loader = FileLoader( file_path="RAG/datas/2024少儿编程教育行业发展趋势报告.md", provider="customMarkdownLoader", ) documents = file_loader.load()
语义切分
因为我们的文档只要是中文,因此需要将 sentence_split_regex修改成可对中文切分的规则,如:r"(?<=[。?!\n])"。
复制from langchain_experimental.text_splitter import SemanticChunker from RAG.libs.embedding import Embedding # 向量模型 embeddings = Embedding(model="nomic-embed-text", provider="ollama") # 使用 SemanticChunker 切分 text_splitter = SemanticChunker( embeddings=embeddings, breakpoint_threshold_type="percentile", sentence_split_regex=r"(?<=[。?!\n])", ) documents = text_splitter.split_documents(documents=documents)
切分后处理
使用SemanticChunker进行切分,会出现较短或者较长的切片。比如通过percentile进行切分后的结果可以看到,最小的文档块大小只有 1,最大的文档块大小有 3346。因此,为了更好的检索效果,我们一般需要=对较长的文档做二次切分、对较短的文档进行合并和添加标题等其它切分后处理。
复制count 75 // 总数 mean 731 // 平均值 std 685 // 标准差 min 1 // 最小值 25% 218 // 25分位值 50% 568 // 50分位值 75% 990 // 75分位值 90% 1535 // 90分位值 97% 2577 // 97分位值 99% 2876 // 99分位值 max 3346 // 最大值
将较大的文档块进行二次切分、合并较小的块和添加标题:
复制from RAG.libs.text_splitter import ( TextSplitter, merge_small_documents, add_headers_to_documents, ) # 将较大的文档块进行二次切分 text_splitter = TextSplitter( provider="recursiveCharacter", chunk_size=500, chunk_overlap=0, ) documents = text_splitter.split_documents(documents) # 合并较小的块和添加标题 documents = merge_small_documents(documents, merge_max_length=100) documents = add_headers_to_documents(documents)
效果评估
复制from RAG.libs.evaluator import BatchEvaluator eval_result = BatchEvaluator( chat_model=chat_model, vector_store=vector_store, qa_data=qa_data, top_k=3, filter=filter, output_path=output_path, )
结语
本文介绍了一种更智能的切分方法 - 基于句子余弦距离的语义切分,并通过 langchain-experimental 中的 SemanticChunker 的源码来带大家了解了语义切分的实现原理。
从最后评估结果来看,相较于 RecursiveCharacterTextSplitter 的切分方法,在上下文召回率、上下文相关性以及答案准确性都有不同程度的提升,这说明通过基于句子余弦距离的语义切分方法对文档切分优化具有一定的可行性,大家可以根据自己的实际情况进一步验证,欢迎大家留言交流。
引用链接
[1] SemanticChunker: https://github.com/langchain-ai/langchain-experimental/blob/main/libs/experimental/langchain_experimental/text_splitter.py#L99
[2] 本文完整代码地址: https://github.com/laixiangran/ai-learn-python/blob/main/RAG/examples/06_semantic_splitting.py