跳转至

LangGraph 运行时

@[Pregel] 实现了 LangGraph 的运行时,管理 LangGraph 应用程序的执行。

编译 @[StateGraph][] 或创建 @[entrypoint][] 会产生一个可以用输入调用的 @[Pregel] 实例。

@[Pregel] 实现了 LangGraph 的运行时,管理 LangGraph 应用程序的执行。

编译 @[StateGraph][] 或创建 @[entrypoint][] 会产生一个可以用输入调用的 @[Pregel] 实例。

本指南从高层次解释运行时,并提供直接使用 Pregel 实现应用程序的说明。

注意: @[Pregel] 运行时以 Google 的 Pregel 算法命名,该算法描述了一种使用图进行大规模并行计算的高效方法。

注意: @[Pregel] 运行时以 Google 的 Pregel 算法命名,该算法描述了一种使用图进行大规模并行计算的高效方法。

概述

在 LangGraph 中,Pregel 将参与者和**通道**组合成单个应用程序。参与者**从通道读取数据并向通道写入数据。Pregel 按照 **Pregel 算法/**批量同步并行**模型将应用程序的执行组织成多个步骤。

每个步骤包含三个阶段:

  • 计划:确定此步骤要执行哪些**参与者**。例如,在第一步中,选择订阅特殊**输入**通道的**参与者**;在后续步骤中,选择订阅上一步更新的通道的**参与者**。
  • 执行:并行执行所有选定的**参与者**,直到全部完成、一个失败或达到超时。在此阶段,通道更新对参与者不可见,直到下一步。
  • 更新:用此步骤中**参与者**写入的值更新通道。

重复直到没有**参与者**被选中执行,或达到最大步骤数。

参与者

参与者**是一个 PregelNode。它订阅通道,从中读取数据,并向其写入数据。它可以被视为 Pregel 算法中的**参与者PregelNodes 实现 LangChain 的 Runnable 接口。

通道

通道用于参与者(PregelNodes)之间的通信。每个通道都有一个值类型、一个更新类型和一个更新函数——它接受一系列更新并修改存储的值。通道可用于将数据从一个链发送到另一个链,或在未来步骤中将数据从链发送给自身。LangGraph 提供了多个内置通道:

  • @[LastValue][]:默认通道,存储发送到通道的最后一个值,对输入和输出值或从一步向下一步发送数据很有用。
  • @[Topic][]:可配置的 PubSub 主题,对在**参与者**之间发送多个值或累积输出很有用。可配置为去重值或在多个步骤过程中累积值。
  • @[BinaryOperatorAggregate][]:存储一个持久值,通过对当前值和发送到通道的每个更新应用二元运算符来更新,对在多个步骤中计算聚合很有用;例如,total = BinaryOperatorAggregate(int, operator.add)
  • @[LastValue]:默认通道,存储发送到通道的最后一个值,对输入和输出值或从一步向下一步发送数据很有用。
  • @[Topic]:可配置的 PubSub 主题,对在**参与者**之间发送多个值或累积输出很有用。可配置为去重值或在多个步骤过程中累积值。
  • @[BinaryOperatorAggregate]:存储一个持久值,通过对当前值和发送到通道的每个更新应用二元运算符来更新,对在多个步骤中计算聚合很有用;例如,total = BinaryOperatorAggregate(int, operator.add)

示例

虽然大多数用户将通过 @[StateGraph][] API 或 @[entrypoint][] 装饰器与 Pregel 交互,但也可以直接与 Pregel 交互。

虽然大多数用户将通过 @[StateGraph] API 或 @[entrypoint] 装饰器与 Pregel 交互,但也可以直接与 Pregel 交互。

以下是几个不同的示例,让你了解 Pregel API。

from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder

node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b")
)

app = Pregel(
    nodes={"node1": node1},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
    },
    input_channels=["a"],
    output_channels=["b"],
)

app.invoke({"a": "foo"})
{'b': 'foofoo'}
import { EphemeralValue } from "@langchain/langgraph/channels";
import { Pregel, NodeBuilder } from "@langchain/langgraph/pregel";

const node1 = new NodeBuilder()
  .subscribeOnly("a")
  .do((x: string) => x + x)
  .writeTo("b");

const app = new Pregel({
  nodes: { node1 },
  channels: {
    a: new EphemeralValue<string>(),
    b: new EphemeralValue<string>(),
  },
  inputChannels: ["a"],
  outputChannels: ["b"],
});

await app.invoke({ a: "foo" });
{ b: 'foofoo' }
from langgraph.channels import LastValue, EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder

node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b")
)

node2 = (
    NodeBuilder().subscribe_only("b")
    .do(lambda x: x + x)
    .write_to("c")
)


app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": LastValue(str),
        "c": EphemeralValue(str),
    },
    input_channels=["a"],
    output_channels=["b", "c"],
)

app.invoke({"a": "foo"})
{'b': 'foofoo', 'c': 'foofoofoofoo'}
import { LastValue, EphemeralValue } from "@langchain/langgraph/channels";
import { Pregel, NodeBuilder } from "@langchain/langgraph/pregel";

const node1 = new NodeBuilder()
  .subscribeOnly("a")
  .do((x: string) => x + x)
  .writeTo("b");

const node2 = new NodeBuilder()
  .subscribeOnly("b")
  .do((x: string) => x + x)
  .writeTo("c");

const app = new Pregel({
  nodes: { node1, node2 },
  channels: {
    a: new EphemeralValue<string>(),
    b: new LastValue<string>(),
    c: new EphemeralValue<string>(),
  },
  inputChannels: ["a"],
  outputChannels: ["b", "c"],
});

await app.invoke({ a: "foo" });
{ b: 'foofoo', c: 'foofoofoofoo' }
from langgraph.channels import EphemeralValue, Topic
from langgraph.pregel import Pregel, NodeBuilder

node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b", "c")
)

node2 = (
    NodeBuilder().subscribe_to("b")
    .do(lambda x: x["b"] + x["b"])
    .write_to("c")
)

app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
        "c": Topic(str, accumulate=True),
    },
    input_channels=["a"],
    output_channels=["c"],
)

app.invoke({"a": "foo"})
{'c': ['foofoo', 'foofoofoofoo']}
import { EphemeralValue, Topic } from "@langchain/langgraph/channels";
import { Pregel, NodeBuilder } from "@langchain/langgraph/pregel";

const node1 = new NodeBuilder()
  .subscribeOnly("a")
  .do((x: string) => x + x)
  .writeTo("b", "c");

const node2 = new NodeBuilder()
  .subscribeTo("b")
  .do((x: { b: string }) => x.b + x.b)
  .writeTo("c");

const app = new Pregel({
  nodes: { node1, node2 },
  channels: {
    a: new EphemeralValue<string>(),
    b: new EphemeralValue<string>(),
    c: new Topic<string>({ accumulate: true }),
  },
  inputChannels: ["a"],
  outputChannels: ["c"],
});

await app.invoke({ a: "foo" });
{ c: ['foofoo', 'foofoofoofoo'] }

此示例演示如何使用 BinaryOperatorAggregate 通道实现归约器。

from langgraph.channels import EphemeralValue, BinaryOperatorAggregate
from langgraph.pregel import Pregel, NodeBuilder


node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b", "c")
)

node2 = (
    NodeBuilder().subscribe_only("b")
    .do(lambda x: x + x)
    .write_to("c")
)

def reducer(current, update):
    if current:
        return current + " | " + update
    else:
        return update

app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
        "c": BinaryOperatorAggregate(str, operator=reducer),
    },
    input_channels=["a"],
    output_channels=["c"],
)

app.invoke({"a": "foo"})
import { EphemeralValue, BinaryOperatorAggregate } from "@langchain/langgraph/channels";
import { Pregel, NodeBuilder } from "@langchain/langgraph/pregel";

const node1 = new NodeBuilder()
  .subscribeOnly("a")
  .do((x: string) => x + x)
  .writeTo("b", "c");

const node2 = new NodeBuilder()
  .subscribeOnly("b")
  .do((x: string) => x + x)
  .writeTo("c");

const reducer = (current: string, update: string) => {
  if (current) {
    return current + " | " + update;
  } else {
    return update;
  }
};

const app = new Pregel({
  nodes: { node1, node2 },
  channels: {
    a: new EphemeralValue<string>(),
    b: new EphemeralValue<string>(),
    c: new BinaryOperatorAggregate<string>({ operator: reducer }),
  },
  inputChannels: ["a"],
  outputChannels: ["c"],
});

await app.invoke({ a: "foo" });

此示例演示如何在图中引入循环,通过让链写入它订阅的通道。执行将继续,直到向通道写入 None 值。

from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder, ChannelWriteEntry

example_node = (
    NodeBuilder().subscribe_only("value")
    .do(lambda x: x + x if len(x) < 10 else None)
    .write_to(ChannelWriteEntry("value", skip_none=True))
)

app = Pregel(
    nodes={"example_node": example_node},
    channels={
        "value": EphemeralValue(str),
    },
    input_channels=["value"],
    output_channels=["value"],
)

app.invoke({"value": "a"})
{'value': 'aaaaaaaaaaaaaaaa'}

此示例演示如何在图中引入循环,通过让链写入它订阅的通道。执行将继续,直到向通道写入 null 值。

import { EphemeralValue } from "@langchain/langgraph/channels";
import { Pregel, NodeBuilder, ChannelWriteEntry } from "@langchain/langgraph/pregel";

const exampleNode = new NodeBuilder()
  .subscribeOnly("value")
  .do((x: string) => x.length < 10 ? x + x : null)
  .writeTo(new ChannelWriteEntry("value", { skipNone: true }));

const app = new Pregel({
  nodes: { exampleNode },
  channels: {
    value: new EphemeralValue<string>(),
  },
  inputChannels: ["value"],
  outputChannels: ["value"],
});

await app.invoke({ value: "a" });
{ value: 'aaaaaaaaaaaaaaaa' }

高级 API

LangGraph 提供两个高级 API 来创建 Pregel 应用程序:StateGraph(图 API)函数式 API

@[StateGraph(图 API)][StateGraph] 是一个更高级的抽象,简化了 Pregel 应用程序的创建。它允许你定义节点和边的图。当你编译图时,StateGraph API 会自动为你创建 Pregel 应用程序。

from typing import TypedDict, Optional

from langgraph.constants import START
from langgraph.graph import StateGraph

class Essay(TypedDict):
    topic: str
    content: Optional[str]
    score: Optional[float]

def write_essay(essay: Essay):
    return {
        "content": f"Essay about {essay['topic']}",
    }

def score_essay(essay: Essay):
    return {
        "score": 10
    }

builder = StateGraph(Essay)
builder.add_node(write_essay)
builder.add_node(score_essay)
builder.add_edge(START, "write_essay")

# Compile the graph.
# This will return a Pregel instance.
graph = builder.compile()

@[StateGraph(图 API)][StateGraph] 是一个更高级的抽象,简化了 Pregel 应用程序的创建。它允许你定义节点和边的图。当你编译图时,StateGraph API 会自动为你创建 Pregel 应用程序。

import { START, StateGraph } from "@langchain/langgraph";

interface Essay {
  topic: string;
  content?: string;
  score?: number;
}

const writeEssay = (essay: Essay) => {
  return {
    content: `Essay about ${essay.topic}`,
  };
};

const scoreEssay = (essay: Essay) => {
  return {
    score: 10
  };
};

const builder = new StateGraph<Essay>({
  channels: {
    topic: null,
    content: null,
    score: null,
  }
})
  .addNode("writeEssay", writeEssay)
  .addNode("scoreEssay", scoreEssay)
  .addEdge(START, "writeEssay");

// Compile the graph.
// This will return a Pregel instance.
const graph = builder.compile();

编译后的 Pregel 实例将与节点和通道列表关联。你可以通过打印来检查节点和通道。

print(graph.nodes)

你将看到类似这样的内容:

{'__start__': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1810>,
 'write_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba14d0>,
 'score_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1710>}
print(graph.channels)

你应该看到类似这样的内容

{'topic': <langgraph.channels.last_value.LastValue at 0x7d05e3294d80>,
 'content': <langgraph.channels.last_value.LastValue at 0x7d05e3295040>,
 'score': <langgraph.channels.last_value.LastValue at 0x7d05e3295980>,
 '__start__': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3297e00>,
 'write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32960c0>,
 'score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ab80>,
 'branch:__start__:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32941c0>,
 'branch:__start__:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d88800>,
 'branch:write_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3295ec0>,
 'branch:write_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ac00>,
 'branch:score_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d89700>,
 'branch:score_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b400>,
 'start:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b280>}
console.log(graph.nodes);

你将看到类似这样的内容:

{
  __start__: PregelNode { ... },
  writeEssay: PregelNode { ... },
  scoreEssay: PregelNode { ... }
}
console.log(graph.channels);

你应该看到类似这样的内容

{
  topic: LastValue { ... },
  content: LastValue { ... },
  score: LastValue { ... },
  __start__: EphemeralValue { ... },
  writeEssay: EphemeralValue { ... },
  scoreEssay: EphemeralValue { ... },
  'branch:__start__:__self__:writeEssay': EphemeralValue { ... },
  'branch:__start__:__self__:scoreEssay': EphemeralValue { ... },
  'branch:writeEssay:__self__:writeEssay': EphemeralValue { ... },
  'branch:writeEssay:__self__:scoreEssay': EphemeralValue { ... },
  'branch:scoreEssay:__self__:writeEssay': EphemeralValue { ... },
  'branch:scoreEssay:__self__:scoreEssay': EphemeralValue { ... },
  'start:writeEssay': EphemeralValue { ... }
}

函数式 API 中,你可以使用 @[entrypoint][] 来创建 Pregel 应用程序。entrypoint 装饰器允许你定义一个接受输入并返回输出的函数。

from typing import TypedDict, Optional

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint

class Essay(TypedDict):
    topic: str
    content: Optional[str]
    score: Optional[float]


checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def write_essay(essay: Essay):
    return {
        "content": f"Essay about {essay['topic']}",
    }

print("Nodes: ")
print(write_essay.nodes)
print("Channels: ")
print(write_essay.channels)
Nodes:
{'write_essay': <langgraph.pregel.read.PregelNode object at 0x7d05e2f9aad0>}
Channels:
{'__start__': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x7d05e2c906c0>, '__end__': <langgraph.channels.last_value.LastValue object at 0x7d05e2c90c40>, '__previous__': <langgraph.channels.last_value.LastValue object at 0x7d05e1007280>}

函数式 API 中,你可以使用 @[entrypoint][] 来创建 Pregel 应用程序。entrypoint 装饰器允许你定义一个接受输入并返回输出的函数。

import { MemorySaver } from "@langchain/langgraph";
import { entrypoint } from "@langchain/langgraph/func";

interface Essay {
  topic: string;
  content?: string;
  score?: number;
}

const checkpointer = new MemorySaver();

const writeEssay = entrypoint(
  { checkpointer, name: "writeEssay" },
  async (essay: Essay) => {
    return {
      content: `Essay about ${essay.topic}`,
    };
  }
);

console.log("Nodes: ");
console.log(writeEssay.nodes);
console.log("Channels: ");
console.log(writeEssay.channels);
Nodes:
{ writeEssay: PregelNode { ... } }
Channels:
{
  __start__: EphemeralValue { ... },
  __end__: LastValue { ... },
  __previous__: LastValue { ... }
}