跳转至

如何使用图 API

本指南演示了 LangGraph 图 API 的基础知识。它详细介绍了状态,以及如何组合常见的图结构,如序列分支循环。它还涵盖了 LangGraph 的控制功能,包括用于 map-reduce 工作流的 Send API 和用于结合状态更新与跨节点"跳转"的 Command API

设置

安装 langgraph

pip install -U langgraph

安装 langgraph

npm install @langchain/langgraph

设置 LangSmith 以获得更好的调试体验

注册 LangSmith 以快速发现问题并提升 LangGraph 项目的性能。LangSmith 让你能够使用追踪数据来调试、测试和监控使用 LangGraph 构建的 LLM 应用 — 在文档中阅读更多入门信息。

定义和更新状态

这里我们展示如何在 LangGraph 中定义和更新状态。我们将演示:

  1. 如何使用状态来定义图的模式
  2. 如何使用归约器来控制状态更新的处理方式。

定义状态

LangGraph 中的状态可以是 TypedDictPydantic 模型或 dataclass。下面我们将使用 TypedDict。有关使用 Pydantic 的详细信息,请参阅此部分

LangGraph 中的状态可以使用 Zod 模式定义。下面我们将使用 Zod。有关使用其他方法的详细信息,请参阅此部分

默认情况下,图将具有相同的输入和输出模式,状态决定该模式。有关如何定义不同的输入和输出模式,请参阅此部分

让我们考虑一个使用消息的简单示例。这代表了许多 LLM 应用程序状态的通用表示方式。有关更多详细信息,请参阅我们的概念页面

from langchain_core.messages import AnyMessage
from typing_extensions import TypedDict

class State(TypedDict):
    messages: list[AnyMessage]
    extra_field: int

此状态跟踪一个消息对象列表,以及一个额外的整数字段。

import { BaseMessage } from "@langchain/core/messages";
import { z } from "zod";

const State = z.object({
  messages: z.array(z.custom<BaseMessage>()),
  extraField: z.number(),
});

此状态跟踪一个消息对象列表,以及一个额外的整数字段。

更新状态

让我们构建一个包含单个节点的示例图。我们的节点只是一个读取图状态并对其进行更新的 Python 函数。此函数的第一个参数始终是状态:

from langchain_core.messages import AIMessage

def node(state: State):
    messages = state["messages"]
    new_message = AIMessage("Hello!")
    return {"messages": messages + [new_message], "extra_field": 10}

此节点只是将一条消息追加到我们的消息列表中,并填充一个额外的字段。

让我们构建一个包含单个节点的示例图。我们的节点只是一个读取图状态并对其进行更新的 TypeScript 函数。此函数的第一个参数始终是状态:

import { AIMessage } from "@langchain/core/messages";

const node = (state: z.infer<typeof State>) => {
  const messages = state.messages;
  const newMessage = new AIMessage("Hello!");
  return { messages: messages.concat([newMessage]), extraField: 10 };
};

此节点只是将一条消息追加到我们的消息列表中,并填充一个额外的字段。

Important

节点应该直接返回对状态的更新,而不是修改状态。

接下来让我们定义一个包含此节点的简单图。我们使用 StateGraph 来定义一个在此状态上操作的图。然后我们使用 add_node 填充我们的图。

from langgraph.graph import StateGraph

builder = StateGraph(State)
builder.add_node(node)
builder.set_entry_point("node")
graph = builder.compile()

接下来让我们定义一个包含此节点的简单图。我们使用 StateGraph 来定义一个在此状态上操作的图。然后我们使用 addNode 填充我们的图。

import { StateGraph } from "@langchain/langgraph";

const graph = new StateGraph(State)
  .addNode("node", node)
  .addEdge("__start__", "node")
  .compile();

LangGraph 提供了用于可视化图的内置工具。让我们检查一下我们的图。有关可视化的详细信息,请参阅此部分

from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

带单个节点的简单图

import * as fs from "node:fs/promises";

const drawableGraph = await graph.getGraphAsync();
const image = await drawableGraph.drawMermaidPng();
const imageBuffer = new Uint8Array(await image.arrayBuffer());

await fs.writeFile("graph.png", imageBuffer);

在这种情况下,我们的图只执行单个节点。让我们进行一个简单的调用:

from langchain_core.messages import HumanMessage

result = graph.invoke({"messages": [HumanMessage("Hi")]})
result
{'messages': [HumanMessage(content='Hi'), AIMessage(content='Hello!')], 'extra_field': 10}
import { HumanMessage } from "@langchain/core/messages";

const result = await graph.invoke({ messages: [new HumanMessage("Hi")], extraField: 0 });
console.log(result);
{ messages: [HumanMessage { content: 'Hi' }, AIMessage { content: 'Hello!' }], extraField: 10 }

请注意:

  • 我们通过更新状态的单个键来启动调用。
  • 我们在调用结果中收到完整的状态。

为了方便起见,我们经常通过 pretty-print 检查消息对象的内容:

for message in result["messages"]:
    message.pretty_print()
================================ Human Message ================================

Hi
================================== Ai Message ==================================

Hello!

为了方便起见,我们经常通过日志检查消息对象的内容:

for (const message of result.messages) {
  console.log(`${message.getType()}: ${message.content}`);
}
human: Hi
ai: Hello!

使用归约器处理状态更新

状态中的每个键都可以有自己独立的归约器函数,该函数控制如何应用来自节点的更新。如果没有明确指定归约器函数,则假定对该键的所有更新都应覆盖它。

对于 TypedDict 状态模式,我们可以通过使用归约器函数注释状态的相应字段来定义归约器。

在前面的示例中,我们的节点通过向状态中的 "messages" 键追加消息来更新它。下面,我们向此键添加一个归约器,以便自动追加更新:

from typing_extensions import Annotated

def add(left, right):
    """也可以从 `operator` 内置模块导入 `add`。"""
    return left + right

class State(TypedDict):
    # highlight-next-line
    messages: Annotated[list[AnyMessage], add]
    extra_field: int

现在我们的节点可以简化为:

def node(state: State):
    new_message = AIMessage("Hello!")
    # highlight-next-line
    return {"messages": [new_message], "extra_field": 10}

对于 Zod 状态模式,我们可以通过在模式字段上使用特殊的 .langgraph.reducer() 方法来定义归约器。

在前面的示例中,我们的节点通过向状态中的 "messages" 键追加消息来更新它。下面,我们向此键添加一个归约器,以便自动追加更新:

import "@langchain/langgraph/zod";

const State = z.object({
  // highlight-next-line
  messages: z.array(z.custom<BaseMessage>()).langgraph.reducer((x, y) => x.concat(y)),
  extraField: z.number(),
});

现在我们的节点可以简化为:

const node = (state: z.infer<typeof State>) => {
  const newMessage = new AIMessage("Hello!");
  // highlight-next-line
  return { messages: [newMessage], extraField: 10 };
};
from langgraph.graph import START

graph = StateGraph(State).add_node(node).add_edge(START, "node").compile()

result = graph.invoke({"messages": [HumanMessage("Hi")]})

for message in result["messages"]:
    message.pretty_print()
================================ Human Message ================================

Hi
================================== Ai Message ==================================

Hello!
import { START } from "@langchain/langgraph";

const graph = new StateGraph(State)
  .addNode("node", node)
  .addEdge(START, "node")
  .compile();

const result = await graph.invoke({ messages: [new HumanMessage("Hi")] });

for (const message of result.messages) {
  console.log(`${message.getType()}: ${message.content}`);
}
human: Hi
ai: Hello!

MessagesState

在实践中,更新消息列表还有一些额外的考虑因素:

  • 我们可能希望更新状态中的现有消息。
  • 我们可能希望接受消息格式的简写,例如 OpenAI 格式

LangGraph 包含一个内置的归约器 add_messages,它处理这些考虑因素:

from langgraph.graph.message import add_messages

class State(TypedDict):
    # highlight-next-line
    messages: Annotated[list[AnyMessage], add_messages]
    extra_field: int

def node(state: State):
    new_message = AIMessage("Hello!")
    return {"messages": [new_message], "extra_field": 10}

graph = StateGraph(State).add_node(node).set_entry_point("node").compile()
# highlight-next-line
input_message = {"role": "user", "content": "Hi"}

result = graph.invoke({"messages": [input_message]})

for message in result["messages"]:
    message.pretty_print()
================================ Human Message ================================

Hi
================================== Ai Message ==================================

Hello!

这是涉及聊天模型的应用程序状态的通用表示方式。LangGraph 包含一个预构建的 MessagesState 以方便使用,因此我们可以有:

from langgraph.graph import MessagesState

class State(MessagesState):
    extra_field: int

LangGraph 包含一个内置的 MessagesZodState,它处理这些考虑因素:

import { MessagesZodState } from "@langchain/langgraph";

const State = z.object({
  // highlight-next-line
  messages: MessagesZodState.shape.messages,
  extraField: z.number(),
});

const graph = new StateGraph(State)
  .addNode("node", (state) => {
    const newMessage = new AIMessage("Hello!");
    return { messages: [newMessage], extraField: 10 };
  })
  .addEdge(START, "node")
  .compile();
// highlight-next-line
const inputMessage = { role: "user", content: "Hi" };

const result = await graph.invoke({ messages: [inputMessage] });

for (const message of result.messages) {
  console.log(`${message.getType()}: ${message.content}`);
}
human: Hi
ai: Hello!

这是涉及聊天模型的应用程序状态的通用表示方式。LangGraph 包含这个预构建的 MessagesZodState 以方便使用,因此我们可以有:

import { MessagesZodState } from "@langchain/langgraph";

const State = MessagesZodState.extend({
  extraField: z.number(),
});

定义输入和输出模式

默认情况下,StateGraph 使用单个模式操作,所有节点都应使用该模式进行通信。但是,也可以为图定义不同的输入和输出模式。

当指定不同的模式时,内部模式仍将用于节点之间的通信。输入模式确保提供的输入与预期结构匹配,而输出模式根据定义的输出模式过滤内部数据以仅返回相关信息。

下面,我们将看到如何定义不同的输入和输出模式。

from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict

# 定义输入模式
class InputState(TypedDict):
    question: str

# 定义输出模式
class OutputState(TypedDict):
    answer: str

# 定义总体模式,结合输入和输出
class OverallState(InputState, OutputState):
    pass

# 定义处理输入并生成答案的节点
def answer_node(state: InputState):
    # 示例答案和一个额外的键
    return {"answer": "bye", "question": state["question"]}

# 使用指定的输入和输出模式构建图
builder = StateGraph(OverallState, input_schema=InputState, output_schema=OutputState)
builder.add_node(answer_node)  # 添加答案节点
builder.add_edge(START, "answer_node")  # 定义起始边
builder.add_edge("answer_node", END)  # 定义结束边
graph = builder.compile()  # 编译图

