Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datadog_checks_base/changelog.d/21316.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for customizable cache keys to manage logs cursor
49 changes: 30 additions & 19 deletions datadog_checks_base/datadog_checks/base/checks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,10 @@
import re
from collections import deque
from os.path import basename
from typing import ( # noqa: F401
from typing import (
TYPE_CHECKING,
Any,
AnyStr,
Callable,
Deque,
Dict,
List,
Optional,
Sequence,
Set,
Tuple,
Union,
Deque, # noqa: F401
)

import lazy_loader
Expand All @@ -33,6 +24,7 @@
from datadog_checks.base.constants import ServiceCheck
from datadog_checks.base.errors import ConfigurationError
from datadog_checks.base.utils.agent.utils import should_profile_memory
from datadog_checks.base.utils.cache_key import CacheKey, CacheKeyManager, CacheKeyType, FullConfigCacheKey
from datadog_checks.base.utils.common import ensure_bytes, to_native_string
from datadog_checks.base.utils.fips import enable_fips
from datadog_checks.base.utils.format import json
Expand Down Expand Up @@ -291,6 +283,10 @@ def __init__(self, *args, **kwargs):
self._config_model_instance = None # type: Any
self._config_model_shared = None # type: Any

# The cache key used for persistent caching is managed through teh cache manager. Each cache key has a deferred
# initialization since all properties might not be available during the check initialization time.
self.cache_key_manager = CacheKeyManager(self)

# Functions that will be called exactly once (if successful) before the first check run
self.check_initializations = deque() # type: Deque[Callable[[], None]]

Expand Down Expand Up @@ -491,6 +487,14 @@ def in_developer_mode(self):
self._log_deprecation('in_developer_mode')
return False

def logs_persistent_cache_key(self) -> CacheKey:
"""
Returns the cache key for the logs persistent cache.

Override this method to modify how the log cursor is persisted between agent restarts.
"""
return FullConfigCacheKey(self)

def log_typos_in_options(self, user_config, models_config, level):
# See Performance Optimizations in this package's README.md.
from jellyfish import jaro_winkler_similarity
Expand Down Expand Up @@ -1009,13 +1013,24 @@ def send_log(self, data, cursor=None, stream='default'):
attributes['timestamp'] = int(timestamp * 1000)

datadog_agent.send_log(json.encode(attributes), self.check_id)

if cursor is not None:
self.write_persistent_cache('log_cursor_{}'.format(stream), json.encode(cursor))
self.store_log_cursor(cursor, stream)

def store_log_cursor(self, cursor: dict[str, Any], stream: str = 'default'):
"""Stores the log cursor in the persistent cache."""
self.cache_key_manager.add(cache_key_type=CacheKeyType.LOG_CURSOR, key_factory=self.logs_persistent_cache_key)
cache_key = self.cache_key_manager.get(cache_key_type=CacheKeyType.LOG_CURSOR)
self.write_persistent_cache(cache_key.key_for(context=f'log_cursor_{stream}'), json.encode(cursor))

def get_log_cursor(self, stream='default'):
# type: (str) -> dict[str, Any] | None
"""Returns the most recent log cursor from disk."""
data = self.read_persistent_cache('log_cursor_{}'.format(stream))
self.cache_key_manager.add(cache_key_type=CacheKeyType.LOG_CURSOR, key_factory=self.logs_persistent_cache_key)

cache_key = self.cache_key_manager.get(cache_key_type=CacheKeyType.LOG_CURSOR)
data = self.read_persistent_cache(cache_key.key_for(f'log_cursor_{stream}'))

return json.decode(data) if data else None

def _log_deprecation(self, deprecation_key, *args):
Expand Down Expand Up @@ -1082,10 +1097,6 @@ def entrypoint(self, *args, **kwargs):

return entrypoint

def _persistent_cache_id(self, key):
# type: (str) -> str
return '{}_{}'.format(self.check_id, key)

def read_persistent_cache(self, key):
# type: (str) -> str
"""Returns the value previously stored with `write_persistent_cache` for the same `key`.
Expand All @@ -1094,7 +1105,7 @@ def read_persistent_cache(self, key):
key (str):
the key to retrieve
"""
return datadog_agent.read_persistent_cache(self._persistent_cache_id(key))
return datadog_agent.read_persistent_cache(f"{self.name}_{key}")

def write_persistent_cache(self, key, value):
# type: (str, str) -> None
Expand All @@ -1110,7 +1121,7 @@ def write_persistent_cache(self, key, value):
value (str):
the value to store
"""
datadog_agent.write_persistent_cache(self._persistent_cache_id(key), value)
datadog_agent.write_persistent_cache(f"{self.name}_{key}", value)

def set_external_tags(self, external_tags):
# type: (Sequence[ExternalTagType]) -> None
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# (C) Datadog, Inc. 2025-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
from .base import CacheKey
from .full_config import FullConfigCacheKey
from .config_set import ConfigSetCacheKey
from .manager import CacheKeyManager, CacheKeyType

__all__ = ["CacheKey", "FullConfigCacheKey", "ConfigSetCacheKey", "CacheKeyManager", "CacheKeyType"]
34 changes: 34 additions & 0 deletions datadog_checks_base/datadog_checks/base/utils/cache_key/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# (C) Datadog, Inc. 2025-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from datadog_checks.base import AgentCheck


class CacheKey(ABC):
def __init__(self, check: AgentCheck):
"""Abstract base class for cache keys management.

Any implementation of this class provides the logic to generate cache keys to be used in the Agent persistent
cache.
"""
self.check = check

@abstractmethod
def base_key(self) -> str:
"""
Abstract method that derives the cache key for the particular implementation.
This method must return a stable key that only differs between instances based on the
specific implmentation of the invalidation logic.
"""

def key_for(self, context: str) -> str:
"""
Returns a key that is a combination of the base key and the provided context.
"""
return f"{self.base_key()}_{context}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# (C) Datadog, Inc. 2025-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
from __future__ import annotations

from collections.abc import Collection
from typing import TYPE_CHECKING

from .base import CacheKey

if TYPE_CHECKING:
from typing import Any, Iterable

from datadog_checks.base import AgentCheck


class ConfigSetCacheKey(CacheKey):
"""
Cache key that is derived from a subset of the check's config options.
When the subset of config options changes, the cache is invalidated.
"""

def __init__(
self,
check: AgentCheck,
config_options: Collection[str],
):
super().__init__(check)
self.config_options = set(config_options)
# Config cannot change on the fly, so we can cache the key
self.__key: str | None = None

def base_key(self) -> str:
if self.__key is not None:
return self.__key

merged_config = self.check.init_config | self.check.instance
selected_values = tuple(values for key, values in merged_config.items() if key in self.config_options)
self.__key = str(hash(self.__sorted_values(selected_values)))
return self.__key

def __sorted_values(self, values: Iterable[Any]) -> tuple[str, ...]:
sorted_values = []

for value in values:
if isinstance(value, (list, tuple, set, frozenset)):
sorted_values.extend(self.__sorted_values(value))
elif isinstance(value, dict):
for key, dict_value in value.items():
sorted_values.append(f"{key}:{self.__sorted_values(dict_value)}")
else:
sorted_values.append(str(value))
return tuple(sorted(sorted_values))
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# (C) Datadog, Inc. 2025-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
from __future__ import annotations

from .base import CacheKey


class FullConfigCacheKey(CacheKey):
"""
Cache key based on the check_id of the check where it is being used.

The check_id includes a digest of the full configuration of the check. The cache is invalidated
whenever the configuration of the check changes.
"""

def base_key(self) -> str:
# The check_id is injected by the agent containing the config digest
return str(self.check.check_id)
76 changes: 76 additions & 0 deletions datadog_checks_base/datadog_checks/base/utils/cache_key/manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# (C) Datadog, Inc. 2025-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
from __future__ import annotations

from enum import Enum, auto
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from collections.abc import Callable

from datadog_checks.base import AgentCheck

from .base import CacheKey
from .full_config import FullConfigCacheKey


class CacheKeyType(Enum):
"""Enum used to identify the type of cache key."""

LOG_CURSOR = auto()


class CacheKeyManager:
def __init__(self, check: AgentCheck):
"""
Manager of cache keys for the persistent cache.
This class defined the different kinds of persistent cache keys to be used when adding and retrieving
from the agent persistent cache. The AentCheck can use this manager to to ensure that the correct cache key is
used in each consistently through the different check invocations.
"""
self.keys: dict[CacheKeyType, CacheKey] = {}
self.check = check
self.default_cache_key = FullConfigCacheKey(self.check)

def __retrieve_cache_key(
self,
key_type: CacheKeyType,
default_factory: Callable[[], CacheKey] | None = None,
) -> CacheKey:
if (key := self.keys.get(key_type)) is not None:
return key

if default_factory is None:
self.check.log.warning(
"{key_type} cache key requested wihtout a default factory supplied. "
"Using default full config cache key.",
extra={"key_type": key_type.name},
)

self.keys[key_type] = default_factory() if default_factory is not None else self.default_cache_key
return self.keys[key_type]

def has_cache_key(self, key_type: CacheKeyType) -> bool:
return key_type in self.keys

def get(self, *, cache_key_type: CacheKeyType, default_factory: Callable[[], CacheKey] | None = None) -> CacheKey:
"""
Returns the cache key for the given cache key type.
"""
return self.__retrieve_cache_key(
key_type=cache_key_type,
default_factory=default_factory,
)

def add(self, *, cache_key_type: CacheKeyType, key_factory: Callable[[], CacheKey], override: bool = False):
"""
Adds the cache key for the given cache key type.
The provided cache key is only added if the cache key type is not already in the manager. To force the addition
of the cache key, set the `override` argument to `True`.
"""
if cache_key_type in self.keys and not override:
return
self.keys[cache_key_type] = key_factory()
38 changes: 36 additions & 2 deletions datadog_checks_base/tests/base/checks/test_agent_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from datadog_checks.base import AgentCheck, to_native_string
from datadog_checks.base import __version__ as base_package_version
from datadog_checks.base.utils.cache_key import CacheKey

from .utils import BaseModelTest

Expand Down Expand Up @@ -42,8 +43,7 @@ def test_check_version():


def test_persistent_cache(datadog_agent):
check = AgentCheck()
check.check_id = 'test'
check = AgentCheck(name="test")

check.write_persistent_cache('foo', 'bar')

Expand Down Expand Up @@ -558,6 +558,40 @@ def test_cursor(self, datadog_agent):
)
assert check.get_log_cursor() == {'data': '2'}

def test_cursor_with_custom_cache_key_after_restart(self):
class ConstantCacheKey(CacheKey):
def base_key(self) -> str:
return "always_the_same"

class TestCheck(AgentCheck):
def logs_persistent_cache_key(self) -> CacheKey:
return ConstantCacheKey(self)

check = TestCheck(name="test", init_config={}, instances=[{}])
check.send_log({'message': 'foo'}, cursor={'data': '1'})

assert check.get_log_cursor() == {'data': '1'}

new_check = TestCheck(name="test", init_config={}, instances=[{}])
assert new_check.get_log_cursor() == {'data': '1'}

def test_cursor_invalidated_for_different_check_name(self):
class ConstantCacheKey(CacheKey):
def base_key(self) -> str:
return "always_the_same"

class TestCheck(AgentCheck):
def logs_persistent_cache_key(self) -> CacheKey:
return ConstantCacheKey(self)

check = TestCheck(name="test", init_config={}, instances=[{}])
check.send_log({'message': 'foo'}, cursor={'data': '1'})

assert check.get_log_cursor() == {'data': '1'}

new_check = TestCheck(name="another_test", init_config={}, instances=[{}])
assert new_check.get_log_cursor() is None

def test_no_cursor(self, datadog_agent):
check = AgentCheck('check_name', {}, [{}])
check.check_id = 'test'
Expand Down
Empty file.
Loading
Loading