
Streaming

You can stream outputs from a LangGraph agent or workflow.

Supported stream modes

Pass one or more of the following stream modes as a list to the @[stream()][CompiledStateGraph.stream] or @[astream()][CompiledStateGraph.astream] methods:

Pass one or more of the following stream modes as a list to the @[stream()][CompiledStateGraph.stream] method:

| Mode | Description |
|------|-------------|
| values | Streams the full value of the state after each step of the graph. |
| updates | Streams the updates to the state after each step of the graph. If multiple updates are made in the same step (e.g. multiple nodes are run), those updates are streamed separately. |
| custom | Streams custom data from inside your graph nodes. |
| messages | Streams 2-tuples (LLM token, metadata) from any graph nodes where an LLM is invoked. |
| debug | Streams as much information as possible throughout the execution of the graph. |

Streaming from an agent

Agent progress

To stream agent progress, use the @[stream()][CompiledStateGraph.stream] or @[astream()][CompiledStateGraph.astream] methods with stream_mode="updates". This emits an event after every agent step.

To stream agent progress, use the @[stream()][CompiledStateGraph.stream] method with streamMode: "updates". This emits an event after every agent step.

For example, if you have an agent that calls a tool once, you should see the following updates:

  • LLM node: AI message with tool call requests
  • Tool node: tool message with the execution result
  • LLM node: the final AI response
from langgraph.prebuilt import create_react_agent

def get_weather(city: str) -> str:
    """Get weather for a given city."""
    return f"It's always sunny in {city}!"

agent = create_react_agent(
    model="anthropic:claude-3-7-sonnet-latest",
    tools=[get_weather],
)
# highlight-next-line
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "what is the weather in sf"}]},
    # highlight-next-line
    stream_mode="updates"
):
    print(chunk)
    print("\n")
agent = create_react_agent(
    model="anthropic:claude-3-7-sonnet-latest",
    tools=[get_weather],
)
# highlight-next-line
async for chunk in agent.astream(
    {"messages": [{"role": "user", "content": "what is the weather in sf"}]},
    # highlight-next-line
    stream_mode="updates"
):
    print(chunk)
    print("\n")
const agent = createReactAgent({
  llm: model,
  tools: [getWeather],
});

for await (const chunk of await agent.stream(
  { messages: [{ role: "user", content: "what is the weather in sf" }] },
  { streamMode: "updates" }
)) {
  console.log(chunk);
  console.log("\n");
}
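
Each streamed chunk is a dict keyed by the node that produced the update. A minimal sketch of consuming these updates (the node names "agent" and "tools" are what create_react_agent uses internally; treat the exact keys and update shape as an assumption):

for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "what is the weather in sf"}]},
    stream_mode="updates",
):
    for node_name, update in chunk.items():  # e.g. "agent" or "tools"
        # each update carries the new messages produced by that node
        update["messages"][-1].pretty_print()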

LLM tokens

To stream tokens as they are produced by the LLM, use stream_mode="messages":

agent = create_react_agent(
    model="anthropic:claude-3-7-sonnet-latest",
    tools=[get_weather],
)
# highlight-next-line
for token, metadata in agent.stream(
    {"messages": [{"role": "user", "content": "what is the weather in sf"}]},
    # highlight-next-line
    stream_mode="messages"
):
    print("Token", token)
    print("Metadata", metadata)
    print("\n")
agent = create_react_agent(
    model="anthropic:claude-3-7-sonnet-latest",
    tools=[get_weather],
)
# highlight-next-line
async for token, metadata in agent.astream(
    {"messages": [{"role": "user", "content": "what is the weather in sf"}]},
    # highlight-next-line
    stream_mode="messages"
):
    print("Token", token)
    print("Metadata", metadata)
    print("\n")

To stream tokens as they are produced by the LLM, use streamMode: "messages":

const agent = createReactAgent({
  llm: model,
  tools: [getWeather],
});

for await (const [token, metadata] of await agent.stream(
  { messages: [{ role: "user", content: "what is the weather in sf" }] },
  { streamMode: "messages" }
)) {
  console.log("Token", token);
  console.log("Metadata", metadata);
  console.log("\n");
}

Tool updates

To stream updates from tools as they are executed, you can use @[get_stream_writer][].

# highlight-next-line
from langgraph.config import get_stream_writer

def get_weather(city: str) -> str:
    """Get weather for a given city."""
    # highlight-next-line
    writer = get_stream_writer()
    # stream any arbitrary data
    # highlight-next-line
    writer(f"Looking up data for city: {city}")
    return f"It's always sunny in {city}!"

agent = create_react_agent(
    model="anthropic:claude-3-7-sonnet-latest",
    tools=[get_weather],
)

for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "what is the weather in sf"}]},
    # highlight-next-line
    stream_mode="custom"
):
    print(chunk)
    print("\n")
# highlight-next-line
from langgraph.config import get_stream_writer

def get_weather(city: str) -> str:
    """Get weather for a given city."""
    # highlight-next-line
    writer = get_stream_writer()
    # stream any arbitrary data
    # highlight-next-line
    writer(f"Looking up data for city: {city}")
    return f"It's always sunny in {city}!"

agent = create_react_agent(
    model="anthropic:claude-3-7-sonnet-latest",
    tools=[get_weather],
)

async for chunk in agent.astream(
    {"messages": [{"role": "user", "content": "what is the weather in sf"}]},
    # highlight-next-line
    stream_mode="custom"
):
    print(chunk)
    print("\n")

Note

If you add get_stream_writer inside your tool, you won't be able to invoke the tool outside of a LangGraph execution context.
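
If you need the tool to stay callable in plain Python as well (for example in unit tests), one option is to fall back to a no-op writer. A minimal defensive sketch, assuming get_stream_writer raises a RuntimeError when called outside a run; this pattern is illustrative, not part of the documented API:

from langgraph.config import get_stream_writer

def get_weather(city: str) -> str:
    """Get weather for a given city."""
    try:
        writer = get_stream_writer()
    except RuntimeError:
        # not running inside a LangGraph execution context: use a no-op writer
        writer = lambda _chunk: None
    writer(f"Looking up data for city: {city}")
    return f"It's always sunny in {city}!"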

To stream updates from tools as they are executed, you can use the writer parameter from the config.

import { LangGraphRunnableConfig } from "@langchain/langgraph";

const getWeather = tool(
  async (input, config: LangGraphRunnableConfig) => {
    // stream any arbitrary data
    config.writer?.("Looking up data for city: " + input.city);
    return `It's always sunny in ${input.city}!`;
  },
  {
    name: "get_weather",
    description: "Get weather for a given city.",
    schema: z.object({
      city: z.string().describe("The city to get weather for."),
    }),
  }
);

const agent = createReactAgent({
  llm: model,
  tools: [getWeather],
});

for await (const chunk of await agent.stream(
  { messages: [{ role: "user", content: "what is the weather in sf" }] },
  { streamMode: "custom" }
)) {
  console.log(chunk);
  console.log("\n");
}

Note

If you add the writer parameter to your tool, you won't be able to invoke the tool outside of a LangGraph execution context without providing a writer function.

Stream multiple modes

You can specify multiple streaming modes by passing them as a list: stream_mode=["updates", "messages", "custom"]:

agent = create_react_agent(
    model="anthropic:claude-3-7-sonnet-latest",
    tools=[get_weather],
)

for stream_mode, chunk in agent.stream(
    {"messages": [{"role": "user", "content": "what is the weather in sf"}]},
    # highlight-next-line
    stream_mode=["updates", "messages", "custom"]
):
    print(chunk)
    print("\n")
agent = create_react_agent(
    model="anthropic:claude-3-7-sonnet-latest",
    tools=[get_weather],
)

async for stream_mode, chunk in agent.astream(
    {"messages": [{"role": "user", "content": "what is the weather in sf"}]},
    # highlight-next-line
    stream_mode=["updates", "messages", "custom"]
):
    print(chunk)
    print("\n")

You can specify multiple streaming modes by passing streamMode as an array: streamMode: ["updates", "messages", "custom"]:

const agent = createReactAgent({
  llm: model,
  tools: [getWeather],
});

for await (const chunk of await agent.stream(
  { messages: [{ role: "user", content: "what is the weather in sf" }] },
  { streamMode: ["updates", "messages", "custom"] }
)) {
  console.log(chunk);
  console.log("\n");
}

Disable streaming

In some applications you might need to disable streaming of individual tokens for a given model. This is useful in multi-agent systems to control which agents stream their output.

See the models guide to learn how to disable streaming.

Streaming from a workflow

Basic usage example

LangGraph graphs expose the @[.stream()][Pregel.stream] (sync) and @[.astream()][Pregel.astream] (async) methods to yield streamed outputs as iterators.

for chunk in graph.stream(inputs, stream_mode="updates"):
    print(chunk)
async for chunk in graph.astream(inputs, stream_mode="updates"):
    print(chunk)

LangGraph graphs expose the @[.stream()][Pregel.stream] method to yield streamed outputs as an iterator.

for await (const chunk of await graph.stream(inputs, {
  streamMode: "updates",
})) {
  console.log(chunk);
}
Extended example: streaming updates

=== "Python"

from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    topic: str
    joke: str

def refine_topic(state: State):
    return {"topic": state["topic"] + " and cats"}

def generate_joke(state: State):
    return {"joke": f"This is a joke about {state['topic']}"}

graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .add_edge("generate_joke", END)
    .compile()
)

# highlight-next-line
for chunk in graph.stream( # (1)!
    {"topic": "ice cream"},
    # highlight-next-line
    stream_mode="updates", # (2)!
):
    print(chunk)

  1. The stream() method returns an iterator that yields streamed outputs.
  2. Set stream_mode="updates" to stream only the updates to the graph state after each node. Other stream modes are also available. See supported stream modes for details.

=== "JavaScript"

import { StateGraph, START, END } from "@langchain/langgraph";
import { z } from "zod";

const State = z.object({
  topic: z.string(),
  joke: z.string(),
});

const graph = new StateGraph(State)
  .addNode("refineTopic", (state) => {
    return { topic: state.topic + " and cats" };
  })
  .addNode("generateJoke", (state) => {
    return { joke: `This is a joke about ${state.topic}` };
  })
  .addEdge(START, "refineTopic")
  .addEdge("refineTopic", "generateJoke")
  .addEdge("generateJoke", END)
  .compile();

for await (const chunk of await graph.stream(
  { topic: "ice cream" },
  { streamMode: "updates" } // (1)!
)) {
  console.log(chunk);
}

  1. Set streamMode: "updates" to stream only the updates to the graph state after each node. Other stream modes are also available. See supported stream modes for details.

Output:

{'refineTopic': {'topic': 'ice cream and cats'}}
{'generateJoke': {'joke': 'This is a joke about ice cream and cats'}}

Stream multiple modes

You can pass a list as the stream_mode parameter to stream multiple modes at once.

The streamed outputs will be (mode, chunk) tuples, where mode is the name of the stream mode and chunk is the data streamed by that mode.

for mode, chunk in graph.stream(inputs, stream_mode=["updates", "custom"]):
    print(chunk)
async for mode, chunk in graph.astream(inputs, stream_mode=["updates", "custom"]):
    print(chunk)
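
Since each mode yields chunks of a different shape, a dispatch on mode is typical. A minimal sketch, assuming a graph that emits all three modes:

for mode, chunk in graph.stream(inputs, stream_mode=["updates", "messages", "custom"]):
    if mode == "updates":
        print("node update:", chunk)  # dict keyed by the node that ran
    elif mode == "messages":
        message_chunk, metadata = chunk  # (LLM token, metadata) tuple
        print("token:", message_chunk.content)
    else:  # mode == "custom"
        print("custom data:", chunk)  # whatever the stream writer emitted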

You can pass an array as the streamMode parameter to stream multiple modes at once.

The streamed outputs will be [mode, chunk] tuples, where mode is the name of the stream mode and chunk is the data streamed by that mode.

for await (const [mode, chunk] of await graph.stream(inputs, {
  streamMode: ["updates", "custom"],
})) {
  console.log(chunk);
}

Stream graph state

Use the stream modes updates and values to stream the state of the graph as it executes.

  • updates streams the updates to the state after each step of the graph.
  • values streams the full value of the state after each step of the graph.
from typing import TypedDict
from langgraph.graph import StateGraph, START, END


class State(TypedDict):
  topic: str
  joke: str


def refine_topic(state: State):
    return {"topic": state["topic"] + " and cats"}


def generate_joke(state: State):
    return {"joke": f"This is a joke about {state['topic']}"}

graph = (
  StateGraph(State)
  .add_node(refine_topic)
  .add_node(generate_joke)
  .add_edge(START, "refine_topic")
  .add_edge("refine_topic", "generate_joke")
  .add_edge("generate_joke", END)
  .compile()
)
import { StateGraph, START, END } from "@langchain/langgraph";
import { z } from "zod";

const State = z.object({
  topic: z.string(),
  joke: z.string(),
});

const graph = new StateGraph(State)
  .addNode("refineTopic", (state) => {
    return { topic: state.topic + " and cats" };
  })
  .addNode("generateJoke", (state) => {
    return { joke: `This is a joke about ${state.topic}` };
  })
  .addEdge(START, "refineTopic")
  .addEdge("refineTopic", "generateJoke")
  .addEdge("generateJoke", END)
  .compile();

Use this mode to stream only the state updates returned by the nodes after each step. The streamed outputs include the name of the node as well as the update.

for chunk in graph.stream(
    {"topic": "ice cream"},
    # highlight-next-line
    stream_mode="updates",
):
    print(chunk)
for await (const chunk of await graph.stream(
  { topic: "ice cream" },
  { streamMode: "updates" }
)) {
  console.log(chunk);
}

Use this mode to stream the full state of the graph after each step.

for chunk in graph.stream(
    {"topic": "ice cream"},
    # highlight-next-line
    stream_mode="values",
):
    print(chunk)
for await (const chunk of await graph.stream(
  { topic: "ice cream" },
  { streamMode: "values" }
)) {
  console.log(chunk);
}
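
The difference between the two modes is easy to see by streaming both at once. A minimal sketch reusing the joke graph defined above:

for mode, chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode=["updates", "values"],
):
    if mode == "updates":
        print("update:", chunk)  # only what the node returned, keyed by node name
    else:
        print("values:", chunk)  # the full accumulated state after the step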

Stream subgraph outputs

To include outputs from subgraphs in the streamed outputs, you can set subgraphs=True in the .stream() method of the parent graph. This will stream outputs from both the parent graph and any subgraphs.

The outputs will be streamed as (namespace, data) tuples, where namespace is a tuple with the path to the node where the subgraph is invoked, e.g. ("parent_node:<task_id>", "child_node:<task_id>").

for chunk in graph.stream(
    {"foo": "foo"},
    # highlight-next-line
    subgraphs=True, # (1)!
    stream_mode="updates",
):
    print(chunk)
  1. Set subgraphs=True to stream outputs from subgraphs.

To include outputs from subgraphs in the streamed outputs, you can set subgraphs: true in the .stream() method of the parent graph. This will stream outputs from both the parent graph and any subgraphs.

The outputs will be streamed as [namespace, data] tuples, where namespace is an array with the path to the node where the subgraph is invoked, e.g. ["parent_node:<task_id>", "child_node:<task_id>"].

for await (const chunk of await graph.stream(
  { foo: "foo" },
  {
    subgraphs: true, // (1)!
    streamMode: "updates",
  }
)) {
  console.log(chunk);
}
  1. Set subgraphs: true to stream outputs from subgraphs.
Extended example: streaming from subgraphs

=== "Python"

from langgraph.graph import START, StateGraph
from typing import TypedDict

# Define the subgraph
class SubgraphState(TypedDict):
    foo: str  # note that this key is shared with the parent graph state
    bar: str

def subgraph_node_1(state: SubgraphState):
    return {"bar": "bar"}

def subgraph_node_2(state: SubgraphState):
    return {"foo": state["foo"] + state["bar"]}

subgraph_builder = StateGraph(SubgraphState)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_node(subgraph_node_2)
subgraph_builder.add_edge(START, "subgraph_node_1")
subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2")
subgraph = subgraph_builder.compile()

# Define the parent graph
class ParentState(TypedDict):
    foo: str

def node_1(state: ParentState):
    return {"foo": "hi! " + state["foo"]}

builder = StateGraph(ParentState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", subgraph)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
graph = builder.compile()

for chunk in graph.stream(
    {"foo": "foo"},
    stream_mode="updates",
    # highlight-next-line
    subgraphs=True, # (1)!
):
    print(chunk)

  1. Set subgraphs=True to stream outputs from subgraphs.

=== "JavaScript"

import { StateGraph, START } from "@langchain/langgraph";
import { z } from "zod";

// Define the subgraph
const SubgraphState = z.object({
  foo: z.string(), // note that this key is shared with the parent graph state
  bar: z.string(),
});

const subgraphBuilder = new StateGraph(SubgraphState)
  .addNode("subgraphNode1", (state) => {
    return { bar: "bar" };
  })
  .addNode("subgraphNode2", (state) => {
    return { foo: state.foo + state.bar };
  })
  .addEdge(START, "subgraphNode1")
  .addEdge("subgraphNode1", "subgraphNode2");
const subgraph = subgraphBuilder.compile();

// Define the parent graph
const ParentState = z.object({
  foo: z.string(),
});

const builder = new StateGraph(ParentState)
  .addNode("node1", (state) => {
    return { foo: "hi! " + state.foo };
  })
  .addNode("node2", subgraph)
  .addEdge(START, "node1")
  .addEdge("node1", "node2");
const graph = builder.compile();

for await (const chunk of await graph.stream(
  { foo: "foo" },
  {
    streamMode: "updates",
    subgraphs: true, // (1)!
  }
)) {
  console.log(chunk);
}

  1. Set subgraphs: true to stream outputs from subgraphs.

=== "Python"

((), {'node_1': {'foo': 'hi! foo'}})
(('node_2:dfddc4ba-c3c5-6887-5012-a243b5b377c2',), {'subgraph_node_1': {'bar': 'bar'}})
(('node_2:dfddc4ba-c3c5-6887-5012-a243b5b377c2',), {'subgraph_node_2': {'foo': 'hi! foobar'}})
((), {'node_2': {'foo': 'hi! foobar'}})

=== "JavaScript"

[[], {'node1': {'foo': 'hi! foo'}}]
[['node2:dfddc4ba-c3c5-6887-5012-a243b5b377c2'], {'subgraphNode1': {'bar': 'bar'}}]
[['node2:dfddc4ba-c3c5-6887-5012-a243b5b377c2'], {'subgraphNode2': {'foo': 'hi! foobar'}}]
[[], {'node2': {'foo': 'hi! foobar'}}]

Note that we are receiving not just the node updates, but also the namespaces, which tell us what graph (or subgraph) we are streaming from.
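
This makes it straightforward to keep only the events you care about. A minimal sketch, reusing the parent graph above, that keeps only updates originating inside the subgraph:

for namespace, chunk in graph.stream(
    {"foo": "foo"},
    stream_mode="updates",
    subgraphs=True,
):
    if namespace:  # an empty tuple () means the update came from the parent graph
        print(f"subgraph update from {namespace[0]}: {chunk}")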

Debug

Use the debug streaming mode to stream as much information as possible throughout the execution of the graph. The streamed outputs include the name of the node as well as the full state.

for chunk in graph.stream(
    {"topic": "ice cream"},
    # highlight-next-line
    stream_mode="debug",
):
    print(chunk)
for await (const chunk of await graph.stream(
  { topic: "ice cream" },
  { streamMode: "debug" }
)) {
  console.log(chunk);
}
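
Each debug chunk is a dict describing a single execution event. A minimal sketch of filtering those events; the "type"/"payload" keys reflect the debug event shape of recent LangGraph versions and should be treated as an assumption:

for chunk in graph.stream({"topic": "ice cream"}, stream_mode="debug"):
    if chunk["type"] == "task_result":  # "task" events mark node starts
        print(chunk["payload"]["name"], "->", chunk["payload"]["result"])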

LLM tokens

Use the messages streaming mode to stream Large Language Model (LLM) outputs token by token from any part of your graph, including nodes, tools, subgraphs, or tasks.

The streamed output from messages mode is a (message_chunk, metadata) tuple, where:

  • message_chunk: the token or message segment from the LLM.
  • metadata: a dictionary containing details about the graph node and the LLM invocation.

If your LLM is not available as a LangChain integration, you can stream its outputs using the custom mode instead. See use with any LLM for details.

Async with Python < 3.11 requires manual config

When using async code with Python < 3.11, you must explicitly pass RunnableConfig to ainvoke() to enable proper streaming. See async with Python < 3.11 for details, or upgrade to Python 3.11+.

from dataclasses import dataclass

from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START


@dataclass
class MyState:
    topic: str
    joke: str = ""


llm = init_chat_model(model="openai:gpt-4o-mini")

def call_model(state: MyState):
    """Call the LLM to generate a joke about a topic"""
    # highlight-next-line
    llm_response = llm.invoke( # (1)!
        [
            {"role": "user", "content": f"Generate a joke about {state.topic}"}
        ]
    )
    return {"joke": llm_response.content}

graph = (
    StateGraph(MyState)
    .add_node(call_model)
    .add_edge(START, "call_model")
    .compile()
)

for message_chunk, metadata in graph.stream( # (2)!
    {"topic": "ice cream"},
    # highlight-next-line
    stream_mode="messages",
):
    if message_chunk.content:
        print(message_chunk.content, end="|", flush=True)
  1. Note that message events are emitted even when the LLM is run using .invoke rather than .stream.
  2. The "messages" stream mode returns an iterator of (message_chunk, metadata) tuples, where message_chunk is the token streamed by the LLM and metadata is a dictionary with information about the graph node where the LLM was called, and other information.

The streamed output from messages mode is a [message_chunk, metadata] tuple, where:

  • message_chunk: the token or message segment from the LLM.
  • metadata: a dictionary containing details about the graph node and the LLM invocation.

If your LLM is not available as a LangChain integration, you can stream its outputs using the custom mode instead. See use with any LLM for details.

import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, START } from "@langchain/langgraph";
import { z } from "zod";

const MyState = z.object({
  topic: z.string(),
  joke: z.string().default(""),
});

const llm = new ChatOpenAI({ model: "gpt-4o-mini" });

const callModel = async (state: z.infer<typeof MyState>) => {
  // Call the LLM to generate a joke about a topic
  const llmResponse = await llm.invoke([
    { role: "user", content: `Generate a joke about ${state.topic}` },
  ]); // (1)!
  return { joke: llmResponse.content };
};

const graph = new StateGraph(MyState)
  .addNode("callModel", callModel)
  .addEdge(START, "callModel")
  .compile();

for await (const [messageChunk, metadata] of await graph.stream(
  // (2)!
  { topic: "ice cream" },
  { streamMode: "messages" }
)) {
  if (messageChunk.content) {
    console.log(messageChunk.content + "|");
  }
}
  1. Note that message events are emitted even when the LLM is run using .invoke rather than .stream.
  2. The "messages" stream mode returns an iterator of [messageChunk, metadata] tuples, where messageChunk is the token streamed by the LLM and metadata is a dictionary with information about the graph node where the LLM was called, and other information.

Filter by LLM invocation

You can associate tags with LLM invocations to filter the streamed tokens by LLM invocation.

from langchain.chat_models import init_chat_model

llm_1 = init_chat_model(model="openai:gpt-4o-mini", tags=['joke']) # (1)!
llm_2 = init_chat_model(model="openai:gpt-4o-mini", tags=['poem']) # (2)!

graph = ... # define a graph that uses these LLMs

async for msg, metadata in graph.astream(  # (3)!
    {"topic": "cats"},
    # highlight-next-line
    stream_mode="messages",
):
    if metadata["tags"] == ["joke"]: # (4)!
        print(msg.content, end="|", flush=True)
  1. llm_1 is tagged with "joke".
  2. llm_2 is tagged with "poem".
  3. The stream_mode is set to "messages" to stream LLM tokens. The metadata contains information about the LLM invocation, including the tags.
  4. Filter the streamed tokens by the tags field in the metadata to only include tokens from the LLM invocation tagged with "joke".
import { ChatOpenAI } from "@langchain/openai";

const llm1 = new ChatOpenAI({
  model: "gpt-4o-mini",
  tags: ['joke'] // (1)!
});
const llm2 = new ChatOpenAI({
  model: "gpt-4o-mini",
  tags: ['poem'] // (2)!
});

const graph = ...; // define a graph that uses these LLMs

for await (const [msg, metadata] of await graph.stream( // (3)!
  { topic: "cats" },
  { streamMode: "messages" }
)) {
  if (metadata.tags?.includes("joke")) { // (4)!
    console.log(msg.content + "|");
  }
}
  1. llm1 is tagged with "joke".
  2. llm2 is tagged with "poem".
  3. The streamMode is set to "messages" to stream LLM tokens. The metadata contains information about the LLM invocation, including the tags.
  4. Filter the streamed tokens by the tags field in the metadata to only include tokens from the LLM invocation tagged with "joke".
Extended example: filtering by tags

=== "Python"

from typing import TypedDict

from langchain.chat_models import init_chat_model
from langgraph.graph import START, StateGraph

joke_model = init_chat_model(model="openai:gpt-4o-mini", tags=["joke"]) # (1)!
poem_model = init_chat_model(model="openai:gpt-4o-mini", tags=["poem"]) # (2)!


class State(TypedDict):
      topic: str
      joke: str
      poem: str


async def call_model(state, config):
      topic = state["topic"]
      print("Writing joke...")
      # Note: passing config explicitly is required in Python < 3.11
      # since context var support wasn't added before then: https://docs.python.org/3/library/asyncio-task.html#creating-tasks
      joke_response = await joke_model.ainvoke(
            [{"role": "user", "content": f"Write a joke about {topic}"}],
            config, # (3)!
      )
      print("\n\nWriting poem...")
      poem_response = await poem_model.ainvoke(
            [{"role": "user", "content": f"Write a short poem about {topic}"}],
            config, # (3)!
      )
      return {"joke": joke_response.content, "poem": poem_response.content}


graph = (
      StateGraph(State)
      .add_node(call_model)
      .add_edge(START, "call_model")
      .compile()
)

async for msg, metadata in graph.astream(
      {"topic": "cats"},
      # highlight-next-line
      stream_mode="messages", # (4)!
):
    if metadata["tags"] == ["joke"]: # (4)!
        print(msg.content, end="|", flush=True)

  1. joke_model is tagged with "joke".
  2. poem_model is tagged with "poem".
  3. The config is passed explicitly to ensure context variables are propagated correctly. This is required for Python < 3.11 when using async code. See the async section for details.
  4. The stream_mode is set to "messages" to stream LLM tokens. The metadata contains information about the LLM invocation, including the tags.
  5. Filter the streamed tokens by the tags field in the metadata to only include tokens from the LLM invocation tagged with "joke".

=== "JavaScript"

import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, START } from "@langchain/langgraph";
import { z } from "zod";

const jokeModel = new ChatOpenAI({
  model: "gpt-4o-mini",
  tags: ["joke"] // (1)!
});
const poemModel = new ChatOpenAI({
  model: "gpt-4o-mini",
  tags: ["poem"] // (2)!
});

const State = z.object({
  topic: z.string(),
  joke: z.string(),
  poem: z.string(),
});

const graph = new StateGraph(State)
  .addNode("callModel", (state) => {
    const topic = state.topic;
    console.log("Writing joke...");

    const jokeResponse = await jokeModel.invoke([
      { role: "user", content: `Write a joke about ${topic}` }
    ]);

    console.log("\n\nWriting poem...");
    const poemResponse = await poemModel.invoke([
      { role: "user", content: `Write a short poem about ${topic}` }
    ]);

    return {
      joke: jokeResponse.content,
      poem: poemResponse.content
    };
  })
  .addEdge(START, "callModel")
  .compile();

for await (const [msg, metadata] of await graph.stream(
  { topic: "cats" },
  { streamMode: "messages" } // (3)!
)) {
  if (metadata.tags?.includes("joke")) { // (4)!
    console.log(msg.content + "|");
  }
}

  1. jokeModel is tagged with "joke".
  2. poemModel is tagged with "poem".
  3. The streamMode is set to "messages" to stream LLM tokens. The metadata contains information about the LLM invocation, including the tags.
  4. Filter the streamed tokens by the tags field in the metadata to only include tokens from the LLM invocation tagged with "joke".

Filter by node

To stream tokens only from specific nodes, use stream_mode="messages" and filter the outputs by the langgraph_node field in the streamed metadata:

for msg, metadata in graph.stream( # (1)!
    inputs,
    # highlight-next-line
    stream_mode="messages",
):
    # highlight-next-line
    if msg.content and metadata["langgraph_node"] == "some_node_name": # (2)!
        ...
  1. "messages" 流式模式返回一个 (message_chunk, metadata) 元组,其中 message_chunk 是 LLM 流式传输的 token,metadata 是一个字典,包含调用 LLM 的图节点信息和其他信息。
  2. 通过元数据中的 langgraph_node 字段过滤流式 token,只包含来自 write_poem 节点的 token。
for await (const [msg, metadata] of await graph.stream(
  // (1)!
  inputs,
  { streamMode: "messages" }
)) {
  if (msg.content && metadata.langgraph_node === "some_node_name") {
    // (2)!
    // ...
  }
}
  1. "messages" 流式模式返回一个 [messageChunk, metadata] 元组,其中 messageChunk 是 LLM 流式传输的 token,metadata 是一个字典,包含调用 LLM 的图节点信息和其他信息。
  2. 通过元数据中的 langgraph_node 字段过滤流式 token,只包含来自 writePoem 节点的 token。
Extended example: streaming LLM tokens from specific nodes

=== "Python"

from typing import TypedDict
from langgraph.graph import START, StateGraph
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o-mini")


class State(TypedDict):
      topic: str
      joke: str
      poem: str


def write_joke(state: State):
      topic = state["topic"]
      joke_response = model.invoke(
            [{"role": "user", "content": f"Write a joke about {topic}"}]
      )
      return {"joke": joke_response.content}


def write_poem(state: State):
      topic = state["topic"]
      poem_response = model.invoke(
            [{"role": "user", "content": f"Write a short poem about {topic}"}]
      )
      return {"poem": poem_response.content}


graph = (
      StateGraph(State)
      .add_node(write_joke)
      .add_node(write_poem)
      # write the joke and the poem concurrently
      .add_edge(START, "write_joke")
      .add_edge(START, "write_poem")
      .compile()
)

# highlight-next-line
for msg, metadata in graph.stream( # (1)!
    {"topic": "cats"},
    stream_mode="messages",
):
    # highlight-next-line
    if msg.content and metadata["langgraph_node"] == "write_poem": # (2)!
        print(msg.content, end="|", flush=True)

  1. "messages" 流式模式返回一个 (message_chunk, metadata) 元组,其中 message_chunk 是 LLM 流式传输的 token,metadata 是一个字典,包含调用 LLM 的图节点信息和其他信息。
  2. 通过元数据中的 langgraph_node 字段过滤流式 token,只包含来自 write_poem 节点的 token。

=== "JavaScript"

import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, START } from "@langchain/langgraph";
import { z } from "zod";

const model = new ChatOpenAI({ model: "gpt-4o-mini" });

const State = z.object({
  topic: z.string(),
  joke: z.string(),
  poem: z.string(),
});

const graph = new StateGraph(State)
  .addNode("writeJoke", async (state) => {
    const topic = state.topic;
    const jokeResponse = await model.invoke([
      { role: "user", content: `Write a joke about ${topic}` }
    ]);
    return { joke: jokeResponse.content };
  })
  .addNode("writePoem", async (state) => {
    const topic = state.topic;
    const poemResponse = await model.invoke([
      { role: "user", content: `Write a short poem about ${topic}` }
    ]);
    return { poem: poemResponse.content };
  })
  // write the joke and the poem concurrently
  .addEdge(START, "writeJoke")
  .addEdge(START, "writePoem")
  .compile();

for await (const [msg, metadata] of await graph.stream( // (1)!
  { topic: "cats" },
  { streamMode: "messages" }
)) {
  if (msg.content && metadata.langgraph_node === "writePoem") { // (2)!
    console.log(msg.content + "|");
  }
}

  1. "messages" 流式模式返回一个 [messageChunk, metadata] 元组,其中 messageChunk 是 LLM 流式传输的 token,metadata 是一个字典,包含调用 LLM 的图节点信息和其他信息。
  2. 通过元数据中的 langgraph_node 字段过滤流式 token,只包含来自 writePoem 节点的 token。

Stream custom data

To send custom user-defined data from inside a LangGraph node or tool, follow these steps:

  1. Use get_stream_writer() to access the stream writer and emit custom data.
  2. Set stream_mode="custom" when calling .stream() or .astream() to get the custom data in the stream. You can combine multiple modes (e.g. ["updates", "custom"]), but at least one must be "custom".

No get_stream_writer() in async for Python < 3.11

In async code running on Python < 3.11, get_stream_writer() will not work. Instead, add a writer parameter to your node or tool and pass it manually. See async with Python < 3.11 for usage examples.

from typing import TypedDict
from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph, START

class State(TypedDict):
    query: str
    answer: str

def node(state: State):
    writer = get_stream_writer()  # (1)!
    writer({"custom_key": "Generating custom data inside node"}) # (2)!
    return {"answer": "some data"}

graph = (
    StateGraph(State)
    .add_node(node)
    .add_edge(START, "node")
    .compile()
)

inputs = {"query": "example"}

# Usage
for chunk in graph.stream(inputs, stream_mode="custom"):  # (3)!
    print(chunk)
  1. Get the stream writer to send custom data.
  2. Emit a custom key-value pair (e.g. a progress update).
  3. Set stream_mode="custom" to receive the custom data in the stream.
from langchain_core.tools import tool
from langgraph.config import get_stream_writer

@tool
def query_database(query: str) -> str:
    """Query the database."""
    writer = get_stream_writer() # (1)!
    # highlight-next-line
    writer({"data": "Retrieved 0/100 records", "type": "progress"}) # (2)!
    # perform the query
    # highlight-next-line
    writer({"data": "Retrieved 100/100 records", "type": "progress"}) # (3)!
    return "some-answer"


graph = ... # define a graph that uses this tool

for chunk in graph.stream(inputs, stream_mode="custom"): # (4)!
    print(chunk)
  1. Access the stream writer to send custom data.
  2. Emit a custom key-value pair (e.g. a progress update).
  3. Emit another custom key-value pair.
  4. Set stream_mode="custom" to receive the custom data in the stream.

To send custom user-defined data from inside a LangGraph node or tool, follow these steps:

  1. Use the writer parameter from LangGraphRunnableConfig to emit custom data.
  2. Set streamMode: "custom" when calling .stream() to get the custom data in the stream. You can combine multiple modes (e.g. ["updates", "custom"]), but at least one must be "custom".
import { StateGraph, START, LangGraphRunnableConfig } from "@langchain/langgraph";
import { z } from "zod";

const State = z.object({
  query: z.string(),
  answer: z.string(),
});

const graph = new StateGraph(State)
  .addNode("node", async (state, config) => {
    config.writer({ custom_key: "Generating custom data inside node" }); // (1)!
    return { answer: "some data" };
  })
  .addEdge(START, "node")
  .compile();

const inputs = { query: "example" };

// Usage
for await (const chunk of await graph.stream(inputs, { streamMode: "custom" })) { // (2)!
  console.log(chunk);
}
  1. Use the writer to emit a custom key-value pair (e.g. a progress update).
  2. Set streamMode: "custom" to receive the custom data in the stream.
import { tool } from "@langchain/core/tools";
import { LangGraphRunnableConfig } from "@langchain/langgraph";
import { z } from "zod";

const queryDatabase = tool(
  async (input, config: LangGraphRunnableConfig) => {
    config.writer({ data: "Retrieved 0/100 records", type: "progress" }); // (1)!
    // perform the query
    config.writer({ data: "Retrieved 100/100 records", type: "progress" }); // (2)!
    return "some-answer";
  },
  {
    name: "query_database",
    description: "Query the database.",
    schema: z.object({
      query: z.string().describe("The query to execute."),
    }),
  }
);

const graph = ...; // define a graph that uses this tool

for await (const chunk of await graph.stream(inputs, { streamMode: "custom" })) { // (3)!
  console.log(chunk);
}
  1. Use the writer to emit a custom key-value pair (e.g. a progress update).
  2. Emit another custom key-value pair.
  3. Set streamMode: "custom" to receive the custom data in the stream.

Use with any LLM

你可以使用 stream_mode="custom"任何 LLM API 流式传输数据 — 即使该 API 没有 实现 LangChain 聊天模型接口。

This lets you integrate raw LLM clients or external services that offer their own streaming interfaces, making LangGraph highly flexible for custom setups.

from langgraph.config import get_stream_writer

def call_arbitrary_model(state):
    """Example node that calls an arbitrary model and streams the output"""
    # highlight-next-line
    writer = get_stream_writer() # (1)!
    # assume you have a streaming client that yields chunks
    for chunk in your_custom_streaming_client(state["topic"]): # (2)!
        # highlight-next-line
        writer({"custom_llm_chunk": chunk}) # (3)!
    return {"result": "completed"}

graph = (
    StateGraph(State)
    .add_node(call_arbitrary_model)
    # add other nodes and edges as needed
    .compile()
)

for chunk in graph.stream(
    {"topic": "cats"},
    # highlight-next-line
    stream_mode="custom", # (4)!
):
    # the chunk will contain the custom data streamed from the LLM
    print(chunk)
  1. Get the stream writer to send custom data.
  2. Generate LLM tokens using your custom streaming client.
  3. Use the writer to send custom data to the stream.
  4. Set stream_mode="custom" to receive the custom data in the stream.

你可以使用 streamMode: "custom"任何 LLM API 流式传输数据 — 即使该 API 没有 实现 LangChain 聊天模型接口。

This lets you integrate raw LLM clients or external services that offer their own streaming interfaces, making LangGraph highly flexible for custom setups.

import { LangGraphRunnableConfig } from "@langchain/langgraph";

const callArbitraryModel = async (
  state: any,
  config: LangGraphRunnableConfig
) => {
  // Example node that calls an arbitrary model and streams the output
  // assume you have a streaming client that yields chunks
  for await (const chunk of yourCustomStreamingClient(state.topic)) {
    // (1)!
    config.writer({ custom_llm_chunk: chunk }); // (2)!
  }
  return { result: "completed" };
};

const graph = new StateGraph(State)
  .addNode("callArbitraryModel", callArbitraryModel)
  // add other nodes and edges as needed
  .compile();

for await (const chunk of await graph.stream(
  { topic: "cats" },
  { streamMode: "custom" } // (3)!
)) {
  // the chunk will contain the custom data streamed from the LLM
  console.log(chunk);
}
  1. Generate LLM tokens using your custom streaming client.
  2. Use the writer to send custom data to the stream.
  3. Set streamMode: "custom" to receive the custom data in the stream.
Extended example: streaming with an arbitrary chat model

=== "Python"

import operator
import json

from typing import TypedDict
from typing_extensions import Annotated
from langgraph.graph import StateGraph, START
from langgraph.config import get_stream_writer

from openai import AsyncOpenAI

openai_client = AsyncOpenAI()
model_name = "gpt-4o-mini"


async def stream_tokens(model_name: str, messages: list[dict]):
    response = await openai_client.chat.completions.create(
        messages=messages, model=model_name, stream=True
    )
    role = None
    async for chunk in response:
        delta = chunk.choices[0].delta

        if delta.role is not None:
            role = delta.role

        if delta.content:
            yield {"role": role, "content": delta.content}


# This is our tool
async def get_items(place: str) -> str:
    """Use this tool to list items one might find in a place you're asked about."""
    writer = get_stream_writer()
    response = ""
    async for msg_chunk in stream_tokens(
        model_name,
        [
            {
                "role": "user",
                "content": (
                    "Can you tell me what kind of items "
                    f"i might find in the following place: '{place}'. "
                    "List at least 3 such items separating them by a comma. "
                    "And include a brief description of each item."
                ),
            }
        ],
    ):
        response += msg_chunk["content"]
        writer(msg_chunk)

    return response


class State(TypedDict):
    messages: Annotated[list[dict], operator.add]


# This is the tool-calling graph node
async def call_tool(state: State):
    ai_message = state["messages"][-1]
    tool_call = ai_message["tool_calls"][-1]

    function_name = tool_call["function"]["name"]
    if function_name != "get_items":
        raise ValueError(f"Tool {function_name} not supported")

    function_arguments = tool_call["function"]["arguments"]
    arguments = json.loads(function_arguments)

    function_response = await get_items(**arguments)
    tool_message = {
        "tool_call_id": tool_call["id"],
        "role": "tool",
        "name": function_name,
        "content": function_response,
    }
    return {"messages": [tool_message]}


graph = (
    StateGraph(State)
    .add_node(call_tool)
    .add_edge(START, "call_tool")
    .compile()
)

Let's invoke the graph with an AI message that includes a tool call:

inputs = {
    "messages": [
        {
            "content": None,
            "role": "assistant",
            "tool_calls": [
                {
                    "id": "1",
                    "function": {
                        "arguments": '{"place":"bedroom"}',
                        "name": "get_items",
                    },
                    "type": "function",
                }
            ],
        }
    ]
}

async for chunk in graph.astream(
    inputs,
    stream_mode="custom",
):
    print(chunk["content"], end="|", flush=True)

=== "JavaScript"

import { StateGraph, START, LangGraphRunnableConfig } from "@langchain/langgraph";
import { tool } from "@langchain/core/tools";
import { z } from "zod";
import OpenAI from "openai";

const openaiClient = new OpenAI();
const modelName = "gpt-4o-mini";

async function* streamTokens(modelName: string, messages: any[]) {
  const response = await openaiClient.chat.completions.create({
    messages,
    model: modelName,
    stream: true,
  });

  let role: string | null = null;
  for await (const chunk of response) {
    const delta = chunk.choices[0]?.delta;

    if (delta?.role) {
      role = delta.role;
    }

    if (delta?.content) {
      yield { role, content: delta.content };
    }
  }
}

// This is our tool
const getItems = tool(
  async (input, config: LangGraphRunnableConfig) => {
    let response = "";
    for await (const msgChunk of streamTokens(
      modelName,
      [
        {
          role: "user",
          content: `Can you tell me what kind of items i might find in the following place: '${input.place}'. List at least 3 such items separating them by a comma. And include a brief description of each item.`,
        },
      ]
    )) {
      response += msgChunk.content;
      config.writer?.(msgChunk);
    }
    return response;
  },
  {
    name: "get_items",
    description: "Use this tool to list items one might find in a place you're asked about.",
    schema: z.object({
      place: z.string().describe("The place to look up items for."),
    }),
  }
);

const State = z.object({
  messages: z.array(z.any()),
});

const graph = new StateGraph(State)
  // This is the tool-calling graph node
  .addNode("callTool", async (state) => {
    const aiMessage = state.messages.at(-1);
    const toolCall = aiMessage.tool_calls?.at(-1);

    const functionName = toolCall?.function?.name;
    if (functionName !== "get_items") {
      throw new Error(`Tool ${functionName} not supported`);
    }

    const functionArguments = toolCall?.function?.arguments;
    const args = JSON.parse(functionArguments);

    const functionResponse = await getItems.invoke(args);
    const toolMessage = {
      tool_call_id: toolCall.id,
      role: "tool",
      name: functionName,
      content: functionResponse,
    };
    return { messages: [toolMessage] };
  })
  .addEdge(START, "callTool")
  .compile();

Let's invoke the graph with an AI message that includes a tool call:

const inputs = {
  messages: [
    {
      content: null,
      role: "assistant",
      tool_calls: [
        {
          id: "1",
          function: {
            arguments: '{"place":"bedroom"}',
            name: "get_items",
          },
          type: "function",
        }
      ],
    }
  ]
};

for await (const chunk of await graph.stream(
  inputs,
  { streamMode: "custom" }
)) {
  console.log(chunk.content + "|");
}

Disable streaming for specific chat models

If your application mixes models that support streaming with models that do not, you may need to explicitly disable streaming for the models that do not support it.

Set disable_streaming=True when initializing the model:

from langchain.chat_models import init_chat_model

model = init_chat_model(
    "anthropic:claude-3-7-sonnet-latest",
    # highlight-next-line
    disable_streaming=True # (1)!
)
  1. Set disable_streaming=True to disable streaming for the chat model.
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="o1-preview", disable_streaming=True) # (1)!
  1. Set disable_streaming=True to disable streaming for the chat model.

Set streaming: false when initializing the model:

import { ChatOpenAI } from "@langchain/openai";

const model = new ChatOpenAI({
  model: "o1-preview",
  streaming: false, // (1)!
});
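  1. Set streaming: false to disable streaming for the chat model.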

Async with Python < 3.11

In Python versions < 3.11, asyncio tasks do not support the context parameter. This limits LangGraph's ability to automatically propagate context, and affects LangGraph's streaming mechanisms in two key ways:

  1. You must explicitly pass RunnableConfig into async LLM calls (e.g. ainvoke()), since callbacks are not automatically propagated.
  2. You cannot use get_stream_writer() in async nodes or tools; pass a writer argument directly instead.
Extended example: async LLM call with manual config
from typing import TypedDict
from langgraph.graph import START, StateGraph
from langchain.chat_models import init_chat_model

llm = init_chat_model(model="openai:gpt-4o-mini")

class State(TypedDict):
    topic: str
    joke: str

async def call_model(state, config): # (1)!
    topic = state["topic"]
    print("Generating joke...")
    joke_response = await llm.ainvoke(
        [{"role": "user", "content": f"Write a joke about {topic}"}],
        # highlight-next-line
        config, # (2)!
    )
    return {"joke": joke_response.content}

graph = (
    StateGraph(State)
    .add_node(call_model)
    .add_edge(START, "call_model")
    .compile()
)

async for chunk, metadata in graph.astream(
    {"topic": "ice cream"},
    # highlight-next-line
    stream_mode="messages", # (3)!
):
    if chunk.content:
        print(chunk.content, end="|", flush=True)
  1. Accept config as an argument in the async node function.
  2. Pass config through to llm.ainvoke() to ensure proper context propagation.
  3. Set stream_mode="messages" to stream LLM tokens.
Extended example: async custom streaming with a stream writer
from typing import TypedDict
from langgraph.graph import START, StateGraph
from langgraph.types import StreamWriter

class State(TypedDict):
      topic: str
      joke: str

# highlight-next-line
async def generate_joke(state: State, writer: StreamWriter): # (1)!
      writer({"custom_key": "Streaming custom data while generating a joke"})
      return {"joke": f"This is a joke about {state['topic']}"}

graph = (
      StateGraph(State)
      .add_node(generate_joke)
      .add_edge(START, "generate_joke")
      .compile()
)

async for chunk in graph.astream(
      {"topic": "ice cream"},
      # highlight-next-line
      stream_mode="custom", # (2)!
):
      print(chunk)
  1. Add writer as an argument in the function signature of the async node or tool. LangGraph will automatically pass the stream writer to the function.
  2. Set stream_mode="custom" to receive the custom data in the stream.