# 使用输入调用图并打印结果
print(graph.invoke({"question": "hi"}))
{'answer': 'bye'}
import { StateGraph, START, END } from "@langchain/langgraph";
import { z } from "zod";

// 定义输入模式
const InputState = z.object({
  question: z.string(),
});

// 定义输出模式
const OutputState = z.object({
  answer: z.string(),
});

// 定义总体模式,结合输入和输出
const OverallState = InputState.merge(OutputState);

// 使用指定的输入和输出模式构建图
const graph = new StateGraph({
  input: InputState,
  output: OutputState,
  state: OverallState,
})
  .addNode("answerNode", (state) => {
    // 示例答案和一个额外的键
    return { answer: "bye", question: state.question };
  })
  .addEdge(START, "answerNode")
  .addEdge("answerNode", END)
  .compile();

// 使用输入调用图并打印结果
console.log(await graph.invoke({ question: "hi" }));
{ answer: 'bye' }

请注意,invoke 的输出仅包含输出模式。

在节点之间传递私有状态

在某些情况下,你可能希望节点交换对中间逻辑至关重要但不需要成为图主模式一部分的信息。这些私有数据与图的整体输入/输出无关,只应在某些节点之间共享。

下面,我们将创建一个由三个节点(node_1、node_2 和 node_3)组成的示例顺序图,其中私有数据在前两个步骤(node_1 和 node_2)之间传递,而第三个步骤(node_3)只能访问公共的总体状态。

from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict

# 图的总体状态(这是跨节点共享的公共状态)
class OverallState(TypedDict):
    a: str

# node_1 的输出包含不属于总体状态的私有数据
class Node1Output(TypedDict):
    private_data: str

# 私有数据仅在 node_1 和 node_2 之间共享
def node_1(state: OverallState) -> Node1Output:
    output = {"private_data": "set by node_1"}
    print(f"Entered node `node_1`:\n\tInput: {state}.\n\tReturned: {output}")
    return output

# Node 2 输入只请求 node_1 之后可用的私有数据
class Node2Input(TypedDict):
    private_data: str

def node_2(state: Node2Input) -> OverallState:
    output = {"a": "set by node_2"}
    print(f"Entered node `node_2`:\n\tInput: {state}.\n\tReturned: {output}")
    return output

# Node 3 只能访问总体状态(无法访问来自 node_1 的私有数据)
def node_3(state: OverallState) -> OverallState:
    output = {"a": "set by node_3"}
    print(f"Entered node `node_3`:\n\tInput: {state}.\n\tReturned: {output}")
    return output

# 按顺序连接节点
# node_2 接受来自 node_1 的私有数据,而
# node_3 看不到私有数据。
builder = StateGraph(OverallState).add_sequence([node_1, node_2, node_3])
builder.add_edge(START, "node_1")
graph = builder.compile()

# 使用初始状态调用图
response = graph.invoke(
    {
        "a": "set at start",
    }
)

print()
print(f"Output of graph invocation: {response}")
Entered node `node_1`:
    Input: {'a': 'set at start'}.
    Returned: {'private_data': 'set by node_1'}
Entered node `node_2`:
    Input: {'private_data': 'set by node_1'}.
    Returned: {'a': 'set by node_2'}
Entered node `node_3`:
    Input: {'a': 'set by node_2'}.
    Returned: {'a': 'set by node_3'}

Output of graph invocation: {'a': 'set by node_3'}
import { StateGraph, START, END } from "@langchain/langgraph";
import { z } from "zod";

// 图的总体状态(这是跨节点共享的公共状态)
const OverallState = z.object({
  a: z.string(),
});

// node1 的输出包含不属于总体状态的私有数据
const Node1Output = z.object({
  privateData: z.string(),
});

// 私有数据仅在 node1 和 node2 之间共享
const node1 = (state: z.infer<typeof OverallState>): z.infer<typeof Node1Output> => {
  const output = { privateData: "set by node1" };
  console.log(`Entered node 'node1':\n\tInput: ${JSON.stringify(state)}.\n\tReturned: ${JSON.stringify(output)}`);
  return output;
};

// Node 2 输入只请求 node1 之后可用的私有数据
const Node2Input = z.object({
  privateData: z.string(),
});

const node2 = (state: z.infer<typeof Node2Input>): z.infer<typeof OverallState> => {
  const output = { a: "set by node2" };
  console.log(`Entered node 'node2':\n\tInput: ${JSON.stringify(state)}.\n\tReturned: ${JSON.stringify(output)}`);
  return output;
};

// Node 3 只能访问总体状态(无法访问来自 node1 的私有数据)
const node3 = (state: z.infer<typeof OverallState>): z.infer<typeof OverallState> => {
  const output = { a: "set by node3" };
  console.log(`Entered node 'node3':\n\tInput: ${JSON.stringify(state)}.\n\tReturned: ${JSON.stringify(output)}`);
  return output;
};

// 按顺序连接节点
// node2 接受来自 node1 的私有数据,而
// node3 看不到私有数据。
const graph = new StateGraph({
  state: OverallState,
  nodes: {
    node1: { action: node1, output: Node1Output },
    node2: { action: node2, input: Node2Input },
    node3: { action: node3 },
  }
})
  .addEdge(START, "node1")
  .addEdge("node1", "node2")
  .addEdge("node2", "node3")
  .addEdge("node3", END)
  .compile();

// 使用初始状态调用图
const response = await graph.invoke({ a: "set at start" });

console.log(`\nOutput of graph invocation: ${JSON.stringify(response)}`);
Entered node 'node1':
    Input: {"a":"set at start"}.
    Returned: {"privateData":"set by node1"}
Entered node 'node2':
    Input: {"privateData":"set by node1"}.
    Returned: {"a":"set by node2"}
Entered node 'node3':
    Input: {"a":"set by node2"}.
    Returned: {"a":"set by node3"}

Output of graph invocation: {"a":"set by node3"}

使用 Pydantic 模型作为图状态

StateGraph 在初始化时接受一个 state_schema 参数,用于指定图中节点可以访问和更新的状态"形状"。

在我们的示例中,我们通常使用 Python 原生的 TypedDictdataclass 作为 state_schema,但 state_schema 可以是任何类型

在这里,我们将看到如何使用 Pydantic BaseModel 作为 state_schema,以在**输入**上添加运行时验证。

已知限制

  • 目前,图的输出将**不会**是 pydantic 模型的实例。
  • 运行时验证仅在节点的输入上进行,而不在输出上进行。
  • pydantic 的验证错误追踪不会显示错误出现在哪个节点中。
  • Pydantic 的递归验证可能会很慢。对于对性能敏感的应用程序,你可能需要考虑使用 dataclass 代替。
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
from pydantic import BaseModel

# 图的总体状态(这是跨节点共享的公共状态)
class OverallState(BaseModel):
    a: str

def node(state: OverallState):
    return {"a": "goodbye"}

# 构建状态图
builder = StateGraph(OverallState)
builder.add_node(node)  # node_1 是第一个节点
builder.add_edge(START, "node")  # 以 node_1 启动图
builder.add_edge("node", END)  # 在 node_1 之后结束图
graph = builder.compile()

# 使用有效输入测试图
graph.invoke({"a": "hello"})

使用**无效**输入调用图

try:
    graph.invoke({"a": 123})  # 应该是字符串
except Exception as e:
    print("An exception was raised because `a` is an integer rather than a string.")
    print(e)
An exception was raised because `a` is an integer rather than a string.
1 validation error for OverallState
a
  Input should be a valid string [type=string_type, input_value=123, input_type=int]
    For further information visit https://errors.pydantic.dev/2.9/v/string_type

有关 Pydantic 模型状态的其他功能,请参见下文:

序列化行为

当使用 Pydantic 模型作为状态模式时,了解序列化的工作原理非常重要,特别是在以下情况下: - 将 Pydantic 对象作为输入传递 - 从图接收输出 - 使用嵌套的 Pydantic 模型

让我们看看这些行为的实际情况。

from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel

class NestedModel(BaseModel):
    value: str

class ComplexState(BaseModel):
    text: str
    count: int
    nested: NestedModel

def process_node(state: ComplexState):
    # 节点接收一个已验证的 Pydantic 对象
    print(f"Input state type: {type(state)}")
    print(f"Nested type: {type(state.nested)}")
    # 返回字典更新
    return {"text": state.text + " processed", "count": state.count + 1}

# 构建图
builder = StateGraph(ComplexState)
builder.add_node("process", process_node)
builder.add_edge(START, "process")
builder.add_edge("process", END)
graph = builder.compile()

# 创建用于输入的 Pydantic 实例
input_state = ComplexState(text="hello", count=0, nested=NestedModel(value="test"))
print(f"Input object type: {type(input_state)}")

# 使用 Pydantic 实例调用图
result = graph.invoke(input_state)
print(f"Output type: {type(result)}")
print(f"Output content: {result}")

# 如果需要,转换回 Pydantic 模型
output_model = ComplexState(**result)
print(f"Converted back to Pydantic: {type(output_model)}")
运行时类型强制转换

Pydantic 对某些数据类型执行运行时类型强制转换。这可能很有帮助,但如果你不了解它,也可能导致意外行为。

from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel

class CoercionExample(BaseModel):
    # Pydantic 将字符串数字强制转换为整数
    number: int
    # Pydantic 将字符串布尔值解析为 bool
    flag: bool

def inspect_node(state: CoercionExample):
    print(f"number: {state.number} (type: {type(state.number)})")
    print(f"flag: {state.flag} (type: {type(state.flag)})")
    return {}

builder = StateGraph(CoercionExample)
builder.add_node("inspect", inspect_node)
builder.add_edge(START, "inspect")
builder.add_edge("inspect", END)
graph = builder.compile()

# 演示使用将被转换的字符串输入进行强制转换
result = graph.invoke({"number": "42", "flag": "true"})

# 这将因验证错误而失败
try:
    graph.invoke({"number": "not-a-number", "flag": "true"})
except Exception as e:
    print(f"\nExpected validation error: {e}")
使用消息模型

在状态模式中使用 LangChain 消息类型时,序列化有一些重要的考虑因素。你应该使用 AnyMessage(而不是 BaseMessage)以在通过网络传输消息对象时进行适当的序列化/反序列化。

from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel
from langchain_core.messages import HumanMessage, AIMessage, AnyMessage
from typing import List

class ChatState(BaseModel):
    messages: List[AnyMessage]
    context: str

def add_message(state: ChatState):
    return {"messages": state.messages + [AIMessage(content="Hello there!")]}

