diff --git a/internal/component/errors.go b/internal/component/errors.go
index 4bf4409d5..f88fa9515 100644
--- a/internal/component/errors.go
+++ b/internal/component/errors.go
@@ -102,4 +102,5 @@ var (
 // Buffer errors.
 var (
 	ErrMessageTooLarge = errors.New("message body larger than buffer space")
+	ErrLimitReached    = errors.New("adding message to buffer will exceed size limit")
 )
diff --git a/internal/impl/pure/buffer_memory.go b/internal/impl/pure/buffer_memory.go
index ce531fb4e..de0d504a5 100644
--- a/internal/impl/pure/buffer_memory.go
+++ b/internal/impl/pure/buffer_memory.go
@@ -48,6 +48,10 @@ It is possible to batch up messages sent from this buffer using a [batch policy]
 		Field(service.NewIntField("limit").
 			Description(`The maximum buffer size (in bytes) to allow before applying backpressure upstream.`).
 			Default(524288000)).
+		Field(service.NewBoolField("spillover").
+			Description("Whether to drop incoming messages that would exceed the buffer limit.").
+			Advanced().
+			Default(false)).
 		Field(service.NewInternalField(bs))
 }
 
@@ -68,6 +72,11 @@ func newMemoryBufferFromConfig(conf *service.ParsedConfig, res *service.Resource
 		return nil, err
 	}
 
+	spilloverEnabled, err := conf.FieldBool("spillover")
+	if err != nil {
+		return nil, err
+	}
+
 	batchingEnabled, err := conf.FieldBool("batch_policy", "enabled")
 	if err != nil {
 		return nil, err
@@ -88,7 +97,7 @@ func newMemoryBufferFromConfig(conf *service.ParsedConfig, res *service.Resource
 		}
 	}
 
-	return newMemoryBuffer(limit, batcher), nil
+	return newMemoryBuffer(limit, spilloverEnabled, batcher), nil
 }
 
 //------------------------------------------------------------------------------
@@ -102,19 +111,21 @@ type memoryBuffer struct {
 	batches []measuredBatch
 	bytes   int
 
-	cap        int
-	cond       *sync.Cond
-	endOfInput bool
-	closed     bool
+	cap              int
+	spilloverEnabled bool
+	cond             *sync.Cond
+	endOfInput       bool
+	closed           bool
 
 	batcher *service.Batcher
 }
 
-func newMemoryBuffer(capacity int, batcher *service.Batcher) *memoryBuffer {
+func newMemoryBuffer(capacity int, spilloverEnabled bool, batcher *service.Batcher) *memoryBuffer {
 	return &memoryBuffer{
-		cap:     capacity,
-		cond:    sync.NewCond(&sync.Mutex{}),
-		batcher: batcher,
+		cap:              capacity,
+		spilloverEnabled: spilloverEnabled,
+		cond:             sync.NewCond(&sync.Mutex{}),
+		batcher:          batcher,
 	}
 }
 
@@ -250,6 +261,10 @@ func (m *memoryBuffer) WriteBatch(ctx context.Context, msgBatch service.MessageB
 	}
 
 	for (m.bytes + extraBytes) > m.cap {
+		if m.spilloverEnabled {
+			return component.ErrLimitReached
+		}
+
 		m.cond.Wait()
 		if m.closed {
 			return component.ErrTypeClosed
diff --git a/internal/impl/pure/buffer_memory_test.go b/internal/impl/pure/buffer_memory_test.go
index de496377c..c957da8ee 100644
--- a/internal/impl/pure/buffer_memory_test.go
+++ b/internal/impl/pure/buffer_memory_test.go
@@ -476,3 +476,66 @@ batch_policy:
 	msgEqual(t, "hello", m[0])
 	require.NoError(t, ackFunc(ctx, nil))
 }
+
+func TestMemorySpilloverBasic(t *testing.T) {
+	n := 100
+
+	ctx := context.Background()
+	block := memBufFromConf(t, `
+limit: 100000
+spillover: true
+`)
+	defer block.Close(ctx)
+
+	for i := range n {
+		if err := block.WriteBatch(ctx, service.MessageBatch{
+			service.NewMessage([]byte("hello")),
+			service.NewMessage([]byte("world")),
+			service.NewMessage([]byte("12345")),
+			service.NewMessage(fmt.Appendf([]byte{}, "test%v", i)),
+		}, func(ctx context.Context, err error) error { return nil }); err != nil {
+			t.Error(err)
+		}
+	}
+
+	for i := range n {
+		m, ackFunc, err := block.ReadBatch(ctx)
+		require.NoError(t, err)
+		require.Len(t, m, 4)
+		msgEqual(t, fmt.Sprintf("test%v", i), m[3])
+		require.NoError(t, ackFunc(ctx, nil))
+	}
+}
+
+func TestMemorySpilloverOnLimit(t *testing.T) {
+	ctx := context.Background()
+	block := memBufFromConf(t, `
+limit: 5
+spillover: true
+`)
+	defer block.Close(ctx)
+
+	if err := block.WriteBatch(ctx, service.MessageBatch{
+		service.NewMessage([]byte("hello")),
+	}, func(ctx context.Context, err error) error { return nil }); err != nil {
+		t.Error(err)
+	}
+
+	err := block.WriteBatch(ctx, service.MessageBatch{
+		service.NewMessage([]byte("hello")),
+	}, func(ctx context.Context, err error) error { return nil })
+	require.Equal(t, component.ErrLimitReached, err)
+
+	block.EndOfInput()
+
+	m, ackFunc, err := block.ReadBatch(ctx)
+	require.NoError(t, err)
+	require.Len(t, m, 1)
+	msgEqual(t, "hello", m[0])
+	require.NoError(t, ackFunc(ctx, nil))
+
+	m, ackFunc, err = block.ReadBatch(ctx)
+	require.Equal(t, service.ErrEndOfBuffer, err)
+	require.Empty(t, m)
+	require.Nil(t, ackFunc)
+}
diff --git a/website/docs/components/buffers/memory.md b/website/docs/components/buffers/memory.md
index 8e507dadb..acf835f4d 100644
--- a/website/docs/components/buffers/memory.md
+++ b/website/docs/components/buffers/memory.md
@@ -47,6 +47,7 @@ buffer:
 buffer:
   memory:
     limit: 524288000
+    spillover: false
     batch_policy:
       enabled: false
       count: 0
@@ -82,6 +83,14 @@ The maximum buffer size (in bytes) to allow before applying backpressure upstrea
 Type: `int`
 Default: `524288000`
 
+### `spillover`
+
+Whether to drop incoming messages that would exceed the buffer limit.
+
+
+Type: `bool`
+Default: `false`
+
 ### `batch_policy`
 
 Optionally configure a policy to flush buffered messages in batches.
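For operators, enabling the new behaviour is a one-line config change. A minimal sketch using the fields documented above (values illustrative):

```yaml
buffer:
  memory:
    limit: 524288000
    spillover: true
```

With `spillover: true`, any `WriteBatch` call whose batch would push the buffered byte count past `limit` returns `component.ErrLimitReached` immediately rather than parking on the condition variable until a reader frees space; with the default `false`, the existing backpressure behaviour is unchanged.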
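The new field also composes with the existing batch policy, since the spillover check and the batcher sit on independent code paths (write side vs. read side). A hypothetical config (field values are placeholders, not recommendations) that rejects writes beyond 5 MiB while flushing reads in batches of up to 100 messages or once per second:

```yaml
buffer:
  memory:
    limit: 5242880
    spillover: true
    batch_policy:
      enabled: true
      count: 100
      period: 1s
```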