前面的章节已经讲清楚了模型、提示词、消息、检索、结构化输出和状态管理。从本章开始,我们把这些能力真正拼起来,做一个最具代表性的落地项目:企业知识库问答系统。
之所以把它作为第一个完整项目,是因为它几乎覆盖了 LangChain 落地最核心的能力:
本章不会只讲概念,而是按“项目目标 → 目录结构 → 数据处理 → 检索链 → 接口封装 → 评估优化”的顺序,构建一个可以继续扩展的工程骨架。
一个企业知识库问答系统,至少要满足以下要求:
为了避免一开始就做成“大而全”的系统,建议第一版只做这些能力:
第一版先不要引入 Agent。因为对知识库问答系统来说,稳定检索和引用正确,往往比“会不会调用工具”更重要。
enterprise-rag/ ├── app/ │ ├── main.py │ ├── chains.py │ ├── prompts.py │ ├── retriever.py │ ├── ingestion.py │ └── settings.py ├── data/ │ ├── raw/ │ └── processed/ ├── vectorstore/ ├── tests/ │ ├── test_ingestion.py │ └── test_rag_chain.py └── requirements.txt
这样的结构有几个优点:
from pathlib import Path from langchain_community.document_loaders import DirectoryLoader, TextLoader def load_kb_documents(data_dir: str): loader = DirectoryLoader( data_dir, glob="**/*.md", loader_cls=TextLoader, show_progress=True, ) return loader.load() docs = load_kb_documents("./data/raw") print(f"Loaded {len(docs)} documents")
导入之后,不要立刻切分。建议先处理:
def normalize_document(doc): text = doc.page_content text = text.replace(" ", " ") text = " ".join(line.strip() for line in text.splitlines() if line.strip()) doc.page_content = text doc.metadata.setdefault("doc_type", "policy") doc.metadata.setdefault("source", "unknown") return doc normalized_docs = [normalize_document(doc) for doc in docs]
from langchain_text_splitters import RecursiveCharacterTextSplitter splitter = RecursiveCharacterTextSplitter( chunk_size=600, chunk_overlap=100, separators=[" ", " ", "。", ",", " ", ""] ) chunks = splitter.split_documents(normalized_docs) print(f"Total chunks: {len(chunks)}")
from langchain_openai import OpenAIEmbeddings from langchain_chroma import Chroma embeddings = OpenAIEmbeddings(model="text-embedding-3-large") vectorstore = Chroma.from_documents( documents=chunks, embedding=embeddings, persist_directory="./vectorstore/chroma" ) print("Vector store created")
def build_retriever(vectorstore): return vectorstore.as_retriever( search_kwargs={"k": 4} ) retriever = build_retriever(vectorstore)
如果你的文档 metadata 较完整,还可以加过滤:
finance_retriever = vectorstore.as_retriever( search_kwargs={ "k": 4, "filter": {"department": "finance"} } )
对于企业知识库问答,Prompt 不应只追求“回答自然”,而要优先强调:
from langchain_core.prompts import ChatPromptTemplate rag_prompt = ChatPromptTemplate.from_template(""" 你是企业知识库助手。 请严格依据提供的资料回答问题,不得虚构制度、日期或流程。 如果资料不足,请明确回答“未找到足够依据”。 请按以下格式回答: 1. 直接答案 2. 依据说明 3. 引用来源 问题:{question} 资料: {context} """)
from langchain_openai import ChatOpenAI from langchain_core.runnables import RunnablePassthrough from langchain_core.output_parsers import StrOutputParser def format_docs(docs): return " ".join( f"[来源: {doc.metadata.get('source', 'unknown')}] {doc.page_content}" for doc in docs ) model = ChatOpenAI(model="gpt-4o-mini", temperature=0) rag_chain = ( { "context": retriever | format_docs, "question": RunnablePassthrough(), } | rag_prompt | model | StrOutputParser() ) answer = rag_chain.invoke("差旅报销一般多久到账?") print(answer)
实际业务里,常见需求是不仅返回答案,还要把命中的文档片段一起带回前端。
def ask_with_sources(question: str): docs = retriever.invoke(question) context = format_docs(docs) answer = (rag_prompt | model | StrOutputParser()).invoke({ "question": question, "context": context, }) return { "answer": answer, "sources": [ { "source": doc.metadata.get("source", "unknown"), "preview": doc.page_content[:180], } for doc in docs ] } result = ask_with_sources("发票抬头写错了还能报销吗?") print(result["answer"]) print(result["sources"])
这样前端就能展示:
企业问答常常不是单轮的。比如:
这时就要引入消息历史。
from langchain_core.prompts import MessagesPlaceholder from langchain_core.messages import HumanMessage, AIMessage chat_rag_prompt = ChatPromptTemplate.from_messages([ ("system", "你是企业知识库助手,仅依据提供资料回答。"), MessagesPlaceholder("history"), ("human", "问题:{question} 资料:{context}") ]) history = [ HumanMessage(content="差旅报销多久到账?"), AIMessage(content="通常在审批通过后的 3 个工作日内到账。") ]
一个更像样的项目,不应把链直接散落在脚本里,而应封装成服务接口。
class EnterpriseRAGService: def __init__(self, retriever, model, prompt): self.retriever = retriever self.model = model self.prompt = prompt def ask(self, question: str): docs = self.retriever.invoke(question) context = format_docs(docs) answer = (self.prompt | self.model | StrOutputParser()).invoke({ "question": question, "context": context, }) return { "answer": answer, "documents": docs, }
之后你可以在 FastAPI、Flask 或 Streamlit 中调用这个服务。
第一版上线后,常见优化路线包括:
test_cases = [ { "question": "差旅报销多久到账?", "must_include": ["3 个工作日"], }, { "question": "试用期员工可以申请差旅报销吗?", "must_include": ["未找到足够依据"], }, ] for case in test_cases: answer = rag_chain.invoke(case["question"]) passed = all(keyword in answer for keyword in case["must_include"]) print(case["question"], passed)
这虽然只是最粗糙的评测,但已经比“凭感觉好像还行”强很多。
企业知识库问答系统最重要的不是“说得像不像人”,而是:
1. 为你的业务场景列出第一批最适合入库的 20 份文档。
2. 实现一个从本地目录导入、切分并写入 Chroma 的脚本。
3. 用 LCEL 构建一个最小 RAG 链,并返回引用来源。
4. 增加一个“资料不足时拒答”的测试样例。
5. 设计一个 API 输出格式,包含答案、来源、trace_id 和耗时。
from fastapi import FastAPI from pydantic import BaseModel app = FastAPI() rag_service = EnterpriseRAGService(retriever=retriever, model=model, prompt=rag_prompt) class AskRequest(BaseModel): question: str @app.post("/ask") def ask(req: AskRequest): result = rag_service.ask(req.question) return { "answer": result["answer"], "sources": [doc.metadata for doc in result["documents"]], }
在实际项目中,知识库不会只建一次。你需要定期更新向量索引:
def rebuild_index(data_dir: str, persist_dir: str): docs = load_kb_documents(data_dir) docs = [normalize_document(doc) for doc in docs] chunks = splitter.split_documents(docs) Chroma.from_documents( documents=chunks, embedding=OpenAIEmbeddings(model="text-embedding-3-large"), persist_directory=persist_dir, ) print("Index rebuilt")
这个脚本后续可以接到: