跳转至

使用 Server API 实现人机协同

要审查、编辑和批准智能代理或工作流中的工具调用,请使用 LangGraph 的人机协同功能。

动态中断

from langgraph_sdk import get_client
# highlight-next-line
from langgraph_sdk.schema import Command
client = get_client(url=<DEPLOYMENT_URL>)

# Using the graph deployed with the name "agent"
assistant_id = "agent"

# create a thread
thread = await client.threads.create()
thread_id = thread["thread_id"]

# Run the graph until the interrupt is hit.
result = await client.runs.wait(
    thread_id,
    assistant_id,
    input={"some_text": "original text"}   # (1)!
)

print(result['__interrupt__']) # (2)!
# > [
# >     {
# >         'value': {'text_to_revise': 'original text'},
# >         'id': '...',
# >     }
# > ]


# Resume the graph
print(await client.runs.wait(
    thread_id,
    assistant_id,
    # highlight-next-line
    command=Command(resume="Edited text")   # (3)!
))
# > {'some_text': 'Edited text'}
  1. 使用初始状态调用图。
  2. 当图遇到中断时,它返回一个包含负载和元数据的中断对象。
  3. 使用 Command(resume=...) 恢复图,注入人类输入并继续执行。
import { Client } from "@langchain/langgraph-sdk";
const client = new Client({ apiUrl: <DEPLOYMENT_URL> });

// Using the graph deployed with the name "agent"
const assistantID = "agent";

// create a thread
const thread = await client.threads.create();
const threadID = thread["thread_id"];

// Run the graph until the interrupt is hit.
const result = await client.runs.wait(
  threadID,
  assistantID,
  { input: { "some_text": "original text" } }   // (1)!
);

console.log(result['__interrupt__']); // (2)!
// > [
// >     {
// >         'value': {'text_to_revise': 'original text'},
// >         'resumable': True,
// >         'ns': ['human_node:fc722478-2f21-0578-c572-d9fc4dd07c3b'],
// >         'when': 'during'
// >     }
// > ]

// Resume the graph
console.log(await client.runs.wait(
    threadID,
    assistantID,
    // highlight-next-line
    { command: { resume: "Edited text" }}   // (3)!
));
// > {'some_text': 'Edited text'}
  1. 使用初始状态调用图。
  2. 当图遇到中断时,它返回一个包含负载和元数据的中断对象。
  3. 使用 { resume: ... } 命令对象恢复图,注入人类输入并继续执行。

创建线程:

curl --request POST \
--url <DEPLOYMENT_URL>/threads \
--header 'Content-Type: application/json' \
--data '{}'

运行图直到遇到中断:

curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/wait \
--header 'Content-Type: application/json' \
--data "{
  \"assistant_id\": \"agent\",
  \"input\": {\"some_text\": \"original text\"}
}"

恢复图:

curl --request POST \
 --url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/wait \
 --header 'Content-Type: application/json' \
 --data "{
   \"assistant_id\": \"agent\",
   \"command\": {
     \"resume\": \"Edited text\"
   }
 }"
扩展示例:使用 interrupt

这是可以在 LangGraph API 服务器中运行的示例图。 有关更多详细信息,请参阅 LangGraph Platform 快速入门

from typing import TypedDict
import uuid

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.constants import START
from langgraph.graph import StateGraph
# highlight-next-line
from langgraph.types import interrupt, Command

class State(TypedDict):
    some_text: str

def human_node(state: State):
    # highlight-next-line
    value = interrupt( # (1)!
        {
            "text_to_revise": state["some_text"] # (2)!
        }
    )
    return {
        "some_text": value # (3)!
    }


# Build the graph
graph_builder = StateGraph(State)
graph_builder.add_node("human_node", human_node)
graph_builder.add_edge(START, "human_node")

graph = graph_builder.compile()
  1. interrupt(...)human_node 暂停执行,将给定的负载展现给人类。
  2. 任何 JSON 可序列化的值都可以传递给 interrupt 函数。这里是一个包含要修改的文本的字典。
  3. 一旦恢复,interrupt(...) 的返回值就是人类提供的输入,用于更新状态。

一旦你有了正在运行的 LangGraph API 服务器,你可以使用 LangGraph SDK 与其交互

from langgraph_sdk import get_client
# highlight-next-line
from langgraph_sdk.schema import Command
client = get_client(url=<DEPLOYMENT_URL>)

# Using the graph deployed with the name "agent"
assistant_id = "agent"

# create a thread
thread = await client.threads.create()
thread_id = thread["thread_id"]

# Run the graph until the interrupt is hit.
result = await client.runs.wait(
    thread_id,
    assistant_id,
    input={"some_text": "original text"}   # (1)!
)

print(result['__interrupt__']) # (2)!
# > [
# >     {
# >         'value': {'text_to_revise': 'original text'},
# >         'id': '...',
# >     }
# > ]


# Resume the graph
print(await client.runs.wait(
    thread_id,
    assistant_id,
    # highlight-next-line
    command=Command(resume="Edited text")   # (3)!
))
# > {'some_text': 'Edited text'}
  1. 使用初始状态调用图。
  2. 当图遇到中断时,它返回一个包含负载和元数据的中断对象。
  3. 使用 Command(resume=...) 恢复图,注入人类输入并继续执行。
import { Client } from "@langchain/langgraph-sdk";
const client = new Client({ apiUrl: <DEPLOYMENT_URL> });

// Using the graph deployed with the name "agent"
const assistantID = "agent";

// create a thread
const thread = await client.threads.create();
const threadID = thread["thread_id"];

