目录

第十一章:工程能力详解 - Streaming 与 Middleware

当 LLM 应用进入真实生产环境后,用户最先感知到的往往不是“模型强不强”,而是“响应快不快、过程可不可见、系统稳不稳定”。Streaming 和 Middleware 正是生产体验的关键:前者解决实时反馈,后者解决运行时治理。

本章将从流式输出的原理、消息流和事件流、Agent 轨迹可视化、中间件的职责、模型路由、安全治理、工具监控,到生产环境中如何设计日志与限流展开,并给出更具体的代码示例。

11.1 为什么需要 Streaming

如果一个回答需要 8 秒,用户要等 8 秒后才看到全部结果,体验通常很差;但如果第 1 秒就看到模型开始输出,主观等待感会明显降低。Streaming 的价值包括:

11.2 流式输出的层次

根据 LangChain 官方文档,流式通常可以理解成三个层次:

对 Agent 和 LangGraph 来说,事件流尤其重要,因为用户关心的不只是“最后一句话”,还关心“系统在做什么”。

11.3 模型流式输出示例

11.3.1 最小流式文本示例

from langchain_openai import ChatOpenAI
 
model = ChatOpenAI(model="gpt-4o-mini", streaming=True)
full = None
for chunk in model.stream("请用三句话解释为什么企业知识库问答要做引用。"):
    print(chunk.text, end="")
    full = chunk if full is None else full + chunk
 
print("
---完整内容---")
print(full.content)

这个例子展示了两件事:

11.4 Agent 流式轨迹

LangChain 官方文档支持通过 `stream()` 观察 Agent 的运行过程。

11.4.1 查看更新流

for update in agent.stream(
    {"messages": [{"role": "user", "content": "查询北京天气,并给出明天出行建议。"}]},
    stream_mode="updates",
):
    print(update)

在真实项目中,这些更新可以映射成前端状态:

11.4.2 查看消息流

官方文档还提到可以使用 `stream_mode=“messages”`。这类模式更适合前端边生成边渲染回复文本。

for token, metadata in agent.stream(
    {"messages": [{"role": "user", "content": "帮我写一封请假邮件草稿。"}]},
    stream_mode="messages",
):
    print(token, metadata)

11.5 什么是 Middleware

Middleware 可以理解为“运行时中间层”。它不直接定义业务能力,而是在调用前后插入治理逻辑,例如:

如果说 Prompt 决定“让模型怎么回答”,Middleware 更像是“系统如何监管整个调用过程”。

11.6 模型路由中间件

LangChain 官方中间件文档提供了 `@wrap_model_call` 的模式,可以动态切换模型。

from langchain.agents.middleware import wrap_model_call, ModelRequest, ModelResponse
from langchain_openai import ChatOpenAI
 
basic_model = ChatOpenAI(model="gpt-4o-mini")
advanced_model = ChatOpenAI(model="gpt-4.1")
 
@wrap_model_call
def dynamic_model_selection(request: ModelRequest, handler) -> ModelResponse:
    # 一个简单示意:消息越多,使用越强模型
    request.model = advanced_model if len(request.state["messages"]) > 10 else basic_model
    return handler(request)

这种模式非常适合:

11.7 工具监控中间件

对于 Agent 项目,工具调用往往是最需要被观测的部分。官方自定义中间件文档支持 `@wrap_tool_call`。

from collections.abc import Callable
from langchain.agents.middleware import wrap_tool_call
from langchain.messages import ToolMessage
from langchain.tools.tool_node import ToolCallRequest
from langgraph.types import Command
import time
 
@wrap_tool_call
def monitor_tool(
    request: ToolCallRequest,
    handler: Callable[[ToolCallRequest], ToolMessage | Command],
) -> ToolMessage | Command:
    start = time.time()
    try:
        result = handler(request)
        elapsed = time.time() - start
        print(f"tool={request.tool_call['name']} cost={elapsed:.2f}s")
        return result
    except Exception as e:
        print(f"tool={request.tool_call['name']} failed: {e}")
        raise

你可以在这里扩展:

11.8 输入安全与 Guardrails

Middleware 很适合做输入、输出层的安全治理,例如:

11.8.1 一个简单输入检查示例

def contains_sensitive_command(text: str) -> bool:
    blocked = ["删除所有数据", "导出全部客户信息", "绕过审批"]
    return any(word in text for word in blocked)

如果在消息进入模型前发现命中,可以:

11.9 生产环境中的 Streaming 体验设计

一个好的流式 UI 不应只显示 token,而要显示状态感知:

这样做的好处是:

11.10 生产日志建议

真实项目至少记录:

log_record = {
    "trace_id": "trace-20260403-001",
    "model": "gpt-4o-mini",
    "tool_calls": ["query_order", "query_inventory"],
    "latency_ms": 1820,
    "prompt_tokens": 1350,
    "completion_tokens": 268,
    "status": "success"
}

11.11 一个“流式 + 中间件”的综合示例

from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from langchain.agents.middleware import wrap_model_call
 
basic_model = ChatOpenAI(model="gpt-4o-mini")
advanced_model = ChatOpenAI(model="gpt-4.1")
 
@wrap_model_call
def dynamic_model_selection(request, handler):
    request.model = advanced_model if len(request.state["messages"]) > 8 else basic_model
    return handler(request)
 
agent = create_agent(
    model=basic_model,
    tools=[],
    middleware=[dynamic_model_selection],
    system_prompt="你是企业知识助手。"
)
 
for update in agent.stream(
    {"messages": [{"role": "user", "content": "请总结一下 2026 年一季度销售复盘要点。"}]},
    stream_mode="updates",
):
    print(update)

这个例子说明:

11.12 本章小结

练习

1. 为一个知识问答系统设计流式展示方案,区分“检索中”“生成中”“完成”三种状态。

2. 使用 `@wrap_model_call` 编写一个模型路由中间件:简单问题走低成本模型,复杂问题走高质量模型。

3. 使用 `@wrap_tool_call` 为你的工具调用增加耗时日志。

4. 列出你业务中的 5 个高风险输入,并设计相应的输入拦截规则。

5. 设计一份日志字段规范,说明哪些字段用于排错,哪些字段用于成本分析。

参考资源

11.13 补充案例:FastAPI 中实现流式输出

如果你准备把 LangChain 项目接到 Web 前端,一个常见需求是通过 HTTP 持续把生成内容推给浏览器。下面给出一个极简 FastAPI 流式接口示意:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI
 
app = FastAPI()
model = ChatOpenAI(model="gpt-4o-mini", streaming=True)
 
@app.get("/stream")
def stream_answer(q: str):
    def generate():
        for chunk in model.stream(q):
            if chunk.text:
                yield chunk.text
    return StreamingResponse(generate(), media_type="text/plain")

生产环境里你通常会改成:

11.14 补充案例:输入安全中间件

from langchain.agents.middleware import wrap_model_call, ModelRequest, ModelResponse
 
@wrap_model_call
def block_sensitive_input(request: ModelRequest, handler) -> ModelResponse:
    last_message = request.state["messages"][-1]
    text = getattr(last_message, "content", str(last_message))
 
    blocked_phrases = ["导出全部客户信息", "绕过审批", "删除所有订单"]
    if any(phrase in text for phrase in blocked_phrases):
        raise ValueError("检测到高风险请求,已阻止执行")
 
    return handler(request)

这类中间件适合作为第一道“输入闸门”。即使它不完美,也比完全裸奔强很多。