Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# REQUIRED
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# REQUIRED for all kinds
# Change summary; a 80ish characters long description of the change.
summary: fix issue prevent checkin local_metadata from being updated

# REQUIRED for breaking-change, deprecation, known-issue
# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# description:

# REQUIRED for breaking-change, deprecation, known-issue
# impact:

# REQUIRED for breaking-change, deprecation, known-issue
# action:

# REQUIRED for all kinds
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: fleet-server

# AUTOMATED
# OPTIONAL to manually add other PR URLs
# PR URL: A link the PR that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
# pr: https://github.com/owner/repo/1234

# AUTOMATED
# OPTIONAL to manually add other issue URLs
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
# issue: https://github.com/owner/repo/1234
40 changes: 23 additions & 17 deletions internal/pkg/api/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1037,30 +1037,36 @@ func parseMeta(zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) ([]
return nil, nil
}

// Check for empty string - not valid metadata
if str, ok := reqLocalMeta.(string); ok && strings.TrimSpace(str) == "" {
zlog.Warn().Msg("local metadata empty; won't update metadata")
return nil, nil
}

// Deserialize the agent's metadata copy
var agentLocalMeta interface{}
if err := json.Unmarshal(agent.LocalMetadata, &agentLocalMeta); err != nil {
return nil, fmt.Errorf("parseMeta local: %w", err)
if agent.LocalMetadata != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, why is this nil check necessary? json.Unmarshal(nil, &agentLocalMeta) should result in agentLocalMeta == nil, which is the zero value of agentLocalMeta anyway, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because this actually would fail if the local_metadata was missing. Seems like we don't actually hit this is the real world. Seems elasticsearch always returns it as local_metadata: {}. The unit test now tests both cases, so I added this here just to be more defensive.

if err := json.Unmarshal(agent.LocalMetadata, &agentLocalMeta); err != nil {
// if it has metadata in the document it has to be JSON or the mapping is incorrect
return nil, fmt.Errorf("parseMeta local: %w", err)
}
}

var outMeta []byte

// Compare the deserialized meta structures and return the bytes to update if different
if !reflect.DeepEqual(reqLocalMeta, agentLocalMeta) {

zlog.Trace().
RawJSON("oldLocalMeta", agent.LocalMetadata).
RawJSON("newLocalMeta", req.LocalMetadata).
Msg("local metadata not equal")
// Compare the deserialized meta structures, already the same means no update needs to occur.
if reflect.DeepEqual(reqLocalMeta, agentLocalMeta) {
return nil, nil
}

zlog.Info().
RawJSON("req.LocalMeta", req.LocalMetadata).
Msg("applying new local metadata")
zlog.Trace().
RawJSON("oldLocalMeta", agent.LocalMetadata).
RawJSON("newLocalMeta", req.LocalMetadata).
Msg("local metadata not equal")

outMeta = req.LocalMetadata
}
zlog.Info().
RawJSON("req.LocalMeta", req.LocalMetadata).
Msg("applying new local metadata")

return outMeta, nil
return req.LocalMetadata, nil
}

