
fix: use compressed event size to close chunk #7517

Open · wants to merge 5 commits into main (base: main) · Showing changes from 3 commits
v1/plugins/logs/encoder.go (30 changes: 23 additions & 7 deletions)
@@ -28,14 +28,18 @@ const (
 // written to the encoder and the encoder outputs chunks that are fit to the
 // configured limit.
 type chunkEncoder struct {
-	limit        int64
-	bytesWritten int
-	buf          *bytes.Buffer
-	w            *gzip.Writer
-	metrics      metrics.Metrics
+	limit int64
+
+	// The soft limit is a dynamic limit that will maximize the amount of events that fit in each chunk.
+	// After creating a chunk it will determine if it should scale up and down based on the chunk size vs the limit.
+	// If the chunk didn't reach the limit perhaps future events could have been added.
+	softLimit                  int64
+	softLimitScaleUpExponent   float64
+	softLimitScaleDownExponent float64
+
+	bytesWritten int
+	buf          *bytes.Buffer
+	w            *gzip.Writer
+	metrics      metrics.Metrics
 }

 func newChunkEncoder(limit int64) *chunkEncoder {
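
The struct comment above describes scaling the soft limit up and down; the scale-up rule itself appears in the reset() hunk below. A minimal standalone sketch of that multiplier math, assuming softLimitBaseFactor = 2 and softLimitExponentScaleFactor = 1 (illustrative values; the real constants are defined in encoder.go):

package main

import (
	"fmt"
	"math"
)

func main() {
	// Hypothetical starting soft limit of 32 KiB.
	softLimit, exponent := int64(32768), float64(0)
	for i := 0; i < 5; i++ {
		// Same shape as reset(): the multiplier itself grows each round.
		mul := int64(math.Pow(2, exponent+1))
		softLimit *= mul
		exponent++
		fmt.Println(softLimit)
	}
	// Prints: 65536, 262144, 2097152, 33554432, 1073741824
}

Because the multiplier doubles on every scale-up, the soft limit grows on the order of 2^(k(k+1)/2) after k scale-ups; under these assumed constants a 32 KiB starting value passes math.MaxInt64 after roughly ten scale-ups, which is the overflow discussed in the review thread below.
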
@@ -141,18 +145,30 @@ func (enc *chunkEncoder) reset() ([][]byte, error) {
 	// decisions in the last chunk.
 	// 3) Equilibrium: If the chunk size is between 90% and 100% of the user-configured limit, maintain soft limit value.

-	if enc.buf.Len() < int(float64(enc.limit)*encHardLimitThreshold) {
+	if enc.bytesWritten < int(float64(enc.limit)*encHardLimitThreshold) {
 		if enc.metrics != nil {
 			enc.metrics.Counter(encSoftLimitScaleUpCounterName).Incr()
 		}

 		mul := int64(math.Pow(float64(softLimitBaseFactor), float64(enc.softLimitScaleUpExponent+1)))
+		// this can cause enc.softLimit to overflow into a negative value

Contributor:

What are the circumstances for a scenario where we reach an overflow? Since the thing we're exponentially increasing is upload bytes, for us to overflow, wouldn't the previous successful reset have needed a soft limit that was already terabytes in size?

This is intuition talking rather than actual calculus, so I may be way off in my estimates. It's very likely I'm missing something here, since you've encountered this in your work and had to fix it.

Contributor (Author):

As we discussed, I updated the PR to enforce a maximum configurable limit of 4294967296 instead, removing the need to check whether the soft limit will ever overflow.

 		enc.softLimit *= mul
 		enc.softLimitScaleUpExponent += softLimitExponentScaleFactor
+
+		// In Go an overflow wraps around using modulo arithmetic, so it could be negative.
+		// enc.limit*2 is the ceiling for the soft limit, unless that also overflows then it will be (math.MaxInt64 - 1).
+		if enc.softLimit < 0 || enc.softLimit > enc.limit*2 {
+			limit := enc.limit * 2
+			if limit < 0 {
+				limit = math.MaxInt64 - 1
+			}
+			enc.softLimit = limit

Contributor:

As always when it comes to math, I'm a bit confused 😅. Why are we setting the soft limit to 2x the configured limit (or even higher) here? Won't that cause us to write past the configured limit in WriteBytes()? There is probably some detail I'm missing.

Contributor (Author):

Deleted the math, it won't hurt us anymore 😜

+			enc.softLimitScaleUpExponent = 0
+		}
 		return enc.update(), nil
 	}

-	if int(enc.limit) > enc.buf.Len() && enc.buf.Len() >= int(float64(enc.limit)*encHardLimitThreshold) {
+	if int(enc.limit) > enc.bytesWritten && enc.bytesWritten >= int(float64(enc.limit)*encHardLimitThreshold) {
 		if enc.metrics != nil {
 			enc.metrics.Counter(encSoftLimitStableCounterName).Incr()
 		}
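
The overflow guard added above relies on Go's wraparound semantics: Go does not trap on signed integer overflow, the value simply wraps (two's complement). A minimal sketch, not part of the PR, of what "overflow wraps around using modulo arithmetic" means for an int64 soft limit:

package main

import (
	"fmt"
	"math"
)

func main() {
	softLimit := int64(math.MaxInt64/2 + 1) // 2^62
	softLimit *= 2                          // 2^63 doesn't fit: wraps to math.MinInt64
	fmt.Println(softLimit)                  // -9223372036854775808
	fmt.Println(softLimit < 0)              // true, which is why reset() checks softLimit < 0
}
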
v1/plugins/logs/encoder_test.go (50 changes: 45 additions & 5 deletions)
@@ -5,6 +5,7 @@
 package logs

 import (
+	"math"
 	"strconv"
 	"testing"
 	"time"
@@ -84,8 +85,9 @@ func TestChunkEncoderSizeLimit(t *testing.T) {
 }

 func TestChunkEncoderAdaptive(t *testing.T) {
-
-	enc := newChunkEncoder(1000).WithMetrics(metrics.New())
+	// limit is set to 1050, so that 90% is 945 which is larger than the event size of 936
+	// this will trigger the adaptive changing of the soft limit
+	enc := newChunkEncoder(1050).WithMetrics(metrics.New())
 	var result interface{} = false
 	var expInput interface{} = map[string]interface{}{"method": "GET"}
 	ts, err := time.Parse(time.RFC3339Nano, "2018-01-01T12:00:00.123456Z")
@@ -149,9 +151,9 @@ func TestChunkEncoderAdaptive(t *testing.T) {
 	actualScaleDownEvents := enc.metrics.Counter(encSoftLimitScaleDownCounterName).Value().(uint64)
 	actualEquiEvents := enc.metrics.Counter(encSoftLimitStableCounterName).Value().(uint64)

-	expectedScaleUpEvents := uint64(8)
-	expectedScaleDownEvents := uint64(3)
-	expectedEquiEvents := uint64(0)
+	expectedScaleUpEvents := uint64(25)
+	expectedScaleDownEvents := uint64(25)
+	expectedEquiEvents := uint64(75)

 	if actualScaleUpEvents != expectedScaleUpEvents {
 		t.Fatalf("Expected scale up events %v but got %v", expectedScaleUpEvents, actualScaleUpEvents)
@@ -179,3 +181,41 @@ func decodeChunks(t *testing.T, bs [][]byte) int {
 	}
 	return numEvents
 }
+
+func TestReset(t *testing.T) {
+	tests := []struct {
+		name              string
+		limit             int64
+		expectedSoftLimit int64
+	}{
+		{
+			name:              "limit 100",
+			limit:             100,
+			expectedSoftLimit: 200,
+		},
+		{
+			name:              "limit max int64 - 1",
+			limit:             math.MaxInt64 - 1,
+			expectedSoftLimit: math.MaxInt64 - 1,
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			enc := newChunkEncoder(tc.limit)
+
+			for range 100 {
+				_, err := enc.reset()
+				if err != nil {
+					t.Fatal(err)
+				}
+
+				if enc.softLimit != tc.expectedSoftLimit {
+					t.Fatalf("expected softLimit %d, got %d", tc.expectedSoftLimit, enc.softLimit)
+				}
+			}
+		})
+	}
+}
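
TestReset pins down the capping rule from the encoder diff: the soft limit may scale past the configured limit but is clamped to 2*limit, and to math.MaxInt64 - 1 when even 2*limit overflows. Extracted into a hypothetical standalone helper for illustration (capSoftLimit is not a real function in the codebase):

package main

import (
	"fmt"
	"math"
)

// capSoftLimit mirrors the guard in reset(): clamp when the multiplication
// wrapped around, or when the soft limit outgrew twice the configured limit.
func capSoftLimit(softLimit, limit int64) int64 {
	if softLimit < 0 || softLimit > limit*2 {
		ceiling := limit * 2
		if ceiling < 0 { // limit*2 itself wrapped around
			ceiling = math.MaxInt64 - 1
		}
		return ceiling
	}
	return softLimit
}

func main() {
	fmt.Println(capSoftLimit(400, 100))                       // 200
	fmt.Println(capSoftLimit(math.MinInt64, math.MaxInt64-1)) // 9223372036854775806
}

With limit = 100 every reset settles on 200; with limit = math.MaxInt64 - 1 the doubled ceiling wraps negative, so the clamp lands on math.MaxInt64 - 1, matching the two test cases.
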
v1/plugins/logs/plugin_test.go (36 changes: 22 additions & 14 deletions)
@@ -408,7 +408,7 @@ func TestPluginStartSameInput(t *testing.T) {
 	fixture := newTestFixture(t)
 	defer fixture.server.stop()

-	fixture.server.ch = make(chan []EventV1, 3)
+	fixture.server.ch = make(chan []EventV1, 4)
 	var result interface{} = false

 	ts, err := time.Parse(time.RFC3339Nano, "2018-01-01T12:00:00.123456Z")
@@ -441,12 +441,15 @@ func TestPluginStartSameInput(t *testing.T) {
 	chunk1 := <-fixture.server.ch
 	chunk2 := <-fixture.server.ch
 	chunk3 := <-fixture.server.ch
+	chunk4 := <-fixture.server.ch
 	expLen1 := 122
-	expLen2 := 242
-	expLen3 := 36

Contributor:

For this test to remain equivalent, maybe we should raise the number of events fired, or tweak the upload limit, so that we keep producing multiple chunks here?

+	expLen2 := 121
+	expLen3 := 121
+	expLen4 := 36

-	if len(chunk1) != expLen1 || len(chunk2) != expLen2 || len(chunk3) != expLen3 {
-		t.Fatalf("Expected chunk lens %v, %v, and %v but got: %v, %v, and %v", expLen1, expLen2, expLen3, len(chunk1), len(chunk2), len(chunk3))
+	if len(chunk1) != expLen1 || len(chunk2) != expLen2 || len(chunk3) != expLen3 || len(chunk4) != expLen4 {
+		t.Fatalf("Expected chunk lens %v, %v, %v and %v but got: %v, %v, %v and %v",
+			expLen1, expLen2, expLen3, expLen4, len(chunk1), len(chunk2), len(chunk3), len(chunk4))
 	}

 	var expInput interface{} = map[string]interface{}{"method": "GET"}
@@ -472,8 +475,8 @@ func TestPluginStartSameInput(t *testing.T) {
 		Metrics: msAsFloat64,
 	}

-	if !reflect.DeepEqual(chunk3[expLen3-1], exp) {
-		t.Fatalf("Expected %+v but got %+v", exp, chunk3[expLen3-1])
+	if !reflect.DeepEqual(chunk4[expLen4-1], exp) {
+		t.Fatalf("Expected %+v but got %+v", exp, chunk4[expLen4-1])
 	}

 	if fixture.plugin.status.Code != "" {
@@ -489,7 +492,7 @@ func TestPluginStartChangingInputValues(t *testing.T) {
 	fixture := newTestFixture(t)
 	defer fixture.server.stop()

-	fixture.server.ch = make(chan []EventV1, 3)
+	fixture.server.ch = make(chan []EventV1, 4)
 	var result interface{} = false

 	ts, err := time.Parse(time.RFC3339Nano, "2018-01-01T12:00:00.123456Z")
@@ -521,12 +524,15 @@ func TestPluginStartChangingInputValues(t *testing.T) {
 	chunk1 := <-fixture.server.ch
 	chunk2 := <-fixture.server.ch
 	chunk3 := <-fixture.server.ch
+	chunk4 := <-fixture.server.ch
 	expLen1 := 124
-	expLen2 := 247
-	expLen3 := 29
+	expLen2 := 123
+	expLen3 := 123
+	expLen4 := 30

-	if len(chunk1) != expLen1 || len(chunk2) != expLen2 || len((chunk3)) != expLen3 {
-		t.Fatalf("Expected chunk lens %v, %v and %v but got: %v, %v and %v", expLen1, expLen2, expLen3, len(chunk1), len(chunk2), len(chunk3))
+	if len(chunk1) != expLen1 || len(chunk2) != expLen2 || len(chunk3) != expLen3 || len(chunk4) != expLen4 {
+		t.Fatalf("Expected chunk lens %v, %v, %v and %v but got: %v, %v, %v and %v",
+			expLen1, expLen2, expLen3, expLen4, len(chunk1), len(chunk2), len(chunk3), len(chunk4))
 	}

 	exp := EventV1{
@@ -544,8 +550,8 @@ func TestPluginStartChangingInputValues(t *testing.T) {
 		Timestamp: ts,
 	}

-	if !reflect.DeepEqual(chunk3[expLen3-1], exp) {
-		t.Fatalf("Expected %+v but got %+v", exp, chunk3[expLen3-1])
+	if !reflect.DeepEqual(chunk4[expLen4-1], exp) {
+		t.Fatalf("Expected %+v but got %+v", exp, chunk4[expLen4-1])
 	}
 }

@@ -3564,6 +3570,8 @@ type testServer struct {
 }

 func (t *testServer) handle(w http.ResponseWriter, r *http.Request) {
+	t.t.Helper()
+
 	gr, err := gzip.NewReader(r.Body)
 	if err != nil {
 		t.t.Fatal(err)
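
The chunk counts in these tests shift because the diff above switches the encoder's accounting in reset() from enc.buf.Len() to enc.bytesWritten, per the PR title basing decisions on compressed event size. Raw and gzip-compressed sizes diverge sharply for repetitive decision-log events, so the point at which a chunk hits the limit moves. A rough standalone illustration of that gap (illustrative only, not code from this PR):

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

func main() {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)

	// 100 identical toy "events": highly compressible input.
	raw := bytes.Repeat([]byte(`{"method":"GET"}`), 100)
	if _, err := w.Write(raw); err != nil {
		panic(err)
	}
	if err := w.Close(); err != nil {
		panic(err)
	}

	fmt.Println(len(raw))  // 1600 bytes uncompressed
	fmt.Println(buf.Len()) // a few dozen bytes compressed
}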