Skip to content

Commit 23a5685

Browse files
committed
remove error channel, handle upload error within immediate upload loop.
Signed-off-by: sspaink <[email protected]>
1 parent 1b3df8f commit 23a5685

File tree

2 files changed

+13
-20
lines changed

2 files changed

+13
-20
lines changed

Diff for: v1/plugins/logs/plugin.go

+11-20
Original file line numberDiff line numberDiff line change
@@ -836,26 +836,23 @@ func (p *Plugin) compilerUpdated(storage.Transaction) {
836836
func (p *Plugin) loop() {
837837
ctx, cancel := context.WithCancel(context.Background())
838838

839-
var errCh chan error
840-
var stop chan chan struct{}
839+
var stopImmediateLoop chan chan struct{}
841840
if *p.config.Reporting.Trigger == plugins.TriggerImmediate && p.config.Service != "" {
842-
errCh = make(chan error)
843-
stop = make(chan chan struct{})
844-
841+
stopImmediateLoop = make(chan chan struct{})
845842
switch p.config.Reporting.BufferType {
846843
case sizeBufferType:
847844
go immediateUpload[[][]byte](ctx,
848-
errCh,
849-
stop,
845+
stopImmediateLoop,
850846
p.config.Reporting.MaxDelaySeconds,
851847
p,
848+
p,
852849
)
853850
case eventBufferType:
854851
go immediateUpload[*bufferItem](ctx,
855-
errCh,
856-
stop,
852+
stopImmediateLoop,
857853
p.config.Reporting.MaxDelaySeconds,
858854
p.eventBuffer,
855+
p,
859856
)
860857
}
861858
}
@@ -898,16 +895,14 @@ func (p *Plugin) loop() {
898895
}
899896

900897
select {
901-
case err := <-errCh:
902-
p.handleUploadError(err)
903898
case <-waitC:
904899
case update := <-p.reconfig:
905900
p.reconfigure(ctx, update.config)
906901
update.done <- struct{}{}
907902
case done := <-p.stop:
908903
if *p.config.Reporting.Trigger == plugins.TriggerImmediate {
909904
d := make(chan struct{})
910-
stop <- d
905+
stopImmediateLoop <- d
911906
<-d
912907
}
913908
cancel()
@@ -1303,7 +1298,7 @@ type buffer[T any] interface {
13031298
}
13041299

13051300
// immediateUpload continuously waits to receive a chunk or event and then attempts an upload
1306-
func immediateUpload[T any](ctx context.Context, errCh chan error, stop chan chan struct{}, maxDelay *int64, b buffer[T]) {
1301+
func immediateUpload[T any](ctx context.Context, stopImmediateLoop chan chan struct{}, maxDelay *int64, b buffer[T], p *Plugin) {
13071302
timer := time.NewTimer(time.Duration(*maxDelay))
13081303

13091304
for {
@@ -1315,18 +1310,14 @@ func immediateUpload[T any](ctx context.Context, errCh chan error, stop chan cha
13151310
chunks = b.Handle(e)
13161311
case <-timer.C:
13171312
chunks = b.Flush()
1318-
case d := <-stop:
1319-
d <- struct{}{}
1313+
case done := <-stopImmediateLoop:
1314+
done <- struct{}{}
13201315
return
13211316
}
13221317

13231318
err := b.UploadChunks(ctx, chunks)
13241319
if err != nil {
1325-
// this is consumed by p.loop to set the plugin status
1326-
errCh <- err
1327-
// delay briefly to prevent overloading the service
1328-
time.Sleep(minRetryDelay)
1329-
1320+
p.handleUploadError(err)
13301321
continue
13311322
}
13321323

Diff for: v1/plugins/logs/plugin_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -3782,6 +3782,8 @@ func TestPluginTriggerImmediateErrorHandling(t *testing.T) {
37823782
t.Fatal(err)
37833783
}
37843784

3785+
fixture.plugin.Stop(ctx)
3786+
37853787
// wait until there is an error entry
37863788
var errReceived string
37873789
for {

0 commit comments

Comments
 (0)