====== 第四章:核心组件详解 - Chains(链) ======
在 LangChain 框架中,**Chain(链)**是最核心、最重要的概念之一。它将多个组件连接在一起,形成一个可执行的工作流程。通过 Chain,我们可以将 LLM、提示模板、输出解析器、外部工具等各种组件有机地组合起来,构建出功能强大的应用程序。
本章将深入讲解 LangChain 中 Chain 的各个方面,从基础概念到高级用法,帮助你全面掌握这一核心组件。
===== 4.1 Chain 基础概念 =====
==== 4.1.1 什么是 Chain ====
**Chain(链)**是 LangChain 中用于将多个组件串联执行的抽象概念。简单来说,Chain 就是一系列按顺序执行的步骤,每个步骤接收输入、进行处理、产生输出,并将输出传递给下一个步骤。
可以把 Chain 想象成工厂的生产流水线:
* 原材料(输入)从流水线的一端进入
* 经过各个工位(Chain 中的各个组件)的加工处理
* 最终在流水线的另一端产出成品(输出)
在 LangChain 中,Chain 可以包含以下类型的组件:
* **LLM(大型语言模型)**:进行文本生成、推理等操作
* **Prompt Templates(提示模板)**:管理和格式化输入提示
* **Output Parsers(输出解析器)**:解析和结构化 LLM 的输出
* **Tools(工具)**:与外部 API、数据库等进行交互
* **其他 Chain**:实现 Chain 的嵌套和组合
==== 4.1.2 Chain 的工作原理 ====
Chain 的工作流程遵循**输入-处理-输出**的模式:
# Chain 的基本工作流程示意
输入数据 → [组件1处理] → 中间结果1 → [组件2处理] → 中间结果2 → ... → [组件N处理] → 最终输出
每个 Chain 都实现了两个核心方法:
**1. _call 方法(同步调用)**
def _call(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
"""
执行 Chain 的核心逻辑
Args:
inputs: 输入字典,包含 Chain 需要的所有输入参数
Returns:
输出字典,包含 Chain 产生的所有输出结果
"""
# 实现具体的处理逻辑
pass
**2. _acall 方法(异步调用)**
async def _acall(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
"""
异步执行 Chain 的核心逻辑
Args:
inputs: 输入字典,包含 Chain 需要的所有输入参数
Returns:
输出字典,包含 Chain 产生的所有输出结果
"""
# 实现具体的异步处理逻辑
pass
Chain 的基类 **BaseChain** 定义了所有 Chain 必须实现的接口:
from langchain.chains.base import Chain
from typing import Dict, List, Any
class MyCustomChain(Chain):
"""自定义 Chain 示例"""
@property
def input_keys(self) -> List[str]:
"""定义 Chain 需要的输入键"""
return ["input_text"]
@property
def output_keys(self) -> List[str]:
"""定义 Chain 产生的输出键"""
return ["output_text"]
def _call(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
"""同步执行逻辑"""
input_text = inputs["input_text"]
# 处理逻辑
output_text = f"处理后的结果: {input_text}"
return {"output_text": output_text}
async def _acall(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
"""异步执行逻辑"""
# 通常调用 _call 或实现真正的异步逻辑
return self._call(inputs)
==== 4.1.3 Chain 的优势 ====
使用 Chain 有以下几个显著优势:
**1. 模块化设计**
Chain 将复杂的任务分解为独立的、可重用的组件,每个组件只负责特定的功能。这使得代码更易理解、维护和测试。
# 不使用 Chain:所有逻辑混杂在一起
def process_document(text):
# 格式化提示
prompt = f"请总结以下文本:\n\n{text}"
# 调用 LLM
response = llm.predict(prompt)
# 解析输出
summary = response.strip()
return summary
# 使用 Chain:逻辑清晰分离
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
summary_chain = LLMChain(
llm=llm,
prompt=PromptTemplate(
template="请总结以下文本:\n\n{text}",
input_variables=["text"]
)
)
result = summary_chain.predict(text=input_text)
**2. 可组合性**
多个简单的 Chain 可以组合成复杂的 Chain,实现更强大的功能。
from langchain.chains import SimpleSequentialChain
# 定义两个简单的 Chain
chain1 = LLMChain(llm=llm, prompt=prompt1)
chain2 = LLMChain(llm=llm, prompt=prompt2)
# 组合成一个 SequentialChain
combined_chain = SimpleSequentialChain(chains=[chain1, chain2])
**3. 统一的接口**
所有 Chain 都遵循相同的接口规范(input_keys、output_keys、_call、_acall),这使得不同类型的 Chain 可以无缝协作。
**4. 内置功能支持**
LangChain 的 Chain 内置了以下功能:
* **内存管理**:通过 Memory 组件维护对话历史
* **回调机制**:支持回调函数进行日志记录、监控等
* **错误处理**:提供统一的错误处理机制
* **配置管理**:支持从配置文件加载 Chain
**5. 便于调试和监控**
Chain 的结构化设计使得调试更加容易,可以逐个组件检查输入输出,快速定位问题。
# 启用详细模式查看 Chain 的执行过程
from langchain.globals import set_debug
set_debug(True)
# 执行 Chain
result = chain.invoke({"input": "测试输入"})
===== 4.2 LLMChain =====
**LLMChain** 是 LangChain 中最基础、最常用的 Chain 类型。它将 **PromptTemplate** 和 **LLM** 组合在一起,实现从格式化输入到调用 LLM 的完整流程。
==== 4.2.1 基础使用 ====
**最简单的 LLMChain 示例:**
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
# 1. 创建 LLM 实例
llm = OpenAI(temperature=0.7)
# 2. 创建 PromptTemplate
prompt = PromptTemplate(
input_variables=["product"],
template="为以下产品写一个吸引人的产品描述:\n\n产品名称:{product}\n\n产品描述:"
)
# 3. 创建 LLMChain
chain = LLMChain(llm=llm, prompt=prompt)
# 4. 执行 Chain
result = chain.predict(product="智能手表")
print(result)
**输出示例:**
这款智能手表融合了时尚设计与先进技术,是您日常生活的完美伴侣。它不仅能精准记录您的运动数据、监测健康指标,还能与您的智能手机无缝连接,让您不错过任何重要信息。超长续航、防水设计,无论是商务场合还是户外运动,都能轻松应对。
**使用 invoke 方法(推荐方式):**
# LangChain 推荐使用 invoke 方法
result = chain.invoke({"product": "无线耳机"})
print(result["text"])
==== 4.2.2 参数详解 ====
LLMChain 支持多种参数配置,下面是详细说明:
**1. 构造函数参数**
from langchain.chains import LLMChain
chain = LLMChain(
# 必需参数
llm=llm, # LLM 实例,用于生成文本
prompt=prompt, # PromptTemplate 实例
# 可选参数
output_key="text", # 输出结果的键名,默认为 "text"
verbose=False, # 是否输出详细日志
callback_manager=None, # 回调管理器
callbacks=None, # 回调函数列表
tags=None, # 标签列表,用于追踪
metadata=None, # 元数据字典
)
**2. 输出键(output_key)**
可以通过 output_key 参数自定义输出的键名:
from langchain.chains import LLMChain
# 自定义输出键为 "description"
chain = LLMChain(
llm=llm,
prompt=prompt,
output_key="description" # 输出将使用 "description" 作为键
)
result = chain.invoke({"product": "蓝牙耳机"})
print(result["description"]) # 使用自定义的键名访问结果
**3. 详细模式(verbose)**
启用 verbose 模式可以查看 Chain 的执行过程:
chain = LLMChain(
llm=llm,
prompt=prompt,
verbose=True # 启用详细输出
)
# 执行时会打印详细的执行信息
result = chain.invoke({"product": "平板电脑"})
**输出示例:**
> Entering new LLMChain chain...
Prompt after formatting:
为以下产品写一个吸引人的产品描述:
产品名称:平板电脑
产品描述:
> Finished chain.
**4. 回调函数(callbacks)**
可以通过回调函数监听 Chain 的执行事件:
from langchain.callbacks import StdOutCallbackHandler
# 创建回调处理器
handler = StdOutCallbackHandler()
chain = LLMChain(
llm=llm,
prompt=prompt,
callbacks=[handler] # 添加回调
)
result = chain.invoke({"product": "智能音箱"})
**自定义回调处理器:**
from langchain.callbacks.base import BaseCallbackHandler
class MyCallbackHandler(BaseCallbackHandler):
"""自定义回调处理器"""
def on_chain_start(self, serialized, inputs, **kwargs):
print(f"Chain 开始执行,输入: {inputs}")
def on_chain_end(self, outputs, **kwargs):
print(f"Chain 执行完成,输出: {outputs}")
def on_llm_start(self, serialized, prompts, **kwargs):
print(f"LLM 开始生成,提示: {prompts}")
def on_llm_end(self, response, **kwargs):
print(f"LLM 生成完成,响应: {response}")
# 使用自定义回调
chain = LLMChain(
llm=llm,
prompt=prompt,
callbacks=[MyCallbackHandler()]
)
==== 4.2.3 输出解析 ====
LLMChain 默认返回原始文本,但通常我们需要将输出解析为结构化数据。
**1. 使用 OutputParser**
from langchain.output_parsers import CommaSeparatedListOutputParser
from langchain.prompts import PromptTemplate
# 创建列表输出解析器
output_parser = CommaSeparatedListOutputParser()
# 获取格式指令
format_instructions = output_parser.get_format_instructions()
# 创建带格式指令的提示模板
prompt = PromptTemplate(
template="列出5种{category}。\n{format_instructions}",
input_variables=["category"],
partial_variables={"format_instructions": format_instructions}
)
# 创建 LLMChain
chain = LLMChain(
llm=llm,
prompt=prompt,
output_parser=output_parser, # 添加输出解析器
output_key="items"
)
# 执行并获取结构化结果
result = chain.invoke({"category": "水果"})
print(result["items"]) # 输出: ['苹果', '香蕉', '橙子', '葡萄', '西瓜']
**2. 使用 PydanticOutputParser**
对于更复杂的结构化输出,可以使用 PydanticOutputParser:
from pydantic import BaseModel, Field
from langchain.output_parsers import PydanticOutputParser
# 定义数据模型
class ProductInfo(BaseModel):
name: str = Field(description="产品名称")
price: float = Field(description="产品价格")
features: list = Field(description="产品特点列表")
target_audience: str = Field(description="目标用户群体")
# 创建解析器
parser = PydanticOutputParser(pydantic_object=ProductInfo)
# 创建提示模板
prompt = PromptTemplate(
template="""根据以下产品描述,提取产品信息。
产品描述:{description}
{format_instructions}
""",
input_variables=["description"],
partial_variables={"format_instructions": parser.get_format_instructions()}
)
# 创建 Chain
chain = LLMChain(
llm=llm,
prompt=prompt,
output_parser=parser,
output_key="product_info"
)
# 执行
description = """
苹果最新推出的 iPhone 15 Pro,售价999美元起。
主要特点包括:钛金属设计、A17 Pro芯片、4800万像素主摄、USB-C接口。
主要面向追求高端体验的专业用户和科技爱好者。
"""
result = chain.invoke({"description": description})
product = result["product_info"]
print(f"产品名称: {product.name}")
print(f"价格: ${product.price}")
print(f"特点: {product.features}")
print(f"目标用户: {product.target_audience}")
**3. 自定义输出解析器**
from langchain.schema import BaseOutputParser
import json
class JsonOutputParser(BaseOutputParser[dict]):
"""自定义 JSON 输出解析器"""
def parse(self, text: str) -> dict:
"""解析 LLM 输出为 JSON 对象"""
# 尝试从文本中提取 JSON
try:
# 查找 JSON 代码块
if "```json" in text:
json_str = text.split("```json")[1].split("```")[0].strip()
elif "```" in text:
json_str = text.split("```")[1].split("```")[0].strip()
else:
json_str = text.strip()
return json.loads(json_str)
except json.JSONDecodeError as e:
# 解析失败时返回原始文本
return {"raw_text": text, "error": str(e)}
def get_format_instructions(self) -> str:
return "请以 JSON 格式输出结果。"
# 使用自定义解析器
parser = JsonOutputParser()
prompt = PromptTemplate(
template="""分析以下评论的情感。
评论:{review}
请以以下 JSON 格式返回结果:
{{
"sentiment": "positive/negative/neutral",
"confidence": 0.0-1.0,
"keywords": ["关键词1", "关键词2"]
}}
""",
input_variables=["review"]
)
chain = LLMChain(
llm=llm,
prompt=prompt,
output_parser=parser,
output_key="analysis"
)
result = chain.invoke({"review": "这个产品太棒了,完全超出预期!"})
print(result["analysis"])
===== 4.3 SequentialChain =====
**SequentialChain** 允许我们将多个 Chain 串联起来,形成一个执行流水线。前一个 Chain 的输出可以作为后一个 Chain 的输入。
==== 4.3.1 SimpleSequentialChain ====
**SimpleSequentialChain** 是最简单的顺序链,它按顺序执行多个 Chain,每个 Chain 只有一个输出,这个输出直接作为下一个 Chain 的输入。
**基本使用:**
from langchain.chains import LLMChain, SimpleSequentialChain
from langchain.prompts import PromptTemplate
# 第一步:生成产品名称
first_prompt = PromptTemplate(
input_variables=["product_type"],
template="为一款{product_type}起一个吸引人的产品名称:"
)
chain_one = LLMChain(llm=llm, prompt=first_prompt)
# 第二步:根据产品名称生成广告文案
second_prompt = PromptTemplate(
input_variables=["product_name"], # 这里接收 chain_one 的输出
template="为名为'{product_name}'的产品写一段20字的广告语:"
)
chain_two = LLMChain(llm=llm, prompt=second_prompt)
# 创建 SimpleSequentialChain
overall_chain = SimpleSequentialChain(
chains=[chain_one, chain_two],
verbose=True
)
# 执行
result = overall_chain.run("运动鞋")
print(result)
**执行过程说明:**
1. 输入 "运动鞋" 传递给 chain_one
2. chain_one 生成产品名称(如 "疾风跑鞋")
3. "疾风跑鞋" 自动传递给 chain_two 作为输入
4. chain_two 生成广告语并返回
==== 4.3.2 SequentialChain ====
**SequentialChain** 比 SimpleSequentialChain 更灵活,它允许:
* 指定多个输入和输出变量
* 控制变量在 Chain 之间的传递方式
* 访问中间步骤的输出
**基本使用:**
from langchain.chains import LLMChain, SequentialChain
from langchain.prompts import PromptTemplate
# 第一步:生成产品名称
template1 = """你是一个产品经理。请为以下产品创建一个吸引人的名称。
产品类型:{product_type}
目标客户:{target_audience}
产品名称:"""
prompt1 = PromptTemplate(
input_variables=["product_type", "target_audience"],
template=template1
)
chain1 = LLMChain(
llm=llm,
prompt=prompt1,
output_key="product_name" # 指定输出键名
)
# 第二步:生成产品描述
template2 = """你是一个文案撰写专家。请为以下产品写一段描述。
产品名称:{product_name}
产品类型:{product_type}
产品描述:"""
prompt2 = PromptTemplate(
input_variables=["product_name", "product_type"],
template=template2
)
chain2 = LLMChain(
llm=llm,
prompt=prompt2,
output_key="description"
)
# 第三步:生成营销文案
template3 = """你是一个营销专家。请根据以下信息创建一个营销口号。
产品名称:{product_name}
产品描述:{description}
营销口号:"""
prompt3 = PromptTemplate(
input_variables=["product_name", "description"],
template=template3
)
chain3 = LLMChain(
llm=llm,
prompt=prompt3,
output_key="slogan"
)
# 创建 SequentialChain
overall_chain = SequentialChain(
chains=[chain1, chain2, chain3],
input_variables=["product_type", "target_audience"],
output_variables=["product_name", "description", "slogan"],
verbose=True
)
# 执行
result = overall_chain({
"product_type": "智能手环",
"target_audience": "运动爱好者"
})
print("产品名称:", result["product_name"])
print("产品描述:", result["description"])
print("营销口号:", result["slogan"])
**SequentialChain 参数说明:**
| 参数 | 说明 | 必需 |
| chains | Chain 列表,按执行顺序排列 | 是 |
| input_variables | 整个 Chain 的输入变量列表 | 是 |
| output_variables | 最终输出的变量列表 | 否(默认输出所有) |
| verbose | 是否输出详细日志 | 否 |
==== 4.3.3 输出传递机制 ====
理解 SequentialChain 的输出传递机制对于构建复杂的 Chain 流水线至关重要。
**变量传递规则:**
1. 每个 Chain 的输出会自动添加到**全局变量池**中
2. 下游 Chain 可以从变量池中获取所需的输入
3. 变量名冲突时,后面的值会覆盖前面的值
**示例:变量传递详解**
from langchain.chains import LLMChain, SequentialChain
from langchain.prompts import PromptTemplate
# Chain A: 输入 type,输出 name 和 price
prompt_a = PromptTemplate(
input_variables=["type"],
template="为{type}生成产品名称和价格(格式:名称|价格):"
)
chain_a = LLMChain(llm=llm, prompt=prompt_a, output_key="name_price")
# Chain B: 输入 name_price,解析出 name
prompt_b = PromptTemplate(
input_variables=["name_price"],
template="从'{name_price}'中提取产品名称:"
)
chain_b = LLMChain(llm=llm, prompt=prompt_b, output_key="name")
# Chain C: 输入 name 和 type,输出 slogan
prompt_c = PromptTemplate(
input_variables=["name", "type"],
template="为{name}({type})写广告语:"
)
chain_c = LLMChain(llm=llm, prompt=prompt_c, output_key="slogan")
# 组装 SequentialChain
chain = SequentialChain(
chains=[chain_a, chain_b, chain_c],
input_variables=["type"],
output_variables=["name_price", "name", "slogan"],
verbose=True
)
result = chain({"type": "咖啡机"})
print(result)
**中间结果访问:**
# SequentialChain 可以返回中间步骤的结果
result = overall_chain({
"product_type": "智能音箱",
"target_audience": "年轻家庭"
})
# 访问所有输出(包括中间结果)
print("中间结果 - 产品名称:", result["product_name"])
print("中间结果 - 产品描述:", result["description"])
print("最终结果 - 营销口号:", result["slogan"])
===== 4.4 RouterChain =====
**RouterChain(路由链)** 能够根据输入内容动态选择不同的处理路径。这类似于编程中的 switch-case 或 if-else 逻辑。
==== 4.4.1 路由链的概念 ====
路由链的核心思想是:
* 接收用户输入
* 根据输入特征决定使用哪个"目的地"(destination)
* 将输入路由到对应的处理 Chain
**典型应用场景:**
* **客服系统**:根据问题类型路由到不同部门(技术支持、售后服务、销售咨询)
* **内容生成**:根据主题路由到不同的写作风格(科技、财经、娱乐)
* **数据处理**:根据数据类型选择不同的处理方法(文本、数字、日期)
**路由链的工作流程:**
用户输入
↓
[路由判断] → 选择目的地
↓
[对应 Chain 处理]
↓
返回结果
==== 4.4.2 根据输入选择不同处理路径 ====
**基础路由链示例:**
from langchain.chains.router import MultiPromptChain
from langchain.chains import LLMChain, ConversationChain
from langchain.prompts import PromptTemplate
from langchain.chains.router.llm_router import LLMRouterChain, RouterOutputParser
from langchain.chains.router.multi_prompt_prompt import MULTI_PROMPT_ROUTER_TEMPLATE
# 定义多个不同的提示模板(目的地)
physics_template = """你是一个物理学专家。请用简单易懂的方式解释以下物理概念。
概念:{input}
解释:"""
math_template = """你是一个数学家。请详细解答以下数学问题,并展示计算过程。
问题:{input}
解答:"""
history_template = """你是一个历史学家。请从多角度分析以下历史事件。
事件:{input}
分析:"""
computer_science_template = """你是一个计算机科学专家。请用代码示例说明以下概念。
概念:{input}
说明:"""
# 创建目的地信息列表
prompt_infos = [
{
"name": "physics",
"description": "适合回答物理学相关问题",
"prompt_template": physics_template,
},
{
"name": "math",
"description": "适合回答数学问题",
"prompt_template": math_template,
},
{
"name": "history",
"description": "适合回答历史相关问题",
"prompt_template": history_template,
},
{
"name": "computer_science",
"description": "适合回答计算机科学问题",
"prompt_template": computer_science_template,
},
]
# 为每个目的地创建 Chain
destination_chains = {}
for p_info in prompt_infos:
name = p_info["name"]
prompt_template = p_info["prompt_template"]
prompt = PromptTemplate(template=prompt_template, input_variables=["input"])
chain = LLMChain(llm=llm, prompt=prompt)
destination_chains[name] = chain
# 创建默认 Chain(当路由无法确定目的地时使用)
default_chain = ConversationChain(llm=llm, output_key="text")
==== 4.4.3 MultiPromptChain ====
**MultiPromptChain** 是 LangChain 提供的完整路由链实现,它结合了 LLMRouterChain 和多个目的地 Chain。
**完整实现:**
from langchain.chains.router import MultiPromptChain
from langchain.chains.router.llm_router import LLMRouterChain, RouterOutputParser
from langchain.prompts import PromptTemplate
# 构建路由提示模板
destinations = [f"{p['name']}: {p['description']}" for p in prompt_infos]
destinations_str = "\n".join(destinations)
router_template = MULTI_PROMPT_ROUTER_TEMPLATE.format(destinations=destinations_str)
# 创建路由 Chain
router_prompt = PromptTemplate(
template=router_template,
input_variables=["input"],
output_parser=RouterOutputParser(),
)
router_chain = LLMRouterChain.from_llm(llm, router_prompt)
# 创建 MultiPromptChain
chain = MultiPromptChain(
router_chain=router_chain,
destination_chains=destination_chains,
default_chain=default_chain,
verbose=True
)
# 测试不同输入
print("=== 物理问题 ===")
result = chain.invoke({"input": "什么是相对论?"})
print(result["text"])
print("\n=== 数学问题 ===")
result = chain.invoke({"input": "求解方程 x^2 - 5x + 6 = 0"})
print(result["text"])
print("\n=== 计算机问题 ===")
result = chain.invoke({"input": "解释什么是递归函数"})
print(result["text"])
print("\n=== 通用问题 ===")
result = chain.invoke({"input": "今天天气怎么样?"})
print(result["text"])
**路由决策解析:**
当执行上述代码时,LLMRouterChain 会根据输入决定目的地:
# 输入 "什么是相对论?"
# 路由决策:destination: physics
# 输入 "求解方程 x^2 - 5x + 6 = 0"
# 路由决策:destination: math
# 输入 "今天天气怎么样?"
# 路由决策:destination: DEFAULT(因为没有匹配的目的地)
**自定义路由逻辑:**
如果需要更复杂的路由逻辑,可以自定义 RouterChain:
from langchain.chains import LLMChain
from langchain.chains.base import Chain
from typing import Dict, List
class CustomRouterChain(Chain):
"""自定义路由链 - 基于关键词匹配"""
destination_chains: Dict[str, Chain]
default_chain: Chain
@property
def input_keys(self) -> List[str]:
return ["input"]
@property
def output_keys(self) -> List[str]:
return ["text"]
def _call(self, inputs: Dict[str, str]) -> Dict[str, str]:
input_text = inputs["input"].lower()
# 定义关键词映射
keyword_map = {
"physics": ["物理", "相对论", "量子", "牛顿", "能量"],
"math": ["数学", "方程", "计算", "求解", "微积分"],
"code": ["代码", "编程", "函数", "算法", "python", "java"],
}
# 匹配目的地
for destination, keywords in keyword_map.items():
if any(keyword in input_text for keyword in keywords):
if destination in self.destination_chains:
return self.destination_chains[destination](inputs)
# 默认 Chain
return self.default_chain(inputs)
async def _acall(self, inputs: Dict[str, str]) -> Dict[str, str]:
return self._call(inputs)
# 使用自定义路由链
custom_router = CustomRouterChain(
destination_chains=destination_chains,
default_chain=default_chain
)
result = custom_router.invoke({"input": "请解释Python的装饰器"})
print(result)
===== 4.5 TransformChain =====
**TransformChain** 用于在 Chain 流水线中进行数据转换。当你需要在两个 Chain 之间对数据进行清洗、格式化或转换时,TransformChain 非常有用。
==== 4.5.1 数据转换 ====
TransformChain 的核心功能是对输入数据进行自定义转换。
**基本结构:**
from langchain.chains import TransformChain
# 定义转换函数
def transform_func(inputs: dict) -> dict:
"""
转换函数接收输入字典,返回转换后的字典
"""
original_text = inputs["text"]
# 执行转换操作
transformed_text = original_text.upper() # 示例:转为大写
return {"transformed_text": transformed_text}
# 创建 TransformChain
transform_chain = TransformChain(
input_variables=["text"],
output_variables=["transformed_text"],
transform=transform_func
)
# 执行
result = transform_chain.invoke({"text": "hello world"})
print(result) # {'text': 'hello world', 'transformed_text': 'HELLO WORLD'}
==== 4.5.2 自定义转换函数 ====
TransformChain 的真正威力在于可以定义任意复杂的转换逻辑。
**示例1:文本清洗**
import re
from langchain.chains import TransformChain, LLMChain, SequentialChain
from langchain.prompts import PromptTemplate
def clean_text(inputs: dict) -> dict:
"""清洗和格式化文本"""
text = inputs["raw_text"]
# 移除多余空白
text = re.sub(r'\s+', ' ', text)
# 移除特殊字符
text = re.sub(r'[^\w\s\.\,\?\!\u4e00-\u9fff]', '', text)
# 限制长度
text = text[:500]
return {"cleaned_text": text.strip()}
# 创建 TransformChain
clean_chain = TransformChain(
input_variables=["raw_text"],
output_variables=["cleaned_text"],
transform=clean_text
)
# 后续 LLMChain 使用清洗后的文本
prompt = PromptTemplate(
input_variables=["cleaned_text"],
template="请总结以下内容:\n\n{cleaned_text}\n\n总结:"
)
summary_chain = LLMChain(llm=llm, prompt=prompt, output_key="summary")
# 组合成 SequentialChain
overall_chain = SequentialChain(
chains=[clean_chain, summary_chain],
input_variables=["raw_text"],
output_variables=["cleaned_text", "summary"],
verbose=True
)
# 测试
dirty_text = """
这是一段包含 大量多余空格和特殊字符!!!@@@###的文本。
需要清洗后才能使用。
"""
result = overall_chain.invoke({"raw_text": dirty_text})
print("清洗后:", result["cleaned_text"])
print("总结:", result["summary"])
**示例2:数据格式转换**
import json
from langchain.chains import TransformChain
def parse_json_input(inputs: dict) -> dict:
"""将 JSON 字符串解析为结构化数据"""
json_str = inputs["json_data"]
try:
data = json.loads(json_str)
return {
"parsed_data": data,
"name": data.get("name", ""),
"age": data.get("age", 0),
"valid": True
}
except json.JSONDecodeError:
return {
"parsed_data": None,
"name": "",
"age": 0,
"valid": False,
"error": "Invalid JSON"
}
def format_output(inputs: dict) -> dict:
"""将输出格式化为特定格式"""
text = inputs["llm_output"]
return {
"formatted_output": f"【AI回复】\n{text}\n【结束】",
"word_count": len(text.split())
}
# 创建 TransformChains
parse_chain = TransformChain(
input_variables=["json_data"],
output_variables=["parsed_data", "name", "age", "valid"],
transform=parse_json_input
)
format_chain = TransformChain(
input_variables=["llm_output"],
output_variables=["formatted_output", "word_count"],
transform=format_output
)
# 示例 JSON 数据
json_data = '{"name": "张三", "age": 28, "interests": ["编程", "阅读"]}'
result = parse_chain.invoke({"json_data": json_data})
print(result)
**示例3:结合 TransformChain 和 LLMChain**
from langchain.chains import TransformChain, LLMChain, SequentialChain
from langchain.prompts import PromptTemplate
def extract_keywords(inputs: dict) -> dict:
"""提取文本关键词"""
import jieba
text = inputs["content"]
# 使用 jieba 分词提取关键词
words = jieba.lcut(text)
# 过滤短词和停用词
keywords = [w for w in words if len(w) > 1]
return {
"keywords": keywords[:10], # 取前10个关键词
"keyword_str": ", ".join(keywords[:10])
}
# TransformChain 提取关键词
keyword_chain = TransformChain(
input_variables=["content"],
output_variables=["keywords", "keyword_str"],
transform=extract_keywords
)
# LLMChain 基于关键词生成标题
title_prompt = PromptTemplate(
input_variables=["keyword_str"],
template="""根据以下关键词,生成一个吸引人的文章标题(不超过20字):
关键词:{keyword_str}
标题:"""
)
title_chain = LLMChain(llm=llm, prompt=title_prompt, output_key="title")
# 组合 Chain
content_chain = SequentialChain(
chains=[keyword_chain, title_chain],
input_variables=["content"],
output_variables=["keywords", "keyword_str", "title"],
verbose=True
)
# 测试
article = """
人工智能(AI)正在改变我们的生活方式。从智能手机助手到自动驾驶汽车,
AI技术已经渗透到日常生活的方方面面。机器学习、深度学习等技术的发展,
使得AI能够处理越来越复杂的任务。
"""
result = content_chain.invoke({"content": article})
print(f"关键词: {result['keyword_str']}")
print(f"生成标题: {result['title']}")
===== 4.6 自定义 Chain =====
当 LangChain 内置的 Chain 无法满足需求时,可以通过继承 **BaseChain** 创建自定义 Chain。
==== 4.6.1 继承 BaseChain ====
创建自定义 Chain 需要继承 BaseChain 并实现以下方法:
| 方法/属性 | 说明 |
| input_keys | 返回 Chain 需要的输入键列表 |
| output_keys | 返回 Chain 产生的输出键列表 |
| _call | 同步执行逻辑 |
| _acall | 异步执行逻辑(可选) |
**最简单的自定义 Chain:**
from langchain.chains.base import Chain
from typing import Dict, List, Any
class EchoChain(Chain):
"""简单的回显 Chain - 将输入原样返回"""
@property
def input_keys(self) -> List[str]:
"""定义输入键"""
return ["message"]
@property
def output_keys(self) -> List[str]:
"""定义输出键"""
return ["echo"]
def _call(self, inputs: Dict[str, Any]) -> Dict[str, str]:
"""同步执行"""
message = inputs["message"]
return {"echo": f"Echo: {message}"}
async def _acall(self, inputs: Dict[str, Any]) -> Dict[str, str]:
"""异步执行"""
# 通常直接调用 _call 或实现真正的异步逻辑
return self._call(inputs)
# 使用自定义 Chain
echo = EchoChain()
result = echo.invoke({"message": "Hello!"})
print(result) # {'message': 'Hello!', 'echo': 'Echo: Hello!'}
==== 4.6.2 实现必要方法 ====
**完整的自定义 Chain 示例:**
from langchain.chains.base import Chain
from langchain.llms.base import BaseLLM
from langchain.prompts import PromptTemplate
from typing import Dict, List, Any, Optional
import time
class TranslationChain(Chain):
"""
多语言翻译 Chain
支持指定源语言和目标语言进行翻译
"""
# 定义 Chain 的配置参数
llm: BaseLLM
source_lang: str = "中文"
target_lang: str = "英文"
verbose_timing: bool = False
@property
def input_keys(self) -> List[str]:
"""输入需要一个文本"""
return ["text"]
@property
def output_keys(self) -> List[str]:
"""输出包含翻译结果和元信息"""
return ["translation", "source_lang", "target_lang", "processing_time"]
def _call(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
"""执行翻译"""
start_time = time.time()
text = inputs["text"]
# 构建翻译提示
prompt_text = f"""请将以下{self.source_lang}翻译成{self.target_lang}:
{self.source_lang}:{text}
{self.target_lang}:"""
# 调用 LLM
translation = self.llm.predict(prompt_text)
processing_time = time.time() - start_time
if self.verbose_timing:
print(f"翻译耗时: {processing_time:.2f}秒")
return {
"translation": translation.strip(),
"source_lang": self.source_lang,
"target_lang": self.target_lang,
"processing_time": processing_time
}
async def _acall(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
"""异步执行"""
# 实际应用中可以使用 ainvoke 或异步 LLM 调用
return self._call(inputs)
def translate(self, text: str) -> str:
"""便捷方法 - 只返回翻译结果"""
result = self.invoke({"text": text})
return result["translation"]
# 使用自定义 TranslationChain
from langchain.llms import OpenAI
llm = OpenAI(temperature=0.3)
translator = TranslationChain(
llm=llm,
source_lang="中文",
target_lang="英文",
verbose_timing=True
)
# 方式1:使用 invoke
result = translator.invoke({"text": "人工智能正在改变世界"})
print(f"原文: 人工智能正在改变世界")
print(f"译文: {result['translation']}")
print(f"耗时: {result['processing_time']:.2f}秒")
# 方式2:使用便捷方法
translation = translator.translate("机器学习是人工智能的一个重要分支")
print(f"便捷翻译: {translation}")
**带可选参数的自定义 Chain:**
from pydantic import Field
class SummarizationChain(Chain):
"""
文本摘要 Chain
支持指定摘要长度和风格
"""
llm: BaseLLM = Field(..., description="LLM 实例")
max_length: int = Field(default=100, description="摘要最大字数")
style: str = Field(default="concise", description="摘要风格: concise/detailed/bullet")
@property
def input_keys(self) -> List[str]:
return ["text"]
@property
def output_keys(self) -> List[str]:
return ["summary", "word_count", "style_used"]
def _get_style_instruction(self) -> str:
"""根据风格获取指令"""
styles = {
"concise": "提供一个简洁的摘要",
"detailed": "提供一个详细的摘要,包含主要观点",
"bullet": "以 bullet points 形式列出主要要点"
}
return styles.get(self.style, styles["concise"])
def _call(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
text = inputs["text"]
style_instruction = self._get_style_instruction()
prompt = f"""请对以下文本进行摘要。{style_instruction},不超过{self.max_length}字。
文本:
{text}
摘要:"""
summary = self.llm.predict(prompt).strip()
word_count = len(summary)
return {
"summary": summary,
"word_count": word_count,
"style_used": self.style
}
async def _acall(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
return self._call(inputs)
# 使用
summarizer = SummarizationChain(
llm=llm,
max_length=50,
style="bullet"
)
long_text = """
人工智能(Artificial Intelligence,简称AI)是计算机科学的一个分支,
致力于创造能够模拟人类智能的系统。这些系统可以学习、推理、感知环境、
理解语言并做出决策。AI技术包括机器学习、深度学习、自然语言处理、
计算机视觉等多个领域。近年来,随着计算能力的提升和数据量的增加,
AI技术取得了突破性进展,在医疗诊断、自动驾驶、金融分析等领域
展现出巨大潜力。
"""
result = summarizer.invoke({"text": long_text})
print(f"摘要: {result['summary']}")
print(f"字数: {result['word_count']}")
print(f"风格: {result['style_used']}")
==== 4.6.3 完整示例:智能客服 Chain ====
下面是一个完整的自定义 Chain 示例,模拟智能客服系统:
from langchain.chains.base import Chain
from langchain.llms.base import BaseLLM
from langchain.memory import ConversationBufferMemory
from pydantic import Field
from typing import Dict, List, Any
import re
class CustomerServiceChain(Chain):
"""
智能客服 Chain
功能:
1. 情感分析 - 判断客户情绪
2. 意图识别 - 识别客户需求类型
3. 生成回复 - 根据情绪和意图生成合适回复
4. 维护对话历史
"""
llm: BaseLLM = Field(..., description="LLM 实例")
memory: ConversationBufferMemory = Field(
default_factory=ConversationBufferMemory,
description="对话记忆"
)
company_name: str = Field(default="我们的公司", description="公司名称")
# 内部状态
emotion_threshold: float = 0.5
@property
def input_keys(self) -> List[str]:
return ["customer_message"]
@property
def output_keys(self) -> List[str]:
return [
"response",
"detected_emotion",
"detected_intent",
"needs_escalation"
]
def _analyze_emotion(self, message: str) -> str:
"""分析客户情绪"""
# 负面关键词
negative_words = ['生气', '愤怒', '失望', '糟糕', '差', '慢', '坏', '投诉', '退货', '退款']
# 正面关键词
positive_words = ['满意', '好', '棒', '感谢', '喜欢', '推荐', '不错']
message_lower = message.lower()
negative_count = sum(1 for w in negative_words if w in message_lower)
positive_count = sum(1 for w in positive_words if w in message_lower)
if negative_count > positive_count:
return "negative"
elif positive_count > negative_count:
return "positive"
else:
return "neutral"
def _detect_intent(self, message: str) -> str:
"""识别客户意图"""
intent_patterns = {
"inquiry": ['怎么', '如何', '什么', '多少', '价格'],
"complaint": ['投诉', '不满', '问题', '故障', '坏'],
"purchase": ['买', '订购', '购买', '下单'],
"refund": ['退', '退款', '退货', '换货'],
"technical": ['安装', '设置', '配置', '连接']
}
message_lower = message.lower()
for intent, patterns in intent_patterns.items():
if any(p in message_lower for p in patterns):
return intent
return "general"
def _generate_response(self, message: str, emotion: str, intent: str) -> str:
"""生成客服回复"""
# 准备上下文
history = self.memory.load_memory_variables({}).get("history", "")
# 根据情绪和意图调整语气
tone_instruction = {
"negative": "客户情绪负面,需要表达歉意和同理心,语气要诚恳",
"positive": "客户情绪积极,可以友好热情地回应",
"neutral": "保持专业、礼貌的语气"
}.get(emotion, "保持专业语气")
intent_instruction = {
"inquiry": "回答客户的询问",
"complaint": "处理投诉,表示理解并提供解决方案",
"purchase": "协助完成购买流程",
"refund": "处理退款/退货请求,了解具体情况",
"technical": "提供技术支持",
"general": "提供一般性帮助"
}.get(intent, "提供帮助")
prompt = f"""你是{self.company_name}的智能客服助手。
对话历史:
{history}
客户消息:{message}
客户情绪:{emotion}
客户意图:{intent}
回复要求:
- {tone_instruction}
- {intent_instruction}
- 简洁明了,不超过100字
- 使用中文
你的回复:"""
response = self.llm.predict(prompt).strip()
return response
def _needs_escalation(self, emotion: str, message: str) -> bool:
"""判断是否需要人工介入"""
# 如果情绪非常负面或提到投诉/法律相关词汇
escalation_keywords = ['投诉', '法律', '律师', '曝光', '媒体', '工商局', '消协']
if emotion == "negative" and any(k in message for k in escalation_keywords):
return True
return False
def _call(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
message = inputs["customer_message"]
# 1. 情感分析
emotion = self._analyze_emotion(message)
# 2. 意图识别
intent = self._detect_intent(message)
# 3. 判断是否需要升级
needs_escalation = self._needs_escalation(emotion, message)
# 4. 生成回复
if needs_escalation:
response = f"非常抱歉给您带来不好的体验。我已经记录了您的问题,将立即转接给人工客服专员处理,请稍候。"
else:
response = self._generate_response(message, emotion, intent)
# 5. 更新对话历史
self.memory.save_context(
{"input": message},
{"output": response}
)
return {
"response": response,
"detected_emotion": emotion,
"detected_intent": intent,
"needs_escalation": needs_escalation
}
async def _acall(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
return self._call(inputs)
def reset_conversation(self):
"""重置对话"""
self.memory.clear()
# 使用智能客服 Chain
llm = OpenAI(temperature=0.7)
customer_service = CustomerServiceChain(
llm=llm,
company_name="智选科技"
)
# 模拟对话
print("=== 对话 1:一般询问 ===")
result = customer_service.invoke({"customer_message": "请问你们的产品怎么安装?"})
print(f"客户意图: {result['detected_intent']}")
print(f"客户情绪: {result['detected_emotion']}")
print(f"客服回复: {result['response']}")
print("\n=== 对话 2:投诉 ===")
result = customer_service.invoke({"customer_message": "你们的产品太差了,我要退货!"})
print(f"客户意图: {result['detected_intent']}")
print(f"客户情绪: {result['detected_emotion']}")
print(f"需要升级: {result['needs_escalation']}")
print(f"客服回复: {result['response']}")
print("\n=== 对话 3:上下文理解 ===")
result = customer_service.invoke({"customer_message": "好的,那退款什么时候到账?"})
print(f"客服回复: {result['response']}")
===== 4.7 LCEL (LangChain Expression Language) =====
**LCEL** 是 LangChain 提供的一种声明式语法,用于以更简洁、更直观的方式组合 Chain。它是 LangChain 的新一代链式调用语法,推荐在新项目中使用。
==== 4.7.1 LCEL 基础 ====
LCEL 使用 **管道操作符(|)** 来连接各个组件,类似于 Unix 管道或 Python 的函数式编程风格。
**LCEL vs 传统方式对比:**
# 传统方式
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
prompt = PromptTemplate.from_template("告诉我关于{topic}的趣事")
chain = LLMChain(llm=llm, prompt=prompt)
result = chain.predict(topic="猫")
# LCEL 方式
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser
prompt = ChatPromptTemplate.from_template("告诉我关于{topic}的趣事")
chain = prompt | llm | StrOutputParser()
result = chain.invoke({"topic": "猫"})
==== 4.7.2 核心组件 ====
LCEL 支持多种核心组件,每个组件都可以作为管道的一部分:
**1. PromptTemplate**
from langchain.prompts import ChatPromptTemplate
# 简单模板
prompt = ChatPromptTemplate.from_template("将以下文本翻译成{language}:{text}")
# 多消息模板
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个专业的翻译助手。"),
("human", "将以下文本翻译成{language}:{text}"),
])
**2. LLM / ChatModel**
from langchain.chat_models import ChatOpenAI
from langchain.llms import OpenAI
# Chat 模型
chat_model = ChatOpenAI(model="gpt-3.5-turbo")
# 文本模型
llm = OpenAI(model="gpt-3.5-turbo-instruct")
**3. Output Parser**
from langchain.schema.output_parser import StrOutputParser
from langchain.output_parsers import PydanticOutputParser
# 字符串输出解析器
str_parser = StrOutputParser()
# Pydantic 输出解析器
pydantic_parser = PydanticOutputParser(pydantic_object=MyModel)
**4. Retriever(检索器)**
from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings
# 创建向量存储
vectorstore = Chroma.from_documents(documents, OpenAIEmbeddings())
retriever = vectorstore.as_retriever()
==== 4.7.3 LCEL 语法详解 ====
**基础管道:**
from langchain.prompts import ChatPromptTemplate
from langchain.chat_models import ChatOpenAI
from langchain.schema.output_parser import StrOutputParser
# 创建组件
prompt = ChatPromptTemplate.from_template("给我讲一个关于{topic}的笑话")
model = ChatOpenAI()
output_parser = StrOutputParser()
# 使用管道连接
chain = prompt | model | output_parser
# 执行
result = chain.invoke({"topic": "程序员"})
print(result)
**带上下文的 RAG 链:**
from langchain.prompts import ChatPromptTemplate
from langchain.chat_models import ChatOpenAI
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough
# 准备检索器
docs = [
"LangChain 是一个用于构建 LLM 应用的框架",
"Python 是一种流行的编程语言",
"机器学习是人工智能的一个分支"
]
from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings
vectorstore = Chroma.from_texts(docs, OpenAIEmbeddings())
retriever = vectorstore.as_retriever()
# 创建 RAG 模板
template = """基于以下上下文回答问题:
上下文:
{context}
问题:{question}
回答:"""
prompt = ChatPromptTemplate.from_template(template)
# 辅助函数:格式化文档
def format_docs(docs):
return "\n\n".join([d.page_content for d in docs])
# 构建 RAG Chain
rag_chain = (
{
"context": retriever | format_docs,
"question": RunnablePassthrough()
}
| prompt
| model
| output_parser
)
# 使用
result = rag_chain.invoke("LangChain 是什么?")
print(result)
**RunnablePassthrough 详解:**
from langchain.schema.runnable import RunnablePassthrough, RunnableParallel
# RunnablePassthrough 将输入原样传递
# 常用于同时传递原始输入和处理后的数据
# 示例:同时获取原始问题和检索结果
chain = RunnableParallel({
"question": RunnablePassthrough(), # 原样传递问题
"context": retriever | format_docs # 检索相关文档
})
result = chain.invoke("什么是机器学习?")
# 结果:{"question": "什么是机器学习?", "context": "机器学习是..."}
**RunnableParallel(并行执行):**
from langchain.schema.runnable import RunnableParallel
# 并行执行多个分支
parallel_chain = RunnableParallel({
"branch_a": prompt_a | model | output_parser,
"branch_b": prompt_b | model | output_parser,
"original": RunnablePassthrough()
})
result = parallel_chain.invoke({"input": "测试输入"})
# 结果:{"branch_a": "...", "branch_b": "...", "original": "..."}
==== 4.7.4 LCEL 高级用法 ====
**1. 条件分支:**
from langchain.schema.runnable import RunnableBranch
# 创建条件分支链
branch = RunnableBranch(
# (条件, 分支)
(lambda x: "问题" in x["input"], qa_chain),
(lambda x: "总结" in x["input"], summary_chain),
# 默认分支
default_chain
)
result = branch.invoke({"input": "请总结这段文本"})
**2. 绑定运行时参数:**
# 使用 bind 方法传递额外参数
chain = (
prompt
| model.bind(stop=["\nObservation:"]) # 绑定停止词
| output_parser
)
**3. 自定义 Runnable:**
from langchain.schema.runnable import RunnableLambda
# 将普通函数转换为 Runnable
def my_transform(data: dict) -> dict:
return {"transformed": data["input"].upper()}
runnable_transform = RunnableLambda(my_transform)
# 使用在管道中
chain = prompt | model | output_parser | runnable_transform
**4. 链式调用多个输入输出:**
from operator import itemgetter
# 使用 itemgetter 提取特定字段
chain = (
{
"context": itemgetter("question") | retriever | format_docs,
"question": itemgetter("question"),
"language": itemgetter("language")
}
| prompt
| model
| output_parser
)
result = chain.invoke({
"question": "什么是AI?",
"language": "中文"
})
**5. 批处理和流式:**
# 批处理
inputs = [{"topic": "猫"}, {"topic": "狗"}, {"topic": "鸟"}]
results = chain.batch(inputs)
# 流式输出
for chunk in chain.stream({"topic": "人工智能"}):
print(chunk, end="", flush=True)
# 异步调用
result = await chain.ainvoke({"topic": "量子计算"})
===== 4.8 Chain 的组合与嵌套 =====
Chain 的强大之处在于可以灵活组合和嵌套,构建出复杂的应用架构。
==== 4.8.1 Chain 嵌套 ====
Chain 可以嵌套在其他 Chain 中,实现分层处理。
**基础嵌套:**
from langchain.chains import LLMChain, SequentialChain, TransformChain
from langchain.prompts import PromptTemplate
# 内层 Chain:分析文本情感
sentiment_prompt = PromptTemplate(
input_variables=["text"],
template="分析以下文本的情感(positive/negative/neutral):\n\n{text}\n\n情感:"
)
sentiment_chain = LLMChain(
llm=llm,
prompt=sentiment_prompt,
output_key="sentiment"
)
# 内层 Chain:提取关键词
keyword_prompt = PromptTemplate(
input_variables=["text"],
template="从以下文本中提取3个关键词:\n\n{text}\n\n关键词:"
)
keyword_chain = LLMChain(
llm=llm,
prompt=keyword_prompt,
output_key="keywords"
)
# 外层 Chain:根据情感和关键词生成回复
response_prompt = PromptTemplate(
input_variables=["sentiment", "keywords", "original_text"],
template="""基于以下信息生成回复:
原文情感:{sentiment}
关键词:{keywords}
原文:{original_text}
请生成适当的回复:"""
)
response_chain = LLMChain(
llm=llm,
prompt=response_prompt,
output_key="response"
)
# 使用 SequentialChain 组合
analysis_chain = SequentialChain(
chains=[sentiment_chain, keyword_chain, response_chain],
input_variables=["text"],
output_variables=["sentiment", "keywords", "response"],
verbose=True
)
result = analysis_chain.invoke({
"text": "这个产品真的太棒了,完全超出我的预期!"
})
print(result)
==== 4.8.2 复杂 Chain 组合模式 ====
**模式1:分治处理**
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
class DivideAndConquerChain:
"""
分治处理 Chain
将长文本分割,分别处理,然后合并结果
"""
def __init__(self, llm, chunk_size=1000):
self.llm = llm
self.chunk_size = chunk_size
# 子任务 Chain
self.summarize_prompt = PromptTemplate(
input_variables=["chunk"],
template="请总结以下内容:\n\n{chunk}\n\n摘要:"
)
self.summarize_chain = LLMChain(llm=llm, prompt=self.summarize_prompt)
# 合并 Chain
self.merge_prompt = PromptTemplate(
input_variables=["summaries"],
template="基于以下摘要,生成一个统一的总结:\n\n{summaries}\n\n统一总结:"
)
self.merge_chain = LLMChain(llm=llm, prompt=self.merge_prompt)
def split_text(self, text):
"""分割文本"""
words = text.split()
chunks = []
current_chunk = []
current_size = 0
for word in words:
current_chunk.append(word)
current_size += len(word) + 1
if current_size >= self.chunk_size:
chunks.append(" ".join(current_chunk))
current_chunk = []
current_size = 0
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
def process(self, text):
"""执行分治处理"""
# 1. 分割文本
chunks = self.split_text(text)
# 2. 分别总结
summaries = []
for chunk in chunks:
result = self.summarize_chain.invoke({"chunk": chunk})
summaries.append(result["text"])
# 3. 合并结果
combined = "\n\n".join([f"片段 {i+1}: {s}" for i, s in enumerate(summaries)])
final = self.merge_chain.invoke({"summaries": combined})
return {
"chunk_count": len(chunks),
"chunk_summaries": summaries,
"final_summary": final["text"]
}
# 使用
long_text = """[这里是一段很长的文本...]"""
dac_chain = DivideAndConquerChain(llm=llm, chunk_size=500)
result = dac_chain.process(long_text)
print(f"分成了 {result['chunk_count']} 个片段")
print(f"最终总结: {result['final_summary']}")
**模式2:多路径处理(类似 MapReduce)**
from concurrent.futures import ThreadPoolExecutor
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
class MultiPathChain:
"""
多路径处理 Chain
同时从多个角度处理问题,然后综合结果
"""
def __init__(self, llm):
self.llm = llm
# 多个不同角度的 Chain
self.perspectives = {
"technical": LLMChain(
llm=llm,
prompt=PromptTemplate(
input_variables=["topic"],
template="从技术角度分析 {topic}:"
),
output_key="text"
),
"business": LLMChain(
llm=llm,
prompt=PromptTemplate(
input_variables=["topic"],
template="从商业角度分析 {topic}:"
),
output_key="text"
),
"social": LLMChain(
llm=llm,
prompt=PromptTemplate(
input_variables=["topic"],
template="从社会影响角度分析 {topic}:"
),
output_key="text"
)
}
# 综合 Chain
self.synthesis_chain = LLMChain(
llm=llm,
prompt=PromptTemplate(
input_variables=["technical", "business", "social", "topic"],
template="""基于以下多角度分析,为"{topic}"生成一个全面的评估报告:
技术分析:
{technical}
商业分析:
{business}
社会影响分析:
{social}
综合报告:"""
),
output_key="text"
)
def process(self, topic):
"""执行多路径处理"""
# 并行执行多个视角的分析
results = {}
for name, chain in self.perspectives.items():
result = chain.invoke({"topic": topic})
results[name] = result["text"]
# 综合结果
synthesis = self.synthesis_chain.invoke({
"topic": topic,
**results
})
return {
"topic": topic,
**results,
"synthesis": synthesis["text"]
}
# 使用
multi_chain = MultiPathChain(llm=llm)
result = multi_chain.process("人工智能")
print("=== 技术分析 ===")
print(result["technical"])
print("\n=== 商业分析 ===")
print(result["business"])
print("\n=== 社会影响分析 ===")
print(result["social"])
print("\n=== 综合报告 ===")
print(result["synthesis"])
**模式3:反馈循环 Chain**
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
class IterativeRefinementChain:
"""
迭代优化 Chain
通过多次迭代逐步改进结果
"""
def __init__(self, llm, max_iterations=3):
self.llm = llm
self.max_iterations = max_iterations
# 生成 Chain
self.generate_chain = LLMChain(
llm=llm,
prompt=PromptTemplate(
input_variables=["task", "feedback"],
template="""任务:{task}
上一轮反馈:{feedback}
请根据反馈改进你的回答:"""
),
output_key="text"
)
# 评估 Chain
self.evaluate_chain = LLMChain(
llm=llm,
prompt=PromptTemplate(
input_variables=["task", "output"],
template="""任务:{task}
当前输出:{output}
请评估这个输出,指出需要改进的地方。如果没有问题,回复"满意"。
评估:"""
),
output_key="text"
)
def process(self, task):
"""执行迭代优化"""
current_output = "这是初始版本。"
feedback = "请开始生成。"
history = []
for i in range(self.max_iterations):
# 生成
result = self.generate_chain.invoke({
"task": task,
"feedback": feedback
})
current_output = result["text"]
history.append({
"iteration": i + 1,
"output": current_output
})
# 评估
eval_result = self.evaluate_chain.invoke({
"task": task,
"output": current_output
})
feedback = eval_result["text"]
# 检查是否满意
if "满意" in feedback:
break
return {
"final_output": current_output,
"iterations": len(history),
"history": history,
"final_feedback": feedback
}
# 使用
refinement_chain = IterativeRefinementChain(llm=llm, max_iterations=3)
result = refinement_chain.process("写一篇关于环保的文章开头(100字)")
print(f"迭代次数: {result['iterations']}")
print(f"最终结果: {result['final_output']}")
==== 4.8.3 Chain 组合最佳实践 ====
**1. 单一职责原则**
每个 Chain 应该只负责一个明确的任务,避免过于复杂的 Chain。
# 不好的做法:一个 Chain 做太多事情
bad_chain = LLMChain(
prompt=PromptTemplate(
template="翻译、总结并评价以下内容:{text}", # 做了三件事!
input_variables=["text"]
),
llm=llm
)
# 好的做法:分解为多个单一职责的 Chain
translate_chain = LLMChain(prompt=translate_prompt, llm=llm, output_key="translated")
summarize_chain = LLMChain(prompt=summarize_prompt, llm=llm, output_key="summary")
evaluate_chain = LLMChain(prompt=evaluate_prompt, llm=llm, output_key="evaluation")
# 用 SequentialChain 组合
good_chain = SequentialChain(
chains=[translate_chain, summarize_chain, evaluate_chain],
input_variables=["text"],
output_variables=["translated", "summary", "evaluation"]
)
**2. 合理命名变量**
使用清晰、有意义的输入输出键名。
# 不好的命名
chain = LLMChain(
prompt=prompt,
llm=llm,
output_key="x" # 不清晰
)
# 好的命名
chain = LLMChain(
prompt=prompt,
llm=llm,
output_key="extracted_entities" # 清晰表达含义
)
**3. 错误处理**
为 Chain 添加适当的错误处理机制。
from langchain.chains.base import Chain
class SafeChain(Chain):
"""带错误处理的 Chain 包装器"""
base_chain: Chain
fallback_value: str = "处理失败"
@property
def input_keys(self):
return self.base_chain.input_keys
@property
def output_keys(self):
return self.base_chain.output_keys
def _call(self, inputs):
try:
return self.base_chain.invoke(inputs)
except Exception as e:
# 记录错误
print(f"Chain 执行错误: {e}")
# 返回默认值
return {key: self.fallback_value for key in self.output_keys}
**4. 使用 Memory 管理状态**
对于需要维护上下文的 Chain,使用 Memory 组件。
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationChain
# 创建带记忆的 Chain
memory = ConversationBufferMemory()
conversation = ConversationChain(
llm=llm,
memory=memory,
verbose=True
)
# 多次对话保持上下文
conversation.predict(input="你好,我叫张三")
conversation.predict(input="我叫什么名字?") # 会记住之前的对话
===== 4.9 本章小结 =====
本章深入讲解了 LangChain 中的 Chain(链)组件,这是构建复杂 LLM 应用的核心工具。
**关键知识点回顾:**
**4.1 Chain 基础概念**
* Chain 是将多个组件串联执行的抽象概念
* 实现了统一的接口(input_keys、output_keys、_call、_acall)
* 优势包括模块化设计、可组合性、统一接口、内置功能支持
**4.2 LLMChain**
* 最基础的 Chain 类型,组合了 PromptTemplate 和 LLM
* 支持丰富的参数配置(output_key、verbose、callbacks 等)
* 可以通过 OutputParser 将输出解析为结构化数据
**4.3 SequentialChain**
* **SimpleSequentialChain**:简单顺序执行,单输入单输出传递
* **SequentialChain**:更灵活,支持多输入多输出和变量映射
* 理解输出传递机制对于构建复杂流水线至关重要
**4.4 RouterChain**
* 根据输入特征动态选择处理路径
* **MultiPromptChain**:内置的多目的地路由实现
* 可以自定义路由逻辑实现更复杂的决策
**4.5 TransformChain**
* 用于 Chain 之间的数据转换
* 可以实现文本清洗、格式转换、数据解析等功能
* 是连接不同 Chain 的"胶水"
**4.6 自定义 Chain**
* 通过继承 BaseChain 创建自定义 Chain
* 必须实现 input_keys、output_keys、_call、_acall 方法
* 可以添加自定义配置参数和方法
**4.7 LCEL (LangChain Expression Language)**
* LangChain 的新一代声明式语法
* 使用管道操作符(|)连接组件
* 支持 RunnablePassthrough、RunnableParallel、RunnableBranch 等高级特性
**4.8 Chain 的组合与嵌套**
* Chain 可以灵活组合和嵌套
* 常见模式:分治处理、多路径处理、迭代优化
* 遵循最佳实践:单一职责、清晰命名、错误处理、内存管理
**学习建议:**
1. **从简单开始**:先掌握 LLMChain 和 SequentialChain 的基础用法
2. **实践为主**:通过实际项目练习 Chain 的组合和嵌套
3. **理解原理**:深入理解 Chain 的执行流程和数据传递机制
4. **关注新版本**:LCEL 是未来趋势,建议在新项目中优先使用
5. **善用调试**:利用 verbose=True 和回调函数调试 Chain
**下一步学习:**
掌握了 Chain 之后,建议继续学习:
* **Memory 组件**:为 Chain 添加记忆能力
* **Agent**:让 LLM 自主决策并调用工具
* **Document Loaders 和 Vector Stores**:构建 RAG 应用
* **Evaluation**:评估 Chain 的性能和输出质量
Chain 是 LangChain 的骨架,熟练运用 Chain 将使你能够构建出功能强大、结构清晰的 LLM 应用程序。
==== 本章练习 ====
1. 创建一个 LLMChain,将用户的输入翻译成指定语言,并使用 PydanticOutputParser 解析出翻译结果和语言检测信息。
2. 使用 SequentialChain 构建一个内容创作流水线:先生成文章标题,再基于标题生成大纲,最后基于大纲生成完整文章。
3. 实现一个 RouterChain,能够根据用户问题类型(编程、写作、翻译)路由到不同的处理 Chain。
4. 使用 TransformChain 创建一个数据清洗 Chain,能够去除文本中的 HTML 标签、多余空格和特殊字符。
5. 使用 LCEL 语法重构本章中的任意一个复杂 Chain,对比传统方式和 LCEL 方式的代码差异。
6. 创建一个自定义 Chain,实现一个简单的问答系统,包含文档检索、答案生成和答案验证三个步骤。