LangChain 3 工作流编排
1. 工作流编排是什么?
- 就是把多个步骤(如提示词构建、模型推理、输出解析、工具调用等)组合成一条自动化流水线。
- 你可以像搭积木一样,把 Prompt、LLM、OutputParser、甚至外部 API、数据库、工具(Tool/Agent)串起来,实现复杂业务流程。
2. 编排方式
LangChain 推荐的工作流编排方式有两种主流:
-
Pipe(管道式)编排
- 用
|把每一步组件串联起来,数据自动在各环节流转。 - 常见于 Prompt → LLM → OutputParser。
- 代码举例(最基础串联):
1
2chain = prompt | llm | output_parser
result = chain.invoke({"input": "帮我写一篇AI科普文章"})
- 用
-
控制流(Chain/Router/Branch)编排
- 用 Chain 对象(如 SequentialChain, SimpleSequentialChain, MultiRouteChain 等),灵活实现多步、条件分支、循环、工具调用等更复杂的流程。
- 可以实现:
- 串行多步任务
- 动态分支
- 结果汇总/路由/多模型混合
3. 单模型的同步流式输出(Streaming)
适用场景: 只需要调用单一模型(如 ChatOpenAI),获取输出的每一小段(chunk)并实时处理,无需异步。
1 | |
说明:
model.stream()方法可以实时获取 LLM 输出的每一小段内容(chunk)。- 适合做“打字机”效果、对话流式显示、前端体验增强等场景。
- 你可以边收集结果(append),边在终端/网页实时展示。
- 属于同步方式调用
4. 基于 Chain 的异步流式输出
适用场景: 多步链式工作流(如 prompt → model → output_parser),需要异步处理和流式消费大模型输出。
1 | |
说明:
chain.astream()支持异步流式消费,可以实时输出每一段内容,适合 WebSocket、前端“打字机”体验。- 多步链路自动传递数据,链式结构可扩展性强。
5. LangChain 异步流式输出 JSON 格式
基本用法
- 通过
JsonOutputParser让大模型输出自动解析为 JSON 格式。 - 结合异步流式 (
chain.astream) 实现边生成边消费,适合大数据量、实时展示等场景。
1 | |
输出:
1 | |
- 如果不用
JsonOutputParser()会返回一堆很乱的东西
6. Stream events(事件流)
-
事件流(Stream Events) 是 LangChain 新引入的功能(beta API),用于追踪/监听 LLM 流式输出过程中的事件,比如每一步产出的 token、消息或阶段性结果。
-
这相比传统的
stream和astream更细粒度,适合更高级的流式控制、前端交互、链路追踪等需求。 -
需
langchain-core >= 0.2,推荐使用 async/await,流式操作建议用.astream()。 -
事件流的典型应用与注意事项
- 用于前端进度条、可视化展示、链路调试和详细日志追踪。
- 在流式传输中,只有输入全部消费完成才会触发 end 事件(inputs 通常只在 end 事件里完整返回)。
-
常见事件参考表
| 事件 | 名称 | 输入(示例) | 输出(示例) |
|---|---|---|---|
| on_chat_model_start | [模型名称] | {"messages": [[SystemMessage, HumanMessage]]} |
|
| on_chat_model_end | [模型名称] | {"messages": [[SystemMessage, HumanMessage]]} |
AIMessageChunk(content="hello world") |
| on_llm_start | [模型名称] | {'input': 'hello'} |
|
| on_llm_stream | [模型名称] | 'Hello' |
|
| on_llm_end | [模型名称] | 'Hello human!' |
|
| on_chain_start | format_docs | ||
| on_chain_stream | format_docs | "hello world!, goodbye world!" |
|
| on_chain_end | format_docs | [Document(...)] |
"hello world!, goodbye world!" |
| on_tool_start | some_tool | {"x": 1, "y": "2"} |
|
| on_tool_end | some_tool | {"x": 1, "y": "2"} |
|
| on_retriever_start | [检索器名称] | {"query": "hello"} |
|
| on_retriever_end | [检索器名称] | {"query": "hello"} |
[Document(...), ..] |
| on_prompt_start | [模板名称] | {"question": "hello"} |
|
| on_prompt_end | [模板名称] | {"question": "hello"} |
ChatPromptValue(messages: [SystemMessage, ...]) |
- LangChain 事件流(astream_events)监听示例: 实时监听 LLM(如 gpt-4)推理过程中的所有事件,将事件对象逐一收集到列表(如日志、调试、追踪用途)
1 | |
7. LangChain + asyncio 同步/异步任务串行与并发调用
场景:需要依次执行多个 LLM 相关异步任务(比如先问 GPT-4 一个问题,再问 GPT-4o 另一个问题),每次等上一个任务流式结束才开始下一个。
Python asyncio ≠ 多线程,不是“真正的并行”!
- asyncio 提供的是协程(coroutine),本质上是单线程+多任务,任务之间是并发(concurrent),不是并行(parallel)。
- 并发:任务可以你一下我一下轮流执行,遇到I/O就切换(适合网络、文件流等“等”的场景)。
- 并行:同一时刻多个CPU核心真的同时干活,只有多线程/多进程才是真并行。
总结:
- asyncio == 协程 == 并发 ≠ 并行(不是多线程/多进程)
- 真正多核并行运算要用多线程/多进程(但API型I/O任务,协程足够了)。
1 | |
-
关键点
- await task1() / await task2():表示串行/顺序调用,等 task1 完全执行结束后,才会执行 task2,适合有前后依赖关系的流程。
- asyncio.gather(task1(), task2()):表示协程并发(不是多线程/并行),让多个任务“看起来同时”运行,I/O等待时自动切换,适合多个互不依赖的 I/O 密集型任务。
- 流式输出:
async for chunk in ... astream(...):能逐步获取 LLM 生成内容,每一块(chunk)立即处理,提高响应速度和用户体验。
-
应用场景
- 有前后依赖/顺序要求的 LLM 数据处理/分析,建议用“串行”方式(await task1(); await task2())。
- 多个任务互不依赖、需要加速整体效率时,可以用 asyncio.gather 实现“协程并发”。
- 记住:CPU密集型并行要用多线程/多进程;API/网络/流式I/O密集用协程即可。
参考资料
“觉得不错的话,给点打赏吧 ୧(๑•̀⌄•́๑)૭”
微信支付
支付宝支付
LangChain 3 工作流编排
http://neurowave.tech/2025/06/27/2-12-LLM-Langchain3/