// Run the graph until the interrupt is hit.
const result = await client.runs.wait(
  threadID,
  assistantID,
  { input: { "some_text": "original text" } }   // (1)!
);

console.log(result['__interrupt__']); // (2)!
// > [
// >     {
// >         'value': {'text_to_revise': 'original text'},
// >         'resumable': True,
// >         'ns': ['human_node:fc722478-2f21-0578-c572-d9fc4dd07c3b'],
// >         'when': 'during'
// >     }
// > ]

// Resume the graph
console.log(await client.runs.wait(
    threadID,
    assistantID,
    // highlight-next-line
    { command: { resume: "Edited text" }}   // (3)!
));
// > {'some_text': 'Edited text'}
  1. 使用初始状态调用图。
  2. 当图遇到中断时,它返回一个包含负载和元数据的中断对象。
  3. 使用 { resume: ... } 命令对象恢复图,注入人类输入并继续执行。

创建线程:

curl --request POST \
--url <DEPLOYMENT_URL>/threads \
--header 'Content-Type: application/json' \
--data '{}'

运行图直到遇到中断:

curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/wait \
--header 'Content-Type: application/json' \
--data "{
  \"assistant_id\": \"agent\",
  \"input\": {\"some_text\": \"original text\"}
}"

恢复图:

curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/wait \
--header 'Content-Type: application/json' \
--data "{
  \"assistant_id\": \"agent\",
  \"command\": {
    \"resume\": \"Edited text\"
  }
}"

静态中断

静态中断(也称为静态断点)在节点执行之前或之后触发。

Warning

**不**推荐将静态中断用于人机协同工作流。它们最适合用于调试和测试。

你可以在编译时通过指定 interrupt_beforeinterrupt_after 来设置静态中断:

# highlight-next-line
graph = graph_builder.compile( # (1)!
    # highlight-next-line
    interrupt_before=["node_a"], # (2)!
    # highlight-next-line
    interrupt_after=["node_b", "node_c"], # (3)!
)
  1. 断点在 compile 时设置。
  2. interrupt_before 指定在节点执行之前应暂停执行的节点。
  3. interrupt_after 指定在节点执行之后应暂停执行的节点。

或者,你可以在运行时设置静态中断:

# highlight-next-line
await client.runs.wait( # (1)!
    thread_id,
    assistant_id,
    inputs=inputs,
    # highlight-next-line
    interrupt_before=["node_a"], # (2)!
    # highlight-next-line
    interrupt_after=["node_b", "node_c"] # (3)!
)
  1. 使用 interrupt_beforeinterrupt_after 参数调用 client.runs.wait。这是运行时配置,可以在每次调用时更改。
  2. interrupt_before 指定在节点执行之前应暂停执行的节点。
  3. interrupt_after 指定在节点执行之后应暂停执行的节点。
// highlight-next-line
await client.runs.wait( // (1)!
    threadID,
    assistantID,
    {
    input: input,
    // highlight-next-line
    interruptBefore: ["node_a"], // (2)!
    // highlight-next-line
    interruptAfter: ["node_b", "node_c"] // (3)!
    }
)
  1. 使用 interruptBeforeinterruptAfter 参数调用 client.runs.wait。这是运行时配置,可以在每次调用时更改。
  2. interruptBefore 指定在节点执行之前应暂停执行的节点。
  3. interruptAfter 指定在节点执行之后应暂停执行的节点。
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/wait \
--header 'Content-Type: application/json' \
--data "{
    \"assistant_id\": \"agent\",
    \"interrupt_before\": [\"node_a\"],
    \"interrupt_after\": [\"node_b\", \"node_c\"],
    \"input\": <INPUT>
}"

以下示例展示了如何添加静态中断:

from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)

# Using the graph deployed with the name "agent"
assistant_id = "agent"

# create a thread
thread = await client.threads.create()
thread_id = thread["thread_id"]

# Run the graph until the breakpoint
result = await client.runs.wait(
    thread_id,
    assistant_id,
    input=inputs   # (1)!
)

# Resume the graph
await client.runs.wait(
    thread_id,
    assistant_id,
    input=None   # (2)!
)
  1. 运行图直到遇到第一个断点。
  2. 通过为输入传递 None 来恢复图。这将运行图直到遇到下一个断点。
import { Client } from "@langchain/langgraph-sdk";
const client = new Client({ apiUrl: <DEPLOYMENT_URL> });

// Using the graph deployed with the name "agent"
const assistantID = "agent";

// create a thread
const thread = await client.threads.create();
const threadID = thread["thread_id"];

// Run the graph until the breakpoint
const result = await client.runs.wait(
  threadID,
  assistantID,
  { input: input }   // (1)!
);

// Resume the graph
await client.runs.wait(
  threadID,
  assistantID,
  { input: null }   // (2)!
);
  1. 运行图直到遇到第一个断点。
  2. 通过为输入传递 null 来恢复图。这将运行图直到遇到下一个断点。

创建线程:

curl --request POST \
--url <DEPLOYMENT_URL>/threads \
--header 'Content-Type: application/json' \
--data '{}'

运行图直到遇到断点:

curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/wait \
--header 'Content-Type: application/json' \
--data "{
  \"assistant_id\": \"agent\",
  \"input\": <INPUT>
}"

恢复图:

curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/wait \
--header 'Content-Type: application/json' \
--data "{
  \"assistant_id\": \"agent\"
}"

了解更多

  • 人机协同概念指南:了解有关 LangGraph 人机协同功能的更多信息。
  • 常见模式:了解如何实现诸如批准/拒绝操作、请求用户输入、工具调用审查和验证人类输入等模式。