@@ -7,7 +7,6 @@ package logs
7
7
import (
8
8
"context"
9
9
"encoding/json"
10
- "fmt"
11
10
"sync"
12
11
13
12
"github.com/open-policy-agent/opa/v1/logging"
@@ -44,6 +43,7 @@ func newEventBuffer(bufferSizeLimitEvents int64, client rest.Client, uploadPath
44
43
45
44
func (b * eventBuffer ) WithMetrics (m metrics.Metrics ) * eventBuffer {
46
45
b .metrics = m
46
+ b .encoder .metrics = m
47
47
return b
48
48
}
49
49
@@ -158,18 +158,22 @@ func (b *eventBuffer) readEvent() *bufferItem {
158
158
}
159
159
}
160
160
161
- // processEvent serializes the event and determines if the ND cache needs to be dropped
162
- func (b * eventBuffer ) processEvent (event * bufferItem ) ([][]byte , error ) {
161
+ // Handle serializes the event and determines if the ND cache needs to be dropped
162
+ func (b * eventBuffer ) Handle (event * bufferItem ) [][]byte {
163
+ if event == nil {
164
+ return nil
165
+ }
166
+
163
167
if event .chunk != nil { // this is a chunk that has failed to upload and is being retried
164
- return [][]byte {event .chunk }, nil
168
+ return [][]byte {event .chunk }
165
169
}
166
170
167
171
serialized , err := json .Marshal (event .EventV1 )
168
172
169
173
// The non-deterministic cache (NDBuiltinCache) could cause issues, if it is too big or can't be encoded try to drop it.
170
174
if err != nil || int64 (len (serialized )) >= b .uploadSizeLimitBytes {
171
175
if event .NDBuiltinCache == nil {
172
- return nil , fmt . Errorf ("upload event size (%d) exceeds upload_size_limit_bytes (%d), dropping event with decision ID: %v" ,
176
+ b . logError ("upload event size (%d) exceeds upload_size_limit_bytes (%d), dropping event with decision ID: %v" ,
173
177
int64 (len (serialized )), b .uploadSizeLimitBytes , event .DecisionID )
174
178
}
175
179
@@ -179,10 +183,10 @@ func (b *eventBuffer) processEvent(event *bufferItem) ([][]byte, error) {
179
183
serialized , err = json .Marshal (event .EventV1 )
180
184
if err != nil {
181
185
b .incrMetric (logEncodingFailureCounterName )
182
- return nil , fmt . Errorf ("encoding failure: %v, dropping event with decision ID: %v" , err , event .DecisionID )
186
+ b . logError ("encoding failure: %v, dropping event with decision ID: %v" , err , event .DecisionID )
183
187
}
184
188
if int64 (len (serialized )) > b .uploadSizeLimitBytes {
185
- return nil , fmt . Errorf ("upload event size (%d) exceeds upload_size_limit_bytes (%d), dropping event with decision ID: %v" ,
189
+ b . logError ("upload event size (%d) exceeds upload_size_limit_bytes (%d), dropping event with decision ID: %v" ,
186
190
int64 (len (serialized )), b .uploadSizeLimitBytes , event .DecisionID )
187
191
}
188
192
@@ -194,28 +198,17 @@ func (b *eventBuffer) processEvent(event *bufferItem) ([][]byte, error) {
194
198
if err != nil {
195
199
b .incrMetric (logEncodingFailureCounterName )
196
200
b .logError ("encoding failure: %v, dropping event with decision ID: %v" , err , event .DecisionID )
197
- return nil , err
201
+ return nil
198
202
}
199
203
200
- return result , nil
204
+ return result
201
205
}
202
206
203
207
// Input returns the buffer channel to be read from
204
208
func (b * eventBuffer ) Input () <- chan * bufferItem {
205
209
return b .buffer
206
210
}
207
211
208
- // Handle takes an event and encodes it as a chunk
209
- func (b * eventBuffer ) Handle (event * bufferItem ) [][]byte {
210
- result , err := b .processEvent (event )
211
- if err != nil {
212
- b .logError ("%v" , err )
213
- return nil
214
- }
215
-
216
- return result
217
- }
218
-
219
212
// Flush empties the encoder and returns whatever is ready
220
213
func (b * eventBuffer ) Flush () [][]byte {
221
214
result , err := b .encoder .Flush ()
0 commit comments