====== 第十一章:工程能力详解 - Streaming 与 Middleware ====== 当 LLM 应用进入真实生产环境后,用户最先感知到的往往不是“模型强不强”,而是“响应快不快、过程可不可见、系统稳不稳定”。Streaming 和 Middleware 正是生产体验的关键:前者解决实时反馈,后者解决运行时治理。 本章将从流式输出的原理、消息流和事件流、Agent 轨迹可视化、中间件的职责、模型路由、安全治理、工具监控,到生产环境中如何设计日志与限流展开,并给出更具体的代码示例。 ===== 11.1 为什么需要 Streaming ===== 如果一个回答需要 8 秒,用户要等 8 秒后才看到全部结果,体验通常很差;但如果第 1 秒就看到模型开始输出,主观等待感会明显降低。Streaming 的价值包括: * 提前反馈,降低焦虑; * 提升“系统正在工作”的可感知性; * 适合长文本、长回答、多步骤推理; * 有利于前端逐步渲染。 ===== 11.2 流式输出的层次 ===== 根据 LangChain 官方文档,流式通常可以理解成三个层次: * **文本块流式**:逐 chunk 输出模型内容; * **消息流式**:输出消息块或工具调用块; * **事件流式**:输出节点开始、工具执行、链路更新等运行时事件。 对 Agent 和 LangGraph 来说,事件流尤其重要,因为用户关心的不只是“最后一句话”,还关心“系统在做什么”。 ===== 11.3 模型流式输出示例 ===== ==== 11.3.1 最小流式文本示例 ==== from langchain_openai import ChatOpenAI model = ChatOpenAI(model="gpt-4o-mini", streaming=True) full = None for chunk in model.stream("请用三句话解释为什么企业知识库问答要做引用。"): print(chunk.text, end="") full = chunk if full is None else full + chunk print(" ---完整内容---") print(full.content) 这个例子展示了两件事: * 你可以边接收边输出; * 你也可以在最后把所有 chunk 聚合成完整消息。 ===== 11.4 Agent 流式轨迹 ===== LangChain 官方文档支持通过 `stream()` 观察 Agent 的运行过程。 ==== 11.4.1 查看更新流 ==== for update in agent.stream( {"messages": [{"role": "user", "content": "查询北京天气,并给出明天出行建议。"}]}, stream_mode="updates", ): print(update) 在真实项目中,这些更新可以映射成前端状态: * 正在理解问题; * 正在调用天气工具; * 正在汇总结论; * 已完成。 ==== 11.4.2 查看消息流 ==== 官方文档还提到可以使用 `stream_mode="messages"`。这类模式更适合前端边生成边渲染回复文本。 for token, metadata in agent.stream( {"messages": [{"role": "user", "content": "帮我写一封请假邮件草稿。"}]}, stream_mode="messages", ): print(token, metadata) ===== 11.5 什么是 Middleware ===== Middleware 可以理解为“运行时中间层”。它不直接定义业务能力,而是在调用前后插入治理逻辑,例如: * 日志; * 模型路由; * 限流; * 输入输出过滤; * 安全审计; * 工具监控; * 统一错误处理。 如果说 Prompt 决定“让模型怎么回答”,Middleware 更像是“系统如何监管整个调用过程”。 ===== 11.6 模型路由中间件 ===== LangChain 官方中间件文档提供了 `@wrap_model_call` 的模式,可以动态切换模型。 from langchain.agents.middleware import wrap_model_call, ModelRequest, ModelResponse from langchain_openai import ChatOpenAI basic_model = ChatOpenAI(model="gpt-4o-mini") advanced_model = ChatOpenAI(model="gpt-4.1") @wrap_model_call def dynamic_model_selection(request: ModelRequest, handler) -> ModelResponse: # 一个简单示意:消息越多,使用越强模型 request.model = advanced_model if len(request.state["messages"]) > 10 else basic_model return handler(request) 这种模式非常适合: * 会话越复杂越切到高质量模型; * 普通问题走低成本模型; * 某模型不可用时切换备用模型。 ===== 11.7 工具监控中间件 ===== 对于 Agent 项目,工具调用往往是最需要被观测的部分。官方自定义中间件文档支持 `@wrap_tool_call`。 from collections.abc import Callable from langchain.agents.middleware import wrap_tool_call from langchain.messages import ToolMessage from langchain.tools.tool_node import ToolCallRequest from langgraph.types import Command import time @wrap_tool_call def monitor_tool( request: ToolCallRequest, handler: Callable[[ToolCallRequest], ToolMessage | Command], ) -> ToolMessage | Command: start = time.time() try: result = handler(request) elapsed = time.time() - start print(f"tool={request.tool_call['name']} cost={elapsed:.2f}s") return result except Exception as e: print(f"tool={request.tool_call['name']} failed: {e}") raise 你可以在这里扩展: * 写 Prometheus 指标; * 打 trace; * 记录慢调用; * 审计高风险工具。 ===== 11.8 输入安全与 Guardrails ===== Middleware 很适合做输入、输出层的安全治理,例如: * 敏感词检测; * PII 脱敏; * Prompt Injection 检查; * 越权请求拦截; * 高风险动作黑名单。 ==== 11.8.1 一个简单输入检查示例 ==== def contains_sensitive_command(text: str) -> bool: blocked = ["删除所有数据", "导出全部客户信息", "绕过审批"] return any(word in text for word in blocked) 如果在消息进入模型前发现命中,可以: * 直接阻断; * 转人工审核; * 打高风险标签。 ===== 11.9 生产环境中的 Streaming 体验设计 ===== 一个好的流式 UI 不应只显示 token,而要显示状态感知: * 正在理解问题; * 正在检索知识库; * 正在调用订单系统; * 正在生成回复; * 需要你补充订单号。 这样做的好处是: * 用户不容易误以为系统卡死; * Agent 的行为更透明; * 更适合复杂任务场景。 ===== 11.10 生产日志建议 ===== 真实项目至少记录: * `trace_id` / `request_id`; * 用户输入摘要; * 使用的模型和版本; * 总 token; * 工具调用列表; * 检索命中文档; * 总耗时; * 错误类型。 log_record = { "trace_id": "trace-20260403-001", "model": "gpt-4o-mini", "tool_calls": ["query_order", "query_inventory"], "latency_ms": 1820, "prompt_tokens": 1350, "completion_tokens": 268, "status": "success" } ===== 11.11 一个“流式 + 中间件”的综合示例 ===== from langchain.agents import create_agent from langchain_openai import ChatOpenAI from langchain.agents.middleware import wrap_model_call basic_model = ChatOpenAI(model="gpt-4o-mini") advanced_model = ChatOpenAI(model="gpt-4.1") @wrap_model_call def dynamic_model_selection(request, handler): request.model = advanced_model if len(request.state["messages"]) > 8 else basic_model return handler(request) agent = create_agent( model=basic_model, tools=[], middleware=[dynamic_model_selection], system_prompt="你是企业知识助手。" ) for update in agent.stream( {"messages": [{"role": "user", "content": "请总结一下 2026 年一季度销售复盘要点。"}]}, stream_mode="updates", ): print(update) 这个例子说明: * 模型可以在运行时动态切换; * 运行过程可以通过流式方式观察; * Middleware 与 Streaming 可以天然组合。 ===== 11.12 本章小结 ===== * Streaming 提升的是可感知速度和过程透明度; * Middleware 提升的是运行时治理能力; * 模型路由、工具监控、安全拦截都适合做成中间件; * 生产系统不应只返回最终答案,还应能观测中间过程; * Agent、RAG、结构化输出进入生产后,都离不开 Streaming 和 Middleware。 ===== 练习 ===== 1. 为一个知识问答系统设计流式展示方案,区分“检索中”“生成中”“完成”三种状态。 2. 使用 `@wrap_model_call` 编写一个模型路由中间件:简单问题走低成本模型,复杂问题走高质量模型。 3. 使用 `@wrap_tool_call` 为你的工具调用增加耗时日志。 4. 列出你业务中的 5 个高风险输入,并设计相应的输入拦截规则。 5. 设计一份日志字段规范,说明哪些字段用于排错,哪些字段用于成本分析。 ===== 参考资源 ===== * [[https://docs.langchain.com/oss/python/langchain/streaming|LangChain 官方文档:Streaming]] * [[https://docs.langchain.com/oss/python/langchain/middleware|LangChain 官方文档:Middleware]] * [[https://docs.langchain.com/oss/python/langchain/middleware/custom|LangChain 官方文档:Custom middleware]] ===== 11.13 补充案例:FastAPI 中实现流式输出 ===== 如果你准备把 LangChain 项目接到 Web 前端,一个常见需求是通过 HTTP 持续把生成内容推给浏览器。下面给出一个极简 FastAPI 流式接口示意: from fastapi import FastAPI from fastapi.responses import StreamingResponse from langchain_openai import ChatOpenAI app = FastAPI() model = ChatOpenAI(model="gpt-4o-mini", streaming=True) @app.get("/stream") def stream_answer(q: str): def generate(): for chunk in model.stream(q): if chunk.text: yield chunk.text return StreamingResponse(generate(), media_type="text/plain") 生产环境里你通常会改成: * SSE; * WebSocket; * 带 trace_id 的事件流; * 同时推送“步骤状态 + 文本块”。 ===== 11.14 补充案例:输入安全中间件 ===== from langchain.agents.middleware import wrap_model_call, ModelRequest, ModelResponse @wrap_model_call def block_sensitive_input(request: ModelRequest, handler) -> ModelResponse: last_message = request.state["messages"][-1] text = getattr(last_message, "content", str(last_message)) blocked_phrases = ["导出全部客户信息", "绕过审批", "删除所有订单"] if any(phrase in text for phrase in blocked_phrases): raise ValueError("检测到高风险请求,已阻止执行") return handler(request) 这类中间件适合作为第一道“输入闸门”。即使它不完美,也比完全裸奔强很多。