diff --git a/bold/containers/events/producer.go b/bold/containers/events/producer.go index 2df0605773..abbd3909da 100644 --- a/bold/containers/events/producer.go +++ b/bold/containers/events/producer.go @@ -22,6 +22,7 @@ type Producer[T any] struct { subs []*Subscription[T] doneListener chan subId // channel to listen for IDs of subscriptions to be remove. broadcastTimeout time.Duration // maximum duration to wait for an event to be sent. + nextId subId // monotonically increasing id for stable subscription identification } type ProducerOpt[T any] func(*Producer[T]) @@ -60,13 +61,17 @@ func (ep *Producer[T]) Start(ctx context.Context) { select { case id := <-ep.doneListener: ep.Lock() - // Check if id overflows the length of the slice. - if int(id) >= len(ep.subs) { - ep.Unlock() - continue + // Find the subscription by stable id and remove it if present. + idx := -1 + for i, s := range ep.subs { + if s.id == id { + idx = i + break + } + } + if idx >= 0 { + ep.subs = append(ep.subs[:idx], ep.subs[idx+1:]...) } - // Otherwise, clear the subscription from the list. - ep.subs = append(ep.subs[:id], ep.subs[id+1:]...) ep.Unlock() case <-ctx.Done(): close(ep.doneListener) @@ -82,10 +87,11 @@ func (ep *Producer[T]) Subscribe() *Subscription[T] { ep.Lock() defer ep.Unlock() sub := &Subscription[T]{ - id: subId(len(ep.subs)), // Assign a unique ID based on the current count of subscriptions + id: ep.nextId, // Assign a stable, monotonically increasing ID events: make(chan T), done: ep.doneListener, } + ep.nextId++ ep.subs = append(ep.subs, sub) return sub } diff --git a/bold/containers/events/producer_test.go b/bold/containers/events/producer_test.go index 1b09a1f9b4..0c42a5f4b8 100644 --- a/bold/containers/events/producer_test.go +++ b/bold/containers/events/producer_test.go @@ -68,3 +68,44 @@ func TestEventProducer_Start(t *testing.T) { t.Error("Expected to end after context cancellation") } } + +func TestRemovalUsesStableId(t *testing.T) { + // This test ensures that removing subscriptions uses stable IDs rather than slice indices. + // Before the fix, deleting two subscriptions by their IDs 0 and 1 would incorrectly + // remove the first (index 0) and the third (now at index 1 after compaction), leaving + // the second subscription in place instead of the third. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + producer := NewProducer[int]() + go producer.Start(ctx) + + s0 := producer.Subscribe() + s1 := producer.Subscribe() + s2 := producer.Subscribe() + + // Cancel first two subscriptions; they will send their IDs to doneListener via Next. + for _, s := range []*Subscription[int]{s0, s1} { + c, cancelSub := context.WithCancel(context.Background()) + cancelSub() + _, shouldEnd := s.Next(c) + require.True(t, shouldEnd) + } + + // Wait until the producer processes removal and only one subscription remains. + deadline := time.Now().Add(2 * time.Second) + for { + producer.RLock() + remaining := len(producer.subs) + producer.RUnlock() + if remaining == 1 || time.Now().After(deadline) { + break + } + time.Sleep(5 * time.Millisecond) + } + + producer.RLock() + require.Equal(t, 1, len(producer.subs)) + require.Same(t, s2, producer.subs[0]) + producer.RUnlock() +}