diff --git a/docs/references/executor.md b/docs/references/executor.md index e8eb86ba8..91c415359 100644 --- a/docs/references/executor.md +++ b/docs/references/executor.md @@ -3,4 +3,3 @@ members: - Executor - run_async_batch - - is_event_loop_running diff --git a/ragas/src/ragas/async_utils.py b/ragas/src/ragas/async_utils.py index f327955c2..57fd2122f 100644 --- a/ragas/src/ragas/async_utils.py +++ b/ragas/src/ragas/async_utils.py @@ -1,89 +1,165 @@ """Async utils.""" import asyncio -from typing import Any, Coroutine, List, Optional +import logging +import typing as t from tqdm.auto import tqdm -from ragas.executor import is_event_loop_running -from ragas.utils import batched +logger = logging.getLogger(__name__) + + +def apply_nest_asyncio(): + NEST_ASYNCIO_APPLIED: bool = False + if is_event_loop_running(): + # an event loop is running so call nested_asyncio to fix this + try: + import nest_asyncio + except ImportError: + raise ImportError( + "It seems like your running this in a jupyter-like environment. Please install nest_asyncio with `pip install nest_asyncio` to make it work." + ) + + if not NEST_ASYNCIO_APPLIED: + nest_asyncio.apply() + NEST_ASYNCIO_APPLIED = True + + +def is_event_loop_running() -> bool: + """ + Check if an event loop is currently running. + """ + try: + loop = asyncio.get_running_loop() + except RuntimeError: + return False + else: + return loop.is_running() + + +def as_completed( + coroutines: t.Sequence[t.Coroutine], max_workers: int = -1 +) -> t.Iterator[asyncio.Future]: + """ + Wrap coroutines with a semaphore if max_workers is specified. + + Returns an iterator of futures that completes as tasks finish. 
+ """ + if max_workers == -1: + tasks = [asyncio.create_task(coro) for coro in coroutines] + else: + semaphore = asyncio.Semaphore(max_workers) + + async def sema_coro(coro): + async with semaphore: + return await coro + + tasks = [asyncio.create_task(sema_coro(coro)) for coro in coroutines] + + return asyncio.as_completed(tasks) + + +async def process_futures( + futures: t.Iterator[asyncio.Future], pbar: t.Optional[tqdm] = None +) -> t.AsyncGenerator[t.Any, None]: + """ + Process futures with optional progress tracking. + + Args: + futures: Iterator of asyncio futures to process (e.g., from asyncio.as_completed) + pbar: Optional progress bar to update + + Yields: + Results from completed futures as they finish + """ + # Process completed futures as they finish + for future in futures: + try: + result = await future + except Exception as e: + result = e + + if pbar: + pbar.update(1) + yield result + + +def run( + async_func: t.Union[ + t.Callable[[], t.Coroutine[t.Any, t.Any, t.Any]], + t.Coroutine[t.Any, t.Any, t.Any], + ], +) -> t.Any: + """Run an async function in the current event loop or a new one if not running.""" + try: + # Check if we're already in a running event loop + loop = asyncio.get_running_loop() + # If we get here, we're in a running loop - need nest_asyncio + try: + import nest_asyncio + except ImportError as e: + raise ImportError( + "It seems like you're running this in a jupyter-like environment. " + "Please install nest_asyncio with `pip install nest_asyncio` to make it work." 
+ ) from e + + nest_asyncio.apply() + # Create the coroutine if it's a callable, otherwise use directly + coro = async_func() if callable(async_func) else async_func + return loop.run_until_complete(coro) + + except RuntimeError: + # No running event loop, so we can use asyncio.run + coro = async_func() if callable(async_func) else async_func + return asyncio.run(coro) def run_async_tasks( - tasks: List[Coroutine], - batch_size: Optional[int] = None, + tasks: t.Sequence[t.Coroutine], + batch_size: t.Optional[int] = None, show_progress: bool = True, progress_bar_desc: str = "Running async tasks", -) -> List[Any]: + max_workers: int = -1, +) -> t.List[t.Any]: """ Execute async tasks with optional batching and progress tracking. NOTE: Order of results is not guaranteed! Args: - tasks: List of coroutines to execute + tasks: Sequence of coroutines to execute batch_size: Optional size for batching tasks. If None, runs all concurrently show_progress: Whether to display progress bars + max_workers: Maximum number of concurrent tasks (-1 for unlimited) """ + from ragas.utils import ProgressBarManager, batched async def _run(): total_tasks = len(tasks) results = [] + pbm = ProgressBarManager(progress_bar_desc, show_progress) - # If no batching, run all tasks concurrently with single progress bar if not batch_size: - with tqdm( - total=total_tasks, - desc=progress_bar_desc, - disable=not show_progress, - ) as pbar: - for future in asyncio.as_completed(tasks): - result = await future - results.append(result) - pbar.update(1) - return results - - # With batching, show nested progress bars - batches = batched(tasks, batch_size) # generator - n_batches = (total_tasks + batch_size - 1) // batch_size - with ( - tqdm( - total=total_tasks, - desc=progress_bar_desc, - disable=not show_progress, - position=0, - leave=True, - ) as overall_pbar, - tqdm( - total=batch_size, - desc=f"Batch 1/{n_batches}", - disable=not show_progress, - position=1, - leave=False, - ) as batch_pbar, - ): - 
for i, batch in enumerate(batches, 1): - batch_pbar.reset(total=len(batch)) - batch_pbar.set_description(f"Batch {i}/{n_batches}") - for future in asyncio.as_completed(batch): - result = await future + with pbm.create_single_bar(total_tasks) as pbar: + async for result in process_futures( + as_completed(tasks, max_workers), pbar + ): results.append(result) - overall_pbar.update(1) - batch_pbar.update(1) + else: + total_tasks = len(tasks) + batches = batched(tasks, batch_size) + overall_pbar, batch_pbar, n_batches = pbm.create_nested_bars( + total_tasks, batch_size + ) + with overall_pbar, batch_pbar: + for i, batch in enumerate(batches, 1): + pbm.update_batch_bar(batch_pbar, i, n_batches, len(batch)) + async for result in process_futures( + as_completed(batch, max_workers), batch_pbar + ): + results.append(result) + overall_pbar.update(len(batch)) return results - if is_event_loop_running(): - # an event loop is running so call nested_asyncio to fix this - try: - import nest_asyncio - except ImportError: - raise ImportError( - "It seems like your running this in a jupyter-like environment. " - "Please install nest_asyncio with `pip install nest_asyncio` to make it work." 
- ) - else: - nest_asyncio.apply() - - results = asyncio.run(_run()) - return results + return run(_run) diff --git a/ragas/src/ragas/executor.py b/ragas/src/ragas/executor.py index a0e209694..47a5c108b 100644 --- a/ragas/src/ragas/executor.py +++ b/ragas/src/ragas/executor.py @@ -1,56 +1,19 @@ from __future__ import annotations -import asyncio import logging import typing as t from dataclasses import dataclass, field -import nest_asyncio import numpy as np from tqdm.auto import tqdm +from ragas.async_utils import as_completed, process_futures, run from ragas.run_config import RunConfig -from ragas.utils import batched - -nest_asyncio.apply() +from ragas.utils import ProgressBarManager, batched logger = logging.getLogger(__name__) -def is_event_loop_running() -> bool: - """ - Check if an event loop is currently running. - """ - try: - loop = asyncio.get_running_loop() - except RuntimeError: - return False - else: - return loop.is_running() - - -async def as_completed( - coroutines: t.List[t.Coroutine], max_workers: int -) -> t.Iterator[asyncio.Future]: - """ - Wrap coroutines with a semaphore if max_workers is specified. - - Returns an iterator of futures that completes as tasks finish. 
- """ - if max_workers == -1: - tasks = [asyncio.create_task(coro) for coro in coroutines] - - else: - semaphore = asyncio.Semaphore(max_workers) - - async def sema_coro(coro): - async with semaphore: - return await coro - - tasks = [asyncio.create_task(sema_coro(coro)) for coro in coroutines] - return asyncio.as_completed(tasks) - - @dataclass class Executor: """ @@ -83,15 +46,13 @@ class Executor: raise_exceptions: bool = False batch_size: t.Optional[int] = None run_config: t.Optional[RunConfig] = field(default=None, repr=False) - _nest_asyncio_applied: bool = field(default=False, repr=False) pbar: t.Optional[tqdm] = None + _jobs_processed: int = field(default=0, repr=False) def wrap_callable_with_index( self, callable: t.Callable, counter: int ) -> t.Callable: - async def wrapped_callable_async( - *args, **kwargs - ) -> t.Tuple[int, t.Callable | float]: + async def wrapped_callable_async(*args, **kwargs) -> t.Tuple[int, t.Any]: try: result = await callable(*args, **kwargs) return counter, result @@ -122,97 +83,128 @@ def submit( """ Submit a job to be executed, wrapping the callable with error handling and indexing to keep track of the job index. 
""" - callable_with_index = self.wrap_callable_with_index(callable, len(self.jobs)) + # Use _jobs_processed for consistent indexing across multiple runs + callable_with_index = self.wrap_callable_with_index( + callable, self._jobs_processed + ) self.jobs.append((callable_with_index, args, kwargs, name)) + self._jobs_processed += 1 + + def clear_jobs(self) -> None: + """Clear all submitted jobs and reset counter.""" + self.jobs.clear() + self._jobs_processed = 0 async def _process_jobs(self) -> t.List[t.Any]: """Execute jobs with optional progress tracking.""" - max_workers = (self.run_config or RunConfig()).max_workers + if not self.jobs: + return [] + + # Make a copy of jobs to process and clear the original list to prevent re-execution + jobs_to_process = self.jobs.copy() + self.jobs.clear() + + max_workers = ( + self.run_config.max_workers + if self.run_config and hasattr(self.run_config, "max_workers") + else -1 + ) results = [] + pbm = ProgressBarManager(self.desc, self.show_progress) if not self.batch_size: # Use external progress bar if provided, otherwise create one if self.pbar is None: - with tqdm( - total=len(self.jobs), - desc=self.desc, - disable=not self.show_progress, - ) as internal_pbar: + with pbm.create_single_bar(len(jobs_to_process)) as internal_pbar: await self._process_coroutines( - self.jobs, internal_pbar, results, max_workers + jobs_to_process, internal_pbar, results, max_workers ) else: await self._process_coroutines( - self.jobs, self.pbar, results, max_workers + jobs_to_process, self.pbar, results, max_workers ) - return results - # With batching, show nested progress bars - batches = batched(self.jobs, self.batch_size) # generator of job tuples - n_batches = (len(self.jobs) + self.batch_size - 1) // self.batch_size - - with ( - tqdm( - total=len(self.jobs), - desc=self.desc, - disable=not self.show_progress, - position=1, - leave=True, - ) as overall_pbar, - tqdm( - total=min(self.batch_size, len(self.jobs)), - desc=f"Batch 
1/{n_batches}", - disable=not self.show_progress, - position=0, - leave=False, - ) as batch_pbar, - ): + # Process jobs in batches with nested progress bars + await self._process_batched_jobs(jobs_to_process, pbm, max_workers, results) + return results + + async def _process_batched_jobs( + self, jobs_to_process, progress_manager, max_workers, results + ): + """Process jobs in batches with nested progress tracking.""" + batch_size = self.batch_size or len(jobs_to_process) + batches = batched(jobs_to_process, batch_size) + overall_pbar, batch_pbar, n_batches = progress_manager.create_nested_bars( + len(jobs_to_process), batch_size + ) + + with overall_pbar, batch_pbar: for i, batch in enumerate(batches, 1): - batch_pbar.reset(total=len(batch)) - batch_pbar.set_description(f"Batch {i}/{n_batches}") + progress_manager.update_batch_bar(batch_pbar, i, n_batches, len(batch)) # Create coroutines per batch coroutines = [ afunc(*args, **kwargs) for afunc, args, kwargs, _ in batch ] - for future in await as_completed(coroutines, max_workers): - result = await future - results.append(result) - overall_pbar.update(1) - batch_pbar.update(1) - - return results + async for result in process_futures( + as_completed(coroutines, max_workers), batch_pbar + ): + # Ensure result is always a tuple (counter, value) + if isinstance(result, Exception): + # Find the counter for this failed job + idx = coroutines.index(result.__context__) + counter = ( + batch[idx][0].__closure__[1].cell_contents + ) # counter from closure + results.append((counter, result)) + else: + results.append(result) + # Update overall progress bar for all futures in this batch + overall_pbar.update(len(batch)) async def _process_coroutines(self, jobs, pbar, results, max_workers): """Helper function to process coroutines and update the progress bar.""" coroutines = [afunc(*args, **kwargs) for afunc, args, kwargs, _ in jobs] - for future in await as_completed(coroutines, max_workers): - result = await future - 
results.append(result) - pbar.update(1) + async for result in process_futures( + as_completed(coroutines, max_workers), pbar + ): + # Ensure result is always a tuple (counter, value) + if isinstance(result, Exception): + idx = coroutines.index(result.__context__) + counter = ( + jobs[idx][0].__closure__[1].cell_contents + ) # counter from closure + results.append((counter, result)) + else: + results.append(result) + + async def aresults(self) -> t.List[t.Any]: + """ + Execute all submitted jobs and return their results asynchronously. + The results are returned in the order of job submission. + + This is the async entry point for executing async jobs when already in an async context. + """ + results = await self._process_jobs() + # If raise_exceptions is True, propagate the exception + for r in results: + if self.raise_exceptions and isinstance(r, Exception): + raise r + sorted_results = sorted(results, key=lambda x: x[0]) + return [r[1] for r in sorted_results] def results(self) -> t.List[t.Any]: """ Execute all submitted jobs and return their results. The results are returned in the order of job submission. + + This is the main sync entry point for executing async jobs. """ - if is_event_loop_running(): - # an event loop is running so call nested_asyncio to fix this - try: - import nest_asyncio - except ImportError as e: - raise ImportError( - "It seems like your running this in a jupyter-like environment. " - "Please install nest_asyncio with `pip install nest_asyncio` to make it work." 
- ) from e - else: - if not self._nest_asyncio_applied: - nest_asyncio.apply() - self._nest_asyncio_applied = True - results = asyncio.run(self._process_jobs()) - sorted_results = sorted(results, key=lambda x: x[0]) - return [r[1] for r in sorted_results] + async def _async_wrapper(): + return await self.aresults() + + return run(_async_wrapper) def run_async_batch( diff --git a/ragas/src/ragas/metrics/base.py b/ragas/src/ragas/metrics/base.py index 11c1b2947..f78a55f11 100644 --- a/ragas/src/ragas/metrics/base.py +++ b/ragas/src/ragas/metrics/base.py @@ -12,9 +12,9 @@ from tqdm import tqdm from ragas._analytics import EvaluationEvent, _analytics_batcher +from ragas.async_utils import is_event_loop_running from ragas.callbacks import ChainType, new_group from ragas.dataset_schema import MetricAnnotation, MultiTurnSample, SingleTurnSample -from ragas.executor import is_event_loop_running from ragas.losses import BinaryMetricLoss, MSELoss from ragas.prompt import FewShotPydanticPrompt, PromptMixin from ragas.run_config import RunConfig diff --git a/ragas/src/ragas/testset/transforms/base.py b/ragas/src/ragas/testset/transforms/base.py index 49945e482..e58e12bbe 100644 --- a/ragas/src/ragas/testset/transforms/base.py +++ b/ragas/src/ragas/testset/transforms/base.py @@ -68,20 +68,30 @@ def filter(self, kg: KnowledgeGraph) -> KnowledgeGraph: KnowledgeGraph The filtered knowledge graph. 
""" - + logger.debug("Filtering KnowledgeGraph with %s", self.filter_nodes.__name__) + filtered_nodes = [node for node in kg.nodes if self.filter_nodes(node)] + node_ids = {node.id for node in filtered_nodes} + filtered_relationships = [ + rel + for rel in kg.relationships + if (rel.source.id in node_ids) and (rel.target.id in node_ids) + ] + logger.debug( + "Filter reduced KnowledgeGraph by %d/%d nodes and %d/%d relationships", + len(kg.nodes) - len(filtered_nodes), + len(kg.nodes), + len(kg.relationships) - len(filtered_relationships), + len(kg.relationships), + ) return KnowledgeGraph( - nodes=[node for node in kg.nodes if self.filter_nodes(node)], - relationships=[ - rel - for rel in kg.relationships - if rel.source in kg.nodes and rel.target in kg.nodes - ], + nodes=filtered_nodes, + relationships=filtered_relationships, ) @abstractmethod - def generate_execution_plan(self, kg: KnowledgeGraph) -> t.List[t.Coroutine]: + def generate_execution_plan(self, kg: KnowledgeGraph) -> t.Sequence[t.Coroutine]: """ - Generates a list of coroutines to be executed in sequence by the Executor. This + Generates a sequence of coroutines to be executed in sequence by the Executor. This coroutine will, upon execution, write the transformation into the KnowledgeGraph. Parameters @@ -91,8 +101,8 @@ def generate_execution_plan(self, kg: KnowledgeGraph) -> t.List[t.Coroutine]: Returns ------- - t.List[t.Coroutine] - A list of coroutines to be executed in parallel. + t.Sequence[t.Coroutine] + A sequence of coroutines to be executed in parallel. """ pass @@ -159,9 +169,9 @@ async def extract(self, node: Node) -> t.Tuple[str, t.Any]: """ pass - def generate_execution_plan(self, kg: KnowledgeGraph) -> t.List[t.Coroutine]: + def generate_execution_plan(self, kg: KnowledgeGraph) -> t.Sequence[t.Coroutine]: """ - Generates a list of coroutines to be executed in parallel by the Executor. + Generates a sequence of coroutines to be executed in parallel by the Executor. 
Parameters ---------- @@ -170,8 +180,8 @@ def generate_execution_plan(self, kg: KnowledgeGraph) -> t.List[t.Coroutine]: Returns ------- - t.List[t.Coroutine] - A list of coroutines to be executed in parallel. + t.Sequence[t.Coroutine] + A sequence of coroutines to be executed in parallel. """ async def apply_extract(node: Node): @@ -186,7 +196,13 @@ async def apply_extract(node: Node): ) filtered = self.filter(kg) - return [apply_extract(node) for node in filtered.nodes] + plan = [apply_extract(node) for node in filtered.nodes] + logger.debug( + "Created %d coroutines for %s", + len(plan), + self.__class__.__name__, + ) + return plan @dataclass @@ -197,7 +213,6 @@ class LLMBasedExtractor(Extractor, PromptMixin): tokenizer: Encoding = DEFAULT_TOKENIZER def split_text_by_token_limit(self, text, max_token_limit): - # Tokenize the entire input string tokens = self.tokenizer.encode(text) @@ -268,9 +283,9 @@ async def split(self, node: Node) -> t.Tuple[t.List[Node], t.List[Relationship]] """ pass - def generate_execution_plan(self, kg: KnowledgeGraph) -> t.List[t.Coroutine]: + def generate_execution_plan(self, kg: KnowledgeGraph) -> t.Sequence[t.Coroutine]: """ - Generates a list of coroutines to be executed in parallel by the Executor. + Generates a sequence of coroutines to be executed in parallel by the Executor. Parameters ---------- @@ -279,8 +294,8 @@ def generate_execution_plan(self, kg: KnowledgeGraph) -> t.List[t.Coroutine]: Returns ------- - t.List[t.Coroutine] - A list of coroutines to be executed in parallel. + t.Sequence[t.Coroutine] + A sequence of coroutines to be executed in parallel. 
""" async def apply_split(node: Node): @@ -289,7 +304,13 @@ async def apply_split(node: Node): kg.relationships.extend(relationships) filtered = self.filter(kg) - return [apply_split(node) for node in filtered.nodes] + plan = [apply_split(node) for node in filtered.nodes] + logger.debug( + "Created %d coroutines for %s", + len(plan), + self.__class__.__name__, + ) + return plan class RelationshipBuilder(BaseGraphTransformation): @@ -319,9 +340,9 @@ async def transform(self, kg: KnowledgeGraph) -> t.List[Relationship]: """ pass - def generate_execution_plan(self, kg: KnowledgeGraph) -> t.List[t.Coroutine]: + def generate_execution_plan(self, kg: KnowledgeGraph) -> t.Sequence[t.Coroutine]: """ - Generates a list of coroutines to be executed in parallel by the Executor. + Generates a sequence of coroutines to be executed in parallel by the Executor. Parameters ---------- @@ -330,8 +351,8 @@ def generate_execution_plan(self, kg: KnowledgeGraph) -> t.List[t.Coroutine]: Returns ------- - t.List[t.Coroutine] - A list of coroutines to be executed in parallel. + t.Sequence[t.Coroutine] + A sequence of coroutines to be executed in parallel. 
""" async def apply_build_relationships( @@ -341,14 +362,18 @@ async def apply_build_relationships( original_kg.relationships.extend(relationships) filtered_kg = self.filter(kg) - return [apply_build_relationships(filtered_kg=filtered_kg, original_kg=kg)] + plan = [apply_build_relationships(filtered_kg=filtered_kg, original_kg=kg)] + logger.debug( + "Created %d coroutines for %s", + len(plan), + self.__class__.__name__, + ) + return plan @dataclass class NodeFilter(BaseGraphTransformation): - async def transform(self, kg: KnowledgeGraph) -> KnowledgeGraph: - filtered = self.filter(kg) for node in filtered.nodes: @@ -378,9 +403,9 @@ async def custom_filter(self, node: Node, kg: KnowledgeGraph) -> bool: """ pass - def generate_execution_plan(self, kg: KnowledgeGraph) -> t.List[t.Coroutine]: + def generate_execution_plan(self, kg: KnowledgeGraph) -> t.Sequence[t.Coroutine]: """ - Generates a list of coroutines to be executed + Generates a sequence of coroutines to be executed """ async def apply_filter(node: Node): @@ -388,7 +413,13 @@ async def apply_filter(node: Node): kg.remove_node(node) filtered = self.filter(kg) - return [apply_filter(node) for node in filtered.nodes] + plan = [apply_filter(node) for node in filtered.nodes] + logger.debug( + "Created %d coroutines for %s", + len(plan), + self.__class__.__name__, + ) + return plan @dataclass diff --git a/ragas/src/ragas/testset/transforms/engine.py b/ragas/src/ragas/testset/transforms/engine.py index 0eb312b86..29eec1a52 100644 --- a/ragas/src/ragas/testset/transforms/engine.py +++ b/ragas/src/ragas/testset/transforms/engine.py @@ -1,12 +1,11 @@ from __future__ import annotations -import asyncio import logging import typing as t from tqdm.auto import tqdm -from ragas.executor import as_completed, is_event_loop_running +from ragas.async_utils import apply_nest_asyncio, as_completed, run_async_tasks from ragas.run_config import RunConfig from ragas.testset.graph import KnowledgeGraph from 
ragas.testset.transforms.base import BaseGraphTransformation @@ -17,7 +16,7 @@ logger = logging.getLogger(__name__) Transforms = t.Union[ - t.List[BaseGraphTransformation], + t.List[t.Union[BaseGraphTransformation, "Parallel"]], "Parallel", BaseGraphTransformation, ] @@ -32,22 +31,28 @@ class Parallel: >>> Parallel(HeadlinesExtractor(), SummaryExtractor()) """ - def __init__(self, *transformations: BaseGraphTransformation): + def __init__(self, *transformations: t.Union[BaseGraphTransformation, "Parallel"]): self.transformations = list(transformations) - def generate_execution_plan(self, kg: KnowledgeGraph) -> t.List[t.Coroutine]: + def generate_execution_plan(self, kg: KnowledgeGraph) -> t.Sequence[t.Coroutine]: coroutines = [] for transformation in self.transformations: coroutines.extend(transformation.generate_execution_plan(kg)) + class_names = [t.__class__.__name__ for t in self.transformations] + logger.debug( + f"Created {len(coroutines)} coroutines for transformations: {class_names}" + ) return coroutines -async def run_coroutines(coroutines: t.List[t.Coroutine], desc: str, max_workers: int): +async def run_coroutines( + coroutines: t.Sequence[t.Coroutine], desc: str, max_workers: int +): """ - Run a list of coroutines in parallel. + Run a sequence of coroutines in parallel. """ for future in tqdm( - await as_completed(coroutines, max_workers=max_workers), + as_completed(coroutines, max_workers=max_workers), desc=desc, total=len(coroutines), # whether you want to keep the progress bar after completion @@ -67,22 +72,6 @@ def get_desc(transform: BaseGraphTransformation | Parallel): return f"Applying {transform.__class__.__name__}" -def apply_nest_asyncio(): - NEST_ASYNCIO_APPLIED: bool = False - if is_event_loop_running(): - # an event loop is running so call nested_asyncio to fix this - try: - import nest_asyncio - except ImportError: - raise ImportError( - "It seems like your running this in a jupyter-like environment. 
Please install nest_asyncio with `pip install nest_asyncio` to make it work." - ) - - if not NEST_ASYNCIO_APPLIED: - nest_asyncio.apply() - NEST_ASYNCIO_APPLIED = True - - def apply_transforms( kg: KnowledgeGraph, transforms: Transforms, @@ -90,44 +79,41 @@ def apply_transforms( callbacks: t.Optional[Callbacks] = None, ): """ - Apply a list of transformations to a knowledge graph in place. + Recursively apply transformations to a knowledge graph in place. """ # apply nest_asyncio to fix the event loop issue in jupyter apply_nest_asyncio() - # if single transformation, wrap it in a list - if isinstance(transforms, BaseGraphTransformation): - transforms = [transforms] + max_workers = getattr(run_config, "max_workers", -1) - # apply the transformations - # if Sequences, apply each transformation sequentially - if isinstance(transforms, t.List): + if isinstance(transforms, t.Sequence): for transform in transforms: - asyncio.run( - run_coroutines( - transform.generate_execution_plan(kg), - get_desc(transform), - run_config.max_workers, - ) - ) - # if Parallel, collect inside it and run it all + apply_transforms(kg, transform, run_config, callbacks) elif isinstance(transforms, Parallel): - asyncio.run( - run_coroutines( - transforms.generate_execution_plan(kg), - get_desc(transforms), - run_config.max_workers, - ) + apply_transforms(kg, transforms.transformations, run_config, callbacks) + elif isinstance(transforms, BaseGraphTransformation): + logger.debug( + f"Generating execution plan for transformation {transforms.__class__.__name__}" + ) + coros = transforms.generate_execution_plan(kg) + desc = get_desc(transforms) + run_async_tasks( + coros, + batch_size=None, + show_progress=True, + progress_bar_desc=desc, + max_workers=max_workers, ) else: raise ValueError( - f"Invalid transforms type: {type(transforms)}. Expects a list of BaseGraphTransformations or a Parallel instance." + f"Invalid transforms type: {type(transforms)}. 
Expects a sequence of BaseGraphTransformations or a Parallel instance." ) + logger.debug("All transformations applied successfully.") def rollback_transforms(kg: KnowledgeGraph, transforms: Transforms): """ - Rollback a list of transformations from a knowledge graph. + Rollback a sequence of transformations from a knowledge graph. Note ---- diff --git a/ragas/src/ragas/utils.py b/ragas/src/ragas/utils.py index b24818427..6f9960801 100644 --- a/ragas/src/ragas/utils.py +++ b/ragas/src/ragas/utils.py @@ -12,6 +12,7 @@ import numpy as np import tiktoken from datasets import Dataset +from tqdm.auto import tqdm if t.TYPE_CHECKING: from ragas.metrics.base import Metric @@ -239,6 +240,51 @@ def batched(iterable: t.Iterable, n: int) -> t.Iterator[t.Tuple]: yield batch +class ProgressBarManager: + """Manages progress bars for batch and non-batch execution.""" + + def __init__(self, desc: str, show_progress: bool): + self.desc = desc + self.show_progress = show_progress + + def create_single_bar(self, total: int) -> tqdm: + """Create a single progress bar for non-batch execution.""" + return tqdm( + total=total, + desc=self.desc, + disable=not self.show_progress, + ) + + def create_nested_bars(self, total_jobs: int, batch_size: int): + """Create nested progress bars for batch execution.""" + n_batches = (total_jobs + batch_size - 1) // batch_size + + overall_pbar = tqdm( + total=total_jobs, + desc=self.desc, + disable=not self.show_progress, + position=0, + leave=True, + ) + + batch_pbar = tqdm( + total=min(batch_size, total_jobs), + desc=f"Batch 1/{n_batches}", + disable=not self.show_progress, + position=1, + leave=False, + ) + + return overall_pbar, batch_pbar, n_batches + + def update_batch_bar( + self, batch_pbar: tqdm, batch_num: int, n_batches: int, batch_size: int + ): + """Update batch progress bar for new batch.""" + batch_pbar.reset(total=batch_size) + batch_pbar.set_description(f"Batch {batch_num}/{n_batches}") + + _LOGGER_DATE_TIME = "%Y-%m-%d %H:%M:%S" diff 
--git a/ragas/tests/unit/test_async_utils.py b/ragas/tests/unit/test_async_utils.py index 2666a35ff..477f5be05 100644 --- a/ragas/tests/unit/test_async_utils.py +++ b/ragas/tests/unit/test_async_utils.py @@ -1,8 +1,72 @@ +import asyncio + import pytest from ragas.async_utils import run_async_tasks +def test_is_event_loop_running_in_script(): + from ragas.async_utils import is_event_loop_running + + assert is_event_loop_running() is False + + +def test_as_completed_in_script(): + from ragas.async_utils import as_completed + + async def echo_order(index: int, delay: float): + await asyncio.sleep(delay) + return index + + async def _run(): + # Use decreasing delays so results come out in reverse order + coros = [echo_order(1, 0.3), echo_order(2, 0.2), echo_order(3, 0.1)] + results = [] + for t in as_completed(coros, 3): + r = await t + results.append(r) + return results + + results = asyncio.run(_run()) + # Results should be [3, 2, 1] due to decreasing delays + assert results == [3, 2, 1] + + +def test_as_completed_max_workers(): + import time + + from ragas.async_utils import as_completed + + async def sleeper(idx): + await asyncio.sleep(0.1) + return idx + + async def _run(): + start = time.time() + coros = [sleeper(i) for i in range(5)] + results = [] + for t in as_completed(coros, max_workers=2): + r = await t + results.append(r) + elapsed = time.time() - start + return results, elapsed + + results, elapsed = asyncio.run(_run()) + # With max_workers=2, total time should be at least 0.2s for 5 tasks + assert len(results) == 5 + assert elapsed >= 0.2 + + +def test_run_function(): + from ragas.async_utils import run + + async def foo(): + return 42 + + result = run(foo) + assert result == 42 + + @pytest.fixture def tasks(): async def echo_order(index: int): @@ -11,28 +75,16 @@ async def echo_order(index: int): return [echo_order(i) for i in range(1, 11)] -@pytest.mark.asyncio -async def test_run_async_tasks_unbatched(tasks): - # Act +def 
test_run_async_tasks_unbatched(tasks): results = run_async_tasks(tasks) - - # Assert assert sorted(results) == sorted(range(1, 11)) -@pytest.mark.asyncio -async def test_run_async_tasks_batched(tasks): - # Act +def test_run_async_tasks_batched(tasks): results = run_async_tasks(tasks, batch_size=3) - - # Assert assert sorted(results) == sorted(range(1, 11)) -@pytest.mark.asyncio -async def test_run_async_tasks_no_progress(tasks): - # Act +def test_run_async_tasks_no_progress(tasks): results = run_async_tasks(tasks, show_progress=False) - - # Assert assert sorted(results) == sorted(range(1, 11)) diff --git a/ragas/tests/unit/test_engine.py b/ragas/tests/unit/test_engine.py new file mode 100644 index 000000000..33aa3c910 --- /dev/null +++ b/ragas/tests/unit/test_engine.py @@ -0,0 +1,130 @@ +import asyncio +import types +import typing as t + +import pytest + +from ragas.testset.graph import KnowledgeGraph, Node, NodeType +from ragas.testset.transforms.base import BaseGraphTransformation +from ragas.testset.transforms.engine import Parallel, apply_transforms, get_desc + + +class DummyTransformation(BaseGraphTransformation): + def __init__(self, name="Dummy"): + self.name = name + + def generate_execution_plan(self, kg): + return [self.double(node) for node in kg.nodes] + + async def transform( + self, kg: KnowledgeGraph + ) -> t.List[t.Tuple[Node, t.Tuple[str, t.Any]]]: + filtered = self.filter(kg) + nodes = sorted( + filtered.nodes, key=lambda n: n.get_property("page_content") or "" + ) + return [(node, await self.double(node)) for node in nodes] + + async def double(self, node): + # Repeat the text in a single node's 'page_content' property + content = node.get_property("page_content") + if content is not None: + node.properties["page_content"] = content * 2 + return node + + +@pytest.fixture +def kg(): + import string + + kg = KnowledgeGraph() + for letter in string.ascii_uppercase[:10]: + node = Node( + properties={"page_content": letter}, + type=NodeType.DOCUMENT, 
+ ) + kg.add(node) + return kg + + +def test_parallel_stores_transformations(): + t1 = DummyTransformation("A") + t2 = DummyTransformation("B") + p = Parallel(t1, t2) + assert p.transformations == [t1, t2] + + +def test_parallel_generate_execution_plan_aggregates(kg): + t1 = DummyTransformation("A") + t2 = DummyTransformation("B") + p = Parallel(t1, t2) + coros = p.generate_execution_plan(kg) + assert len(coros) == len(kg.nodes) * 2 # Each transformation runs on each node + assert all(isinstance(c, types.CoroutineType) for c in coros) + + # Await all coroutines to avoid RuntimeWarning + async def run_all(): + await asyncio.gather(*coros) + + asyncio.run(run_all()) + + +def test_parallel_nested(kg): + t1 = DummyTransformation("A") + t2 = DummyTransformation("B") + p_inner = Parallel(t1) + p_outer = Parallel(p_inner, t2) + coros = p_outer.generate_execution_plan(kg) + assert len(coros) == len(kg.nodes) * 2 # Each transformation runs on each node + assert all(isinstance(c, types.CoroutineType) for c in coros) + + # Await all coroutines to avoid RuntimeWarning + async def run_all(): + await asyncio.gather(*coros) + + asyncio.run(run_all()) + + +def test_get_desc_parallel_and_single(): + t1 = DummyTransformation("A") + p = Parallel(t1) + desc_p = get_desc(p) + desc_t = get_desc(t1) + assert "Parallel" not in desc_t + assert "DummyTransformation" in desc_p or "DummyTransformation" in desc_t + + +def test_apply_transforms_single(kg): + t1 = DummyTransformation() + apply_transforms(kg, t1) + # All nodes' page_content should be doubled + for node in kg.nodes: + content = node.get_property("page_content") + assert content == (content[0] * 2) + + +def test_apply_transforms_list(kg): + t1 = DummyTransformation() + t2 = DummyTransformation() + apply_transforms(kg, [t1, t2]) + # Each transformation doubles the content, so after two: x -> xxxx + for node in kg.nodes: + content = node.get_property("page_content") + assert content == (content[0] * 2 * 2) + + +def 
test_apply_transforms_parallel(kg): + t1 = DummyTransformation() + t2 = DummyTransformation() + p = Parallel(t1, t2) + apply_transforms(kg, p) + # Both parallel transformations double the same node's content in place, so the effects compound: x -> xxxx + for node in kg.nodes: + content = node.get_property("page_content") + assert content == (content[0] * 2 * 2) + + +def test_apply_transforms_invalid(): + kg = KnowledgeGraph() + with pytest.raises(ValueError): + apply_transforms(kg, 123) # type: ignore diff --git a/ragas/tests/unit/test_executor.py b/ragas/tests/unit/test_executor.py index be08caaa9..6c23cfea7 100644 --- a/ragas/tests/unit/test_executor.py +++ b/ragas/tests/unit/test_executor.py @@ -66,31 +66,6 @@ async def echo_order(index: int): assert results == list(range(1, 4)) -def test_is_event_loop_running_in_script(): - from ragas.executor import is_event_loop_running - - assert is_event_loop_running() is False - - -def test_as_completed_in_script(): - from ragas.executor import as_completed - - async def echo_order(index: int): - await asyncio.sleep(index) - return index - - async def _run(): - results = [] - for t in await as_completed([echo_order(1), echo_order(2), echo_order(3)], 3): - r = await t - results.append(r) - return results - - results = asyncio.run(_run()) - - assert results == [1, 2, 3] - - def test_executor_timings(): # if we submit n tasks that take 1 second each, # the total time taken should be close to 1 second @@ -111,3 +86,67 @@ async def long_task(): assert len(results) == n_tasks assert all(r == 1 for r in results) assert end_time - start_time < 0.2 + + +def test_executor_exception_handling(): + """Test that exceptions are returned as np.nan when raise_exceptions is False.""" + import numpy as np + + async def fail_task(): + raise ValueError("fail") + + executor = Executor() + executor.submit(fail_task) + results = executor.results() + assert len(results) == 1 + assert np.isnan(results[0]) + + +def 
test_executor_exception_raises(): + """Test that exceptions are raised when raise_exceptions is True.""" + + async def fail_task(): + raise ValueError("fail") + + executor = Executor(raise_exceptions=True) + executor.submit(fail_task) + with pytest.raises(ValueError): + executor.results() + + +def test_executor_empty_jobs(): + """Test that results() returns an empty list if no jobs are submitted.""" + executor = Executor() + assert executor.results() == [] + + +def test_executor_job_index_after_clear(): + """Test that job indices reset after clearing jobs.""" + + async def echo(x): + return x + + executor = Executor() + executor.submit(echo, 1) + executor.clear_jobs() + executor.submit(echo, 42) + results = executor.results() + assert results == [42] + + +def test_executor_batch_size_edge_cases(): + """Test batch_size=1 and batch_size > number of jobs.""" + + async def echo(x): + return x + + # batch_size=1 + executor = Executor(batch_size=1) + for i in range(3): + executor.submit(echo, i) + assert executor.results() == [0, 1, 2] + # batch_size > jobs + executor = Executor(batch_size=10) + for i in range(3): + executor.submit(echo, i) + assert executor.results() == [0, 1, 2] diff --git a/ragas/tests/unit/test_executor_in_jupyter.ipynb b/ragas/tests/unit/test_executor_in_jupyter.ipynb index e8922e934..a4a493069 100644 --- a/ragas/tests/unit/test_executor_in_jupyter.ipynb +++ b/ragas/tests/unit/test_executor_in_jupyter.ipynb @@ -39,7 +39,7 @@ "metadata": {}, "outputs": [], "source": [ - "from ragas.executor import is_event_loop_running, as_completed" + "from ragas.async_utils import is_event_loop_running, as_completed" ] }, { @@ -59,7 +59,7 @@ "source": [ "async def _run():\n", " results = []\n", - " for t in await as_completed([echo(1), echo(2), echo(3)], 3):\n", + " for t in as_completed([echo(1), echo(2), echo(3)], 3):\n", " r = await t\n", " results.append(r)\n", " return results\n", @@ -78,6 +78,13 @@ "## Test Executor" ] }, + { + "cell_type": "markdown", + 
"metadata": {}, + "source": [ + "_**NOTE**: Requires `ipywidgets` installed_" + ] + }, { "cell_type": "code", "execution_count": null, @@ -98,7 +105,22 @@ "for i in range(10):\n", " executor.submit(echo, i, name=f\"echo_{i}\")\n", "\n", - "results = executor.results()\n", + "results = executor.results() # await executor.aresults()\n", + "assert results == list(range(10))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# test order of results when they should return in submission order\n", + "executor = Executor(raise_exceptions=True)\n", + "for i in range(10):\n", + " executor.submit(echo, i, name=f\"echo_{i}\")\n", + "\n", + "results = executor.results() # await executor.aresults()\n", "assert results == list(range(10))" ] }, @@ -116,7 +138,7 @@ " executor.submit(echo_random_latency, i, name=f\"echo_order_{i}\")\n", "\n", "# Act\n", - "results = executor.results()\n", + "results = executor.results() # await executor.aresults()\n", "# Assert\n", "assert results == list(range(10))" ] @@ -135,7 +157,7 @@ " executor.submit(echo_random_latency, i, name=f\"echo_order_{i}\")\n", "\n", "# Act\n", - "results = executor.results()\n", + "results = executor.results() # await executor.aresults()\n", "# Assert\n", "assert results == list(range(10))" ] @@ -154,7 +176,7 @@ " executor.submit(echo_random_latency, i, name=f\"echo_order_{i}\")\n", "\n", "# Act\n", - "results = executor.results()\n", + "results = executor.results() # await executor.aresults()\n", "# Assert\n", "assert results == list(range(10))" ] @@ -170,13 +192,13 @@ "for i in range(1000):\n", " executor.submit(asyncio.sleep, 0.01)\n", "\n", - "results = executor.results()\n", + "results = executor.results() # await executor.aresults()\n", "assert results, \"Results should be list of None\"\n", "\n", "for i in range(1000):\n", " executor.submit(asyncio.sleep, 0.01)\n", "\n", - "results = executor.results()\n", + "results = executor.results() # await 
executor.aresults()\n", "assert results, \"Results should be list of None\"" ] }, @@ -264,7 +286,7 @@ }, { "cell_type": "code", - "execution_count": 9, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -285,7 +307,7 @@ ], "metadata": { "kernelspec": { - "display_name": "venv", + "display_name": ".venv", "language": "python", "name": "python3" }, @@ -299,7 +321,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.9" + "version": "3.13.0" } }, "nbformat": 4,