@shentongmartin shentongmartin commented Sep 23, 2025

Feat: Add Robust Graph Interrupt and Resume Mechanism

Summary & Motivation

This pull request introduces a robust and developer-friendly mechanism for interrupting and resuming graph execution. The previous system had several limitations that made handling complex interruptions difficult, including a lack of local state persistence, no deep addressing for nested components, no way to provide targeted resume data, and an ambiguous execution context for components.

This new mechanism is a ground-up redesign that addresses all of these issues by introducing a stable path-based addressing system, a clear and powerful set of APIs for signaling and handling interrupts, and a well-defined workflow for end-users to resume execution.

Key Features

  1. New: Hierarchical Path-Based Addressing:

    • This PR introduces a new, foundational Address system ([]AddressSegment) that gives every component in a graph a unique, hierarchical, and stable stringified ID (e.g., runnable:root;node:A;tool:tool_123).
    • The address is composed of AddressSegments. The AddressSegmentType is an extensible string, allowing developers to define custom types (like process or agent) for their own composite components, in addition to the built-in AddressSegmentNode, AddressSegmentTool, and AddressSegmentRunnable.
    • The framework now automatically manages this address. This enables true "black box" composition, where a Runnable like a sub-graph invoked from within a lambda node automatically inherits the full address of its parent, making its internal interrupt points seamlessly addressable from the outside.
  2. New: Modernized, Address-Aware Interrupt API:

    • Interrupt(ctx, info) & StatefulInterrupt(ctx, info, state): These are the new primary, context-aware functions for creating interrupts. They automatically capture the component's full address from the context, ensuring every interrupt is uniquely addressable.
    • CompositeInterrupt(ctx, info, state, ...errs): A powerful new function designed for composite nodes. It accepts a variadic list of sub-errors and correctly bundles them into a single, hierarchical interrupt that the framework can deconstruct.
    • Deprecations: The old InterruptAndRerun and NewInterruptAndRerunErr functions are now deprecated, as they do not carry address information.
    • WrapInterruptAndRerunIfNeeded(ctx, step, err): To handle legacy components or simple sub-processes that still use the deprecated errors, this new helper function wraps an error with an AddressSegment, making it compatible with the new CompositeInterrupt API.
  3. New: User-Facing Resume Workflow:

    • When an interrupt occurs, the returned error contains InterruptInfo. The user can read interruptInfo.InterruptContexts to get a flat list of all available resumable points.
    • Each item in the list is an InterruptCtx, which contains the user-facing Info and, most importantly, the unique, stable ID of the interrupt point.
    • The user can then use this ID to target a specific resumption by calling Resume(ctx, id) or ResumeWithData(ctx, id, data) to create a new context for the next Invoke call.
  4. New: Component-Facing API (resume.go):

    • GetInterruptState[T](ctx): Allows a component to check if it was previously interrupted and retrieve its persisted state. The return order (wasInterrupted, hasState, state) follows a natural logical flow.
    • GetResumeContext[T](ctx): Allows a component to determine if it is the specific target of a Resume operation and retrieve any associated data. The return order (isResumeFlow, hasData, data) is similarly intuitive.
    • GetCurrentAddress(ctx): Returns the full address of the current component.
  5. Guaranteed One-Time State Consumption: The framework ensures that interrupt state and resume data are consumed only once per address per checkpoint. This is a critical correctness guarantee that prevents bugs from accidental state reuse.
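The address format described in the first feature can be pictured with a small standalone sketch. It does not use the real package: the type names mirror the description, but the constant values, the String method, and the Child helper are assumptions made for illustration only.

```go
package main

import (
	"fmt"
	"strings"
)

// AddressSegmentType is an extensible string, as the description states.
type AddressSegmentType string

// Hypothetical constant values; the real package may spell them differently.
const (
	SegmentRunnable AddressSegmentType = "runnable"
	SegmentNode     AddressSegmentType = "node"
	SegmentTool     AddressSegmentType = "tool"
)

// AddressSegment is one hop in a component's hierarchical address.
type AddressSegment struct {
	Type AddressSegmentType
	ID   string
}

// Address is the full path from the root runnable down to a component.
type Address []AddressSegment

// String renders the address as "type:id" pairs joined by ';'.
func (a Address) String() string {
	parts := make([]string, len(a))
	for i, seg := range a {
		parts[i] = string(seg.Type) + ":" + seg.ID
	}
	return strings.Join(parts, ";")
}

// Child returns a new address with seg appended; the parent is left untouched,
// which is how a nested component can inherit its parent's full address.
func (a Address) Child(seg AddressSegment) Address {
	out := make(Address, len(a), len(a)+1)
	copy(out, a)
	return append(out, seg)
}

func main() {
	root := Address{{Type: SegmentRunnable, ID: "root"}}
	node := root.Child(AddressSegment{Type: SegmentNode, ID: "A"})
	tool := node.Child(AddressSegment{Type: SegmentTool, ID: "tool_123"})
	fmt.Println(tool) // runnable:root;node:A;tool:tool_123
}
```

A custom segment type such as "process" or "agent" would simply be another AddressSegmentType value in this model.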

How to Implement a Resumable Component

Simple Component

func MyInterruptibleComponent(ctx context.Context, input any) (any, error) {
    // The second return values (hasState, hasData) are ignored in this example.
    wasInterrupted, _, state := GetInterruptState[*myState](ctx)
    if !wasInterrupted {
        // Clean run, interrupt with state if necessary.
        return nil, StatefulInterrupt(ctx, "Needs human input", &myState{...})
    }

    isResume, _, data := GetResumeContext[MyResumeData](ctx)
    if !isResume {
        // Interrupted earlier but not the resume target: re-interrupt, preserving the original state.
        return nil, StatefulInterrupt(ctx, "Still needs attention", state)
    }

    // ... handle resume logic using 'state' and 'data' ...
    _ = data // keeps the example compiling until the resume logic uses it
    return "Work completed.", nil
}
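The three outcomes of the simple component (first run, re-run without being targeted, targeted resume) can be traced with a toy model. Everything below is hypothetical: runState stands in for whatever the framework would restore from the checkpoint and the resume context, and component only mirrors the branching, not the real API.

```go
package main

import (
	"errors"
	"fmt"
)

// errInterrupted stands in for the error a real interrupt would return.
var errInterrupted = errors.New("interrupted")

// runState is a hypothetical stand-in for what would be restored for one
// component address: persisted state plus optional resume data.
type runState struct {
	wasInterrupted bool
	state          int
	isResumeTarget bool
	resumeData     string
}

// component mirrors the three branches of the simple component.
// It returns the output, the state to persist, and an error.
func component(rs runState) (string, int, error) {
	if !rs.wasInterrupted {
		// Clean run: stop and persist an initial state.
		return "", 1, errInterrupted
	}
	if !rs.isResumeTarget {
		// Re-run without being targeted: interrupt again, keeping the state.
		return "", rs.state, errInterrupted
	}
	// Targeted resume: finish using the persisted state and the resume data.
	return fmt.Sprintf("done(state=%d, data=%s)", rs.state, rs.resumeData), 0, nil
}

func main() {
	_, saved, err := component(runState{})
	fmt.Println(saved, err) // 1 interrupted

	_, saved, err = component(runState{wasInterrupted: true, state: saved})
	fmt.Println(saved, err) // 1 interrupted

	out, _, err := component(runState{
		wasInterrupted: true, state: saved,
		isResumeTarget: true, resumeData: "approved",
	})
	fmt.Println(out, err) // done(state=1, data=approved) <nil>
}
```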

Composite Component

func MyCompositeNode(ctx context.Context, input any) (any, error) {
    var subProcessErrs []error
    
    for _, subTask := range subTasks {
        subTaskAddrSeg := compose.AddressSegment{Type: "sub_process", ID: subTask.ID}
        
        // Run the sub-task
        _, err := subTask.Run() // Assume this returns a simple, pathless error
        if err != nil {
            // Wrap the simple error to give it a path before passing to CompositeInterrupt.
            wrappedErr := compose.WrapInterruptAndRerunIfNeeded(ctx, subTaskAddrSeg, err)
            subProcessErrs = append(subProcessErrs, wrappedErr)
        }
    }

    if len(subProcessErrs) > 0 {
        // Bundle all sub-interrupts into a single composite interrupt.
        return nil, CompositeInterrupt(ctx, "Some sub-tasks need attention", &myCompositeState{...}, subProcessErrs...)
    }

    return "All sub-tasks completed.", nil
}

User-Facing Interaction Pattern

// 1. First invocation is interrupted
_, err := graph.Invoke(ctx, input, WithCheckPointID(cpID))

// 2. Extract interrupt info and get all resumable points
interruptInfo, _ := ExtractInterruptInfo(err)
interruptContexts := interruptInfo.InterruptContexts // Get a flat list of InterruptCtx

// 3. Prepare a new context to resume a specific point using its ID
resumeCtx := ResumeWithData(context.Background(), interruptContexts[0].ID, &myResumeData{...})

// 4. Resume the graph execution
output, err := graph.Invoke(resumeCtx, input, WithCheckPointID(cpID))


github-actions bot commented Sep 23, 2025

📊 Coverage Report:

File coverage threshold (20%) satisfied:	PASS
Package coverage threshold (30%) satisfied:	PASS
Total coverage threshold (83%) satisfied:	PASS
Total test coverage: 83.6% (6465/7735)

@shentongmartin shentongmartin force-pushed the refactor/graph_resume branch 4 times, most recently from c26f251 to 8882e60 Compare September 25, 2025 09:04
@shentongmartin shentongmartin force-pushed the refactor/graph_resume branch 2 times, most recently from d264744 to 838550a Compare September 28, 2025 13:03
Change-Id: I30b5fe3a8122b86fdd2818bf66dec18bc155e33f
…ions

Change-Id: I2e6357acec95433e648fbdc8509ef44e6eb22dce
Change-Id: I67d05742e079b1fe7ee11dbadec9ac5c4c3ffd8c
This change exports the  function to , making it a public part of the API.

This is essential for developers who want to build their own custom composite nodes. To correctly implement the  and  interfaces, a custom node needs the ability to create distinct execution contexts for its internal, interruptible sub-components.

Exporting this function provides the necessary hook for that advanced use case. All internal call sites within the  package have been updated to use the new exported function name.

Change-Id: Ibbc651131333860064454f691b346ab87149824c
Change-Id: I54b60cd3ee79e20e92ec1fe1b2e870f18763f5b6
Change-Id: Ifad728c6c6ea59cb21b2e622ebad51f249b5fc91
Change-Id: Ia7f0b548fae95f610e4e59333c9ec9e60e13b827
…f clear it

Change-Id: I9ab6290573b1faf6bda39a282e7986df61f2307d
…ass in PathStep

Change-Id: I43c40b77ba74c6408dd3e930ea330bf4099e4f24
…and GetResumeContext

Change-Id: Ia97f4245db7edce7e784443cb36c14cc211043ce
Change-Id: Icd7dc35e924b1a9b58a933d6f5a7aaf0ab51c57a
Change-Id: Ifae23e99c2392a4e1f1e938cb725e76c5a71f886
Change-Id: I689c9ef7799020f0e988fd724620641efb71ef80
Change-Id: I5cf93f1a00ea0aaa07cde62ac5d0749a4a3912b1
Change-Id: If26cc015084802d7509ef9a8a46f17902a374e2a
Change-Id: I3d845adc10698f67443d11543e6cd6269dc69877