func parseComponents(zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) ([]byte, *[]string, error) {
Expand Down
128 changes: 122 additions & 6 deletions internal/pkg/api/handleCheckin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1050,11 +1050,12 @@ func TestValidateCheckinRequest(t *testing.T) {
verCon := mustBuildConstraints("8.0.0")

tests := []struct {
name string
req *http.Request
cfg *config.Server
expErr error
expValid validatedCheckin
name string
req *http.Request
cfg *config.Server
currentMeta json.RawMessage
expErr error
expValid validatedCheckin
}{
{
name: "Invalid JSON",
Expand Down Expand Up @@ -1118,6 +1119,121 @@ func TestValidateCheckinRequest(t *testing.T) {
rawMeta: []byte(`{"elastic": {"agent": {"id": "testid", "fips": true}}}`),
},
},
{
name: "local metadata matches",
req: &http.Request{
Body: io.NopCloser(strings.NewReader(`{"status": "online", "message": "test message", "local_metadata": {"elastic": {"agent": {"id": "testid", "fips": true}}}}`)),
},
expErr: nil,
currentMeta: json.RawMessage(`{"elastic": {"agent": {"id": "testid", "fips": true}}}`),
cfg: &config.Server{
Limits: config.ServerLimits{
CheckinLimit: config.Limit{
MaxBody: 0,
},
},
},
expValid: validatedCheckin{
rawMeta: nil, // no need to update
},
},
{
name: "local metadata different JSON formatting",
req: &http.Request{
// JSON with specific key ordering
Body: io.NopCloser(strings.NewReader(`{"status": "online", "message": "test message", "local_metadata": {"elastic": {"agent": {"id": "testid", "version": "8.0.0"}}, "host": {"hostname": "test-host"}}}`)),
},
expErr: nil,
// Same content but different key ordering in JSON - when unmarshaled and compared
// with reflect.DeepEqual they should be equal, but raw bytes are different
currentMeta: json.RawMessage(`{"host":{"hostname":"test-host"},"elastic":{"agent":{"version":"8.0.0","id":"testid"}}}`),
cfg: &config.Server{
Limits: config.ServerLimits{
CheckinLimit: config.Limit{
MaxBody: 0,
},
},
},
expValid: validatedCheckin{
rawMeta: nil, // should recognize as same content despite different formatting
},
},
{
name: "local metadata is empty string and agent has nil",
req: &http.Request{
Body: io.NopCloser(strings.NewReader(`{"status": "online", "message": "test message", "local_metadata": ""}`)),
},
expErr: nil,
currentMeta: nil,
cfg: &config.Server{
Limits: config.ServerLimits{
CheckinLimit: config.Limit{
MaxBody: 0,
},
},
},
expValid: validatedCheckin{
// don't update metadata
rawMeta: nil,
},
},
{
name: "local metadata is empty string and agent has different value",
req: &http.Request{
Body: io.NopCloser(strings.NewReader(`{"status": "online", "message": "test message", "local_metadata": ""}`)),
},
expErr: nil,
currentMeta: json.RawMessage(`{"host": {"hostname": "test-host"}}`),
cfg: &config.Server{
Limits: config.ServerLimits{
CheckinLimit: config.Limit{
MaxBody: 0,
},
},
},
expValid: validatedCheckin{
// don't update metadata
rawMeta: nil,
},
},
{
name: "local metadata is null and agent has nil",
req: &http.Request{
Body: io.NopCloser(strings.NewReader(`{"status": "online", "message": "test message", "local_metadata": null}`)),
},
expErr: nil,
currentMeta: nil,
cfg: &config.Server{
Limits: config.ServerLimits{
CheckinLimit: config.Limit{
MaxBody: 0,
},
},
},
expValid: validatedCheckin{
// don't update metadata
rawMeta: nil,
},
},
{
name: "local metadata is null and agent has existing metadata",
req: &http.Request{
Body: io.NopCloser(strings.NewReader(`{"status": "online", "message": "test message", "local_metadata": null}`)),
},
expErr: nil,
currentMeta: json.RawMessage(`{"host": {"hostname": "test-host"}}`),
cfg: &config.Server{
Limits: config.ServerLimits{
CheckinLimit: config.Limit{
MaxBody: 0,
},
},
},
expValid: validatedCheckin{
// don't update metadata
rawMeta: nil,
},
},
}

for _, tc := range tests {
Expand All @@ -1126,7 +1242,7 @@ func TestValidateCheckinRequest(t *testing.T) {
assert.NoError(t, err)
wr := httptest.NewRecorder()
logger := testlog.SetLogger(t)
valid, err := checkin.validateRequest(logger, wr, tc.req, time.Time{}, &model.Agent{LocalMetadata: json.RawMessage(`{}`)})
valid, err := checkin.validateRequest(logger, wr, tc.req, time.Time{}, &model.Agent{LocalMetadata: tc.currentMeta})
if tc.expErr == nil {
assert.NoError(t, err)
assert.Equal(t, tc.expValid.rawMeta, valid.rawMeta)
Expand Down
29 changes: 22 additions & 7 deletions internal/pkg/checkin/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func WithUnhealthyReason(reason *[]string) Option {

func WithMeta(meta []byte) Option {
return func(pending *pendingT) {
if len(meta) == 0 {
// no real meta; do nothing
return
}
if pending.extra == nil {
pending.extra = &extraT{}
}
Expand Down Expand Up @@ -94,6 +98,10 @@ func WithVer(ver string) Option {

func WithComponents(components []byte) Option {
return func(pending *pendingT) {
if len(components) == 0 {
// no real components; do nothing
return
}
if pending.extra == nil {
pending.extra = &extraT{}
}
Expand Down Expand Up @@ -198,16 +206,23 @@ func (bc *Bulk) timestamp() string {
// WARNING: Bulk will take ownership of fields, so do not use after passing in.
func (bc *Bulk) CheckIn(id string, opts ...Option) error {
bc.mut.Lock()
pending := pendingT{
ts: bc.timestamp(),
defer bc.mut.Unlock()

// possible there is already a check-in queued for the same ID
// if that is present we need to be sure to use that pending
// instead of creating a new one
pending, ok := bc.pending[id]
if !ok {
pending = pendingT{
ts: bc.timestamp(),
}
}

for _, opt := range opts {
opt(&pending)
}

bc.pending[id] = pending
bc.mut.Unlock()
return nil
}

Expand Down Expand Up @@ -341,16 +356,16 @@ func toUpdateBody(now string, pending pendingT) ([]byte, error) {
}
}

// Update local metadata if provided
if pending.extra.meta != nil {
// Update local metadata if provided (and has a value)
if len(pending.extra.meta) > 0 {
// Surprise: The json encoder compacts this raw JSON during
// the encode process, so there my be unexpected memory overhead:
// https://github.com/golang/go/blob/de5d7eccb99088e3ab42c0d907da6852d8f9cebe/src/encoding/json/encode.go#L503-L507
fields[dl.FieldLocalMetadata] = json.RawMessage(pending.extra.meta)
}

// Update components if provided
if pending.extra.components != nil {
// Update components if provided (and has a value)
if len(pending.extra.components) > 0 {
fields[dl.FieldComponents] = json.RawMessage(pending.extra.components)
}

Expand Down
54 changes: 54 additions & 0 deletions internal/pkg/checkin/bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,60 @@ func TestBulkSimple(t *testing.T) {
}
}

func TestBulkReusePending(t *testing.T) {
ctx := testlog.SetLogger(t).WithContext(t.Context())

const (
agentID = "test-agent-id"
status = "online"
message = "test message"
)

meta := []byte(`{"test":"metadata"}`)

// Matcher that validates both the existing field (status) and new field (meta) are present
matchAccumulatedOps := func(ops []bulk.MultiOp) bool {
if len(ops) != 1 {
t.Errorf("Expected 1 operation, got %d", len(ops))
return false
}
if ops[0].ID != agentID {
t.Errorf("Expected ID %s, got %s", agentID, ops[0].ID)
return false
}

type updateT struct {
Status string `json:"last_checkin_status"`
Meta json.RawMessage `json:"local_metadata"`
}

m := make(map[string]updateT)
err := json.Unmarshal(ops[0].Body, &m)
require.NoErrorf(t, err, "unable to validate operation body %s", string(ops[0].Body))

sub, ok := m["doc"]
require.True(t, ok, "unable to validate operation: expected doc")

assert.Equal(t, status, sub.Status, "Expected status from first CheckIn to be preserved")
assert.Equal(t, json.RawMessage(meta), sub.Meta, "Expected metadata from second CheckIn to be added")
return true
}

mockBulk := ftesting.NewMockBulk()
mockBulk.On("MUpdate", mock.Anything, mock.MatchedBy(matchAccumulatedOps), mock.Anything).Return([]bulk.BulkIndexerResponseItem{}, nil).Once()

bc := NewBulk(mockBulk)

err := bc.CheckIn(agentID, WithStatus(status), WithMessage(message))
require.NoError(t, err)
err = bc.CheckIn(agentID, WithMeta(meta))
require.NoError(t, err)
err = bc.flush(ctx)
require.NoError(t, err)

mockBulk.AssertExpectations(t)
}

func validateTimestamp(tb testing.TB, start time.Time, ts string) {
t1, err := time.Parse(time.RFC3339, ts)
require.NoErrorf(tb, err, "expected %q to be in RFC 3339 format", ts)
Expand Down
Loading