Skip to content

Commit 1b3df8f

Browse files
committed
remove flaky test and make result chan earlier
Signed-off-by: sspaink <[email protected]>
1 parent 4b83fca commit 1b3df8f

File tree

2 files changed

+6
-79
lines changed

2 files changed

+6
-79
lines changed

Diff for: v1/plugins/logs/plugin.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,10 @@ func New(parsedConfig *Config, manager *plugins.Manager) *Plugin {
588588
preparedMask: *newPrepareOnce(),
589589
}
590590

591+
if *parsedConfig.Reporting.Trigger == plugins.TriggerImmediate {
592+
plugin.immediateResult = make(chan [][]byte)
593+
}
594+
591595
switch parsedConfig.Reporting.BufferType {
592596
case eventBufferType:
593597
plugin.eventBuffer = newEventBuffer(
@@ -832,8 +836,6 @@ func (p *Plugin) compilerUpdated(storage.Transaction) {
832836
func (p *Plugin) loop() {
833837
ctx, cancel := context.WithCancel(context.Background())
834838

835-
var retry int
836-
837839
var errCh chan error
838840
var stop chan chan struct{}
839841
if *p.config.Reporting.Trigger == plugins.TriggerImmediate && p.config.Service != "" {
@@ -842,7 +844,6 @@ func (p *Plugin) loop() {
842844

843845
switch p.config.Reporting.BufferType {
844846
case sizeBufferType:
845-
p.immediateResult = make(chan [][]byte)
846847
go immediateUpload[[][]byte](ctx,
847848
errCh,
848849
stop,
@@ -859,6 +860,8 @@ func (p *Plugin) loop() {
859860
}
860861
}
861862

863+
var retry int
864+
862865
for {
863866
var waitC chan struct{}
864867

Diff for: v1/plugins/logs/plugin_test.go

-76
Original file line numberDiff line numberDiff line change
@@ -3800,79 +3800,3 @@ func TestPluginTriggerImmediateErrorHandling(t *testing.T) {
38003800
})
38013801
}
38023802
}
3803-
3804-
func TestPluginImmediateMaxDelayFlush(t *testing.T) {
3805-
t.Parallel()
3806-
3807-
tests := []struct {
3808-
name string
3809-
bufferType string
3810-
uploadLimit int64
3811-
}{
3812-
{
3813-
name: "event buffer upload immediately",
3814-
bufferType: eventBufferType,
3815-
},
3816-
{
3817-
name: "size buffer upload immediately",
3818-
bufferType: sizeBufferType,
3819-
uploadLimit: 215,
3820-
},
3821-
}
3822-
3823-
for _, tc := range tests {
3824-
t.Run(tc.name, func(t *testing.T) {
3825-
ctx := context.Background()
3826-
3827-
maxDelay := int64(1)
3828-
3829-
triggerMode := plugins.TriggerImmediate
3830-
fixture := newTestFixture(t, testFixtureOptions{
3831-
ReportingBufferType: tc.bufferType,
3832-
ReportingTrigger: &triggerMode,
3833-
ReportingUploadSizeLimitBytes: tc.uploadLimit,
3834-
ReportingMaxDelay: maxDelay,
3835-
})
3836-
defer fixture.server.stop()
3837-
3838-
fixture.server.ch = make(chan []EventV1, 8)
3839-
3840-
if err := fixture.plugin.Start(ctx); err != nil {
3841-
t.Fatal(err)
3842-
}
3843-
3844-
var input any = make(map[string]interface{})
3845-
var result any = false
3846-
3847-
input = generateInputMap(1)
3848-
id := "abc"
3849-
err := fixture.plugin.Log(ctx, logServerInfo(id, input, result))
3850-
if err != nil {
3851-
t.Fatal(err)
3852-
}
3853-
err = fixture.plugin.Log(ctx, logServerInfo(id, input, result))
3854-
if err != nil {
3855-
t.Fatal(err)
3856-
}
3857-
3858-
// wait for the immediate loop to flush the single event
3859-
time.Sleep(time.Duration(maxDelay) * time.Second * 2)
3860-
3861-
close(fixture.server.ch)
3862-
3863-
for events := range fixture.server.ch {
3864-
if len(events) < 1 {
3865-
t.Fatal("no events received")
3866-
}
3867-
3868-
if events[0].DecisionID != id {
3869-
t.Fatalf("Unexpected decision ID received '%s' but expected '%s'", events[0].DecisionID, id)
3870-
}
3871-
3872-
return
3873-
}
3874-
3875-
t.Fatalf("no events received")
3876-
})
3877-
}
3878-
}

0 commit comments

Comments
 (0)