builder = StateGraph(ChatState)
builder.add_node("add_message", add_message)
builder.add_edge(START, "add_message")
builder.add_edge("add_message", END)
graph = builder.compile()

# 使用消息创建输入
initial_state = ChatState(
    messages=[HumanMessage(content="Hi")], context="Customer support chat"
)

result = graph.invoke(initial_state)
print(f"Output: {result}")

# 转换回 Pydantic 模型以查看消息类型
output_model = ChatState(**result)
for i, msg in enumerate(output_model.messages):
    print(f"Message {i}: {type(msg).__name__} - {msg.content}")

替代状态定义

虽然 Zod 模式是推荐的方法,但 LangGraph 也支持其他定义状态模式的方式:

import { BaseMessage } from "@langchain/core/messages";
import { StateGraph } from "@langchain/langgraph";

interface WorkflowChannelsState {
  messages: BaseMessage[];
  question: string;
  answer: string;
}

const workflowWithChannels = new StateGraph<WorkflowChannelsState>({
  channels: {
    messages: {
      reducer: (currentState, updateValue) => currentState.concat(updateValue),
      default: () => [],
    },
    question: null,
    answer: null,
  },
});

添加运行时配置

有时你希望能够在调用图时对其进行配置。例如,你可能希望能够在运行时指定要使用的 LLM 或系统提示,而不会使用这些参数污染图状态

要添加运行时配置:

  1. 为你的配置指定模式
  2. 将配置添加到节点或条件边的函数签名中
  3. 将配置传递到图中。

下面是一个简单的示例:

from langgraph.graph import END, StateGraph, START
from langgraph.runtime import Runtime
from typing_extensions import TypedDict

# 1. 指定配置模式
class ContextSchema(TypedDict):
    my_runtime_value: str

# 2. 定义一个在节点中访问配置的图
class State(TypedDict):
    my_state_value: str

# highlight-next-line
def node(state: State, runtime: Runtime[ContextSchema]):
    # highlight-next-line
    if runtime.context["my_runtime_value"] == "a":
        return {"my_state_value": 1}
        # highlight-next-line
    elif runtime.context["my_runtime_value"] == "b":
        return {"my_state_value": 2}
    else:
        raise ValueError("Unknown values.")

# highlight-next-line
builder = StateGraph(State, context_schema=ContextSchema)
builder.add_node(node)
builder.add_edge(START, "node")
builder.add_edge("node", END)

graph = builder.compile()

# 3. 在运行时传入配置:
# highlight-next-line
print(graph.invoke({}, context={"my_runtime_value": "a"}))
# highlight-next-line
print(graph.invoke({}, context={"my_runtime_value": "b"}))
{'my_state_value': 1}
{'my_state_value': 2}
import { StateGraph, END, START } from "@langchain/langgraph";
import { RunnableConfig } from "@langchain/core/runnables";
import { z } from "zod";

// 1. 指定配置模式
const ConfigurableSchema = z.object({
  myRuntimeValue: z.string(),
});

// 2. 定义一个在节点中访问配置的图
const State = z.object({
  myStateValue: z.number(),
});

const graph = new StateGraph(State)
  .addNode("node", (state, config) => {
    // highlight-next-line
    if (config?.configurable?.myRuntimeValue === "a") {
      return { myStateValue: 1 };
      // highlight-next-line
    } else if (config?.configurable?.myRuntimeValue === "b") {
      return { myStateValue: 2 };
    } else {
      throw new Error("Unknown values.");
    }
  })
  .addEdge(START, "node")
  .addEdge("node", END)
  .compile();

// 3. 在运行时传入配置:
// highlight-next-line
console.log(await graph.invoke({}, { configurable: { myRuntimeValue: "a" } }));
// highlight-next-line
console.log(await graph.invoke({}, { configurable: { myRuntimeValue: "b" } }));
{ myStateValue: 1 }
{ myStateValue: 2 }
扩展示例:在运行时指定 LLM

下面我们演示一个实际示例,其中我们在运行时配置要使用的 LLM。我们将同时使用 OpenAI 和 Anthropic 模型。

from dataclasses import dataclass

from langchain.chat_models import init_chat_model
from langgraph.graph import MessagesState, END, StateGraph, START
from langgraph.runtime import Runtime
from typing_extensions import TypedDict

@dataclass
class ContextSchema:
    model_provider: str = "anthropic"

MODELS = {
    "anthropic": init_chat_model("anthropic:claude-3-5-haiku-latest"),
    "openai": init_chat_model("openai:gpt-4.1-mini"),
}

def call_model(state: MessagesState, runtime: Runtime[ContextSchema]):
    model = MODELS[runtime.context.model_provider]
    response = model.invoke(state["messages"])
    return {"messages": [response]}

builder = StateGraph(MessagesState, context_schema=ContextSchema)
builder.add_node("model", call_model)
builder.add_edge(START, "model")
builder.add_edge("model", END)

graph = builder.compile()

# 用法
input_message = {"role": "user", "content": "hi"}
# 不使用配置时,使用默认值(Anthropic)
response_1 = graph.invoke({"messages": [input_message]}, context=ContextSchema())["messages"][-1]
# 或者,可以设置 OpenAI
response_2 = graph.invoke({"messages": [input_message]}, context={"model_provider": "openai"})["messages"][-1]

print(response_1.response_metadata["model_name"])
print(response_2.response_metadata["model_name"])
claude-3-5-haiku-20241022
gpt-4.1-mini-2025-04-14

下面我们演示一个实际示例,其中我们在运行时配置要使用的 LLM。我们将同时使用 OpenAI 和 Anthropic 模型。

import { ChatOpenAI } from "@langchain/openai";
import { ChatAnthropic } from "@langchain/anthropic";
import { MessagesZodState, StateGraph, START, END } from "@langchain/langgraph";
import { RunnableConfig } from "@langchain/core/runnables";
import { z } from "zod";

const ConfigSchema = z.object({
  modelProvider: z.string().default("anthropic"),
});

const MODELS = {
  anthropic: new ChatAnthropic({ model: "claude-3-5-haiku-latest" }),
  openai: new ChatOpenAI({ model: "gpt-4o-mini" }),
};

const graph = new StateGraph(MessagesZodState)
  .addNode("model", async (state, config) => {
    const modelProvider = config?.configurable?.modelProvider || "anthropic";
    const model = MODELS[modelProvider as keyof typeof MODELS];
    const response = await model.invoke(state.messages);
    return { messages: [response] };
  })
  .addEdge(START, "model")
  .addEdge("model", END)
  .compile();

// 用法
const inputMessage = { role: "user", content: "hi" };
// 不使用配置时,使用默认值(Anthropic)
const response1 = await graph.invoke({ messages: [inputMessage] });
// 或者,可以设置 OpenAI
const response2 = await graph.invoke(
  { messages: [inputMessage] },
  { configurable: { modelProvider: "openai" } }
);

console.log(response1.messages.at(-1)?.response_metadata?.model);
console.log(response2.messages.at(-1)?.response_metadata?.model);
claude-3-5-haiku-20241022
gpt-4o-mini-2024-07-18

扩展示例:在运行时指定模型和系统消息

下面我们演示一个实际示例,其中我们配置两个参数:在运行时使用的 LLM 和系统消息。

from dataclasses import dataclass
from typing import Optional
from langchain.chat_models import init_chat_model
from langchain_core.messages import SystemMessage
from langgraph.graph import END, MessagesState, StateGraph, START
from langgraph.runtime import Runtime
from typing_extensions import TypedDict

@dataclass
class ContextSchema:
    model_provider: str = "anthropic"
    system_message: str | None = None

MODELS = {
    "anthropic": init_chat_model("anthropic:claude-3-5-haiku-latest"),
    "openai": init_chat_model("openai:gpt-4.1-mini"),
}

def call_model(state: MessagesState, runtime: Runtime[ContextSchema]):
    model = MODELS[runtime.context.model_provider]
    messages = state["messages"]
    if (system_message := runtime.context.system_message):
        messages = [SystemMessage(system_message)] + messages
    response = model.invoke(messages)
    return {"messages": [response]}

builder = StateGraph(MessagesState, context_schema=ContextSchema)
builder.add_node("model", call_model)
builder.add_edge(START, "model")
builder.add_edge("model", END)

graph = builder.compile()

# 用法
input_message = {"role": "user", "content": "hi"}
response = graph.invoke({"messages": [input_message]}, context={"model_provider": "openai", "system_message": "Respond in Italian."})
for message in response["messages"]:
    message.pretty_print()
================================ Human Message ================================

hi
================================== Ai Message ==================================

Ciao! Come posso aiutarti oggi?

下面我们演示一个实际示例,其中我们配置两个参数:在运行时使用的 LLM 和系统消息。

import { ChatOpenAI } from "@langchain/openai";
import { ChatAnthropic } from "@langchain/anthropic";
import { SystemMessage } from "@langchain/core/messages";
import { MessagesZodState, StateGraph, START, END } from "@langchain/langgraph";
import { z } from "zod";

const ConfigSchema = z.object({
  modelProvider: z.string().default("anthropic"),
  systemMessage: z.string().optional(),
});

const MODELS = {
  anthropic: new ChatAnthropic({ model: "claude-3-5-haiku-latest" }),
  openai: new ChatOpenAI({ model: "gpt-4o-mini" }),
};

const graph = new StateGraph(MessagesZodState)
  .addNode("model", async (state, config) => {
    const modelProvider = config?.configurable?.modelProvider || "anthropic";
    const systemMessage = config?.configurable?.systemMessage;

    const model = MODELS[modelProvider as keyof typeof MODELS];
    let messages = state.messages;

    if (systemMessage) {
      messages = [new SystemMessage(systemMessage), ...messages];
    }

    const response = await model.invoke(messages);
    return { messages: [response] };
  })
  .addEdge(START, "model")
  .addEdge("model", END)
  .compile();

// 用法
const inputMessage = { role: "user", content: "hi" };
const response = await graph.invoke(
  { messages: [inputMessage] },
  {
    configurable: {
      modelProvider: "openai",
      systemMessage: "Respond in Italian."
    }
  }
);

for (const message of response.messages) {
  console.log(`${message.getType()}: ${message.content}`);
}
human: hi
ai: Ciao! Come posso aiutarti oggi?

添加重试策略

在许多使用场景中,你可能希望节点具有自定义重试策略,例如调用 API、查询数据库或调用 LLM 等。LangGraph 允许你向节点添加重试策略。

要配置重试策略,请将 retry_policy 参数传递给 add_noderetry_policy 参数接受一个 RetryPolicy 命名元组对象。下面我们使用默认参数实例化一个 RetryPolicy 对象并将其与节点关联:

from langgraph.types import RetryPolicy

