跳转至

持久化

LangGraph 具有内置的持久化层,通过检查点器实现。当你使用检查点器编译图时,检查点器会在每个超步保存图状态的 checkpoint。这些检查点保存到 thread 中,可以在图执行后访问。因为 threads 允许在执行后访问图的状态,所以几个强大的功能,包括人机协同、记忆、时间旅行和容错,都变得可能。下面,我们将更详细地讨论这些概念。

Checkpoints

LangGraph API 自动处理检查点

使用 LangGraph API 时,你不需要手动实现或配置检查点器。API 会在幕后为你处理所有持久化基础设施。

线程

线程是分配给检查点器保存的每个检查点的唯一 ID 或线程标识符。它包含一系列运行的累积状态。当运行被执行时,助手底层图的状态将被持久化到线程中。

使用检查点器调用图时,你**必须**在配置的 configurable 部分指定 thread_id

{"configurable": {"thread_id": "1"}}
{
  configurable: {
    thread_id: "1";
  }
}

可以检索线程的当前和历史状态。要持久化状态,必须在执行运行之前创建线程。LangGraph Platform API 提供了多个端点用于创建和管理线程和线程状态。有关更多详细信息,请参阅 API 参考

检查点

线程在特定时间点的状态称为检查点。检查点是在每个超步保存的图状态快照,由 StateSnapshot 对象表示,具有以下关键属性:

  • config:与此检查点关联的配置。
  • metadata:与此检查点关联的元数据。
  • values:此时间点状态通道的值。
  • next:图中下一个要执行的节点名称的元组。
  • tasks:包含有关下一个要执行的任务信息的 PregelTask 对象元组。如果该步骤之前已尝试过,它将包含错误信息。如果图从节点内部动态中断,任务将包含与中断相关的附加数据。

检查点被持久化,可以用于在稍后时间恢复线程的状态。

让我们看看当按如下方式调用简单图时保存了哪些检查点:

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from typing import Annotated
from typing_extensions import TypedDict
from operator import add

class State(TypedDict):
    foo: str
    bar: Annotated[list[str], add]

def node_a(state: State):
    return {"foo": "a", "bar": ["a"]}

def node_b(state: State):
    return {"foo": "b", "bar": ["b"]}


workflow = StateGraph(State)
workflow.add_node(node_a)
workflow.add_node(node_b)
workflow.add_edge(START, "node_a")
workflow.add_edge("node_a", "node_b")
workflow.add_edge("node_b", END)

checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "1"}}
graph.invoke({"foo": ""}, config)
import { StateGraph, START, END, MemoryServer } from "@langchain/langgraph";
import { withLangGraph } from "@langchain/langgraph/zod";
import { z } from "zod";

const State = z.object({
  foo: z.string(),
  bar: withLangGraph(z.array(z.string()), {
    reducer: {
      fn: (x, y) => x.concat(y),
    },
    default: () => [],
  }),
});

const workflow = new StateGraph(State)
  .addNode("nodeA", (state) => {
    return { foo: "a", bar: ["a"] };
  })
  .addNode("nodeB", (state) => {
    return { foo: "b", bar: ["b"] };
  })
  .addEdge(START, "nodeA")
  .addEdge("nodeA", "nodeB")
  .addEdge("nodeB", END);

const checkpointer = new MemorySaver();
const graph = workflow.compile({ checkpointer });

const config = { configurable: { thread_id: "1" } };
await graph.invoke({ foo: "" }, config);
import { StateGraph, START, END, MemoryServer } from "@langchain/langgraph";
import { withLangGraph } from "@langchain/langgraph/zod";
import { z } from "zod";

const State = z.object({
  foo: z.string(),
  bar: withLangGraph(z.array(z.string()), {
    reducer: {
      fn: (x, y) => x.concat(y),
    },
    default: () => [],
  }),
});

const workflow = new StateGraph(State)
  .addNode("nodeA", (state) => {
    return { foo: "a", bar: ["a"] };
  })
  .addNode("nodeB", (state) => {
    return { foo: "b", bar: ["b"] };
  })
  .addEdge(START, "nodeA")
  .addEdge("nodeA", "nodeB")
  .addEdge("nodeB", END);

const checkpointer = new MemorySaver();
const graph = workflow.compile({ checkpointer });

const config = { configurable: { thread_id: "1" } };
await graph.invoke({ foo: "" }, config);

运行图后,我们期望看到正好 4 个检查点:

  • 空检查点,START 作为下一个要执行的节点
  • 包含用户输入 {'foo': '', 'bar': []} 的检查点,node_a 作为下一个要执行的节点
  • 包含 node_a 输出 {'foo': 'a', 'bar': ['a']} 的检查点,node_b 作为下一个要执行的节点
  • 包含 node_b 输出 {'foo': 'b', 'bar': ['a', 'b']} 的检查点,没有下一个要执行的节点

