Skip to content

Commit 838550a

Browse files
refactor: rename Path and PathStep to Address and AddressSegment
Change-Id: I689c9ef7799020f0e988fd724620641efb71ef80
1 parent ebce372 commit 838550a

File tree

7 files changed

+130
-123
lines changed

7 files changed

+130
-123
lines changed

compose/checkpoint.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ type checkpoint struct {
128128

129129
SubGraphs map[string]*checkpoint
130130

131-
InterruptPoints []*interruptStateForPath
131+
InterruptPoints []*interruptStateForAddress
132132
}
133133

134134
type stateModifierKey struct{}

compose/generic_graph.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ func compileAnyGraph[I, O any](ctx context.Context, g AnyGraph, opts ...GraphCom
142142
}
143143

144144
ctxWrapper := func(ctx context.Context, opts ...Option) context.Context {
145-
return initGraphCallbacks(AppendPathStep(ctx, PathStepRunnable, option.graphName), cr.nodeInfo, cr.meta, opts...)
145+
return initGraphCallbacks(AppendAddressSegment(ctx, AddressSegmentRunnable, option.graphName), cr.nodeInfo, cr.meta, opts...)
146146
}
147147

148148
rp, err := toGenericRunnable[I, O](cr, ctxWrapper)

compose/graph_run.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -402,13 +402,13 @@ type interruptTempInfo struct {
402402
interruptAfterNodes []string
403403
interruptRerunExtra map[string]any
404404

405-
interruptPoints []*interruptStateForPath
405+
interruptPoints []*interruptStateForAddress
406406
interruptContexts []*InterruptCtx
407407
}
408408

