游乐游手机版
首页/AI热点日报/热点详情

LightRAG实现原理:文档解析与插入详解

类型:热点整理2026-06-28
概述 本文深度剖析LightRAG框架在文档插入环节的具体实现方式。如果你对其内部运作流程感兴趣,读完本篇应该能梳理出一条清晰的主线,并清楚该从哪些细节入手进一步深入探究。 我们先从一个最基础的例子入手,感受一下它的工作逻辑。 文档查询的基本使用 下面这段代码可以看作是LightRAG的“hello

概述

本文深度剖析LightRAG框架在文档插入环节的具体实现方式。如果你对其内部运作流程感兴趣,读完本篇应该能梳理出一条清晰的主线,并清楚该从哪些细节入手进一步深入探究。

LightRAG实现原理分析-文档的解析和插入

我们先从一个最基础的例子入手,感受一下它的工作逻辑。

文档查询的基本使用

下面这段代码可以看作是LightRAG的“hello world”——打开一个文件,将内容插入系统,随后即可直接向它提问并获取回答。

WORKING_DIR = "./dickens"

if not os.path.exists(WORKING_DIR):
    os.mkdir(WORKING_DIR)
# 构建LightRAG对象    
rag = LightRAG(
    working_dir=WORKING_DIR,
    llm_model_func=gpt_4o_mini_complete,  # Use gpt_4o_mini_complete LLM model
    # llm_model_func=gpt_4o_complete  # Optionally, use a stronger model
)
# 打开文档,并插入到LightRAG中
with open("./dickens/book.txt", "r", encoding="utf-8") as f:
    rag.insert(f.read())

# 执行原始的本地查询
print(
    rag.query("What are the top themes in this story?", param=QueryParam(mode="naive"))
)

文档插入过程实现分析

那么,具体的实现函数是什么样的呢?其实入口非常简洁,insert方法本质上就是调用了一个异步的ainsert方法。

class LightRAG:
    def insert(self, string_or_strings):
        loop = always_get_an_event_loop()
        return loop.run_until_complete(self.ainsert(string_or_strings))

ainsert()函数实现分析

从整体逻辑来看,ainsert主要完成了以下几步操作:

(1)为每个传入的文档生成唯一ID,同时创建一个字典,并自动清除内容前后的空白字符。

(2)检查这些文档中哪些是全新的——如果全部是已有内容,则记录日志,直接结束流程。

(3)仅处理新文档,按固定大小将其切分为块,再执行一轮去重检查,新的保留,旧的跳过。

(4)将新生成的分块存储起来,同时将其向量表示写入向量数据库。

(5)从这些分块中抽取实体和关系,如果没有发现新的内容,同样直接返回。

(6)将抽取出的实体和关系全部写入图数据库中。

实现代码如下

async def ainsert(self, string_or_strings):
    # 定义异步插入方法,接收字符串或字符串列表参数
    
    update_storage = False
    # 初始化存储更新标志为False
    
    try:
        if isinstance(string_or_strings, str):
            string_or_strings = [string_or_strings]
        # 如果输入是单个字符串,转换为列表形式

        new_docs = {
            compute_mdhash_id(c.strip(), prefix="doc-"): {"content": c.strip()}
            for c in string_or_strings
        }
        # 为每个文档生成唯一ID并创建文档字典,移除首尾空白

        _add_doc_keys = await self.full_docs.filter_keys(list(new_docs.keys()))
        # 检查哪些文档ID是新的(未存储过的)

        new_docs = {k: v for k, v in new_docs.items() if k in _add_doc_keys}
        # 只保留新文档

        if not len(new_docs):
            logger.warning("All docs are already in the storage")
            return
        # 如果没有新文档,记录警告并返回

        update_storage = True
        logger.info(f"[New Docs] inserting {len(new_docs)} docs")
        # 设置更新标志,记录新文档数量

        inserting_chunks = {}
        # 初始化分块字典

        for doc_key, doc in tqdm_async(new_docs.items(), desc="Chunking documents", unit="doc"):
            # 遍历每个文档,显示进度条
            
            chunks = {
                compute_mdhash_id(dp["content"], prefix="chunk-"): {
                    **dp,
                    "full_doc_id": doc_key,
                }
                for dp in chunking_by_token_size(
                    doc["content"],
                    overlap_token_size=self.chunk_overlap_token_size,
                    max_token_size=self.chunk_token_size,
                    tiktoken_model=self.tiktoken_model_name,
                )
            }
            # 对文档内容进行分块,为每个分块生成ID,并关联原始文档ID

            inserting_chunks.update(chunks)
            # 将当前文档的分块添加到总分块字典中

        _add_chunk_keys = await self.text_chunks.filter_keys(list(inserting_chunks.keys()))
        # 检查哪些分块ID是新的

     inserting_chunks = {k: v for k, v in inserting_chunks.items() if k in _add_chunk_keys}
        # 只保留新分块

        if not len(inserting_chunks):
            logger.warning("All chunks are already in the storage")
            return
        # 如果没有新分块,记录警告并返回

        logger.info(f"[New Chunks] inserting {len(inserting_chunks)} chunks")
        # 记录新分块数量

        await self.chunks_vdb.upsert(inserting_chunks)
        # 将分块存入向量数据库

        logger.info("[Entity Extraction]...")
        maybe_new_kg = await extract_entities(
            inserting_chunks,
            knowledge_graph_inst=self.chunk_entity_relation_graph,
            entity_vdb=self.entities_vdb,
            relationships_vdb=self.relationships_vdb,
            global_config=asdict(self),
        )
        # 从分块中提取实体和关系

        if maybe_new_kg is None:
            logger.warning("No new entities and relationships found")
            return
        # 如果没有找到新实体和关系,记录警告并返回

        self.chunk_entity_relation_graph = maybe_new_kg
        # 更新知识图谱

        await self.full_docs.upsert(new_docs)
        await self.text_chunks.upsert(inserting_chunks)
        # 将完整文档和分块存入各自的存储

    finally:
        if update_storage:
            await self._insert_done()
        # 如果进行了存储更新,执行清理工作

小结

总体来看,LightRAG处理新文档的策略可概括为三个核心步骤:先对文档进行分块并计算向量表示,再深入文本挖掘,从中提取出实体及其关系。这张流程图清晰地展示了完整的处理链路,掌握这个主干之后,再深入具体细节便会事半功倍。

来源:https://www.53ai.com/news/RAG/2025010464083.html

相关热点

继续查看同栏目近期热点。

延伸阅读

补充最近整理过的热点入口。