这是本文档旧的修订版!
第四章:核心组件详解 - Chains(链)
在 LangChain 框架中,Chain(链)是最核心、最重要的概念之一。它将多个组件连接在一起,形成一个可执行的工作流程。通过 Chain,我们可以将 LLM、提示模板、输出解析器、外部工具等各种组件有机地组合起来,构建出功能强大的应用程序。
本章将深入讲解 LangChain 中 Chain 的各个方面,从基础概念到高级用法,帮助你全面掌握这一核心组件。
4.1 Chain 基础概念
4.1.1 什么是 Chain
Chain(链)是 LangChain 中用于将多个组件串联执行的抽象概念。简单来说,Chain 就是一系列按顺序执行的步骤,每个步骤接收输入、进行处理、产生输出,并将输出传递给下一个步骤。
可以把 Chain 想象成工厂的生产流水线:
- 原材料(输入)从流水线的一端进入
- 经过各个工位(Chain 中的各个组件)的加工处理
- 最终在流水线的另一端产出成品(输出)
在 LangChain 中,Chain 可以包含以下类型的组件:
- LLM(大型语言模型):进行文本生成、推理等操作
- Prompt Templates(提示模板):管理和格式化输入提示
- Output Parsers(输出解析器):解析和结构化 LLM 的输出
- Tools(工具):与外部 API、数据库等进行交互
- 其他 Chain:实现 Chain 的嵌套和组合
4.1.2 Chain 的工作原理
Chain 的工作流程遵循输入-处理-输出的模式:
# Chain 的基本工作流程示意 输入数据 → [组件1处理] → 中间结果1 → [组件2处理] → 中间结果2 → ... → [组件N处理] → 最终输出
每个 Chain 都实现了两个核心方法:
1. _call 方法(同步调用)
def _call(self, inputs: Dict[str, Any]) -> Dict[str, Any]: """ 执行 Chain 的核心逻辑 Args: inputs: 输入字典,包含 Chain 需要的所有输入参数 Returns: 输出字典,包含 Chain 产生的所有输出结果 """ # 实现具体的处理逻辑 pass
2. _acall 方法(异步调用)
async def _acall(self, inputs: Dict[str, Any]) -> Dict[str, Any]: """ 异步执行 Chain 的核心逻辑 Args: inputs: 输入字典,包含 Chain 需要的所有输入参数 Returns: 输出字典,包含 Chain 产生的所有输出结果 """ # 实现具体的异步处理逻辑 pass
Chain 的基类 BaseChain 定义了所有 Chain 必须实现的接口:
from langchain.chains.base import Chain from typing import Dict, List, Any class MyCustomChain(Chain): """自定义 Chain 示例""" @property def input_keys(self) -> List[str]: """定义 Chain 需要的输入键""" return ["input_text"] @property def output_keys(self) -> List[str]: """定义 Chain 产生的输出键""" return ["output_text"] def _call(self, inputs: Dict[str, Any]) -> Dict[str, Any]: """同步执行逻辑""" input_text = inputs["input_text"] # 处理逻辑 output_text = f"处理后的结果: {input_text}" return {"output_text": output_text} async def _acall(self, inputs: Dict[str, Any]) -> Dict[str, Any]: """异步执行逻辑""" # 通常调用 _call 或实现真正的异步逻辑 return self._call(inputs)
4.1.3 Chain 的优势
使用 Chain 有以下几个显著优势:
1. 模块化设计 Chain 将复杂的任务分解为独立的、可重用的组件,每个组件只负责特定的功能。这使得代码更易理解、维护和测试。
# 不使用 Chain:所有逻辑混杂在一起 def process_document(text): # 格式化提示 prompt = f"请总结以下文本:\n\n{text}" # 调用 LLM response = llm.predict(prompt) # 解析输出 summary = response.strip() return summary # 使用 Chain:逻辑清晰分离 from langchain.chains import LLMChain from langchain.prompts import PromptTemplate summary_chain = LLMChain( llm=llm, prompt=PromptTemplate( template="请总结以下文本:\n\n{text}", input_variables=["text"] ) ) result = summary_chain.predict(text=input_text)
2. 可组合性 多个简单的 Chain 可以组合成复杂的 Chain,实现更强大的功能。
from langchain.chains import SimpleSequentialChain # 定义两个简单的 Chain chain1 = LLMChain(llm=llm, prompt=prompt1) chain2 = LLMChain(llm=llm, prompt=prompt2) # 组合成一个 SequentialChain combined_chain = SimpleSequentialChain(chains=[chain1, chain2])
3. 统一的接口 所有 Chain 都遵循相同的接口规范(input_keys、output_keys、_call、_acall),这使得不同类型的 Chain 可以无缝协作。
4. 内置功能支持 LangChain 的 Chain 内置了以下功能: * 内存管理:通过 Memory 组件维护对话历史 * 回调机制:支持回调函数进行日志记录、监控等 * 错误处理:提供统一的错误处理机制 * 配置管理:支持从配置文件加载 Chain
5. 便于调试和监控 Chain 的结构化设计使得调试更加容易,可以逐个组件检查输入输出,快速定位问题。
# 启用详细模式查看 Chain 的执行过程 from langchain.globals import set_debug set_debug(True) # 执行 Chain result = chain.invoke({"input": "测试输入"})
4.2 LLMChain
LLMChain 是 LangChain 中最基础、最常用的 Chain 类型。它将 PromptTemplate 和 LLM 组合在一起,实现从格式化输入到调用 LLM 的完整流程。
4.2.1 基础使用
最简单的 LLMChain 示例:
from langchain.llms import OpenAI from langchain.prompts import PromptTemplate from langchain.chains import LLMChain # 1. 创建 LLM 实例 llm = OpenAI(temperature=0.7) # 2. 创建 PromptTemplate prompt = PromptTemplate( input_variables=["product"], template="为以下产品写一个吸引人的产品描述:\n\n产品名称:{product}\n\n产品描述:" ) # 3. 创建 LLMChain chain = LLMChain(llm=llm, prompt=prompt) # 4. 执行 Chain result = chain.predict(product="智能手表") print(result)
输出示例:
这款智能手表融合了时尚设计与先进技术,是您日常生活的完美伴侣。它不仅能精准记录您的运动数据、监测健康指标,还能与您的智能手机无缝连接,让您不错过任何重要信息。超长续航、防水设计,无论是商务场合还是户外运动,都能轻松应对。
使用 invoke 方法(推荐方式):
# LangChain 推荐使用 invoke 方法 result = chain.invoke({"product": "无线耳机"}) print(result["text"])
4.2.2 参数详解
LLMChain 支持多种参数配置,下面是详细说明:
1. 构造函数参数
from langchain.chains import LLMChain chain = LLMChain( # 必需参数 llm=llm, # LLM 实例,用于生成文本 prompt=prompt, # PromptTemplate 实例 # 可选参数 output_key="text", # 输出结果的键名,默认为 "text" verbose=False, # 是否输出详细日志 callback_manager=None, # 回调管理器 callbacks=None, # 回调函数列表 tags=None, # 标签列表,用于追踪 metadata=None, # 元数据字典 )
2. 输出键(output_key)
可以通过 output_key 参数自定义输出的键名:
from langchain.chains import LLMChain # 自定义输出键为 "description" chain = LLMChain( llm=llm, prompt=prompt, output_key="description" # 输出将使用 "description" 作为键 ) result = chain.invoke({"product": "蓝牙耳机"}) print(result["description"]) # 使用自定义的键名访问结果
3. 详细模式(verbose)
启用 verbose 模式可以查看 Chain 的执行过程:
chain = LLMChain( llm=llm, prompt=prompt, verbose=True # 启用详细输出 ) # 执行时会打印详细的执行信息 result = chain.invoke({"product": "平板电脑"})
输出示例:
> Entering new LLMChain chain... Prompt after formatting: 为以下产品写一个吸引人的产品描述: 产品名称:平板电脑 产品描述: > Finished chain.
4. 回调函数(callbacks)
可以通过回调函数监听 Chain 的执行事件:
from langchain.callbacks import StdOutCallbackHandler # 创建回调处理器 handler = StdOutCallbackHandler() chain = LLMChain( llm=llm, prompt=prompt, callbacks=[handler] # 添加回调 ) result = chain.invoke({"product": "智能音箱"})
自定义回调处理器:
from langchain.callbacks.base import BaseCallbackHandler class MyCallbackHandler(BaseCallbackHandler): """自定义回调处理器""" def on_chain_start(self, serialized, inputs, **kwargs): print(f"Chain 开始执行,输入: {inputs}") def on_chain_end(self, outputs, **kwargs): print(f"Chain 执行完成,输出: {outputs}") def on_llm_start(self, serialized, prompts, **kwargs): print(f"LLM 开始生成,提示: {prompts}") def on_llm_end(self, response, **kwargs): print(f"LLM 生成完成,响应: {response}") # 使用自定义回调 chain = LLMChain( llm=llm, prompt=prompt, callbacks=[MyCallbackHandler()] )
4.2.3 输出解析
LLMChain 默认返回原始文本,但通常我们需要将输出解析为结构化数据。
1. 使用 OutputParser
from langchain.output_parsers import CommaSeparatedListOutputParser from langchain.prompts import PromptTemplate # 创建列表输出解析器 output_parser = CommaSeparatedListOutputParser() # 获取格式指令 format_instructions = output_parser.get_format_instructions() # 创建带格式指令的提示模板 prompt = PromptTemplate( template="列出5种{category}。\n{format_instructions}", input_variables=["category"], partial_variables={"format_instructions": format_instructions} ) # 创建 LLMChain chain = LLMChain( llm=llm, prompt=prompt, output_parser=output_parser, # 添加输出解析器 output_key="items" ) # 执行并获取结构化结果 result = chain.invoke({"category": "水果"}) print(result["items"]) # 输出: ['苹果', '香蕉', '橙子', '葡萄', '西瓜']
2. 使用 PydanticOutputParser
对于更复杂的结构化输出,可以使用 PydanticOutputParser:
from pydantic import BaseModel, Field from langchain.output_parsers import PydanticOutputParser # 定义数据模型 class ProductInfo(BaseModel): name: str = Field(description="产品名称") price: float = Field(description="产品价格") features: list = Field(description="产品特点列表") target_audience: str = Field(description="目标用户群体") # 创建解析器 parser = PydanticOutputParser(pydantic_object=ProductInfo) # 创建提示模板 prompt = PromptTemplate( template="""根据以下产品描述,提取产品信息。 产品描述:{description} {format_instructions} """, input_variables=["description"], partial_variables={"format_instructions": parser.get_format_instructions()} ) # 创建 Chain chain = LLMChain( llm=llm, prompt=prompt, output_parser=parser, output_key="product_info" ) # 执行 description = """ 苹果最新推出的 iPhone 15 Pro,售价999美元起。 主要特点包括:钛金属设计、A17 Pro芯片、4800万像素主摄、USB-C接口。 主要面向追求高端体验的专业用户和科技爱好者。 """ result = chain.invoke({"description": description}) product = result["product_info"] print(f"产品名称: {product.name}") print(f"价格: ${product.price}") print(f"特点: {product.features}") print(f"目标用户: {product.target_audience}")
3. 自定义输出解析器
from langchain.schema import BaseOutputParser import json class JsonOutputParser(BaseOutputParser[dict]): """自定义 JSON 输出解析器""" def parse(self, text: str) -> dict: """解析 LLM 输出为 JSON 对象""" # 尝试从文本中提取 JSON try: # 查找 JSON 代码块 if "```json" in text: json_str = text.split("```json")[1].split("```")[0].strip() elif "```" in text: json_str = text.split("```")[1].split("```")[0].strip() else: json_str = text.strip() return json.loads(json_str) except json.JSONDecodeError as e: # 解析失败时返回原始文本 return {"raw_text": text, "error": str(e)} def get_format_instructions(self) -> str: return "请以 JSON 格式输出结果。" # 使用自定义解析器 parser = JsonOutputParser() prompt = PromptTemplate( template="""分析以下评论的情感。 评论:{review} 请以以下 JSON 格式返回结果: {{ "sentiment": "positive/negative/neutral", "confidence": 0.0-1.0, "keywords": ["关键词1", "关键词2"] }} """, input_variables=["review"] ) chain = LLMChain( llm=llm, prompt=prompt, output_parser=parser, output_key="analysis" ) result = chain.invoke({"review": "这个产品太棒了,完全超出预期!"}) print(result["analysis"])
4.3 SequentialChain
SequentialChain 允许我们将多个 Chain 串联起来,形成一个执行流水线。前一个 Chain 的输出可以作为后一个 Chain 的输入。
4.3.1 SimpleSequentialChain
SimpleSequentialChain 是最简单的顺序链,它按顺序执行多个 Chain,每个 Chain 只有一个输出,这个输出直接作为下一个 Chain 的输入。
基本使用:
from langchain.chains import LLMChain, SimpleSequentialChain from langchain.prompts import PromptTemplate # 第一步:生成产品名称 first_prompt = PromptTemplate( input_variables=["product_type"], template="为一款{product_type}起一个吸引人的产品名称:" ) chain_one = LLMChain(llm=llm, prompt=first_prompt) # 第二步:根据产品名称生成广告文案 second_prompt = PromptTemplate( input_variables=["product_name"], # 这里接收 chain_one 的输出 template="为名为'{product_name}'的产品写一段20字的广告语:" ) chain_two = LLMChain(llm=llm, prompt=second_prompt) # 创建 SimpleSequentialChain overall_chain = SimpleSequentialChain( chains=[chain_one, chain_two], verbose=True ) # 执行 result = overall_chain.run("运动鞋") print(result)
执行过程说明: 1. 输入 “运动鞋” 传递给 chain_one 2. chain_one 生成产品名称(如 “疾风跑鞋”) 3. “疾风跑鞋” 自动传递给 chain_two 作为输入 4. chain_two 生成广告语并返回
4.3.2 SequentialChain
SequentialChain 比 SimpleSequentialChain 更灵活,它允许: * 指定多个输入和输出变量 * 控制变量在 Chain 之间的传递方式 * 访问中间步骤的输出
基本使用:
from langchain.chains import LLMChain, SequentialChain from langchain.prompts import PromptTemplate # 第一步:生成产品名称 template1 = """你是一个产品经理。请为以下产品创建一个吸引人的名称。 产品类型:{product_type} 目标客户:{target_audience} 产品名称:""" prompt1 = PromptTemplate( input_variables=["product_type", "target_audience"], template=template1 ) chain1 = LLMChain( llm=llm, prompt=prompt1, output_key="product_name" # 指定输出键名 ) # 第二步:生成产品描述 template2 = """你是一个文案撰写专家。请为以下产品写一段描述。 产品名称:{product_name} 产品类型:{product_type} 产品描述:""" prompt2 = PromptTemplate( input_variables=["product_name", "product_type"], template=template2 ) chain2 = LLMChain( llm=llm, prompt=prompt2, output_key="description" ) # 第三步:生成营销文案 template3 = """你是一个营销专家。请根据以下信息创建一个营销口号。 产品名称:{product_name} 产品描述:{description} 营销口号:""" prompt3 = PromptTemplate( input_variables=["product_name", "description"], template=template3 ) chain3 = LLMChain( llm=llm, prompt=prompt3, output_key="slogan" ) # 创建 SequentialChain overall_chain = SequentialChain( chains=[chain1, chain2, chain3], input_variables=["product_type", "target_audience"], output_variables=["product_name", "description", "slogan"], verbose=True ) # 执行 result = overall_chain({ "product_type": "智能手环", "target_audience": "运动爱好者" }) print("产品名称:", result["product_name"]) print("产品描述:", result["description"]) print("营销口号:", result["slogan"])
SequentialChain 参数说明:
| 参数 | 说明 | 必需 |
| —— | —— | —— |
| chains | Chain 列表,按执行顺序排列 | 是 |
| input_variables | 整个 Chain 的输入变量列表 | 是 |
| output_variables | 最终输出的变量列表 | 否(默认输出所有) |
| verbose | 是否输出详细日志 | 否 |
4.3.3 输出传递机制
理解 SequentialChain 的输出传递机制对于构建复杂的 Chain 流水线至关重要。
变量传递规则: 1. 每个 Chain 的输出会自动添加到全局变量池中 2. 下游 Chain 可以从变量池中获取所需的输入 3. 变量名冲突时,后面的值会覆盖前面的值
示例:变量传递详解
from langchain.chains import LLMChain, SequentialChain from langchain.prompts import PromptTemplate # Chain A: 输入 type,输出 name 和 price prompt_a = PromptTemplate( input_variables=["type"], template="为{type}生成产品名称和价格(格式:名称|价格):" ) chain_a = LLMChain(llm=llm, prompt=prompt_a, output_key="name_price") # Chain B: 输入 name_price,解析出 name prompt_b = PromptTemplate( input_variables=["name_price"], template="从'{name_price}'中提取产品名称:" ) chain_b = LLMChain(llm=llm, prompt=prompt_b, output_key="name") # Chain C: 输入 name 和 type,输出 slogan prompt_c = PromptTemplate( input_variables=["name", "type"], template="为{name}({type})写广告语:" ) chain_c = LLMChain(llm=llm, prompt=prompt_c, output_key="slogan") # 组装 SequentialChain chain = SequentialChain( chains=[chain_a, chain_b, chain_c], input_variables=["type"], output_variables=["name_price", "name", "slogan"], verbose=True ) result = chain({"type": "咖啡机"}) print(result)
中间结果访问:
# SequentialChain 可以返回中间步骤的结果 result = overall_chain({ "product_type": "智能音箱", "target_audience": "年轻家庭" }) # 访问所有输出(包括中间结果) print("中间结果 - 产品名称:", result["product_name"]) print("中间结果 - 产品描述:", result["description"]) print("最终结果 - 营销口号:", result["slogan"])
4.4 RouterChain
RouterChain(路由链) 能够根据输入内容动态选择不同的处理路径。这类似于编程中的 switch-case 或 if-else 逻辑。
4.4.1 路由链的概念
路由链的核心思想是: * 接收用户输入 * 根据输入特征决定使用哪个“目的地”(destination) * 将输入路由到对应的处理 Chain
典型应用场景: * 客服系统:根据问题类型路由到不同部门(技术支持、售后服务、销售咨询) * 内容生成:根据主题路由到不同的写作风格(科技、财经、娱乐) * 数据处理:根据数据类型选择不同的处理方法(文本、数字、日期)
路由链的工作流程:
用户输入
↓
[路由判断] → 选择目的地
↓
[对应 Chain 处理]
↓
返回结果
4.4.2 根据输入选择不同处理路径
基础路由链示例:
from langchain.chains.router import MultiPromptChain from langchain.chains import LLMChain, ConversationChain from langchain.prompts import PromptTemplate from langchain.chains.router.llm_router import LLMRouterChain, RouterOutputParser from langchain.chains.router.multi_prompt_prompt import MULTI_PROMPT_ROUTER_TEMPLATE # 定义多个不同的提示模板(目的地) physics_template = """你是一个物理学专家。请用简单易懂的方式解释以下物理概念。 概念:{input} 解释:""" math_template = """你是一个数学家。请详细解答以下数学问题,并展示计算过程。 问题:{input} 解答:""" history_template = """你是一个历史学家。请从多角度分析以下历史事件。 事件:{input} 分析:""" computer_science_template = """你是一个计算机科学专家。请用代码示例说明以下概念。 概念:{input} 说明:""" # 创建目的地信息列表 prompt_infos = [ { "name": "physics", "description": "适合回答物理学相关问题", "prompt_template": physics_template, }, { "name": "math", "description": "适合回答数学问题", "prompt_template": math_template, }, { "name": "history", "description": "适合回答历史相关问题", "prompt_template": history_template, }, { "name": "computer_science", "description": "适合回答计算机科学问题", "prompt_template": computer_science_template, }, ] # 为每个目的地创建 Chain destination_chains = {} for p_info in prompt_infos: name = p_info["name"] prompt_template = p_info["prompt_template"] prompt = PromptTemplate(template=prompt_template, input_variables=["input"]) chain = LLMChain(llm=llm, prompt=prompt) destination_chains[name] = chain # 创建默认 Chain(当路由无法确定目的地时使用) default_chain = ConversationChain(llm=llm, output_key="text")
4.4.3 MultiPromptChain
MultiPromptChain 是 LangChain 提供的完整路由链实现,它结合了 LLMRouterChain 和多个目的地 Chain。
完整实现:
from langchain.chains.router import MultiPromptChain from langchain.chains.router.llm_router import LLMRouterChain, RouterOutputParser from langchain.prompts import PromptTemplate # 构建路由提示模板 destinations = [f"{p['name']}: {p['description']}" for p in prompt_infos] destinations_str = "\n".join(destinations) router_template = MULTI_PROMPT_ROUTER_TEMPLATE.format(destinations=destinations_str) # 创建路由 Chain router_prompt = PromptTemplate( template=router_template, input_variables=["input"], output_parser=RouterOutputParser(), ) router_chain = LLMRouterChain.from_llm(llm, router_prompt) # 创建 MultiPromptChain chain = MultiPromptChain( router_chain=router_chain, destination_chains=destination_chains, default_chain=default_chain, verbose=True ) # 测试不同输入 print("=== 物理问题 ===") result = chain.invoke({"input": "什么是相对论?"}) print(result["text"]) print("\n=== 数学问题 ===") result = chain.invoke({"input": "求解方程 x^2 - 5x + 6 = 0"}) print(result["text"]) print("\n=== 计算机问题 ===") result = chain.invoke({"input": "解释什么是递归函数"}) print(result["text"]) print("\n=== 通用问题 ===") result = chain.invoke({"input": "今天天气怎么样?"}) print(result["text"])
路由决策解析:
当执行上述代码时,LLMRouterChain 会根据输入决定目的地:
# 输入 "什么是相对论?" # 路由决策:destination: physics # 输入 "求解方程 x^2 - 5x + 6 = 0" # 路由决策:destination: math # 输入 "今天天气怎么样?" # 路由决策:destination: DEFAULT(因为没有匹配的目的地)
自定义路由逻辑:
如果需要更复杂的路由逻辑,可以自定义 RouterChain:
from langchain.chains import LLMChain from langchain.chains.base import Chain from typing import Dict, List class CustomRouterChain(Chain): """自定义路由链 - 基于关键词匹配""" destination_chains: Dict[str, Chain] default_chain: Chain @property def input_keys(self) -> List[str]: return ["input"] @property def output_keys(self) -> List[str]: return ["text"] def _call(self, inputs: Dict[str, str]) -> Dict[str, str]: input_text = inputs["input"].lower() # 定义关键词映射 keyword_map = { "physics": ["物理", "相对论", "量子", "牛顿", "能量"], "math": ["数学", "方程", "计算", "求解", "微积分"], "code": ["代码", "编程", "函数", "算法", "python", "java"], } # 匹配目的地 for destination, keywords in keyword_map.items(): if any(keyword in input_text for keyword in keywords): if destination in self.destination_chains: return self.destination_chains[destination](inputs) # 默认 Chain return self.default_chain(inputs) async def _acall(self, inputs: Dict[str, str]) -> Dict[str, str]: return self._call(inputs) # 使用自定义路由链 custom_router = CustomRouterChain( destination_chains=destination_chains, default_chain=default_chain ) result = custom_router.invoke({"input": "请解释Python的装饰器"}) print(result)
4.5 TransformChain
TransformChain 用于在 Chain 流水线中进行数据转换。当你需要在两个 Chain 之间对数据进行清洗、格式化或转换时,TransformChain 非常有用。
4.5.1 数据转换
TransformChain 的核心功能是对输入数据进行自定义转换。
基本结构:
from langchain.chains import TransformChain # 定义转换函数 def transform_func(inputs: dict) -> dict: """ 转换函数接收输入字典,返回转换后的字典 """ original_text = inputs["text"] # 执行转换操作 transformed_text = original_text.upper() # 示例:转为大写 return {"transformed_text": transformed_text} # 创建 TransformChain transform_chain = TransformChain( input_variables=["text"], output_variables=["transformed_text"], transform=transform_func ) # 执行 result = transform_chain.invoke({"text": "hello world"}) print(result) # {'text': 'hello world', 'transformed_text': 'HELLO WORLD'}
4.5.2 自定义转换函数
TransformChain 的真正威力在于可以定义任意复杂的转换逻辑。
示例1:文本清洗
import re from langchain.chains import TransformChain, LLMChain, SequentialChain from langchain.prompts import PromptTemplate def clean_text(inputs: dict) -> dict: """清洗和格式化文本""" text = inputs["raw_text"] # 移除多余空白 text = re.sub(r'\s+', ' ', text) # 移除特殊字符 text = re.sub(r'[^\w\s\.\,\?\!\u4e00-\u9fff]', '', text) # 限制长度 text = text[:500] return {"cleaned_text": text.strip()} # 创建 TransformChain clean_chain = TransformChain( input_variables=["raw_text"], output_variables=["cleaned_text"], transform=clean_text ) # 后续 LLMChain 使用清洗后的文本 prompt = PromptTemplate( input_variables=["cleaned_text"], template="请总结以下内容:\n\n{cleaned_text}\n\n总结:" ) summary_chain = LLMChain(llm=llm, prompt=prompt, output_key="summary") # 组合成 SequentialChain overall_chain = SequentialChain( chains=[clean_chain, summary_chain], input_variables=["raw_text"], output_variables=["cleaned_text", "summary"], verbose=True ) # 测试 dirty_text = """ 这是一段包含 大量多余空格和特殊字符!!!@@@###的文本。 需要清洗后才能使用。 """ result = overall_chain.invoke({"raw_text": dirty_text}) print("清洗后:", result["cleaned_text"]) print("总结:", result["summary"])
示例2:数据格式转换
import json from langchain.chains import TransformChain def parse_json_input(inputs: dict) -> dict: """将 JSON 字符串解析为结构化数据""" json_str = inputs["json_data"] try: data = json.loads(json_str) return { "parsed_data": data, "name": data.get("name", ""), "age": data.get("age", 0), "valid": True } except json.JSONDecodeError: return { "parsed_data": None, "name": "", "age": 0, "valid": False, "error": "Invalid JSON" } def format_output(inputs: dict) -> dict: """将输出格式化为特定格式""" text = inputs["llm_output"] return { "formatted_output": f"【AI回复】\n{text}\n【结束】", "word_count": len(text.split()) } # 创建 TransformChains parse_chain = TransformChain( input_variables=["json_data"], output_variables=["parsed_data", "name", "age", "valid"], transform=parse_json_input ) format_chain = TransformChain( input_variables=["llm_output"], output_variables=["formatted_output", "word_count"], transform=format_output ) # 示例 JSON 数据 json_data = '{"name": "张三", "age": 28, "interests": ["编程", "阅读"]}' result = parse_chain.invoke({"json_data": json_data}) print(result)
示例3:结合 TransformChain 和 LLMChain
from langchain.chains import TransformChain, LLMChain, SequentialChain from langchain.prompts import PromptTemplate def extract_keywords(inputs: dict) -> dict: """提取文本关键词""" import jieba text = inputs["content"] # 使用 jieba 分词提取关键词 words = jieba.lcut(text) # 过滤短词和停用词 keywords = [w for w in words if len(w) > 1] return { "keywords": keywords[:10], # 取前10个关键词 "keyword_str": ", ".join(keywords[:10]) } # TransformChain 提取关键词 keyword_chain = TransformChain( input_variables=["content"], output_variables=["keywords", "keyword_str"], transform=extract_keywords ) # LLMChain 基于关键词生成标题 title_prompt = PromptTemplate( input_variables=["keyword_str"], template="""根据以下关键词,生成一个吸引人的文章标题(不超过20字): 关键词:{keyword_str} 标题:""" ) title_chain = LLMChain(llm=llm, prompt=title_prompt, output_key="title") # 组合 Chain content_chain = SequentialChain( chains=[keyword_chain, title_chain], input_variables=["content"], output_variables=["keywords", "keyword_str", "title"], verbose=True ) # 测试 article = """ 人工智能(AI)正在改变我们的生活方式。从智能手机助手到自动驾驶汽车, AI技术已经渗透到日常生活的方方面面。机器学习、深度学习等技术的发展, 使得AI能够处理越来越复杂的任务。 """ result = content_chain.invoke({"content": article}) print(f"关键词: {result['keyword_str']}") print(f"生成标题: {result['title']}")
4.6 自定义 Chain
当 LangChain 内置的 Chain 无法满足需求时,可以通过继承 BaseChain 创建自定义 Chain。
4.6.1 继承 BaseChain
创建自定义 Chain 需要继承 BaseChain 并实现以下方法:
| 方法/属性 | 说明 |
| ———– | —— |
| input_keys | 返回 Chain 需要的输入键列表 |
| output_keys | 返回 Chain 产生的输出键列表 |
| _call | 同步执行逻辑 |
| _acall | 异步执行逻辑(可选) |
最简单的自定义 Chain:
from langchain.chains.base import Chain from typing import Dict, List, Any class EchoChain(Chain): """简单的回显 Chain - 将输入原样返回""" @property def input_keys(self) -> List[str]: """定义输入键""" return ["message"] @property def output_keys(self) -> List[str]: """定义输出键""" return ["echo"] def _call(self, inputs: Dict[str, Any]) -> Dict[str, str]: """同步执行""" message = inputs["message"] return {"echo": f"Echo: {message}"} async def _acall(self, inputs: Dict[str, Any]) -> Dict[str, str]: """异步执行""" # 通常直接调用 _call 或实现真正的异步逻辑 return self._call(inputs) # 使用自定义 Chain echo = EchoChain() result = echo.invoke({"message": "Hello!"}) print(result) # {'message': 'Hello!', 'echo': 'Echo: Hello!'}
4.6.2 实现必要方法
完整的自定义 Chain 示例:
from langchain.chains.base import Chain from langchain.llms.base import BaseLLM from langchain.prompts import PromptTemplate from typing import Dict, List, Any, Optional import time class TranslationChain(Chain): """ 多语言翻译 Chain 支持指定源语言和目标语言进行翻译 """ # 定义 Chain 的配置参数 llm: BaseLLM source_lang: str = "中文" target_lang: str = "英文" verbose_timing: bool = False @property def input_keys(self) -> List[str]: """输入需要一个文本""" return ["text"] @property def output_keys(self) -> List[str]: """输出包含翻译结果和元信息""" return ["translation", "source_lang", "target_lang", "processing_time"] def _call(self, inputs: Dict[str, Any]) -> Dict[str, Any]: """执行翻译""" start_time = time.time() text = inputs["text"] # 构建翻译提示 prompt_text = f"""请将以下{self.source_lang}翻译成{self.target_lang}: {self.source_lang}:{text} {self.target_lang}:""" # 调用 LLM translation = self.llm.predict(prompt_text) processing_time = time.time() - start_time if self.verbose_timing: print(f"翻译耗时: {processing_time:.2f}秒") return { "translation": translation.strip(), "source_lang": self.source_lang, "target_lang": self.target_lang, "processing_time": processing_time } async def _acall(self, inputs: Dict[str, Any]) -> Dict[str, Any]: """异步执行""" # 实际应用中可以使用 ainvoke 或异步 LLM 调用 return self._call(inputs) def translate(self, text: str) -> str: """便捷方法 - 只返回翻译结果""" result = self.invoke({"text": text}) return result["translation"] # 使用自定义 TranslationChain from langchain.llms import OpenAI llm = OpenAI(temperature=0.3) translator = TranslationChain( llm=llm, source_lang="中文", target_lang="英文", verbose_timing=True ) # 方式1:使用 invoke result = translator.invoke({"text": "人工智能正在改变世界"}) print(f"原文: 人工智能正在改变世界") print(f"译文: {result['translation']}") print(f"耗时: {result['processing_time']:.2f}秒") # 方式2:使用便捷方法 translation = translator.translate("机器学习是人工智能的一个重要分支") print(f"便捷翻译: {translation}")
带可选参数的自定义 Chain:
from pydantic import Field class SummarizationChain(Chain): """ 文本摘要 Chain 支持指定摘要长度和风格 """ llm: BaseLLM = Field(..., description="LLM 实例") max_length: int = Field(default=100, description="摘要最大字数") style: str = Field(default="concise", description="摘要风格: concise/detailed/bullet") @property def input_keys(self) -> List[str]: return ["text"] @property def output_keys(self) -> List[str]: return ["summary", "word_count", "style_used"] def _get_style_instruction(self) -> str: """根据风格获取指令""" styles = { "concise": "提供一个简洁的摘要", "detailed": "提供一个详细的摘要,包含主要观点", "bullet": "以 bullet points 形式列出主要要点" } return styles.get(self.style, styles["concise"]) def _call(self, inputs: Dict[str, Any]) -> Dict[str, Any]: text = inputs["text"] style_instruction = self._get_style_instruction() prompt = f"""请对以下文本进行摘要。{style_instruction},不超过{self.max_length}字。 文本: {text} 摘要:""" summary = self.llm.predict(prompt).strip() word_count = len(summary) return { "summary": summary, "word_count": word_count, "style_used": self.style } async def _acall(self, inputs: Dict[str, Any]) -> Dict[str, Any]: return self._call(inputs) # 使用 summarizer = SummarizationChain( llm=llm, max_length=50, style="bullet" ) long_text = """ 人工智能(Artificial Intelligence,简称AI)是计算机科学的一个分支, 致力于创造能够模拟人类智能的系统。这些系统可以学习、推理、感知环境、 理解语言并做出决策。AI技术包括机器学习、深度学习、自然语言处理、 计算机视觉等多个领域。近年来,随着计算能力的提升和数据量的增加, AI技术取得了突破性进展,在医疗诊断、自动驾驶、金融分析等领域 展现出巨大潜力。 """ result = summarizer.invoke({"text": long_text}) print(f"摘要: {result['summary']}") print(f"字数: {result['word_count']}") print(f"风格: {result['style_used']}")
4.6.3 完整示例:智能客服 Chain
下面是一个完整的自定义 Chain 示例,模拟智能客服系统:
from langchain.chains.base import Chain from langchain.llms.base import BaseLLM from langchain.memory import ConversationBufferMemory from pydantic import Field from typing import Dict, List, Any import re class CustomerServiceChain(Chain): """ 智能客服 Chain 功能: 1. 情感分析 - 判断客户情绪 2. 意图识别 - 识别客户需求类型 3. 生成回复 - 根据情绪和意图生成合适回复 4. 维护对话历史 """ llm: BaseLLM = Field(..., description="LLM 实例") memory: ConversationBufferMemory = Field( default_factory=ConversationBufferMemory, description="对话记忆" ) company_name: str = Field(default="我们的公司", description="公司名称") # 内部状态 emotion_threshold: float = 0.5 @property def input_keys(self) -> List[str]: return ["customer_message"] @property def output_keys(self) -> List[str]: return [ "response", "detected_emotion", "detected_intent", "needs_escalation" ] def _analyze_emotion(self, message: str) -> str: """分析客户情绪""" # 负面关键词 negative_words = ['生气', '愤怒', '失望', '糟糕', '差', '慢', '坏', '投诉', '退货', '退款'] # 正面关键词 positive_words = ['满意', '好', '棒', '感谢', '喜欢', '推荐', '不错'] message_lower = message.lower() negative_count = sum(1 for w in negative_words if w in message_lower) positive_count = sum(1 for w in positive_words if w in message_lower) if negative_count > positive_count: return "negative" elif positive_count > negative_count: return "positive" else: return "neutral" def _detect_intent(self, message: str) -> str: """识别客户意图""" intent_patterns = { "inquiry": ['怎么', '如何', '什么', '多少', '价格'], "complaint": ['投诉', '不满', '问题', '故障', '坏'], "purchase": ['买', '订购', '购买', '下单'], "refund": ['退', '退款', '退货', '换货'], "technical": ['安装', '设置', '配置', '连接'] } message_lower = message.lower() for intent, patterns in intent_patterns.items(): if any(p in message_lower for p in patterns): return intent return "general" def _generate_response(self, message: str, emotion: str, intent: str) -> str: """生成客服回复""" # 准备上下文 history = self.memory.load_memory_variables({}).get("history", "") # 根据情绪和意图调整语气 tone_instruction = { "negative": "客户情绪负面,需要表达歉意和同理心,语气要诚恳", "positive": "客户情绪积极,可以友好热情地回应", "neutral": "保持专业、礼貌的语气" }.get(emotion, "保持专业语气") intent_instruction = { "inquiry": "回答客户的询问", "complaint": "处理投诉,表示理解并提供解决方案", "purchase": "协助完成购买流程", "refund": "处理退款/退货请求,了解具体情况", "technical": "提供技术支持", "general": "提供一般性帮助" }.get(intent, "提供帮助") prompt = f"""你是{self.company_name}的智能客服助手。 对话历史: {history} 客户消息:{message} 客户情绪:{emotion} 客户意图:{intent} 回复要求: - {tone_instruction} - {intent_instruction} - 简洁明了,不超过100字 - 使用中文 你的回复:""" response = self.llm.predict(prompt).strip() return response def _needs_escalation(self, emotion: str, message: str) -> bool: """判断是否需要人工介入""" # 如果情绪非常负面或提到投诉/法律相关词汇 escalation_keywords = ['投诉', '法律', '律师', '曝光', '媒体', '工商局', '消协'] if emotion == "negative" and any(k in message for k in escalation_keywords): return True return False def _call(self, inputs: Dict[str, Any]) -> Dict[str, Any]: message = inputs["customer_message"] # 1. 情感分析 emotion = self._analyze_emotion(message) # 2. 意图识别 intent = self._detect_intent(message) # 3. 判断是否需要升级 needs_escalation = self._needs_escalation(emotion, message) # 4. 生成回复 if needs_escalation: response = f"非常抱歉给您带来不好的体验。我已经记录了您的问题,将立即转接给人工客服专员处理,请稍候。" else: response = self._generate_response(message, emotion, intent) # 5. 更新对话历史 self.memory.save_context( {"input": message}, {"output": response} ) return { "response": response, "detected_emotion": emotion, "detected_intent": intent, "needs_escalation": needs_escalation } async def _acall(self, inputs: Dict[str, Any]) -> Dict[str, Any]: return self._call(inputs) def reset_conversation(self): """重置对话""" self.memory.clear() # 使用智能客服 Chain llm = OpenAI(temperature=0.7) customer_service = CustomerServiceChain( llm=llm, company_name="智选科技" ) # 模拟对话 print("=== 对话 1:一般询问 ===") result = customer_service.invoke({"customer_message": "请问你们的产品怎么安装?"}) print(f"客户意图: {result['detected_intent']}") print(f"客户情绪: {result['detected_emotion']}") print(f"客服回复: {result['response']}") print("\n=== 对话 2:投诉 ===") result = customer_service.invoke({"customer_message": "你们的产品太差了,我要退货!"}) print(f"客户意图: {result['detected_intent']}") print(f"客户情绪: {result['detected_emotion']}") print(f"需要升级: {result['needs_escalation']}") print(f"客服回复: {result['response']}") print("\n=== 对话 3:上下文理解 ===") result = customer_service.invoke({"customer_message": "好的,那退款什么时候到账?"}) print(f"客服回复: {result['response']}")
4.7 LCEL (LangChain Expression Language)
LCEL 是 LangChain 提供的一种声明式语法,用于以更简洁、更直观的方式组合 Chain。它是 LangChain 的新一代链式调用语法,推荐在新项目中使用。
4.7.1 LCEL 基础
LCEL 使用 管道操作符(|) 来连接各个组件,类似于 Unix 管道或 Python 的函数式编程风格。
LCEL vs 传统方式对比:
# 传统方式 from langchain.chains import LLMChain from langchain.prompts import PromptTemplate prompt = PromptTemplate.from_template("告诉我关于{topic}的趣事") chain = LLMChain(llm=llm, prompt=prompt) result = chain.predict(topic="猫") # LCEL 方式 from langchain.prompts import ChatPromptTemplate from langchain.schema.output_parser import StrOutputParser prompt = ChatPromptTemplate.from_template("告诉我关于{topic}的趣事") chain = prompt | llm | StrOutputParser() result = chain.invoke({"topic": "猫"})
4.7.2 核心组件
LCEL 支持多种核心组件,每个组件都可以作为管道的一部分:
1. PromptTemplate
from langchain.prompts import ChatPromptTemplate # 简单模板 prompt = ChatPromptTemplate.from_template("将以下文本翻译成{language}:{text}") # 多消息模板 prompt = ChatPromptTemplate.from_messages([ ("system", "你是一个专业的翻译助手。"), ("human", "将以下文本翻译成{language}:{text}"), ])
2. LLM / ChatModel
from langchain.chat_models import ChatOpenAI from langchain.llms import OpenAI # Chat 模型 chat_model = ChatOpenAI(model="gpt-3.5-turbo") # 文本模型 llm = OpenAI(model="gpt-3.5-turbo-instruct")
3. Output Parser
from langchain.schema.output_parser import StrOutputParser from langchain.output_parsers import PydanticOutputParser # 字符串输出解析器 str_parser = StrOutputParser() # Pydantic 输出解析器 pydantic_parser = PydanticOutputParser(pydantic_object=MyModel)
4. Retriever(检索器)
from langchain.vectorstores import Chroma from langchain.embeddings import OpenAIEmbeddings # 创建向量存储 vectorstore = Chroma.from_documents(documents, OpenAIEmbeddings()) retriever = vectorstore.as_retriever()
4.7.3 LCEL 语法详解
基础管道:
from langchain.prompts import ChatPromptTemplate from langchain.chat_models import ChatOpenAI from langchain.schema.output_parser import StrOutputParser # 创建组件 prompt = ChatPromptTemplate.from_template("给我讲一个关于{topic}的笑话") model = ChatOpenAI() output_parser = StrOutputParser() # 使用管道连接 chain = prompt | model | output_parser # 执行 result = chain.invoke({"topic": "程序员"}) print(result)
带上下文的 RAG 链:
from langchain.prompts import ChatPromptTemplate from langchain.chat_models import ChatOpenAI from langchain.schema.output_parser import StrOutputParser from langchain.schema.runnable import RunnablePassthrough # 准备检索器 docs = [ "LangChain 是一个用于构建 LLM 应用的框架", "Python 是一种流行的编程语言", "机器学习是人工智能的一个分支" ] from langchain.vectorstores import Chroma from langchain.embeddings import OpenAIEmbeddings vectorstore = Chroma.from_texts(docs, OpenAIEmbeddings()) retriever = vectorstore.as_retriever() # 创建 RAG 模板 template = """基于以下上下文回答问题: 上下文: {context} 问题:{question} 回答:""" prompt = ChatPromptTemplate.from_template(template) # 辅助函数:格式化文档 def format_docs(docs): return "\n\n".join([d.page_content for d in docs]) # 构建 RAG Chain rag_chain = ( { "context": retriever | format_docs, "question": RunnablePassthrough() } | prompt | model | output_parser ) # 使用 result = rag_chain.invoke("LangChain 是什么?") print(result)
RunnablePassthrough 详解:
from langchain.schema.runnable import RunnablePassthrough, RunnableParallel # RunnablePassthrough 将输入原样传递 # 常用于同时传递原始输入和处理后的数据 # 示例:同时获取原始问题和检索结果 chain = RunnableParallel({ "question": RunnablePassthrough(), # 原样传递问题 "context": retriever | format_docs # 检索相关文档 }) result = chain.invoke("什么是机器学习?") # 结果:{"question": "什么是机器学习?", "context": "机器学习是..."}
RunnableParallel(并行执行):
from langchain.schema.runnable import RunnableParallel # 并行执行多个分支 parallel_chain = RunnableParallel({ "branch_a": prompt_a | model | output_parser, "branch_b": prompt_b | model | output_parser, "original": RunnablePassthrough() }) result = parallel_chain.invoke({"input": "测试输入"}) # 结果:{"branch_a": "...", "branch_b": "...", "original": "..."}
4.7.4 LCEL 高级用法
1. 条件分支:
from langchain.schema.runnable import RunnableBranch # 创建条件分支链 branch = RunnableBranch( # (条件, 分支) (lambda x: "问题" in x["input"], qa_chain), (lambda x: "总结" in x["input"], summary_chain), # 默认分支 default_chain ) result = branch.invoke({"input": "请总结这段文本"})
2. 绑定运行时参数:
# 使用 bind 方法传递额外参数 chain = ( prompt | model.bind(stop=["\nObservation:"]) # 绑定停止词 | output_parser )
3. 自定义 Runnable:
from langchain.schema.runnable import RunnableLambda # 将普通函数转换为 Runnable def my_transform(data: dict) -> dict: return {"transformed": data["input"].upper()} runnable_transform = RunnableLambda(my_transform) # 使用在管道中 chain = prompt | model | output_parser | runnable_transform
4. 链式调用多个输入输出:
from operator import itemgetter # 使用 itemgetter 提取特定字段 chain = ( { "context": itemgetter("question") | retriever | format_docs, "question": itemgetter("question"), "language": itemgetter("language") } | prompt | model | output_parser ) result = chain.invoke({ "question": "什么是AI?", "language": "中文" })
5. 批处理和流式:
# 批处理 inputs = [{"topic": "猫"}, {"topic": "狗"}, {"topic": "鸟"}] results = chain.batch(inputs) # 流式输出 for chunk in chain.stream({"topic": "人工智能"}): print(chunk, end="", flush=True) # 异步调用 result = await chain.ainvoke({"topic": "量子计算"})
4.8 Chain 的组合与嵌套
Chain 的强大之处在于可以灵活组合和嵌套,构建出复杂的应用架构。
4.8.1 Chain 嵌套
Chain 可以嵌套在其他 Chain 中,实现分层处理。
基础嵌套:
from langchain.chains import LLMChain, SequentialChain, TransformChain from langchain.prompts import PromptTemplate # 内层 Chain:分析文本情感 sentiment_prompt = PromptTemplate( input_variables=["text"], template="分析以下文本的情感(positive/negative/neutral):\n\n{text}\n\n情感:" ) sentiment_chain = LLMChain( llm=llm, prompt=sentiment_prompt, output_key="sentiment" ) # 内层 Chain:提取关键词 keyword_prompt = PromptTemplate( input_variables=["text"], template="从以下文本中提取3个关键词:\n\n{text}\n\n关键词:" ) keyword_chain = LLMChain( llm=llm, prompt=keyword_prompt, output_key="keywords" ) # 外层 Chain:根据情感和关键词生成回复 response_prompt = PromptTemplate( input_variables=["sentiment", "keywords", "original_text"], template="""基于以下信息生成回复: 原文情感:{sentiment} 关键词:{keywords} 原文:{original_text} 请生成适当的回复:""" ) response_chain = LLMChain( llm=llm, prompt=response_prompt, output_key="response" ) # 使用 SequentialChain 组合 analysis_chain = SequentialChain( chains=[sentiment_chain, keyword_chain, response_chain], input_variables=["text"], output_variables=["sentiment", "keywords", "response"], verbose=True ) result = analysis_chain.invoke({ "text": "这个产品真的太棒了,完全超出我的预期!" }) print(result)
4.8.2 复杂 Chain 组合模式
模式1:分治处理
from langchain.chains import LLMChain from langchain.prompts import PromptTemplate class DivideAndConquerChain: """ 分治处理 Chain 将长文本分割,分别处理,然后合并结果 """ def __init__(self, llm, chunk_size=1000): self.llm = llm self.chunk_size = chunk_size # 子任务 Chain self.summarize_prompt = PromptTemplate( input_variables=["chunk"], template="请总结以下内容:\n\n{chunk}\n\n摘要:" ) self.summarize_chain = LLMChain(llm=llm, prompt=self.summarize_prompt) # 合并 Chain self.merge_prompt = PromptTemplate( input_variables=["summaries"], template="基于以下摘要,生成一个统一的总结:\n\n{summaries}\n\n统一总结:" ) self.merge_chain = LLMChain(llm=llm, prompt=self.merge_prompt) def split_text(self, text): """分割文本""" words = text.split() chunks = [] current_chunk = [] current_size = 0 for word in words: current_chunk.append(word) current_size += len(word) + 1 if current_size >= self.chunk_size: chunks.append(" ".join(current_chunk)) current_chunk = [] current_size = 0 if current_chunk: chunks.append(" ".join(current_chunk)) return chunks def process(self, text): """执行分治处理""" # 1. 分割文本 chunks = self.split_text(text) # 2. 分别总结 summaries = [] for chunk in chunks: result = self.summarize_chain.invoke({"chunk": chunk}) summaries.append(result["text"]) # 3. 合并结果 combined = "\n\n".join([f"片段 {i+1}: {s}" for i, s in enumerate(summaries)]) final = self.merge_chain.invoke({"summaries": combined}) return { "chunk_count": len(chunks), "chunk_summaries": summaries, "final_summary": final["text"] } # 使用 long_text = """[这里是一段很长的文本...]""" dac_chain = DivideAndConquerChain(llm=llm, chunk_size=500) result = dac_chain.process(long_text) print(f"分成了 {result['chunk_count']} 个片段") print(f"最终总结: {result['final_summary']}")
模式2:多路径处理(类似 MapReduce)
from concurrent.futures import ThreadPoolExecutor from langchain.chains import LLMChain from langchain.prompts import PromptTemplate class MultiPathChain: """ 多路径处理 Chain 同时从多个角度处理问题,然后综合结果 """ def __init__(self, llm): self.llm = llm # 多个不同角度的 Chain self.perspectives = { "technical": LLMChain( llm=llm, prompt=PromptTemplate( input_variables=["topic"], template="从技术角度分析 {topic}:" ), output_key="text" ), "business": LLMChain( llm=llm, prompt=PromptTemplate( input_variables=["topic"], template="从商业角度分析 {topic}:" ), output_key="text" ), "social": LLMChain( llm=llm, prompt=PromptTemplate( input_variables=["topic"], template="从社会影响角度分析 {topic}:" ), output_key="text" ) } # 综合 Chain self.synthesis_chain = LLMChain( llm=llm, prompt=PromptTemplate( input_variables=["technical", "business", "social", "topic"], template="""基于以下多角度分析,为"{topic}"生成一个全面的评估报告: 技术分析: {technical} 商业分析: {business} 社会影响分析: {social} 综合报告:""" ), output_key="text" ) def process(self, topic): """执行多路径处理""" # 并行执行多个视角的分析 results = {} for name, chain in self.perspectives.items(): result = chain.invoke({"topic": topic}) results[name] = result["text"] # 综合结果 synthesis = self.synthesis_chain.invoke({ "topic": topic, **results }) return { "topic": topic, **results, "synthesis": synthesis["text"] } # 使用 multi_chain = MultiPathChain(llm=llm) result = multi_chain.process("人工智能") print("=== 技术分析 ===") print(result["technical"]) print("\n=== 商业分析 ===") print(result["business"]) print("\n=== 社会影响分析 ===") print(result["social"]) print("\n=== 综合报告 ===") print(result["synthesis"])
模式3:反馈循环 Chain
from langchain.chains import LLMChain from langchain.prompts import PromptTemplate class IterativeRefinementChain: """ 迭代优化 Chain 通过多次迭代逐步改进结果 """ def __init__(self, llm, max_iterations=3): self.llm = llm self.max_iterations = max_iterations # 生成 Chain self.generate_chain = LLMChain( llm=llm, prompt=PromptTemplate( input_variables=["task", "feedback"], template="""任务:{task} 上一轮反馈:{feedback} 请根据反馈改进你的回答:""" ), output_key="text" ) # 评估 Chain self.evaluate_chain = LLMChain( llm=llm, prompt=PromptTemplate( input_variables=["task", "output"], template="""任务:{task} 当前输出:{output} 请评估这个输出,指出需要改进的地方。如果没有问题,回复"满意"。 评估:""" ), output_key="text" ) def process(self, task): """执行迭代优化""" current_output = "这是初始版本。" feedback = "请开始生成。" history = [] for i in range(self.max_iterations): # 生成 result = self.generate_chain.invoke({ "task": task, "feedback": feedback }) current_output = result["text"] history.append({ "iteration": i + 1, "output": current_output }) # 评估 eval_result = self.evaluate_chain.invoke({ "task": task, "output": current_output }) feedback = eval_result["text"] # 检查是否满意 if "满意" in feedback: break return { "final_output": current_output, "iterations": len(history), "history": history, "final_feedback": feedback } # 使用 refinement_chain = IterativeRefinementChain(llm=llm, max_iterations=3) result = refinement_chain.process("写一篇关于环保的文章开头(100字)") print(f"迭代次数: {result['iterations']}") print(f"最终结果: {result['final_output']}")
4.8.3 Chain 组合最佳实践
1. 单一职责原则
每个 Chain 应该只负责一个明确的任务,避免过于复杂的 Chain。
# 不好的做法:一个 Chain 做太多事情 bad_chain = LLMChain( prompt=PromptTemplate( template="翻译、总结并评价以下内容:{text}", # 做了三件事! input_variables=["text"] ), llm=llm ) # 好的做法:分解为多个单一职责的 Chain translate_chain = LLMChain(prompt=translate_prompt, llm=llm, output_key="translated") summarize_chain = LLMChain(prompt=summarize_prompt, llm=llm, output_key="summary") evaluate_chain = LLMChain(prompt=evaluate_prompt, llm=llm, output_key="evaluation") # 用 SequentialChain 组合 good_chain = SequentialChain( chains=[translate_chain, summarize_chain, evaluate_chain], input_variables=["text"], output_variables=["translated", "summary", "evaluation"] )
2. 合理命名变量
使用清晰、有意义的输入输出键名。
# 不好的命名 chain = LLMChain( prompt=prompt, llm=llm, output_key="x" # 不清晰 ) # 好的命名 chain = LLMChain( prompt=prompt, llm=llm, output_key="extracted_entities" # 清晰表达含义 )
3. 错误处理
为 Chain 添加适当的错误处理机制。
from langchain.chains.base import Chain class SafeChain(Chain): """带错误处理的 Chain 包装器""" base_chain: Chain fallback_value: str = "处理失败" @property def input_keys(self): return self.base_chain.input_keys @property def output_keys(self): return self.base_chain.output_keys def _call(self, inputs): try: return self.base_chain.invoke(inputs) except Exception as e: # 记录错误 print(f"Chain 执行错误: {e}") # 返回默认值 return {key: self.fallback_value for key in self.output_keys}
4. 使用 Memory 管理状态
对于需要维护上下文的 Chain,使用 Memory 组件。
from langchain.memory import ConversationBufferMemory from langchain.chains import ConversationChain # 创建带记忆的 Chain memory = ConversationBufferMemory() conversation = ConversationChain( llm=llm, memory=memory, verbose=True ) # 多次对话保持上下文 conversation.predict(input="你好,我叫张三") conversation.predict(input="我叫什么名字?") # 会记住之前的对话
4.9 本章小结
本章深入讲解了 LangChain 中的 Chain(链)组件,这是构建复杂 LLM 应用的核心工具。
关键知识点回顾:
4.1 Chain 基础概念 * Chain 是将多个组件串联执行的抽象概念 * 实现了统一的接口(input_keys、output_keys、_call、_acall) * 优势包括模块化设计、可组合性、统一接口、内置功能支持
4.2 LLMChain * 最基础的 Chain 类型,组合了 PromptTemplate 和 LLM * 支持丰富的参数配置(output_key、verbose、callbacks 等) * 可以通过 OutputParser 将输出解析为结构化数据
4.3 SequentialChain * SimpleSequentialChain:简单顺序执行,单输入单输出传递 * SequentialChain:更灵活,支持多输入多输出和变量映射 * 理解输出传递机制对于构建复杂流水线至关重要
4.4 RouterChain * 根据输入特征动态选择处理路径 * MultiPromptChain:内置的多目的地路由实现 * 可以自定义路由逻辑实现更复杂的决策
4.5 TransformChain * 用于 Chain 之间的数据转换 * 可以实现文本清洗、格式转换、数据解析等功能 * 是连接不同 Chain 的“胶水”
4.6 自定义 Chain * 通过继承 BaseChain 创建自定义 Chain * 必须实现 input_keys、output_keys、_call、_acall 方法 * 可以添加自定义配置参数和方法
4.7 LCEL (LangChain Expression Language) * LangChain 的新一代声明式语法 * 使用管道操作符(|)连接组件 * 支持 RunnablePassthrough、RunnableParallel、RunnableBranch 等高级特性
4.8 Chain 的组合与嵌套 * Chain 可以灵活组合和嵌套 * 常见模式:分治处理、多路径处理、迭代优化 * 遵循最佳实践:单一职责、清晰命名、错误处理、内存管理
学习建议:
1. 从简单开始:先掌握 LLMChain 和 SequentialChain 的基础用法 2. 实践为主:通过实际项目练习 Chain 的组合和嵌套 3. 理解原理:深入理解 Chain 的执行流程和数据传递机制 4. 关注新版本:LCEL 是未来趋势,建议在新项目中优先使用 5. 善用调试:利用 verbose=True 和回调函数调试 Chain
下一步学习:
掌握了 Chain 之后,建议继续学习: * Memory 组件:为 Chain 添加记忆能力 * Agent:让 LLM 自主决策并调用工具 * Document Loaders 和 Vector Stores:构建 RAG 应用 * Evaluation:评估 Chain 的性能和输出质量
Chain 是 LangChain 的骨架,熟练运用 Chain 将使你能够构建出功能强大、结构清晰的 LLM 应用程序。
本章练习
1. 创建一个 LLMChain,将用户的输入翻译成指定语言,并使用 PydanticOutputParser 解析出翻译结果和语言检测信息。
2. 使用 SequentialChain 构建一个内容创作流水线:先生成文章标题,再基于标题生成大纲,最后基于大纲生成完整文章。
3. 实现一个 RouterChain,能够根据用户问题类型(编程、写作、翻译)路由到不同的处理 Chain。
4. 使用 TransformChain 创建一个数据清洗 Chain,能够去除文本中的 HTML 标签、多余空格和特殊字符。
5. 使用 LCEL 语法重构本章中的任意一个复杂 Chain,对比传统方式和 LCEL 方式的代码差异。
6. 创建一个自定义 Chain,实现一个简单的问答系统,包含文档检索、答案生成和答案验证三个步骤。