LangChain 3 工作流编排

1. 工作流编排是什么?

  • 就是把多个步骤(如提示词构建、模型推理、输出解析、工具调用等)组合成一条自动化流水线
  • 你可以像搭积木一样,把 Prompt、LLM、OutputParser、甚至外部 API、数据库、工具(Tool/Agent)串起来,实现复杂业务流程。

2. 编排方式

LangChain 推荐的工作流编排方式有两种主流:

  1. Pipe(管道式)编排

    • | 把每一步组件串联起来,数据自动在各环节流转。
    • 常见于 Prompt → LLM → OutputParser。
    • 代码举例(最基础串联):
      1
      2
      chain = prompt | llm | output_parser
      result = chain.invoke({"input": "帮我写一篇AI科普文章"})
  2. 控制流(Chain/Router/Branch)编排

    • 用 Chain 对象(如 SequentialChain, SimpleSequentialChain, MultiRouteChain 等),灵活实现多步、条件分支、循环、工具调用等更复杂的流程。
    • 可以实现:
      • 串行多步任务
      • 动态分支
      • 结果汇总/路由/多模型混合

3. 单模型的同步流式输出(Streaming)

适用场景: 只需要调用单一模型(如 ChatOpenAI),获取输出的每一小段(chunk)并实时处理,无需异步。

1
2
3
4
5
6
7
8
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4")

chunks = []
for chunk in model.stream("天空是什么颜色?"):
chunks.append(chunk)
print(chunk.content, end="|", flush=True)

说明:

  • model.stream() 方法可以实时获取 LLM 输出的每一小段内容(chunk)。
  • 适合做“打字机”效果、对话流式显示、前端体验增强等场景。
  • 你可以边收集结果(append),边在终端/网页实时展示。
  • 属于同步方式调用

4. 基于 Chain 的异步流式输出

适用场景: 多步链式工作流(如 prompt → model → output_parser),需要异步处理和流式消费大模型输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
import asyncio

prompt = ChatPromptTemplate.from_template("给我讲一个关于{topic}的笑话")
model = ChatOpenAI(model="gpt-4")
parser = StrOutputParser()
chain = prompt | model | parser

async def async_stream():
async for chunk in chain.astream({"topic": "鹦鹉"}):
print(chunk, end="|", flush=True)

asyncio.run(async_stream())

说明:

  • chain.astream() 支持异步流式消费,可以实时输出每一段内容,适合 WebSocket、前端“打字机”体验。
  • 多步链路自动传递数据,链式结构可扩展性强。

5. LangChain 异步流式输出 JSON 格式

基本用法

  • 通过 JsonOutputParser 让大模型输出自动解析为 JSON 格式。
  • 结合异步流式 (chain.astream) 实现边生成边消费,适合大数据量、实时展示等场景。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio
from langchain_core.output_parsers import JsonOutputParser
from langchain_openai import ChatOpenAI

# 初始化大模型
model = ChatOpenAI(model="gpt-4")

# 构建链式工作流:模型输出 | JSON解析器
chain = (
model | JsonOutputParser()
)

# 异步流式消费模型输出,自动解析为 JSON 结构
async def async_stream():
async for text in chain.astream(
"以JSON格式输出法国、西班牙和日本的国家及其人口列表。"
"使用一个带有'countries'外部键的字典,其中包含国家列表。"
"每个国家都应该有键'name'和'population'"
):
print(text, flush=True)

asyncio.run(async_stream())

输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
{}
{'countries': []}
{'countries': [{}]}
{'countries': [{'name': ''}]}
{'countries': [{'name': 'France'}]}
{'countries': [{'name': 'France', 'population': 670}]}
{'countries': [{'name': 'France', 'population': 670810}]}
{'countries': [{'name': 'France', 'population': 67081000}]}
{'countries': [{'name': 'France', 'population': 67081000}, {}]}
{'countries': [{'name': 'France', 'population': 67081000}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 67081000}, {'name': 'Spain'}]}
{'countries': [{'name': 'France', 'population': 67081000}, {'name': 'Spain', 'population': 467}]}
{'countries': [{'name': 'France', 'population': 67081000}, {'name': 'Spain', 'population': 467330}]}
  • 如果不用 JsonOutputParser() 会返回一堆很乱的东西

6. Stream events(事件流)

  • 事件流(Stream Events) 是 LangChain 新引入的功能(beta API),用于追踪/监听 LLM 流式输出过程中的事件,比如每一步产出的 token、消息或阶段性结果。

  • 这相比传统的 streamastream 更细粒度,适合更高级的流式控制、前端交互、链路追踪等需求。

  • langchain-core >= 0.2,推荐使用 async/await,流式操作建议用 .astream()

  • 事件流的典型应用与注意事项

    • 用于前端进度条、可视化展示、链路调试和详细日志追踪。
    • 在流式传输中,只有输入全部消费完成才会触发 end 事件(inputs 通常只在 end 事件里完整返回)。
  • 常见事件参考表

