LangGraph 运行时¶
@[Pregel] 实现了 LangGraph 的运行时,管理 LangGraph 应用程序的执行。
编译 @[StateGraph][] 或创建 @[entrypoint][] 会产生一个可以用输入调用的 @[Pregel] 实例。
@[Pregel] 实现了 LangGraph 的运行时,管理 LangGraph 应用程序的执行。
编译 @[StateGraph][] 或创建 @[entrypoint][] 会产生一个可以用输入调用的 @[Pregel] 实例。
本指南从高层次解释运行时,并提供直接使用 Pregel 实现应用程序的说明。
注意: @[Pregel] 运行时以 Google 的 Pregel 算法命名,该算法描述了一种使用图进行大规模并行计算的高效方法。
注意: @[Pregel] 运行时以 Google 的 Pregel 算法命名,该算法描述了一种使用图进行大规模并行计算的高效方法。
概述¶
在 LangGraph 中,Pregel 将参与者和**通道**组合成单个应用程序。参与者**从通道读取数据并向通道写入数据。Pregel 按照 **Pregel 算法/**批量同步并行**模型将应用程序的执行组织成多个步骤。
每个步骤包含三个阶段:
- 计划:确定此步骤要执行哪些**参与者**。例如,在第一步中,选择订阅特殊**输入**通道的**参与者**;在后续步骤中,选择订阅上一步更新的通道的**参与者**。
- 执行:并行执行所有选定的**参与者**,直到全部完成、一个失败或达到超时。在此阶段,通道更新对参与者不可见,直到下一步。
- 更新:用此步骤中**参与者**写入的值更新通道。
重复直到没有**参与者**被选中执行,或达到最大步骤数。
参与者¶
参与者**是一个 PregelNode。它订阅通道,从中读取数据,并向其写入数据。它可以被视为 Pregel 算法中的**参与者。PregelNodes 实现 LangChain 的 Runnable 接口。
通道¶
通道用于参与者(PregelNodes)之间的通信。每个通道都有一个值类型、一个更新类型和一个更新函数——它接受一系列更新并修改存储的值。通道可用于将数据从一个链发送到另一个链,或在未来步骤中将数据从链发送给自身。LangGraph 提供了多个内置通道:
- @[LastValue][]:默认通道,存储发送到通道的最后一个值,对输入和输出值或从一步向下一步发送数据很有用。
- @[Topic][]:可配置的 PubSub 主题,对在**参与者**之间发送多个值或累积输出很有用。可配置为去重值或在多个步骤过程中累积值。
- @[BinaryOperatorAggregate][]:存储一个持久值,通过对当前值和发送到通道的每个更新应用二元运算符来更新,对在多个步骤中计算聚合很有用;例如,
total = BinaryOperatorAggregate(int, operator.add)
- @[LastValue]:默认通道,存储发送到通道的最后一个值,对输入和输出值或从一步向下一步发送数据很有用。
- @[Topic]:可配置的 PubSub 主题,对在**参与者**之间发送多个值或累积输出很有用。可配置为去重值或在多个步骤过程中累积值。
- @[BinaryOperatorAggregate]:存储一个持久值,通过对当前值和发送到通道的每个更新应用二元运算符来更新,对在多个步骤中计算聚合很有用;例如,
total = BinaryOperatorAggregate(int, operator.add)
示例¶
虽然大多数用户将通过 @[StateGraph][] API 或 @[entrypoint][] 装饰器与 Pregel 交互,但也可以直接与 Pregel 交互。
虽然大多数用户将通过 @[StateGraph] API 或 @[entrypoint] 装饰器与 Pregel 交互,但也可以直接与 Pregel 交互。
以下是几个不同的示例,让你了解 Pregel API。
from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder
node1 = (
NodeBuilder().subscribe_only("a")
.do(lambda x: x + x)
.write_to("b")
)
app = Pregel(
nodes={"node1": node1},
channels={
"a": EphemeralValue(str),
"b": EphemeralValue(str),
},
input_channels=["a"],
output_channels=["b"],
)
app.invoke({"a": "foo"})
import { EphemeralValue } from "@langchain/langgraph/channels";
import { Pregel, NodeBuilder } from "@langchain/langgraph/pregel";
const node1 = new NodeBuilder()
.subscribeOnly("a")
.do((x: string) => x + x)
.writeTo("b");
const app = new Pregel({
nodes: { node1 },
channels: {
a: new EphemeralValue<string>(),
b: new EphemeralValue<string>(),
},
inputChannels: ["a"],
outputChannels: ["b"],
});
await app.invoke({ a: "foo" });
from langgraph.channels import LastValue, EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder
node1 = (
NodeBuilder().subscribe_only("a")
.do(lambda x: x + x)
.write_to("b")
)
node2 = (
NodeBuilder().subscribe_only("b")
.do(lambda x: x + x)
.write_to("c")
)
app = Pregel(
nodes={"node1": node1, "node2": node2},
channels={
"a": EphemeralValue(str),
"b": LastValue(str),
"c": EphemeralValue(str),
},
input_channels=["a"],
output_channels=["b", "c"],
)
app.invoke({"a": "foo"})
import { LastValue, EphemeralValue } from "@langchain/langgraph/channels";
import { Pregel, NodeBuilder } from "@langchain/langgraph/pregel";
const node1 = new NodeBuilder()
.subscribeOnly("a")
.do((x: string) => x + x)
.writeTo("b");
const node2 = new NodeBuilder()
.subscribeOnly("b")
.do((x: string) => x + x)
.writeTo("c");
const app = new Pregel({
nodes: { node1, node2 },
channels: {
a: new EphemeralValue<string>(),
b: new LastValue<string>(),
c: new EphemeralValue<string>(),
},
inputChannels: ["a"],
outputChannels: ["b", "c"],
});
await app.invoke({ a: "foo" });
from langgraph.channels import EphemeralValue, Topic
from langgraph.pregel import Pregel, NodeBuilder
node1 = (
NodeBuilder().subscribe_only("a")
.do(lambda x: x + x)
.write_to("b", "c")
)
node2 = (
NodeBuilder().subscribe_to("b")
.do(lambda x: x["b"] + x["b"])
.write_to("c")
)
app = Pregel(
nodes={"node1": node1, "node2": node2},
channels={
"a": EphemeralValue(str),
"b": EphemeralValue(str),
"c": Topic(str, accumulate=True),
},
input_channels=["a"],
output_channels=["c"],
)
app.invoke({"a": "foo"})
import { EphemeralValue, Topic } from "@langchain/langgraph/channels";
import { Pregel, NodeBuilder } from "@langchain/langgraph/pregel";
const node1 = new NodeBuilder()
.subscribeOnly("a")
.do((x: string) => x + x)
.writeTo("b", "c");
const node2 = new NodeBuilder()
.subscribeTo("b")
.do((x: { b: string }) => x.b + x.b)
.writeTo("c");
const app = new Pregel({
nodes: { node1, node2 },
channels: {
a: new EphemeralValue<string>(),
b: new EphemeralValue<string>(),
c: new Topic<string>({ accumulate: true }),
},
inputChannels: ["a"],
outputChannels: ["c"],
});
await app.invoke({ a: "foo" });
此示例演示如何使用 BinaryOperatorAggregate 通道实现归约器。
from langgraph.channels import EphemeralValue, BinaryOperatorAggregate
from langgraph.pregel import Pregel, NodeBuilder
node1 = (
NodeBuilder().subscribe_only("a")
.do(lambda x: x + x)
.write_to("b", "c")
)
node2 = (
NodeBuilder().subscribe_only("b")
.do(lambda x: x + x)
.write_to("c")
)
def reducer(current, update):
if current:
return current + " | " + update
else:
return update
app = Pregel(
nodes={"node1": node1, "node2": node2},
channels={
"a": EphemeralValue(str),
"b": EphemeralValue(str),
"c": BinaryOperatorAggregate(str, operator=reducer),
},
input_channels=["a"],
output_channels=["c"],
)
app.invoke({"a": "foo"})
import { EphemeralValue, BinaryOperatorAggregate } from "@langchain/langgraph/channels";
import { Pregel, NodeBuilder } from "@langchain/langgraph/pregel";
const node1 = new NodeBuilder()
.subscribeOnly("a")
.do((x: string) => x + x)
.writeTo("b", "c");
const node2 = new NodeBuilder()
.subscribeOnly("b")
.do((x: string) => x + x)
.writeTo("c");
const reducer = (current: string, update: string) => {
if (current) {
return current + " | " + update;
} else {
return update;
}
};
const app = new Pregel({
nodes: { node1, node2 },
channels: {
a: new EphemeralValue<string>(),
b: new EphemeralValue<string>(),
c: new BinaryOperatorAggregate<string>({ operator: reducer }),
},
inputChannels: ["a"],
outputChannels: ["c"],
});
await app.invoke({ a: "foo" });
此示例演示如何在图中引入循环,通过让链写入它订阅的通道。执行将继续,直到向通道写入 None 值。
from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder, ChannelWriteEntry
example_node = (
NodeBuilder().subscribe_only("value")
.do(lambda x: x + x if len(x) < 10 else None)
.write_to(ChannelWriteEntry("value", skip_none=True))
)
app = Pregel(
nodes={"example_node": example_node},
channels={
"value": EphemeralValue(str),
},
input_channels=["value"],
output_channels=["value"],
)
app.invoke({"value": "a"})
此示例演示如何在图中引入循环,通过让链写入它订阅的通道。执行将继续,直到向通道写入 null 值。
import { EphemeralValue } from "@langchain/langgraph/channels";
import { Pregel, NodeBuilder, ChannelWriteEntry } from "@langchain/langgraph/pregel";
const exampleNode = new NodeBuilder()
.subscribeOnly("value")
.do((x: string) => x.length < 10 ? x + x : null)
.writeTo(new ChannelWriteEntry("value", { skipNone: true }));
const app = new Pregel({
nodes: { exampleNode },
channels: {
value: new EphemeralValue<string>(),
},
inputChannels: ["value"],
outputChannels: ["value"],
});
await app.invoke({ value: "a" });
高级 API¶
LangGraph 提供两个高级 API 来创建 Pregel 应用程序:StateGraph(图 API)和函数式 API。
@[StateGraph(图 API)][StateGraph] 是一个更高级的抽象,简化了 Pregel 应用程序的创建。它允许你定义节点和边的图。当你编译图时,StateGraph API 会自动为你创建 Pregel 应用程序。
from typing import TypedDict, Optional
from langgraph.constants import START
from langgraph.graph import StateGraph
class Essay(TypedDict):
topic: str
content: Optional[str]
score: Optional[float]
def write_essay(essay: Essay):
return {
"content": f"Essay about {essay['topic']}",
}
def score_essay(essay: Essay):
return {
"score": 10
}
builder = StateGraph(Essay)
builder.add_node(write_essay)
builder.add_node(score_essay)
builder.add_edge(START, "write_essay")
# Compile the graph.
# This will return a Pregel instance.
graph = builder.compile()
@[StateGraph(图 API)][StateGraph] 是一个更高级的抽象,简化了 Pregel 应用程序的创建。它允许你定义节点和边的图。当你编译图时,StateGraph API 会自动为你创建 Pregel 应用程序。
import { START, StateGraph } from "@langchain/langgraph";
interface Essay {
topic: string;
content?: string;
score?: number;
}
const writeEssay = (essay: Essay) => {
return {
content: `Essay about ${essay.topic}`,
};
};
const scoreEssay = (essay: Essay) => {
return {
score: 10
};
};
const builder = new StateGraph<Essay>({
channels: {
topic: null,
content: null,
score: null,
}
})
.addNode("writeEssay", writeEssay)
.addNode("scoreEssay", scoreEssay)
.addEdge(START, "writeEssay");
// Compile the graph.
// This will return a Pregel instance.
const graph = builder.compile();
编译后的 Pregel 实例将与节点和通道列表关联。你可以通过打印来检查节点和通道。
你将看到类似这样的内容:
{'__start__': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1810>,
'write_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba14d0>,
'score_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1710>}
你应该看到类似这样的内容
{'topic': <langgraph.channels.last_value.LastValue at 0x7d05e3294d80>,
'content': <langgraph.channels.last_value.LastValue at 0x7d05e3295040>,
'score': <langgraph.channels.last_value.LastValue at 0x7d05e3295980>,
'__start__': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3297e00>,
'write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32960c0>,
'score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ab80>,
'branch:__start__:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32941c0>,
'branch:__start__:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d88800>,
'branch:write_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3295ec0>,
'branch:write_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ac00>,
'branch:score_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d89700>,
'branch:score_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b400>,
'start:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b280>}
你将看到类似这样的内容:
你应该看到类似这样的内容
{
topic: LastValue { ... },
content: LastValue { ... },
score: LastValue { ... },
__start__: EphemeralValue { ... },
writeEssay: EphemeralValue { ... },
scoreEssay: EphemeralValue { ... },
'branch:__start__:__self__:writeEssay': EphemeralValue { ... },
'branch:__start__:__self__:scoreEssay': EphemeralValue { ... },
'branch:writeEssay:__self__:writeEssay': EphemeralValue { ... },
'branch:writeEssay:__self__:scoreEssay': EphemeralValue { ... },
'branch:scoreEssay:__self__:writeEssay': EphemeralValue { ... },
'branch:scoreEssay:__self__:scoreEssay': EphemeralValue { ... },
'start:writeEssay': EphemeralValue { ... }
}
在函数式 API 中,你可以使用 @[entrypoint][] 来创建 Pregel 应用程序。entrypoint 装饰器允许你定义一个接受输入并返回输出的函数。
from typing import TypedDict, Optional
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint
class Essay(TypedDict):
topic: str
content: Optional[str]
score: Optional[float]
checkpointer = InMemorySaver()
@entrypoint(checkpointer=checkpointer)
def write_essay(essay: Essay):
return {
"content": f"Essay about {essay['topic']}",
}
print("Nodes: ")
print(write_essay.nodes)
print("Channels: ")
print(write_essay.channels)
Nodes:
{'write_essay': <langgraph.pregel.read.PregelNode object at 0x7d05e2f9aad0>}
Channels:
{'__start__': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x7d05e2c906c0>, '__end__': <langgraph.channels.last_value.LastValue object at 0x7d05e2c90c40>, '__previous__': <langgraph.channels.last_value.LastValue object at 0x7d05e1007280>}
在函数式 API 中,你可以使用 @[entrypoint][] 来创建 Pregel 应用程序。entrypoint 装饰器允许你定义一个接受输入并返回输出的函数。
import { MemorySaver } from "@langchain/langgraph";
import { entrypoint } from "@langchain/langgraph/func";
interface Essay {
topic: string;
content?: string;
score?: number;
}
const checkpointer = new MemorySaver();
const writeEssay = entrypoint(
{ checkpointer, name: "writeEssay" },
async (essay: Essay) => {
return {
content: `Essay about ${essay.topic}`,
};
}
);
console.log("Nodes: ");
console.log(writeEssay.nodes);
console.log("Channels: ");
console.log(writeEssay.channels);