Stream outputs¶
You can stream outputs from a LangGraph agent or workflow.
Supported stream modes¶
Pass one or more of the following stream modes as a list to the @[stream()][CompiledStateGraph.stream] or @[astream()][CompiledStateGraph.astream] method (in JavaScript, the async stream() method):
| Mode | Description |
|---|---|
| `values` | Streams the full value of the state after each step of the graph. |
| `updates` | Streams the updates to the state after each step of the graph. If multiple updates are made in the same step (e.g., multiple nodes are run), those updates are streamed separately. |
| `custom` | Streams custom data from inside your graph nodes. |
| `messages` | Streams 2-tuples (LLM token, metadata) from any graph nodes where an LLM is invoked. |
| `debug` | Streams as much information as possible throughout the execution of the graph. |
Stream from an agent¶
Agent progress¶
To stream agent progress, use the @[stream()][CompiledStateGraph.stream] or @[astream()][CompiledStateGraph.astream] method with stream_mode="updates" (in JavaScript, streamMode: "updates"). This emits an event after every agent step.
For example, if you have an agent that calls a tool once, you should see the following updates:
- LLM node: AI message with tool call requests
- Tool node: Tool message with the execution result
- LLM node: Final AI response
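A minimal Python sketch of this pattern (it assumes a `get_weather` tool like the one defined in the tool updates section below):

```python
from langgraph.prebuilt import create_react_agent

agent = create_react_agent(
    model="anthropic:claude-3-7-sonnet-latest",
    tools=[get_weather],
)
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "what is the weather in sf"}]},
    stream_mode="updates"
):
    print(chunk)
    print("\n")
```

The JavaScript equivalent: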
const agent = createReactAgent({
llm: model,
tools: [getWeather],
});
for await (const chunk of await agent.stream(
{ messages: [{ role: "user", content: "what is the weather in sf" }] },
{ streamMode: "updates" }
)) {
console.log(chunk);
console.log("\n");
}
LLM tokens¶
To stream tokens as they are produced by the LLM, use stream_mode="messages":
agent = create_react_agent(
model="anthropic:claude-3-7-sonnet-latest",
tools=[get_weather],
)
# highlight-next-line
for token, metadata in agent.stream(
{"messages": [{"role": "user", "content": "what is the weather in sf"}]},
# highlight-next-line
stream_mode="messages"
):
print("Token", token)
print("Metadata", metadata)
print("\n")
agent = create_react_agent(
model="anthropic:claude-3-7-sonnet-latest",
tools=[get_weather],
)
# highlight-next-line
async for token, metadata in agent.astream(
{"messages": [{"role": "user", "content": "what is the weather in sf"}]},
# highlight-next-line
stream_mode="messages"
):
print("Token", token)
print("Metadata", metadata)
print("\n")
To stream tokens as they are produced by the LLM, use streamMode: "messages":
const agent = createReactAgent({
llm: model,
tools: [getWeather],
});
for await (const [token, metadata] of await agent.stream(
{ messages: [{ role: "user", content: "what is the weather in sf" }] },
{ streamMode: "messages" }
)) {
console.log("Token", token);
console.log("Metadata", metadata);
console.log("\n");
}
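Each streamed token is a message chunk. In Python these chunks support addition, so you can reassemble the full message as it streams; a small sketch (not part of the original examples):

```python
full = None
for token, metadata in agent.stream(
    {"messages": [{"role": "user", "content": "what is the weather in sf"}]},
    stream_mode="messages"
):
    # Message chunks accumulate via `+`, concatenating their content
    full = token if full is None else full + token
print(full.content)
```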
Tool updates¶
To stream updates from a tool as it executes, you can use @[get_stream_writer][].
# highlight-next-line
from langgraph.config import get_stream_writer
def get_weather(city: str) -> str:
"""Get weather for a given city."""
# highlight-next-line
writer = get_stream_writer()
# stream any arbitrary data
# highlight-next-line
writer(f"Looking up data for city: {city}")
return f"It's always sunny in {city}!"
agent = create_react_agent(
model="anthropic:claude-3-7-sonnet-latest",
tools=[get_weather],
)
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "what is the weather in sf"}]},
# highlight-next-line
stream_mode="custom"
):
print(chunk)
print("\n")
# highlight-next-line
from langgraph.config import get_stream_writer
def get_weather(city: str) -> str:
"""Get weather for a given city."""
# highlight-next-line
writer = get_stream_writer()
# stream any arbitrary data
# highlight-next-line
writer(f"Looking up data for city: {city}")
return f"It's always sunny in {city}!"
agent = create_react_agent(
model="anthropic:claude-3-7-sonnet-latest",
tools=[get_weather],
)
async for chunk in agent.astream(
{"messages": [{"role": "user", "content": "what is the weather in sf"}]},
# highlight-next-line
stream_mode="custom"
):
print(chunk)
print("\n")
Note
If you add get_stream_writer inside your tool, you won't be able to invoke the tool outside of a LangGraph execution context.
In JavaScript, to stream updates from a tool as it executes, use the writer parameter from the config.
import { LangGraphRunnableConfig } from "@langchain/langgraph";
const getWeather = tool(
async (input, config: LangGraphRunnableConfig) => {
// stream any arbitrary data
config.writer?.("Looking up data for city: " + input.city);
return `It's always sunny in ${input.city}!`;
},
{
name: "get_weather",
description: "Get weather for a given city.",
schema: z.object({
city: z.string().describe("The city to get weather for."),
}),
}
);
const agent = createReactAgent({
llm: model,
tools: [getWeather],
});
for await (const chunk of await agent.stream(
{ messages: [{ role: "user", content: "what is the weather in sf" }] },
{ streamMode: "custom" }
)) {
console.log(chunk);
console.log("\n");
}
Note
If you add a writer parameter to your tool, you won't be able to invoke the tool outside of a LangGraph execution context without providing a writer function.
Stream multiple modes¶
You can specify multiple streaming modes by passing stream modes as a list: stream_mode=["updates", "messages", "custom"]:
agent = create_react_agent(
model="anthropic:claude-3-7-sonnet-latest",
tools=[get_weather],
)
for stream_mode, chunk in agent.stream(
{"messages": [{"role": "user", "content": "what is the weather in sf"}]},
# highlight-next-line
stream_mode=["updates", "messages", "custom"]
):
print(chunk)
print("\n")
agent = create_react_agent(
model="anthropic:claude-3-7-sonnet-latest",
tools=[get_weather],
)
async for stream_mode, chunk in agent.astream(
{"messages": [{"role": "user", "content": "what is the weather in sf"}]},
# highlight-next-line
stream_mode=["updates", "messages", "custom"]
):
print(chunk)
print("\n")
You can specify multiple streaming modes by passing streamMode as an array: streamMode: ["updates", "messages", "custom"]:
const agent = createReactAgent({
llm: model,
tools: [getWeather],
});
for await (const chunk of await agent.stream(
{ messages: [{ role: "user", content: "what is the weather in sf" }] },
{ streamMode: ["updates", "messages", "custom"] }
)) {
console.log(chunk);
console.log("\n");
}
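When modes are combined like this, each streamed item is a (mode, chunk) pair, so you can branch on the mode. A Python sketch (the handling shown is illustrative, not prescribed by the API):

```python
for mode, chunk in agent.stream(
    {"messages": [{"role": "user", "content": "what is the weather in sf"}]},
    stream_mode=["updates", "messages", "custom"],
):
    if mode == "messages":
        token, metadata = chunk  # LLM token plus its metadata
        print(token.content, end="", flush=True)
    elif mode == "custom":
        print("tool update:", chunk)
    elif mode == "updates":
        print("step finished:", list(chunk.keys()))
```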
Disable streaming¶
In some applications you may need to disable streaming of individual tokens for a given model. This is useful in multi-agent systems to control which agents stream their output.
See the Models guide to learn how to disable streaming.
Stream from a workflow¶
Basic usage example¶
LangGraph graphs expose the @[.stream()][Pregel.stream] (sync) and @[.astream()][Pregel.astream] (async) methods to yield streamed outputs as iterators. In JavaScript, graphs expose an async @[.stream()][Pregel.stream] method.
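For example, in Python (a minimal sketch; `inputs` stands for whatever input your graph expects):

```python
for chunk in graph.stream(inputs, stream_mode="updates"):
    print(chunk)
```

The JavaScript equivalent: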
for await (const chunk of await graph.stream(inputs, {
streamMode: "updates",
})) {
console.log(chunk);
}
Extended example: streaming updates
=== "Python"
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
topic: str
joke: str
def refine_topic(state: State):
return {"topic": state["topic"] + " and cats"}
def generate_joke(state: State):
return {"joke": f"This is a joke about {state['topic']}"}
graph = (
StateGraph(State)
.add_node(refine_topic)
.add_node(generate_joke)
.add_edge(START, "refine_topic")
.add_edge("refine_topic", "generate_joke")
.add_edge("generate_joke", END)
.compile()
)
# highlight-next-line
for chunk in graph.stream( # (1)!
{"topic": "ice cream"},
# highlight-next-line
stream_mode="updates", # (2)!
):
print(chunk)
1. The stream() method returns an iterator that yields streamed outputs.
2. Set stream_mode="updates" to stream only the updates to the graph state after each node. Other stream modes are also available. See supported stream modes for details.
=== "JavaScript"
import { StateGraph, START, END } from "@langchain/langgraph";
import { z } from "zod";
const State = z.object({
topic: z.string(),
joke: z.string(),
});
const graph = new StateGraph(State)
.addNode("refineTopic", (state) => {
return { topic: state.topic + " and cats" };
})
.addNode("generateJoke", (state) => {
return { joke: `This is a joke about ${state.topic}` };
})
.addEdge(START, "refineTopic")
.addEdge("refineTopic", "generateJoke")
.addEdge("generateJoke", END)
.compile();
for await (const chunk of await graph.stream(
{ topic: "ice cream" },
{ streamMode: "updates" } // (1)!
)) {
console.log(chunk);
}
1. Set streamMode: "updates" to stream only the updates to the graph state after each node. Other stream modes are also available. See supported stream modes for details.
Output
{'refineTopic': {'topic': 'ice cream and cats'}}
{'generateJoke': {'joke': 'This is a joke about ice cream and cats'}}
Stream multiple modes¶
You can pass a list as the stream_mode parameter to stream multiple modes at once.
The streamed outputs will be (mode, chunk) tuples, where mode is the name of the stream mode and chunk is the data streamed by that mode.
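For example, in Python (a minimal sketch):

```python
for mode, chunk in graph.stream(inputs, stream_mode=["updates", "custom"]):
    print(mode, chunk)
```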
In JavaScript, pass an array as the streamMode parameter to stream multiple modes at once.
The streamed outputs will be [mode, chunk] tuples, where mode is the name of the stream mode and chunk is the data streamed by that mode.
for await (const [mode, chunk] of await graph.stream(inputs, {
streamMode: ["updates", "custom"],
})) {
console.log(chunk);
}
Stream graph state¶
Use the stream modes updates and values to stream the state of the graph as it executes.
- updates streams the updates to the state after each step of the graph.
- values streams the full value of the state after each step of the graph.
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
topic: str
joke: str
def refine_topic(state: State):
return {"topic": state["topic"] + " and cats"}
def generate_joke(state: State):
return {"joke": f"This is a joke about {state['topic']}"}
graph = (
StateGraph(State)
.add_node(refine_topic)
.add_node(generate_joke)
.add_edge(START, "refine_topic")
.add_edge("refine_topic", "generate_joke")
.add_edge("generate_joke", END)
.compile()
)
import { StateGraph, START, END } from "@langchain/langgraph";
import { z } from "zod";
const State = z.object({
topic: z.string(),
joke: z.string(),
});
const graph = new StateGraph(State)
.addNode("refineTopic", (state) => {
return { topic: state.topic + " and cats" };
})
.addNode("generateJoke", (state) => {
return { joke: `This is a joke about ${state.topic}` };
})
.addEdge(START, "refineTopic")
.addEdge("refineTopic", "generateJoke")
.addEdge("generateJoke", END)
.compile();
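With the graph defined above, the two modes differ only in what each chunk contains. A Python sketch (the commented output is approximately what this example graph produces):

```python
# "updates" yields only what each node returned:
for chunk in graph.stream({"topic": "ice cream"}, stream_mode="updates"):
    print(chunk)
# {'refine_topic': {'topic': 'ice cream and cats'}}
# {'generate_joke': {'joke': 'This is a joke about ice cream and cats'}}

# "values" yields the full state after each step:
for chunk in graph.stream({"topic": "ice cream"}, stream_mode="values"):
    print(chunk)
# {'topic': 'ice cream'}
# {'topic': 'ice cream and cats'}
# {'topic': 'ice cream and cats', 'joke': 'This is a joke about ice cream and cats'}
```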
Stream subgraph outputs¶
To include outputs from subgraphs in the streamed outputs, you can set subgraphs=True in the .stream() method of the parent graph. This streams outputs from both the parent graph and any subgraphs.
Outputs are streamed as tuples (namespace, data), where namespace is a tuple with the path to the node that invoked the subgraph, e.g. ("parent_node:<task_id>", "child_node:<task_id>").
for chunk in graph.stream(
{"foo": "foo"},
# highlight-next-line
subgraphs=True, # (1)!
stream_mode="updates",
):
print(chunk)
1. Set subgraphs=True to stream outputs from subgraphs.
In JavaScript, to include outputs from subgraphs in the streamed outputs, set subgraphs: true in the .stream() method of the parent graph. This streams outputs from both the parent graph and any subgraphs.
Outputs are streamed as tuples [namespace, data], where namespace is a tuple with the path to the node that invoked the subgraph, e.g. ["parent_node:<task_id>", "child_node:<task_id>"].
for await (const chunk of await graph.stream(
{ foo: "foo" },
{
subgraphs: true, // (1)!
streamMode: "updates",
}
)) {
console.log(chunk);
}
1. Set subgraphs: true to stream outputs from subgraphs.
Extended example: streaming from subgraphs
=== "Python"
from langgraph.graph import START, StateGraph
from typing import TypedDict
# Define subgraph
class SubgraphState(TypedDict):
foo: str # note that this key is shared with the parent graph state
bar: str
def subgraph_node_1(state: SubgraphState):
return {"bar": "bar"}
def subgraph_node_2(state: SubgraphState):
return {"foo": state["foo"] + state["bar"]}
subgraph_builder = StateGraph(SubgraphState)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_node(subgraph_node_2)
subgraph_builder.add_edge(START, "subgraph_node_1")
subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2")
subgraph = subgraph_builder.compile()
# Define parent graph
class ParentState(TypedDict):
foo: str
def node_1(state: ParentState):
return {"foo": "hi! " + state["foo"]}
builder = StateGraph(ParentState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", subgraph)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
graph = builder.compile()
for chunk in graph.stream(
{"foo": "foo"},
stream_mode="updates",
# highlight-next-line
subgraphs=True, # (1)!
):
print(chunk)
1. Set subgraphs=True to stream outputs from subgraphs.
=== "JavaScript"
import { StateGraph, START } from "@langchain/langgraph";
import { z } from "zod";
// Define subgraph
const SubgraphState = z.object({
foo: z.string(), // note that this key is shared with the parent graph state
bar: z.string(),
});
const subgraphBuilder = new StateGraph(SubgraphState)
.addNode("subgraphNode1", (state) => {
return { bar: "bar" };
})
.addNode("subgraphNode2", (state) => {
return { foo: state.foo + state.bar };
})
.addEdge(START, "subgraphNode1")
.addEdge("subgraphNode1", "subgraphNode2");
const subgraph = subgraphBuilder.compile();
// Define parent graph
const ParentState = z.object({
foo: z.string(),
});
const builder = new StateGraph(ParentState)
.addNode("node1", (state) => {
return { foo: "hi! " + state.foo };
})
.addNode("node2", subgraph)
.addEdge(START, "node1")
.addEdge("node1", "node2");
const graph = builder.compile();
for await (const chunk of await graph.stream(
{ foo: "foo" },
{
streamMode: "updates",
subgraphs: true, // (1)!
}
)) {
console.log(chunk);
}
1. Set subgraphs: true to stream outputs from subgraphs.
=== "Python"
((), {'node_1': {'foo': 'hi! foo'}})
(('node_2:dfddc4ba-c3c5-6887-5012-a243b5b377c2',), {'subgraph_node_1': {'bar': 'bar'}})
(('node_2:dfddc4ba-c3c5-6887-5012-a243b5b377c2',), {'subgraph_node_2': {'foo': 'hi! foobar'}})
((), {'node_2': {'foo': 'hi! foobar'}})
=== "JavaScript"
[[], {'node1': {'foo': 'hi! foo'}}]
[['node2:dfddc4ba-c3c5-6887-5012-a243b5b377c2'], {'subgraphNode1': {'bar': 'bar'}}]
[['node2:dfddc4ba-c3c5-6887-5012-a243b5b377c2'], {'subgraphNode2': {'foo': 'hi! foobar'}}]
[[], {'node2': {'foo': 'hi! foobar'}}]
Note that we are receiving not just the node updates, but also the namespaces, which tell us which graph (or subgraph) we are streaming from.
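If you only care about a particular subgraph, you can filter on the namespace. A small Python sketch building on the example above (an empty namespace tuple marks top-level updates):

```python
for namespace, chunk in graph.stream(
    {"foo": "foo"},
    stream_mode="updates",
    subgraphs=True,
):
    if namespace:  # non-empty: the update came from inside a subgraph
        print(f"subgraph update from {namespace[-1]}: {chunk}")
    else:
        print(f"top-level update: {chunk}")
```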
Debugging¶
Use the debug streaming mode to stream as much information as possible throughout the execution of the graph. The streamed outputs include the name of the node as well as the full state.
for chunk in graph.stream(
{"topic": "ice cream"},
# highlight-next-line
stream_mode="debug",
):
print(chunk)
for await (const chunk of await graph.stream(
{ topic: "ice cream" },
{ streamMode: "debug" }
)) {
console.log(chunk);
}
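Debug events are dictionaries describing execution steps. As an assumption of this sketch (verify against your LangGraph version), each event carries a "type" field such as "task" or "task_result" plus a "payload" that you can use to filter:

```python
for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode="debug",
):
    # Assumed event shape: keep only completed-task events and print their payload
    if chunk.get("type") == "task_result":
        print(chunk["payload"])
```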
LLM tokens¶
Use the messages streaming mode to stream large language model (LLM) outputs token by token from any part of your graph, including nodes, tools, subgraphs, or tasks.
The streamed output from messages mode is a tuple (message_chunk, metadata), where:
- message_chunk: the token or message segment from the LLM.
- metadata: a dictionary containing details about the graph node and the LLM invocation.
If your LLM is not available as a LangChain integration, you can stream its outputs using the custom mode instead. See Use with any LLM for details.
Manual config required for async in Python < 3.11
When using async code with Python < 3.11, you must explicitly pass RunnableConfig to ainvoke() to enable proper streaming. See Async with Python < 3.11 for details, or upgrade to Python 3.11+.
from dataclasses import dataclass
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START
@dataclass
class MyState:
topic: str
joke: str = ""
llm = init_chat_model(model="openai:gpt-4o-mini")
def call_model(state: MyState):
"""Call the LLM to generate a joke about a topic"""
# highlight-next-line
llm_response = llm.invoke( # (1)!
[
{"role": "user", "content": f"Generate a joke about {state.topic}"}
]
)
return {"joke": llm_response.content}
graph = (
StateGraph(MyState)
.add_node(call_model)
.add_edge(START, "call_model")
.compile()
)
for message_chunk, metadata in graph.stream( # (2)!
{"topic": "ice cream"},
# highlight-next-line
stream_mode="messages",
):
if message_chunk.content:
print(message_chunk.content, end="|", flush=True)
1. Note that the message events are emitted even when the LLM is run using .invoke rather than .stream.
2. The "messages" stream mode returns an iterator of (message_chunk, metadata) tuples, where message_chunk is the token streamed by the LLM and metadata is a dictionary with information about the graph node where the LLM was invoked and other information.
In JavaScript, the streamed output from messages mode is a tuple [message_chunk, metadata], where:
- message_chunk: the token or message segment from the LLM.
- metadata: a dictionary containing details about the graph node and the LLM invocation.
If your LLM is not available as a LangChain integration, you can stream its outputs using the custom mode instead. See Use with any LLM for details.
import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, START } from "@langchain/langgraph";
import { z } from "zod";
const MyState = z.object({
topic: z.string(),
joke: z.string().default(""),
});
const llm = new ChatOpenAI({ model: "gpt-4o-mini" });
const callModel = async (state: z.infer<typeof MyState>) => {
// Call the LLM to generate a joke about a topic
const llmResponse = await llm.invoke([
{ role: "user", content: `Generate a joke about ${state.topic}` },
]); // (1)!
return { joke: llmResponse.content };
};
const graph = new StateGraph(MyState)
.addNode("callModel", callModel)
.addEdge(START, "callModel")
.compile();
for await (const [messageChunk, metadata] of await graph.stream(
// (2)!
{ topic: "ice cream" },
{ streamMode: "messages" }
)) {
if (messageChunk.content) {
console.log(messageChunk.content + "|");
}
}
1. Note that the message events are emitted even when the LLM is run using .invoke rather than .stream.
2. The "messages" stream mode returns an iterator of [messageChunk, metadata] tuples, where messageChunk is the token streamed by the LLM and metadata is a dictionary with information about the graph node where the LLM was invoked and other information.
Filter by LLM invocation¶
You can associate tags with LLM invocations to filter the streamed tokens by LLM invocation.
from langchain.chat_models import init_chat_model
llm_1 = init_chat_model(model="openai:gpt-4o-mini", tags=['joke']) # (1)!
llm_2 = init_chat_model(model="openai:gpt-4o-mini", tags=['poem']) # (2)!
graph = ... # define a graph that uses these LLMs
async for msg, metadata in graph.astream( # (3)!
{"topic": "cats"},
# highlight-next-line
stream_mode="messages",
):
if metadata["tags"] == ["joke"]: # (4)!
print(msg.content, end="|", flush=True)
1. llm_1 is tagged with "joke".
2. llm_2 is tagged with "poem".
3. stream_mode is set to "messages" to stream LLM tokens. The metadata contains information about the LLM invocation, including the tags.
4. Filter the streamed tokens by the tags field in the metadata to only include the tokens from the LLM invocations tagged with "joke".
import { ChatOpenAI } from "@langchain/openai";
const llm1 = new ChatOpenAI({
model: "gpt-4o-mini",
tags: ['joke'] // (1)!
});
const llm2 = new ChatOpenAI({
model: "gpt-4o-mini",
tags: ['poem'] // (2)!
});
const graph = // ... define a graph that uses these LLMs
for await (const [msg, metadata] of await graph.stream( // (3)!
{ topic: "cats" },
{ streamMode: "messages" }
)) {
if (metadata.tags?.includes("joke")) { // (4)!
console.log(msg.content + "|");
}
}
1. llm1 is tagged with "joke".
2. llm2 is tagged with "poem".
3. streamMode is set to "messages" to stream LLM tokens. The metadata contains information about the LLM invocation, including the tags.
4. Filter the streamed tokens by the tags field in the metadata to only include the tokens from the LLM invocations tagged with "joke".
Extended example: filtering by tags
=== "Python"
from typing import TypedDict
from langchain.chat_models import init_chat_model
from langgraph.graph import START, StateGraph
joke_model = init_chat_model(model="openai:gpt-4o-mini", tags=["joke"]) # (1)!
poem_model = init_chat_model(model="openai:gpt-4o-mini", tags=["poem"]) # (2)!
class State(TypedDict):
topic: str
joke: str
poem: str
async def call_model(state, config):
topic = state["topic"]
print("Writing joke...")
# Note: passing the config explicitly is required in Python < 3.11,
# since context variable support was not added until then: https://docs.python.org/3/library/asyncio-task.html#creating-tasks
joke_response = await joke_model.ainvoke(
[{"role": "user", "content": f"Write a joke about {topic}"}],
config, # (3)!
)
print("\n\nWriting poem...")
poem_response = await poem_model.ainvoke(
[{"role": "user", "content": f"Write a short poem about {topic}"}],
config, # (3)!
)
return {"joke": joke_response.content, "poem": poem_response.content}
graph = (
StateGraph(State)
.add_node(call_model)
.add_edge(START, "call_model")
.compile()
)
async for msg, metadata in graph.astream(
{"topic": "cats"},
# highlight-next-line
stream_mode="messages", # (4)!
):
if metadata["tags"] == ["joke"]: # (4)!
print(msg.content, end="|", flush=True)
1. joke_model is tagged with "joke".
2. poem_model is tagged with "poem".
3. Pass config explicitly to ensure the context variables are propagated correctly. This is required for Python < 3.11 when using async code. See the async section for details.
4. stream_mode is set to "messages" to stream LLM tokens. The metadata contains information about the LLM invocation, including the tags.
=== "JavaScript"
import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, START } from "@langchain/langgraph";
import { z } from "zod";
const jokeModel = new ChatOpenAI({
model: "gpt-4o-mini",
tags: ["joke"] // (1)!
});
const poemModel = new ChatOpenAI({
model: "gpt-4o-mini",
tags: ["poem"] // (2)!
});
const State = z.object({
topic: z.string(),
joke: z.string(),
poem: z.string(),
});
const graph = new StateGraph(State)
.addNode("callModel", (state) => {
const topic = state.topic;
console.log("Writing joke...");
const jokeResponse = await jokeModel.invoke([
{ role: "user", content: `Write a joke about ${topic}` }
]);
console.log("\n\nWriting poem...");
const poemResponse = await poemModel.invoke([
{ role: "user", content: `Write a short poem about ${topic}` }
]);
return {
joke: jokeResponse.content,
poem: poemResponse.content
};
})
.addEdge(START, "callModel")
.compile();
for await (const [msg, metadata] of await graph.stream(
{ topic: "cats" },
{ streamMode: "messages" } // (3)!
)) {
if (metadata.tags?.includes("joke")) { // (4)!
console.log(msg.content + "|");
}
}
1. jokeModel is tagged with "joke".
2. poemModel is tagged with "poem".
3. streamMode is set to "messages" to stream LLM tokens. The metadata contains information about the LLM invocation, including the tags.
4. Filter the streamed tokens by the tags field in the metadata to only include the tokens from the LLM invocations tagged with "joke".
Filter by node¶
To stream tokens only from specific nodes, use stream_mode="messages" and filter the outputs by the langgraph_node field in the streamed metadata:
for msg, metadata in graph.stream( # (1)!
inputs,
# highlight-next-line
stream_mode="messages",
):
# highlight-next-line
if msg.content and metadata["langgraph_node"] == "some_node_name": # (2)!
...
- "messages" 流式模式返回一个
(message_chunk, metadata)元组,其中message_chunk是 LLM 流式传输的 token,metadata是一个字典,包含调用 LLM 的图节点信息和其他信息。 - 通过元数据中的
langgraph_node字段过滤流式 token,只包含来自write_poem节点的 token。
for await (const [msg, metadata] of await graph.stream(
// (1)!
inputs,
{ streamMode: "messages" }
)) {
if (msg.content && metadata.langgraph_node === "some_node_name") {
// (2)!
// ...
}
}
- "messages" 流式模式返回一个
[messageChunk, metadata]元组,其中messageChunk是 LLM 流式传输的 token,metadata是一个字典,包含调用 LLM 的图节点信息和其他信息。 - 通过元数据中的
langgraph_node字段过滤流式 token,只包含来自writePoem节点的 token。
扩展示例:从特定节点流式传输 LLM token
=== "Python"
from typing import TypedDict
from langgraph.graph import START, StateGraph
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-4o-mini")
class State(TypedDict):
topic: str
joke: str
poem: str
def write_joke(state: State):
topic = state["topic"]
joke_response = model.invoke(
[{"role": "user", "content": f"Write a joke about {topic}"}]
)
return {"joke": joke_response.content}
def write_poem(state: State):
topic = state["topic"]
poem_response = model.invoke(
[{"role": "user", "content": f"Write a short poem about {topic}"}]
)
return {"poem": poem_response.content}
graph = (
StateGraph(State)
.add_node(write_joke)
.add_node(write_poem)
# write the joke and the poem concurrently
.add_edge(START, "write_joke")
.add_edge(START, "write_poem")
.compile()
)
# highlight-next-line
for msg, metadata in graph.stream( # (1)!
{"topic": "cats"},
stream_mode="messages",
):
# highlight-next-line
if msg.content and metadata["langgraph_node"] == "write_poem": # (2)!
print(msg.content, end="|", flush=True)
- "messages" 流式模式返回一个
(message_chunk, metadata)元组,其中message_chunk是 LLM 流式传输的 token,metadata是一个字典,包含调用 LLM 的图节点信息和其他信息。 - 通过元数据中的
langgraph_node字段过滤流式 token,只包含来自write_poem节点的 token。
=== "JavaScript"
import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, START } from "@langchain/langgraph";
import { z } from "zod";
const model = new ChatOpenAI({ model: "gpt-4o-mini" });
const State = z.object({
topic: z.string(),
joke: z.string(),
poem: z.string(),
});
const graph = new StateGraph(State)
.addNode("writeJoke", async (state) => {
const topic = state.topic;
const jokeResponse = await model.invoke([
{ role: "user", content: `Write a joke about ${topic}` }
]);
return { joke: jokeResponse.content };
})
.addNode("writePoem", async (state) => {
const topic = state.topic;
const poemResponse = await model.invoke([
{ role: "user", content: `Write a short poem about ${topic}` }
]);
return { poem: poemResponse.content };
})
// write the joke and the poem concurrently
.addEdge(START, "writeJoke")
.addEdge(START, "writePoem")
.compile();
for await (const [msg, metadata] of await graph.stream( // (1)!
{ topic: "cats" },
{ streamMode: "messages" }
)) {
if (msg.content && metadata.langgraph_node === "writePoem") { // (2)!
console.log(msg.content + "|");
}
}
- "messages" 流式模式返回一个
[messageChunk, metadata]元组,其中messageChunk是 LLM 流式传输的 token,metadata是一个字典,包含调用 LLM 的图节点信息和其他信息。 - 通过元数据中的
langgraph_node字段过滤流式 token,只包含来自writePoem节点的 token。
Stream custom data¶
To send custom user-defined data from inside a LangGraph node or tool, follow these steps:
1. Use get_stream_writer() to access the stream writer and emit custom data.
2. Set stream_mode="custom" when calling .stream() or .astream() to get the custom data in the stream. You can combine multiple modes (e.g. ["updates", "custom"]), but at least one of them must be "custom".
No get_stream_writer() in async for Python < 3.11
In async code running on Python < 3.11, get_stream_writer() will not work.
Instead, add a writer parameter to your node or tool and pass it in manually.
See Async with Python < 3.11 for usage examples.
from typing import TypedDict
from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph, START
class State(TypedDict):
query: str
answer: str
def node(state: State):
writer = get_stream_writer() # (1)!
writer({"custom_key": "Generating custom data inside node"}) # (2)!
return {"answer": "some data"}
graph = (
StateGraph(State)
.add_node(node)
.add_edge(START, "node")
.compile()
)
inputs = {"query": "example"}
# Usage
for chunk in graph.stream(inputs, stream_mode="custom"): # (3)!
print(chunk)
1. Get the stream writer to send custom data.
2. Emit a custom key-value pair (e.g., a progress update).
3. Set stream_mode="custom" to receive the custom data in the stream.
from langchain_core.tools import tool
from langgraph.config import get_stream_writer
@tool
def query_database(query: str) -> str:
"""Query the database."""
writer = get_stream_writer() # (1)!
# highlight-next-line
writer({"data": "Retrieved 0/100 records", "type": "progress"}) # (2)!
# perform query
# highlight-next-line
writer({"data": "Retrieved 100/100 records", "type": "progress"}) # (3)!
return "some-answer"
graph = ... # define a graph that uses this tool
for chunk in graph.stream(inputs, stream_mode="custom"): # (4)!
print(chunk)
1. Access the stream writer to send custom data.
2. Emit a custom key-value pair (e.g., a progress update).
3. Emit another custom key-value pair.
4. Set stream_mode="custom" to receive the custom data in the stream.
In JavaScript, to send custom user-defined data from inside a LangGraph node or tool, follow these steps:
1. Use the writer parameter from the LangGraphRunnableConfig to emit custom data.
2. Set streamMode: "custom" when calling .stream() to get the custom data in the stream. You can combine multiple modes (e.g. ["updates", "custom"]), but at least one of them must be "custom".
import { StateGraph, START, LangGraphRunnableConfig } from "@langchain/langgraph";
import { z } from "zod";
const State = z.object({
query: z.string(),
answer: z.string(),
});
const graph = new StateGraph(State)
.addNode("node", async (state, config) => {
config.writer({ custom_key: "Generating custom data inside node" }); // (1)!
return { answer: "some data" };
})
.addEdge(START, "node")
.compile();
const inputs = { query: "example" };
// Usage
for await (const chunk of await graph.stream(inputs, { streamMode: "custom" })) { // (2)!
console.log(chunk);
}
1. Use the writer to emit a custom key-value pair (e.g., a progress update).
2. Set streamMode: "custom" to receive the custom data in the stream.
import { tool } from "@langchain/core/tools";
import { LangGraphRunnableConfig } from "@langchain/langgraph";
import { z } from "zod";
const queryDatabase = tool(
async (input, config: LangGraphRunnableConfig) => {
config.writer({ data: "Retrieved 0/100 records", type: "progress" }); // (1)!
// perform query
config.writer({ data: "Retrieved 100/100 records", type: "progress" }); // (2)!
return "some-answer";
},
{
name: "query_database",
description: "Query the database.",
schema: z.object({
query: z.string().describe("The query to execute."),
}),
}
);
const graph = // ... define a graph that uses this tool
for await (const chunk of await graph.stream(inputs, { streamMode: "custom" })) { // (3)!
console.log(chunk);
}
1. Use the writer to emit a custom key-value pair (e.g., a progress update).
2. Emit another custom key-value pair.
3. Set streamMode: "custom" to receive the custom data in the stream.
Use with any LLM¶
You can use stream_mode="custom" to stream data from any LLM API, even one that does not implement the LangChain chat model interface.
This lets you integrate raw LLM clients or external services that provide their own streaming interfaces, making LangGraph highly flexible for custom setups.
from langgraph.config import get_stream_writer
def call_arbitrary_model(state):
"""Example node that calls an arbitrary model and streams the output"""
# highlight-next-line
writer = get_stream_writer() # (1)!
# Assume you have a streaming client that yields chunks
for chunk in your_custom_streaming_client(state["topic"]): # (2)!
# highlight-next-line
writer({"custom_llm_chunk": chunk}) # (3)!
return {"result": "completed"}
graph = (
StateGraph(State)
.add_node(call_arbitrary_model)
# Add other nodes and edges as needed
.compile()
)
for chunk in graph.stream(
{"topic": "cats"},
# highlight-next-line
stream_mode="custom", # (4)!
):
# The chunk will contain the custom data streamed from the LLM
print(chunk)
1. Get the stream writer to send custom data.
2. Generate LLM tokens using your custom streaming client.
3. Use the writer to send custom data to the stream.
4. Set stream_mode="custom" to receive the custom data in the stream.
In JavaScript, you can use streamMode: "custom" to stream data from any LLM API, even one that does not implement the LangChain chat model interface.
This lets you integrate raw LLM clients or external services that provide their own streaming interfaces, making LangGraph highly flexible for custom setups.
import { LangGraphRunnableConfig } from "@langchain/langgraph";
const callArbitraryModel = async (
state: any,
config: LangGraphRunnableConfig
) => {
// Example node that calls an arbitrary model and streams the output
// Assume you have a streaming client that yields chunks
for await (const chunk of yourCustomStreamingClient(state.topic)) {
// (1)!
config.writer({ custom_llm_chunk: chunk }); // (2)!
}
return { result: "completed" };
};
const graph = new StateGraph(State)
.addNode("callArbitraryModel", callArbitraryModel)
// Add other nodes and edges as needed
.compile();
for await (const chunk of await graph.stream(
{ topic: "cats" },
{ streamMode: "custom" } // (3)!
)) {
// The chunk will contain the custom data streamed from the LLM
console.log(chunk);
}
1. Generate LLM tokens using your custom streaming client.
2. Use the writer to send custom data to the stream.
3. Set streamMode: "custom" to receive the custom data in the stream.
Extended example: streaming arbitrary chat models
=== "Python"
import operator
import json
from typing import TypedDict
from typing_extensions import Annotated
from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph, START
from openai import AsyncOpenAI
openai_client = AsyncOpenAI()
model_name = "gpt-4o-mini"
async def stream_tokens(model_name: str, messages: list[dict]):
response = await openai_client.chat.completions.create(
messages=messages, model=model_name, stream=True
)
role = None
async for chunk in response:
delta = chunk.choices[0].delta
if delta.role is not None:
role = delta.role
if delta.content:
yield {"role": role, "content": delta.content}
# This is our tool
async def get_items(place: str) -> str:
"""Use this tool to list items one might find in a place you're asked about."""
writer = get_stream_writer()
response = ""
async for msg_chunk in stream_tokens(
model_name,
[
{
"role": "user",
"content": (
"Can you tell me what kind of items "
f"i might find in the following place: '{place}'. "
"List at least 3 such items separating them by a comma. "
"And include a brief description of each item."
),
}
],
):
response += msg_chunk["content"]
writer(msg_chunk)
return response
class State(TypedDict):
messages: Annotated[list[dict], operator.add]
# This is the tool-calling graph node
async def call_tool(state: State):
ai_message = state["messages"][-1]
tool_call = ai_message["tool_calls"][-1]
function_name = tool_call["function"]["name"]
if function_name != "get_items":
raise ValueError(f"Tool {function_name} not supported")
function_arguments = tool_call["function"]["arguments"]
arguments = json.loads(function_arguments)
function_response = await get_items(**arguments)
tool_message = {
"tool_call_id": tool_call["id"],
"role": "tool",
"name": function_name,
"content": function_response,
}
return {"messages": [tool_message]}
graph = (
StateGraph(State)
.add_node(call_tool)
.add_edge(START, "call_tool")
.compile()
)
Let's invoke the graph with an AI message that includes a tool call:
inputs = {
"messages": [
{
"content": None,
"role": "assistant",
"tool_calls": [
{
"id": "1",
"function": {
"arguments": '{"place":"bedroom"}',
"name": "get_items",
},
"type": "function",
}
],
}
]
}
async for chunk in graph.astream(
inputs,
stream_mode="custom",
):
print(chunk["content"], end="|", flush=True)
=== "JavaScript"
import { StateGraph, START, LangGraphRunnableConfig } from "@langchain/langgraph";
import { tool } from "@langchain/core/tools";
import { z } from "zod";
import OpenAI from "openai";
const openaiClient = new OpenAI();
const modelName = "gpt-4o-mini";
async function* streamTokens(modelName: string, messages: any[]) {
const response = await openaiClient.chat.completions.create({
messages,
model: modelName,
stream: true,
});
let role: string | null = null;
for await (const chunk of response) {
const delta = chunk.choices[0]?.delta;
if (delta?.role) {
role = delta.role;
}
if (delta?.content) {
yield { role, content: delta.content };
}
}
}
// This is our tool
const getItems = tool(
async (input, config: LangGraphRunnableConfig) => {
let response = "";
for await (const msgChunk of streamTokens(
modelName,
[
{
role: "user",
content: `Can you tell me what kind of items i might find in the following place: '${input.place}'. List at least 3 such items separating them by a comma. And include a brief description of each item.`,
},
]
)) {
response += msgChunk.content;
config.writer?.(msgChunk);
}
return response;
},
{
name: "get_items",
description: "Use this tool to list items one might find in a place you're asked about.",
schema: z.object({
place: z.string().describe("The place to look up items for."),
}),
}
);
const State = z.object({
messages: z.array(z.any()),
});
const graph = new StateGraph(State)
// This is the tool-calling graph node
.addNode("callTool", async (state) => {
const aiMessage = state.messages.at(-1);
const toolCall = aiMessage.tool_calls?.at(-1);
const functionName = toolCall?.function?.name;
if (functionName !== "get_items") {
throw new Error(`Tool ${functionName} not supported`);
}
const functionArguments = toolCall?.function?.arguments;
const args = JSON.parse(functionArguments);
const functionResponse = await getItems.invoke(args);
const toolMessage = {
tool_call_id: toolCall.id,
role: "tool",
name: functionName,
content: functionResponse,
};
return { messages: [toolMessage] };
})
.addEdge(START, "callTool")
.compile();
Let's invoke the graph with an AI message that includes a tool call:
const inputs = {
messages: [
{
content: null,
role: "assistant",
tool_calls: [
{
id: "1",
function: {
arguments: '{"place":"bedroom"}',
name: "get_items",
},
type: "function",
}
],
}
]
};
for await (const chunk of await graph.stream(
inputs,
{ streamMode: "custom" }
)) {
console.log(chunk.content + "|");
}
Disable streaming for specific chat models¶
If your application mixes models that support streaming with models that do not, you may need to explicitly disable streaming for the models that do not support it.
In Python, set disable_streaming=True when initializing the model.
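For example (a minimal sketch; the model name is illustrative):

```python
from langchain.chat_models import init_chat_model

model = init_chat_model(
    "anthropic:claude-3-7-sonnet-latest",
    # highlight-next-line
    disable_streaming=True,  # tokens from this model will not be streamed
)
```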
In JavaScript, set streaming: false when initializing the model.
import { ChatOpenAI } from "@langchain/openai";
const model = new ChatOpenAI({
model: "o1-preview",
streaming: false,
});
Async with Python < 3.11¶
In Python versions < 3.11, asyncio tasks do not support the context parameter.
This limits LangGraph's ability to automatically propagate context and affects LangGraph's streaming mechanisms in two key ways:
1. You must explicitly pass RunnableConfig to async LLM calls (e.g., ainvoke()), since callbacks are not automatically propagated.
2. You cannot use get_stream_writer() in async nodes or tools; you must add a writer argument and pass it directly instead.
Extended example: async LLM call with manual config
from typing import TypedDict
from langgraph.graph import START, StateGraph
from langchain.chat_models import init_chat_model
llm = init_chat_model(model="openai:gpt-4o-mini")
class State(TypedDict):
topic: str
joke: str
async def call_model(state, config): # (1)!
topic = state["topic"]
print("Generating joke...")
joke_response = await llm.ainvoke(
[{"role": "user", "content": f"Write a joke about {topic}"}],
# highlight-next-line
config, # (2)!
)
return {"joke": joke_response.content}
graph = (
StateGraph(State)
.add_node(call_model)
.add_edge(START, "call_model")
.compile()
)
async for chunk, metadata in graph.astream(
{"topic": "ice cream"},
# highlight-next-line
stream_mode="messages", # (3)!
):
if chunk.content:
print(chunk.content, end="|", flush=True)
1. Accept config as an argument in the async node function.
2. Pass config to llm.ainvoke() to ensure proper context propagation.
3. Set stream_mode="messages" to stream LLM tokens.
Extended example: async custom streaming with a stream writer
from typing import TypedDict
from langgraph.graph import START, StateGraph
from langgraph.types import StreamWriter
class State(TypedDict):
topic: str
joke: str
# highlight-next-line
async def generate_joke(state: State, writer: StreamWriter): # (1)!
writer({"custom_key": "Streaming custom data while generating a joke"})
return {"joke": f"This is a joke about {state['topic']}"}
graph = (
StateGraph(State)
.add_node(generate_joke)
.add_edge(START, "generate_joke")
.compile()
)
async for chunk in graph.astream(
{"topic": "ice cream"},
# highlight-next-line
stream_mode="custom", # (2)!
):
print(chunk)
1. Add writer as an argument in the signature of the async node or tool. LangGraph will automatically pass the stream writer to the function.
2. Set stream_mode="custom" to receive the custom data in the stream.