Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/component/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,5 @@ var (
// Buffer errors.
var (
	// ErrMessageTooLarge is returned when a single message body is larger
	// than the buffer's total capacity, so it could never be accepted.
	ErrMessageTooLarge = errors.New("message body larger than buffer space")

	// ErrLimitReached is returned when adding a message would push the
	// buffer past its configured size limit and the buffer is configured
	// to reject writes rather than block waiting for space.
	ErrLimitReached = errors.New("adding message to buffer will exceed size limit")
)
33 changes: 24 additions & 9 deletions internal/impl/pure/buffer_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ It is possible to batch up messages sent from this buffer using a [batch policy]
Field(service.NewIntField("limit").
Description(`The maximum buffer size (in bytes) to allow before applying backpressure upstream.`).
Default(524288000)).
Field(service.NewBoolField("spillover").
Description("Whether to drop incoming messages that will exceed the buffer limit.").
Advanced().
Default(false)).
Field(service.NewInternalField(bs))
}

Expand All @@ -68,6 +72,11 @@ func newMemoryBufferFromConfig(conf *service.ParsedConfig, res *service.Resource
return nil, err
}

spilloverEnabled, err := conf.FieldBool("spillover")
if err != nil {
return nil, err
}

batchingEnabled, err := conf.FieldBool("batch_policy", "enabled")
if err != nil {
return nil, err
Expand All @@ -88,7 +97,7 @@ func newMemoryBufferFromConfig(conf *service.ParsedConfig, res *service.Resource
}
}

return newMemoryBuffer(limit, batcher), nil
return newMemoryBuffer(limit, spilloverEnabled, batcher), nil
}

//------------------------------------------------------------------------------
Expand All @@ -102,19 +111,21 @@ type memoryBuffer struct {
batches []measuredBatch
bytes int

cap int
cond *sync.Cond
endOfInput bool
closed bool
cap int
spilloverEnabled bool
cond *sync.Cond
endOfInput bool
closed bool

batcher *service.Batcher
}

func newMemoryBuffer(capacity int, batcher *service.Batcher) *memoryBuffer {
func newMemoryBuffer(capacity int, spilloverEnabled bool, batcher *service.Batcher) *memoryBuffer {
return &memoryBuffer{
cap: capacity,
cond: sync.NewCond(&sync.Mutex{}),
batcher: batcher,
cap: capacity,
spilloverEnabled: spilloverEnabled,
cond: sync.NewCond(&sync.Mutex{}),
batcher: batcher,
}
}

Expand Down Expand Up @@ -250,6 +261,10 @@ func (m *memoryBuffer) WriteBatch(ctx context.Context, msgBatch service.MessageB
}

for (m.bytes + extraBytes) > m.cap {
if m.spilloverEnabled {
return component.ErrLimitReached
}

m.cond.Wait()
if m.closed {
return component.ErrTypeClosed
Expand Down
63 changes: 63 additions & 0 deletions internal/impl/pure/buffer_memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,3 +476,66 @@ batch_policy:
msgEqual(t, "hello", m[0])
require.NoError(t, ackFunc(ctx, nil))
}

// TestMemorySpilloverBasic verifies that a spillover-enabled memory buffer
// behaves like a normal FIFO buffer while writes stay under the byte limit:
// every batch written is read back intact and in order.
func TestMemorySpilloverBasic(t *testing.T) {
	const total = 100

	ctx := context.Background()
	buf := memBufFromConf(t, `
limit: 100000
spillover: true
`)
	defer buf.Close(ctx)

	noopAck := func(ctx context.Context, err error) error { return nil }

	// Write batches of four messages each; the limit is generous enough
	// that none of these writes should be rejected.
	for i := 0; i < total; i++ {
		batch := service.MessageBatch{
			service.NewMessage([]byte("hello")),
			service.NewMessage([]byte("world")),
			service.NewMessage([]byte("12345")),
			service.NewMessage([]byte(fmt.Sprintf("test%v", i))),
		}
		if err := buf.WriteBatch(ctx, batch, noopAck); err != nil {
			t.Error(err)
		}
	}

	// Read everything back, confirming order via the unique fourth message.
	for i := 0; i < total; i++ {
		batch, ack, err := buf.ReadBatch(ctx)
		require.NoError(t, err)
		require.Len(t, batch, 4)
		msgEqual(t, fmt.Sprintf("test%v", i), batch[3])
		require.NoError(t, ack(ctx, nil))
	}
}

// TestMemorySpilloverOnLimit verifies that, with spillover enabled, a write
// that would push the buffer past its byte limit is rejected immediately with
// component.ErrLimitReached instead of blocking, and that the message already
// buffered remains readable afterwards.
func TestMemorySpilloverOnLimit(t *testing.T) {
	ctx := context.Background()
	buf := memBufFromConf(t, `
limit: 5
spillover: true
`)
	defer buf.Close(ctx)

	noopAck := func(ctx context.Context, err error) error { return nil }

	// The first write fills the buffer exactly to its 5 byte limit.
	if err := buf.WriteBatch(ctx, service.MessageBatch{
		service.NewMessage([]byte("hello")),
	}, noopAck); err != nil {
		t.Error(err)
	}

	// A second write would exceed the limit and must be rejected.
	err := buf.WriteBatch(ctx, service.MessageBatch{
		service.NewMessage([]byte("hello")),
	}, noopAck)
	require.Equal(t, component.ErrLimitReached, err)

	buf.EndOfInput()

	// The message accepted by the first write is still available.
	batch, ack, err := buf.ReadBatch(ctx)
	require.NoError(t, err)
	require.Len(t, batch, 1)
	msgEqual(t, "hello", batch[0])
	require.NoError(t, ack(ctx, nil))

	// With input ended and the buffer drained, reads report end of buffer.
	batch, ack, err = buf.ReadBatch(ctx)
	require.Equal(t, service.ErrEndOfBuffer, err)
	require.Empty(t, batch)
	require.Nil(t, ack)
}
9 changes: 9 additions & 0 deletions website/docs/components/buffers/memory.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ buffer:
buffer:
memory:
limit: 524288000
spillover: false
batch_policy:
enabled: false
count: 0
Expand Down Expand Up @@ -82,6 +83,14 @@ The maximum buffer size (in bytes) to allow before applying backpressure upstrea
Type: `int`
Default: `524288000`

### `spillover`

Whether to drop incoming messages that would exceed the buffer limit.


Type: `bool`
Default: `false`

### `batch_policy`

Optionally configure a policy to flush buffered messages in batches.
Expand Down