builder.add_node(
    "node_name",
    node_function,
    retry_policy=RetryPolicy(),
)

默认情况下,retry_on 参数使用 default_retry_on 函数,该函数对除以下异常外的任何异常进行重试:

  • ValueError
  • TypeError
  • ArithmeticError
  • ImportError
  • LookupError
  • NameError
  • SyntaxError
  • RuntimeError
  • ReferenceError
  • StopIteration
  • StopAsyncIteration
  • OSError

此外,对于来自流行的 http 请求库(如 requestshttpx)的异常,它仅在 5xx 状态码时重试。

要配置重试策略,请将 retryPolicy 参数传递给 addNoderetryPolicy 参数接受一个 RetryPolicy 对象。下面我们使用默认参数实例化一个 RetryPolicy 对象并将其与节点关联:

import { RetryPolicy } from "@langchain/langgraph";

const graph = new StateGraph(State)
  .addNode("nodeName", nodeFunction, { retryPolicy: {} })
  .compile();

默认情况下,重试策略对除以下异常外的任何异常进行重试:

  • TypeError
  • SyntaxError
  • ReferenceError
扩展示例:自定义重试策略

考虑一个我们从 SQL 数据库读取数据的示例。下面我们向节点传递两个不同的重试策略:

import sqlite3
from typing_extensions import TypedDict
from langchain.chat_models import init_chat_model
from langgraph.graph import END, MessagesState, StateGraph, START
from langgraph.types import RetryPolicy
from langchain_community.utilities import SQLDatabase
from langchain_core.messages import AIMessage

db = SQLDatabase.from_uri("sqlite:///:memory:")
model = init_chat_model("anthropic:claude-3-5-haiku-latest")

def query_database(state: MessagesState):
    query_result = db.run("SELECT * FROM Artist LIMIT 10;")
    return {"messages": [AIMessage(content=query_result)]}

def call_model(state: MessagesState):
    response = model.invoke(state["messages"])
    return {"messages": [response]}

# 定义新图
builder = StateGraph(MessagesState)
builder.add_node(
    "query_database",
    query_database,
    retry_policy=RetryPolicy(retry_on=sqlite3.OperationalError),
)
builder.add_node("model", call_model, retry_policy=RetryPolicy(max_attempts=5))
builder.add_edge(START, "model")
builder.add_edge("model", "query_database")
builder.add_edge("query_database", END)
graph = builder.compile()

考虑一个我们从 SQL 数据库读取数据的示例。下面我们向节点传递两个不同的重试策略:

import Database from "better-sqlite3";
import { ChatAnthropic } from "@langchain/anthropic";
import { StateGraph, START, END, MessagesZodState } from "@langchain/langgraph";
import { AIMessage } from "@langchain/core/messages";
import { z } from "zod";

// 创建内存数据库
const db: typeof Database.prototype = new Database(":memory:");

const model = new ChatAnthropic({ model: "claude-3-5-sonnet-20240620" });

const callModel = async (state: z.infer<typeof MessagesZodState>) => {
  const response = await model.invoke(state.messages);
  return { messages: [response] };
};

const queryDatabase = async (state: z.infer<typeof MessagesZodState>) => {
  const queryResult: string = JSON.stringify(
    db.prepare("SELECT * FROM Artist LIMIT 10;").all(),
  );

  return { messages: [new AIMessage({ content: "queryResult" })] };
};

const workflow = new StateGraph(MessagesZodState)
  // 定义我们将在其间循环的两个节点
  .addNode("call_model", callModel, { retryPolicy: { maxAttempts: 5 } })
  .addNode("query_database", queryDatabase, {
    retryPolicy: {
      retryOn: (e: any): boolean => {
        if (e instanceof Database.SqliteError) {
          // 在 "SQLITE_BUSY" 错误时重试
          return e.code === "SQLITE_BUSY";
        }
        return false; // 在其他错误时不重试
      },
    },
  })
  .addEdge(START, "call_model")
  .addEdge("call_model", "query_database")
  .addEdge("query_database", END);

const graph = workflow.compile();

添加节点缓存

节点缓存在你想要避免重复操作的情况下很有用,例如执行昂贵的操作(无论是时间上还是成本上)。LangGraph 允许你向图中的节点添加个性化的缓存策略。

要配置缓存策略,请将 cache_policy 参数传递给 add_node 函数。在以下示例中,实例化了一个生存时间为 120 秒的 CachePolicy 对象和默认的 key_func 生成器。然后将其与节点关联:

from langgraph.types import CachePolicy

builder.add_node(
    "node_name",
    node_function,
    cache_policy=CachePolicy(ttl=120),
)

然后,要为图启用节点级缓存,请在编译图时设置 cache 参数。下面的示例使用 InMemoryCache 设置具有内存缓存的图,但也可以使用 SqliteCache

from langgraph.cache.memory import InMemoryCache

graph = builder.compile(cache=InMemoryCache())

创建步骤序列

前提条件

本指南假设你已熟悉上面关于状态的部分。

这里我们演示如何构建简单的步骤序列。我们将展示:

  1. 如何构建顺序图
  2. 构建类似图的内置简写方法。

要添加节点序列,我们使用.add_node.add_edge 方法:

from langgraph.graph import START, StateGraph

builder = StateGraph(State)

# 添加节点
builder.add_node(step_1)
builder.add_node(step_2)
builder.add_node(step_3)

# 添加边
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")

我们也可以使用内置简写 .add_sequence

builder = StateGraph(State).add_sequence([step_1, step_2, step_3])
builder.add_edge(START, "step_1")

要添加节点序列,我们使用.addNode.addEdge 方法:

import { START, StateGraph } from "@langchain/langgraph";

const builder = new StateGraph(State)
  .addNode("step1", step1)
  .addNode("step2", step2)
  .addNode("step3", step3)
  .addEdge(START, "step1")
  .addEdge("step1", "step2")
  .addEdge("step2", "step3");
为什么使用 LangGraph 将应用步骤拆分为序列?

LangGraph 使向应用程序添加底层持久化层变得容易。 这允许在节点执行之间检查点状态,因此你的 LangGraph 节点控制:

它们还决定执行步骤如何流式传输,以及如何使用 LangGraph Studio 可视化和调试应用程序。

让我们演示一个端到端示例。我们将创建一个包含三个步骤的序列:

  1. 在状态的键中填充值
  2. 更新相同的值
  3. 填充不同的值

让我们首先定义我们的状态。这控制图的模式,还可以指定如何应用更新。有关更多详细信息,请参阅此部分

在我们的例子中,我们只需跟踪两个值:

from typing_extensions import TypedDict

class State(TypedDict):
    value_1: str
    value_2: int
import { z } from "zod";

const State = z.object({
  value1: z.string(),
  value2: z.number(),
});

我们的节点只是读取图状态并对其进行更新的 Python 函数。此函数的第一个参数始终是状态:

def step_1(state: State):
    return {"value_1": "a"}

def step_2(state: State):
    current_value_1 = state["value_1"]
    return {"value_1": f"{current_value_1} b"}

def step_3(state: State):
    return {"value_2": 10}

我们的节点只是读取图状态并对其进行更新的 TypeScript 函数。此函数的第一个参数始终是状态:

const step1 = (state: z.infer<typeof State>) => {
  return { value1: "a" };
};

const step2 = (state: z.infer<typeof State>) => {
  const currentValue1 = state.value1;
  return { value1: `${currentValue1} b` };
};

const step3 = (state: z.infer<typeof State>) => {
  return { value2: 10 };
};

Note

请注意,在向状态发出更新时,每个节点只需指定它希望更新的键的值。

默认情况下,这将**覆盖**相应键的值。你还可以使用归约器来控制如何处理更新——例如,你可以将连续的更新追加到键而不是覆盖。有关更多详细信息,请参阅此部分

最后,我们定义图。我们使用 StateGraph 来定义一个在此状态上操作的图。

然后我们将使用 add_nodeadd_edge 来填充我们的图并定义其控制流。

from langgraph.graph import START, StateGraph

builder = StateGraph(State)

# 添加节点
builder.add_node(step_1)
builder.add_node(step_2)
builder.add_node(step_3)

# 添加边
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")

然后我们将使用 addNodeaddEdge 来填充我们的图并定义其控制流。

import { START, StateGraph } from "@langchain/langgraph";

const graph = new StateGraph(State)
  .addNode("step1", step1)
  .addNode("step2", step2)
  .addNode("step3", step3)
  .addEdge(START, "step1")
  .addEdge("step1", "step2")
  .addEdge("step2", "step3")
  .compile();

指定自定义名称

你可以使用 .add_node 为节点指定自定义名称:

builder.add_node("my_node", step_1)

指定自定义名称

你可以使用 .addNode 为节点指定自定义名称:

const graph = new StateGraph(State)
  .addNode("myNode", step1)
  .compile();

请注意:

  • .add_edge 接受节点的名称,对于函数,默认为 node.__name__
  • 我们必须指定图的入口点。为此,我们添加一条与 START 节点的边。
  • 当没有更多节点要执行时,图将停止。

接下来我们编译我们的图。这提供了对图结构的一些基本检查(例如,识别孤立的节点)。如果我们通过检查点向应用程序添加持久化,它也会在这里传入。

graph = builder.compile()
  • .addEdge 接受节点的名称,对于函数,默认为 node.name
  • 我们必须指定图的入口点。为此,我们添加一条与 START 节点的边。
  • 当没有更多节点要执行时,图将停止。

接下来我们编译我们的图。这提供了对图结构的一些基本检查(例如,识别孤立的节点)。如果我们通过检查点向应用程序添加持久化,它也会在这里传入。

LangGraph 提供了用于可视化图的内置工具。让我们检查我们的序列。有关可视化的详细信息,请参阅本指南

from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

步骤序列图

import * as fs from "node:fs/promises";

const drawableGraph = await graph.getGraphAsync();
const image = await drawableGraph.drawMermaidPng();
const imageBuffer = new Uint8Array(await image.arrayBuffer());

await fs.writeFile("graph.png", imageBuffer);

让我们进行一个简单的调用:

graph.invoke({"value_1": "c"})
{'value_1': 'a b', 'value_2': 10}
const result = await graph.invoke({ value1: "c" });
console.log(result);
{ value1: 'a b', value2: 10 }

请注意:

  • 我们通过为单个状态键提供值来启动调用。我们必须始终为至少一个键提供值。
  • 我们传入的值被第一个节点覆盖。
  • 第二个节点更新了该值。
  • 第三个节点填充了不同的值。

内置简写

langgraph>=0.2.46 包含一个内置简写 add_sequence 用于添加节点序列。你可以如下编译相同的图:

# highlight-next-line
builder = StateGraph(State).add_sequence([step_1, step_2, step_3])
builder.add_edge(START, "step_1")