注意 bar 通道值包含两个节点的输出,因为我们为 bar 通道设置了归约器。

运行图后,我们期望看到正好 4 个检查点:

  • 空检查点,START 作为下一个要执行的节点
  • 包含用户输入 {'foo': '', 'bar': []} 的检查点,nodeA 作为下一个要执行的节点
  • 包含 nodeA 输出 {'foo': 'a', 'bar': ['a']} 的检查点,nodeB 作为下一个要执行的节点
  • 包含 nodeB 输出 {'foo': 'b', 'bar': ['a', 'b']} 的检查点,没有下一个要执行的节点

注意 bar 通道值包含两个节点的输出,因为我们为 bar 通道设置了归约器。

获取状态

与保存的图状态交互时,你**必须**指定线程标识符。你可以通过调用 graph.get_state(config) 来查看图的_最新_状态。这将返回一个 StateSnapshot 对象,对应于与配置中提供的线程 ID 关联的最新检查点,或者如果提供了检查点 ID,则对应于该线程的特定检查点。

# get the latest state snapshot
config = {"configurable": {"thread_id": "1"}}
graph.get_state(config)

# get a state snapshot for a specific checkpoint_id
config = {"configurable": {"thread_id": "1", "checkpoint_id": "1ef663ba-28fe-6528-8002-5a559208592c"}}
graph.get_state(config)

与保存的图状态交互时,你**必须**指定线程标识符。你可以通过调用 graph.getState(config) 来查看图的_最新_状态。这将返回一个 StateSnapshot 对象,对应于与配置中提供的线程 ID 关联的最新检查点,或者如果提供了检查点 ID,则对应于该线程的特定检查点。

// get the latest state snapshot
const config = { configurable: { thread_id: "1" } };
await graph.getState(config);

// get a state snapshot for a specific checkpoint_id
const config = {
  configurable: {
    thread_id: "1",
    checkpoint_id: "1ef663ba-28fe-6528-8002-5a559208592c",
  },
};
await graph.getState(config);

在我们的示例中,get_state 的输出将如下所示:

StateSnapshot(
    values={'foo': 'b', 'bar': ['a', 'b']},
    next=(),
    config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},
    metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},
    created_at='2024-08-29T19:19:38.821749+00:00',
    parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}}, tasks=()
)

在我们的示例中,getState 的输出将如下所示:

StateSnapshot {
  values: { foo: 'b', bar: ['a', 'b'] },
  next: [],
  config: {
    configurable: {
      thread_id: '1',
      checkpoint_ns: '',
      checkpoint_id: '1ef663ba-28fe-6528-8002-5a559208592c'
    }
  },
  metadata: {
    source: 'loop',
    writes: { nodeB: { foo: 'b', bar: ['b'] } },
    step: 2
  },
  createdAt: '2024-08-29T19:19:38.821749+00:00',
  parentConfig: {
    configurable: {
      thread_id: '1',
      checkpoint_ns: '',
      checkpoint_id: '1ef663ba-28f9-6ec4-8001-31981c2c39f8'
    }
  },
  tasks: []
}

获取状态历史

你可以通过调用 graph.get_state_history(config) 获取给定线程的图执行完整历史。这将返回与配置中提供的线程 ID 关联的 StateSnapshot 对象列表。重要的是,检查点将按时间顺序排列,最近的检查点/StateSnapshot 在列表的第一位。

config = {"configurable": {"thread_id": "1"}}
list(graph.get_state_history(config))

你可以通过调用 graph.getStateHistory(config) 获取给定线程的图执行完整历史。这将返回与配置中提供的线程 ID 关联的 StateSnapshot 对象列表。重要的是,检查点将按时间顺序排列,最近的检查点/StateSnapshot 在列表的第一位。

const config = { configurable: { thread_id: "1" } };
for await (const state of graph.getStateHistory(config)) {
  console.log(state);
}

在我们的示例中,get_state_history 的输出将如下所示:

