跳转至

持久化执行

**持久化执行**是一种技术,其中进程或工作流在关键点保存其进度,允许它暂停并稍后从停止的地方恢复。这在需要人机协同的场景中特别有用,用户可以在继续之前检查、验证或修改进程,以及在可能遇到中断或错误的长时间运行任务中(例如,对 LLM 的调用超时)。通过保留已完成的工作,持久化执行使进程能够恢复而无需重新处理先前的步骤——即使在很长的延迟(例如一周后)之后。

LangGraph 内置的持久化层为工作流提供持久化执行,确保每个执行步骤的状态都保存到持久化存储中。此功能可确保如果工作流被中断——无论是由于系统故障还是人机协同交互——它都可以从最后记录的状态恢复。

Tip

如果你使用 LangGraph 与检查点器,你已经启用了持久化执行。你可以在任何点暂停和恢复工作流,即使在中断或失败之后。 要充分利用持久化执行,请确保你的工作流被设计为确定性幂等,并将任何副作用或非确定性操作包装在任务内。你可以从StateGraph(图 API)函数式 API中使用任务

要求

要在 LangGraph 中利用持久化执行,你需要:

  1. 通过指定将保存工作流进度的检查点器在工作流中启用持久化
  2. 在执行工作流时指定线程标识符。这将跟踪工作流特定实例的执行历史。
  1. 将任何非确定性操作(例如随机数生成)或具有副作用的操作(例如文件写入、API 调用)包装在 @[tasks][task] 内,以确保当工作流恢复时,这些操作不会针对特定运行重复执行,而是从持久化层检索它们的结果。有关更多信息,请参阅确定性和一致重放
  1. 将任何非确定性操作(例如随机数生成)或具有副作用的操作(例如文件写入、API 调用)包装在 @[tasks][task] 内,以确保当工作流恢复时,这些操作不会针对特定运行重复执行,而是从持久化层检索它们的结果。有关更多信息,请参阅确定性和一致重放

确定性和一致重放

当你恢复工作流运行时,代码**不会**从执行停止的**同一行代码**恢复;相反,它将识别一个适当的起始点,从中继续停止的地方。这意味着工作流将从起始点重放所有步骤,直到达到停止的点。

因此,当你为持久化执行编写工作流时,必须将任何非确定性操作(例如随机数生成)和任何具有副作用的操作(例如文件写入、API 调用)包装在任务节点内。

为确保你的工作流是确定性的并且可以一致地重放,请遵循以下准则:

  • 避免重复工作:如果节点包含多个具有副作用的操作(例如日志记录、文件写入或网络调用),请将每个操作包装在单独的**任务**中。这可确保当工作流恢复时,操作不会重复,并且它们的结果将从持久化层检索。
  • **封装非确定性操作:**将任何可能产生非确定性结果的代码(例如随机数生成)包装在**任务**或**节点**内。这可确保在恢复时,工作流遵循具有相同结果的确切记录步骤序列。
  • 使用幂等操作:在可能的情况下,确保副作用(例如 API 调用、文件写入)是幂等的。这意味着如果操作在工作流失败后重试,它将具有与第一次执行时相同的效果。这对于导致数据写入的操作特别重要。如果**任务**启动但未能成功完成,工作流的恢复将重新运行**任务**,依赖记录的结果来保持一致性。使用幂等性键或验证现有结果以避免意外重复,确保顺利且可预测的工作流执行。

有关要避免的一些陷阱的示例,请参阅函数式 API 中的常见陷阱部分,该部分展示了如何使用**任务**构建代码以避免这些问题。相同的原则适用于 @[StateGraph(图 API)][StateGraph]。

有关要避免的一些陷阱的示例,请参阅函数式 API 中的常见陷阱部分,该部分展示了如何使用**任务**构建代码以避免这些问题。相同的原则适用于 @[StateGraph(图 API)][StateGraph]。

持久性模式

LangGraph 支持三种持久性模式,允许你根据应用程序的要求平衡性能和数据一致性。持久性模式从最不持久到最持久如下:

更高的持久性模式会为工作流执行增加更多开销。

在版本 0.6.0 中添加

使用 durability 参数而不是 checkpoint_during(在 v0.6.0 中弃用)进行持久化策略管理:

  • durability="async" 替换 checkpoint_during=True
  • durability="exit" 替换 checkpoint_during=False

用于持久化策略管理,具有以下映射:

  • checkpoint_during=True -> durability="async"
  • checkpoint_during=False -> durability="exit"

"exit"

仅当图执行完成(成功或出错)时才持久化更改。这为长时间运行的图提供了最佳性能,但意味着中间状态不会被保存,因此你无法从执行中期失败中恢复或中断图执行。

"async"

在下一步执行时异步持久化更改。这提供了良好的性能和持久性,但如果进程在执行期间崩溃,检查点可能不会被写入,存在小风险。

"sync"

在下一步开始之前同步持久化更改。这可确保在继续执行之前写入每个检查点,以一些性能开销为代价提供高持久性。

你可以在调用任何图执行方法时指定持久性模式:

graph.stream(
    {"input": "test"},
    durability="sync"
)

在节点中使用任务

如果节点包含多个操作,你可能会发现将每个操作转换为**任务**比将操作重构为单个节点更容易。

from typing import NotRequired
from typing_extensions import TypedDict
import uuid

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
import requests

# Define a TypedDict to represent the state
class State(TypedDict):
    url: str
    result: NotRequired[str]

def call_api(state: State):
    """Example node that makes an API request."""
    # highlight-next-line
    result = requests.get(state['url']).text[:100]  # Side-effect
    return {
        "result": result
    }

# Create a StateGraph builder and add a node for the call_api function
builder = StateGraph(State)
builder.add_node("call_api", call_api)