graph = builder.compile()

graph.invoke({"value_1": "c"})

创建分支

并行执行节点对于加快整体图操作至关重要。LangGraph 为节点的并行执行提供原生支持,这可以显著提高基于图的工作流的性能。这种并行化是通过扇出和扇入机制实现的,利用标准边和条件边。下面是一些示例,展示如何添加适合你的分支数据流。

并行运行图节点

在此示例中,我们从 Node A 扇出到 B 和 C,然后扇入到 D。对于我们的状态,我们指定归约器 add 操作。这将为状态中的特定键组合或累积值,而不是简单地覆盖现有值。对于列表,这意味着将新列表与现有列表连接。有关使用归约器更新状态的更多详细信息,请参阅上面的状态归约器部分。

import operator
from typing import Annotated, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    # operator.add 归约器函数使其仅支持追加
    aggregate: Annotated[list, operator.add]

def a(state: State):
    print(f'Adding "A" to {state["aggregate"]}')
    return {"aggregate": ["A"]}

def b(state: State):
    print(f'Adding "B" to {state["aggregate"]}')
    return {"aggregate": ["B"]}

def c(state: State):
    print(f'Adding "C" to {state["aggregate"]}')
    return {"aggregate": ["C"]}

def d(state: State):
    print(f'Adding "D" to {state["aggregate"]}')
    return {"aggregate": ["D"]}

builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
import "@langchain/langgraph/zod";
import { StateGraph, START, END } from "@langchain/langgraph";
import { z } from "zod";

const State = z.object({
  // 归约器使其仅支持追加
  aggregate: z.array(z.string()).langgraph.reducer((x, y) => x.concat(y)),
});

const nodeA = (state: z.infer<typeof State>) => {
  console.log(`Adding "A" to ${state.aggregate}`);
  return { aggregate: ["A"] };
};

const nodeB = (state: z.infer<typeof State>) => {
  console.log(`Adding "B" to ${state.aggregate}`);
  return { aggregate: ["B"] };
};

const nodeC = (state: z.infer<typeof State>) => {
  console.log(`Adding "C" to ${state.aggregate}`);
  return { aggregate: ["C"] };
};

const nodeD = (state: z.infer<typeof State>) => {
  console.log(`Adding "D" to ${state.aggregate}`);
  return { aggregate: ["D"] };
};

const graph = new StateGraph(State)
  .addNode("a", nodeA)
  .addNode("b", nodeB)
  .addNode("c", nodeC)
  .addNode("d", nodeD)
  .addEdge(START, "a")
  .addEdge("a", "b")
  .addEdge("a", "c")
  .addEdge("b", "d")
  .addEdge("c", "d")
  .addEdge("d", END)
  .compile();
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

并行执行图

import * as fs from "node:fs/promises";

const drawableGraph = await graph.getGraphAsync();
const image = await drawableGraph.drawMermaidPng();
const imageBuffer = new Uint8Array(await image.arrayBuffer());

await fs.writeFile("graph.png", imageBuffer);

使用归约器,你可以看到在每个节点中添加的值被累积。

graph.invoke({"aggregate": []}, {"configurable": {"thread_id": "foo"}})
Adding "A" to []
Adding "B" to ['A']
Adding "C" to ['A']
Adding "D" to ['A', 'B', 'C']
const result = await graph.invoke({
  aggregate: [],
});
console.log(result);
Adding "A" to []
Adding "B" to ['A']
Adding "C" to ['A']
Adding "D" to ['A', 'B', 'C']
{ aggregate: ['A', 'B', 'C', 'D'] }

Note

在上面的示例中,节点 "b""c" 在同一超步骤中并发执行。因为它们在同一步骤中,节点 "d""b""c" 都完成后执行。

重要的是,来自并行超步骤的更新可能不会始终有序。如果你需要从并行超步骤中获得一致的、预定的更新顺序,你应该将输出写入状态的单独字段,并使用一个值来对它们进行排序。

异常处理?

LangGraph 在超步骤内执行节点,这意味着虽然并行分支是并行执行的,但整个超步骤是**事务性的**。如果这些分支中的任何一个引发异常,**没有**任何更新会应用到状态(整个超步骤出错)。

重要的是,当使用检查点时,超步骤内成功节点的结果会被保存,并且在恢复时不会重复。

如果你有容易出错的节点(可能想要处理不稳定的 API 调用),LangGraph 提供两种方法来解决这个问题:

  1. 你可以在节点内编写常规 python 代码来捕获和处理异常。
  2. 你可以设置**retry_policy**来指导图重试引发某些类型异常的节点。只有失败的分支会被重试,因此你无需担心执行冗余工作。

总之,这些功能让你可以执行并行执行并完全控制异常处理。

延迟节点执行

延迟节点执行在你想要延迟节点的执行直到所有其他待处理任务完成时很有用。这在分支具有不同长度的情况下特别相关,这在 map-reduce 流等工作流中很常见。

上面的示例展示了当每个路径只有一步时如何扇出和扇入。但是如果一个分支有多个步骤呢?让我们在 "b" 分支中添加一个节点 "b_2"

import operator
from typing import Annotated, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    # operator.add 归约器函数使其仅支持追加
    aggregate: Annotated[list, operator.add]

def a(state: State):
    print(f'Adding "A" to {state["aggregate"]}')
    return {"aggregate": ["A"]}

def b(state: State):
    print(f'Adding "B" to {state["aggregate"]}')
    return {"aggregate": ["B"]}

def b_2(state: State):
    print(f'Adding "B_2" to {state["aggregate"]}')
    return {"aggregate": ["B_2"]}

def c(state: State):
    print(f'Adding "C" to {state["aggregate"]}')
    return {"aggregate": ["C"]}

def d(state: State):
    print(f'Adding "D" to {state["aggregate"]}')
    return {"aggregate": ["D"]}

builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(b_2)
builder.add_node(c)
# highlight-next-line
builder.add_node(d, defer=True)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b_2")
builder.add_edge("b_2", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

延迟执行图

graph.invoke({"aggregate": []})
Adding "A" to []
Adding "B" to ['A']
Adding "C" to ['A']
Adding "B_2" to ['A', 'B', 'C']
Adding "D" to ['A', 'B', 'C', 'B_2']

在上面的示例中,节点 "b""c" 在同一超步骤中并发执行。我们在节点 d 上设置 defer=True,以便它在所有待处理任务完成之前不会执行。在这种情况下,这意味着 "d" 等待执行直到整个 "b" 分支完成。

条件分支

如果你的扇出应该根据状态在运行时变化,你可以使用 add_conditional_edges 来使用图状态选择一个或多个路径。请参见下面的示例,其中节点 a 生成确定后续节点的状态更新。

import operator
from typing import Annotated, Literal, Sequence
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    aggregate: Annotated[list, operator.add]
    # 向状态添加一个键。我们将设置此键来确定
    # 如何分支。
    which: str

def a(state: State):
    print(f'Adding "A" to {state["aggregate"]}')
    # highlight-next-line
    return {"aggregate": ["A"], "which": "c"}

def b(state: State):
    print(f'Adding "B" to {state["aggregate"]}')
    return {"aggregate": ["B"]}

def c(state: State):
    print(f'Adding "C" to {state["aggregate"]}')
    return {"aggregate": ["C"]}

builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_edge(START, "a")
builder.add_edge("b", END)
builder.add_edge("c", END)

def conditional_edge(state: State) -> Literal["b", "c"]:
    # 在此处填写使用状态的任意逻辑
    # 来确定下一个节点
    return state["which"]

# highlight-next-line
builder.add_conditional_edges("a", conditional_edge)

graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

条件分支图

result = graph.invoke({"aggregate": []})
print(result)
Adding "A" to []
Adding "C" to ['A']
{'aggregate': ['A', 'C'], 'which': 'c'}

如果你的扇出应该根据状态在运行时变化,你可以使用 addConditionalEdges 来使用图状态选择一个或多个路径。请参见下面的示例,其中节点 a 生成确定后续节点的状态更新。

import "@langchain/langgraph/zod";
import { StateGraph, START, END } from "@langchain/langgraph";
import { z } from "zod";

const State = z.object({
  aggregate: z.array(z.string()).langgraph.reducer((x, y) => x.concat(y)),
  // 向状态添加一个键。我们将设置此键来确定
  // 如何分支。
  which: z.string().langgraph.reducer((x, y) => y ?? x),
});

const nodeA = (state: z.infer<typeof State>) => {
  console.log(`Adding "A" to ${state.aggregate}`);
  // highlight-next-line
  return { aggregate: ["A"], which: "c" };
};

const nodeB = (state: z.infer<typeof State>) => {
  console.log(`Adding "B" to ${state.aggregate}`);
  return { aggregate: ["B"] };
};

const nodeC = (state: z.infer<typeof State>) => {
  console.log(`Adding "C" to ${state.aggregate}`);
  return { aggregate: ["C"] };
};

const conditionalEdge = (state: z.infer<typeof State>): "b" | "c" => {
  // 在此处填写使用状态的任意逻辑
  // 来确定下一个节点
  return state.which as "b" | "c";
};

// highlight-next-line
const graph = new StateGraph(State)
  .addNode("a", nodeA)
  .addNode("b", nodeB)
  .addNode("c", nodeC)
  .addEdge(START, "a")
  .addEdge("b", END)
  .addEdge("c", END)
  .addConditionalEdges("a", conditionalEdge)
  .compile();
import * as fs from "node:fs/promises";

const drawableGraph = await graph.getGraphAsync();
const image = await drawableGraph.drawMermaidPng();
const imageBuffer = new Uint8Array(await image.arrayBuffer());

await fs.writeFile("graph.png", imageBuffer);
const result = await graph.invoke({ aggregate: [] });
console.log(result);
Adding "A" to []
Adding "C" to ['A']
{ aggregate: ['A', 'C'], which: 'c' }

Tip

你的条件边可以路由到多个目标节点。例如:

def route_bc_or_cd(state: State) -> Sequence[str]:
    if state["which"] == "cd":
        return ["c", "d"]
    return ["b", "c"]
const routeBcOrCd = (state: z.infer<typeof State>): string[] => {
  if (state.which === "cd") {
    return ["c", "d"];
  }
  return ["b", "c"];
};

Map-Reduce 和 Send API

LangGraph 使用 Send API 支持 map-reduce 和其他高级分支模式。以下是如何使用它的示例:

from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
from typing_extensions import TypedDict, Annotated
import operator

class OverallState(TypedDict):
    topic: str
    subjects: list[str]
    jokes: Annotated[list[str], operator.add]
    best_selected_joke: str

def generate_topics(state: OverallState):
    return {"subjects": ["lions", "elephants", "penguins"]}

def generate_joke(state: OverallState):
    joke_map = {
        "lions": "Why don't lions like fast food? Because they can't catch it!",
        "elephants": "Why don't elephants use computers? They're afraid of the mouse!",
        "penguins": "Why don't penguins like talking to strangers at parties? Because they find it hard to break the ice."
    }
    return {"jokes": [joke_map[state["subject"]]]}

def continue_to_jokes(state: OverallState):
    return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]