409409
func (it *interruptTempInfo) processInterruptErr(ire *interruptAndRerun) {
410-
it.interruptPoints = append(it.interruptPoints, &interruptStateForPath{
411-
P: ire.path,
410+
it.interruptPoints = append(it.interruptPoints, &interruptStateForAddress{
411+
Addr: ire.path,
412412
S: &interruptState{
413413
Interrupted: true,
414414
State: ire.state,
@@ -656,7 +656,7 @@ func (r *runner) createTasks(ctx context.Context, nodeMap map[string]any, optMap
656656
}
657657

658658
nextTasks = append(nextTasks, &task{
659-
ctx: AppendPathStep(ctx, PathStepNode, nodeKey),
659+
ctx: AppendAddressSegment(ctx, AddressSegmentNode, nodeKey),
660660
nodeKey: nodeKey,
661661
call: call,
662662
input: nodeInput,
@@ -716,7 +716,7 @@ func (r *runner) restoreTasks(
716716
}
717717

718718
newTask := &task{
719-
ctx: AppendPathStep(ctx, PathStepNode, key),
719+
ctx: AppendAddressSegment(ctx, AddressSegmentNode, key),
720720
nodeKey: key,
721721
call: call,
722722
input: input,

compose/interrupt.go

Lines changed: 62 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -50,28 +50,28 @@ func NewInterruptAndRerunErr(extra any) error {
5050
}
5151

5252
type wrappedInterruptAndRerun struct {
53-
ps Path
53+
ps Address
5454
inner error
5555
}
5656

5757
func (w *wrappedInterruptAndRerun) Error() string {
58-
return fmt.Sprintf("interrupt and rerun at path %s: %s", w.ps.String(), w.inner.Error())
58+
return fmt.Sprintf("interrupt and rerun at address %s: %s", w.ps.String(), w.inner.Error())
5959
}
6060

6161
func (w *wrappedInterruptAndRerun) Unwrap() error {
6262
return w.inner
6363
}
6464

6565
// WrapInterruptAndRerunIfNeeded wraps the InterruptAndRerun error, or the error returned by
66-
// NewInterruptAndRerunErr, with the current path.
66+
// NewInterruptAndRerunErr, with the current execution address.
6767
// If the error is returned by either Interrupt, StatefulInterrupt or CompositeInterrupt,
6868
// it will be returned as-is without wrapping
69-
func WrapInterruptAndRerunIfNeeded(ctx context.Context, step PathStep, err error) error {
70-
path, _ := GetCurrentPath(ctx)
71-
newPath := append(append([]PathStep{}, path...), step)
69+
func WrapInterruptAndRerunIfNeeded(ctx context.Context, step AddressSegment, err error) error {
70+
addr, _ := GetCurrentAddress(ctx)
71+
newAddr := append(append([]AddressSegment{}, addr...), step)
7272
if errors.Is(err, InterruptAndRerun) {
7373
return &wrappedInterruptAndRerun{
74-
ps: newPath,
74+
ps: newAddr,
7575
inner: err,
7676
}
7777
}
@@ -80,7 +80,7 @@ func WrapInterruptAndRerunIfNeeded(ctx context.Context, step PathStep, err error
8080
if errors.As(err, &ire) {
8181
if ire.path == nil {
8282
return &wrappedInterruptAndRerun{
83-
ps: newPath,
83+
ps: newAddr,
8484
inner: err,
8585
}
8686
}
@@ -92,60 +92,60 @@ func WrapInterruptAndRerunIfNeeded(ctx context.Context, step PathStep, err error
9292
return ie
9393
}
9494

95-
return fmt.Errorf("failed to wrap error as pathed InterruptAndRerun: %w", err)
95+
return fmt.Errorf("failed to wrap error as addressed InterruptAndRerun: %w", err)
9696
}
9797

98-
// Interrupt creates a special error that signals the graph execution engine to interrupt
99-
// the current run at the component's specific path and save a checkpoint.
98+
// Interrupt creates a special error that signals the execution engine to interrupt
99+
// the current run at the component's specific address and save a checkpoint.
100100
//
101101
// This is the standard way for a single, non-composite component to signal a resumable interruption.
102102
//
103-
// - ctx: The context of the running component, used to retrieve the current execution path.
103+
// - ctx: The context of the running component, used to retrieve the current execution address.
104104
// - info: User-facing information about the interrupt. This is not persisted but is exposed to the
105105
// calling application via the InterruptCtx to provide context (e.g., a reason for the pause).
106106
func Interrupt(ctx context.Context, info any) error {
107107
var interruptID string
108-
path, pathExist := GetCurrentPath(ctx)
109-
if pathExist {
110-
interruptID = path.String()
108+
addr, addrExist := GetCurrentAddress(ctx)
109+
if addrExist {
110+
interruptID = addr.String()
111111
}
112112

113-
return &interruptAndRerun{info: info, interruptID: &interruptID, path: path}
113+
return &interruptAndRerun{info: info, interruptID: &interruptID, path: addr}
114114
}
115115

116-
// StatefulInterrupt creates a special error that signals the graph execution engine to interrupt
117-
// the current run at the component's specific path and save a checkpoint.
116+
// StatefulInterrupt creates a special error that signals the execution engine to interrupt
117+
// the current run at the component's specific address and save a checkpoint.
118118
//
119119
// This is the standard way for a single, non-composite component to signal a resumable interruption.
120120
//
121-
// - ctx: The context of the running component, used to retrieve the current execution path.
121+
// - ctx: The context of the running component, used to retrieve the current execution address.
122122
// - info: User-facing information about the interrupt. This is not persisted but is exposed to the
123123
// calling application via the InterruptCtx to provide context (e.g., a reason for the pause).
124124
// - state: The internal state that the interrupting component needs to persist to be able to resume
125125
// its work later. This state is saved in the checkpoint and will be provided back to the component
126126
// upon resumption via GetInterruptState.
127127
func StatefulInterrupt(ctx context.Context, info any, state any) error {
128128
var interruptID string
129-
path, pathExist := GetCurrentPath(ctx)
130-
if pathExist {
131-
interruptID = path.String()
129+
addr, addrExist := GetCurrentAddress(ctx)
130+
if addrExist {
131+
interruptID = addr.String()
132132
}
133133

134-
return &interruptAndRerun{info: info, state: state, interruptID: &interruptID, path: path}
134+
return &interruptAndRerun{info: info, state: state, interruptID: &interruptID, path: addr}
135135
}
136136

137137
type interruptAndRerun struct {
138138
info any // for end-user, probably the human-being
139139
state any // for persistence, when resuming, use GetInterruptState to fetch it at the interrupt location
140140
interruptID *string
141-
path Path
141+
path Address
142142
errs []*interruptAndRerun
143143
}
144144

145145
// CompositeInterrupt creates a special error that signals a composite interruption.
146146
// It is designed for "composite" nodes (like ToolsNode) that manage multiple, independent,
147147
// interruptible sub-processes. It bundles multiple sub-interrupt errors into a single error
148-
// that the graph engine can deconstruct into a flat list of resumable points.
148+
// that the engine can deconstruct into a flat list of resumable points.
149149
//
150150
// This function is robust and can handle several types of errors from sub-processes:
151151
//
@@ -155,7 +155,7 @@ type interruptAndRerun struct {
155155
//
156156
// - An error containing `InterruptInfo` returned by a `Runnable` (e.g., a Graph within a lambda node).
157157
//
158-
// - An error returned by 'WrapInterruptAndRerunIfNeeded' for the legacy InterruptAndRerun error,
158+
// - An error returned by \'WrapInterruptAndRerunIfNeeded\' for the legacy InterruptAndRerun error,
159159
// and for the error returned by the deprecated NewInterruptAndRerunErr.
160160
//
161161
// Parameters:
@@ -178,7 +178,7 @@ type interruptAndRerun struct {
178178
// the deprecated NewInterruptAndRerunErr function, you must wrap it using WrapInterruptAndRerunIfNeeded first
179179
// before passing them into this function.
180180
func CompositeInterrupt(ctx context.Context, info any, state any, errs ...error) error {
181-
path, _ := GetCurrentPath(ctx)
181+
addr, _ := GetCurrentAddress(ctx)
182182
var cErrs []*interruptAndRerun
183183
for _, err := range errs {
184184
wrapped := &wrappedInterruptAndRerun{}
@@ -228,11 +228,11 @@ func CompositeInterrupt(ctx context.Context, info any, state any, errs ...error)
228228

229229
return fmt.Errorf("composite interrupt but one of the sub error is not interrupt and rerun error: %w", err)
230230
}
231-
return &interruptAndRerun{errs: cErrs, path: path, state: state, info: info}
231+
return &interruptAndRerun{errs: cErrs, path: addr, state: state, info: info}
232232
}
233233

234234
func (i *interruptAndRerun) Error() string {
235-
return fmt.Sprintf("interrupt and rerun: %v for path: %s", i.info, i.path.String())
235+
return fmt.Sprintf("interrupt and rerun: %v for address: %s", i.info, i.path.String())
236236
}
237237

238238
func IsInterruptRerunError(err error) (any, bool) {
@@ -261,27 +261,27 @@ type InterruptInfo struct {
261261
InterruptContexts []*InterruptCtx
262262
}
263263

264-
// PathStepType defines the type of a segment in an interrupt path.
265-
type PathStepType string
264+
// AddressSegmentType defines the type of a segment in an execution address.
265+
type AddressSegmentType string
266266

267267
const (
268-
// PathStepNode represents a segment of a path that corresponds to a graph node.
269-
PathStepNode PathStepType = "node"
270-
// PathStepTool represents a segment of a path that corresponds to a specific tool call within a ToolsNode.
271-
PathStepTool PathStepType = "tool"
272-
// PathStepRunnable represents a segment of a path that corresponds to an instance of the Runnable interface.
268+
// AddressSegmentNode represents a segment of an address that corresponds to a graph node.
269+
AddressSegmentNode AddressSegmentType = "node"
270+
// AddressSegmentTool represents a segment of an address that corresponds to a specific tool call within a ToolsNode.
271+
AddressSegmentTool AddressSegmentType = "tool"
272+
// AddressSegmentRunnable represents a segment of an address that corresponds to an instance of the Runnable interface.
273273
// Currently the possible Runnable types are: Graph, Workflow and Chain.
274274
// Note that for sub-graphs added through AddGraphNode to another graph is not a Runnable.
275-
// So a PathStepRunnable indicates a standalone Root level Graph,
275+
// So a AddressSegmentRunnable indicates a standalone Root level Graph,
276276
// or a Root level Graph inside a node such as Lambda node.
277-
PathStepRunnable PathStepType = "runnable"
277+
AddressSegmentRunnable AddressSegmentType = "runnable"
278278
)
279279

280-
// Path represents a full, hierarchical path to an interrupt point.
281-
type Path []PathStep
280+
// Address represents a full, hierarchical address to a point in the execution structure.
281+
type Address []AddressSegment
282282

283-
// String converts a Path into its unique string representation.
284-
func (p Path) String() string {
283+
// String converts an Address into its unique string representation.
284+
func (p Address) String() string {
285285
var sb strings.Builder
286286
for i, s := range p {
287287
sb.WriteString(string(s.Type))
@@ -294,7 +294,7 @@ func (p Path) String() string {
294294
return sb.String()
295295
}
296296

297-
func (p Path) Equals(other Path) bool {
297+
func (p Address) Equals(other Address) bool {
298298
if len(p) != len(other) {
299299
return false
300300
}
@@ -306,23 +306,32 @@ func (p Path) Equals(other Path) bool {
306306
return true
307307
}
308308

309-
// PathStep represents a single segment in the hierarchical path to an interrupt point.
310-
// A sequence of PathSegments uniquely identifies a location within a potentially nested graph structure.
311-
type PathStep struct {
312-
// Type indicates whether this path segment is a graph node or a tool call.
313-
Type PathStepType
309+
func (p Address) DeepCopy() Address {
310+
if p == nil {
311+
return nil
312+
}
313+
cpy := make(Address, len(p))
314+
copy(cpy, p)
315+
return cpy
316+
}
317+
318+
// AddressSegment represents a single segment in the hierarchical address of an execution point.
319+
// A sequence of AddressSegments uniquely identifies a location within a potentially nested structure.
320+
type AddressSegment struct {
321+
// Type indicates whether this address segment is a graph node, a tool call, an agent, etc.
322+
Type AddressSegmentType
314323
// ID is the unique identifier for this segment, e.g., the node's key or the tool call's ID.
315324
ID string
316325
}
317326

318327
// InterruptCtx provides a complete, user-facing context for a single, resumable interrupt point.
319328
type InterruptCtx struct {
320-
// ID is the unique, fully-qualified path to the interrupt point.
321-
// It is constructed by joining the individual Path segments, e.g., "node:graph_a;node:tools;tool:tool_call_123".
329+
// ID is the unique, fully-qualified address of the interrupt point.
330+
// It is constructed by joining the individual Address segments, e.g., "agent:A;node:graph_a;tool:tool_call_123".
322331
// This ID should be used when providing resume data via ResumeWithData.
323332
ID string
324-
// Path is the structured sequence of PathStep segments that leads to the interrupt point.
325-
Path Path
333+
// Path is the structured sequence of AddressSegment segments that leads to the interrupt point.
334+
Path Address
326335
// Info is the user-facing information associated with the interrupt, provided by the component that triggered it.
327336
Info any
328337
}
@@ -365,7 +374,7 @@ type subGraphInterruptError struct {
365374
Info *InterruptInfo
366375
CheckPoint *checkpoint
367376

368-
InterruptPoints []*interruptStateForPath
377+
InterruptPoints []*interruptStateForAddress
369378
InterruptContexts []*InterruptCtx
370379
}
371380

0 commit comments

Comments
 (0)