langchain二次开发:核心组件详解_chains

这是本文档旧的修订版!


第四章:核心组件详解 - Chains(链)

在 LangChain 框架中,Chain(链)是最核心、最重要的概念之一。它将多个组件连接在一起,形成一个可执行的工作流程。通过 Chain,我们可以将 LLM、提示模板、输出解析器、外部工具等各种组件有机地组合起来,构建出功能强大的应用程序。

本章将深入讲解 LangChain 中 Chain 的各个方面,从基础概念到高级用法,帮助你全面掌握这一核心组件。

Chain(链)是 LangChain 中用于将多个组件串联执行的抽象概念。简单来说,Chain 就是一系列按顺序执行的步骤,每个步骤接收输入、进行处理、产生输出,并将输出传递给下一个步骤。

可以把 Chain 想象成工厂的生产流水线:

  • 原材料(输入)从流水线的一端进入
  • 经过各个工位(Chain 中的各个组件)的加工处理
  • 最终在流水线的另一端产出成品(输出)

在 LangChain 中,Chain 可以包含以下类型的组件:

  • LLM(大型语言模型):进行文本生成、推理等操作
  • Prompt Templates(提示模板):管理和格式化输入提示
  • Output Parsers(输出解析器):解析和结构化 LLM 的输出
  • Tools(工具):与外部 API、数据库等进行交互
  • 其他 Chain:实现 Chain 的嵌套和组合

Chain 的工作流程遵循输入-处理-输出的模式:

# Chain 的基本工作流程示意
输入数据 → [组件1处理] → 中间结果1[组件2处理] → 中间结果2 → ... → [组件N处理] → 最终输出

每个 Chain 都实现了两个核心方法:

1. _call 方法(同步调用)