def best_joke(state: OverallState):
    return {"best_selected_joke": "penguins"}

builder = StateGraph(OverallState)
builder.add_node("generate_topics", generate_topics)
builder.add_node("generate_joke", generate_joke)
builder.add_node("best_joke", best_joke)
builder.add_edge(START, "generate_topics")
builder.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
builder.add_edge("generate_joke", "best_joke")
builder.add_edge("best_joke", END)
graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

Map-reduce graph with fanout

# 调用图:这里我们调用它来生成笑话列表
for step in graph.stream({"topic": "animals"}):
    print(step)
{'generate_topics': {'subjects': ['lions', 'elephants', 'penguins']}}
{'generate_joke': {'jokes': ["Why don't lions like fast food? Because they can't catch it!"]}}
{'generate_joke': {'jokes': ["Why don't elephants use computers? They're afraid of the mouse!"]}}
{'generate_joke': {'jokes': ['Why don't penguins like talking to strangers at parties? Because they find it hard to break the ice.']}}
{'best_joke': {'best_selected_joke': 'penguins'}}
import "@langchain/langgraph/zod";
import { StateGraph, START, END, Send } from "@langchain/langgraph";
import { z } from "zod";

const OverallState = z.object({
  topic: z.string(),
  subjects: z.array(z.string()),
  jokes: z.array(z.string()).langgraph.reducer((x, y) => x.concat(y)),
  bestSelectedJoke: z.string(),
});

const generateTopics = (state: z.infer<typeof OverallState>) => {
  return { subjects: ["lions", "elephants", "penguins"] };
};

const generateJoke = (state: { subject: string }) => {
  const jokeMap: Record<string, string> = {
    lions: "Why don't lions like fast food? Because they can't catch it!",
    elephants: "Why don't elephants use computers? They're afraid of the mouse!",
    penguins: "Why don't penguins like talking to strangers at parties? Because they find it hard to break the ice."
  };
  return { jokes: [jokeMap[state.subject]] };
};

const continueToJokes = (state: z.infer<typeof OverallState>) => {
  return state.subjects.map((subject) => new Send("generateJoke", { subject }));
};

const bestJoke = (state: z.infer<typeof OverallState>) => {
  return { bestSelectedJoke: "penguins" };
};

const graph = new StateGraph(OverallState)
  .addNode("generateTopics", generateTopics)
  .addNode("generateJoke", generateJoke)
  .addNode("bestJoke", bestJoke)
  .addEdge(START, "generateTopics")
  .addConditionalEdges("generateTopics", continueToJokes)
  .addEdge("generateJoke", "bestJoke")
  .addEdge("bestJoke", END)
  .compile();
import * as fs from "node:fs/promises";

const drawableGraph = await graph.getGraphAsync();
const image = await drawableGraph.drawMermaidPng();
const imageBuffer = new Uint8Array(await image.arrayBuffer());

await fs.writeFile("graph.png", imageBuffer);
// 调用图:这里我们调用它来生成笑话列表
for await (const step of await graph.stream({ topic: "animals" })) {
  console.log(step);
}
{ generateTopics: { subjects: [ 'lions', 'elephants', 'penguins' ] } }
{ generateJoke: { jokes: [ "Why don't lions like fast food? Because they can't catch it!" ] } }
{ generateJoke: { jokes: [ "Why don't elephants use computers? They're afraid of the mouse!" ] } }
{ generateJoke: { jokes: [ "Why don't penguins like talking to strangers at parties? Because they find it hard to break the ice." ] } }
{ bestJoke: { bestSelectedJoke: 'penguins' } }

创建和控制循环

在创建带有循环的图时,我们需要一个终止执行的机制。最常见的做法是添加一个条件边,一旦达到某个终止条件,就路由到 END 节点。

你还可以在调用或流式传输图时设置图的递归限制。递归限制设置图在引发错误之前允许执行的超步骤数量。在这里阅读有关递归限制概念的更多信息。

让我们考虑一个带有循环的简单图,以更好地理解这些机制如何工作。

Tip

要返回状态的最后一个值而不是收到递归限制错误,请参阅下一节

在创建循环时,你可以包含一个指定终止条件的条件边:

builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)

def route(state: State) -> Literal["b", END]:
    if termination_condition(state):
        return END
    else:
        return "b"

builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "a")
graph = builder.compile()
const graph = new StateGraph(State)
  .addNode("a", nodeA)
  .addNode("b", nodeB)
  .addEdge(START, "a")
  .addConditionalEdges("a", route)
  .addEdge("b", "a")
  .compile();

const route = (state: z.infer<typeof State>): "b" | typeof END => {
  if (terminationCondition(state)) {
    return END;
  } else {
    return "b";
  }
};

要控制递归限制,请在配置中指定 "recursionLimit"。这将引发一个 GraphRecursionError,你可以捕获并处理它:

from langgraph.errors import GraphRecursionError

try:
    graph.invoke(inputs, {"recursion_limit": 3})
except GraphRecursionError:
    print("Recursion Error")
import { GraphRecursionError } from "@langchain/langgraph";

try {
  await graph.invoke(inputs, { recursionLimit: 3 });
} catch (error) {
  if (error instanceof GraphRecursionError) {
    console.log("Recursion Error");
  }
}

让我们定义一个带有简单循环的图。请注意,我们使用条件边来实现终止条件。

import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    # operator.add 归约器函数使其仅支持追加
    aggregate: Annotated[list, operator.add]

def a(state: State):
    print(f'Node A sees {state["aggregate"]}')
    return {"aggregate": ["A"]}

def b(state: State):
    print(f'Node B sees {state["aggregate"]}')
    return {"aggregate": ["B"]}

# 定义节点
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)

# 定义边
def route(state: State) -> Literal["b", END]:
    if len(state["aggregate"]) < 7:
        return "b"
    else:
        return END

builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "a")
graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

Simple loop graph

import "@langchain/langgraph/zod";
import { StateGraph, START, END } from "@langchain/langgraph";
import { z } from "zod";

const State = z.object({
  // 归约器使其仅支持追加
  aggregate: z.array(z.string()).langgraph.reducer((x, y) => x.concat(y)),
});

const nodeA = (state: z.infer<typeof State>) => {
  console.log(`Node A sees ${state.aggregate}`);
  return { aggregate: ["A"] };
};

const nodeB = (state: z.infer<typeof State>) => {
  console.log(`Node B sees ${state.aggregate}`);
  return { aggregate: ["B"] };
};

// 定义边
const route = (state: z.infer<typeof State>): "b" | typeof END => {
  if (state.aggregate.length < 7) {
    return "b";
  } else {
    return END;
  }
};

const graph = new StateGraph(State)
  .addNode("a", nodeA)
  .addNode("b", nodeB)
  .addEdge(START, "a")
  .addConditionalEdges("a", route)
  .addEdge("b", "a")
  .compile();
import * as fs from "node:fs/promises";

const drawableGraph = await graph.getGraphAsync();
const image = await drawableGraph.drawMermaidPng();
const imageBuffer = new Uint8Array(await image.arrayBuffer());

await fs.writeFile("graph.png", imageBuffer);

这种架构类似于一个 React 智能代理,其中节点 "a" 是一个工具调用模型,而节点 "b" 代表工具。

在我们的 route 条件边中,我们指定在状态中的 "aggregate" 列表超过阈值长度后应该结束。

调用图时,我们看到我们在节点 "a""b" 之间交替,直到达到终止条件后才终止。

graph.invoke({"aggregate": []})
Node A sees []
Node B sees ['A']
Node A sees ['A', 'B']
Node B sees ['A', 'B', 'A']
Node A sees ['A', 'B', 'A', 'B']
Node B sees ['A', 'B', 'A', 'B', 'A']
Node A sees ['A', 'B', 'A', 'B', 'A', 'B']
const result = await graph.invoke({ aggregate: [] });
console.log(result);
Node A sees []
Node B sees ['A']
Node A sees ['A', 'B']
Node B sees ['A', 'B', 'A']
Node A sees ['A', 'B', 'A', 'B']
Node B sees ['A', 'B', 'A', 'B', 'A']
Node A sees ['A', 'B', 'A', 'B', 'A', 'B']
{ aggregate: ['A', 'B', 'A', 'B', 'A', 'B', 'A'] }

施加递归限制

在某些应用中,我们可能无法保证会达到给定的终止条件。在这些情况下,我们可以设置图的递归限制。这将在给定数量的超步骤之后引发 GraphRecursionError。然后我们可以捕获并处理此异常:

from langgraph.errors import GraphRecursionError

try:
    graph.invoke({"aggregate": []}, {"recursion_limit": 4})
except GraphRecursionError:
    print("Recursion Error")
Node A sees []
Node B sees ['A']
Node C sees ['A', 'B']
Node D sees ['A', 'B']
Node A sees ['A', 'B', 'C', 'D']
Recursion Error
import { GraphRecursionError } from "@langchain/langgraph";

try {
  await graph.invoke({ aggregate: [] }, { recursionLimit: 4 });
} catch (error) {
  if (error instanceof GraphRecursionError) {
    console.log("Recursion Error");
  }
}
Node A sees []
Node B sees ['A']
Node A sees ['A', 'B']
Node B sees ['A', 'B', 'A']
Node A sees ['A', 'B', 'A', 'B']
Recursion Error
扩展示例:在达到递归限制时返回状态

与其引发 GraphRecursionError,我们可以向状态引入一个新键,该键跟踪达到递归限制之前剩余的步数。然后我们可以使用此键来确定是否应该结束运行。

LangGraph 实现了一个特殊的 RemainingSteps 注释。在底层,它创建了一个 ManagedValue 通道——一个在我们的图运行期间存在且不会更长时间存在的状态通道。

import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.managed.is_last_step import RemainingSteps

class State(TypedDict):
    aggregate: Annotated[list, operator.add]
    remaining_steps: RemainingSteps

def a(state: State):
    print(f'Node A sees {state["aggregate"]}')
    return {"aggregate": ["A"]}

def b(state: State):
    print(f'Node B sees {state["aggregate"]}')
    return {"aggregate": ["B"]}

# 定义节点
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)

# 定义边
def route(state: State) -> Literal["b", END]:
    if state["remaining_steps"] <= 2:
        return END
    else:
        return "b"

builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "a")
graph = builder.compile()

# 测试一下
result = graph.invoke({"aggregate": []}, {"recursion_limit": 4})
print(result)
Node A sees []
Node B sees ['A']
Node A sees ['A', 'B']
{'aggregate': ['A', 'B', 'A']}

扩展示例:带分支的循环

为了更好地理解递归限制如何工作,让我们考虑一个更复杂的示例。下面我们实现一个循环,但有一步会扇出到两个节点:

import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    aggregate: Annotated[list, operator.add]

def a(state: State):
    print(f'Node A sees {state["aggregate"]}')
    return {"aggregate": ["A"]}

def b(state: State):
    print(f'Node B sees {state["aggregate"]}')
    return {"aggregate": ["B"]}

def c(state: State):
    print(f'Node C sees {state["aggregate"]}')
    return {"aggregate": ["C"]}

def d(state: State):
    print(f'Node D sees {state["aggregate"]}')
    return {"aggregate": ["D"]}

# 定义节点
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)

# 定义边
def route(state: State) -> Literal["b", END]:
    if len(state["aggregate"]) < 7:
        return "b"
    else:
        return END

builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "c")
builder.add_edge("b", "d")
builder.add_edge(["c", "d"], "a")
graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

复杂的带分支的循环图

这个图看起来很复杂,但可以被概念化为超步骤的循环:

  1. 节点 A
  2. 节点 B
  3. 节点 C 和 D
  4. 节点 A
  5. ...

我们有一个四个超步骤的循环,其中节点 C 和 D 并发执行。

如之前那样调用图,我们看到在达到终止条件之前我们完成了两圈完整的"循环":

result = graph.invoke({"aggregate": []})
Node A sees []
Node B sees ['A']
Node D sees ['A', 'B']
Node C sees ['A', 'B']
Node A sees ['A', 'B', 'C', 'D']
Node B sees ['A', 'B', 'C', 'D', 'A']
Node D sees ['A', 'B', 'C', 'D', 'A', 'B']
Node C sees ['A', 'B', 'C', 'D', 'A', 'B']
Node A sees ['A', 'B', 'C', 'D', 'A', 'B', 'C', 'D']

然而,如果我们将递归限制设置为四,我们只完成一圈,因为每圈是四个超步骤:

from langgraph.errors import GraphRecursionError

try:
    result = graph.invoke({"aggregate": []}, {"recursion_limit": 4})
except GraphRecursionError:
    print("Recursion Error")
Node A sees []
Node B sees ['A']
Node C sees ['A', 'B']
Node D sees ['A', 'B']
Node A sees ['A', 'B', 'C', 'D']
Recursion Error

异步

使用异步编程范式可以在并发运行 IO 密集型代码时产生显著的性能改进(例如,向聊天模型提供商发出并发 API 请求)。

要将图的 sync 实现转换为 async 实现,你需要:

  1. 更新 nodes 使用 async def 而不是 def
  2. 更新内部代码以适当使用 await
  3. 根据需要使用 .ainvoke.astream 调用图。

因为许多 LangChain 对象实现了 Runnable Protocol,它具有所有 sync 方法的 async 变体,所以通常可以相当快速地将 sync 图升级为 async 图。

请参见下面的示例。为了演示底层 LLM 的异步调用,我们将包含一个聊天模型:

聊天模型配置

查看支持的聊天模型配置方法:聊天模型选项卡

支持的模型包括:OpenAI、Anthropic、Azure、Google Gemini 和 AWS Bedrock。

from langchain.chat_models import init_chat_model
from langgraph.graph import MessagesState, StateGraph

# highlight-next-line
async def node(state: MessagesState): # (1)!
    # highlight-next-line
    new_message = await llm.ainvoke(state["messages"]) # (2)!
    return {"messages": [new_message]}

builder = StateGraph(MessagesState).add_node(node).set_entry_point("node")
graph = builder.compile()

input_message = {"role": "user", "content": "Hello"}
# highlight-next-line
result = await graph.ainvoke({"messages": [input_message]}) # (3)!
  1. 将节点声明为异步函数。
  2. 在节点内使用可用的异步调用。
  3. 在图对象本身上使用异步调用。

异步流式传输

有关使用异步进行流式传输的示例,请参阅流式传输指南

使用 Command 组合控制流和状态更新

组合控制流(边)和状态更新(节点)可能很有用。例如,你可能希望在同一个节点中同时执行状态更新并决定接下来要去哪个节点。LangGraph 提供了一种方法,通过从节点函数返回 Command 对象来实现:

def my_node(state: State) -> Command[Literal["my_other_node"]]:
    return Command(
        # 状态更新
        update={"foo": "bar"},
        # 控制流
        goto="my_other_node"
    )
import { Command } from "@langchain/langgraph";

const myNode = (state: State): Command => {
  return new Command({
    // 状态更新
    update: { foo: "bar" },
    // 控制流
    goto: "myOtherNode"
  });
};

我们在下面展示一个端到端示例。让我们创建一个包含 3 个节点的简单图:A、B 和 C。我们将首先执行节点 A,然后根据节点 A 的输出决定接下来是去节点 B 还是节点 C。

import random
from typing_extensions import TypedDict, Literal
from langgraph.graph import StateGraph, START
from langgraph.types import Command

# 定义图状态
class State(TypedDict):
    foo: str

# 定义节点

def node_a(state: State) -> Command[Literal["node_b", "node_c"]]:
    print("Called A")
    value = random.choice(["b", "c"])
    # 这是条件边函数的替代
    if value == "b":
        goto = "node_b"
    else:
        goto = "node_c"

    # 注意 Command 如何允许你同时更新图状态并路由到下一个节点
    return Command(
        # 这是状态更新
        update={"foo": value},
        # 这是边的替代
        goto=goto,
    )

def node_b(state: State):
    print("Called B")
    return {"foo": state["foo"] + "b"}

def node_c(state: State):
    print("Called C")
    return {"foo": state["foo"] + "c"}

我们现在可以使用上述节点创建 StateGraph。请注意,该图没有用于路由的条件边!这是因为控制流在 node_a 内部使用 Command 定义。

builder = StateGraph(State)
builder.add_edge(START, "node_a")
builder.add_node(node_a)
builder.add_node(node_b)
builder.add_node(node_c)
# 注意:节点 A、B 和 C 之间没有边!

graph = builder.compile()

Important

你可能已经注意到我们使用 Command 作为返回类型注释,例如 Command[Literal["node_b", "node_c"]]。这对于图渲染是必要的,并告诉 LangGraph node_a 可以导航到 node_bnode_c

from IPython.display import display, Image

display(Image(graph.get_graph().draw_mermaid_png()))

基于 Command 的图导航

如果我们多次运行图,我们会看到它根据节点 A 中的随机选择采用不同的路径(A -> B 或 A -> C)。

graph.invoke({"foo": ""})
Called A
Called C
import { StateGraph, START, Command } from "@langchain/langgraph";
import { z } from "zod";

// 定义图状态
const State = z.object({
  foo: z.string(),
});

// 定义节点

const nodeA = (state: z.infer<typeof State>): Command => {
  console.log("Called A");
  const value = Math.random() > 0.5 ? "b" : "c";
  // 这是条件边函数的替代
  const goto = value === "b" ? "nodeB" : "nodeC";

  // 注意 Command 如何允许你同时更新图状态并路由到下一个节点
  return new Command({
    // 这是状态更新
    update: { foo: value },
    // 这是边的替代
    goto,
  });
};

const nodeB = (state: z.infer<typeof State>) => {
  console.log("Called B");
  return { foo: state.foo + "b" };
};

const nodeC = (state: z.infer<typeof State>) => {
  console.log("Called C");
  return { foo: state.foo + "c" };
};

我们现在可以使用上述节点创建 StateGraph。请注意,该图没有用于路由的条件边!这是因为控制流在 nodeA 内部使用 Command 定义。

const graph = new StateGraph(State)
  .addNode("nodeA", nodeA, {
    ends: ["nodeB", "nodeC"],
  })
  .addNode("nodeB", nodeB)
  .addNode("nodeC", nodeC)
  .addEdge(START, "nodeA")
  .compile();

Important

你可能已经注意到我们使用 ends 来指定 nodeA 可以导航到哪些节点。这对于图渲染是必要的,并告诉 LangGraph nodeA 可以导航到 nodeBnodeC

import * as fs from "node:fs/promises";

const drawableGraph = await graph.getGraphAsync();
const image = await drawableGraph.drawMermaidPng();
const imageBuffer = new Uint8Array(await image.arrayBuffer());

await fs.writeFile("graph.png", imageBuffer);

如果我们多次运行图,我们会看到它根据节点 A 中的随机选择采用不同的路径(A -> B 或 A -> C)。

const result = await graph.invoke({ foo: "" });
console.log(result);
Called A
Called C
{ foo: 'cc' }

导航到父图中的节点

如果你正在使用子图,你可能希望从子图内的节点导航到不同的子图(即父图中的不同节点)。为此,你可以在 Command 中指定 graph=Command.PARENT:

def my_node(state: State) -> Command[Literal["my_other_node"]]:
    return Command(
        update={"foo": "bar"},
        goto="other_subgraph",  # 其中 `other_subgraph` 是父图中的一个节点
        graph=Command.PARENT
    )
const myNode = (state: State): Command => {
  return new Command({
    update: { foo: "bar" },
    goto: "otherSubgraph",  // 其中 `otherSubgraph` 是父图中的一个节点
    graph: Command.PARENT
  });
};

让我们使用上面的示例来演示这一点。我们将通过将上面示例中的 nodeA 更改为单节点图来做到这一点,我们将其作为子图添加到父图中。

使用 Command.PARENT 进行状态更新

当你从子图节点向父图节点发送更新时,对于父图和子图状态模式共享的键,你**必须**在父图状态中为要更新的键定义归约器。请参见下面的示例。

import operator
from typing_extensions import Annotated

class State(TypedDict):
    # 注意:我们在这里定义一个归约器
    # highlight-next-line
    foo: Annotated[str, operator.add]

def node_a(state: State):
    print("Called A")
    value = random.choice(["a", "b"])
    # 这是条件边函数的替代
    if value == "a":
        goto = "node_b"
    else:
        goto = "node_c"

    # 注意 Command 如何允许你同时更新图状态并路由到下一个节点
    return Command(
        update={"foo": value},
        goto=goto,
        # 这告诉 LangGraph 导航到父图中的 node_b 或 node_c
        # 注意:这将导航到相对于子图最近的父图
        # highlight-next-line
        graph=Command.PARENT,
    )

subgraph = StateGraph(State).add_node(node_a).add_edge(START, "node_a").compile()

