Skip to content

Commit eb87914

Browse files
authored
add an option to disable the deadline timeout for the FSM broadcast (#19)
* add an option to disable the deadline timeout for the FSM broadcast
1 parent 96c45be commit eb87914

File tree

3 files changed

+91
-27
lines changed

3 files changed

+91
-27
lines changed

README.md

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,18 @@ currentState := machine.GetState()
134134

135135
### State Change Notifications
136136

137+
#### Broadcast Modes
138+
139+
Subscribers can operate in three different modes based on their timeout setting:
140+
141+
**Async Mode (timeout=0)**: State updates are dropped if the channel is full. Non-blocking transitions. This is the default behavior.
142+
143+
**Sync Mode (positive timeout)**: Blocks state transitions until all sync subscribers read the update or timeout. Never drops state updates unless timeout is reached.
144+
145+
**Infinite Blocking Mode (negative timeout)**: Blocks state transitions indefinitely until all infinite subscribers read the update. Never drops state updates or times out.
146+
137147
```go
138-
// Get notification channel
148+
// Get notification channel with default async behavior (timeout=0)
139149
ctx, cancel := context.WithCancel(context.Background())
140150
defer cancel()
141151

@@ -149,18 +159,23 @@ go func() {
149159
}
150160
}()
151161

152-
// Alternative: Add a direct subscriber channel
153-
stateCh := make(chan string, 1)
154-
unsubscribe := machine.AddSubscriber(stateCh)
162+
// Use sync broadcast with a 10s timeout (WithSyncBroadcast is a shortcut for settings a 10s timeout)
163+
syncChan := machine.GetStateChanWithOptions(ctx, fsm.WithSyncBroadcast())
155164

156-
// Process state updates in a separate goroutine
165+
// Use sync broadcast with 1hr custom timeout
166+
timeoutChan := machine.GetStateChanWithOptions(ctx, fsm.WithSyncTimeout(1*time.Hour))
167+
168+
// Use infinite blocking (never times out)
169+
infiniteChan := machine.GetStateChanWithOptions(ctx,
170+
fsm.WithSyncTimeout(-1),
171+
)
172+
173+
// Read and print all state changes from the channel
157174
go func() {
158-
for state := range stateCh {
159-
fmt.Println("State updated:", state)
175+
for state := range syncChan {
176+
fmt.Println("State:", state)
160177
}
161178
}()
162-
163-
defer unsubscribe() // Call to stop receiving updates
164179
```
165180

166181
## Complete Example

stateChan.go

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ import (
2323
)
2424

2525
const (
26-
defaultSyncTimeout = 10 * time.Second
26+
defaultAsyncTimeout = 0
27+
defaultSyncTimeout = 10 * time.Second
2728
)
2829

2930
// SubscriberOption configures subscriber behavior
@@ -33,7 +34,6 @@ type SubscriberOption func(*subscriberConfig)
3334
type subscriberConfig struct {
3435
sendInitial bool
3536
customChannel chan string
36-
syncBroadcast bool
3737
syncTimeout time.Duration
3838
}
3939

@@ -62,7 +62,7 @@ func WithCustomChannel(ch chan string) SubscriberOption {
6262
// is delivered to the channel, rather than dropping messages for full channels
6363
func WithSyncBroadcast() SubscriberOption {
6464
return func(config *subscriberConfig) {
65-
config.syncBroadcast = true
65+
config.syncTimeout = defaultSyncTimeout
6666
}
6767
}
6868

@@ -85,7 +85,7 @@ func (fsm *Machine) GetStateChanWithOptions(
8585
) <-chan string {
8686
config := &subscriberConfig{
8787
sendInitial: true,
88-
syncTimeout: defaultSyncTimeout,
88+
syncTimeout: defaultAsyncTimeout,
8989
}
9090

9191
for _, opt := range opts {
@@ -147,7 +147,7 @@ func (fsm *Machine) GetStateChanBuffer(ctx context.Context, chanBufferSize int)
147147
func (fsm *Machine) AddSubscriber(ch chan string) func() {
148148
config := &subscriberConfig{
149149
sendInitial: true,
150-
syncTimeout: defaultSyncTimeout,
150+
syncTimeout: defaultAsyncTimeout,
151151
}
152152
return fsm.addSubscriberWithConfig(ch, config)
153153
}
@@ -183,8 +183,9 @@ func (fsm *Machine) unsubscribe(ch chan string) {
183183
}
184184

185185
// broadcast sends the new state to all subscriber channels.
186-
// For async subscribers, if a channel is full, the state change is skipped for that channel, and a warning is logged.
187-
// For sync subscribers, the broadcast blocks until the message is delivered or times out after 10 seconds.
186+
// Subscribers with negative timeout block indefinitely until delivered.
187+
// Subscribers with timeout=0 behave asynchronously (drop messages if channel is full).
188+
// Subscribers with positive timeout block until delivered or timeout.
188189
// This, and the other subscriber-related methods, use a standard mutex instead of an RWMutex,
189190
// because the broadcast sends should always be serial, and never concurrent, otherwise the order
190191
// of state change notifications could be unpredictable.
@@ -208,21 +209,15 @@ func (fsm *Machine) broadcast(state string) {
208209
return true
209210
}
210211

211-
if config.syncBroadcast {
212-
// Handle sync broadcast with timeout in parallel goroutines
212+
if config.syncTimeout < 0 {
213+
// Handle infinite blocking broadcast
213214
wg.Add(1)
214215
go func(ch chan string) {
215216
defer wg.Done()
216-
select {
217-
case ch <- state:
218-
logger.Debug("State delivered to synchronous subscriber")
219-
case <-time.After(config.syncTimeout):
220-
logger.Warn("Synchronous subscriber blocked; state delivery timed out",
221-
"timeout", config.syncTimeout,
222-
"channel_capacity", cap(ch), "channel_length", len(ch))
223-
}
217+
ch <- state
218+
logger.Debug("State delivered to blocking subscriber")
224219
}(ch)
225-
} else {
220+
} else if config.syncTimeout == 0 {
226221
// Handle async broadcast (non-blocking)
227222
select {
228223
case ch <- state:
@@ -231,6 +226,20 @@ func (fsm *Machine) broadcast(state string) {
231226
logger.Debug("Asynchronous subscriber channel full; state delivery skipped",
232227
"channel_capacity", cap(ch), "channel_length", len(ch))
233228
}
229+
} else {
230+
// Handle sync broadcast with positive timeout
231+
wg.Add(1)
232+
go func(ch chan string, timeout time.Duration) {
233+
defer wg.Done()
234+
select {
235+
case ch <- state:
236+
logger.Debug("State delivered to synchronous subscriber")
237+
case <-time.After(timeout):
238+
logger.Warn("Synchronous subscriber blocked; state delivery timed out",
239+
"timeout", timeout,
240+
"channel_capacity", cap(ch), "channel_length", len(ch))
241+
}
242+
}(ch, config.syncTimeout)
234243
}
235244
return true // continue iteration
236245
})

stateChan_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1139,4 +1139,44 @@ func TestFSM_GetStateChanWithOptions(t *testing.T) {
11391139
}
11401140
}, time.Second, 10*time.Millisecond, "Async subscriber should receive state despite blocked sync subscriber")
11411141
})
1142+
1143+
t.Run("Negative timeout blocks indefinitely", func(t *testing.T) {
1144+
if testing.Short() {
1145+
t.Skip("Skipping in short mode")
1146+
}
1147+
1148+
fsm, err := New(nil, StatusNew, TypicalTransitions)
1149+
require.NoError(t, err)
1150+
ctx := t.Context()
1151+
1152+
// Create a permanently blocked sync subscriber with negative timeout
1153+
blockedSyncCh := make(chan string) // Unbuffered, no readers
1154+
fsm.GetStateChanWithOptions(ctx,
1155+
WithSyncTimeout(-1), // Negative timeout should block indefinitely
1156+
WithCustomChannel(blockedSyncCh),
1157+
WithoutInitialState(),
1158+
)
1159+
1160+
// Create a normal async subscriber that should not be blocked
1161+
asyncCh := fsm.GetStateChanWithOptions(ctx, WithBufferSize(2))
1162+
<-asyncCh // Consume initial state
1163+
1164+
// Start transition in a goroutine since it should block indefinitely
1165+
transitionDone := make(chan struct{})
1166+
go func() {
1167+
defer close(transitionDone)
1168+
err := fsm.Transition(StatusBooting)
1169+
require.NoError(t, err)
1170+
}()
1171+
1172+
// Transition should never complete within 11 seconds due to negative timeout
1173+
assert.Never(t, func() bool {
1174+
select {
1175+
case <-transitionDone:
1176+
return true
1177+
default:
1178+
return false
1179+
}
1180+
}, 11*time.Second, 100*time.Millisecond, "Transition should not complete with negative timeout blocking indefinitely")
1181+
})
11421182
}

0 commit comments

Comments
 (0)