# Connect the start and end nodes to the call_api node
builder.add_edge(START, "call_api")
builder.add_edge("call_api", END)

# Specify a checkpointer
checkpointer = InMemorySaver()

# Compile the graph with the checkpointer
graph = builder.compile(checkpointer=checkpointer)

# Define a config with a thread ID.
thread_id = uuid.uuid4()
config = {"configurable": {"thread_id": thread_id}}

# Invoke the graph
graph.invoke({"url": "https://www.example.com"}, config)
from typing import NotRequired
from typing_extensions import TypedDict
import uuid

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import task
from langgraph.graph import StateGraph, START, END
import requests

# Define a TypedDict to represent the state
class State(TypedDict):
    urls: list[str]
    result: NotRequired[list[str]]


@task
def _make_request(url: str):
    """Make a request."""
    # highlight-next-line
    return requests.get(url).text[:100]

def call_api(state: State):
    """Example node that makes an API request."""
    # highlight-next-line
    requests = [_make_request(url) for url in state['urls']]
    results = [request.result() for request in requests]
    return {
        "results": results
    }

# Create a StateGraph builder and add a node for the call_api function
builder = StateGraph(State)
builder.add_node("call_api", call_api)

# Connect the start and end nodes to the call_api node
builder.add_edge(START, "call_api")
builder.add_edge("call_api", END)

# Specify a checkpointer
checkpointer = InMemorySaver()

# Compile the graph with the checkpointer
graph = builder.compile(checkpointer=checkpointer)

# Define a config with a thread ID.
thread_id = uuid.uuid4()
config = {"configurable": {"thread_id": thread_id}}

# Invoke the graph
graph.invoke({"urls": ["https://www.example.com"]}, config)
import { StateGraph, START, END } from "@langchain/langgraph";
import { MemorySaver } from "@langchain/langgraph";
import { v4 as uuidv4 } from "uuid";
import { z } from "zod";

// Define a Zod schema to represent the state
const State = z.object({
  url: z.string(),
  result: z.string().optional(),
});

const callApi = async (state: z.infer<typeof State>) => {
  // highlight-next-line
  const response = await fetch(state.url);
  const text = await response.text();
  const result = text.slice(0, 100); // Side-effect
  return {
    result,
  };
};

// Create a StateGraph builder and add a node for the callApi function
const builder = new StateGraph(State)
  .addNode("callApi", callApi)
  .addEdge(START, "callApi")
  .addEdge("callApi", END);

// Specify a checkpointer
const checkpointer = new MemorySaver();

// Compile the graph with the checkpointer
const graph = builder.compile({ checkpointer });

// Define a config with a thread ID.
const threadId = uuidv4();
const config = { configurable: { thread_id: threadId } };

// Invoke the graph
await graph.invoke({ url: "https://www.example.com" }, config);
import { StateGraph, START, END } from "@langchain/langgraph";
import { MemorySaver } from "@langchain/langgraph";
import { task } from "@langchain/langgraph";
import { v4 as uuidv4 } from "uuid";
import { z } from "zod";

// Define a Zod schema to represent the state
const State = z.object({
  urls: z.array(z.string()),
  results: z.array(z.string()).optional(),
});

const makeRequest = task("makeRequest", async (url: string) => {
  // highlight-next-line
  const response = await fetch(url);
  const text = await response.text();
  return text.slice(0, 100);
});

const callApi = async (state: z.infer<typeof State>) => {
  // highlight-next-line
  const requests = state.urls.map((url) => makeRequest(url));
  const results = await Promise.all(requests);
  return {
    results,
  };
};

// Create a StateGraph builder and add a node for the callApi function
const builder = new StateGraph(State)
  .addNode("callApi", callApi)
  .addEdge(START, "callApi")
  .addEdge("callApi", END);

// Specify a checkpointer
const checkpointer = new MemorySaver();

// Compile the graph with the checkpointer
const graph = builder.compile({ checkpointer });

// Define a config with a thread ID.
const threadId = uuidv4();
const config = { configurable: { thread_id: threadId } };

// Invoke the graph
await graph.invoke({ urls: ["https://www.example.com"] }, config);

恢复工作流

一旦你在工作流中启用了持久化执行,你可以为以下场景恢复执行:

  • **暂停和恢复工作流:**使用 @interrupt 函数在特定点暂停工作流,并使用 @[Command] 原语使用更新的状态恢复它。有关更多详细信息,请参阅人机协同
  • **从失败中恢复:**在异常(例如 LLM 提供商中断)之后自动从最后一个成功的检查点恢复工作流。这涉及通过向其提供 None 作为输入值来使用相同的线程标识符执行工作流(参见使用函数式 API 的此示例)。
  • **暂停和恢复工作流:**使用 @interrupt 函数在特定点暂停工作流,并使用 @[Command] 原语使用更新的状态恢复它。有关更多详细信息,请参阅人机协同
  • **从失败中恢复:**在异常(例如 LLM 提供商中断)之后自动从最后一个成功的检查点恢复工作流。这涉及通过向其提供 null 作为输入值来使用相同的线程标识符执行工作流(参见使用函数式 API 的此示例)。

恢复工作流的起始点

  • 如果你使用 @[StateGraph(图 API)][StateGraph],起始点是执行停止的节点的开头。
  • 如果你在节点内进行子图调用,起始点将是调用被暂停的子图的**父**节点。 在子图内,起始点将是执行停止的特定节点
  • 如果你使用函数式 API,起始点是执行停止的入口点的开头。
  • 如果你使用StateGraph(图 API),起始点是执行停止的节点的开头。
  • 如果你在节点内进行子图调用,起始点将是调用被暂停的子图的**父**节点。 在子图内,起始点将是执行停止的特定节点
  • 如果你使用函数式 API,起始点是执行停止的入口点的开头。