def node_b(state: State):
    print("Called B")
    # 注意:由于我们定义了归约器,我们不需要手动追加
    # 新字符到现有的 'foo' 值。相反,归约器将自动追加这些
    # (通过 operator.add)
    # highlight-next-line
    return {"foo": "b"}

def node_c(state: State):
    print("Called C")
    # highlight-next-line
    return {"foo": "c"}

builder = StateGraph(State)
builder.add_edge(START, "subgraph")
builder.add_node("subgraph", subgraph)
builder.add_node(node_b)
builder.add_node(node_c)

graph = builder.compile()
graph.invoke({"foo": ""})
Called A
Called C
import "@langchain/langgraph/zod";
import { StateGraph, START, Command } from "@langchain/langgraph";
import { z } from "zod";

const State = z.object({
  // 注意:我们在这里定义一个归约器
  // highlight-next-line
  foo: z.string().langgraph.reducer((x, y) => x + y),
});

const nodeA = (state: z.infer<typeof State>) => {
  console.log("Called A");
  const value = Math.random() > 0.5 ? "nodeB" : "nodeC";

  // 注意 Command 如何允许你同时更新图状态并路由到下一个节点
  return new Command({
    update: { foo: "a" },
    goto: value,
    // 这告诉 LangGraph 导航到父图中的 nodeB 或 nodeC
    // 注意:这将导航到相对于子图最近的父图
    // highlight-next-line
    graph: Command.PARENT,
  });
};

const subgraph = new StateGraph(State)
  .addNode("nodeA", nodeA, { ends: ["nodeB", "nodeC"] })
  .addEdge(START, "nodeA")
  .compile();

const nodeB = (state: z.infer<typeof State>) => {
  console.log("Called B");
  // 注意:由于我们定义了归约器,我们不需要手动追加
  // 新字符到现有的 'foo' 值。相反,归约器将自动追加这些
  // highlight-next-line
  return { foo: "b" };
};

const nodeC = (state: z.infer<typeof State>) => {
  console.log("Called C");
  // highlight-next-line
  return { foo: "c" };
};

const graph = new StateGraph(State)
  .addNode("subgraph", subgraph, { ends: ["nodeB", "nodeC"] })
  .addNode("nodeB", nodeB)
  .addNode("nodeC", nodeC)
  .addEdge(START, "subgraph")
  .compile();
const result = await graph.invoke({ foo: "" });
console.log(result);
Called A
Called C
{ foo: 'ac' }

在工具内使用

一个常见的用例是从工具内部更新图状态。例如,在客户支持应用程序中,你可能希望在对话开始时根据客户的帐号或 ID 查找客户信息。要从工具更新图状态,你可以从工具返回 Command(update={"my_custom_key": "foo", "messages": [...]}):

@tool
def lookup_user_info(tool_call_id: Annotated[str, InjectedToolCallId], config: RunnableConfig):
    """使用它来查找用户信息以更好地协助他们回答问题。"""
    user_info = get_user_info(config.get("configurable", {}).get("user_id"))
    return Command(
        update={
            # 更新状态键
            "user_info": user_info,
            # 更新消息历史记录
            "messages": [ToolMessage("Successfully looked up user information", tool_call_id=tool_call_id)]
        }
    )
import { tool } from "@langchain/core/tools";
import { Command } from "@langchain/langgraph";
import { RunnableConfig } from "@langchain/core/runnables";
import { z } from "zod";

const lookupUserInfo = tool(
  async (input, config: RunnableConfig) => {
    const userId = config.configurable?.userId;
    const userInfo = getUserInfo(userId);
    return new Command({
      update: {
        // 更新状态键
        userInfo: userInfo,
        // 更新消息历史记录
        messages: [{
          role: "tool",
          content: "Successfully looked up user information",
          tool_call_id: config.toolCall.id
        }]
      }
    });
  },
  {
    name: "lookupUserInfo",
    description: "使用它来查找用户信息以更好地协助他们回答问题。",
    schema: z.object({}),
  }
);

Important

当从工具返回 Command 时,你必须在 Command.update 中包含 messages(或用于消息历史记录的任何状态键),并且 messages 中的消息列表必须包含一个 ToolMessage。这对于生成的消息历史记录有效是必要的(LLM 提供商要求带有工具调用的 AI 消息后面必须跟着工具结果消息)。

如果你正在使用通过 Command 更新状态的工具,我们建议使用预构建的 ToolNode,它会自动处理返回 Command 对象的工具并将它们传播到图状态。如果你正在编写调用工具的自定义节点,则需要手动传播工具返回的 Command 对象作为节点的更新。

可视化你的图

这里我们演示如何可视化你创建的图。

你可以可视化任意的 Graph,包括 StateGraph

让我们通过绘制分形来获得一些乐趣 :)。

import random
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

class State(TypedDict):
    messages: Annotated[list, add_messages]

class MyNode:
    def __init__(self, name: str):
        self.name = name
    def __call__(self, state: State):
        return {"messages": [("assistant", f"Called node {self.name}")]}

def route(state) -> Literal["entry_node", "__end__"]:
    if len(state["messages"]) > 10:
        return "__end__"
    return "entry_node"

def add_fractal_nodes(builder, current_node, level, max_level):
    if level > max_level:
        return
    # Number of nodes to create at this level
    num_nodes = random.randint(1, 3)  # Adjust randomness as needed
    for i in range(num_nodes):
        nm = ["A", "B", "C"][i]
        node_name = f"node_{current_node}_{nm}"
        builder.add_node(node_name, MyNode(node_name))
        builder.add_edge(current_node, node_name)
        # Recursively add more nodes
        r = random.random()
        if r > 0.2 and level + 1 < max_level:
            add_fractal_nodes(builder, node_name, level + 1, max_level)
        elif r > 0.05:
            builder.add_conditional_edges(node_name, route, node_name)
        else:
            # End
            builder.add_edge(node_name, "__end__")

def build_fractal_graph(max_level: int):
    builder = StateGraph(State)
    entry_point = "entry_node"
    builder.add_node(entry_point, MyNode(entry_point))
    builder.add_edge(START, entry_point)
    add_fractal_nodes(builder, entry_point, 1, max_level)
    # Optional: set a finish point if required
    builder.add_edge(entry_point, END)  # or any specific node
    return builder.compile()

app = build_fractal_graph(3)

让我们创建一个简单的示例图来演示可视化。

import { StateGraph, START, END } from "@langchain/langgraph";
import { MessagesZodState } from "@langchain/langgraph";
import { z } from "zod";

const State = MessagesZodState.extend({
  value: z.number(),
});

const app = new StateGraph(State)
  .addNode("node1", (state) => {
    return { value: state.value + 1 };
  })
  .addNode("node2", (state) => {
    return { value: state.value * 2 };
  })
  .addEdge(START, "node1")
  .addConditionalEdges("node1", (state) => {
    if (state.value < 10) {
      return "node2";
    }
    return END;
  })
  .addEdge("node2", "node1")
  .compile();

Mermaid

我们还可以将图类转换为 Mermaid 语法。

print(app.get_graph().draw_mermaid())
%%{init: {'flowchart': {'curve': 'linear'}}}%%
graph TD;
    __start__([<p>__start__</p>])
    entry_node(entry_node)
    node_entry_node_A(node_entry_node_A)
    node_entry_node_B(node_entry_node_B)
    node_node_entry_node_B_A(node_node_entry_node_B_A)
    node_node_entry_node_B_B(node_node_entry_node_B_B)
    node_node_entry_node_B_C(node_node_entry_node_B_C)
    __end__([<p>__end__</p>])
    __start__ --> entry_node;
    entry_node --> __end__;
    entry_node --> node_entry_node_A;
    entry_node --> node_entry_node_B;
    node_entry_node_B --> node_node_entry_node_B_A;
    node_entry_node_B --> node_node_entry_node_B_B;
    node_entry_node_B --> node_node_entry_node_B_C;
    node_entry_node_A -.-> entry_node;
    node_entry_node_A -.-> __end__;
    node_node_entry_node_B_A -.-> entry_node;
    node_node_entry_node_B_A -.-> __end__;
    node_node_entry_node_B_B -.-> entry_node;
    node_node_entry_node_B_B -.-> __end__;
    node_node_entry_node_B_C -.-> entry_node;
    node_node_entry_node_B_C -.-> __end__;
    classDef default fill:#f2f0ff,line-height:1.2
    classDef first fill-opacity:0
    classDef last fill:#bfb6fc
const drawableGraph = await app.getGraphAsync();
console.log(drawableGraph.drawMermaid());
%%{init: {'flowchart': {'curve': 'linear'}}}%%
graph TD;
    __start__([<p>__start__</p>])
    node1(node1)
    node2(node2)
    __end__([<p>__end__</p>])
    __start__ --> node1;
    node1 -.-> node2;
    node1 -.-> __end__;
    node2 --> node1;
    classDef default fill:#f2f0ff,line-height:1.2
    classDef first fill-opacity:0
    classDef last fill:#bfb6fc

PNG

如果需要,我们可以将图渲染为 .png。这里我们可以使用三个选项:

  • 使用 Mermaid.ink API(不需要额外的包)
  • 使用 Mermaid + Pyppeteer(需要 pip install pyppeteer)
  • 使用 graphviz(需要 pip install graphviz)

使用 Mermaid.Ink

默认情况下,draw_mermaid_png() 使用 Mermaid.Ink 的 API 来生成图表。

from IPython.display import Image, display
from langchain_core.runnables.graph import CurveStyle, MermaidDrawMethod, NodeStyles

display(Image(app.get_graph().draw_mermaid_png()))

分形图可视化

使用 Mermaid + Pyppeteer

import nest_asyncio

nest_asyncio.apply()  # Jupyter Notebook 运行异步函数所需

display(
    Image(
        app.get_graph().draw_mermaid_png(
            curve_style=CurveStyle.LINEAR,
            node_colors=NodeStyles(first="#ffdfba", last="#baffc9", default="#fad7de"),
            wrap_label_n_words=9,
            output_file_path=None,
            draw_method=MermaidDrawMethod.PYPPETEER,
            background_color="white",
            padding=10,
        )
    )
)

使用 Graphviz

try:
    display(Image(app.get_graph().draw_png()))
except ImportError:
    print(
        "You likely need to install dependencies for pygraphviz, see more here https://github.com/pygraphviz/pygraphviz/blob/main/INSTALL.txt"
    )

如果需要,我们可以将图渲染为 .png。这使用 Mermaid.ink API 来生成图表。

import * as fs from "node:fs/promises";

const drawableGraph = await app.getGraphAsync();
const image = await drawableGraph.drawMermaidPng();
const imageBuffer = new Uint8Array(await image.arrayBuffer());

await fs.writeFile("graph.png", imageBuffer);