def _call(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
    """
    执行 Chain 的核心逻辑
 
    Args:
        inputs: 输入字典,包含 Chain 需要的所有输入参数
 
    Returns:
        输出字典,包含 Chain 产生的所有输出结果
    """
    # 实现具体的处理逻辑
    pass

2. _acall 方法(异步调用)

async def _acall(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
    """
    异步执行 Chain 的核心逻辑
 
    Args:
        inputs: 输入字典,包含 Chain 需要的所有输入参数
 
    Returns:
        输出字典,包含 Chain 产生的所有输出结果
    """
    # 实现具体的异步处理逻辑
    pass

Chain 的基类 BaseChain 定义了所有 Chain 必须实现的接口:

from langchain.chains.base import Chain
from typing import Dict, List, Any
 
class MyCustomChain(Chain):
    """自定义 Chain 示例"""
 
    @property
    def input_keys(self) -> List[str]:
        """定义 Chain 需要的输入键"""
        return ["input_text"]
 
    @property
    def output_keys(self) -> List[str]:
        """定义 Chain 产生的输出键"""
        return ["output_text"]
 
    def _call(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """同步执行逻辑"""
        input_text = inputs["input_text"]
        # 处理逻辑
        output_text = f"处理后的结果: {input_text}"
        return {"output_text": output_text}
 
    async def _acall(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """异步执行逻辑"""
        # 通常调用 _call 或实现真正的异步逻辑
        return self._call(inputs)

使用 Chain 有以下几个显著优势:

1. 模块化设计 Chain 将复杂的任务分解为独立的、可重用的组件,每个组件只负责特定的功能。这使得代码更易理解、维护和测试。

# 不使用 Chain:所有逻辑混杂在一起
def process_document(text):
    # 格式化提示
    prompt = f"请总结以下文本:\n\n{text}"
    # 调用 LLM
    response = llm.predict(prompt)
    # 解析输出
    summary = response.strip()
    return summary
 
# 使用 Chain:逻辑清晰分离
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
 
summary_chain = LLMChain(
    llm=llm,
    prompt=PromptTemplate(
        template="请总结以下文本:\n\n{text}",
        input_variables=["text"]
    )
)
result = summary_chain.predict(text=input_text)

2. 可组合性 多个简单的 Chain 可以组合成复杂的 Chain,实现更强大的功能。

from langchain.chains import SimpleSequentialChain
 
# 定义两个简单的 Chain
chain1 = LLMChain(llm=llm, prompt=prompt1)
chain2 = LLMChain(llm=llm, prompt=prompt2)
 
# 组合成一个 SequentialChain
combined_chain = SimpleSequentialChain(chains=[chain1, chain2])

3. 统一的接口 所有 Chain 都遵循相同的接口规范(input_keys、output_keys、_call、_acall),这使得不同类型的 Chain 可以无缝协作。

4. 内置功能支持 LangChain 的 Chain 内置了以下功能:

  • 内存管理:通过 Memory 组件维护对话历史
  • 回调机制:支持回调函数进行日志记录、监控等
  • 错误处理:提供统一的错误处理机制
  • 配置管理:支持从配置文件加载 Chain

5. 便于调试和监控 Chain 的结构化设计使得调试更加容易,可以逐个组件检查输入输出,快速定位问题。

# 启用详细模式查看 Chain 的执行过程
from langchain.globals import set_debug
set_debug(True)
 
# 执行 Chain
result = chain.invoke({"input": "测试输入"})

LLMChain 是 LangChain 中最基础、最常用的 Chain 类型。它将 PromptTemplateLLM 组合在一起,实现从格式化输入到调用 LLM 的完整流程。

最简单的 LLMChain 示例:

from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
 
# 1. 创建 LLM 实例
llm = OpenAI(temperature=0.7)
 
# 2. 创建 PromptTemplate
prompt = PromptTemplate(
    input_variables=["product"],
    template="为以下产品写一个吸引人的产品描述:\n\n产品名称:{product}\n\n产品描述:"
)
 
# 3. 创建 LLMChain
chain = LLMChain(llm=llm, prompt=prompt)
 
# 4. 执行 Chain
result = chain.predict(product="智能手表")
print(result)

输出示例:

这款智能手表融合了时尚设计与先进技术,是您日常生活的完美伴侣。它不仅能精准记录您的运动数据、监测健康指标,还能与您的智能手机无缝连接,让您不错过任何重要信息。超长续航、防水设计,无论是商务场合还是户外运动,都能轻松应对。

使用 invoke 方法(推荐方式):

# LangChain 推荐使用 invoke 方法
result = chain.invoke({"product": "无线耳机"})
print(result["text"])

LLMChain 支持多种参数配置,下面是详细说明:

1. 构造函数参数

from langchain.chains import LLMChain
 
chain = LLMChain(
    # 必需参数
    llm=llm,                    # LLM 实例,用于生成文本
    prompt=prompt,              # PromptTemplate 实例
 
    # 可选参数
    output_key="text",          # 输出结果的键名,默认为 "text"
    verbose=False,              # 是否输出详细日志
    callback_manager=None,      # 回调管理器
    callbacks=None,             # 回调函数列表
    tags=None,                  # 标签列表,用于追踪
    metadata=None,              # 元数据字典
)

2. 输出键(output_key)

可以通过 output_key 参数自定义输出的键名:

from langchain.chains import LLMChain
 
# 自定义输出键为 "description"
chain = LLMChain(
    llm=llm,
    prompt=prompt,
    output_key="description"  # 输出将使用 "description" 作为键
)
 
result = chain.invoke({"product": "蓝牙耳机"})
print(result["description"])  # 使用自定义的键名访问结果

3. 详细模式(verbose)

启用 verbose 模式可以查看 Chain 的执行过程:

chain = LLMChain(
    llm=llm,
    prompt=prompt,
    verbose=True  # 启用详细输出
)
 
# 执行时会打印详细的执行信息
result = chain.invoke({"product": "平板电脑"})

输出示例:

> Entering new LLMChain chain...
Prompt after formatting:
为以下产品写一个吸引人的产品描述:

产品名称:平板电脑

产品描述:

> Finished chain.

4. 回调函数(callbacks)

可以通过回调函数监听 Chain 的执行事件:

from langchain.callbacks import StdOutCallbackHandler
 
# 创建回调处理器
handler = StdOutCallbackHandler()
 
chain = LLMChain(
    llm=llm,
    prompt=prompt,
    callbacks=[handler]  # 添加回调
)
 
result = chain.invoke({"product": "智能音箱"})

自定义回调处理器:

from langchain.callbacks.base import BaseCallbackHandler
 
class MyCallbackHandler(BaseCallbackHandler):
    """自定义回调处理器"""
 
    def on_chain_start(self, serialized, inputs, **kwargs):
        print(f"Chain 开始执行,输入: {inputs}")
 
    def on_chain_end(self, outputs, **kwargs):
        print(f"Chain 执行完成,输出: {outputs}")
 
    def on_llm_start(self, serialized, prompts, **kwargs):
        print(f"LLM 开始生成,提示: {prompts}")
 
    def on_llm_end(self, response, **kwargs):
        print(f"LLM 生成完成,响应: {response}")
 
# 使用自定义回调
chain = LLMChain(
    llm=llm,
    prompt=prompt,
    callbacks=[MyCallbackHandler()]
)

LLMChain 默认返回原始文本,但通常我们需要将输出解析为结构化数据。

1. 使用 OutputParser

from langchain.output_parsers import CommaSeparatedListOutputParser
from langchain.prompts import PromptTemplate
 
# 创建列表输出解析器
output_parser = CommaSeparatedListOutputParser()
 
# 获取格式指令
format_instructions = output_parser.get_format_instructions()
 
# 创建带格式指令的提示模板
prompt = PromptTemplate(
    template="列出5种{category}。\n{format_instructions}",
    input_variables=["category"],
    partial_variables={"format_instructions": format_instructions}
)
 
# 创建 LLMChain
chain = LLMChain(
    llm=llm,
    prompt=prompt,
    output_parser=output_parser,  # 添加输出解析器
    output_key="items"
)
 
# 执行并获取结构化结果
result = chain.invoke({"category": "水果"})
print(result["items"])  # 输出: ['苹果', '香蕉', '橙子', '葡萄', '西瓜']

2. 使用 PydanticOutputParser

对于更复杂的结构化输出,可以使用 PydanticOutputParser:

from pydantic import BaseModel, Field
from langchain.output_parsers import PydanticOutputParser
 
# 定义数据模型
class ProductInfo(BaseModel):
    name: str = Field(description="产品名称")
    price: float = Field(description="产品价格")
    features: list = Field(description="产品特点列表")
    target_audience: str = Field(description="目标用户群体")
 
# 创建解析器
parser = PydanticOutputParser(pydantic_object=ProductInfo)
 
# 创建提示模板
prompt = PromptTemplate(
    template="""根据以下产品描述,提取产品信息。
 
产品描述:{description}
 
{format_instructions}
""",
    input_variables=["description"],
    partial_variables={"format_instructions": parser.get_format_instructions()}
)
 
# 创建 Chain
chain = LLMChain(
    llm=llm,
    prompt=prompt,
    output_parser=parser,
    output_key="product_info"
)
 
# 执行
description = """
苹果最新推出的 iPhone 15 Pro,售价999美元起。
主要特点包括:钛金属设计、A17 Pro芯片、4800万像素主摄、USB-C接口。
主要面向追求高端体验的专业用户和科技爱好者。
"""
 
result = chain.invoke({"description": description})
product = result["product_info"]
 
print(f"产品名称: {product.name}")
print(f"价格: ${product.price}")
print(f"特点: {product.features}")
print(f"目标用户: {product.target_audience}")

3. 自定义输出解析器

from langchain.schema import BaseOutputParser
import json
 
class JsonOutputParser(BaseOutputParser[dict]):
    """自定义 JSON 输出解析器"""
 
    def parse(self, text: str) -> dict:
        """解析 LLM 输出为 JSON 对象"""
        # 尝试从文本中提取 JSON
        try:
            # 查找 JSON 代码块
            if "```json" in text:
                json_str = text.split("```json")[1].split("```")[0].strip()
            elif "```" in text:
                json_str = text.split("```")[1].split("```")[0].strip()
            else:
                json_str = text.strip()
 
            return json.loads(json_str)
        except json.JSONDecodeError as e:
            # 解析失败时返回原始文本
            return {"raw_text": text, "error": str(e)}
 
    def get_format_instructions(self) -> str:
        return "请以 JSON 格式输出结果。"
 
# 使用自定义解析器
parser = JsonOutputParser()
 
prompt = PromptTemplate(
    template="""分析以下评论的情感。
评论:{review}
 
请以以下 JSON 格式返回结果:
{{
    "sentiment": "positive/negative/neutral",
    "confidence": 0.0-1.0,
    "keywords": ["关键词1", "关键词2"]
}}
""",
    input_variables=["review"]
)
 
chain = LLMChain(
    llm=llm,
    prompt=prompt,
    output_parser=parser,
    output_key="analysis"
)
 
result = chain.invoke({"review": "这个产品太棒了,完全超出预期!"})
print(result["analysis"])

SequentialChain 允许我们将多个 Chain 串联起来,形成一个执行流水线。前一个 Chain 的输出可以作为后一个 Chain 的输入。

SimpleSequentialChain 是最简单的顺序链,它按顺序执行多个 Chain,每个 Chain 只有一个输出,这个输出直接作为下一个 Chain 的输入。

基本使用:

from langchain.chains import LLMChain, SimpleSequentialChain
from langchain.prompts import PromptTemplate
 
# 第一步:生成产品名称
first_prompt = PromptTemplate(
    input_variables=["product_type"],
    template="为一款{product_type}起一个吸引人的产品名称:"
)
chain_one = LLMChain(llm=llm, prompt=first_prompt)
 
# 第二步:根据产品名称生成广告文案
second_prompt = PromptTemplate(
    input_variables=["product_name"],  # 这里接收 chain_one 的输出
    template="为名为'{product_name}'的产品写一段20字的广告语:"
)
chain_two = LLMChain(llm=llm, prompt=second_prompt)
 
# 创建 SimpleSequentialChain
overall_chain = SimpleSequentialChain(
    chains=[chain_one, chain_two],
    verbose=True
)
 
# 执行
result = overall_chain.run("运动鞋")
print(result)

执行过程说明:

1. 输入 “运动鞋” 传递给 chain_one

2. chain_one 生成产品名称(如 “疾风跑鞋”)

3. “疾风跑鞋” 自动传递给 chain_two 作为输入

4. chain_two 生成广告语并返回

SequentialChain 比 SimpleSequentialChain 更灵活,它允许:

  • 指定多个输入和输出变量
  • 控制变量在 Chain 之间的传递方式
  • 访问中间步骤的输出

基本使用:

from langchain.chains import LLMChain, SequentialChain
from langchain.prompts import PromptTemplate
 
# 第一步:生成产品名称
template1 = """你是一个产品经理。请为以下产品创建一个吸引人的名称。
 
产品类型:{product_type}
目标客户:{target_audience}
 
产品名称:"""
prompt1 = PromptTemplate(
    input_variables=["product_type", "target_audience"],
    template=template1
)
chain1 = LLMChain(
    llm=llm, 
    prompt=prompt1, 
    output_key="product_name"  # 指定输出键名
)
 
# 第二步:生成产品描述
template2 = """你是一个文案撰写专家。请为以下产品写一段描述。
 
产品名称:{product_name}
产品类型:{product_type}
 
产品描述:"""
prompt2 = PromptTemplate(
    input_variables=["product_name", "product_type"],
    template=template2
)
chain2 = LLMChain(
    llm=llm, 
    prompt=prompt2, 
    output_key="description"
)
 
# 第三步:生成营销文案
template3 = """你是一个营销专家。请根据以下信息创建一个营销口号。
 
产品名称:{product_name}
产品描述:{description}
 
营销口号:"""
prompt3 = PromptTemplate(
    input_variables=["product_name", "description"],
    template=template3
)
chain3 = LLMChain(
    llm=llm, 
    prompt=prompt3, 
    output_key="slogan"
)
 
# 创建 SequentialChain
overall_chain = SequentialChain(
    chains=[chain1, chain2, chain3],
    input_variables=["product_type", "target_audience"],
    output_variables=["product_name", "description", "slogan"],
    verbose=True
)
 
# 执行
result = overall_chain({
    "product_type": "智能手环",
    "target_audience": "运动爱好者"
})
 
print("产品名称:", result["product_name"])
print("产品描述:", result["description"])
print("营销口号:", result["slogan"])

SequentialChain 参数说明:

参数 说明 必需
chains Chain 列表,按执行顺序排列
input_variables 整个 Chain 的输入变量列表
output_variables 最终输出的变量列表 否(默认输出所有)
verbose 是否输出详细日志

理解 SequentialChain 的输出传递机制对于构建复杂的 Chain 流水线至关重要。

变量传递规则:

1. 每个 Chain 的输出会自动添加到全局变量池

2. 下游 Chain 可以从变量池中获取所需的输入

3. 变量名冲突时,后面的值会覆盖前面的值

示例:变量传递详解

from langchain.chains import LLMChain, SequentialChain
from langchain.prompts import PromptTemplate
 
# Chain A: 输入 type,输出 name 和 price
prompt_a = PromptTemplate(
    input_variables=["type"],
    template="为{type}生成产品名称和价格(格式:名称|价格):"
)
chain_a = LLMChain(llm=llm, prompt=prompt_a, output_key="name_price")
 
# Chain B: 输入 name_price,解析出 name
prompt_b = PromptTemplate(
    input_variables=["name_price"],
    template="从'{name_price}'中提取产品名称:"
)
chain_b = LLMChain(llm=llm, prompt=prompt_b, output_key="name")
 
# Chain C: 输入 name 和 type,输出 slogan
prompt_c = PromptTemplate(
    input_variables=["name", "type"],
    template="为{name}({type})写广告语:"
)
chain_c = LLMChain(llm=llm, prompt=prompt_c, output_key="slogan")
 
# 组装 SequentialChain
chain = SequentialChain(
    chains=[chain_a, chain_b, chain_c],
    input_variables=["type"],
    output_variables=["name_price", "name", "slogan"],
    verbose=True
)
 
result = chain({"type": "咖啡机"})
print(result)

中间结果访问:

# SequentialChain 可以返回中间步骤的结果
result = overall_chain({
    "product_type": "智能音箱",
    "target_audience": "年轻家庭"
})
 
# 访问所有输出(包括中间结果)
print("中间结果 - 产品名称:", result["product_name"])
print("中间结果 - 产品描述:", result["description"])
print("最终结果 - 营销口号:", result["slogan"])

RouterChain(路由链) 能够根据输入内容动态选择不同的处理路径。这类似于编程中的 switch-case 或 if-else 逻辑。

路由链的核心思想是:

  • 接收用户输入
  • 根据输入特征决定使用哪个“目的地”(destination)
  • 将输入路由到对应的处理 Chain

典型应用场景:

  • 客服系统:根据问题类型路由到不同部门(技术支持、售后服务、销售咨询)
  • 内容生成:根据主题路由到不同的写作风格(科技、财经、娱乐)
  • 数据处理:根据数据类型选择不同的处理方法(文本、数字、日期)

路由链的工作流程:

用户输入 
    ↓
[路由判断] → 选择目的地
    ↓
[对应 Chain 处理]
    ↓
返回结果

基础路由链示例:

from langchain.chains.router import MultiPromptChain
from langchain.chains import LLMChain, ConversationChain
from langchain.prompts import PromptTemplate
from langchain.chains.router.llm_router import LLMRouterChain, RouterOutputParser
from langchain.chains.router.multi_prompt_prompt import MULTI_PROMPT_ROUTER_TEMPLATE
 
# 定义多个不同的提示模板(目的地)
physics_template = """你是一个物理学专家。请用简单易懂的方式解释以下物理概念。
 
概念:{input}
 
解释:"""
 
math_template = """你是一个数学家。请详细解答以下数学问题,并展示计算过程。
 
问题:{input}
 
解答:"""
 
history_template = """你是一个历史学家。请从多角度分析以下历史事件。
 
事件:{input}
 
分析:"""
 
computer_science_template = """你是一个计算机科学专家。请用代码示例说明以下概念。
 
概念:{input}
 
说明:"""
 
# 创建目的地信息列表
prompt_infos = [
    {
        "name": "physics",
        "description": "适合回答物理学相关问题",
        "prompt_template": physics_template,
    },
    {
        "name": "math",
        "description": "适合回答数学问题",
        "prompt_template": math_template,
    },
    {
        "name": "history",
        "description": "适合回答历史相关问题",
        "prompt_template": history_template,
    },
    {
        "name": "computer_science",
        "description": "适合回答计算机科学问题",
        "prompt_template": computer_science_template,
    },
]
 
# 为每个目的地创建 Chain
destination_chains = {}
for p_info in prompt_infos:
    name = p_info["name"]
    prompt_template = p_info["prompt_template"]
    prompt = PromptTemplate(template=prompt_template, input_variables=["input"])
    chain = LLMChain(llm=llm, prompt=prompt)
    destination_chains[name] = chain
 
# 创建默认 Chain(当路由无法确定目的地时使用)
default_chain = ConversationChain(llm=llm, output_key="text")

MultiPromptChain 是 LangChain 提供的完整路由链实现,它结合了 LLMRouterChain 和多个目的地 Chain。

完整实现:

from langchain.chains.router import MultiPromptChain
from langchain.chains.router.llm_router import LLMRouterChain, RouterOutputParser
from langchain.prompts import PromptTemplate
 
# 构建路由提示模板
destinations = [f"{p['name']}: {p['description']}" for p in prompt_infos]
destinations_str = "\n".join(destinations)
 
router_template = MULTI_PROMPT_ROUTER_TEMPLATE.format(destinations=destinations_str)
 
# 创建路由 Chain
router_prompt = PromptTemplate(
    template=router_template,
    input_variables=["input"],
    output_parser=RouterOutputParser(),
)
 
router_chain = LLMRouterChain.from_llm(llm, router_prompt)
 
# 创建 MultiPromptChain
chain = MultiPromptChain(
    router_chain=router_chain,
    destination_chains=destination_chains,
    default_chain=default_chain,
    verbose=True
)
 
# 测试不同输入
print("=== 物理问题 ===")
result = chain.invoke({"input": "什么是相对论?"})
print(result["text"])
 
print("\n=== 数学问题 ===")
result = chain.invoke({"input": "求解方程 x^2 - 5x + 6 = 0"})
print(result["text"])
 
print("\n=== 计算机问题 ===")
result = chain.invoke({"input": "解释什么是递归函数"})
print(result["text"])
 
print("\n=== 通用问题 ===")
result = chain.invoke({"input": "今天天气怎么样?"})
print(result["text"])

路由决策解析:

当执行上述代码时,LLMRouterChain 会根据输入决定目的地:

# 输入 "什么是相对论?"
# 路由决策:destination: physics
 
# 输入 "求解方程 x^2 - 5x + 6 = 0"
# 路由决策:destination: math
 
# 输入 "今天天气怎么样?"
# 路由决策:destination: DEFAULT(因为没有匹配的目的地)

自定义路由逻辑:

如果需要更复杂的路由逻辑,可以自定义 RouterChain:

from langchain.chains import LLMChain
from langchain.chains.base import Chain
from typing import Dict, List
 
class CustomRouterChain(Chain):
    """自定义路由链 - 基于关键词匹配"""
 
    destination_chains: Dict[str, Chain]
    default_chain: Chain
 
    @property
    def input_keys(self) -> List[str]:
        return ["input"]
 
    @property
    def output_keys(self) -> List[str]:
        return ["text"]
 
    def _call(self, inputs: Dict[str, str]) -> Dict[str, str]:
        input_text = inputs["input"].lower()
 
        # 定义关键词映射
        keyword_map = {
            "physics": ["物理", "相对论", "量子", "牛顿", "能量"],
            "math": ["数学", "方程", "计算", "求解", "微积分"],
            "code": ["代码", "编程", "函数", "算法", "python", "java"],
        }
 
        # 匹配目的地
        for destination, keywords in keyword_map.items():
            if any(keyword in input_text for keyword in keywords):
                if destination in self.destination_chains:
                    return self.destination_chains[destination](inputs)
 
        # 默认 Chain
        return self.default_chain(inputs)
 
    async def _acall(self, inputs: Dict[str, str]) -> Dict[str, str]:
        return self._call(inputs)
 
# 使用自定义路由链
custom_router = CustomRouterChain(
    destination_chains=destination_chains,
    default_chain=default_chain
)
 
result = custom_router.invoke({"input": "请解释Python的装饰器"})
print(result)

TransformChain 用于在 Chain 流水线中进行数据转换。当你需要在两个 Chain 之间对数据进行清洗、格式化或转换时,TransformChain 非常有用。

TransformChain 的核心功能是对输入数据进行自定义转换。

基本结构:

from langchain.chains import TransformChain
 
# 定义转换函数
def transform_func(inputs: dict) -> dict:
    """
    转换函数接收输入字典,返回转换后的字典
    """
    original_text = inputs["text"]
    # 执行转换操作
    transformed_text = original_text.upper()  # 示例:转为大写
    return {"transformed_text": transformed_text}
 
# 创建 TransformChain
transform_chain = TransformChain(
    input_variables=["text"],
    output_variables=["transformed_text"],
    transform=transform_func
)
 
# 执行
result = transform_chain.invoke({"text": "hello world"})
print(result)  # {'text': 'hello world', 'transformed_text': 'HELLO WORLD'}

TransformChain 的真正威力在于可以定义任意复杂的转换逻辑。

示例1:文本清洗

import re
from langchain.chains import TransformChain, LLMChain, SequentialChain
from langchain.prompts import PromptTemplate
 
def clean_text(inputs: dict) -> dict:
    """清洗和格式化文本"""
    text = inputs["raw_text"]
 
    # 移除多余空白
    text = re.sub(r'\s+', ' ', text)
    # 移除特殊字符
    text = re.sub(r'[^\w\s\.\,\?\!\u4e00-\u9fff]', '', text)
    # 限制长度
    text = text[:500]
 
    return {"cleaned_text": text.strip()}
 
# 创建 TransformChain
clean_chain = TransformChain(
    input_variables=["raw_text"],
    output_variables=["cleaned_text"],
    transform=clean_text
)
 
# 后续 LLMChain 使用清洗后的文本
prompt = PromptTemplate(
    input_variables=["cleaned_text"],
    template="请总结以下内容:\n\n{cleaned_text}\n\n总结:"
)
summary_chain = LLMChain(llm=llm, prompt=prompt, output_key="summary")
 
# 组合成 SequentialChain
overall_chain = SequentialChain(
    chains=[clean_chain, summary_chain],
    input_variables=["raw_text"],
    output_variables=["cleaned_text", "summary"],
    verbose=True
)
 
# 测试
dirty_text = """
  这是一段包含   大量多余空格和特殊字符!!!@@@###的文本。
  需要清洗后才能使用。
"""
 
result = overall_chain.invoke({"raw_text": dirty_text})
print("清洗后:", result["cleaned_text"])
print("总结:", result["summary"])

示例2:数据格式转换

import json
from langchain.chains import TransformChain
 
def parse_json_input(inputs: dict) -> dict:
    """将 JSON 字符串解析为结构化数据"""
    json_str = inputs["json_data"]
    try:
        data = json.loads(json_str)
        return {
            "parsed_data": data,
            "name": data.get("name", ""),
            "age": data.get("age", 0),
            "valid": True
        }
    except json.JSONDecodeError:
        return {
            "parsed_data": None,
            "name": "",
            "age": 0,
            "valid": False,
            "error": "Invalid JSON"
        }
 
def format_output(inputs: dict) -> dict:
    """将输出格式化为特定格式"""
    text = inputs["llm_output"]
    return {
        "formatted_output": f"【AI回复】\n{text}\n【结束】",
        "word_count": len(text.split())
    }
 
# 创建 TransformChains
parse_chain = TransformChain(
    input_variables=["json_data"],
    output_variables=["parsed_data", "name", "age", "valid"],
    transform=parse_json_input
)
 
format_chain = TransformChain(
    input_variables=["llm_output"],
    output_variables=["formatted_output", "word_count"],
    transform=format_output
)
 
# 示例 JSON 数据
json_data = '{"name": "张三", "age": 28, "interests": ["编程", "阅读"]}'
result = parse_chain.invoke({"json_data": json_data})
print(result)

示例3:结合 TransformChain 和 LLMChain

from langchain.chains import TransformChain, LLMChain, SequentialChain
from langchain.prompts import PromptTemplate
 
def extract_keywords(inputs: dict) -> dict:
    """提取文本关键词"""
    import jieba
    text = inputs["content"]
    # 使用 jieba 分词提取关键词
    words = jieba.lcut(text)
    # 过滤短词和停用词
    keywords = [w for w in words if len(w) > 1]
    return {
        "keywords": keywords[:10],  # 取前10个关键词
        "keyword_str": ", ".join(keywords[:10])
    }
 
# TransformChain 提取关键词
keyword_chain = TransformChain(
    input_variables=["content"],
    output_variables=["keywords", "keyword_str"],
    transform=extract_keywords
)
 
# LLMChain 基于关键词生成标题
title_prompt = PromptTemplate(
    input_variables=["keyword_str"],
    template="""根据以下关键词,生成一个吸引人的文章标题(不超过20字):
 
关键词:{keyword_str}
 
标题:"""
)
title_chain = LLMChain(llm=llm, prompt=title_prompt, output_key="title")
 
# 组合 Chain
content_chain = SequentialChain(
    chains=[keyword_chain, title_chain],
    input_variables=["content"],
    output_variables=["keywords", "keyword_str", "title"],
    verbose=True
)
 
# 测试
article = """
人工智能(AI)正在改变我们的生活方式。从智能手机助手到自动驾驶汽车,
AI技术已经渗透到日常生活的方方面面。机器学习、深度学习等技术的发展,
使得AI能够处理越来越复杂的任务。
"""
 
result = content_chain.invoke({"content": article})
print(f"关键词: {result['keyword_str']}")
print(f"生成标题: {result['title']}")

当 LangChain 内置的 Chain 无法满足需求时,可以通过继承 BaseChain 创建自定义 Chain。

创建自定义 Chain 需要继承 BaseChain 并实现以下方法:

方法/属性 说明
input_keys 返回 Chain 需要的输入键列表
output_keys 返回 Chain 产生的输出键列表
_call 同步执行逻辑
_acall 异步执行逻辑(可选)

最简单的自定义 Chain:

from langchain.chains.base import Chain
from typing import Dict, List, Any
 
class EchoChain(Chain):
    """简单的回显 Chain - 将输入原样返回"""
 
    @property
    def input_keys(self) -> List[str]:
        """定义输入键"""
        return ["message"]
 
    @property
    def output_keys(self) -> List[str]:
        """定义输出键"""
        return ["echo"]
 
    def _call(self, inputs: Dict[str, Any]) -> Dict[str, str]:
        """同步执行"""
        message = inputs["message"]
        return {"echo": f"Echo: {message}"}
 
    async def _acall(self, inputs: Dict[str, Any]) -> Dict[str, str]:
        """异步执行"""
        # 通常直接调用 _call 或实现真正的异步逻辑
        return self._call(inputs)
 
# 使用自定义 Chain
echo = EchoChain()
result = echo.invoke({"message": "Hello!"})
print(result)  # {'message': 'Hello!', 'echo': 'Echo: Hello!'}

完整的自定义 Chain 示例:

from langchain.chains.base import Chain
from langchain.llms.base import BaseLLM
from langchain.prompts import PromptTemplate
from typing import Dict, List, Any, Optional
import time
 
class TranslationChain(Chain):
    """
    多语言翻译 Chain
    支持指定源语言和目标语言进行翻译
    """
 
    # 定义 Chain 的配置参数
    llm: BaseLLM
    source_lang: str = "中文"
    target_lang: str = "英文"
    verbose_timing: bool = False
 
    @property
    def input_keys(self) -> List[str]:
        """输入需要一个文本"""
        return ["text"]
 
    @property
    def output_keys(self) -> List[str]:
        """输出包含翻译结果和元信息"""
        return ["translation", "source_lang", "target_lang", "processing_time"]
 
    def _call(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """执行翻译"""
        start_time = time.time()
 
        text = inputs["text"]
 
        # 构建翻译提示
        prompt_text = f"""请将以下{self.source_lang}翻译成{self.target_lang}:
 
{self.source_lang}:{text}
 
{self.target_lang}:"""
 
        # 调用 LLM
        translation = self.llm.predict(prompt_text)
 
        processing_time = time.time() - start_time
 
        if self.verbose_timing:
            print(f"翻译耗时: {processing_time:.2f}秒")
 
        return {
            "translation": translation.strip(),
            "source_lang": self.source_lang,
            "target_lang": self.target_lang,
            "processing_time": processing_time
        }
 
    async def _acall(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """异步执行"""
        # 实际应用中可以使用 ainvoke 或异步 LLM 调用
        return self._call(inputs)
 
    def translate(self, text: str) -> str:
        """便捷方法 - 只返回翻译结果"""
        result = self.invoke({"text": text})
        return result["translation"]
 
# 使用自定义 TranslationChain
from langchain.llms import OpenAI
 
llm = OpenAI(temperature=0.3)
 
translator = TranslationChain(
    llm=llm,
    source_lang="中文",
    target_lang="英文",
    verbose_timing=True
)
 
# 方式1:使用 invoke
result = translator.invoke({"text": "人工智能正在改变世界"})
print(f"原文: 人工智能正在改变世界")
print(f"译文: {result['translation']}")
print(f"耗时: {result['processing_time']:.2f}秒")
 
# 方式2:使用便捷方法
translation = translator.translate("机器学习是人工智能的一个重要分支")
print(f"便捷翻译: {translation}")

带可选参数的自定义 Chain:

from pydantic import Field
 
class SummarizationChain(Chain):
    """
    文本摘要 Chain
    支持指定摘要长度和风格
    """
 
    llm: BaseLLM = Field(..., description="LLM 实例")
    max_length: int = Field(default=100, description="摘要最大字数")
    style: str = Field(default="concise", description="摘要风格: concise/detailed/bullet")
 
    @property
    def input_keys(self) -> List[str]:
        return ["text"]
 
    @property
    def output_keys(self) -> List[str]:
        return ["summary", "word_count", "style_used"]
 
    def _get_style_instruction(self) -> str:
        """根据风格获取指令"""
        styles = {
            "concise": "提供一个简洁的摘要",
            "detailed": "提供一个详细的摘要,包含主要观点",
            "bullet": "以 bullet points 形式列出主要要点"
        }
        return styles.get(self.style, styles["concise"])
 
    def _call(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        text = inputs["text"]
        style_instruction = self._get_style_instruction()
 
        prompt = f"""请对以下文本进行摘要。{style_instruction},不超过{self.max_length}字。
 
文本:
{text}
 
摘要:"""
 
        summary = self.llm.predict(prompt).strip()
        word_count = len(summary)
 
        return {
            "summary": summary,
            "word_count": word_count,
            "style_used": self.style
        }
 
    async def _acall(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        return self._call(inputs)
 
# 使用
summarizer = SummarizationChain(
    llm=llm,
    max_length=50,
    style="bullet"
)
 
long_text = """
人工智能(Artificial Intelligence,简称AI)是计算机科学的一个分支,
致力于创造能够模拟人类智能的系统。这些系统可以学习、推理、感知环境、
理解语言并做出决策。AI技术包括机器学习、深度学习、自然语言处理、
计算机视觉等多个领域。近年来,随着计算能力的提升和数据量的增加,
AI技术取得了突破性进展,在医疗诊断、自动驾驶、金融分析等领域
展现出巨大潜力。
"""
 
result = summarizer.invoke({"text": long_text})
print(f"摘要: {result['summary']}")
print(f"字数: {result['word_count']}")
print(f"风格: {result['style_used']}")

下面是一个完整的自定义 Chain 示例,模拟智能客服系统:

from langchain.chains.base import Chain
from langchain.llms.base import BaseLLM
from langchain.memory import ConversationBufferMemory
from pydantic import Field
from typing import Dict, List, Any
import re
 
class CustomerServiceChain(Chain):
    """
    智能客服 Chain
    功能:
    1. 情感分析 - 判断客户情绪
    2. 意图识别 - 识别客户需求类型
    3. 生成回复 - 根据情绪和意图生成合适回复
    4. 维护对话历史
    """
 
    llm: BaseLLM = Field(..., description="LLM 实例")
    memory: ConversationBufferMemory = Field(
        default_factory=ConversationBufferMemory,
        description="对话记忆"
    )
    company_name: str = Field(default="我们的公司", description="公司名称")
 
    # 内部状态
    emotion_threshold: float = 0.5
 
    @property
    def input_keys(self) -> List[str]:
        return ["customer_message"]
 
    @property
    def output_keys(self) -> List[str]:
        return [
            "response", 
            "detected_emotion", 
            "detected_intent",
            "needs_escalation"
        ]
 
    def _analyze_emotion(self, message: str) -> str:
        """分析客户情绪"""
        # 负面关键词
        negative_words = ['生气', '愤怒', '失望', '糟糕', '差', '慢', '坏', '投诉', '退货', '退款']
        # 正面关键词  
        positive_words = ['满意', '好', '棒', '感谢', '喜欢', '推荐', '不错']
 
        message_lower = message.lower()
        negative_count = sum(1 for w in negative_words if w in message_lower)
        positive_count = sum(1 for w in positive_words if w in message_lower)
 
        if negative_count > positive_count:
            return "negative"
        elif positive_count > negative_count:
            return "positive"
        else:
            return "neutral"
 
    def _detect_intent(self, message: str) -> str:
        """识别客户意图"""
        intent_patterns = {
            "inquiry": ['怎么', '如何', '什么', '多少', '价格'],
            "complaint": ['投诉', '不满', '问题', '故障', '坏'],
            "purchase": ['买', '订购', '购买', '下单'],
            "refund": ['退', '退款', '退货', '换货'],
            "technical": ['安装', '设置', '配置', '连接']
        }
 
        message_lower = message.lower()
        for intent, patterns in intent_patterns.items():
            if any(p in message_lower for p in patterns):
                return intent
 
        return "general"
 
    def _generate_response(self, message: str, emotion: str, intent: str) -> str:
        """生成客服回复"""
 
        # 准备上下文
        history = self.memory.load_memory_variables({}).get("history", "")
 
        # 根据情绪和意图调整语气
        tone_instruction = {
            "negative": "客户情绪负面,需要表达歉意和同理心,语气要诚恳",
            "positive": "客户情绪积极,可以友好热情地回应",
            "neutral": "保持专业、礼貌的语气"
        }.get(emotion, "保持专业语气")
 
        intent_instruction = {
            "inquiry": "回答客户的询问",
            "complaint": "处理投诉,表示理解并提供解决方案",
            "purchase": "协助完成购买流程",
            "refund": "处理退款/退货请求,了解具体情况",
            "technical": "提供技术支持",
            "general": "提供一般性帮助"
        }.get(intent, "提供帮助")
 
        prompt = f"""你是{self.company_name}的智能客服助手。
 
对话历史:
{history}
 
客户消息:{message}
 
客户情绪:{emotion}
客户意图:{intent}
 
回复要求:
- {tone_instruction}
- {intent_instruction}
- 简洁明了,不超过100字
- 使用中文
 
你的回复:"""
 
        response = self.llm.predict(prompt).strip()
        return response
 
    def _needs_escalation(self, emotion: str, message: str) -> bool:
        """判断是否需要人工介入"""
        # 如果情绪非常负面或提到投诉/法律相关词汇
        escalation_keywords = ['投诉', '法律', '律师', '曝光', '媒体', '工商局', '消协']
        if emotion == "negative" and any(k in message for k in escalation_keywords):
            return True
        return False
 
    def _call(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        message = inputs["customer_message"]
 
        # 1. 情感分析
        emotion = self._analyze_emotion(message)
 
        # 2. 意图识别
        intent = self._detect_intent(message)
 
        # 3. 判断是否需要升级
        needs_escalation = self._needs_escalation(emotion, message)
 
        # 4. 生成回复
        if needs_escalation:
            response = f"非常抱歉给您带来不好的体验。我已经记录了您的问题,将立即转接给人工客服专员处理,请稍候。"
        else:
            response = self._generate_response(message, emotion, intent)
 
        # 5. 更新对话历史
        self.memory.save_context(
            {"input": message},
            {"output": response}
        )
 
        return {
            "response": response,
            "detected_emotion": emotion,
            "detected_intent": intent,
            "needs_escalation": needs_escalation
        }
 
    async def _acall(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        return self._call(inputs)
 
    def reset_conversation(self):
        """重置对话"""
        self.memory.clear()
 
# 使用智能客服 Chain
llm = OpenAI(temperature=0.7)
 
customer_service = CustomerServiceChain(
    llm=llm,
    company_name="智选科技"
)
 
# 模拟对话
print("=== 对话 1:一般询问 ===")
result = customer_service.invoke({"customer_message": "请问你们的产品怎么安装?"})
print(f"客户意图: {result['detected_intent']}")
print(f"客户情绪: {result['detected_emotion']}")
print(f"客服回复: {result['response']}")
 
print("\n=== 对话 2:投诉 ===")
result = customer_service.invoke({"customer_message": "你们的产品太差了,我要退货!"})
print(f"客户意图: {result['detected_intent']}")
print(f"客户情绪: {result['detected_emotion']}")
print(f"需要升级: {result['needs_escalation']}")
print(f"客服回复: {result['response']}")
 
print("\n=== 对话 3:上下文理解 ===")
result = customer_service.invoke({"customer_message": "好的,那退款什么时候到账?"})
print(f"客服回复: {result['response']}")

LCEL 是 LangChain 提供的一种声明式语法,用于以更简洁、更直观的方式组合 Chain。它是 LangChain 的新一代链式调用语法,推荐在新项目中使用。

LCEL 使用 管道操作符(|) 来连接各个组件,类似于 Unix 管道或 Python 的函数式编程风格。

LCEL vs 传统方式对比:

# 传统方式
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
 
prompt = PromptTemplate.from_template("告诉我关于{topic}的趣事")
chain = LLMChain(llm=llm, prompt=prompt)
result = chain.predict(topic="猫")
 
# LCEL 方式
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser
 
prompt = ChatPromptTemplate.from_template("告诉我关于{topic}的趣事")
chain = prompt | llm | StrOutputParser()
result = chain.invoke({"topic": "猫"})

LCEL 支持多种核心组件,每个组件都可以作为管道的一部分:

1. PromptTemplate

from langchain.prompts import ChatPromptTemplate
 
# 简单模板
prompt = ChatPromptTemplate.from_template("将以下文本翻译成{language}:{text}")
 
# 多消息模板
prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个专业的翻译助手。"),
    ("human", "将以下文本翻译成{language}:{text}"),
])

2. LLM / ChatModel

from langchain.chat_models import ChatOpenAI
from langchain.llms import OpenAI
 
# Chat 模型
chat_model = ChatOpenAI(model="gpt-3.5-turbo")
 
# 文本模型
llm = OpenAI(model="gpt-3.5-turbo-instruct")

3. Output Parser

from langchain.schema.output_parser import StrOutputParser
from langchain.output_parsers import PydanticOutputParser
 
# 字符串输出解析器
str_parser = StrOutputParser()
 
# Pydantic 输出解析器
pydantic_parser = PydanticOutputParser(pydantic_object=MyModel)

4. Retriever(检索器)

from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings
 
# 创建向量存储
vectorstore = Chroma.from_documents(documents, OpenAIEmbeddings())
retriever = vectorstore.as_retriever()

基础管道:

from langchain.prompts import ChatPromptTemplate
from langchain.chat_models import ChatOpenAI
from langchain.schema.output_parser import StrOutputParser
 
# 创建组件
prompt = ChatPromptTemplate.from_template("给我讲一个关于{topic}的笑话")
model = ChatOpenAI()
output_parser = StrOutputParser()
 
# 使用管道连接
chain = prompt | model | output_parser
 
# 执行
result = chain.invoke({"topic": "程序员"})
print(result)

带上下文的 RAG 链:

from langchain.prompts import ChatPromptTemplate
from langchain.chat_models import ChatOpenAI
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough
 
# 准备检索器
docs = [
    "LangChain 是一个用于构建 LLM 应用的框架",
    "Python 是一种流行的编程语言",
    "机器学习是人工智能的一个分支"
]
from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings
vectorstore = Chroma.from_texts(docs, OpenAIEmbeddings())
retriever = vectorstore.as_retriever()
 
# 创建 RAG 模板
template = """基于以下上下文回答问题:
 
上下文:
{context}
 
问题:{question}
 
回答:"""
 
prompt = ChatPromptTemplate.from_template(template)
 
# 辅助函数:格式化文档
def format_docs(docs):
    return "\n\n".join([d.page_content for d in docs])
 
# 构建 RAG Chain
rag_chain = (
    {
        "context": retriever | format_docs,
        "question": RunnablePassthrough()
    }
    | prompt
    | model
    | output_parser
)
 
# 使用
result = rag_chain.invoke("LangChain 是什么?")
print(result)

RunnablePassthrough 详解:

from langchain.schema.runnable import RunnablePassthrough, RunnableParallel
 
# RunnablePassthrough 将输入原样传递
# 常用于同时传递原始输入和处理后的数据
 
# 示例:同时获取原始问题和检索结果
chain = RunnableParallel({
    "question": RunnablePassthrough(),  # 原样传递问题
    "context": retriever | format_docs  # 检索相关文档
})
 
result = chain.invoke("什么是机器学习?")
# 结果:{"question": "什么是机器学习?", "context": "机器学习是..."}

RunnableParallel(并行执行):

from langchain.schema.runnable import RunnableParallel
 
# 并行执行多个分支
parallel_chain = RunnableParallel({
    "branch_a": prompt_a | model | output_parser,
    "branch_b": prompt_b | model | output_parser,
    "original": RunnablePassthrough()
})
 
result = parallel_chain.invoke({"input": "测试输入"})
# 结果:{"branch_a": "...", "branch_b": "...", "original": "..."}

1. 条件分支:

from langchain.schema.runnable import RunnableBranch
 
# 创建条件分支链
branch = RunnableBranch(
    # (条件, 分支)
    (lambda x: "问题" in x["input"], qa_chain),
    (lambda x: "总结" in x["input"], summary_chain),
    # 默认分支
    default_chain
)
 
result = branch.invoke({"input": "请总结这段文本"})

2. 绑定运行时参数:

# 使用 bind 方法传递额外参数
chain = (
    prompt 
    | model.bind(stop=["\nObservation:"])  # 绑定停止词
    | output_parser
)

3. 自定义 Runnable:

from langchain.schema.runnable import RunnableLambda
 
# 将普通函数转换为 Runnable
def my_transform(data: dict) -> dict:
    return {"transformed": data["input"].upper()}
 
runnable_transform = RunnableLambda(my_transform)
 
# 使用在管道中
chain = prompt | model | output_parser | runnable_transform

4. 链式调用多个输入输出:

from operator import itemgetter
 
# 使用 itemgetter 提取特定字段
chain = (
    {
        "context": itemgetter("question") | retriever | format_docs,
        "question": itemgetter("question"),
        "language": itemgetter("language")
    }
    | prompt
    | model
    | output_parser
)
 
result = chain.invoke({
    "question": "什么是AI?",
    "language": "中文"
})

5. 批处理和流式:

# 批处理
inputs = [{"topic": "猫"}, {"topic": "狗"}, {"topic": "鸟"}]
results = chain.batch(inputs)
 
# 流式输出
for chunk in chain.stream({"topic": "人工智能"}):
    print(chunk, end="", flush=True)
 
# 异步调用
result = await chain.ainvoke({"topic": "量子计算"})

Chain 的强大之处在于可以灵活组合和嵌套,构建出复杂的应用架构。

Chain 可以嵌套在其他 Chain 中,实现分层处理。

基础嵌套:

from langchain.chains import LLMChain, SequentialChain, TransformChain
from langchain.prompts import PromptTemplate
 
# 内层 Chain:分析文本情感
sentiment_prompt = PromptTemplate(
    input_variables=["text"],
    template="分析以下文本的情感(positive/negative/neutral):\n\n{text}\n\n情感:"
)
sentiment_chain = LLMChain(
    llm=llm, 
    prompt=sentiment_prompt, 
    output_key="sentiment"
)
 
# 内层 Chain:提取关键词
keyword_prompt = PromptTemplate(
    input_variables=["text"],
    template="从以下文本中提取3个关键词:\n\n{text}\n\n关键词:"
)
keyword_chain = LLMChain(
    llm=llm, 
    prompt=keyword_prompt, 
    output_key="keywords"
)
 
# 外层 Chain:根据情感和关键词生成回复
response_prompt = PromptTemplate(
    input_variables=["sentiment", "keywords", "original_text"],
    template="""基于以下信息生成回复:
原文情感:{sentiment}
关键词:{keywords}
原文:{original_text}
 
请生成适当的回复:"""
)
response_chain = LLMChain(
    llm=llm,
    prompt=response_prompt,
    output_key="response"
)
 
# 使用 SequentialChain 组合
analysis_chain = SequentialChain(
    chains=[sentiment_chain, keyword_chain, response_chain],
    input_variables=["text"],
    output_variables=["sentiment", "keywords", "response"],
    verbose=True
)
 
result = analysis_chain.invoke({
    "text": "这个产品真的太棒了,完全超出我的预期!"
})
print(result)

模式1:分治处理

from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
 
class DivideAndConquerChain:
    """
    分治处理 Chain
    将长文本分割,分别处理,然后合并结果
    """
 
    def __init__(self, llm, chunk_size=1000):
        self.llm = llm
        self.chunk_size = chunk_size
 
        # 子任务 Chain
        self.summarize_prompt = PromptTemplate(
            input_variables=["chunk"],
            template="请总结以下内容:\n\n{chunk}\n\n摘要:"
        )
        self.summarize_chain = LLMChain(llm=llm, prompt=self.summarize_prompt)
 
        # 合并 Chain
        self.merge_prompt = PromptTemplate(
            input_variables=["summaries"],
            template="基于以下摘要,生成一个统一的总结:\n\n{summaries}\n\n统一总结:"
        )
        self.merge_chain = LLMChain(llm=llm, prompt=self.merge_prompt)
 
    def split_text(self, text):
        """分割文本"""
        words = text.split()
        chunks = []
        current_chunk = []
        current_size = 0
 
        for word in words:
            current_chunk.append(word)
            current_size += len(word) + 1
 
            if current_size >= self.chunk_size:
                chunks.append(" ".join(current_chunk))
                current_chunk = []
                current_size = 0
 
        if current_chunk:
            chunks.append(" ".join(current_chunk))
 
        return chunks
 
    def process(self, text):
        """执行分治处理"""
        # 1. 分割文本
        chunks = self.split_text(text)
 
        # 2. 分别总结
        summaries = []
        for chunk in chunks:
            result = self.summarize_chain.invoke({"chunk": chunk})
            summaries.append(result["text"])
 
        # 3. 合并结果
        combined = "\n\n".join([f"片段 {i+1}: {s}" for i, s in enumerate(summaries)])
        final = self.merge_chain.invoke({"summaries": combined})
 
        return {
            "chunk_count": len(chunks),
            "chunk_summaries": summaries,
            "final_summary": final["text"]
        }
 
# 使用
long_text = """[这里是一段很长的文本...]"""
 
dac_chain = DivideAndConquerChain(llm=llm, chunk_size=500)
result = dac_chain.process(long_text)
print(f"分成了 {result['chunk_count']} 个片段")
print(f"最终总结: {result['final_summary']}")

模式2:多路径处理(类似 MapReduce)

from concurrent.futures import ThreadPoolExecutor
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
 
class MultiPathChain:
    """
    多路径处理 Chain
    同时从多个角度处理问题,然后综合结果
    """
 
    def __init__(self, llm):
        self.llm = llm
 
        # 多个不同角度的 Chain
        self.perspectives = {
            "technical": LLMChain(
                llm=llm,
                prompt=PromptTemplate(
                    input_variables=["topic"],
                    template="从技术角度分析 {topic}:"
                ),
                output_key="text"
            ),
            "business": LLMChain(
                llm=llm,
                prompt=PromptTemplate(
                    input_variables=["topic"],
                    template="从商业角度分析 {topic}:"
                ),
                output_key="text"
            ),
            "social": LLMChain(
                llm=llm,
                prompt=PromptTemplate(
                    input_variables=["topic"],
                    template="从社会影响角度分析 {topic}:"
                ),
                output_key="text"
            )
        }
 
        # 综合 Chain
        self.synthesis_chain = LLMChain(
            llm=llm,
            prompt=PromptTemplate(
                input_variables=["technical", "business", "social", "topic"],
                template="""基于以下多角度分析,为"{topic}"生成一个全面的评估报告:
 
技术分析:
{technical}
 
商业分析:
{business}
 
社会影响分析:
{social}
 
综合报告:"""
            ),
            output_key="text"
        )
 
    def process(self, topic):
        """执行多路径处理"""
        # 并行执行多个视角的分析
        results = {}
        for name, chain in self.perspectives.items():
            result = chain.invoke({"topic": topic})
            results[name] = result["text"]
 
        # 综合结果
        synthesis = self.synthesis_chain.invoke({
            "topic": topic,
            **results
        })
 
        return {
            "topic": topic,
            **results,
            "synthesis": synthesis["text"]
        }
 
# 使用
multi_chain = MultiPathChain(llm=llm)
result = multi_chain.process("人工智能")
 
print("=== 技术分析 ===")
print(result["technical"])
print("\n=== 商业分析 ===")
print(result["business"])
print("\n=== 社会影响分析 ===")
print(result["social"])
print("\n=== 综合报告 ===")
print(result["synthesis"])

模式3:反馈循环 Chain

from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
 
class IterativeRefinementChain:
    """
    迭代优化 Chain
    通过多次迭代逐步改进结果
    """
 
    def __init__(self, llm, max_iterations=3):
        self.llm = llm
        self.max_iterations = max_iterations
 
        # 生成 Chain
        self.generate_chain = LLMChain(
            llm=llm,
            prompt=PromptTemplate(
                input_variables=["task", "feedback"],
                template="""任务:{task}
 
上一轮反馈:{feedback}
 
请根据反馈改进你的回答:"""
            ),
            output_key="text"
        )
 
        # 评估 Chain
        self.evaluate_chain = LLMChain(
            llm=llm,
            prompt=PromptTemplate(
                input_variables=["task", "output"],
                template="""任务:{task}
 
当前输出:{output}
 
请评估这个输出,指出需要改进的地方。如果没有问题,回复"满意"。
 
评估:"""
            ),
            output_key="text"
        )
 
    def process(self, task):
        """执行迭代优化"""
        current_output = "这是初始版本。"
        feedback = "请开始生成。"
        history = []
 
        for i in range(self.max_iterations):
            # 生成
            result = self.generate_chain.invoke({
                "task": task,
                "feedback": feedback
            })
            current_output = result["text"]
 
            history.append({
                "iteration": i + 1,
                "output": current_output
            })
 
            # 评估
            eval_result = self.evaluate_chain.invoke({
                "task": task,
                "output": current_output
            })
            feedback = eval_result["text"]
 
            # 检查是否满意
            if "满意" in feedback:
                break
 
        return {
            "final_output": current_output,
            "iterations": len(history),
            "history": history,
            "final_feedback": feedback
        }
 
# 使用
refinement_chain = IterativeRefinementChain(llm=llm, max_iterations=3)
result = refinement_chain.process("写一篇关于环保的文章开头(100字)")
 
print(f"迭代次数: {result['iterations']}")
print(f"最终结果: {result['final_output']}")

1. 单一职责原则

每个 Chain 应该只负责一个明确的任务,避免过于复杂的 Chain。

# 不好的做法:一个 Chain 做太多事情
bad_chain = LLMChain(
    prompt=PromptTemplate(
        template="翻译、总结并评价以下内容:{text}",  # 做了三件事!
        input_variables=["text"]
    ),
    llm=llm
)
 
# 好的做法:分解为多个单一职责的 Chain
translate_chain = LLMChain(prompt=translate_prompt, llm=llm, output_key="translated")
summarize_chain = LLMChain(prompt=summarize_prompt, llm=llm, output_key="summary")
evaluate_chain = LLMChain(prompt=evaluate_prompt, llm=llm, output_key="evaluation")
 
# 用 SequentialChain 组合
good_chain = SequentialChain(
    chains=[translate_chain, summarize_chain, evaluate_chain],
    input_variables=["text"],
    output_variables=["translated", "summary", "evaluation"]
)

2. 合理命名变量

使用清晰、有意义的输入输出键名。

# 不好的命名
chain = LLMChain(
    prompt=prompt,
    llm=llm,
    output_key="x"  # 不清晰
)
 
# 好的命名
chain = LLMChain(
    prompt=prompt,
    llm=llm,
    output_key="extracted_entities"  # 清晰表达含义
)

3. 错误处理

为 Chain 添加适当的错误处理机制。

from langchain.chains.base import Chain
 
class SafeChain(Chain):
    """带错误处理的 Chain 包装器"""
 
    base_chain: Chain
    fallback_value: str = "处理失败"
 
    @property
    def input_keys(self):
        return self.base_chain.input_keys
 
    @property
    def output_keys(self):
        return self.base_chain.output_keys
 
    def _call(self, inputs):
        try:
            return self.base_chain.invoke(inputs)
        except Exception as e:
            # 记录错误
            print(f"Chain 执行错误: {e}")
            # 返回默认值
            return {key: self.fallback_value for key in self.output_keys}

4. 使用 Memory 管理状态

对于需要维护上下文的 Chain,使用 Memory 组件。

from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationChain
 
# 创建带记忆的 Chain
memory = ConversationBufferMemory()
conversation = ConversationChain(
    llm=llm,
    memory=memory,
    verbose=True
)
 
# 多次对话保持上下文
conversation.predict(input="你好,我叫张三")
conversation.predict(input="我叫什么名字?")  # 会记住之前的对话

本章深入讲解了 LangChain 中的 Chain(链)组件,这是构建复杂 LLM 应用的核心工具。

关键知识点回顾:

4.1 Chain 基础概念 * Chain 是将多个组件串联执行的抽象概念 * 实现了统一的接口(input_keys、output_keys、_call、_acall) * 优势包括模块化设计、可组合性、统一接口、内置功能支持

4.2 LLMChain * 最基础的 Chain 类型,组合了 PromptTemplate 和 LLM * 支持丰富的参数配置(output_key、verbose、callbacks 等) * 可以通过 OutputParser 将输出解析为结构化数据

4.3 SequentialChain * SimpleSequentialChain:简单顺序执行,单输入单输出传递 * SequentialChain:更灵活,支持多输入多输出和变量映射 * 理解输出传递机制对于构建复杂流水线至关重要

4.4 RouterChain * 根据输入特征动态选择处理路径 * MultiPromptChain:内置的多目的地路由实现 * 可以自定义路由逻辑实现更复杂的决策

4.5 TransformChain * 用于 Chain 之间的数据转换 * 可以实现文本清洗、格式转换、数据解析等功能 * 是连接不同 Chain 的“胶水”

4.6 自定义 Chain * 通过继承 BaseChain 创建自定义 Chain * 必须实现 input_keys、output_keys、_call、_acall 方法 * 可以添加自定义配置参数和方法

4.7 LCEL (LangChain Expression Language) * LangChain 的新一代声明式语法 * 使用管道操作符(|)连接组件 * 支持 RunnablePassthrough、RunnableParallel、RunnableBranch 等高级特性

4.8 Chain 的组合与嵌套 * Chain 可以灵活组合和嵌套 * 常见模式:分治处理、多路径处理、迭代优化 * 遵循最佳实践:单一职责、清晰命名、错误处理、内存管理

学习建议:

1. 从简单开始:先掌握 LLMChain 和 SequentialChain 的基础用法 2. 实践为主:通过实际项目练习 Chain 的组合和嵌套 3. 理解原理:深入理解 Chain 的执行流程和数据传递机制 4. 关注新版本:LCEL 是未来趋势,建议在新项目中优先使用 5. 善用调试:利用 verbose=True 和回调函数调试 Chain

下一步学习:

掌握了 Chain 之后,建议继续学习: * Memory 组件:为 Chain 添加记忆能力 * Agent:让 LLM 自主决策并调用工具 * Document Loaders 和 Vector Stores:构建 RAG 应用 * Evaluation:评估 Chain 的性能和输出质量

Chain 是 LangChain 的骨架,熟练运用 Chain 将使你能够构建出功能强大、结构清晰的 LLM 应用程序。

1. 创建一个 LLMChain,将用户的输入翻译成指定语言,并使用 PydanticOutputParser 解析出翻译结果和语言检测信息。

2. 使用 SequentialChain 构建一个内容创作流水线:先生成文章标题,再基于标题生成大纲,最后基于大纲生成完整文章。

3. 实现一个 RouterChain,能够根据用户问题类型(编程、写作、翻译)路由到不同的处理 Chain。

4. 使用 TransformChain 创建一个数据清洗 Chain,能够去除文本中的 HTML 标签、多余空格和特殊字符。

5. 使用 LCEL 语法重构本章中的任意一个复杂 Chain,对比传统方式和 LCEL 方式的代码差异。

6. 创建一个自定义 Chain,实现一个简单的问答系统,包含文档检索、答案生成和答案验证三个步骤。

该主题尚不存在

您访问的页面并不存在。如果允许,您可以使用创建该页面按钮来创建它。

  • langchain二次开发/核心组件详解_chains.1775185100.txt.gz
  • 最后更改: 2026/04/03 10:58
  • 张叶安