
使用函数式 API

函数式 API 允许你以最小的代码修改将 LangGraph 的核心功能——持久化、记忆、人机协同和流式传输——添加到你的应用中。

Tip

有关函数式 API 的概念信息,请参阅函数式 API 概念指南。

创建简单工作流

定义 entrypoint 时,输入仅限于函数的第一个参数。要传递多个输入,可以使用字典。

from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
    value = inputs["value"]
    another_value = inputs["another_value"]
    ...

my_workflow.invoke({"value": 1, "another_value": 2})
const checkpointer = new MemorySaver();

const myWorkflow = entrypoint(
  { checkpointer, name: "myWorkflow" },
  async (inputs: { value: number; anotherValue: number }) => {
    const value = inputs.value;
    const anotherValue = inputs.anotherValue;
    // ...
  }
);

await myWorkflow.invoke({ value: 1, anotherValue: 2 });
扩展示例:简单工作流
import uuid
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

# 检查数字是否为偶数的任务
@task
def is_even(number: int) -> bool:
    return number % 2 == 0

# 格式化消息的任务
@task
def format_message(is_even: bool) -> str:
    return "The number is even." if is_even else "The number is odd."

# 创建用于持久化的检查点器
checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(inputs: dict) -> str:
    """简单的数字分类工作流。"""
    even = is_even(inputs["number"]).result()
    return format_message(even).result()

# 使用唯一的线程 ID 运行工作流
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
result = workflow.invoke({"number": 7}, config=config)
print(result)
import { v4 as uuidv4 } from "uuid";
import { entrypoint, task, MemorySaver } from "@langchain/langgraph";

// 检查数字是否为偶数的任务
const isEven = task("isEven", async (number: number) => {
  return number % 2 === 0;
});

// 格式化消息的任务
const formatMessage = task("formatMessage", async (isEven: boolean) => {
  return isEven ? "The number is even." : "The number is odd.";
});

// 创建用于持久化的检查点器
const checkpointer = new MemorySaver();

const workflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (inputs: { number: number }) => {
    // 简单的数字分类工作流
    const even = await isEven(inputs.number);
    return await formatMessage(even);
  }
);

// 使用唯一的线程 ID 运行工作流
const config = { configurable: { thread_id: uuidv4() } };
const result = await workflow.invoke({ number: 7 }, config);
console.log(result);
扩展示例:使用 LLM 撰写文章

此示例演示如何在语法层面使用 @task 和 @entrypoint 装饰器。由于提供了检查点器,工作流结果将被持久化到检查点器中。

import uuid
from langchain.chat_models import init_chat_model
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

llm = init_chat_model('openai:gpt-3.5-turbo')

# 任务:使用 LLM 生成文章
@task
def compose_essay(topic: str) -> str:
    """生成关于给定主题的文章。"""
    return llm.invoke([
        {"role": "system", "content": "You are a helpful assistant that writes essays."},
        {"role": "user", "content": f"Write an essay about {topic}."}
    ]).content

# 创建用于持久化的检查点器
checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(topic: str) -> str:
    """使用 LLM 生成文章的简单工作流。"""
    return compose_essay(topic).result()

# 执行工作流
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
result = workflow.invoke("the history of flight", config=config)
print(result)
import { v4 as uuidv4 } from "uuid";
import { ChatOpenAI } from "@langchain/openai";
import { entrypoint, task, MemorySaver } from "@langchain/langgraph";

const llm = new ChatOpenAI({ model: "gpt-3.5-turbo" });

// 任务:使用 LLM 生成文章
const composeEssay = task("composeEssay", async (topic: string) => {
  // 生成关于给定主题的文章
  const response = await llm.invoke([
    { role: "system", content: "You are a helpful assistant that writes essays." },
    { role: "user", content: `Write an essay about ${topic}.` }
  ]);
  return response.content as string;
});

// 创建用于持久化的检查点器
const checkpointer = new MemorySaver();

const workflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (topic: string) => {
    // 使用 LLM 生成文章的简单工作流
    return await composeEssay(topic);
  }
);

// 执行工作流
const config = { configurable: { thread_id: uuidv4() } };
const result = await workflow.invoke("the history of flight", config);
console.log(result);

并行执行

通过并发调用任务并等待结果,可以实现任务的并行执行。这对于提高 IO 密集型任务(例如调用 LLM 的 API)的性能非常有用。

@task
def add_one(number: int) -> int:
    return number + 1

@entrypoint(checkpointer=checkpointer)
def graph(numbers: list[int]) -> list[int]:
    futures = [add_one(i) for i in numbers]
    return [f.result() for f in futures]
const addOne = task("addOne", async (number: number) => {
  return number + 1;
});

const graph = entrypoint(
  { checkpointer, name: "graph" },
  async (numbers: number[]) => {
    return await Promise.all(numbers.map(addOne));
  }
);
扩展示例:并行 LLM 调用

此示例演示如何使用 @task 并行运行多个 LLM 调用。每个调用生成一个关于不同主题的段落,结果合并为单个文本输出。

import uuid
from langchain.chat_models import init_chat_model
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

# 初始化 LLM 模型
llm = init_chat_model("openai:gpt-3.5-turbo")

# 生成关于给定主题段落的任务
@task
def generate_paragraph(topic: str) -> str:
    response = llm.invoke([
        {"role": "system", "content": "You are a helpful assistant that writes educational paragraphs."},
        {"role": "user", "content": f"Write a paragraph about {topic}."}
    ])
    return response.content

# 创建用于持久化的检查点器
checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(topics: list[str]) -> str:
    """并行生成多个段落并合并它们。"""
    futures = [generate_paragraph(topic) for topic in topics]
    paragraphs = [f.result() for f in futures]
    return "\n\n".join(paragraphs)

# 运行工作流
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
result = workflow.invoke(["quantum computing", "climate change", "history of aviation"], config=config)
print(result)
import { v4 as uuidv4 } from "uuid";
import { ChatOpenAI } from "@langchain/openai";
import { entrypoint, task, MemorySaver } from "@langchain/langgraph";

// 初始化 LLM 模型
const llm = new ChatOpenAI({ model: "gpt-3.5-turbo" });

// 生成关于给定主题段落的任务
const generateParagraph = task("generateParagraph", async (topic: string) => {
  const response = await llm.invoke([
    { role: "system", content: "You are a helpful assistant that writes educational paragraphs." },
    { role: "user", content: `Write a paragraph about ${topic}.` }
  ]);
  return response.content as string;
});

// 创建用于持久化的检查点器
const checkpointer = new MemorySaver();

const workflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (topics: string[]) => {
    // 并行生成多个段落并合并它们
    const paragraphs = await Promise.all(topics.map(generateParagraph));
    return paragraphs.join("\n\n");
  }
);

// 运行工作流
const config = { configurable: { thread_id: uuidv4() } };
const result = await workflow.invoke(["quantum computing", "climate change", "history of aviation"], config);
console.log(result);

此示例使用 LangGraph 的并发模型来改善执行时间,特别是当任务涉及 LLM 调用等 I/O 操作时。

调用图

函数式 API 和图 API 可以在同一个应用中一起使用,因为它们共享相同的底层运行时。

from langgraph.func import entrypoint
from langgraph.graph import StateGraph

builder = StateGraph()
...
some_graph = builder.compile()

@entrypoint()
def some_workflow(some_input: dict) -> int:
    # 调用使用图 API 定义的图
    result_1 = some_graph.invoke(...)
    # 调用另一个使用图 API 定义的图
    result_2 = another_graph.invoke(...)
    return {
        "result_1": result_1,
        "result_2": result_2
    }
import { entrypoint } from "@langchain/langgraph";
import { StateGraph } from "@langchain/langgraph";

const builder = new StateGraph(/* ... */);
// ...
const someGraph = builder.compile();

const someWorkflow = entrypoint(
  { name: "someWorkflow" },
  async (someInput: Record<string, any>) => {
    // 调用使用图 API 定义的图
    const result1 = await someGraph.invoke(/* ... */);
    // 调用另一个使用图 API 定义的图
    const result2 = await anotherGraph.invoke(/* ... */);
    return {
      result1,
      result2,
    };
  }
);
扩展示例:从函数式 API 调用简单图
import uuid
from typing import TypedDict
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph

# 定义共享状态类型
class State(TypedDict):
    foo: int

# 定义简单的转换节点
def double(state: State) -> State:
    return {"foo": state["foo"] * 2}

# 使用图 API 构建图
builder = StateGraph(State)
builder.add_node("double", double)
builder.set_entry_point("double")
graph = builder.compile()

# 定义函数式 API 工作流
checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(x: int) -> dict:
    result = graph.invoke({"foo": x})
    return {"bar": result["foo"]}

# 执行工作流
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
print(workflow.invoke(5, config=config))  # 输出: {'bar': 10}
import { v4 as uuidv4 } from "uuid";
import { entrypoint, MemorySaver } from "@langchain/langgraph";
import { StateGraph } from "@langchain/langgraph";
import { z } from "zod";

// 定义共享状态类型
const State = z.object({
  foo: z.number(),
});

// 使用图 API 构建图
const builder = new StateGraph(State)
  .addNode("double", (state) => {
    return { foo: state.foo * 2 };
  })
  .addEdge("__start__", "double");
const graph = builder.compile();

// 定义函数式 API 工作流
const checkpointer = new MemorySaver();

const workflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (x: number) => {
    const result = await graph.invoke({ foo: x });
    return { bar: result.foo };
  }
);

// 执行工作流
const config = { configurable: { thread_id: uuidv4() } };
console.log(await workflow.invoke(5, config)); // 输出: { bar: 10 }

调用其他入口点

你可以在 entrypoint 或 task 中调用其他 entrypoint。

@entrypoint() # 将自动使用父入口点的检查点器
def some_other_workflow(inputs: dict) -> int:
    return inputs["value"]

@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
    value = some_other_workflow.invoke({"value": 1})
    return value
// 将自动使用父入口点的检查点器
const someOtherWorkflow = entrypoint(
  { name: "someOtherWorkflow" },
  async (inputs: { value: number }) => {
    return inputs.value;
  }
);

const myWorkflow = entrypoint(
  { checkpointer, name: "myWorkflow" },
  async (inputs: { value: number }) => {
    const value = await someOtherWorkflow.invoke({ value: 1 });
    return value;
  }
);
扩展示例:调用另一个入口点
import uuid
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver

# 初始化检查点器
checkpointer = InMemorySaver()

# 可复用的子工作流,用于乘法运算
@entrypoint()
def multiply(inputs: dict) -> int:
    return inputs["a"] * inputs["b"]

# 调用子工作流的主工作流
@entrypoint(checkpointer=checkpointer)
def main(inputs: dict) -> dict:
    result = multiply.invoke({"a": inputs["x"], "b": inputs["y"]})
    return {"product": result}

# 执行主工作流
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
print(main.invoke({"x": 6, "y": 7}, config=config))  # 输出: {'product': 42}
import { v4 as uuidv4 } from "uuid";
import { entrypoint, MemorySaver } from "@langchain/langgraph";

// 初始化检查点器
const checkpointer = new MemorySaver();

// 可复用的子工作流,用于乘法运算
const multiply = entrypoint(
  { name: "multiply" },
  async (inputs: { a: number; b: number }) => {
    return inputs.a * inputs.b;
  }
);

// 调用子工作流的主工作流
const main = entrypoint(
  { checkpointer, name: "main" },
  async (inputs: { x: number; y: number }) => {
    const result = await multiply.invoke({ a: inputs.x, b: inputs.y });
    return { product: result };
  }
);

// 执行主工作流
const config = { configurable: { thread_id: uuidv4() } };
console.log(await main.invoke({ x: 6, y: 7 }, config)); // 输出: { product: 42 }

流式传输

函数式 API 使用与 图 API 相同的流式传输机制。请阅读 流式传输指南 部分了解更多详情。

以下是使用流式传输 API 同时流式传输更新和自定义数据的示例。

from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.config import get_stream_writer # (1)!

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def main(inputs: dict) -> int:
    writer = get_stream_writer() # (2)!
    writer("Started processing") # (3)!
    result = inputs["x"] * 2
    writer(f"Result is {result}") # (4)!
    return result

config = {"configurable": {"thread_id": "abc"}}

# highlight-next-line
for mode, chunk in main.stream( # (5)!
    {"x": 5},
    stream_mode=["custom", "updates"], # (6)!
    config=config
):
    print(f"{mode}: {chunk}")
  1. 从 langgraph.config 导入 get_stream_writer。
  2. 在入口点中获取流写入器实例。
  3. 在计算开始前发出自定义数据。
  4. 在计算结果后发出另一条自定义消息。
  5. 使用 .stream() 处理流式输出。
  6. 指定要使用的流式传输模式。
('custom', 'Started processing')
('custom', 'Result is 10')
('updates', {'main': 10})

Python < 3.11 的异步使用

如果使用 Python < 3.11 并编写异步代码,使用 get_stream_writer() 将无法正常工作。请改为直接使用 StreamWriter 类。更多详情请参阅 Python < 3.11 的异步使用。

from langgraph.types import StreamWriter

@entrypoint(checkpointer=checkpointer)
# highlight-next-line
async def main(inputs: dict, writer: StreamWriter) -> int:
    ...
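
下面给出一个完整的异步示意(示例中的输入值与线程 ID 均为假设),演示如何通过注入的 writer 参数发送自定义流数据:

import asyncio
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import StreamWriter

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
async def main(inputs: dict, writer: StreamWriter) -> int:
    # writer 参数由运行时注入,用于发送自定义流数据
    writer("Started processing")
    result = inputs["x"] * 2
    writer(f"Result is {result}")
    return result

async def run():
    config = {"configurable": {"thread_id": "abc"}}
    async for mode, chunk in main.astream(
        {"x": 5},
        stream_mode=["custom", "updates"],
        config=config,
    ):
        print(f"{mode}: {chunk}")

asyncio.run(run())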
import {
  entrypoint,
  MemorySaver,
  LangGraphRunnableConfig,
} from "@langchain/langgraph";

const checkpointer = new MemorySaver();

const main = entrypoint(
  { checkpointer, name: "main" },
  async (
    inputs: { x: number },
    config: LangGraphRunnableConfig
  ): Promise<number> => {
    config.writer?.("Started processing"); // (1)!
    const result = inputs.x * 2;
    config.writer?.(`Result is ${result}`); // (2)!
    return result;
  }
);

const config = { configurable: { thread_id: "abc" } };

// (3)!
for await (const [mode, chunk] of await main.stream(
  { x: 5 },
  { streamMode: ["custom", "updates"], ...config } // (4)!
)) {
  console.log(`${mode}: ${JSON.stringify(chunk)}`);
}
  1. 在计算开始前发出自定义数据。
  2. 在计算结果后发出另一条自定义消息。
  3. 使用 .stream() 处理流式输出。
  4. 指定要使用的流式传输模式。
custom: "Started processing"
custom: "Result is 10"
updates: {"main":10}

重试策略
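
任务可以通过 RetryPolicy 配置为在出现指定异常时自动重试,如下例所示: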

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import RetryPolicy

# 此变量仅用于演示目的,模拟网络故障。
# 这不是你在实际代码中会有的内容。
attempts = 0

# 让我们配置 RetryPolicy 以在 ValueError 时重试。
# 默认的 RetryPolicy 针对特定网络错误进行了优化。
retry_policy = RetryPolicy(retry_on=ValueError)

@task(retry_policy=retry_policy)
def get_info():
    global attempts
    attempts += 1

    if attempts < 2:
        raise ValueError('Failure')
    return "OK"

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def main(inputs, writer):
    return get_info().result()

config = {
    "configurable": {
        "thread_id": "1"
    }
}

main.invoke({'any_input': 'foobar'}, config=config)
'OK'
import {
  MemorySaver,
  entrypoint,
  task,
  RetryPolicy,
} from "@langchain/langgraph";

// 此变量仅用于演示目的,模拟网络故障。
// 这不是你在实际代码中会有的内容。
let attempts = 0;

// 让我们配置 RetryPolicy,在任务抛出 Error 时重试。
// 默认的 RetryPolicy 针对特定网络错误进行了优化。
const retryPolicy: RetryPolicy = { retryOn: (error) => error instanceof Error };

const getInfo = task(
  {
    name: "getInfo",
    retry: retryPolicy,
  },
  () => {
    attempts += 1;

    if (attempts < 2) {
      throw new Error("Failure");
    }
    return "OK";
  }
);

const checkpointer = new MemorySaver();

const main = entrypoint(
  { checkpointer, name: "main" },
  async (inputs: Record<string, any>) => {
    return await getInfo();
  }
);

const config = {
  configurable: {
    thread_id: "1",
  },
};

await main.invoke({ any_input: "foobar" }, config);
'OK'

缓存任务
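
任务可以通过 CachePolicy 缓存执行结果,当以相同输入再次调用同一任务时会直接命中缓存、跳过耗时计算,如下例所示: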

import time
from langgraph.cache.memory import InMemoryCache
from langgraph.func import entrypoint, task
from langgraph.types import CachePolicy


@task(cache_policy=CachePolicy(ttl=120))  # (1)!
def slow_add(x: int) -> int:
    time.sleep(1)
    return x * 2


@entrypoint(cache=InMemoryCache())
def main(inputs: dict) -> dict[str, int]:
    result1 = slow_add(inputs["x"]).result()
    result2 = slow_add(inputs["x"]).result()
    return {"result1": result1, "result2": result2}


for chunk in main.stream({"x": 5}, stream_mode="updates"):
    print(chunk)

#> {'slow_add': 10}
#> {'slow_add': 10, '__metadata__': {'cached': True}}
#> {'main': {'result1': 10, 'result2': 10}}
  1. ttl 以秒为单位指定。缓存将在此时间后失效。
import {
  InMemoryCache,
  entrypoint,
  task,
  CachePolicy,
} from "@langchain/langgraph";

const slowAdd = task(
  {
    name: "slowAdd",
    cache: { ttl: 120 }, // (1)!
  },
  async (x: number) => {
    await new Promise((resolve) => setTimeout(resolve, 1000));
    return x * 2;
  }
);

const main = entrypoint(
  { cache: new InMemoryCache(), name: "main" },
  async (inputs: { x: number }) => {
    const result1 = await slowAdd(inputs.x);
    const result2 = await slowAdd(inputs.x);
    return { result1, result2 };
  }
);

for await (const chunk of await main.stream(
  { x: 5 },
  { streamMode: "updates" }
)) {
  console.log(chunk);
}

//> { slowAdd: 10 }
//> { slowAdd: 10, '__metadata__': { cached: true } }
//> { main: { result1: 10, result2: 10 } }
  1. ttl 以秒为单位指定。缓存将在此时间后失效。

错误后恢复
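
当某个任务失败导致工作流中断时,检查点器会保存此前已成功完成的任务结果,因此恢复执行时无需重新运行这些任务,如下例所示: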

import time
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import StreamWriter

# 此变量仅用于演示目的,模拟网络故障。
# 这不是你在实际代码中会有的内容。
attempts = 0

@task()
def get_info():
    """
    模拟一个在成功之前失败一次的任务。
    在第一次尝试时抛出异常,然后在后续尝试中返回 "OK"。
    """
    global attempts
    attempts += 1

    if attempts < 2:
        raise ValueError("Failure")  # 模拟第一次尝试失败
    return "OK"

# 初始化用于持久化的内存检查点器
checkpointer = InMemorySaver()

@task
def slow_task():
    """
    通过引入 1 秒延迟来模拟慢速任务。
    """
    time.sleep(1)
    return "Ran slow task."

@entrypoint(checkpointer=checkpointer)
def main(inputs, writer: StreamWriter):
    """
    按顺序运行 slow_task 和 get_info 任务的主工作流函数。

    参数:
    - inputs: 包含工作流输入值的字典。
    - writer: 用于流式传输自定义数据的 StreamWriter。

    工作流首先执行 `slow_task`,然后尝试执行 `get_info`,
    后者将在第一次调用时失败。
    """
    slow_task_result = slow_task().result()  # 阻塞调用 slow_task
    get_info().result()  # 第一次尝试时将在此处抛出异常
    return slow_task_result

# 带有唯一线程标识符的工作流执行配置
config = {
    "configurable": {
        "thread_id": "1"  # 用于跟踪工作流执行的唯一标识符
    }
}

# 由于 slow_task 执行,此调用将花费约 1 秒
try:
    # 第一次调用将因 `get_info` 任务失败而抛出异常
    main.invoke({'any_input': 'foobar'}, config=config)
except ValueError:
    pass  # 优雅地处理失败

当我们恢复执行时,不需要重新运行 slow_task,因为其结果已保存在检查点中。

main.invoke(None, config=config)
'Ran slow task.'
import { entrypoint, task, MemorySaver } from "@langchain/langgraph";

// 此变量仅用于演示目的,模拟网络故障。
// 这不是你在实际代码中会有的内容。
let attempts = 0;

const getInfo = task("getInfo", async () => {
  /**
   * 模拟一个在成功之前失败一次的任务。
   * 在第一次尝试时抛出异常,然后在后续尝试中返回 "OK"。
   */
  attempts += 1;

  if (attempts < 2) {
    throw new Error("Failure"); // 模拟第一次尝试失败
  }
  return "OK";
});

// 初始化用于持久化的内存检查点器
const checkpointer = new MemorySaver();

const slowTask = task("slowTask", async () => {
  /**
   * 通过引入 1 秒延迟来模拟慢速任务。
   */
  await new Promise((resolve) => setTimeout(resolve, 1000));
  return "Ran slow task.";
});

const main = entrypoint(
  { checkpointer, name: "main" },
  async (inputs: Record<string, any>) => {
    /**
     * 按顺序运行 slowTask 和 getInfo 任务的主工作流函数。
     *
     * 参数:
     * - inputs: Record<string, any> 包含工作流输入值。
     *
     * 工作流首先执行 `slowTask`,然后尝试执行 `getInfo`,
     * 后者将在第一次调用时失败。
     */
    const slowTaskResult = await slowTask(); // 阻塞调用 slowTask
    await getInfo(); // 第一次尝试时将在此处抛出异常
    return slowTaskResult;
  }
);

// 带有唯一线程标识符的工作流执行配置
const config = {
  configurable: {
    thread_id: "1", // 用于跟踪工作流执行的唯一标识符
  },
};

// 由于 slowTask 执行,此调用将花费约 1 秒
try {
  // 第一次调用将因 `getInfo` 任务失败而抛出异常
  await main.invoke({ any_input: "foobar" }, config);
} catch (err) {
  // 优雅地处理失败
}

当我们恢复执行时,不需要重新运行 slowTask,因为其结果已保存在检查点中。

await main.invoke(null, config);
'Ran slow task.'

人机协同

函数式 API 使用 interrupt 函数和 Command 原语支持 人机协同 工作流。

基本人机协同工作流

我们将创建三个任务:

  1. 追加 "bar"
  2. 暂停等待人工输入。恢复时,追加人工输入。
  3. 追加 "qux"
from langgraph.func import entrypoint, task
from langgraph.types import Command, interrupt


@task
def step_1(input_query):
    """追加 bar。"""
    return f"{input_query} bar"


@task
def human_feedback(input_query):
    """追加用户输入。"""
    feedback = interrupt(f"Please provide feedback: {input_query}")
    return f"{input_query} {feedback}"


@task
def step_3(input_query):
    """追加 qux。"""
    return f"{input_query} qux"
import { entrypoint, task, interrupt, Command } from "@langchain/langgraph";

const step1 = task("step1", async (inputQuery: string) => {
  // 追加 bar
  return `${inputQuery} bar`;
});

const humanFeedback = task("humanFeedback", async (inputQuery: string) => {
  // 追加用户输入
  const feedback = interrupt(`Please provide feedback: ${inputQuery}`);
  return `${inputQuery} ${feedback}`;
});

const step3 = task("step3", async (inputQuery: string) => {
  // 追加 qux
  return `${inputQuery} qux`;
});

我们现在可以在 入口点 中组合这些任务:

from langgraph.checkpoint.memory import InMemorySaver

checkpointer = InMemorySaver()


@entrypoint(checkpointer=checkpointer)
def graph(input_query):
    result_1 = step_1(input_query).result()
    result_2 = human_feedback(result_1).result()
    result_3 = step_3(result_2).result()

    return result_3
import { MemorySaver } from "@langchain/langgraph";

const checkpointer = new MemorySaver();

const graph = entrypoint(
  { checkpointer, name: "graph" },
  async (inputQuery: string) => {
    const result1 = await step1(inputQuery);
    const result2 = await humanFeedback(result1);
    const result3 = await step3(result2);

    return result3;
  }
);

interrupt() 在任务中被调用,使人工能够审查和编辑前一个任务的输出。先前任务的结果——在本例中是 step_1——会被持久化,因此在 interrupt 之后不会再次运行。

让我们发送一个查询字符串:

config = {"configurable": {"thread_id": "1"}}

for event in graph.stream("foo", config):
    print(event)
    print("\n")
const config = { configurable: { thread_id: "1" } };

for await (const event of await graph.stream("foo", config)) {
  console.log(event);
  console.log("\n");
}

注意我们在 step_1 之后通过 interrupt 暂停了。中断提供了恢复运行的说明。要恢复,我们发出一个包含 human_feedback 任务所期望数据的 Command

# 继续执行
for event in graph.stream(Command(resume="baz"), config):
    print(event)
    print("\n")
// 继续执行
for await (const event of await graph.stream(
  new Command({ resume: "baz" }),
  config
)) {
  console.log(event);
  console.log("\n");
}

恢复后,运行将继续执行剩余步骤并按预期终止。

审查工具调用

要在执行前审查工具调用,我们添加一个调用 interruptreview_tool_call 函数。当调用此函数时,执行将暂停,直到我们发出恢复命令。

给定一个工具调用,我们的函数将 interrupt 以供人工审查。此时我们可以:

  • 接受工具调用
  • 修改工具调用并继续
  • 生成自定义工具消息(例如,指示模型重新格式化其工具调用)
from typing import Union

from langchain_core.messages import ToolCall, ToolMessage
from langgraph.types import interrupt

def review_tool_call(tool_call: ToolCall) -> Union[ToolCall, ToolMessage]:
    """审查工具调用,返回验证后的版本。"""
    human_review = interrupt(
        {
            "question": "Is this correct?",
            "tool_call": tool_call,
        }
    )
    review_action = human_review["action"]
    review_data = human_review.get("data")
    if review_action == "continue":
        return tool_call
    elif review_action == "update":
        updated_tool_call = {**tool_call, **{"args": review_data}}
        return updated_tool_call
    elif review_action == "feedback":
        return ToolMessage(
            content=review_data, name=tool_call["name"], tool_call_id=tool_call["id"]
        )
import { ToolCall } from "@langchain/core/messages/tool";
import { ToolMessage } from "@langchain/core/messages";

function reviewToolCall(toolCall: ToolCall): ToolCall | ToolMessage {
  // 审查工具调用,返回验证后的版本
  const humanReview = interrupt({
    question: "Is this correct?",
    tool_call: toolCall,
  });

  const reviewAction = humanReview.action;
  const reviewData = humanReview.data;

  if (reviewAction === "continue") {
    return toolCall;
  } else if (reviewAction === "update") {
    const updatedToolCall = { ...toolCall, args: reviewData };
    return updatedToolCall;
  } else if (reviewAction === "feedback") {
    return new ToolMessage({
      content: reviewData,
      name: toolCall.name,
      tool_call_id: toolCall.id,
    });
  }

  throw new Error(`Unknown review action: ${reviewAction}`);
}

我们现在可以更新我们的 入口点 来审查生成的工具调用。如果工具调用被接受或修改,我们像以前一样执行。否则,我们只是追加人工提供的 ToolMessage。先前任务的结果——在本例中是初始模型调用——会被持久化,因此在 interrupt 之后不会再次运行。
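
下面的入口点会用到 call_model 和 call_tool 两个任务,它们在本页前文(未包含在本节)中定义。作为参考,这里给出一个最小示意(其中的模型选择和 get_weather 工具均为示例假设):

from langchain.chat_models import init_chat_model
from langchain_core.messages import ToolMessage
from langchain_core.tools import tool
from langgraph.func import entrypoint, task

# 示例工具(假设):实际应用中请替换为你自己的工具
@tool
def get_weather(location: str) -> str:
    """获取指定地点的天气。"""
    return f"It's sunny in {location}."

tools = [get_weather]
tools_by_name = {t.name: t for t in tools}

# 假设使用支持工具调用的模型,并绑定工具
model = init_chat_model("openai:gpt-4o-mini").bind_tools(tools)

@task
def call_model(messages):
    """调用模型,返回可能包含 tool_calls 的 AIMessage。"""
    return model.invoke(messages)

@task
def call_tool(tool_call):
    """执行单个工具调用,返回 ToolMessage。"""
    selected_tool = tools_by_name[tool_call["name"]]
    observation = selected_tool.invoke(tool_call["args"])
    return ToolMessage(
        content=str(observation), name=tool_call["name"], tool_call_id=tool_call["id"]
    )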

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph.message import add_messages
from langgraph.types import Command, interrupt


checkpointer = InMemorySaver()


@entrypoint(checkpointer=checkpointer)
def agent(messages, previous):
    if previous is not None:
        messages = add_messages(previous, messages)

    llm_response = call_model(messages).result()
    while True:
        if not llm_response.tool_calls:
            break

        # 审查工具调用
        tool_results = []
        tool_calls = []
        for i, tool_call in enumerate(llm_response.tool_calls):
            review = review_tool_call(tool_call)
            if isinstance(review, ToolMessage):
                tool_results.append(review)
            else:  # 是验证后的工具调用
                tool_calls.append(review)
                if review != tool_call:
                    llm_response.tool_calls[i] = review  # 更新消息

        # 执行剩余的工具调用
        tool_result_futures = [call_tool(tool_call) for tool_call in tool_calls]
        remaining_tool_results = [fut.result() for fut in tool_result_futures]

        # 追加到消息列表
        messages = add_messages(
            messages,
            [llm_response, *tool_results, *remaining_tool_results],
        )

        # 再次调用模型
        llm_response = call_model(messages).result()

    # 生成最终响应
    messages = add_messages(messages, llm_response)
    return entrypoint.final(value=llm_response, save=messages)
import {
  MemorySaver,
  entrypoint,
  interrupt,
  Command,
  addMessages,
} from "@langchain/langgraph";
import { ToolMessage, AIMessage, BaseMessage } from "@langchain/core/messages";

const checkpointer = new MemorySaver();

const agent = entrypoint(
  { checkpointer, name: "agent" },
  async (
    messages: BaseMessage[],
    previous?: BaseMessage[]
  ): Promise<BaseMessage> => {
    if (previous !== undefined) {
      messages = addMessages(previous, messages);
    }

    let llmResponse = await callModel(messages);
    while (true) {
      if (!llmResponse.tool_calls?.length) {
        break;
      }

      // 审查工具调用
      const toolResults: ToolMessage[] = [];
      const toolCalls: ToolCall[] = [];

      for (let i = 0; i < llmResponse.tool_calls.length; i++) {
        const review = reviewToolCall(llmResponse.tool_calls[i]);
        if (review instanceof ToolMessage) {
          toolResults.push(review);
        } else {
          // 是验证后的工具调用
          toolCalls.push(review);
          if (review !== llmResponse.tool_calls[i]) {
            llmResponse.tool_calls[i] = review; // 更新消息
          }
        }
      }

      // 执行剩余的工具调用
      const remainingToolResults = await Promise.all(
        toolCalls.map((toolCall) => callTool(toolCall))
      );

      // 追加到消息列表
      messages = addMessages(messages, [
        llmResponse,
        ...toolResults,
        ...remainingToolResults,
      ]);

      // 再次调用模型
      llmResponse = await callModel(messages);
    }

    // 生成最终响应
    messages = addMessages(messages, llmResponse);
    return entrypoint.final({ value: llmResponse, save: messages });
  }
);

短期记忆

短期记忆允许在同一 线程 ID 的不同 调用 之间存储信息。更多详情请参阅 短期记忆

管理检查点

你可以查看和删除检查点器存储的信息。

查看线程状态(检查点)

config = {
    "configurable": {
        # highlight-next-line
        "thread_id": "1",
        # 可选地提供特定检查点的 ID,
        # 否则显示最新的检查点
        # highlight-next-line
        # "checkpoint_id": "1f029ca3-1f5b-6704-8004-820c16b69a5a"

    }
}
# highlight-next-line
graph.get_state(config)
StateSnapshot(
    values={'messages': [HumanMessage(content="hi! I'm bob"), AIMessage(content='Hi Bob! How are you doing today?'), HumanMessage(content="what's my name?"), AIMessage(content='Your name is Bob.')]},
    next=(),
    config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1f5b-6704-8004-820c16b69a5a'}},
    metadata={
        'source': 'loop',
        'writes': {'call_model': {'messages': AIMessage(content='Your name is Bob.')}},
        'step': 4,
        'parents': {},
        'thread_id': '1'
    },
    created_at='2025-05-05T16:01:24.680462+00:00',
    parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1790-6b0a-8003-baf965b6a38f'}},
    tasks=(),
    interrupts=()
)
const config = {
  configurable: {
    // highlight-next-line
    thread_id: "1",
    // 可选地提供特定检查点的 ID,
    // 否则显示最新的检查点
    // highlight-next-line
    // checkpoint_id: "1f029ca3-1f5b-6704-8004-820c16b69a5a"
  },
};
// highlight-next-line
await graph.getState(config);
StateSnapshot {
  values: {
    messages: [
      HumanMessage { content: "hi! I'm bob" },
      AIMessage { content: "Hi Bob! How are you doing today?" },
      HumanMessage { content: "what's my name?" },
      AIMessage { content: "Your name is Bob." }
    ]
  },
  next: [],
  config: { configurable: { thread_id: '1', checkpoint_ns: '', checkpoint_id: '1f029ca3-1f5b-6704-8004-820c16b69a5a' } },
  metadata: {
    source: 'loop',
    writes: { call_model: { messages: AIMessage { content: "Your name is Bob." } } },
    step: 4,
    parents: {},
    thread_id: '1'
  },
  createdAt: '2025-05-05T16:01:24.680462+00:00',
  parentConfig: { configurable: { thread_id: '1', checkpoint_ns: '', checkpoint_id: '1f029ca3-1790-6b0a-8003-baf965b6a38f' } },
  tasks: [],
  interrupts: []
}

查看线程历史(检查点)

config = {
    "configurable": {
        # highlight-next-line
        "thread_id": "1"
    }
}
# highlight-next-line
list(graph.get_state_history(config))
[
    StateSnapshot(
        values={'messages': [HumanMessage(content="hi! I'm bob"), AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?'), HumanMessage(content="what's my name?"), AIMessage(content='Your name is Bob.')]},
        next=(),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1f5b-6704-8004-820c16b69a5a'}},
        metadata={'source': 'loop', 'writes': {'call_model': {'messages': AIMessage(content='Your name is Bob.')}}, 'step': 4, 'parents': {}, 'thread_id': '1'},
        created_at='2025-05-05T16:01:24.680462+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1790-6b0a-8003-baf965b6a38f'}},
        tasks=(),
        interrupts=()
    ),
    StateSnapshot(
        values={'messages': [HumanMessage(content="hi! I'm bob"), AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?'), HumanMessage(content="what's my name?")]},
        next=('call_model',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1790-6b0a-8003-baf965b6a38f'}},
        metadata={'source': 'loop', 'writes': None, 'step': 3, 'parents': {}, 'thread_id': '1'},
        created_at='2025-05-05T16:01:23.863421+00:00',
        parent_config={...},
        tasks=(PregelTask(id='8ab4155e-6b15-b885-9ce5-bed69a2c305c', name='call_model', path=('__pregel_pull', 'call_model'), error=None, interrupts=(), state=None, result={'messages': AIMessage(content='Your name is Bob.')}),),
        interrupts=()
    ),
    StateSnapshot(
        values={'messages': [HumanMessage(content="hi! I'm bob"), AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?')]},
        next=('__start__',),
        config={...},
        metadata={'source': 'input', 'writes': {'__start__': {'messages': [{'role': 'user', 'content': "what's my name?"}]}}, 'step': 2, 'parents': {}, 'thread_id': '1'},
        created_at='2025-05-05T16:01:23.863173+00:00',
        parent_config={...},
        tasks=(PregelTask(id='24ba39d6-6db1-4c9b-f4c5-682aeaf38dcd', name='__start__', path=('__pregel_pull', '__start__'), error=None, interrupts=(), state=None, result={'messages': [{'role': 'user', 'content': "what's my name?"}]}),),
        interrupts=()
    ),
    StateSnapshot(
        values={'messages': [HumanMessage(content="hi! I'm bob"), AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?')]},
        next=(),
        config={...},
        metadata={'source': 'loop', 'writes': {'call_model': {'messages': AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?')}}, 'step': 1, 'parents': {}, 'thread_id': '1'},
        created_at='2025-05-05T16:01:23.862295+00:00',
        parent_config={...},
        tasks=(),
        interrupts=()
    ),
    StateSnapshot(
        values={'messages': [HumanMessage(content="hi! I'm bob")]},
        next=('call_model',),
        config={...},
        metadata={'source': 'loop', 'writes': None, 'step': 0, 'parents': {}, 'thread_id': '1'},
        created_at='2025-05-05T16:01:22.278960+00:00',
        parent_config={...},
        tasks=(PregelTask(id='8cbd75e0-3720-b056-04f7-71ac805140a0', name='call_model', path=('__pregel_pull', 'call_model'), error=None, interrupts=(), state=None, result={'messages': AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?')}),),
        interrupts=()
    ),
    StateSnapshot(
        values={'messages': []},
        next=('__start__',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-0870-6ce2-bfff-1f3f14c3e565'}},
        metadata={'source': 'input', 'writes': {'__start__': {'messages': [{'role': 'user', 'content': "hi! I'm bob"}]}}, 'step': -1, 'parents': {}, 'thread_id': '1'},
        created_at='2025-05-05T16:01:22.277497+00:00',
        parent_config=None,
        tasks=(PregelTask(id='d458367b-8265-812c-18e2-33001d199ce6', name='__start__', path=('__pregel_pull', '__start__'), error=None, interrupts=(), state=None, result={'messages': [{'role': 'user', 'content': "hi! I'm bob"}]}),),
        interrupts=()
    )
]
const config = {
  configurable: {
    // highlight-next-line
    thread_id: "1",
  },
};
// highlight-next-line
const history = [];
for await (const state of graph.getStateHistory(config)) {
  history.push(state);
}
[
  StateSnapshot {
    values: {
      messages: [
        HumanMessage { content: "hi! I'm bob" },
        AIMessage { content: "Hi Bob! How are you doing today? Is there anything I can help you with?" },
        HumanMessage { content: "what's my name?" },
        AIMessage { content: "Your name is Bob." }
      ]
    },
    next: [],
    config: { configurable: { thread_id: '1', checkpoint_ns: '', checkpoint_id: '1f029ca3-1f5b-6704-8004-820c16b69a5a' } },
    metadata: { source: 'loop', writes: { call_model: { messages: AIMessage { content: "Your name is Bob." } } }, step: 4, parents: {}, thread_id: '1' },
    createdAt: '2025-05-05T16:01:24.680462+00:00',
    parentConfig: { configurable: { thread_id: '1', checkpoint_ns: '', checkpoint_id: '1f029ca3-1790-6b0a-8003-baf965b6a38f' } },
    tasks: [],
    interrupts: []
  },
  // ... 更多状态快照
]

解耦返回值与保存值

使用 entrypoint.final 可以将返回给调用者的内容与持久化到检查点的内容解耦。这在以下情况下很有用:

  • 你想返回一个计算结果(例如摘要或状态),但保存不同的内部值供下次调用使用。
  • 你需要控制下次运行时传递给 previous 参数的内容。
from typing import Optional
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def accumulate(n: int, *, previous: Optional[int]) -> entrypoint.final[int, int]:
    previous = previous or 0
    total = previous + n
    # 将 *前一个* 值返回给调用者,但将 *新的* 总计保存到检查点。
    return entrypoint.final(value=previous, save=total)

config = {"configurable": {"thread_id": "my-thread"}}

print(accumulate.invoke(1, config=config))  # 0
print(accumulate.invoke(2, config=config))  # 1
print(accumulate.invoke(3, config=config))  # 3
import { entrypoint, MemorySaver } from "@langchain/langgraph";

const checkpointer = new MemorySaver();

const accumulate = entrypoint(
  { checkpointer, name: "accumulate" },
  async (n: number, previous?: number) => {
    const prev = previous || 0;
    const total = prev + n;
    // 将 *前一个* 值返回给调用者,但将 *新的* 总计保存到检查点。
    return entrypoint.final({ value: prev, save: total });
  }
);

const config = { configurable: { thread_id: "my-thread" } };

console.log(await accumulate.invoke(1, config)); // 0
console.log(await accumulate.invoke(2, config)); // 1
console.log(await accumulate.invoke(3, config)); // 3

聊天机器人示例

使用函数式 API 和 InMemorySaver 检查点器的简单聊天机器人示例。该机器人能够记住之前的对话并从中断处继续。

from langchain_core.messages import BaseMessage
from langgraph.graph import add_messages
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver
from langchain_anthropic import ChatAnthropic

model = ChatAnthropic(model="claude-3-5-sonnet-latest")

@task
def call_model(messages: list[BaseMessage]):
    response = model.invoke(messages)
    return response

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(inputs: list[BaseMessage], *, previous: list[BaseMessage]):
    if previous:
        inputs = add_messages(previous, inputs)

    response = call_model(inputs).result()
    return entrypoint.final(value=response, save=add_messages(inputs, response))

config = {"configurable": {"thread_id": "1"}}
input_message = {"role": "user", "content": "hi! I'm bob"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
    chunk.pretty_print()

input_message = {"role": "user", "content": "what's my name?"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
    chunk.pretty_print()
import { BaseMessage } from "@langchain/core/messages";
import {
  addMessages,
  entrypoint,
  task,
  MemorySaver,
} from "@langchain/langgraph";
import { ChatAnthropic } from "@langchain/anthropic";

const model = new ChatAnthropic({ model: "claude-3-5-sonnet-latest" });

const callModel = task(
  "callModel",
  async (messages: BaseMessage[]): Promise<BaseMessage> => {
    const response = await model.invoke(messages);
    return response;
  }
);

const checkpointer = new MemorySaver();

const workflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (
    inputs: BaseMessage[],
    previous?: BaseMessage[]
  ): Promise<BaseMessage> => {
    let messages = inputs;
    if (previous) {
      messages = addMessages(previous, inputs);
    }

    const response = await callModel(messages);
    return entrypoint.final({
      value: response,
      save: addMessages(messages, response),
    });
  }
);

const config = { configurable: { thread_id: "1" } };
const inputMessage = { role: "user", content: "hi! I'm bob" };

for await (const chunk of await workflow.stream([inputMessage], {
  ...config,
  streamMode: "values",
})) {
  console.log(chunk.content);
}

const inputMessage2 = { role: "user", content: "what's my name?" };
for await (const chunk of await workflow.stream([inputMessage2], {
  ...config,
  streamMode: "values",
})) {
  console.log(chunk.content);
}
扩展示例:构建简单聊天机器人

如何添加线程级持久化(函数式 API):展示如何向函数式 API 工作流添加线程级持久化并实现简单的聊天机器人。

长期记忆

长期记忆 允许在不同的 线程 ID 之间存储信息。这对于在一次对话中了解给定用户的信息并在另一次对话中使用它非常有用。

扩展示例:添加长期记忆

如何添加跨线程持久化(函数式 API):展示如何向函数式 API 工作流添加跨线程持久化并实现简单的聊天机器人。
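
作为示意(其中 user_id、命名空间和返回文案均为示例假设),下面展示如何在函数式 API 中通过 store 实现跨线程的长期记忆:

import uuid
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.store.base import BaseStore
from langgraph.store.memory import InMemoryStore

checkpointer = InMemorySaver()
store = InMemoryStore()

@entrypoint(checkpointer=checkpointer, store=store)
def workflow(inputs: dict, *, store: BaseStore) -> str:
    # 以用户 ID 作为命名空间的一部分,使数据可以在不同线程之间共享
    namespace = ("memories", inputs["user_id"])
    if "name" in inputs:
        store.put(namespace, "name", {"name": inputs["name"]})
        return f"好的,我记住了你叫 {inputs['name']}。"
    item = store.get(namespace, "name")
    if item is not None:
        return f"你叫 {item.value['name']}。"
    return "我还不知道你的名字。"

# 第一次调用:在某个线程中写入长期记忆
config_1 = {"configurable": {"thread_id": str(uuid.uuid4())}}
workflow.invoke({"user_id": "user-123", "name": "Bob"}, config=config_1)

# 第二次调用:在全新的线程中仍可读取到该信息
config_2 = {"configurable": {"thread_id": str(uuid.uuid4())}}
print(workflow.invoke({"user_id": "user-123"}, config=config_2))  # 你叫 Bob。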
