Skip to content
Merged
20 changes: 13 additions & 7 deletions bold/containers/events/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Producer[T any] struct {
subs []*Subscription[T]
doneListener chan subId // channel to listen for IDs of subscriptions to be remove.
broadcastTimeout time.Duration // maximum duration to wait for an event to be sent.
nextId subId // monotonically increasing id for stable subscription identification
}

type ProducerOpt[T any] func(*Producer[T])
Expand Down Expand Up @@ -60,13 +61,17 @@ func (ep *Producer[T]) Start(ctx context.Context) {
select {
case id := <-ep.doneListener:
ep.Lock()
// Check if id overflows the length of the slice.
if int(id) >= len(ep.subs) {
ep.Unlock()
continue
// Find the subscription by stable id and remove it if present.
idx := -1
for i, s := range ep.subs {
if s.id == id {
idx = i
break
}
}
if idx >= 0 {
ep.subs = append(ep.subs[:idx], ep.subs[idx+1:]...)
}
// Otherwise, clear the subscription from the list.
ep.subs = append(ep.subs[:id], ep.subs[id+1:]...)
ep.Unlock()
case <-ctx.Done():
close(ep.doneListener)
Expand All @@ -82,10 +87,11 @@ func (ep *Producer[T]) Subscribe() *Subscription[T] {
ep.Lock()
defer ep.Unlock()
sub := &Subscription[T]{
id: subId(len(ep.subs)), // Assign a unique ID based on the current count of subscriptions
id: ep.nextId, // Assign a stable, monotonically increasing ID
events: make(chan T),
done: ep.doneListener,
}
ep.nextId++
ep.subs = append(ep.subs, sub)
return sub
}
Expand Down
41 changes: 41 additions & 0 deletions bold/containers/events/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,44 @@ func TestEventProducer_Start(t *testing.T) {
t.Error("Expected to end after context cancellation")
}
}

func TestRemovalUsesStableId(t *testing.T) {
// This test ensures that removing subscriptions uses stable IDs rather than slice indices.
// Before the fix, deleting two subscriptions by their IDs 0 and 1 would incorrectly
// remove the first (index 0) and the third (now at index 1 after compaction), leaving
// the second subscription in place instead of the third.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

producer := NewProducer[int]()
go producer.Start(ctx)

s0 := producer.Subscribe()
s1 := producer.Subscribe()
s2 := producer.Subscribe()

// Cancel first two subscriptions; they will send their IDs to doneListener via Next.
for _, s := range []*Subscription[int]{s0, s1} {
c, cancelSub := context.WithCancel(context.Background())
cancelSub()
_, shouldEnd := s.Next(c)
require.True(t, shouldEnd)
}

// Wait until the producer processes removal and only one subscription remains.
deadline := time.Now().Add(2 * time.Second)
for {
producer.RLock()
remaining := len(producer.subs)
producer.RUnlock()
if remaining == 1 || time.Now().After(deadline) {
break
}
time.Sleep(5 * time.Millisecond)
}

producer.RLock()
require.Equal(t, 1, len(producer.subs))
require.Same(t, s2, producer.subs[0])
producer.RUnlock()
}
Loading