diff --git a/docs/guides/code_examples/service_locator/service_storage_configuration.py b/docs/guides/code_examples/service_locator/service_storage_configuration.py index 4c7370b77b..580e6d348f 100644 --- a/docs/guides/code_examples/service_locator/service_storage_configuration.py +++ b/docs/guides/code_examples/service_locator/service_storage_configuration.py @@ -1,7 +1,9 @@ import asyncio from datetime import timedelta +from crawlee import service_locator from crawlee.configuration import Configuration +from crawlee.storage_clients import MemoryStorageClient from crawlee.storages import Dataset @@ -11,10 +13,16 @@ async def main() -> None: headless=False, persist_state_interval=timedelta(seconds=30), ) + # Set the custom configuration as the global default configuration. + service_locator.set_configuration(configuration) - # Pass the configuration to the dataset (or other storage) when opening it. - dataset = await Dataset.open( - configuration=configuration, + # Use the global defaults when creating the dataset (or other storage). + dataset_1 = await Dataset.open() + + # Or set explicitly specific configuration if + # you do not want to rely on global defaults. + dataset_2 = await Dataset.open( + storage_client=MemoryStorageClient(), configuration=configuration ) diff --git a/docs/upgrading/upgrading_to_v1.md b/docs/upgrading/upgrading_to_v1.md index 3a03e8e1e7..4f94d54c3d 100644 --- a/docs/upgrading/upgrading_to_v1.md +++ b/docs/upgrading/upgrading_to_v1.md @@ -189,6 +189,97 @@ The interface for custom storage clients has been simplified: - Collection storage clients have been removed. - The number of methods that have to be implemented have been reduced. +## ServiceLocator changes + +### ServiceLocator is stricter with registering services +You can register the services just once, and you can no longer override already registered services. 
+ +**Before (v0.6):** +```python +from crawlee import service_locator +from crawlee.storage_clients import MemoryStorageClient + +service_locator.set_storage_client(MemoryStorageClient()) +service_locator.set_storage_client(MemoryStorageClient()) +``` +**Now (v1.0):** + +```python +from crawlee import service_locator +from crawlee.storage_clients import MemoryStorageClient + +service_locator.set_storage_client(MemoryStorageClient()) +service_locator.set_storage_client(MemoryStorageClient()) # Raises an error +``` + +### BasicCrawler has its own instance of ServiceLocator to track its own services +Explicitly passed services to the crawler can be different the global ones accessible in `crawlee.service_locator`. `BasicCrawler` no longer causes the global services in `service_locator` to be set to the crawler's explicitly passed services. + +**Before (v0.6):** +```python +from crawlee import service_locator +from crawlee.crawlers import BasicCrawler +from crawlee.storage_clients import MemoryStorageClient +from crawlee.storages import Dataset + + +async def main() -> None: + custom_storage_client = MemoryStorageClient() + crawler = BasicCrawler(storage_client=custom_storage_client) + + assert service_locator.get_storage_client() is custom_storage_client + assert await crawler.get_dataset() is await Dataset.open() +``` +**Now (v1.0):** + +```python +from crawlee import service_locator +from crawlee.crawlers import BasicCrawler +from crawlee.storage_clients import MemoryStorageClient +from crawlee.storages import Dataset + + +async def main() -> None: + custom_storage_client = MemoryStorageClient() + crawler = BasicCrawler(storage_client=custom_storage_client) + + assert service_locator.get_storage_client() is not custom_storage_client + assert await crawler.get_dataset() is not await Dataset.open() +``` + +This allows two crawlers with different services at the same time. 
+ +**Now (v1.0):** + +```python +from crawlee.crawlers import BasicCrawler +from crawlee.storage_clients import MemoryStorageClient, FileSystemStorageClient +from crawlee.configuration import Configuration +from crawlee.events import LocalEventManager + +custom_configuration_1 = Configuration() +custom_event_manager_1 = LocalEventManager.from_config(custom_configuration_1) +custom_storage_client_1 = MemoryStorageClient() + +custom_configuration_2 = Configuration() +custom_event_manager_2 = LocalEventManager.from_config(custom_configuration_2) +custom_storage_client_2 = FileSystemStorageClient() + +crawler_1 = BasicCrawler( + configuration=custom_configuration_1, + event_manager=custom_event_manager_1, + storage_client=custom_storage_client_1, +) + +crawler_2 = BasicCrawler( + configuration=custom_configuration_2, + event_manager=custom_event_manager_2, + storage_client=custom_storage_client_2, + ) + +# use crawlers without runtime crash... +``` + ## Other smaller updates There are more smaller updates. diff --git a/pyproject.toml b/pyproject.toml index 489640f7c4..b8c1279316 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,7 +40,7 @@ dependencies = [ "protego>=0.5.0", "psutil>=6.0.0", "pydantic-settings>=2.2.0,!=2.7.0,!=2.7.1,!=2.8.0", - "pydantic>=2.8.0,!=2.10.0,!=2.10.1,!=2.10.2", + "pydantic>=2.11.0", "pyee>=9.0.0", "tldextract>=5.1.0", "typing-extensions>=4.1.0", diff --git a/src/crawlee/_autoscaling/snapshotter.py b/src/crawlee/_autoscaling/snapshotter.py index 90d44db65e..55af9da1dd 100644 --- a/src/crawlee/_autoscaling/snapshotter.py +++ b/src/crawlee/_autoscaling/snapshotter.py @@ -113,7 +113,7 @@ def from_config(cls, config: Configuration | None = None) -> Snapshotter: Args: config: The `Configuration` instance. Uses the global (default) one if not provided. """ - config = service_locator.get_configuration() + config = config or service_locator.get_configuration() # Compute the maximum memory size based on the provided configuration. 
If `memory_mbytes` is provided, # it uses that value. Otherwise, it calculates the `max_memory_size` as a proportion of the system's diff --git a/src/crawlee/_service_locator.py b/src/crawlee/_service_locator.py index 3418e4d4e3..9c4eebcae8 100644 --- a/src/crawlee/_service_locator.py +++ b/src/crawlee/_service_locator.py @@ -11,6 +11,10 @@ if TYPE_CHECKING: from crawlee.storages._storage_instance_manager import StorageInstanceManager +from logging import getLogger + +logger = getLogger(__name__) + @docs_group('Configuration') class ServiceLocator: @@ -19,23 +23,24 @@ class ServiceLocator: All services are initialized to its default value lazily. """ - def __init__(self) -> None: - self._configuration: Configuration | None = None - self._event_manager: EventManager | None = None - self._storage_client: StorageClient | None = None - self._storage_instance_manager: StorageInstanceManager | None = None + global_storage_instance_manager: StorageInstanceManager | None = None - # Flags to check if the services were already set. 
- self._configuration_was_retrieved = False - self._event_manager_was_retrieved = False - self._storage_client_was_retrieved = False + def __init__( + self, + configuration: Configuration | None = None, + event_manager: EventManager | None = None, + storage_client: StorageClient | None = None, + ) -> None: + self._configuration = configuration + self._event_manager = event_manager + self._storage_client = storage_client def get_configuration(self) -> Configuration: """Get the configuration.""" if self._configuration is None: + logger.warning('No configuration set, implicitly creating and using default Configuration.') self._configuration = Configuration() - self._configuration_was_retrieved = True return self._configuration def set_configuration(self, configuration: Configuration) -> None: @@ -47,7 +52,10 @@ def set_configuration(self, configuration: Configuration) -> None: Raises: ServiceConflictError: If the configuration has already been retrieved before. """ - if self._configuration_was_retrieved: + if self._configuration is configuration: + # Same instance, no need to anything + return + if self._configuration: raise ServiceConflictError(Configuration, configuration, self._configuration) self._configuration = configuration @@ -55,13 +63,14 @@ def set_configuration(self, configuration: Configuration) -> None: def get_event_manager(self) -> EventManager: """Get the event manager.""" if self._event_manager is None: - self._event_manager = ( - LocalEventManager().from_config(config=self._configuration) - if self._configuration - else LocalEventManager.from_config() - ) + logger.warning('No event manager set, implicitly creating and using default LocalEventManager.') + if self._configuration is None: + logger.warning( + 'Implicit creation of event manager will implicitly set configuration as side effect. ' + 'It is advised to explicitly first set the configuration instead.' 
+ ) + self._event_manager = LocalEventManager().from_config(config=self._configuration) - self._event_manager_was_retrieved = True return self._event_manager def set_event_manager(self, event_manager: EventManager) -> None: @@ -73,7 +82,10 @@ def set_event_manager(self, event_manager: EventManager) -> None: Raises: ServiceConflictError: If the event manager has already been retrieved before. """ - if self._event_manager_was_retrieved: + if self._event_manager is event_manager: + # Same instance, no need to anything + return + if self._event_manager: raise ServiceConflictError(EventManager, event_manager, self._event_manager) self._event_manager = event_manager @@ -81,9 +93,14 @@ def set_event_manager(self, event_manager: EventManager) -> None: def get_storage_client(self) -> StorageClient: """Get the storage client.""" if self._storage_client is None: + logger.warning('No storage client set, implicitly creating and using default FileSystemStorageClient.') + if self._configuration is None: + logger.warning( + 'Implicit creation of storage client will implicitly set configuration as side effect. ' + 'It is advised to explicitly first set the configuration instead.' + ) self._storage_client = FileSystemStorageClient() - self._storage_client_was_retrieved = True return self._storage_client def set_storage_client(self, storage_client: StorageClient) -> None: @@ -95,21 +112,24 @@ def set_storage_client(self, storage_client: StorageClient) -> None: Raises: ServiceConflictError: If the storage client has already been retrieved before. 
""" - if self._storage_client_was_retrieved: + if self._storage_client is storage_client: + # Same instance, no need to anything + return + if self._storage_client: raise ServiceConflictError(StorageClient, storage_client, self._storage_client) self._storage_client = storage_client @property def storage_instance_manager(self) -> StorageInstanceManager: - """Get the storage instance manager.""" - if self._storage_instance_manager is None: + """Get the storage instance manager. It is global manager shared by all instances of ServiceLocator.""" + if ServiceLocator.global_storage_instance_manager is None: # Import here to avoid circular imports. from crawlee.storages._storage_instance_manager import StorageInstanceManager # noqa: PLC0415 - self._storage_instance_manager = StorageInstanceManager() + ServiceLocator.global_storage_instance_manager = StorageInstanceManager() - return self._storage_instance_manager + return ServiceLocator.global_storage_instance_manager service_locator = ServiceLocator() diff --git a/src/crawlee/configuration.py b/src/crawlee/configuration.py index c825711565..c0ee228e9d 100644 --- a/src/crawlee/configuration.py +++ b/src/crawlee/configuration.py @@ -28,7 +28,7 @@ class Configuration(BaseSettings): Settings can also be configured via environment variables, prefixed with `CRAWLEE_`. 
""" - model_config = SettingsConfigDict(populate_by_name=True) + model_config = SettingsConfigDict(validate_by_name=True, validate_by_alias=True) internal_timeout: Annotated[timedelta | None, Field(alias='crawlee_internal_timeout')] = None """Timeout for the internal asynchronous operations.""" diff --git a/src/crawlee/crawlers/_basic/_basic_crawler.py b/src/crawlee/crawlers/_basic/_basic_crawler.py index 1c49b57188..0a6d4eae87 100644 --- a/src/crawlee/crawlers/_basic/_basic_crawler.py +++ b/src/crawlee/crawlers/_basic/_basic_crawler.py @@ -27,6 +27,7 @@ from crawlee._autoscaling import AutoscaledPool, Snapshotter, SystemStatus from crawlee._log_config import configure_logger, get_configured_log_level, string_to_log_level from crawlee._request import Request, RequestOptions, RequestState +from crawlee._service_locator import ServiceLocator from crawlee._types import ( BasicCrawlingContext, EnqueueLinksKwargs, @@ -346,14 +347,23 @@ def __init__( _logger: A logger instance, typically provided by a subclass, for consistent logging labels. Intended for use by subclasses rather than direct instantiation of `BasicCrawler`. 
""" - if configuration: - service_locator.set_configuration(configuration) - if storage_client: - service_locator.set_storage_client(storage_client) - if event_manager: - service_locator.set_event_manager(event_manager) + implicit_event_manager_with_explicit_config = False + if not configuration: + configuration = service_locator.get_configuration() + elif not event_manager: + implicit_event_manager_with_explicit_config = True - config = service_locator.get_configuration() + if not storage_client: + storage_client = service_locator.get_storage_client() + + if not event_manager: + event_manager = service_locator.get_event_manager() + + self._service_locator = ServiceLocator( + configuration=configuration, storage_client=storage_client, event_manager=event_manager + ) + + config = self._service_locator.get_configuration() # Core components self._request_manager = request_manager @@ -419,6 +429,11 @@ def __init__( httpx_logger = logging.getLogger('httpx') # Silence HTTPX logger httpx_logger.setLevel(logging.DEBUG if get_configured_log_level() <= logging.DEBUG else logging.WARNING) self._logger = _logger or logging.getLogger(__name__) + if implicit_event_manager_with_explicit_config: + self._logger.warning( + 'No event manager set, implicitly using event manager from global service_locator.' + 'It is advised to explicitly set the event manager if explicit configuration is used as well.' + ) self._statistics_log_format = statistics_log_format # Statistics @@ -548,7 +563,10 @@ async def _get_proxy_info(self, request: Request, session: Session | None) -> Pr async def get_request_manager(self) -> RequestManager: """Return the configured request manager. 
If none is configured, open and return the default request queue.""" if not self._request_manager: - self._request_manager = await RequestQueue.open() + self._request_manager = await RequestQueue.open( + storage_client=self._service_locator.get_storage_client(), + configuration=self._service_locator.get_configuration(), + ) return self._request_manager @@ -560,7 +578,13 @@ async def get_dataset( alias: str | None = None, ) -> Dataset: """Return the `Dataset` with the given ID or name. If none is provided, return the default one.""" - return await Dataset.open(id=id, name=name, alias=alias) + return await Dataset.open( + id=id, + name=name, + alias=alias, + storage_client=self._service_locator.get_storage_client(), + configuration=self._service_locator.get_configuration(), + ) async def get_key_value_store( self, @@ -570,7 +594,13 @@ async def get_key_value_store( alias: str | None = None, ) -> KeyValueStore: """Return the `KeyValueStore` with the given ID or name. If none is provided, return the default KVS.""" - return await KeyValueStore.open(id=id, name=name, alias=alias) + return await KeyValueStore.open( + id=id, + name=name, + alias=alias, + storage_client=self._service_locator.get_storage_client(), + configuration=self._service_locator.get_configuration(), + ) def error_handler( self, handler: ErrorHandler[TCrawlingContext | BasicCrawlingContext] @@ -686,7 +716,7 @@ def sigint_handler() -> None: return final_statistics async def _run_crawler(self) -> None: - event_manager = service_locator.get_event_manager() + event_manager = self._service_locator.get_event_manager() self._crawler_state_rec_task.start() @@ -791,7 +821,13 @@ async def get_data( Returns: The retrieved data. 
""" - dataset = await Dataset.open(id=dataset_id, name=dataset_name, alias=dataset_alias) + dataset = await Dataset.open( + id=dataset_id, + name=dataset_name, + alias=dataset_alias, + storage_client=self._service_locator.get_storage_client(), + configuration=self._service_locator.get_configuration(), + ) return await dataset.get_data(**kwargs) async def export_data( @@ -813,7 +849,13 @@ async def export_data( dataset_name: The name of the Dataset to export from (global scope, named storage). dataset_alias: The alias of the Dataset to export from (run scope, unnamed storage). """ - dataset = await self.get_dataset(id=dataset_id, name=dataset_name, alias=dataset_alias) + dataset = await Dataset.open( + id=dataset_id, + name=dataset_name, + alias=dataset_alias, + storage_client=self._service_locator.get_storage_client(), + configuration=self._service_locator.get_configuration(), + ) path = path if isinstance(path, Path) else Path(path) dst = path.open('w', newline='') @@ -1528,7 +1570,7 @@ def _log_status_message(self, message: str, level: LogLevel = 'DEBUG') -> None: async def _crawler_state_task(self) -> None: """Emit a persist state event with the given migration status.""" - event_manager = service_locator.get_event_manager() + event_manager = self._service_locator.get_event_manager() current_state = self.statistics.state diff --git a/src/crawlee/storage_clients/_base/_storage_client.py b/src/crawlee/storage_clients/_base/_storage_client.py index fba8507337..472574c860 100644 --- a/src/crawlee/storage_clients/_base/_storage_client.py +++ b/src/crawlee/storage_clients/_base/_storage_client.py @@ -6,6 +6,8 @@ from crawlee._utils.docs import docs_group if TYPE_CHECKING: + from collections.abc import Hashable + from crawlee.configuration import Configuration from ._dataset_client import DatasetClient @@ -28,6 +30,13 @@ class StorageClient(ABC): (where applicable), and consistent access patterns across all storage types it supports. 
""" + def get_additional_cache_key(self, configuration: Configuration) -> Hashable: # noqa: ARG002 + """Return a cache key that can differentiate between different storages of this client. + + Can be based on configuration or on the client itself. By default, returns an empty string. + """ + return '' + @abstractmethod async def create_dataset_client( self, diff --git a/src/crawlee/storage_clients/_file_system/_storage_client.py b/src/crawlee/storage_clients/_file_system/_storage_client.py index 94f183db2a..350d4ab534 100644 --- a/src/crawlee/storage_clients/_file_system/_storage_client.py +++ b/src/crawlee/storage_clients/_file_system/_storage_client.py @@ -1,5 +1,7 @@ from __future__ import annotations +from typing import TYPE_CHECKING + from typing_extensions import override from crawlee._utils.docs import docs_group @@ -10,6 +12,9 @@ from ._key_value_store_client import FileSystemKeyValueStoreClient from ._request_queue_client import FileSystemRequestQueueClient +if TYPE_CHECKING: + from collections.abc import Hashable + @docs_group('Storage clients') class FileSystemStorageClient(StorageClient): @@ -29,6 +34,11 @@ class FileSystemStorageClient(StorageClient): Use it only when running a single crawler process at a time. """ + @override + def get_additional_cache_key(self, configuration: Configuration) -> Hashable: + # Even different client instances should return same storage if the storage_dir is the same. 
+ return configuration.storage_dir + @override async def create_dataset_client( self, diff --git a/src/crawlee/storages/_dataset.py b/src/crawlee/storages/_dataset.py index 3e4b4c4981..ffe1df415f 100644 --- a/src/crawlee/storages/_dataset.py +++ b/src/crawlee/storages/_dataset.py @@ -107,13 +107,19 @@ async def open( configuration = service_locator.get_configuration() if configuration is None else configuration storage_client = service_locator.get_storage_client() if storage_client is None else storage_client + client_opener_coro = storage_client.create_dataset_client( + id=id, name=name, alias=alias, configuration=configuration + ) + additional_cache_key = storage_client.get_additional_cache_key(configuration=configuration) + return await service_locator.storage_instance_manager.open_storage_instance( cls, id=id, name=name, alias=alias, - configuration=configuration, - client_opener=storage_client.create_dataset_client, + client_opener_coro=client_opener_coro, + storage_client_type=storage_client.__class__, + additional_cache_key=additional_cache_key, ) @override diff --git a/src/crawlee/storages/_key_value_store.py b/src/crawlee/storages/_key_value_store.py index ee38166f23..00064e9b99 100644 --- a/src/crawlee/storages/_key_value_store.py +++ b/src/crawlee/storages/_key_value_store.py @@ -119,13 +119,19 @@ async def open( configuration = service_locator.get_configuration() if configuration is None else configuration storage_client = service_locator.get_storage_client() if storage_client is None else storage_client + client_opener_coro = storage_client.create_kvs_client( + id=id, name=name, alias=alias, configuration=configuration + ) + additional_cache_key = storage_client.get_additional_cache_key(configuration=configuration) + return await service_locator.storage_instance_manager.open_storage_instance( cls, id=id, name=name, + client_opener_coro=client_opener_coro, alias=alias, - configuration=configuration, - client_opener=storage_client.create_kvs_client, + 
storage_client_type=storage_client.__class__, + additional_cache_key=additional_cache_key, ) @override diff --git a/src/crawlee/storages/_request_queue.py b/src/crawlee/storages/_request_queue.py index 0141a52e06..a51cd8984d 100644 --- a/src/crawlee/storages/_request_queue.py +++ b/src/crawlee/storages/_request_queue.py @@ -125,13 +125,17 @@ async def open( configuration = service_locator.get_configuration() if configuration is None else configuration storage_client = service_locator.get_storage_client() if storage_client is None else storage_client + client_opener_coro = storage_client.create_rq_client(id=id, name=name, alias=alias, configuration=configuration) + additional_cache_key = storage_client.get_additional_cache_key(configuration=configuration) + return await service_locator.storage_instance_manager.open_storage_instance( cls, id=id, name=name, alias=alias, - configuration=configuration, - client_opener=storage_client.create_rq_client, + client_opener_coro=client_opener_coro, + storage_client_type=storage_client.__class__, + additional_cache_key=additional_cache_key, ) @override diff --git a/src/crawlee/storages/_storage_instance_manager.py b/src/crawlee/storages/_storage_instance_manager.py index ea86ac7311..fe63ae6be4 100644 --- a/src/crawlee/storages/_storage_instance_manager.py +++ b/src/crawlee/storages/_storage_instance_manager.py @@ -1,21 +1,49 @@ from __future__ import annotations -from collections.abc import Awaitable, Callable +from collections import defaultdict +from collections.abc import Coroutine, Hashable +from dataclasses import dataclass, field from typing import TYPE_CHECKING, TypeVar from crawlee.storage_clients._base import DatasetClient, KeyValueStoreClient, RequestQueueClient -from ._base import Storage - if TYPE_CHECKING: - from crawlee.configuration import Configuration + from crawlee.storage_clients import StorageClient + + from ._base import Storage T = TypeVar('T', bound='Storage') + +@dataclass +class _StorageClientCache: + 
"""Cache for specific storage client. + + Example: + Storage=Dataset, id='123', additional_cache_key="some_path" will be located in + storage = by_id[Dataset]['123'][some_path] + """ + + by_id: defaultdict[type[Storage], defaultdict[str, defaultdict[Hashable, Storage]]] = field( + default_factory=lambda: defaultdict(lambda: defaultdict(lambda: defaultdict())) + ) + """Cache for storage instances by ID, separated by storage type and additional hash key.""" + + by_name: defaultdict[type[Storage], defaultdict[str, defaultdict[Hashable, Storage]]] = field( + default_factory=lambda: defaultdict(lambda: defaultdict(lambda: defaultdict())) + ) + """Cache for storage instances by name, separated by storage type and additional hash key.""" + + by_alias: defaultdict[type[Storage], defaultdict[str, defaultdict[Hashable, Storage]]] = field( + default_factory=lambda: defaultdict(lambda: defaultdict(lambda: defaultdict())) + ) + """Cache for storage instances by alias, separated by storage type and additional hash key.""" + + StorageClientType = DatasetClient | KeyValueStoreClient | RequestQueueClient """Type alias for the storage client types.""" -ClientOpener = Callable[..., Awaitable[StorageClientType]] +ClientOpenerCoro = Coroutine[None, None, StorageClientType] """Type alias for the client opener function.""" @@ -26,20 +54,11 @@ class StorageInstanceManager: and provides a unified interface for opening and managing storage instances. 
""" - _DEFAULT_STORAGE = 'default' + _DEFAULT_STORAGE_ALIAS = '__default__' + """Reserved alias for default unnamed storage.""" def __init__(self) -> None: - self._cache_by_id = dict[type[Storage], dict[str, Storage]]() - """Cache for storage instances by ID, separated by storage type.""" - - self._cache_by_name = dict[type[Storage], dict[str, Storage]]() - """Cache for storage instances by name, separated by storage type.""" - - self._cache_by_alias = dict[type[Storage], dict[str, Storage]]() - """Cache for storage instances by alias, separated by storage type.""" - - self._default_instances = dict[type[Storage], Storage]() - """Cache for default instances of each storage type.""" + self._cache_by_storage_client: dict[type[StorageClient], _StorageClientCache] = defaultdict(_StorageClientCache) async def open_storage_instance( self, @@ -48,18 +67,20 @@ async def open_storage_instance( id: str | None, name: str | None, alias: str | None, - configuration: Configuration, - client_opener: ClientOpener, + storage_client_type: type[StorageClient], + client_opener_coro: ClientOpenerCoro, + additional_cache_key: Hashable = '', ) -> T: """Open a storage instance with caching support. Args: cls: The storage class to instantiate. id: Storage ID. - name: Storage name (global scope, persists across runs). + name: Storage name. (global scope, persists across runs). alias: Storage alias (run scope, creates unnamed storage). - configuration: Configuration object. - client_opener: Function to create the storage client. + storage_client_type: Type of storage client to use. + client_opener_coro: Coroutine to open the storage client when storage instance not found in cache. + additional_cache_key: Additional optional key to differentiate cache entries. Returns: The storage instance. @@ -67,81 +88,92 @@ async def open_storage_instance( Raises: ValueError: If multiple parameters out of `id`, `name`, and `alias` are specified. """ - # Validate input parameters. 
- specified_params = sum(1 for param in [id, name, alias] if param is not None) - if specified_params > 1: - raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.') - - # Auto-set alias='default' when no parameters are specified. - # Default unnamed storage is equal to alias=default unnamed storage. - if specified_params == 0: - alias = self._DEFAULT_STORAGE - specified_params = 1 - - # Check cache - if id is not None: - type_cache_by_id = self._cache_by_id.get(cls, {}) - if id in type_cache_by_id: - cached_instance = type_cache_by_id[id] + try: + if name == self._DEFAULT_STORAGE_ALIAS: + raise ValueError( + f'Storage name cannot be "{self._DEFAULT_STORAGE_ALIAS}" as it is reserved for default alias.' + ) + + # Validate input parameters. + specified_params = sum(1 for param in [id, name, alias] if param is not None) + if specified_params > 1: + raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.') + + # Auto-set alias='default' when no parameters are specified. + # Default unnamed storage is equal to alias=default unnamed storage. 
+ if specified_params == 0: + alias = self._DEFAULT_STORAGE_ALIAS + + # Check cache + if id is not None and ( + cached_instance := self._cache_by_storage_client[storage_client_type] + .by_id[cls][id] + .get(additional_cache_key) + ): if isinstance(cached_instance, cls): return cached_instance + raise RuntimeError('Cached instance type mismatch.') - if name is not None: - type_cache_by_name = self._cache_by_name.get(cls, {}) - if name in type_cache_by_name: - cached_instance = type_cache_by_name[name] + if name is not None and ( + cached_instance := self._cache_by_storage_client[storage_client_type] + .by_name[cls][name] + .get(additional_cache_key) + ): if isinstance(cached_instance, cls): return cached_instance + raise RuntimeError('Cached instance type mismatch.') - if alias is not None: - type_cache_by_alias = self._cache_by_alias.get(cls, {}) - if alias in type_cache_by_alias: - cached_instance = type_cache_by_alias[alias] + if alias is not None and ( + cached_instance := self._cache_by_storage_client[storage_client_type] + .by_alias[cls][alias] + .get(additional_cache_key) + ): if isinstance(cached_instance, cls): return cached_instance + raise RuntimeError('Cached instance type mismatch.') - # Check for conflicts between named and alias storages - if name is not None: - # Check if there's already an alias storage with the same identifier - type_cache_by_alias = self._cache_by_alias.get(cls, {}) - if name in type_cache_by_alias: + # Check for conflicts between named and alias storages + if alias and ( + self._cache_by_storage_client[storage_client_type].by_name[cls][alias].get(additional_cache_key) + ): raise ValueError( - f'Cannot create named storage "{name}" because an alias storage with the same name already exists. ' - f'Use a different name or drop the existing alias storage first.' + f'Cannot create alias storage "{alias}" because a named storage with the same name already exists. ' + f'Use a different alias or drop the existing named storage first.' 
) - if alias is not None: - # Check if there's already a named storage with the same identifier - type_cache_by_name = self._cache_by_name.get(cls, {}) - if alias in type_cache_by_name: + if name and ( + self._cache_by_storage_client[storage_client_type].by_alias[cls][name].get(additional_cache_key) + ): raise ValueError( - f'Cannot create alias storage "{alias}" because a named storage with the same name already exists. ' - f'Use a different alias or drop the existing named storage first.' + f'Cannot create named storage "{name}" because an alias storage with the same name already exists. ' + f'Use a different name or drop the existing alias storage first.' ) - # Create new instance - # Pass the correct parameters to the storage client - if alias is not None: - client = await client_opener(id=id, name=None, alias=alias, configuration=configuration) - else: - client = await client_opener(id=id, name=name, configuration=configuration) - metadata = await client.get_metadata() + # Create new instance + client: KeyValueStoreClient | DatasetClient | RequestQueueClient + client = await client_opener_coro - instance = cls(client, metadata.id, metadata.name) # type: ignore[call-arg] - instance_name = getattr(instance, 'name', None) + metadata = await client.get_metadata() - # Cache the instance - type_cache_by_id = self._cache_by_id.setdefault(cls, {}) - type_cache_by_name = self._cache_by_name.setdefault(cls, {}) - type_cache_by_alias = self._cache_by_alias.setdefault(cls, {}) + instance = cls(client, metadata.id, metadata.name) # type: ignore[call-arg] + instance_name = getattr(instance, 'name', None) - type_cache_by_id[instance.id] = instance - if instance_name is not None: - type_cache_by_name[instance_name] = instance - if alias is not None: - type_cache_by_alias[alias] = instance + # Cache the instance. Always cache by id and cache named or unnamed (alias). 
+ self._cache_by_storage_client[storage_client_type].by_id[cls][instance.id][additional_cache_key] = instance + if instance_name is not None: + self._cache_by_storage_client[storage_client_type].by_name[cls][instance_name][additional_cache_key] = ( + instance + ) + elif alias is not None: + self._cache_by_storage_client[storage_client_type].by_alias[cls][alias][additional_cache_key] = instance + else: + raise RuntimeError('Storage instance must have either a name or an alias.') - return instance + return instance + finally: + # Make sure the client opener is closed. + # If it was awaited, then closing is no operation, if it was not awaited, this is the cleanup. + client_opener_coro.close() def remove_from_cache(self, storage_instance: Storage) -> None: """Remove a storage instance from the cache. @@ -151,30 +183,23 @@ def remove_from_cache(self, storage_instance: Storage) -> None: """ storage_type = type(storage_instance) - # Remove from ID cache - type_cache_by_id = self._cache_by_id.get(storage_type, {}) - if storage_instance.id in type_cache_by_id: - del type_cache_by_id[storage_instance.id] - - # Remove from name cache - if storage_instance.name is not None: - type_cache_by_name = self._cache_by_name.get(storage_type, {}) - if storage_instance.name in type_cache_by_name: - del type_cache_by_name[storage_instance.name] - - # Remove from alias cache - need to search by instance since alias is not stored on the instance - type_cache_by_alias = self._cache_by_alias.get(storage_type, {}) - aliases_to_remove = [alias for alias, instance in type_cache_by_alias.items() if instance is storage_instance] - for alias in aliases_to_remove: - del type_cache_by_alias[alias] - - # Remove from default instances - if storage_type in self._default_instances and self._default_instances[storage_type] is storage_instance: - del self._default_instances[storage_type] + for storage_client_cache in self._cache_by_storage_client.values(): + # Remove from ID cache + for additional_key in 
storage_client_cache.by_id[storage_type][storage_instance.id]: + del storage_client_cache.by_id[storage_type][storage_instance.id][additional_key] + break + + # Remove from name cache or alias cache. It can never be in both. + if storage_instance.name is not None: + for additional_key in storage_client_cache.by_name[storage_type][storage_instance.name]: + del storage_client_cache.by_name[storage_type][storage_instance.name][additional_key] + break + else: + for alias_key in storage_client_cache.by_alias[storage_type]: + for additional_key in storage_client_cache.by_alias[storage_type][alias_key]: + del storage_client_cache.by_alias[storage_type][alias_key][additional_key] + break def clear_cache(self) -> None: """Clear all cached storage instances.""" - self._cache_by_id.clear() - self._cache_by_name.clear() - self._cache_by_alias.clear() - self._default_instances.clear() + self._cache_by_storage_client = defaultdict(_StorageClientCache) diff --git a/tests/unit/_autoscaling/test_snapshotter.py b/tests/unit/_autoscaling/test_snapshotter.py index 8317d3deae..cf6682bf31 100644 --- a/tests/unit/_autoscaling/test_snapshotter.py +++ b/tests/unit/_autoscaling/test_snapshotter.py @@ -225,7 +225,7 @@ def test_snapshot_pruning_keeps_recent_records_unaffected(snapshotter: Snapshott def test_memory_load_evaluation_logs_warning_on_high_usage(caplog: pytest.LogCaptureFixture) -> None: - config = Configuration(memory_mbytes=ByteSize.from_gb(8).bytes) + config = Configuration(memory_mbytes=ByteSize.from_gb(8).to_mb()) snapshotter = Snapshotter.from_config(config) diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index e57f190bc3..4207810280 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -51,28 +51,14 @@ def _prepare_test_env() -> None: # Set the environment variable for the local storage directory to the temporary path. 
monkeypatch.setenv('CRAWLEE_STORAGE_DIR', str(tmp_path)) - # Reset the flags in the service locator to indicate that no services are explicitly set. This ensures - # a clean state, as services might have been set during a previous test and not reset properly. - service_locator._configuration_was_retrieved = False - service_locator._storage_client_was_retrieved = False - service_locator._event_manager_was_retrieved = False - # Reset the services in the service locator. service_locator._configuration = None service_locator._event_manager = None service_locator._storage_client = None - service_locator._storage_instance_manager = None - - # Reset the retrieval flags - service_locator._configuration_was_retrieved = False - service_locator._event_manager_was_retrieved = False - service_locator._storage_client_was_retrieved = False + service_locator.storage_instance_manager.clear_cache() # Verify that the test environment was set up correctly. assert os.environ.get('CRAWLEE_STORAGE_DIR') == str(tmp_path) - assert service_locator._configuration_was_retrieved is False - assert service_locator._storage_client_was_retrieved is False - assert service_locator._event_manager_was_retrieved is False return _prepare_test_env diff --git a/tests/unit/crawlers/_basic/test_basic_crawler.py b/tests/unit/crawlers/_basic/test_basic_crawler.py index eff582e603..62ede11e67 100644 --- a/tests/unit/crawlers/_basic/test_basic_crawler.py +++ b/tests/unit/crawlers/_basic/test_basic_crawler.py @@ -10,6 +10,7 @@ from collections import Counter from dataclasses import dataclass from datetime import timedelta +from itertools import product from typing import TYPE_CHECKING, Any, Literal, cast from unittest.mock import AsyncMock, Mock, call, patch @@ -27,7 +28,7 @@ from crawlee.request_loaders import RequestList, RequestManagerTandem from crawlee.sessions import Session, SessionPool from crawlee.statistics import FinalStatistics -from crawlee.storage_clients import MemoryStorageClient +from 
crawlee.storage_clients import FileSystemStorageClient, MemoryStorageClient from crawlee.storages import Dataset, KeyValueStore, RequestQueue if TYPE_CHECKING: @@ -1040,7 +1041,7 @@ async def handler(context: BasicCrawlingContext) -> None: assert stats.requests_finished == 2 -async def test_sets_services() -> None: +async def test_services_no_side_effet_on_crawler_init() -> None: custom_configuration = Configuration() custom_event_manager = LocalEventManager.from_config(custom_configuration) custom_storage_client = MemoryStorageClient() @@ -1051,9 +1052,125 @@ async def test_sets_services() -> None: storage_client=custom_storage_client, ) - assert service_locator.get_configuration() is custom_configuration - assert service_locator.get_event_manager() is custom_event_manager - assert service_locator.get_storage_client() is custom_storage_client + assert service_locator.get_configuration() is not custom_configuration + assert service_locator.get_event_manager() is not custom_event_manager + assert service_locator.get_storage_client() is not custom_storage_client + + +async def test_crawler_uses_default_services() -> None: + custom_configuration = Configuration() + service_locator.set_configuration(custom_configuration) + + custom_event_manager = LocalEventManager.from_config(custom_configuration) + service_locator.set_event_manager(custom_event_manager) + + custom_storage_client = MemoryStorageClient() + service_locator.set_storage_client(custom_storage_client) + + basic_crawler = BasicCrawler() + + assert basic_crawler._service_locator.get_configuration() is custom_configuration + assert basic_crawler._service_locator.get_event_manager() is custom_event_manager + assert basic_crawler._service_locator.get_storage_client() is custom_storage_client + + +async def test_services_crawlers_can_use_different_services() -> None: + custom_configuration_1 = Configuration() + custom_event_manager_1 = LocalEventManager.from_config(custom_configuration_1) + 
custom_storage_client_1 = MemoryStorageClient() + + custom_configuration_2 = Configuration() + custom_event_manager_2 = LocalEventManager.from_config(custom_configuration_2) + custom_storage_client_2 = MemoryStorageClient() + + _ = BasicCrawler( + configuration=custom_configuration_1, + event_manager=custom_event_manager_1, + storage_client=custom_storage_client_1, + ) + + _ = BasicCrawler( + configuration=custom_configuration_2, + event_manager=custom_event_manager_2, + storage_client=custom_storage_client_2, + ) + + +async def test_crawler_uses_default_storages(tmp_path: Path) -> None: + configuration = Configuration( + crawlee_storage_dir=str(tmp_path), # type: ignore[call-arg] + purge_on_start=True, + ) + service_locator.set_configuration(configuration) + + dataset = await Dataset.open() + kvs = await KeyValueStore.open() + rq = await RequestQueue.open() + + crawler = BasicCrawler() + + assert dataset is await crawler.get_dataset() + assert kvs is await crawler.get_key_value_store() + assert rq is await crawler.get_request_manager() + + +async def test_crawler_can_use_other_storages(tmp_path: Path) -> None: + configuration = Configuration( + crawlee_storage_dir=str(tmp_path), # type: ignore[call-arg] + purge_on_start=True, + ) + service_locator.set_configuration(configuration) + + dataset = await Dataset.open() + kvs = await KeyValueStore.open() + rq = await RequestQueue.open() + + crawler = BasicCrawler(storage_client=MemoryStorageClient()) + + assert dataset is not await crawler.get_dataset() + assert kvs is not await crawler.get_key_value_store() + assert rq is not await crawler.get_request_manager() + + +async def test_crawler_can_use_other_storages_of_same_type(tmp_path: Path) -> None: + """Test that crawler can use non-global storage of the same type as global storage without conflicts""" + a_path = tmp_path / 'a' + b_path = tmp_path / 'b' + a_path.mkdir() + b_path.mkdir() + expected_paths = { + path / storage + for path, storage in product({a_path, 
b_path}, {'datasets', 'key_value_stores', 'request_queues'}) + } + + configuration_a = Configuration( + crawlee_storage_dir=str(a_path), # type: ignore[call-arg] + purge_on_start=True, + ) + configuration_b = Configuration( + crawlee_storage_dir=str(b_path), # type: ignore[call-arg] + purge_on_start=True, + ) + + # Set global configuration + service_locator.set_configuration(configuration_a) + service_locator.set_storage_client(FileSystemStorageClient()) + # Create storages based on the global services + dataset = await Dataset.open() + kvs = await KeyValueStore.open() + rq = await RequestQueue.open() + + # Set the crawler to use a different storage client + crawler = BasicCrawler(storage_client=FileSystemStorageClient(), configuration=configuration_b) + + # Assert that the storages are different + assert dataset is not await crawler.get_dataset() + assert kvs is not await crawler.get_key_value_store() + assert rq is not await crawler.get_request_manager() + + # Assert that all storages exist on the filesystem + for path in expected_paths: + assert path.is_dir() async def test_allows_storage_client_overwrite_before_run(monkeypatch: pytest.MonkeyPatch) -> None: diff --git a/tests/unit/storage_clients/_file_system/test_fs_dataset_client.py b/tests/unit/storage_clients/_file_system/test_fs_dataset_client.py index c5f31f144e..d3e5c6d9cf 100644 --- a/tests/unit/storage_clients/_file_system/test_fs_dataset_client.py +++ b/tests/unit/storage_clients/_file_system/test_fs_dataset_client.py @@ -27,20 +27,14 @@ def configuration(tmp_path: Path) -> Configuration: @pytest.fixture async def dataset_client(configuration: Configuration) -> AsyncGenerator[FileSystemDatasetClient, None]: """A fixture for a file system dataset client.""" - client = await FileSystemStorageClient().create_dataset_client( - name='test_dataset', - configuration=configuration, - ) + client = await FileSystemStorageClient().create_dataset_client(name='test_dataset', configuration=configuration) yield client
await client.drop() async def test_file_and_directory_creation(configuration: Configuration) -> None: """Test that file system dataset creates proper files and directories.""" - client = await FileSystemStorageClient().create_dataset_client( - name='new_dataset', - configuration=configuration, - ) + client = await FileSystemStorageClient().create_dataset_client(name='new_dataset', configuration=configuration) # Verify files were created assert client.path_to_dataset.exists() @@ -137,15 +131,12 @@ async def test_metadata_file_updates(dataset_client: FileSystemDatasetClient) -> assert metadata_json['item_count'] == 1 -async def test_data_persistence_across_reopens(configuration: Configuration) -> None: +async def test_data_persistence_across_reopens() -> None: """Test that data persists correctly when reopening the same dataset.""" storage_client = FileSystemStorageClient() # Create dataset and add data - original_client = await storage_client.create_dataset_client( - name='persistence-test', - configuration=configuration, - ) + original_client = await storage_client.create_dataset_client(name='persistence-test') test_data = {'test_item': 'test_value', 'id': 123} await original_client.push_data(test_data) @@ -153,10 +144,7 @@ async def test_data_persistence_across_reopens(configuration: Configuration) -> dataset_id = (await original_client.get_metadata()).id # Reopen by ID and verify data persists - reopened_client = await storage_client.create_dataset_client( - id=dataset_id, - configuration=configuration, - ) + reopened_client = await storage_client.create_dataset_client(id=dataset_id) data = await reopened_client.get_data() assert len(data.items) == 1 diff --git a/tests/unit/storage_clients/_file_system/test_fs_kvs_client.py b/tests/unit/storage_clients/_file_system/test_fs_kvs_client.py index c5bfa96c47..b9702299a0 100644 --- a/tests/unit/storage_clients/_file_system/test_fs_kvs_client.py +++ b/tests/unit/storage_clients/_file_system/test_fs_kvs_client.py @@ 
-27,20 +27,14 @@ def configuration(tmp_path: Path) -> Configuration: @pytest.fixture async def kvs_client(configuration: Configuration) -> AsyncGenerator[FileSystemKeyValueStoreClient, None]: """A fixture for a file system key-value store client.""" - client = await FileSystemStorageClient().create_kvs_client( - name='test_kvs', - configuration=configuration, - ) + client = await FileSystemStorageClient().create_kvs_client(name='test_kvs', configuration=configuration) yield client await client.drop() async def test_file_and_directory_creation(configuration: Configuration) -> None: """Test that file system KVS creates proper files and directories.""" - client = await FileSystemStorageClient().create_kvs_client( - name='new_kvs', - configuration=configuration, - ) + client = await FileSystemStorageClient().create_kvs_client(name='new_kvs', configuration=configuration) # Verify files were created assert client.path_to_kvs.exists() @@ -187,10 +181,7 @@ async def test_data_persistence_across_reopens(configuration: Configuration) -> storage_client = FileSystemStorageClient() # Create KVS and add data - original_client = await storage_client.create_kvs_client( - name='persistence-test', - configuration=configuration, - ) + original_client = await storage_client.create_kvs_client(name='persistence-test', configuration=configuration) test_key = 'persistent-key' test_value = 'persistent-value' @@ -201,7 +192,6 @@ async def test_data_persistence_across_reopens(configuration: Configuration) -> # Reopen by ID and verify data persists reopened_client = await storage_client.create_kvs_client( id=kvs_id, - configuration=configuration, ) record = await reopened_client.get_value(key=test_key) diff --git a/tests/unit/storage_clients/_file_system/test_fs_rq_client.py b/tests/unit/storage_clients/_file_system/test_fs_rq_client.py index 0be182fcd8..dc2937a259 100644 --- a/tests/unit/storage_clients/_file_system/test_fs_rq_client.py +++ 
b/tests/unit/storage_clients/_file_system/test_fs_rq_client.py @@ -25,22 +25,18 @@ def configuration(tmp_path: Path) -> Configuration: @pytest.fixture -async def rq_client(configuration: Configuration) -> AsyncGenerator[FileSystemRequestQueueClient, None]: +async def rq_client() -> AsyncGenerator[FileSystemRequestQueueClient, None]: """A fixture for a file system request queue client.""" client = await FileSystemStorageClient().create_rq_client( name='test_request_queue', - configuration=configuration, ) yield client await client.drop() -async def test_file_and_directory_creation(configuration: Configuration) -> None: +async def test_file_and_directory_creation() -> None: """Test that file system RQ creates proper files and directories.""" - client = await FileSystemStorageClient().create_rq_client( - name='new_request_queue', - configuration=configuration, - ) + client = await FileSystemStorageClient().create_rq_client(name='new_request_queue') # Verify files were created assert client.path_to_rq.exists() @@ -135,14 +131,13 @@ async def test_metadata_file_updates(rq_client: FileSystemRequestQueueClient) -> assert metadata_json['total_request_count'] == 1 -async def test_data_persistence_across_reopens(configuration: Configuration) -> None: +async def test_data_persistence_across_reopens() -> None: """Test that requests persist correctly when reopening the same RQ.""" storage_client = FileSystemStorageClient() # Create RQ and add requests original_client = await storage_client.create_rq_client( name='persistence-test', - configuration=configuration, ) test_requests = [ @@ -156,7 +151,6 @@ async def test_data_persistence_across_reopens(configuration: Configuration) -> # Reopen by ID and verify requests persist reopened_client = await storage_client.create_rq_client( id=rq_id, - configuration=configuration, ) metadata = await reopened_client.get_metadata() diff --git a/tests/unit/storage_clients/_memory/test_memory_dataset_client.py 
b/tests/unit/storage_clients/_memory/test_memory_dataset_client.py index 8cc846b0f4..c503374ce7 100644 --- a/tests/unit/storage_clients/_memory/test_memory_dataset_client.py +++ b/tests/unit/storage_clients/_memory/test_memory_dataset_client.py @@ -5,7 +5,6 @@ import pytest -from crawlee.configuration import Configuration from crawlee.storage_clients import MemoryStorageClient if TYPE_CHECKING: @@ -24,12 +23,9 @@ async def dataset_client() -> AsyncGenerator[MemoryDatasetClient, None]: async def test_memory_specific_purge_behavior() -> None: """Test memory-specific purge behavior and in-memory storage characteristics.""" - configuration = Configuration(purge_on_start=True) - # Create dataset and add data dataset_client1 = await MemoryStorageClient().create_dataset_client( name='test_purge_dataset', - configuration=configuration, ) await dataset_client1.push_data({'item': 'initial data'}) @@ -40,7 +36,6 @@ async def test_memory_specific_purge_behavior() -> None: # Reopen with same storage client instance dataset_client2 = await MemoryStorageClient().create_dataset_client( name='test_purge_dataset', - configuration=configuration, ) # Verify data was purged (memory storage specific behavior) diff --git a/tests/unit/storage_clients/_memory/test_memory_kvs_client.py b/tests/unit/storage_clients/_memory/test_memory_kvs_client.py index 463fb2a14c..ef55107393 100644 --- a/tests/unit/storage_clients/_memory/test_memory_kvs_client.py +++ b/tests/unit/storage_clients/_memory/test_memory_kvs_client.py @@ -5,7 +5,6 @@ import pytest -from crawlee.configuration import Configuration from crawlee.storage_clients import MemoryStorageClient if TYPE_CHECKING: @@ -24,12 +23,10 @@ async def kvs_client() -> AsyncGenerator[MemoryKeyValueStoreClient, None]: async def test_memory_specific_purge_behavior() -> None: """Test memory-specific purge behavior and in-memory storage characteristics.""" - configuration = Configuration(purge_on_start=True) # Create KVS and add data kvs_client1 = await 
MemoryStorageClient().create_kvs_client( name='test_purge_kvs', - configuration=configuration, ) await kvs_client1.set_value(key='test-key', value='initial value') @@ -41,7 +38,6 @@ async def test_memory_specific_purge_behavior() -> None: # Reopen with same storage client instance kvs_client2 = await MemoryStorageClient().create_kvs_client( name='test_purge_kvs', - configuration=configuration, ) # Verify value was purged (memory storage specific behavior) diff --git a/tests/unit/storage_clients/_memory/test_memory_rq_client.py b/tests/unit/storage_clients/_memory/test_memory_rq_client.py index 7877d8af79..8bfe8632df 100644 --- a/tests/unit/storage_clients/_memory/test_memory_rq_client.py +++ b/tests/unit/storage_clients/_memory/test_memory_rq_client.py @@ -6,7 +6,6 @@ import pytest from crawlee import Request -from crawlee.configuration import Configuration from crawlee.storage_clients import MemoryStorageClient if TYPE_CHECKING: @@ -25,12 +24,9 @@ async def rq_client() -> AsyncGenerator[MemoryRequestQueueClient, None]: async def test_memory_specific_purge_behavior() -> None: """Test memory-specific purge behavior and in-memory storage characteristics.""" - configuration = Configuration(purge_on_start=True) - # Create RQ and add data rq_client1 = await MemoryStorageClient().create_rq_client( name='test_purge_rq', - configuration=configuration, ) request = Request.from_url(url='https://example.com/initial') await rq_client1.add_batch_of_requests([request]) @@ -41,7 +37,6 @@ async def test_memory_specific_purge_behavior() -> None: # Reopen with same storage client instance rq_client2 = await MemoryStorageClient().create_rq_client( name='test_purge_rq', - configuration=configuration, ) # Verify queue was purged (memory storage specific behavior) diff --git a/tests/unit/storages/conftest.py b/tests/unit/storages/conftest.py new file mode 100644 index 0000000000..e7fd22f6b5 --- /dev/null +++ b/tests/unit/storages/conftest.py @@ -0,0 +1,13 @@ +import pytest + +from 
crawlee import service_locator +from crawlee.storage_clients import FileSystemStorageClient, MemoryStorageClient, StorageClient + + +@pytest.fixture(params=['memory', 'file_system']) +def storage_client(request: pytest.FixtureRequest) -> StorageClient: + """Parameterized fixture to test with different storage clients.""" + storage_client: StorageClient + storage_client = MemoryStorageClient() if request.param == 'memory' else FileSystemStorageClient() + service_locator.set_storage_client(storage_client) + return storage_client diff --git a/tests/unit/storages/test_dataset.py b/tests/unit/storages/test_dataset.py index c8ce6daf01..188bce2e7c 100644 --- a/tests/unit/storages/test_dataset.py +++ b/tests/unit/storages/test_dataset.py @@ -3,6 +3,7 @@ from __future__ import annotations +import json from typing import TYPE_CHECKING import pytest @@ -11,6 +12,7 @@ from crawlee.configuration import Configuration from crawlee.storage_clients import FileSystemStorageClient, MemoryStorageClient from crawlee.storages import Dataset, KeyValueStore +from crawlee.storages._storage_instance_manager import StorageInstanceManager if TYPE_CHECKING: from collections.abc import AsyncGenerator @@ -20,33 +22,13 @@ from crawlee.storage_clients import StorageClient -@pytest.fixture(params=['memory', 'file_system']) -def storage_client(request: pytest.FixtureRequest) -> StorageClient: - """Parameterized fixture to test with different storage clients.""" - if request.param == 'memory': - return MemoryStorageClient() - - return FileSystemStorageClient() - - -@pytest.fixture -def configuration(tmp_path: Path) -> Configuration: - """Provide a configuration with a temporary storage directory.""" - return Configuration( - crawlee_storage_dir=str(tmp_path), # type: ignore[call-arg] - purge_on_start=True, - ) - - @pytest.fixture async def dataset( storage_client: StorageClient, - configuration: Configuration, ) -> AsyncGenerator[Dataset, None]: """Fixture that provides a dataset instance for each 
test.""" dataset = await Dataset.open( storage_client=storage_client, - configuration=configuration, ) yield dataset @@ -55,13 +37,11 @@ async def dataset( async def test_open_creates_new_dataset( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test that open() creates a new dataset with proper metadata.""" dataset = await Dataset.open( name='new_dataset', storage_client=storage_client, - configuration=configuration, ) # Verify dataset properties @@ -76,13 +56,11 @@ async def test_open_creates_new_dataset( async def test_reopen_default( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test reopening a dataset with default parameters.""" # Create a first dataset instance with default parameters dataset_1 = await Dataset.open( storage_client=storage_client, - configuration=configuration, ) # Verify default properties @@ -98,7 +76,6 @@ async def test_reopen_default( # Reopen the same dataset dataset_2 = await Dataset.open( storage_client=storage_client, - configuration=configuration, ) # Verify both instances reference the same dataset @@ -117,14 +94,12 @@ async def test_reopen_default( async def test_open_by_id( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test opening a dataset by its ID.""" # First create a dataset by name dataset1 = await Dataset.open( name='dataset_by_id_test', storage_client=storage_client, - configuration=configuration, ) # Add some data to identify it @@ -135,7 +110,6 @@ async def test_open_by_id( dataset2 = await Dataset.open( id=dataset1.id, storage_client=storage_client, - configuration=configuration, ) # Verify it's the same dataset @@ -154,13 +128,11 @@ async def test_open_by_id( async def test_open_existing_dataset( dataset: Dataset, - storage_client: StorageClient, ) -> None: """Test that open() loads an existing dataset correctly.""" # Open the same dataset again reopened_dataset = await Dataset.open( name=dataset.name, - 
storage_client=storage_client, ) # Verify dataset properties @@ -176,7 +148,6 @@ async def test_open_existing_dataset( async def test_open_with_id_and_name( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test that open() raises an error when both id and name are provided.""" with pytest.raises(ValueError, match=r'Only one of "id", "name", or "alias" can be specified, not multiple.'): @@ -184,7 +155,6 @@ async def test_open_with_id_and_name( id='some-id', name='some-name', storage_client=storage_client, - configuration=configuration, ) @@ -396,13 +366,11 @@ async def test_list_items_with_options(dataset: Dataset) -> None: async def test_drop( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test dropping a dataset removes it from cache and clears its data.""" dataset = await Dataset.open( name='drop_test', storage_client=storage_client, - configuration=configuration, ) # Add some data @@ -415,7 +383,6 @@ async def test_drop( new_dataset = await Dataset.open( name='drop_test', storage_client=storage_client, - configuration=configuration, ) result = await new_dataset.get_data() @@ -431,7 +398,6 @@ async def test_export_to_json( # Create a key-value store for export kvs = await KeyValueStore.open( name='export_kvs', - storage_client=storage_client, ) # Add some items to the dataset @@ -511,6 +477,43 @@ async def test_export_to_invalid_content_type(dataset: Dataset) -> None: ) +async def test_export_with_multiple_kwargs(dataset: Dataset, tmp_path: Path) -> None: + """Test exporting dataset using many optional arguments together.""" + target_kvs_name = 'some_kvs' + target_storage_client = FileSystemStorageClient() + export_key = 'exported_dataset' + data = {'some key': 'some data'} + + # Prepare custom directory and configuration + custom_dir_name = 'some_dir' + custom_dir = tmp_path / custom_dir_name + custom_dir.mkdir() + target_configuration = Configuration(crawlee_storage_dir=str(custom_dir)) # type: 
ignore[call-arg] + + # Set expected values + expected_exported_data = f'{json.dumps([{"some key": "some data"}])}' + expected_kvs_dir = custom_dir / 'key_value_stores' / target_kvs_name + + # Populate dataset and export + await dataset.push_data(data) + await dataset.export_to( + key=export_key, + content_type='json', + to_kvs_name=target_kvs_name, + to_kvs_storage_client=target_storage_client, + to_kvs_configuration=target_configuration, + ) + + # Verify the directory was created + assert expected_kvs_dir.is_dir() + # Verify that kvs contains the exported data + kvs = await KeyValueStore.open( + name=target_kvs_name, storage_client=target_storage_client, configuration=target_configuration + ) + + assert await kvs.get_value(key=export_key) == expected_exported_data + + async def test_large_dataset(dataset: Dataset) -> None: """Test handling a large dataset with many items.""" items = [{'id': i, 'value': f'value-{i}'} for i in range(100)] @@ -531,14 +534,12 @@ async def test_large_dataset(dataset: Dataset) -> None: async def test_purge( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test purging a dataset removes all data but keeps the dataset itself.""" # First create a dataset dataset = await Dataset.open( name='purge_test_dataset', storage_client=storage_client, - configuration=configuration, ) # Add some data @@ -587,19 +588,16 @@ async def test_purge( async def test_open_with_alias( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test opening datasets with alias parameter for NDU functionality.""" # Create datasets with different aliases dataset_1 = await Dataset.open( alias='test_alias_1', storage_client=storage_client, - configuration=configuration, ) dataset_2 = await Dataset.open( alias='test_alias_2', storage_client=storage_client, - configuration=configuration, ) # Verify they have different IDs but no names (unnamed) @@ -627,21 +625,18 @@ async def test_open_with_alias( async def 
test_alias_caching( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test that datasets with same alias return same instance (cached).""" # Open dataset with alias dataset_1 = await Dataset.open( alias='cache_test', storage_client=storage_client, - configuration=configuration, ) # Open again with same alias dataset_2 = await Dataset.open( alias='cache_test', storage_client=storage_client, - configuration=configuration, ) # Should be same instance @@ -654,7 +649,6 @@ async def test_alias_caching( async def test_alias_with_id_error( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test that providing both alias and id raises error.""" with pytest.raises(ValueError, match=r'Only one of "id", "name", or "alias" can be specified, not multiple.'): @@ -662,13 +656,11 @@ async def test_alias_with_id_error( id='some-id', alias='some-alias', storage_client=storage_client, - configuration=configuration, ) async def test_alias_with_name_error( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test that providing both alias and name raises error.""" with pytest.raises(ValueError, match=r'Only one of "id", "name", or "alias" can be specified, not multiple.'): @@ -676,13 +668,11 @@ async def test_alias_with_name_error( name='some-name', alias='some-alias', storage_client=storage_client, - configuration=configuration, ) async def test_alias_with_all_parameters_error( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test that providing id, name, and alias raises error.""" with pytest.raises(ValueError, match=r'Only one of "id", "name", or "alias" can be specified, not multiple.'): @@ -691,13 +681,11 @@ async def test_alias_with_all_parameters_error( name='some-name', alias='some-alias', storage_client=storage_client, - configuration=configuration, ) async def test_alias_with_special_characters( storage_client: StorageClient, - configuration: Configuration, ) -> None: 
"""Test alias functionality with special characters.""" special_aliases = [ @@ -713,7 +701,6 @@ async def test_alias_with_special_characters( dataset = await Dataset.open( alias=alias, storage_client=storage_client, - configuration=configuration, ) datasets.append(dataset) @@ -731,26 +718,28 @@ async def test_alias_with_special_characters( await dataset.drop() -async def test_named_vs_alias_conflict_detection() -> None: +async def test_named_vs_alias_conflict_detection( + storage_client: StorageClient, +) -> None: """Test that conflicts between named and alias storages are detected.""" # Test 1: Create named storage first, then try alias with same name - named_dataset = await Dataset.open(name='conflict_test') + named_dataset = await Dataset.open(name='conflict_test', storage_client=storage_client) assert named_dataset.name == 'conflict_test' # Try to create alias with same name - should raise error with pytest.raises(ValueError, match=r'Cannot create alias storage "conflict_test".*already exists'): - await Dataset.open(alias='conflict_test') + await Dataset.open(alias='conflict_test', storage_client=storage_client) # Clean up await named_dataset.drop() # Test 2: Create alias first, then try named with same name - alias_dataset = await Dataset.open(alias='conflict_test2') + alias_dataset = await Dataset.open(alias='conflict_test2', storage_client=storage_client) assert alias_dataset.name is None # Alias storages have no name # Try to create named with same name - should raise error with pytest.raises(ValueError, match=r'Cannot create named storage "conflict_test2".*already exists'): - await Dataset.open(name='conflict_test2') + await Dataset.open(name='conflict_test2', storage_client=storage_client) # Clean up await alias_dataset.drop() @@ -758,14 +747,12 @@ async def test_named_vs_alias_conflict_detection() -> None: async def test_alias_parameter( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test dataset creation and operations with 
alias parameter.""" # Create dataset with alias alias_dataset = await Dataset.open( alias='test_alias', storage_client=storage_client, - configuration=configuration, ) # Verify alias dataset properties @@ -783,14 +770,12 @@ async def test_alias_parameter( async def test_alias_vs_named_isolation( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test that alias and named datasets with same identifier are isolated.""" # Create named dataset named_dataset = await Dataset.open( name='test_identifier', storage_client=storage_client, - configuration=configuration, ) # Verify named dataset @@ -804,7 +789,6 @@ async def test_alias_vs_named_isolation( alias_dataset = await Dataset.open( alias='test_identifier', storage_client=storage_client, - configuration=configuration, ) # Should be different instance @@ -820,20 +804,16 @@ async def test_alias_vs_named_isolation( async def test_default_vs_alias_default_equivalence( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test that default dataset and alias='default' are equivalent.""" # Open default dataset default_dataset = await Dataset.open( storage_client=storage_client, - configuration=configuration, ) - # Open alias='default' dataset alias_default_dataset = await Dataset.open( - alias='default', + alias=StorageInstanceManager._DEFAULT_STORAGE_ALIAS, storage_client=storage_client, - configuration=configuration, ) # Should be the same @@ -851,7 +831,6 @@ async def test_default_vs_alias_default_equivalence( async def test_multiple_alias_isolation( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test that different aliases create separate datasets.""" datasets = [] @@ -860,7 +839,6 @@ async def test_multiple_alias_isolation( dataset = await Dataset.open( alias=f'alias_{i}', storage_client=storage_client, - configuration=configuration, ) await dataset.push_data({'alias': f'alias_{i}', 'index': i}) datasets.append(dataset) @@ -1049,3 +1027,13 @@ 
async def test_purge_on_start_disabled(storage_client: StorageClient) -> None: await default_dataset_2.drop() await alias_dataset_2.drop() await named_dataset_2.drop() + + +async def test_name_default_not_allowed(storage_client: StorageClient) -> None: + """Test that storage can't have default alias as name, to prevent collisions with unnamed storage alias.""" + with pytest.raises( + ValueError, + match=f'Storage name cannot be "{StorageInstanceManager._DEFAULT_STORAGE_ALIAS}" as ' + f'it is reserved for default alias.', + ): + await Dataset.open(name=StorageInstanceManager._DEFAULT_STORAGE_ALIAS, storage_client=storage_client) diff --git a/tests/unit/storages/test_key_value_store.py b/tests/unit/storages/test_key_value_store.py index 1b2f9209ec..9e9dcf8c55 100644 --- a/tests/unit/storages/test_key_value_store.py +++ b/tests/unit/storages/test_key_value_store.py @@ -10,43 +10,22 @@ from crawlee import service_locator from crawlee.configuration import Configuration -from crawlee.storage_clients import FileSystemStorageClient, MemoryStorageClient from crawlee.storages import KeyValueStore +from crawlee.storages._storage_instance_manager import StorageInstanceManager if TYPE_CHECKING: from collections.abc import AsyncGenerator - from pathlib import Path from crawlee.storage_clients import StorageClient -@pytest.fixture(params=['memory', 'file_system']) -def storage_client(request: pytest.FixtureRequest) -> StorageClient: - """Parameterized fixture to test with different storage clients.""" - if request.param == 'memory': - return MemoryStorageClient() - - return FileSystemStorageClient() - - -@pytest.fixture -def configuration(tmp_path: Path) -> Configuration: - """Provide a configuration with a temporary storage directory.""" - return Configuration( - crawlee_storage_dir=str(tmp_path), # type: ignore[call-arg] - purge_on_start=True, - ) - - @pytest.fixture async def kvs( storage_client: StorageClient, - configuration: Configuration, ) -> AsyncGenerator[KeyValueStore, 
None]: """Fixture that provides a key-value store instance for each test.""" kvs = await KeyValueStore.open( storage_client=storage_client, - configuration=configuration, ) yield kvs @@ -55,13 +34,11 @@ async def kvs( async def test_open_creates_new_kvs( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test that open() creates a new key-value store with proper metadata.""" kvs = await KeyValueStore.open( name='new_kvs', storage_client=storage_client, - configuration=configuration, ) # Verify key-value store properties @@ -92,7 +69,6 @@ async def test_open_existing_kvs( async def test_open_with_id_and_name( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test that open() raises an error when both id and name are provided.""" with pytest.raises(ValueError, match=r'Only one of "id", "name", or "alias" can be specified, not multiple.'): @@ -100,20 +76,17 @@ async def test_open_with_id_and_name( id='some-id', name='some-name', storage_client=storage_client, - configuration=configuration, ) async def test_open_by_id( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test opening a key-value store by its ID.""" # First create a key-value store by name kvs1 = await KeyValueStore.open( name='kvs_by_id_test', storage_client=storage_client, - configuration=configuration, ) # Add some data to identify it @@ -123,7 +96,6 @@ async def test_open_by_id( kvs2 = await KeyValueStore.open( id=kvs1.id, storage_client=storage_client, - configuration=configuration, ) # Verify it's the same key-value store @@ -295,13 +267,11 @@ async def test_iterate_keys_with_limit(kvs: KeyValueStore) -> None: async def test_drop( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test dropping a key-value store removes it from cache and clears its data.""" kvs = await KeyValueStore.open( name='drop_test', storage_client=storage_client, - configuration=configuration, ) # Add some data @@ -314,7 
+284,6 @@ async def test_drop( new_kvs = await KeyValueStore.open( name='drop_test', storage_client=storage_client, - configuration=configuration, ) # Attempt to get a previously stored value @@ -325,13 +294,11 @@ async def test_drop( async def test_reopen_default( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test reopening the default key-value store.""" # Open the default key-value store kvs1 = await KeyValueStore.open( storage_client=storage_client, - configuration=configuration, ) # Set a value @@ -340,7 +307,6 @@ async def test_reopen_default( # Open the default key-value store again kvs2 = await KeyValueStore.open( storage_client=storage_client, - configuration=configuration, ) # Verify they are the same store @@ -417,16 +383,16 @@ async def test_key_with_special_characters(kvs: KeyValueStore) -> None: assert await kvs.get_value(key=special_key) is None -async def test_data_persistence_on_reopen(configuration: Configuration) -> None: +async def test_data_persistence_on_reopen() -> None: """Test that data persists when reopening a KeyValueStore.""" - kvs1 = await KeyValueStore.open(configuration=configuration) + kvs1 = await KeyValueStore.open() await kvs1.set_value('key_123', 'value_123') result1 = await kvs1.get_value('key_123') assert result1 == 'value_123' - kvs2 = await KeyValueStore.open(configuration=configuration) + kvs2 = await KeyValueStore.open() result2 = await kvs2.get_value('key_123') assert result2 == 'value_123' @@ -440,14 +406,12 @@ async def test_data_persistence_on_reopen(configuration: Configuration) -> None: async def test_purge( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test purging a key-value store removes all values but keeps the store itself.""" # First create a key-value store kvs = await KeyValueStore.open( name='purge_test_kvs', storage_client=storage_client, - configuration=configuration, ) # Add some values @@ -603,19 +567,16 @@ async def 
test_record_exists_after_purge(kvs: KeyValueStore) -> None: async def test_open_with_alias( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test opening key-value stores with alias parameter for NDU functionality.""" # Create key-value stores with different aliases kvs_1 = await KeyValueStore.open( alias='test_alias_1', storage_client=storage_client, - configuration=configuration, ) kvs_2 = await KeyValueStore.open( alias='test_alias_2', storage_client=storage_client, - configuration=configuration, ) # Verify they have different IDs but no names (unnamed) @@ -641,21 +602,18 @@ async def test_open_with_alias( async def test_alias_caching( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test that key-value stores with same alias return same instance (cached).""" # Open kvs with alias kvs_1 = await KeyValueStore.open( alias='cache_test', storage_client=storage_client, - configuration=configuration, ) # Open again with same alias kvs_2 = await KeyValueStore.open( alias='cache_test', storage_client=storage_client, - configuration=configuration, ) # Should be same instance @@ -668,7 +626,6 @@ async def test_alias_caching( async def test_alias_with_id_error( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test that providing both alias and id raises error.""" with pytest.raises(ValueError, match=r'Only one of "id", "name", or "alias" can be specified, not multiple.'): @@ -676,13 +633,11 @@ async def test_alias_with_id_error( id='some-id', alias='some-alias', storage_client=storage_client, - configuration=configuration, ) async def test_alias_with_name_error( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test that providing both alias and name raises error.""" with pytest.raises(ValueError, match=r'Only one of "id", "name", or "alias" can be specified, not multiple.'): @@ -690,13 +645,11 @@ async def test_alias_with_name_error( name='some-name', 
alias='some-alias', storage_client=storage_client, - configuration=configuration, ) async def test_alias_with_special_characters( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test alias functionality with special characters.""" special_aliases = [ @@ -712,7 +665,6 @@ async def test_alias_with_special_characters( kvs = await KeyValueStore.open( alias=alias, storage_client=storage_client, - configuration=configuration, ) stores.append(kvs) @@ -732,13 +684,11 @@ async def test_alias_with_special_characters( async def test_alias_key_operations( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test that key operations work correctly with alias stores.""" kvs = await KeyValueStore.open( alias='key_ops_test', storage_client=storage_client, - configuration=configuration, ) # Test setting multiple keys @@ -781,33 +731,35 @@ async def test_alias_key_operations( await kvs.drop() -async def test_named_vs_alias_conflict_detection() -> None: +async def test_named_vs_alias_conflict_detection( + storage_client: StorageClient, +) -> None: """Test that conflicts between named and alias storages are detected.""" # Test 1: Create named storage first, then try alias with same name - named_kvs = await KeyValueStore.open(name='conflict_test') + named_kvs = await KeyValueStore.open(name='conflict_test', storage_client=storage_client) assert named_kvs.name == 'conflict_test' # Try to create alias with same name - should raise error with pytest.raises(ValueError, match=r'Cannot create alias storage "conflict_test".*already exists'): - await KeyValueStore.open(alias='conflict_test') + await KeyValueStore.open(alias='conflict_test', storage_client=storage_client) # Clean up await named_kvs.drop() # Test 2: Create alias first, then try named with same name - alias_kvs = await KeyValueStore.open(alias='conflict_test2') + alias_kvs = await KeyValueStore.open(alias='conflict_test2', storage_client=storage_client) assert alias_kvs.name is 
None # Alias storages have no name # Try to create named with same name - should raise error with pytest.raises(ValueError, match=r'Cannot create named storage "conflict_test2".*already exists'): - await KeyValueStore.open(name='conflict_test2') + await KeyValueStore.open(name='conflict_test2', storage_client=storage_client) # Clean up await alias_kvs.drop() # Test 3: Different names should work fine - named_kvs_ok = await KeyValueStore.open(name='different_name') - alias_kvs_ok = await KeyValueStore.open(alias='different_alias') + named_kvs_ok = await KeyValueStore.open(name='different_name', storage_client=storage_client) + alias_kvs_ok = await KeyValueStore.open(alias='different_alias', storage_client=storage_client) assert named_kvs_ok.name == 'different_name' assert alias_kvs_ok.name is None @@ -819,14 +771,12 @@ async def test_named_vs_alias_conflict_detection() -> None: async def test_alias_parameter( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test key-value store creation and operations with alias parameter.""" # Create kvs with alias alias_kvs = await KeyValueStore.open( alias='test_alias', storage_client=storage_client, - configuration=configuration, ) # Verify alias kvs properties @@ -843,14 +793,12 @@ async def test_alias_parameter( async def test_alias_vs_named_isolation( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test that alias and named key-value stores with same identifier are isolated.""" # Create named kvs named_kvs = await KeyValueStore.open( name='test_identifier', storage_client=storage_client, - configuration=configuration, ) # Verify named kvs @@ -864,7 +812,6 @@ async def test_alias_vs_named_isolation( alias_kvs = await KeyValueStore.open( alias='test_identifier', storage_client=storage_client, - configuration=configuration, ) # Should be different instance @@ -880,20 +827,16 @@ async def test_alias_vs_named_isolation( async def test_default_vs_alias_default_equivalence( 
storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test that default key-value store and alias='default' are equivalent.""" # Open default kvs default_kvs = await KeyValueStore.open( storage_client=storage_client, - configuration=configuration, ) - # Open alias='default' kvs alias_default_kvs = await KeyValueStore.open( - alias='default', + alias=StorageInstanceManager._DEFAULT_STORAGE_ALIAS, storage_client=storage_client, - configuration=configuration, ) # Should be the same @@ -911,7 +854,6 @@ async def test_default_vs_alias_default_equivalence( async def test_multiple_alias_isolation( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test that different aliases create separate key-value stores.""" kvs_stores = [] @@ -920,7 +862,6 @@ async def test_multiple_alias_isolation( kvs = await KeyValueStore.open( alias=f'alias_{i}', storage_client=storage_client, - configuration=configuration, ) await kvs.set_value('alias', f'alias_{i}') await kvs.set_value('index', i) @@ -1100,3 +1041,13 @@ async def test_purge_on_start_disabled(storage_client: StorageClient) -> None: await named_kvs_2.drop() await alias_kvs_2.drop() await default_kvs_2.drop() + + +async def test_name_default_not_allowed(storage_client: StorageClient) -> None: + """Test that storage can't have default alias as name, to prevent collisions with unnamed storage alias.""" + with pytest.raises( + ValueError, + match=f'Storage name cannot be "{StorageInstanceManager._DEFAULT_STORAGE_ALIAS}" as ' + f'it is reserved for default alias.', + ): + await KeyValueStore.open(name=StorageInstanceManager._DEFAULT_STORAGE_ALIAS, storage_client=storage_client) diff --git a/tests/unit/storages/test_request_queue.py b/tests/unit/storages/test_request_queue.py index b2bfda0394..11d2eb95da 100644 --- a/tests/unit/storages/test_request_queue.py +++ b/tests/unit/storages/test_request_queue.py @@ -10,43 +10,23 @@ from crawlee import Request, service_locator from 
crawlee.configuration import Configuration -from crawlee.storage_clients import FileSystemStorageClient, MemoryStorageClient, StorageClient +from crawlee.storage_clients import StorageClient from crawlee.storages import RequestQueue +from crawlee.storages._storage_instance_manager import StorageInstanceManager if TYPE_CHECKING: from collections.abc import AsyncGenerator - from pathlib import Path from crawlee.storage_clients import StorageClient -@pytest.fixture(params=['memory', 'file_system']) -def storage_client(request: pytest.FixtureRequest) -> StorageClient: - """Parameterized fixture to test with different storage clients.""" - if request.param == 'memory': - return MemoryStorageClient() - - return FileSystemStorageClient() - - -@pytest.fixture -def configuration(tmp_path: Path) -> Configuration: - """Provide a configuration with a temporary storage directory.""" - return Configuration( - crawlee_storage_dir=str(tmp_path), # type: ignore[call-arg] - purge_on_start=True, - ) - - @pytest.fixture async def rq( storage_client: StorageClient, - configuration: Configuration, ) -> AsyncGenerator[RequestQueue, None]: """Fixture that provides a request queue instance for each test.""" rq = await RequestQueue.open( storage_client=storage_client, - configuration=configuration, ) yield rq @@ -55,13 +35,11 @@ async def rq( async def test_open_creates_new_rq( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test that open() creates a new request queue with proper metadata.""" rq = await RequestQueue.open( name='new_request_queue', storage_client=storage_client, - configuration=configuration, ) # Verify request queue properties @@ -96,7 +74,6 @@ async def test_open_existing_rq( async def test_open_with_id_and_name( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test that open() raises an error when both id and name are provided.""" with pytest.raises(ValueError, match=r'Only one of "id", "name", or "alias" can be 
specified, not multiple.'): @@ -104,20 +81,17 @@ async def test_open_with_id_and_name( id='some-id', name='some-name', storage_client=storage_client, - configuration=configuration, ) async def test_open_by_id( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test opening a request queue by its ID.""" # First create a request queue by name rq1 = await RequestQueue.open( name='rq_by_id_test', storage_client=storage_client, - configuration=configuration, ) # Add a request to identify it @@ -127,7 +101,6 @@ async def test_open_by_id( rq2 = await RequestQueue.open( id=rq1.id, storage_client=storage_client, - configuration=configuration, ) # Verify it's the same request queue @@ -498,13 +471,11 @@ async def test_reclaim_non_existent_request(rq: RequestQueue) -> None: async def test_drop( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test dropping a request queue removes it from cache and clears its data.""" rq = await RequestQueue.open( name='drop_test', storage_client=storage_client, - configuration=configuration, ) # Add a request @@ -517,7 +488,6 @@ async def test_drop( new_rq = await RequestQueue.open( name='drop_test', storage_client=storage_client, - configuration=configuration, ) # Verify the queue is empty @@ -530,7 +500,6 @@ async def test_drop( async def test_reopen_default( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test reopening the default request queue.""" # First clean up any storage instance caches @@ -540,7 +509,6 @@ async def test_reopen_default( # Open the default request queue rq1 = await RequestQueue.open( storage_client=storage_client, - configuration=configuration, ) # If a request queue already exists (due to previous test run), purge it to start fresh @@ -551,7 +519,6 @@ async def test_reopen_default( await rq1.drop() rq1 = await RequestQueue.open( storage_client=storage_client, - configuration=configuration, ) # Verify we're starting fresh @@ -568,7 
+535,6 @@ async def test_reopen_default( # Open the default request queue again rq2 = await RequestQueue.open( storage_client=storage_client, - configuration=configuration, ) # Verify they are the same queue @@ -591,14 +557,12 @@ async def test_reopen_default( async def test_purge( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test purging a request queue removes all requests but keeps the queue itself.""" # First create a request queue rq = await RequestQueue.open( name='purge_test_queue', storage_client=storage_client, - configuration=configuration, ) # Add some requests @@ -646,19 +610,16 @@ async def test_purge( async def test_open_with_alias( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test opening request queues with alias parameter for NDU functionality.""" # Create request queues with different aliases rq_1 = await RequestQueue.open( alias='test_alias_1', storage_client=storage_client, - configuration=configuration, ) rq_2 = await RequestQueue.open( alias='test_alias_2', storage_client=storage_client, - configuration=configuration, ) # Verify they have different IDs but no names (unnamed) @@ -687,21 +648,18 @@ async def test_open_with_alias( async def test_alias_caching( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test that request queues with same alias return same instance (cached).""" # Open rq with alias rq_1 = await RequestQueue.open( alias='cache_test', storage_client=storage_client, - configuration=configuration, ) # Open again with same alias rq_2 = await RequestQueue.open( alias='cache_test', storage_client=storage_client, - configuration=configuration, ) # Should be same instance @@ -714,7 +672,6 @@ async def test_alias_caching( async def test_alias_with_id_error( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test that providing both alias and id raises error.""" with pytest.raises(ValueError, match=r'Only one of "id", 
"name", or "alias" can be specified, not multiple.'): @@ -722,13 +679,11 @@ async def test_alias_with_id_error( id='some-id', alias='some-alias', storage_client=storage_client, - configuration=configuration, ) async def test_alias_with_name_error( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test that providing both alias and name raises error.""" with pytest.raises(ValueError, match=r'Only one of "id", "name", or "alias" can be specified, not multiple.'): @@ -736,13 +691,11 @@ async def test_alias_with_name_error( name='some-name', alias='some-alias', storage_client=storage_client, - configuration=configuration, ) async def test_alias_with_special_characters( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test alias functionality with special characters.""" special_aliases = [ @@ -758,7 +711,6 @@ async def test_alias_with_special_characters( rq = await RequestQueue.open( alias=alias, storage_client=storage_client, - configuration=configuration, ) queues.append(rq) @@ -778,13 +730,11 @@ async def test_alias_with_special_characters( async def test_alias_request_operations( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test that request operations work correctly with alias queues.""" rq = await RequestQueue.open( alias='request_ops_test', storage_client=storage_client, - configuration=configuration, ) # Test adding multiple requests @@ -828,13 +778,11 @@ async def test_alias_request_operations( async def test_alias_forefront_operations( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test forefront operations work correctly with alias queues.""" rq = await RequestQueue.open( alias='forefront_test', storage_client=storage_client, - configuration=configuration, ) # Add normal requests @@ -860,13 +808,11 @@ async def test_alias_forefront_operations( async def test_alias_batch_operations( storage_client: StorageClient, - configuration: Configuration, 
) -> None: """Test batch operations work correctly with alias queues.""" rq = await RequestQueue.open( alias='batch_test', storage_client=storage_client, - configuration=configuration, ) # Test batch adding @@ -889,26 +835,31 @@ async def test_alias_batch_operations( await rq.drop() -async def test_named_vs_alias_conflict_detection() -> None: +async def test_named_vs_alias_conflict_detection( + storage_client: StorageClient, +) -> None: """Test that conflicts between named and alias storages are detected.""" # Test 1: Create named storage first, then try alias with same name - named_rq = await RequestQueue.open(name='conflict_test') + named_rq = await RequestQueue.open( + name='conflict_test', + storage_client=storage_client, + ) assert named_rq.name == 'conflict_test' # Try to create alias with same name - should raise error with pytest.raises(ValueError, match=r'Cannot create alias storage "conflict_test".*already exists'): - await RequestQueue.open(alias='conflict_test') + await RequestQueue.open(alias='conflict_test', storage_client=storage_client) # Clean up await named_rq.drop() # Test 2: Create alias first, then try named with same name - alias_rq = await RequestQueue.open(alias='conflict_test2') + alias_rq = await RequestQueue.open(alias='conflict_test2', storage_client=storage_client) assert alias_rq.name is None # Alias storages have no name # Try to create named with same name - should raise error with pytest.raises(ValueError, match=r'Cannot create named storage "conflict_test2".*already exists'): - await RequestQueue.open(name='conflict_test2') + await RequestQueue.open(name='conflict_test2', storage_client=storage_client) # Clean up await alias_rq.drop() @@ -927,14 +878,12 @@ async def test_named_vs_alias_conflict_detection() -> None: async def test_alias_parameter( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test request queue creation and operations with alias parameter.""" # Create request queue with alias alias_rq 
= await RequestQueue.open( alias='test_alias', storage_client=storage_client, - configuration=configuration, ) # Verify alias request queue properties @@ -951,14 +900,12 @@ async def test_alias_parameter( async def test_alias_vs_named_isolation( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test that alias and named request queues with same identifier are isolated.""" # Create named request queue named_rq = await RequestQueue.open( name='test_identifier', storage_client=storage_client, - configuration=configuration, ) # Verify named request queue @@ -972,7 +919,6 @@ async def test_alias_vs_named_isolation( alias_rq = await RequestQueue.open( alias='test_identifier', storage_client=storage_client, - configuration=configuration, ) # Should be different instance @@ -989,20 +935,16 @@ async def test_alias_vs_named_isolation( async def test_default_vs_alias_default_equivalence( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test that default request queue and alias='default' are equivalent.""" # Open default request queue default_rq = await RequestQueue.open( storage_client=storage_client, - configuration=configuration, ) - # Open alias='default' request queue alias_default_rq = await RequestQueue.open( - alias='default', + alias=StorageInstanceManager._DEFAULT_STORAGE_ALIAS, storage_client=storage_client, - configuration=configuration, ) # Should be the same @@ -1020,7 +962,6 @@ async def test_default_vs_alias_default_equivalence( async def test_multiple_alias_isolation( storage_client: StorageClient, - configuration: Configuration, ) -> None: """Test that different aliases create separate request queues.""" request_queues = [] @@ -1029,7 +970,6 @@ async def test_multiple_alias_isolation( rq = await RequestQueue.open( alias=f'alias_{i}', storage_client=storage_client, - configuration=configuration, ) await rq.add_request(f'https://example.com/alias_{i}') request_queues.append(rq) @@ -1297,3 +1237,13 @@ async 
def test_purge_on_start_disabled(storage_client: StorageClient) -> None: await named_rq_2.drop() await alias_rq_2.drop() await default_rq_2.drop() + + +async def test_name_default_not_allowed(storage_client: StorageClient) -> None: + """Test that storage can't have default alias as name, to prevent collisions with unnamed storage alias.""" + with pytest.raises( + ValueError, + match=f'Storage name cannot be "{StorageInstanceManager._DEFAULT_STORAGE_ALIAS}" as ' + f'it is reserved for default alias.', + ): + await RequestQueue.open(name=StorageInstanceManager._DEFAULT_STORAGE_ALIAS, storage_client=storage_client) diff --git a/tests/unit/storages/test_storage_instance_manager.py b/tests/unit/storages/test_storage_instance_manager.py new file mode 100644 index 0000000000..730279f9a6 --- /dev/null +++ b/tests/unit/storages/test_storage_instance_manager.py @@ -0,0 +1,130 @@ +from pathlib import Path +from typing import cast + +import pytest + +from crawlee import service_locator +from crawlee.configuration import Configuration +from crawlee.storage_clients import FileSystemStorageClient, MemoryStorageClient +from crawlee.storages import Dataset, KeyValueStore, RequestQueue +from crawlee.storages._base import Storage + + +@pytest.fixture(autouse=True) +def clean_storage_instance_manager() -> None: + """Helper function to clean the storage instance manager before each test.""" + service_locator.storage_instance_manager.clear_cache() + + +@pytest.fixture(params=[KeyValueStore, Dataset, RequestQueue]) +def storage_type(request: pytest.FixtureRequest) -> type[Storage]: + return cast('type[Storage]', request.param) + + +async def test_unique_storage_by_storage_client(tmp_path: Path, storage_type: type[Storage]) -> None: + config = Configuration( + purge_on_start=True, + ) + config.storage_dir = str(tmp_path) + + storage_1 = await storage_type.open(storage_client=MemoryStorageClient(), configuration=config) + storage_2 = await 
storage_type.open(storage_client=FileSystemStorageClient(), configuration=config) + assert storage_1 is not storage_2 + + +async def test_same_storage_when_different_client(tmp_path: Path, storage_type: type[Storage]) -> None: + config = Configuration( + purge_on_start=True, + ) + config.storage_dir = str(tmp_path) + + storage_1 = await storage_type.open(storage_client=MemoryStorageClient(), configuration=config) + storage_2 = await storage_type.open(storage_client=MemoryStorageClient(), configuration=config) + assert storage_1 is storage_2 + + +async def test_unique_storage_by_storage_type(tmp_path: Path) -> None: + config = Configuration( + purge_on_start=True, + ) + config.storage_dir = str(tmp_path) + storage_client = MemoryStorageClient() + + kvs = await KeyValueStore.open(storage_client=storage_client, configuration=config) + dataset = await Dataset.open(storage_client=storage_client, configuration=config) + assert kvs is not dataset + + +async def test_unique_storage_by_name(storage_type: type[Storage]) -> None: + """Test that StorageInstanceManager support different storage clients at the same time.""" + storage_client = MemoryStorageClient() + + storage_1 = await storage_type.open(storage_client=storage_client, name='kvs1') + storage_2 = await storage_type.open(storage_client=storage_client, name='kvs2') + assert storage_1 is not storage_2 + + +async def test_unique_storage_by_unique_cache_key_different_path(tmp_path: Path, storage_type: type[Storage]) -> None: + """Test that StorageInstanceManager support unique cache key. 
Difference in storage_dir.""" + path_1 = tmp_path / 'dir1' + path_2 = tmp_path / 'dir2' + path_1.mkdir() + path_2.mkdir() + + config_1 = Configuration() + config_1.storage_dir = str(path_1) + + config_2 = Configuration() + config_2.storage_dir = str(path_2) + + storage_client = FileSystemStorageClient() + + storage_1 = await storage_type.open(storage_client=storage_client, configuration=config_1) + storage_2 = await storage_type.open(storage_client=storage_client, configuration=config_2) + assert storage_1 is not storage_2 + + +async def test_unique_storage_by_unique_cache_key_same_path(tmp_path: Path, storage_type: type[Storage]) -> None: + """Test that StorageInstanceManager support unique cache key. Different configs with same storage_dir create same + storage.""" + config_1 = Configuration() + config_1.storage_dir = str(tmp_path) + + config_2 = Configuration() + config_2.storage_dir = str(tmp_path) + + storage_client = FileSystemStorageClient() + + storage_1 = await storage_type.open(storage_client=storage_client, configuration=config_1) + storage_2 = await storage_type.open(storage_client=storage_client, configuration=config_2) + assert storage_1 is storage_2 + + +async def test_identical_storage_default_config(storage_type: type[Storage]) -> None: + """Test that StorageInstanceManager correctly caches storage based on the storage client.""" + storage_client = MemoryStorageClient() + + storage_1 = await storage_type.open(storage_client=storage_client) + storage_2 = await storage_type.open(storage_client=storage_client) + assert storage_1 is storage_2 + + +async def test_identical_storage_default_storage(storage_type: type[Storage]) -> None: + """Test that StorageInstanceManager correctly caches storage based on the storage client.""" + storage_1 = await storage_type.open() + storage_2 = await storage_type.open() + assert storage_1 is storage_2 + + +async def test_identical_storage_clear_cache(storage_type: type[Storage]) -> None: + storage_1 = await 
storage_type.open() + service_locator.storage_instance_manager.clear_cache() + storage_2 = await storage_type.open() + assert storage_1 is not storage_2 + + +async def test_identical_storage_remove_from_cache(storage_type: type[Storage]) -> None: + storage_1 = await storage_type.open() + service_locator.storage_instance_manager.remove_from_cache(storage_1) + storage_2 = await storage_type.open() + assert storage_1 is not storage_2 diff --git a/tests/unit/test_service_locator.py b/tests/unit/test_service_locator.py index 7c29f31254..b9623c699b 100644 --- a/tests/unit/test_service_locator.py +++ b/tests/unit/test_service_locator.py @@ -22,13 +22,13 @@ def test_custom_configuration() -> None: assert config is custom_config -def test_configuration_overwrite() -> None: +def test_configuration_overwrite_not_possible() -> None: default_config = Configuration() service_locator.set_configuration(default_config) custom_config = Configuration(default_browser_path='custom_path') - service_locator.set_configuration(custom_config) - assert service_locator.get_configuration() is custom_config + with pytest.raises(ServiceConflictError): + service_locator.set_configuration(custom_config) def test_configuration_conflict() -> None: @@ -51,15 +51,13 @@ def test_custom_event_manager() -> None: assert event_manager is custom_event_manager -def test_event_manager_overwrite() -> None: +def test_event_manager_overwrite_not_possible() -> None: custom_event_manager = LocalEventManager() service_locator.set_event_manager(custom_event_manager) another_custom_event_manager = LocalEventManager() - service_locator.set_event_manager(another_custom_event_manager) - - assert custom_event_manager != another_custom_event_manager - assert service_locator.get_event_manager() is another_custom_event_manager + with pytest.raises(ServiceConflictError): + service_locator.set_event_manager(another_custom_event_manager) def test_event_manager_conflict() -> None: @@ -82,15 +80,13 @@ def 
test_custom_storage_client() -> None: assert storage_client is custom_storage_client -def test_storage_client_overwrite() -> None: +def test_storage_client_overwrite_not_possible() -> None: custom_storage_client = MemoryStorageClient() service_locator.set_storage_client(custom_storage_client) another_custom_storage_client = MemoryStorageClient() - service_locator.set_storage_client(another_custom_storage_client) - - assert custom_storage_client != another_custom_storage_client - assert service_locator.get_storage_client() is another_custom_storage_client + with pytest.raises(ServiceConflictError): + service_locator.set_storage_client(another_custom_storage_client) def test_storage_client_conflict() -> None: diff --git a/uv.lock b/uv.lock index 26b5e8ee78..3fba304a73 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 3 +revision = 2 requires-python = ">=3.10" resolution-markers = [ "python_full_version >= '3.13'", @@ -716,7 +716,7 @@ requires-dist = [ { name = "playwright", marker = "extra == 'playwright'", specifier = ">=1.27.0" }, { name = "protego", specifier = ">=0.5.0" }, { name = "psutil", specifier = ">=6.0.0" }, - { name = "pydantic", specifier = ">=2.8.0,!=2.10.0,!=2.10.1,!=2.10.2" }, + { name = "pydantic", specifier = ">=2.11.0" }, { name = "pydantic-settings", specifier = ">=2.2.0,!=2.7.0,!=2.7.1,!=2.8.0" }, { name = "pyee", specifier = ">=9.0.0" }, { name = "rich", marker = "extra == 'cli'", specifier = ">=13.9.0" },