From c7b203e31f6a575528fde4537b0c8c56f534c9ac Mon Sep 17 00:00:00 2001 From: VolodymyrBg Date: Tue, 2 Sep 2025 16:31:26 +0300 Subject: [PATCH 1/6] fix(events): use stable subscription ids and remove by id --- bold/containers/events/producer.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/bold/containers/events/producer.go b/bold/containers/events/producer.go index 2df0605773..abbd3909da 100644 --- a/bold/containers/events/producer.go +++ b/bold/containers/events/producer.go @@ -22,6 +22,7 @@ type Producer[T any] struct { subs []*Subscription[T] doneListener chan subId // channel to listen for IDs of subscriptions to be remove. broadcastTimeout time.Duration // maximum duration to wait for an event to be sent. + nextId subId // monotonically increasing id for stable subscription identification } type ProducerOpt[T any] func(*Producer[T]) @@ -60,13 +61,17 @@ func (ep *Producer[T]) Start(ctx context.Context) { select { case id := <-ep.doneListener: ep.Lock() - // Check if id overflows the length of the slice. - if int(id) >= len(ep.subs) { - ep.Unlock() - continue + // Find the subscription by stable id and remove it if present. + idx := -1 + for i, s := range ep.subs { + if s.id == id { + idx = i + break + } + } + if idx >= 0 { + ep.subs = append(ep.subs[:idx], ep.subs[idx+1:]...) } - // Otherwise, clear the subscription from the list. - ep.subs = append(ep.subs[:id], ep.subs[id+1:]...) ep.Unlock() case <-ctx.Done(): close(ep.doneListener) @@ -82,10 +87,11 @@ func (ep *Producer[T]) Subscribe() *Subscription[T] { ep.Lock() defer ep.Unlock() sub := &Subscription[T]{ - id: subId(len(ep.subs)), // Assign a unique ID based on the current count of subscriptions + id: ep.nextId, // Assign a stable, monotonically increasing ID events: make(chan T), done: ep.doneListener, } + ep.nextId++ ep.subs = append(ep.subs, sub) return sub } From a3c22c9f0a4ff31b573e8dd0069d2068c6997f7a Mon Sep 17 00:00:00 2001 From: VolodymyrBg Date: Tue, 9 Sep 2025 14:45:56 +0300 Subject: [PATCH 2/6] Update producer.go --- bold/containers/events/producer.go | 195 ++++++++++++----------------- 1 file changed, 83 insertions(+), 112 deletions(-) diff --git a/bold/containers/events/producer.go b/bold/containers/events/producer.go index abbd3909da..5dd82d7b16 100644 --- a/bold/containers/events/producer.go +++ b/bold/containers/events/producer.go @@ -6,135 +6,106 @@ package events import ( "context" - "sync" + "testing" "time" -) -const ( - defaultBroadcastTimeout = time.Millisecond * 500 - defaultSubscriptionBufferSize = 10 + "github.com/stretchr/testify/require" ) -// Producer manages event subscriptions and broadcasts events to them. -type Producer[T any] struct { - sync.RWMutex - subscriptionBufferSize int - subs []*Subscription[T] - doneListener chan subId // channel to listen for IDs of subscriptions to be remove. - broadcastTimeout time.Duration // maximum duration to wait for an event to be sent. - nextId subId // monotonically increasing id for stable subscription identification +func TestSubscribe(t *testing.T) { + producer := NewProducer[int]() + sub := producer.Subscribe() + require.Equal(t, 1, len(producer.subs)) + require.NotNil(t, sub) } -type ProducerOpt[T any] func(*Producer[T]) - -// WithBroadcastTimeout enables the amount of time the broadcaster will wait to send -// to each subscriber before dropping the send. -func WithBroadcastTimeout[T any](timeout time.Duration) ProducerOpt[T] { - return func(ep *Producer[T]) { - ep.broadcastTimeout = timeout +func TestBroadcast(t *testing.T) { + producer := NewProducer[int]() + sub := producer.Subscribe() + done := make(chan bool) + go func() { + event, shouldEnd := sub.Next(context.Background()) + require.False(t, shouldEnd) + require.Equal(t, 42, event) + done <- true + }() + ctx := context.Background() + producer.Broadcast(ctx, 42) + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("Test timed out waiting for event") } } -// WithSubscriptionBuffer customizes the size of the subscription buffer channel. -func WithSubscriptionBuffer[T any](size int) ProducerOpt[T] { - return func(ep *Producer[T]) { - ep.subscriptionBufferSize = size - } -} +func TestBroadcastTimeout(t *testing.T) { + timeout := 50 * time.Millisecond + producer := NewProducer(WithBroadcastTimeout[int](timeout)) + sub := producer.Subscribe() -func NewProducer[T any](opts ...ProducerOpt[T]) *Producer[T] { - producer := &Producer[T]{ - subs: make([]*Subscription[T], 0), - subscriptionBufferSize: defaultSubscriptionBufferSize, - doneListener: make(chan subId, 100), - broadcastTimeout: defaultBroadcastTimeout, - } - for _, opt := range opts { - opt(producer) - } - return producer -} + go func() { + // Delay sending to simulate timeout scenario + time.Sleep(100 * time.Millisecond) + sub.events <- 42 + }() -// Start begins listening for subscription cancelation requests or context cancelation. -func (ep *Producer[T]) Start(ctx context.Context) { - for { - select { - case id := <-ep.doneListener: - ep.Lock() - // Find the subscription by stable id and remove it if present. - idx := -1 - for i, s := range ep.subs { - if s.id == id { - idx = i - break - } - } - if idx >= 0 { - ep.subs = append(ep.subs[:idx], ep.subs[idx+1:]...) - } - ep.Unlock() - case <-ctx.Done(): - close(ep.doneListener) - ep.subs = nil - return - } - } + event, shouldEnd := sub.Next(context.Background()) + require.False(t, shouldEnd) + require.Equal(t, 42, event) } -// Subscribe returns a handle to a new event subscription, -// adding it to the list of active subscriptions. -func (ep *Producer[T]) Subscribe() *Subscription[T] { - ep.Lock() - defer ep.Unlock() - sub := &Subscription[T]{ - id: ep.nextId, // Assign a stable, monotonically increasing ID - events: make(chan T), - done: ep.doneListener, - } - ep.nextId++ - ep.subs = append(ep.subs, sub) - return sub -} +func TestEventProducer_Start(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + producer := NewProducer[int]() + go producer.Start(ctx) -// Broadcast sends an event to all active subscriptions, respecting a configured timeout or context. -// It spawns goroutines to send events to each subscription so as to not block the producer to submitting -// to all consumers. Broadcast should be used if not all consumers are expected to consume the event, -// within a reasonable time, or if the configured broadcast timeout is short enough. -func (ep *Producer[T]) Broadcast(ctx context.Context, event T) { - ep.RLock() - defer ep.RUnlock() - for _, sub := range ep.subs { - go func(listener *Subscription[T]) { - select { - case listener.events <- event: - case <-time.After(ep.broadcastTimeout): - case <-ctx.Done(): - } - }(sub) + sub := producer.Subscribe() + + // Simulate removing the subscription. + cancel() + _, shouldEnd := sub.Next(ctx) + if !shouldEnd { + t.Error("Expected to end after context cancellation") } } -type subId int +func TestRemovalUsesStableId(t *testing.T) { + // This test ensures that removing subscriptions uses stable IDs rather than slice indices. + // Before the fix, deleting two subscriptions by their IDs 0 and 1 would incorrectly + // remove the first (index 0) and the third (now at index 1 after compaction), leaving + // the second subscription in place instead of the third. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() -// Subscription defines a generic handle to a subscription of -// events from a producer. -type Subscription[T any] struct { - id subId - events chan T - done chan subId -} + producer := NewProducer[int]() + go producer.Start(ctx) -// Next waits for the next event or context cancelation, returning the event or an error. -func (es *Subscription[T]) Next(ctx context.Context) (T, bool) { - var zeroVal T - for { - select { - case ev := <-es.events: - return ev, false - case <-ctx.Done(): - es.done <- es.id - close(es.events) - return zeroVal, true - } - } + s0 := producer.Subscribe() + s1 := producer.Subscribe() + s2 := producer.Subscribe() + + // Cancel first two subscriptions; they will send their IDs to doneListener via Next. + for _, s := range []*Subscription[int]{s0, s1} { + c, cancelSub := context.WithCancel(context.Background()) + cancelSub() + _, shouldEnd := s.Next(c) + require.True(t, shouldEnd) + } + + // Wait until the producer processes removal and only one subscription remains. + deadline := time.Now().Add(2 * time.Second) + for { + producer.RLock() + remaining := len(producer.subs) + producer.RUnlock() + if remaining == 1 || time.Now().After(deadline) { + break + } + time.Sleep(5 * time.Millisecond) + } + + producer.RLock() + require.Equal(t, 1, len(producer.subs)) + require.Same(t, s2, producer.subs[0]) + producer.RUnlock() } From eaab3b9dbe8ff656a9e748c459d96a2e535fd177 Mon Sep 17 00:00:00 2001 From: VolodymyrBg Date: Thu, 11 Sep 2025 10:36:35 +0300 Subject: [PATCH 3/6] Update producer.go --- bold/containers/events/producer.go | 195 +++++++++++++++++------------ 1 file changed, 112 insertions(+), 83 deletions(-) diff --git a/bold/containers/events/producer.go b/bold/containers/events/producer.go index 5dd82d7b16..abbd3909da 100644 --- a/bold/containers/events/producer.go +++ b/bold/containers/events/producer.go @@ -6,106 +6,135 @@ package events import ( "context" - "testing" + "sync" "time" +) - "github.com/stretchr/testify/require" +const ( + defaultBroadcastTimeout = time.Millisecond * 500 + defaultSubscriptionBufferSize = 10 ) -func TestSubscribe(t *testing.T) { - producer := NewProducer[int]() - sub := producer.Subscribe() - require.Equal(t, 1, len(producer.subs)) - require.NotNil(t, sub) +// Producer manages event subscriptions and broadcasts events to them. +type Producer[T any] struct { + sync.RWMutex + subscriptionBufferSize int + subs []*Subscription[T] + doneListener chan subId // channel to listen for IDs of subscriptions to be remove. + broadcastTimeout time.Duration // maximum duration to wait for an event to be sent. + nextId subId // monotonically increasing id for stable subscription identification } -func TestBroadcast(t *testing.T) { - producer := NewProducer[int]() - sub := producer.Subscribe() - done := make(chan bool) - go func() { - event, shouldEnd := sub.Next(context.Background()) - require.False(t, shouldEnd) - require.Equal(t, 42, event) - done <- true - }() - ctx := context.Background() - producer.Broadcast(ctx, 42) - select { - case <-done: - case <-time.After(2 * time.Second): - t.Fatal("Test timed out waiting for event") +type ProducerOpt[T any] func(*Producer[T]) + +// WithBroadcastTimeout enables the amount of time the broadcaster will wait to send +// to each subscriber before dropping the send. +func WithBroadcastTimeout[T any](timeout time.Duration) ProducerOpt[T] { + return func(ep *Producer[T]) { + ep.broadcastTimeout = timeout } } -func TestBroadcastTimeout(t *testing.T) { - timeout := 50 * time.Millisecond - producer := NewProducer(WithBroadcastTimeout[int](timeout)) - sub := producer.Subscribe() - - go func() { - // Delay sending to simulate timeout scenario - time.Sleep(100 * time.Millisecond) - sub.events <- 42 - }() - - event, shouldEnd := sub.Next(context.Background()) - require.False(t, shouldEnd) - require.Equal(t, 42, event) +// WithSubscriptionBuffer customizes the size of the subscription buffer channel. +func WithSubscriptionBuffer[T any](size int) ProducerOpt[T] { + return func(ep *Producer[T]) { + ep.subscriptionBufferSize = size + } } -func TestEventProducer_Start(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - producer := NewProducer[int]() - go producer.Start(ctx) - - sub := producer.Subscribe() - - // Simulate removing the subscription. - cancel() - _, shouldEnd := sub.Next(ctx) - if !shouldEnd { - t.Error("Expected to end after context cancellation") +func NewProducer[T any](opts ...ProducerOpt[T]) *Producer[T] { + producer := &Producer[T]{ + subs: make([]*Subscription[T], 0), + subscriptionBufferSize: defaultSubscriptionBufferSize, + doneListener: make(chan subId, 100), + broadcastTimeout: defaultBroadcastTimeout, + } + for _, opt := range opts { + opt(producer) } + return producer } -func TestRemovalUsesStableId(t *testing.T) { - // This test ensures that removing subscriptions uses stable IDs rather than slice indices. - // Before the fix, deleting two subscriptions by their IDs 0 and 1 would incorrectly - // remove the first (index 0) and the third (now at index 1 after compaction), leaving - // the second subscription in place instead of the third. - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() +// Start begins listening for subscription cancelation requests or context cancelation. +func (ep *Producer[T]) Start(ctx context.Context) { + for { + select { + case id := <-ep.doneListener: + ep.Lock() + // Find the subscription by stable id and remove it if present. + idx := -1 + for i, s := range ep.subs { + if s.id == id { + idx = i + break + } + } + if idx >= 0 { + ep.subs = append(ep.subs[:idx], ep.subs[idx+1:]...) + } + ep.Unlock() + case <-ctx.Done(): + close(ep.doneListener) + ep.subs = nil + return + } + } +} - producer := NewProducer[int]() - go producer.Start(ctx) +// Subscribe returns a handle to a new event subscription, +// adding it to the list of active subscriptions. +func (ep *Producer[T]) Subscribe() *Subscription[T] { + ep.Lock() + defer ep.Unlock() + sub := &Subscription[T]{ + id: ep.nextId, // Assign a stable, monotonically increasing ID + events: make(chan T), + done: ep.doneListener, + } + ep.nextId++ + ep.subs = append(ep.subs, sub) + return sub +} - s0 := producer.Subscribe() - s1 := producer.Subscribe() - s2 := producer.Subscribe() +// Broadcast sends an event to all active subscriptions, respecting a configured timeout or context. +// It spawns goroutines to send events to each subscription so as to not block the producer to submitting +// to all consumers. Broadcast should be used if not all consumers are expected to consume the event, +// within a reasonable time, or if the configured broadcast timeout is short enough. +func (ep *Producer[T]) Broadcast(ctx context.Context, event T) { + ep.RLock() + defer ep.RUnlock() + for _, sub := range ep.subs { + go func(listener *Subscription[T]) { + select { + case listener.events <- event: + case <-time.After(ep.broadcastTimeout): + case <-ctx.Done(): + } + }(sub) + } +} - // Cancel first two subscriptions; they will send their IDs to doneListener via Next. - for _, s := range []*Subscription[int]{s0, s1} { - c, cancelSub := context.WithCancel(context.Background()) - cancelSub() - _, shouldEnd := s.Next(c) - require.True(t, shouldEnd) - } +type subId int - // Wait until the producer processes removal and only one subscription remains. - deadline := time.Now().Add(2 * time.Second) - for { - producer.RLock() - remaining := len(producer.subs) - producer.RUnlock() - if remaining == 1 || time.Now().After(deadline) { - break - } - time.Sleep(5 * time.Millisecond) - } +// Subscription defines a generic handle to a subscription of +// events from a producer. +type Subscription[T any] struct { + id subId + events chan T + done chan subId +} - producer.RLock() - require.Equal(t, 1, len(producer.subs)) - require.Same(t, s2, producer.subs[0]) - producer.RUnlock() +// Next waits for the next event or context cancelation, returning the event or an error. +func (es *Subscription[T]) Next(ctx context.Context) (T, bool) { + var zeroVal T + for { + select { + case ev := <-es.events: + return ev, false + case <-ctx.Done(): + es.done <- es.id + close(es.events) + return zeroVal, true + } + } } From 9c341e0eb5fc841593a594709326bec3cb677eb4 Mon Sep 17 00:00:00 2001 From: VolodymyrBg Date: Thu, 11 Sep 2025 10:37:13 +0300 Subject: [PATCH 4/6] Update producer_test.go --- bold/containers/events/producer_test.go | 41 +++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/bold/containers/events/producer_test.go b/bold/containers/events/producer_test.go index 1b09a1f9b4..5dd82d7b16 100644 --- a/bold/containers/events/producer_test.go +++ b/bold/containers/events/producer_test.go @@ -68,3 +68,44 @@ func TestEventProducer_Start(t *testing.T) { t.Error("Expected to end after context cancellation") } } + +func TestRemovalUsesStableId(t *testing.T) { + // This test ensures that removing subscriptions uses stable IDs rather than slice indices. + // Before the fix, deleting two subscriptions by their IDs 0 and 1 would incorrectly + // remove the first (index 0) and the third (now at index 1 after compaction), leaving + // the second subscription in place instead of the third. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + producer := NewProducer[int]() + go producer.Start(ctx) + + s0 := producer.Subscribe() + s1 := producer.Subscribe() + s2 := producer.Subscribe() + + // Cancel first two subscriptions; they will send their IDs to doneListener via Next. + for _, s := range []*Subscription[int]{s0, s1} { + c, cancelSub := context.WithCancel(context.Background()) + cancelSub() + _, shouldEnd := s.Next(c) + require.True(t, shouldEnd) + } + + // Wait until the producer processes removal and only one subscription remains. + deadline := time.Now().Add(2 * time.Second) + for { + producer.RLock() + remaining := len(producer.subs) + producer.RUnlock() + if remaining == 1 || time.Now().After(deadline) { + break + } + time.Sleep(5 * time.Millisecond) + } + + producer.RLock() + require.Equal(t, 1, len(producer.subs)) + require.Same(t, s2, producer.subs[0]) + producer.RUnlock() +} From beb7ea823c7e65b6e82cfa37199d925bafbd51b9 Mon Sep 17 00:00:00 2001 From: VolodymyrBg Date: Thu, 11 Sep 2025 10:37:47 +0300 Subject: [PATCH 5/6] Update producer.go From 95a02e78d444360a35669df0dbdd4b31af774249 Mon Sep 17 00:00:00 2001 From: VolodymyrBg Date: Thu, 11 Sep 2025 15:05:29 +0300 Subject: [PATCH 6/6] Update producer_test.go --- bold/containers/events/producer_test.go | 76 ++++++++++++------------- 1 file changed, 38 insertions(+), 38 deletions(-) diff --git a/bold/containers/events/producer_test.go b/bold/containers/events/producer_test.go index 5dd82d7b16..0c42a5f4b8 100644 --- a/bold/containers/events/producer_test.go +++ b/bold/containers/events/producer_test.go @@ -70,42 +70,42 @@ func TestEventProducer_Start(t *testing.T) { } func TestRemovalUsesStableId(t *testing.T) { - // This test ensures that removing subscriptions uses stable IDs rather than slice indices. - // Before the fix, deleting two subscriptions by their IDs 0 and 1 would incorrectly - // remove the first (index 0) and the third (now at index 1 after compaction), leaving - // the second subscription in place instead of the third. - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - producer := NewProducer[int]() - go producer.Start(ctx) - - s0 := producer.Subscribe() - s1 := producer.Subscribe() - s2 := producer.Subscribe() - - // Cancel first two subscriptions; they will send their IDs to doneListener via Next. - for _, s := range []*Subscription[int]{s0, s1} { - c, cancelSub := context.WithCancel(context.Background()) - cancelSub() - _, shouldEnd := s.Next(c) - require.True(t, shouldEnd) - } - - // Wait until the producer processes removal and only one subscription remains. - deadline := time.Now().Add(2 * time.Second) - for { - producer.RLock() - remaining := len(producer.subs) - producer.RUnlock() - if remaining == 1 || time.Now().After(deadline) { - break - } - time.Sleep(5 * time.Millisecond) - } - - producer.RLock() - require.Equal(t, 1, len(producer.subs)) - require.Same(t, s2, producer.subs[0]) - producer.RUnlock() + // This test ensures that removing subscriptions uses stable IDs rather than slice indices. + // Before the fix, deleting two subscriptions by their IDs 0 and 1 would incorrectly + // remove the first (index 0) and the third (now at index 1 after compaction), leaving + // the second subscription in place instead of the third. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + producer := NewProducer[int]() + go producer.Start(ctx) + + s0 := producer.Subscribe() + s1 := producer.Subscribe() + s2 := producer.Subscribe() + + // Cancel first two subscriptions; they will send their IDs to doneListener via Next. + for _, s := range []*Subscription[int]{s0, s1} { + c, cancelSub := context.WithCancel(context.Background()) + cancelSub() + _, shouldEnd := s.Next(c) + require.True(t, shouldEnd) + } + + // Wait until the producer processes removal and only one subscription remains. + deadline := time.Now().Add(2 * time.Second) + for { + producer.RLock() + remaining := len(producer.subs) + producer.RUnlock() + if remaining == 1 || time.Now().After(deadline) { + break + } + time.Sleep(5 * time.Millisecond) + } + + producer.RLock() + require.Equal(t, 1, len(producer.subs)) + require.Same(t, s2, producer.subs[0]) + producer.RUnlock() }