fix: use compressed event size to close chunk #7517

Draft · wants to merge 7 commits into main
2 changes: 1 addition & 1 deletion docs/content/configuration.md
@@ -788,7 +788,7 @@ included in the actual bundle gzipped tarball.
| `decision_logs.reporting.buffer_size_limit_events` | `int64` | No (default: `10000`) | Decision log buffer size limit in events. OPA will drop old events from the log if this limit is exceeded. By default, 10,000 events are held. This number has to be greater than zero. Only works with "event" buffer type. |
| `decision_logs.reporting.buffer_size_limit_bytes` | `int64` | No (default: `unlimited`) | Decision log buffer size limit in bytes. OPA will drop old events from the log if this limit is exceeded. By default, no limit is set. Only one of `buffer_size_limit_bytes`, `max_decisions_per_second` may be set. Only works with "size" buffer type. |
| `decision_logs.reporting.max_decisions_per_second` | `float64` | No | Maximum number of decision log events to buffer per second. OPA will drop events if the rate limit is exceeded. Only one of `buffer_size_limit_bytes`, `max_decisions_per_second` may be set. |
| `decision_logs.reporting.upload_size_limit_bytes` | `int64` | No (default: `32768`) | Decision log upload size limit in bytes. OPA will chunk uploads to cap message body to this limit. |
| `decision_logs.reporting.upload_size_limit_bytes` | `int64` | No (default: `32768`) | Decision log upload size limit in bytes. This limit enforces the maximum size of a gzip compressed payload of events within the message body. The maximum value allowed is 4294967296 bytes. |
| `decision_logs.reporting.min_delay_seconds` | `int64` | No (default: `300`) | Minimum amount of time to wait between uploads. |
| `decision_logs.reporting.max_delay_seconds` | `int64` | No (default: `600`) | Maximum amount of time to wait between uploads. |
| `decision_logs.reporting.trigger` | `string` | No (default: `periodic`) | Controls how decision logs are reported to the remote server. Allowed values are `periodic` and `manual` (`manual` triggers are only possible when using OPA as a Go package). |
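
Because `upload_size_limit_bytes` caps the gzip compressed payload rather than the raw JSON, the relevant measurement is the size of the compressed message body. The following minimal Go sketch illustrates that measurement only; it is not OPA's internal encoder, and the event fields shown are placeholders.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
)

// compressedSize returns the gzip compressed size of a JSON array of events.
// This mirrors what upload_size_limit_bytes constrains: the compressed message
// body, not the raw JSON. (Illustration only; not OPA's internal code.)
func compressedSize(events []map[string]any) (int, error) {
	raw, err := json.Marshal(events)
	if err != nil {
		return 0, err
	}
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(raw); err != nil {
		return 0, err
	}
	if err := zw.Close(); err != nil {
		return 0, err
	}
	return buf.Len(), nil
}

func main() {
	const uploadSizeLimitBytes = 32768 // default from the table above

	events := []map[string]any{
		{"decision_id": "1", "path": "authz/allow", "result": true},
		{"decision_id": "2", "path": "authz/allow", "result": false},
	}
	n, err := compressedSize(events)
	if err != nil {
		panic(err)
	}
	fmt.Printf("compressed payload: %d bytes (limit %d)\n", n, uploadSizeLimitBytes)
}
```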
28 changes: 16 additions & 12 deletions docs/content/management-decision-logs.md
@@ -87,18 +87,22 @@ during the next upload event. OPA also performs an exponential backoff to calcul
when the remote service responds with a non-2xx status.

OPA periodically uploads decision logs to the remote service. In order to conserve network and memory resources, OPA
attempts to fill up each upload chunk with as many events as possible while respecting the user-specified
`upload_size_limit_bytes` config option. OPA defines an adaptive (`soft`) limit that acts as a measure for encoding
as many decisions into each chunk as possible. It uses the below algorithm to optimize the number of log events to
include in a chunk. The algorithm features three phases namely:

`Scale Up`: If the current chunk size is within 90% of the user-configured (`hard`) limit, exponentially increase the
soft limit. The exponential function is 2^x where x has a minimum value of 1

`Scale Down`: If the current chunk size exceeds the hard limit, decrease the soft limit and re-encode the decisions in
the last chunk.

`Equilibrium`: If the chunk size is between 90% and 100% of the user-configured limit, maintain soft limit value.
attempts to fill up each message body with as many events as possible while respecting the user-specified
`upload_size_limit_bytes` config option. Each message body is a gzip compressed JSON array, and `upload_size_limit_bytes`
constrains the gzip compressed size, so it is referred to below as the compressed limit. To avoid compressing every
incoming event just to check whether the compressed limit has been reached, OPA makes an educated guess at what the
corresponding uncompressed limit could be. It does so with an adaptive limit, referred to as the uncompressed limit,
that is adjusted by measuring incoming events. This means that the initial chunks will most likely be smaller than the
compressed limit, but as OPA consumes more decision events it adjusts the adaptive uncompressed limit to optimize the
messages. The algorithm adjusts the uncompressed limit using the following criteria (sketched in code after the list):

`Scale Up`: If the current chunk size is within 90% of the user-configured compressed limit, exponentially increase the
uncompressed limit. The exponential function is 2^x, where x has a minimum value of 1.

`Scale Down`: If the current chunk size exceeds the compressed limit, decrease the uncompressed limit and re-encode the
decisions in the last chunk.

`Equilibrium`: If the chunk size is between 90% and 100% of the user-configured compressed limit, maintain the uncompressed limit value.
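
The three phases above can be condensed into a small state update. Below is a minimal Go sketch under stated assumptions: the `adaptiveLimit` type, its field names, the halving on Scale Down, and the exact way the 2^x factor is applied are illustrative and are not taken from OPA's encoder.

```go
package main

import "fmt"

// adaptiveLimit is a minimal sketch of the adaptive ("soft") limit described
// above. Field names and scaling details are assumptions for illustration.
type adaptiveLimit struct {
	uncompressedLimit int64 // adaptive limit on the uncompressed chunk size
	scaleUpExp        uint  // x in 2^x, with a minimum value of 1
}

// adjust updates the uncompressed limit after producing a chunk whose gzip
// compressed size is compressedSize, given the user-configured compressed
// limit (upload_size_limit_bytes). It reports whether the caller should
// re-encode the last chunk (the Scale Down case).
func (a *adaptiveLimit) adjust(compressedSize, compressedLimit int64) (reencode bool) {
	threshold := compressedLimit * 90 / 100
	switch {
	case compressedSize > compressedLimit:
		// Scale Down: the chunk overshot the compressed limit.
		a.uncompressedLimit /= 2
		a.scaleUpExp = 1
		return true
	case compressedSize < threshold:
		// Scale Up: plenty of headroom, so grow the limit by 2^x (x >= 1).
		if a.scaleUpExp < 1 {
			a.scaleUpExp = 1
		}
		a.uncompressedLimit *= int64(1) << a.scaleUpExp
		a.scaleUpExp++
		return false
	default:
		// Equilibrium: between 90% and 100% of the limit, keep the value.
		return false
	}
}

func main() {
	limit := adaptiveLimit{uncompressedLimit: 32768, scaleUpExp: 1}
	// A small chunk (well under 90% of the limit) triggers Scale Up.
	fmt.Println(limit.adjust(1024, 32768), limit.uncompressedLimit) // false 65536
}
```

The important property is that only the compressed size of a chunk that has already been produced is measured, so individual events never need to be compressed up front just to test the limit.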

When an event containing `nd_builtin_cache` cannot fit into a chunk smaller than `upload_size_limit_bytes`, OPA will
drop the `nd_builtin_cache` key from the event, and will retry encoding the chunk without the non-deterministic
14 changes: 6 additions & 8 deletions v1/plugins/discovery/discovery.go
@@ -105,13 +105,13 @@ func New(manager *plugins.Manager, opts ...func(*Discovery)) (*Discovery, error)
f(result)
}

config, err := NewConfigBuilder().WithBytes(manager.Config.Discovery).WithServices(manager.Services()).
WithKeyConfigs(manager.PublicKeys()).Parse()
result.logger = manager.Logger().WithFields(map[string]interface{}{"plugin": Name})

config, err := NewConfigBuilder().WithBytes(manager.Config.Discovery).WithServices(manager.Services()).WithKeyConfigs(manager.PublicKeys()).Parse()
if err != nil {
return nil, err
} else if config == nil {
if _, err := getPluginSet(result.factories, manager, manager.Config, result.metrics, nil); err != nil {
if _, err := getPluginSet(result.factories, manager, manager.Config, result.metrics, result.logger, nil); err != nil {
return nil, err
}
return result, nil
@@ -141,8 +141,6 @@ func New(manager *plugins.Manager, opts ...func(*Discovery)) (*Discovery, error)
Name: Name,
}

result.logger = manager.Logger().WithFields(map[string]interface{}{"plugin": Name})

manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateNotReady})
return result, nil
}
@@ -509,7 +507,7 @@ func (c *Discovery) processBundle(ctx context.Context, b *bundleApi.Bundle) (*pl
return nil, err
}

ps, err := getPluginSet(c.factories, c.manager, overriddenConfig, c.metrics, c.config.Trigger)
ps, err := getPluginSet(c.factories, c.manager, overriddenConfig, c.metrics, c.logger, c.config.Trigger)
if err != nil {
return nil, err
}
@@ -584,7 +582,7 @@ type pluginfactory struct {
config interface{}
}

func getPluginSet(factories map[string]plugins.Factory, manager *plugins.Manager, config *config.Config, m metrics.Metrics, trigger *plugins.TriggerMode) (*pluginSet, error) {
func getPluginSet(factories map[string]plugins.Factory, manager *plugins.Manager, config *config.Config, m metrics.Metrics, l logging.Logger, trigger *plugins.TriggerMode) (*pluginSet, error) {

// Parse and validate plugin configurations.
pluginNames := []string{}
@@ -628,7 +626,7 @@ func getPluginSet(factories map[string]plugins.Factory, manager *plugins.Manager
}

decisionLogsConfig, err := logs.NewConfigBuilder().WithBytes(config.DecisionLogs).WithServices(manager.Services()).
WithPlugins(pluginNames).WithTriggerMode(trigger).Parse()
WithPlugins(pluginNames).WithTriggerMode(trigger).WithLogger(l).Parse()
if err != nil {
return nil, err
}
10 changes: 5 additions & 5 deletions v1/plugins/discovery/discovery_test.go
@@ -3231,7 +3231,7 @@ bundle:
`
manager := getTestManager(t, conf)
trigger := plugins.TriggerManual
_, err := getPluginSet(nil, manager, manager.Config, nil, &trigger)
_, err := getPluginSet(nil, manager, manager.Config, nil, nil, &trigger)
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
@@ -3268,7 +3268,7 @@ bundles:
`
manager := getTestManager(t, conf)
trigger := plugins.TriggerManual
_, err := getPluginSet(nil, manager, manager.Config, nil, &trigger)
_, err := getPluginSet(nil, manager, manager.Config, nil, nil, &trigger)
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
@@ -3328,7 +3328,7 @@ bundles:

manager := getTestManager(t, tc.conf)
trigger := plugins.TriggerManual
_, err := getPluginSet(nil, manager, manager.Config, nil, &trigger)
_, err := getPluginSet(nil, manager, manager.Config, nil, nil, &trigger)

if tc.wantErr {
if err == nil {
@@ -3393,7 +3393,7 @@ decision_logs:

manager := getTestManager(t, tc.conf)
trigger := plugins.TriggerManual
_, err := getPluginSet(nil, manager, manager.Config, nil, &trigger)
_, err := getPluginSet(nil, manager, manager.Config, nil, nil, &trigger)

if tc.wantErr {
if err == nil {
@@ -3464,7 +3464,7 @@ status:

manager := getTestManager(t, tc.conf)
trigger := plugins.TriggerManual
_, err := getPluginSet(nil, manager, manager.Config, nil, &trigger)
_, err := getPluginSet(nil, manager, manager.Config, nil, nil, &trigger)

if tc.wantErr {
if err == nil {