[
    StateSnapshot(
        values={'foo': 'b', 'bar': ['a', 'b']},
        next=(),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},
        metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},
        created_at='2024-08-29T19:19:38.821749+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
        tasks=(),
    ),
    StateSnapshot(
        values={'foo': 'a', 'bar': ['a']},
        next=('node_b',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
        metadata={'source': 'loop', 'writes': {'node_a': {'foo': 'a', 'bar': ['a']}}, 'step': 1},
        created_at='2024-08-29T19:19:38.819946+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
        tasks=(PregelTask(id='6fb7314f-f114-5413-a1f3-d37dfe98ff44', name='node_b', error=None, interrupts=()),),
    ),
    StateSnapshot(
        values={'foo': '', 'bar': []},
        next=('node_a',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
        metadata={'source': 'loop', 'writes': None, 'step': 0},
        created_at='2024-08-29T19:19:38.817813+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
        tasks=(PregelTask(id='f1b14528-5ee5-579c-949b-23ef9bfbed58', name='node_a', error=None, interrupts=()),),
    ),
    StateSnapshot(
        values={'bar': []},
        next=('__start__',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
        metadata={'source': 'input', 'writes': {'foo': ''}, 'step': -1},
        created_at='2024-08-29T19:19:38.816205+00:00',
        parent_config=None,
        tasks=(PregelTask(id='6d27aa2e-d72b-5504-a36f-8620e54a76dd', name='__start__', error=None, interrupts=()),),
    )
]

在我们的示例中,getStateHistory 的输出将如下所示:

[
  StateSnapshot {
    values: { foo: 'b', bar: ['a', 'b'] },
    next: [],
    config: {
      configurable: {
        thread_id: '1',
        checkpoint_ns: '',
        checkpoint_id: '1ef663ba-28fe-6528-8002-5a559208592c'
      }
    },
    metadata: {
      source: 'loop',
      writes: { nodeB: { foo: 'b', bar: ['b'] } },
      step: 2
    },
    createdAt: '2024-08-29T19:19:38.821749+00:00',
    parentConfig: {
      configurable: {
        thread_id: '1',
        checkpoint_ns: '',
        checkpoint_id: '1ef663ba-28f9-6ec4-8001-31981c2c39f8'
      }
    },
    tasks: []
  },
  StateSnapshot {
    values: { foo: 'a', bar: ['a'] },
    next: ['nodeB'],
    config: {
      configurable: {
        thread_id: '1',
        checkpoint_ns: '',
        checkpoint_id: '1ef663ba-28f9-6ec4-8001-31981c2c39f8'
      }
    },
    metadata: {
      source: 'loop',
      writes: { nodeA: { foo: 'a', bar: ['a'] } },
      step: 1
    },
    createdAt: '2024-08-29T19:19:38.819946+00:00',
    parentConfig: {
      configurable: {
        thread_id: '1',
        checkpoint_ns: '',
        checkpoint_id: '1ef663ba-28f4-6b4a-8000-ca575a13d36a'
      }
    },
    tasks: [
      PregelTask {
        id: '6fb7314f-f114-5413-a1f3-d37dfe98ff44',
        name: 'nodeB',
        error: null,
        interrupts: []
      }
    ]
  },
  StateSnapshot {
    values: { foo: '', bar: [] },
    next: ['node_a'],
    config: {
      configurable: {
        thread_id: '1',
        checkpoint_ns: '',
        checkpoint_id: '1ef663ba-28f4-6b4a-8000-ca575a13d36a'
      }
    },
    metadata: {
      source: 'loop',
      writes: null,
      step: 0
    },
    createdAt: '2024-08-29T19:19:38.817813+00:00',
    parentConfig: {
      configurable: {
        thread_id: '1',
        checkpoint_ns: '',
        checkpoint_id: '1ef663ba-28f0-6c66-bfff-6723431e8481'
      }
    },
    tasks: [
      PregelTask {
        id: 'f1b14528-5ee5-579c-949b-23ef9bfbed58',
        name: 'node_a',
        error: null,
        interrupts: []
      }
    ]
  },
  StateSnapshot {
    values: { bar: [] },
    next: ['__start__'],
    config: {
      configurable: {
        thread_id: '1',
        checkpoint_ns: '',
        checkpoint_id: '1ef663ba-28f0-6c66-bfff-6723431e8481'
      }
    },
    metadata: {
      source: 'input',
      writes: { foo: '' },
      step: -1
    },
    createdAt: '2024-08-29T19:19:38.816205+00:00',
    parentConfig: null,
    tasks: [
      PregelTask {
        id: '6d27aa2e-d72b-5504-a36f-8620e54a76dd',
        name: '__start__',
        error: null,
        interrupts: []
      }
    ]
  }
]

State

重放

也可以回放之前的图执行。如果我们用 thread_idcheckpoint_idinvoke 图,那么我们将_重放_对应于 checkpoint_id 的检查点_之前_先前执行的步骤,并且只执行检查点_之后_的步骤。

  • thread_id 是线程的 ID。
  • checkpoint_id 是指线程内特定检查点的标识符。

调用图时,你必须将这些作为配置的 configurable 部分传递:

config = {"configurable": {"thread_id": "1", "checkpoint_id": "0c62ca34-ac19-445d-bbb0-5b4984975b2a"}}
graph.invoke(None, config=config)
const config = {
  configurable: {
    thread_id: "1",
    checkpoint_id: "0c62ca34-ac19-445d-bbb0-5b4984975b2a",
  },
};
await graph.invoke(null, config);

重要的是,LangGraph 知道特定步骤之前是否已执行。如果已执行,LangGraph 只是_重放_图中的该特定步骤,而不会重新执行该步骤,但这只适用于提供的 checkpoint_id _之前_的步骤。checkpoint_id _之后_的所有步骤都将被执行(即,新的分叉),即使它们之前已执行。请参阅此关于时间旅行的操作指南了解更多关于重放的信息

Replay

更新状态

除了从特定 checkpoints 重放图之外,我们还可以_编辑_图状态。我们使用 graph.update_state() 来完成此操作。此方法接受三个不同的参数:

除了从特定 checkpoints 重放图之外,我们还可以_编辑_图状态。我们使用 graph.updateState() 来完成此操作。此方法接受三个不同的参数:

config

配置应包含指定要更新哪个线程的 thread_id。当只传递 thread_id 时,我们更新(或分叉)当前状态。可选地,如果我们包含 checkpoint_id 字段,则我们分叉选定的检查点。

values

这些是将用于更新状态的值。请注意,此更新与来自节点的任何更新完全相同处理。这意味着这些值将传递给归约器函数,如果为图状态中的某些通道定义了归约器。这意味着 update_state 不会自动覆盖每个通道的通道值,而只覆盖没有归约器的通道。让我们通过一个例子来说明。

假设你使用以下模式定义了图的状态(参见上面的完整示例):

from typing import Annotated
from typing_extensions import TypedDict
from operator import add

class State(TypedDict):
    foo: int
    bar: Annotated[list[str], add]
import { withLangGraph } from "@langchain/langgraph/zod";
import { z } from "zod";

const State = z.object({
  foo: z.number(),
  bar: withLangGraph(z.array(z.string()), {
    reducer: {
      fn: (x, y) => x.concat(y),
    },
    default: () => [],
  }),
});

现在假设图的当前状态是

{"foo": 1, "bar": ["a"]}
{ foo: 1, bar: ["a"] }

如果你按如下方式更新状态:

graph.update_state(config, {"foo": 2, "bar": ["b"]})
await graph.updateState(config, { foo: 2, bar: ["b"] });

那么图的新状态将是:

{"foo": 2, "bar": ["a", "b"]}

foo 键(通道)被完全更改(因为该通道没有指定归约器,所以 update_state 会覆盖它)。但是,bar 键有一个指定的归约器,所以它将 "b" 附加到 bar 的状态。

{ foo: 2, bar: ["a", "b"] }

foo 键(通道)被完全更改(因为该通道没有指定归约器,所以 updateState 会覆盖它)。但是,bar 键有一个指定的归约器,所以它将 "b" 附加到 bar 的状态。

as_node

调用 update_state 时你可以选择指定的最后一件事是 as_node。如果你提供了它,更新将被应用为好像它来自节点 as_node。如果未提供 as_node,它将被设置为最后一个更新状态的节点(如果没有歧义)。这很重要的原因是下一个要执行的步骤取决于最后一个给出更新的节点,因此这可用于控制下一个执行哪个节点。请参阅此关于时间旅行的操作指南了解更多关于分叉状态的信息

调用 updateState 时你可以选择指定的最后一件事是 asNode。如果你提供了它,更新将被应用为好像它来自节点 asNode。如果未提供 asNode,它将被设置为最后一个更新状态的节点(如果没有歧义)。这很重要的原因是下一个要执行的步骤取决于最后一个给出更新的节点,因此这可用于控制下一个执行哪个节点。请参阅此关于时间旅行的操作指南了解更多关于分叉状态的信息

Update

记忆存储

Model of shared state

状态模式指定了在图执行时填充的一组键。如上所述,状态可以由检查点器在每个图步骤写入线程,从而实现状态持久化。

但是,如果我们想_跨线程_保留一些信息呢?考虑聊天机器人的情况,我们想在该用户的_所有_聊天对话(即线程)中保留关于用户的特定信息!

仅使用检查点器,我们无法跨线程共享信息。这促使需要 Store 接口。作为说明,我们可以定义一个 InMemoryStore 来跨线程存储关于用户的信息。我们只需像以前一样用检查点器编译我们的图,并使用我们的新 in_memory_store 变量。

LangGraph API 自动处理存储

使用 LangGraph API 时,你不需要手动实现或配置存储。API 会在幕后为你处理所有存储基础设施。

基本用法

首先,让我们在不使用 LangGraph 的情况下单独展示这一点。

from langgraph.store.memory import InMemoryStore
in_memory_store = InMemoryStore()
import { MemoryStore } from "@langchain/langgraph";

const memoryStore = new MemoryStore();

记忆通过 tuple 进行命名空间划分,在这个特定示例中将是 (<user_id>, "memories")。命名空间可以是任意长度并表示任何内容,不必是用户特定的。

user_id = "1"
namespace_for_memory = (user_id, "memories")
const userId = "1";
const namespaceForMemory = [userId, "memories"];

我们使用 store.put 方法将记忆保存到存储中的命名空间。当我们这样做时,我们指定如上定义的命名空间,以及记忆的键值对:键只是记忆的唯一标识符(memory_id),值(一个字典)是记忆本身。

memory_id = str(uuid.uuid4())
memory = {"food_preference" : "I like pizza"}
in_memory_store.put(namespace_for_memory, memory_id, memory)
import { v4 as uuidv4 } from "uuid";

const memoryId = uuidv4();
const memory = { food_preference: "I like pizza" };
await memoryStore.put(namespaceForMemory, memoryId, memory);

我们可以使用 store.search 方法读取命名空间中的记忆,这将返回给定用户的所有记忆作为列表。最近的记忆是列表中的最后一个。

memories = in_memory_store.search(namespace_for_memory)
memories[-1].dict()
{'value': {'food_preference': 'I like pizza'},
 'key': '07e0caf4-1631-47b7-b15f-65515d4c1843',
 'namespace': ['1', 'memories'],
 'created_at': '2024-10-02T17:22:31.590602+00:00',
 'updated_at': '2024-10-02T17:22:31.590605+00:00'}

每种记忆类型都是一个 Python 类(Item),具有某些属性。我们可以通过如上所示的 .dict 将其转换为字典来访问它。

它具有的属性是:

  • value:此记忆的值(本身是一个字典)
  • key:此命名空间中此记忆的唯一键
  • namespace:字符串列表,此记忆类型的命名空间
  • created_at:此记忆创建时的时间戳
  • updated_at:此记忆更新时的时间戳
const memories = await memoryStore.search(namespaceForMemory);
memories[memories.length - 1];

// {
//   value: { food_preference: 'I like pizza' },
//   key: '07e0caf4-1631-47b7-b15f-65515d4c1843',
//   namespace: ['1', 'memories'],
//   createdAt: '2024-10-02T17:22:31.590602+00:00',
//   updatedAt: '2024-10-02T17:22:31.590605+00:00'
// }

它具有的属性是:

  • value:此记忆的值
  • key:此命名空间中此记忆的唯一键
  • namespace:字符串列表,此记忆类型的命名空间
  • createdAt:此记忆创建时的时间戳
  • updatedAt:此记忆更新时的时间戳

语义搜索

除了简单检索之外,存储还支持语义搜索,允许你根据含义而不是精确匹配来查找记忆。要启用此功能,请使用嵌入模型配置存储:

from langchain.embeddings import init_embeddings

store = InMemoryStore(
    index={
        "embed": init_embeddings("openai:text-embedding-3-small"),  # Embedding provider
        "dims": 1536,                              # Embedding dimensions
        "fields": ["food_preference", "$"]              # Fields to embed
    }
)
import { OpenAIEmbeddings } from "@langchain/openai";

const store = new InMemoryStore({
  index: {
    embeddings: new OpenAIEmbeddings({ model: "text-embedding-3-small" }),
    dims: 1536,
    fields: ["food_preference", "$"], // Fields to embed
  },
});

现在搜索时,你可以使用自然语言查询来查找相关记忆:

# Find memories about food preferences
# (This can be done after putting memories into the store)
memories = store.search(
    namespace_for_memory,
    query="What does the user like to eat?",
    limit=3  # Return top 3 matches
)
// Find memories about food preferences
// (This can be done after putting memories into the store)
const memories = await store.search(namespaceForMemory, {
  query: "What does the user like to eat?",
  limit: 3, // Return top 3 matches
});

你可以通过配置 fields 参数或在存储记忆时指定 index 参数来控制哪些部分的记忆被嵌入:

# Store with specific fields to embed
store.put(
    namespace_for_memory,
    str(uuid.uuid4()),
    {
        "food_preference": "I love Italian cuisine",
        "context": "Discussing dinner plans"
    },
    index=["food_preference"]  # Only embed "food_preferences" field
)

# Store without embedding (still retrievable, but not searchable)
store.put(
    namespace_for_memory,
    str(uuid.uuid4()),
    {"system_info": "Last updated: 2024-01-01"},
    index=False
)
// Store with specific fields to embed
await store.put(
  namespaceForMemory,
  uuidv4(),
  {
    food_preference: "I love Italian cuisine",
    context: "Discussing dinner plans",
  },
  { index: ["food_preference"] } // Only embed "food_preferences" field
);

// Store without embedding (still retrievable, but not searchable)
await store.put(
  namespaceForMemory,
  uuidv4(),
  { system_info: "Last updated: 2024-01-01" },
  { index: false }
);

在 LangGraph 中使用

有了这些准备,我们在 LangGraph 中使用 in_memory_storein_memory_store 与检查点器协同工作:检查点器将状态保存到线程,如上所述,而 in_memory_store 允许我们存储任意信息以便_跨_线程访问。我们按如下方式用检查点器和 in_memory_store 编译图。

from langgraph.checkpoint.memory import InMemorySaver

# We need this because we want to enable threads (conversations)
checkpointer = InMemorySaver()

# ... Define the graph ...

# Compile the graph with the checkpointer and store
graph = graph.compile(checkpointer=checkpointer, store=in_memory_store)

有了这些准备,我们在 LangGraph 中使用 memoryStorememoryStore 与检查点器协同工作:检查点器将状态保存到线程,如上所述,而 memoryStore 允许我们存储任意信息以便_跨_线程访问。我们按如下方式用检查点器和 memoryStore 编译图。

import { MemorySaver } from "@langchain/langgraph";

// We need this because we want to enable threads (conversations)
const checkpointer = new MemorySaver();

// ... Define the graph ...

// Compile the graph with the checkpointer and store
const graph = workflow.compile({ checkpointer, store: memoryStore });

我们用 thread_id 调用图,像以前一样,还有 user_id,我们将用它来为这个特定用户的记忆进行命名空间划分,如上所示。

# Invoke the graph
user_id = "1"
config = {"configurable": {"thread_id": "1", "user_id": user_id}}

# First let's just say hi to the AI
for update in graph.stream(
    {"messages": [{"role": "user", "content": "hi"}]}, config, stream_mode="updates"
):
    print(update)
// Invoke the graph
const userId = "1";
const config = { configurable: { thread_id: "1", user_id: userId } };

// First let's just say hi to the AI
for await (const update of await graph.stream(
  { messages: [{ role: "user", content: "hi" }] },
  { ...config, streamMode: "updates" }
)) {
  console.log(update);
}

我们可以通过将 store: BaseStoreconfig: RunnableConfig 作为节点参数传递来在_任何节点_中访问 in_memory_storeuser_id。以下是如何在节点中使用语义搜索来查找相关记忆:

def update_memory(state: MessagesState, config: RunnableConfig, *, store: BaseStore):

    # Get the user id from the config
    user_id = config["configurable"]["user_id"]

    # Namespace the memory
    namespace = (user_id, "memories")

    # ... Analyze conversation and create a new memory

    # Create a new memory ID
    memory_id = str(uuid.uuid4())

    # We create a new memory
    store.put(namespace, memory_id, {"memory": memory})

我们可以通过访问 configstore 作为节点参数来在_任何节点_中访问 memoryStoreuser_id。以下是如何在节点中使用语义搜索来查找相关记忆:

import {
  LangGraphRunnableConfig,
  BaseStore,
  MessagesZodState,
} from "@langchain/langgraph";
import { z } from "zod";

const updateMemory = async (
  state: z.infer<typeof MessagesZodState>,
  config: LangGraphRunnableConfig,
  store: BaseStore
) => {
  // Get the user id from the config
  const userId = config.configurable?.user_id;

  // Namespace the memory
  const namespace = [userId, "memories"];

  // ... Analyze conversation and create a new memory

  // Create a new memory ID
  const memoryId = uuidv4();

  // We create a new memory
  await store.put(namespace, memoryId, { memory });
};

如上所示,我们还可以在任何节点中访问存储并使用 store.search 方法获取记忆。回想一下,记忆作为可以转换为字典的对象列表返回。

memories[-1].dict()
{'value': {'food_preference': 'I like pizza'},
 'key': '07e0caf4-1631-47b7-b15f-65515d4c1843',
 'namespace': ['1', 'memories'],
 'created_at': '2024-10-02T17:22:31.590602+00:00',
 'updated_at': '2024-10-02T17:22:31.590605+00:00'}
memories[memories.length - 1];
// {
//   value: { food_preference: 'I like pizza' },
//   key: '07e0caf4-1631-47b7-b15f-65515d4c1843',
//   namespace: ['1', 'memories'],
//   createdAt: '2024-10-02T17:22:31.590602+00:00',
//   updatedAt: '2024-10-02T17:22:31.590605+00:00'
// }

我们可以访问记忆并在模型调用中使用它们。

def call_model(state: MessagesState, config: RunnableConfig, *, store: BaseStore):
    # Get the user id from the config
    user_id = config["configurable"]["user_id"]

    # Namespace the memory
    namespace = (user_id, "memories")

    # Search based on the most recent message
    memories = store.search(
        namespace,
        query=state["messages"][-1].content,
        limit=3
    )
    info = "\n".join([d.value["memory"] for d in memories])

    # ... Use memories in the model call
const callModel = async (
  state: z.infer<typeof MessagesZodState>,
  config: LangGraphRunnableConfig,
  store: BaseStore
) => {
  // Get the user id from the config
  const userId = config.configurable?.user_id;

  // Namespace the memory
  const namespace = [userId, "memories"];

  // Search based on the most recent message
  const memories = await store.search(namespace, {
    query: state.messages[state.messages.length - 1].content,
    limit: 3,
  });
  const info = memories.map((d) => d.value.memory).join("\n");

  // ... Use memories in the model call
};

如果我们创建一个新线程,只要 user_id 相同,我们仍然可以访问相同的记忆。

# Invoke the graph
config = {"configurable": {"thread_id": "2", "user_id": "1"}}

# Let's say hi again
for update in graph.stream(
    {"messages": [{"role": "user", "content": "hi, tell me about my memories"}]}, config, stream_mode="updates"
):
    print(update)
// Invoke the graph
const config = { configurable: { thread_id: "2", user_id: "1" } };

// Let's say hi again
for await (const update of await graph.stream(
  { messages: [{ role: "user", content: "hi, tell me about my memories" }] },
  { ...config, streamMode: "updates" }
)) {
  console.log(update);
}

当我们使用 LangGraph Platform 时,无论是本地(例如,在 LangGraph Studio 中)还是使用 LangGraph Platform,基础存储默认可用,不需要在图编译期间指定。但是,要启用语义搜索,你**确实**需要在 langgraph.json 文件中配置索引设置。例如:

{
    ...
    "store": {
        "index": {
            "embed": "openai:text-embeddings-3-small",
            "dims": 1536,
            "fields": ["$"]
        }
    }
}

有关更多详细信息和配置选项,请参阅部署指南

检查点器库

在底层,检查点由符合 @[BaseCheckpointSaver] 接口的检查点器对象提供支持。LangGraph 提供了几种检查点器实现,全部通过独立的可安装库实现:

  • langgraph-checkpoint:检查点器保存器(@[BaseCheckpointSaver])和序列化/反序列化接口(@[SerializerProtocol][])的基本接口。包括用于实验的内存检查点器实现(@[InMemorySaver][])。LangGraph 自带 langgraph-checkpoint
  • langgraph-checkpoint-sqlite:使用 SQLite 数据库的 LangGraph 检查点器实现(@[SqliteSaver][] / @[AsyncSqliteSaver])。适合实验和本地工作流。需要单独安装。
  • langgraph-checkpoint-postgres:使用 Postgres 数据库的高级检查点器(@[PostgresSaver][] / @[AsyncPostgresSaver]),在 LangGraph Platform 中使用。适合在生产中使用。需要单独安装。
  • @langchain/langgraph-checkpoint:检查点器保存器(@[BaseCheckpointSaver][])和序列化/反序列化接口(@[SerializerProtocol][])的基本接口。包括用于实验的内存检查点器实现(@[MemorySaver])。LangGraph 自带 @langchain/langgraph-checkpoint
  • @langchain/langgraph-checkpoint-sqlite:使用 SQLite 数据库的 LangGraph 检查点器实现(@[SqliteSaver])。适合实验和本地工作流。需要单独安装。
  • @langchain/langgraph-checkpoint-postgres:使用 Postgres 数据库的高级检查点器(@[PostgresSaver]),在 LangGraph Platform 中使用。适合在生产中使用。需要单独安装。

检查点器接口

每个检查点器都符合 @[BaseCheckpointSaver] 接口并实现以下方法:

  • .put - 存储检查点及其配置和元数据。
  • .put_writes - 存储链接到检查点的中间写入(即待处理写入)。
  • .get_tuple - 使用给定配置(thread_idcheckpoint_id)获取检查点元组。这用于在 graph.get_state() 中填充 StateSnapshot
  • .list - 列出匹配给定配置和过滤条件的检查点。这用于在 graph.get_state_history() 中填充状态历史

如果检查点器与异步图执行一起使用(即通过 .ainvoke.astream.abatch 执行图),将使用上述方法的异步版本(.aput.aput_writes.aget_tuple.alist)。

Note

要异步运行图,你可以使用 InMemorySaver,或 Sqlite/Postgres 检查点器的异步版本——AsyncSqliteSaver / AsyncPostgresSaver 检查点器。

每个检查点器都符合 @[BaseCheckpointSaver][] 接口并实现以下方法:

  • .put - 存储检查点及其配置和元数据。
  • .putWrites - 存储链接到检查点的中间写入(即待处理写入)。
  • .getTuple - 使用给定配置(thread_idcheckpoint_id)获取检查点元组。这用于在 graph.getState() 中填充 StateSnapshot
  • .list - 列出匹配给定配置和过滤条件的检查点。这用于在 graph.getStateHistory() 中填充状态历史

序列化器

当检查点器保存图状态时,它们需要序列化状态中的通道值。这通过序列化器对象完成。

langgraph_checkpoint 定义了实现序列化器的@[协议][SerializerProtocol]并提供了一个默认实现(@[JsonPlusSerializer][]),它处理各种类型,包括 LangChain 和 LangGraph 原语、日期时间、枚举等。

使用 pickle 序列化

默认序列化器 @[JsonPlusSerializer][] 在底层使用 ormsgpack 和 JSON,这不适合所有类型的对象。

如果你想为当前不受我们的 msgpack 编码器支持的对象(如 Pandas 数据框)回退到 pickle, 你可以使用 JsonPlusSerializerpickle_fallback 参数:

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer

# ... Define the graph ...
graph.compile(
    checkpointer=InMemorySaver(serde=JsonPlusSerializer(pickle_fallback=True))
)

加密

检查点器可以选择加密所有持久化状态。要启用此功能,请将 @[EncryptedSerializer][] 的实例传递给任何 BaseCheckpointSaver 实现的 serde 参数。创建加密序列化器的最简单方法是通过 @[from_pycryptodome_aes][],它从 LANGGRAPH_AES_KEY 环境变量读取 AES 密钥(或接受 key 参数):

import sqlite3

from langgraph.checkpoint.serde.encrypted import EncryptedSerializer
from langgraph.checkpoint.sqlite import SqliteSaver

serde = EncryptedSerializer.from_pycryptodome_aes()  # reads LANGGRAPH_AES_KEY
checkpointer = SqliteSaver(sqlite3.connect("checkpoint.db"), serde=serde)
from langgraph.checkpoint.serde.encrypted import EncryptedSerializer
from langgraph.checkpoint.postgres import PostgresSaver

serde = EncryptedSerializer.from_pycryptodome_aes()
checkpointer = PostgresSaver.from_conn_string("postgresql://...", serde=serde)
checkpointer.setup()

在 LangGraph Platform 上运行时,只要存在 LANGGRAPH_AES_KEY,加密就会自动启用,所以你只需要提供环境变量。其他加密方案可以通过实现 @[CipherProtocol][] 并将其提供给 EncryptedSerializer 来使用。

@langchain/langgraph-checkpoint 定义了实现序列化器的协议并提供了一个默认实现,它处理各种类型,包括 LangChain 和 LangGraph 原语、日期时间、枚举等。

功能

人机协同

首先,检查点器通过允许人类检查、中断和批准图步骤来促进人机协同工作流。检查点器对于这些工作流是必需的,因为人类必须能够在任何时间点查看图的状态,并且图必须能够在人类对状态进行任何更新后恢复执行。有关示例,请参阅操作指南

记忆

其次,检查点器允许交互之间的"记忆"。在重复人类交互(如对话)的情况下,任何后续消息都可以发送到该线程,该线程将保留对先前消息的记忆。有关如何使用检查点器添加和管理对话记忆的信息,请参阅添加记忆

时间旅行

第三,检查点器允许"时间旅行",允许用户重放先前的图执行以审查和/或调试特定图步骤。此外,检查点器使得可以在任意检查点分叉图状态以探索替代轨迹。

容错

最后,检查点提供容错和错误恢复:如果一个或多个节点在给定超步失败,你可以从最后一个成功的步骤重新启动图。此外,当图节点在给定超步执行中途失败时,LangGraph 会存储在该超步成功完成的任何其他节点的待处理检查点写入,以便每当我们从该超步恢复图执行时,我们不会重新运行成功的节点。

待处理写入

此外,当图节点在给定超步执行中途失败时,LangGraph 会存储在该超步成功完成的任何其他节点的待处理检查点写入,以便每当我们从该超步恢复图执行时,我们不会重新运行成功的节点。