事件 名称 输入(示例) 输出(示例)
on_chat_model_start [模型名称] {"messages": [[SystemMessage, HumanMessage]]}
on_chat_model_end [模型名称] {"messages": [[SystemMessage, HumanMessage]]} AIMessageChunk(content="hello world")
on_llm_start [模型名称] {'input': 'hello'}
on_llm_stream [模型名称] 'Hello'
on_llm_end [模型名称] 'Hello human!'
on_chain_start format_docs
on_chain_stream format_docs "hello world!, goodbye world!"
on_chain_end format_docs [Document(...)] "hello world!, goodbye world!"
on_tool_start some_tool {"x": 1, "y": "2"}
on_tool_end some_tool {"x": 1, "y": "2"}
on_retriever_start [检索器名称] {"query": "hello"}
on_retriever_end [检索器名称] {"query": "hello"} [Document(...), ..]
on_prompt_start [模板名称] {"question": "hello"}
on_prompt_end [模板名称] {"question": "hello"} ChatPromptValue(messages: [SystemMessage, ...])
  • LangChain 事件流(astream_events)监听示例: 实时监听 LLM(如 gpt-4)推理过程中的所有事件,将事件对象逐一收集到列表(如日志、调试、追踪用途)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from langchain_openai import ChatOpenAI
import asyncio

model = ChatOpenAI(model="gpt-4")

async def async_stream():
events = []
# 监听 gpt-4 推理全过程中的事件(需指定 version="v2")
async for event in model.astream_events(input="hello", version="v2"):
events.append(event)
print(events)

# 运行异步事件流监听
asyncio.run(async_stream())

7. LangChain + asyncio 同步/异步任务串行与并发调用

场景:需要依次执行多个 LLM 相关异步任务(比如先问 GPT-4 一个问题,再问 GPT-4o 另一个问题),每次等上一个任务流式结束才开始下一个

Python asyncio ≠ 多线程,不是“真正的并行”!

  • asyncio 提供的是协程(coroutine),本质上是单线程+多任务,任务之间是并发(concurrent),不是并行(parallel)。
  • 并发:任务可以你一下我一下轮流执行,遇到I/O就切换(适合网络、文件流等“等”的场景)。
  • 并行:同一时刻多个CPU核心真的同时干活,只有多线程/多进程才是真并行。

总结:

  • asyncio == 协程 == 并发 ≠ 并行(不是多线程/多进程)
  • 真正多核并行运算要用多线程/多进程(但API型I/O任务,协程足够了)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from langchain_openai import ChatOpenAI
import asyncio

async def task1():
gpt4 = ChatOpenAI(model="gpt-4")
chunks = []
async for chunk in gpt4.astream("天空是什么颜色?"):
chunks.append(chunk)
# 在收到第二个 chunk 时,输出它
if len(chunks) == 2:
print(chunks[1])
print(chunk.content, end="|", flush=True)

async def task2():
gpt4o = ChatOpenAI(model="gpt-4o")
chunks = []
async for chunk in gpt4o.astream("讲个笑话?"):
chunks.append(chunk)
# 打印收到的第二块内容(chunks[1]),注意下标从0开始
if len(chunks) == 2:
print(chunks[1])
print(chunk.content, end="|", flush=True) # 实际流式场景可推送到前端

async def main_sync():
# 串行:task1完全结束后,才会跑task2
# 适用场景:上下游有依赖、必须一个一个排队执行。
await task1()
await task2()

async def main_async():
# gather可以让多个任务“同时”启动、协作切换,I/O等待时不阻塞
await asyncio.gather(task1(), task2())

# 启动主函数
asyncio.run(main_async())
# asyncio.run(main_sync())
  • 关键点

    • await task1() / await task2():表示串行/顺序调用,等 task1 完全执行结束后,才会执行 task2,适合有前后依赖关系的流程。
    • asyncio.gather(task1(), task2()):表示协程并发(不是多线程/并行),让多个任务“看起来同时”运行,I/O等待时自动切换,适合多个互不依赖的 I/O 密集型任务。
    • 流式输出async for chunk in ... astream(...):能逐步获取 LLM 生成内容,每一块(chunk)立即处理,提高响应速度和用户体验。
  • 应用场景

    • 有前后依赖/顺序要求的 LLM 数据处理/分析,建议用“串行”方式(await task1(); await task2())。
    • 多个任务互不依赖、需要加速整体效率时,可以用 asyncio.gather 实现“协程并发”。
    • 记住:CPU密集型并行要用多线程/多进程;API/网络/流式I/O密集用协程即可。

参考资料


“觉得不错的话,给点打赏吧 ୧(๑•̀⌄•́๑)૭”

微信二维码

微信支付

支付宝二维码

支付宝支付

LangChain 3 工作流编排
http://neurowave.tech/2025/06/27/2-12-LLM-Langchain3/
作者
Artin Tan
发布于
2025年6月27日
更新于
2025年8月6日