From e2409fcc33fa50cc98e18397a7e1ec5823b4e256 Mon Sep 17 00:00:00 2001 From: ahgraber Date: Mon, 9 Jun 2025 11:18:24 -0400 Subject: [PATCH 1/8] refactor: move is_event_loop_running and as_completed to async_utils This helps separate async features from executor and engine modules, and will prevent circular imports in these modules. --- docs/references/executor.md | 1 - ragas/src/ragas/async_utils.py | 50 +++++++++++++++++--- ragas/src/ragas/executor.py | 35 +------------- ragas/src/ragas/metrics/base.py | 2 +- ragas/src/ragas/testset/transforms/engine.py | 2 +- ragas/tests/unit/test_async_utils.py | 25 +++++++++- ragas/tests/unit/test_executor.py | 25 ---------- 7 files changed, 71 insertions(+), 69 deletions(-) diff --git a/docs/references/executor.md b/docs/references/executor.md index e8eb86ba8..91c415359 100644 --- a/docs/references/executor.md +++ b/docs/references/executor.md @@ -3,4 +3,3 @@ members: - Executor - run_async_batch - - is_event_loop_running diff --git a/ragas/src/ragas/async_utils.py b/ragas/src/ragas/async_utils.py index f327955c2..44110fb15 100644 --- a/ragas/src/ragas/async_utils.py +++ b/ragas/src/ragas/async_utils.py @@ -1,20 +1,57 @@ """Async utils.""" import asyncio -from typing import Any, Coroutine, List, Optional +import logging +import typing as t from tqdm.auto import tqdm -from ragas.executor import is_event_loop_running -from ragas.utils import batched +logger = logging.getLogger(__name__) + + +def is_event_loop_running() -> bool: + """ + Check if an event loop is currently running. + """ + try: + loop = asyncio.get_running_loop() + except RuntimeError: + return False + else: + return loop.is_running() + + +async def as_completed( + coroutines: t.Sequence[t.Coroutine], max_workers: int = -1 +) -> t.Iterator[asyncio.Future]: + """ + Wrap coroutines with a semaphore if max_workers is specified. + + Returns an iterator of futures that completes as tasks finish. + """ + if max_workers == -1: + tasks = [asyncio.create_task(coro) for coro in coroutines] + else: + semaphore = asyncio.Semaphore(max_workers) + + async def sema_coro(coro): + async with semaphore: + return await coro + + tasks = [asyncio.create_task(sema_coro(coro)) for coro in coroutines] + + return asyncio.as_completed(tasks) + + def run_async_tasks( - tasks: List[Coroutine], - batch_size: Optional[int] = None, + tasks: t.List[t.Coroutine], + batch_size: t.Optional[int] = None, show_progress: bool = True, progress_bar_desc: str = "Running async tasks", -) -> List[Any]: + max_workers: int = -1, +) -> t.List[t.Any]: """ Execute async tasks with optional batching and progress tracking. @@ -25,6 +62,7 @@ def run_async_tasks( batch_size: Optional size for batching tasks. If None, runs all concurrently show_progress: Whether to display progress bars """ + from ragas.utils import batched async def _run(): total_tasks = len(tasks) diff --git a/ragas/src/ragas/executor.py b/ragas/src/ragas/executor.py index a0e209694..67842db58 100644 --- a/ragas/src/ragas/executor.py +++ b/ragas/src/ragas/executor.py @@ -9,6 +9,7 @@ import numpy as np from tqdm.auto import tqdm +from ragas.async_utils import as_completed, process_futures from ragas.run_config import RunConfig from ragas.utils import batched @@ -17,40 +18,6 @@ logger = logging.getLogger(__name__) -def is_event_loop_running() -> bool: - """ - Check if an event loop is currently running. 
- """ - try: - loop = asyncio.get_running_loop() - except RuntimeError: - return False - else: - return loop.is_running() - - -async def as_completed( - coroutines: t.List[t.Coroutine], max_workers: int -) -> t.Iterator[asyncio.Future]: - """ - Wrap coroutines with a semaphore if max_workers is specified. - - Returns an iterator of futures that completes as tasks finish. - """ - if max_workers == -1: - tasks = [asyncio.create_task(coro) for coro in coroutines] - - else: - semaphore = asyncio.Semaphore(max_workers) - - async def sema_coro(coro): - async with semaphore: - return await coro - - tasks = [asyncio.create_task(sema_coro(coro)) for coro in coroutines] - return asyncio.as_completed(tasks) - - @dataclass class Executor: """ diff --git a/ragas/src/ragas/metrics/base.py b/ragas/src/ragas/metrics/base.py index 11c1b2947..f78a55f11 100644 --- a/ragas/src/ragas/metrics/base.py +++ b/ragas/src/ragas/metrics/base.py @@ -12,9 +12,9 @@ from tqdm import tqdm from ragas._analytics import EvaluationEvent, _analytics_batcher +from ragas.async_utils import is_event_loop_running from ragas.callbacks import ChainType, new_group from ragas.dataset_schema import MetricAnnotation, MultiTurnSample, SingleTurnSample -from ragas.executor import is_event_loop_running from ragas.losses import BinaryMetricLoss, MSELoss from ragas.prompt import FewShotPydanticPrompt, PromptMixin from ragas.run_config import RunConfig diff --git a/ragas/src/ragas/testset/transforms/engine.py b/ragas/src/ragas/testset/transforms/engine.py index 0eb312b86..76070cb81 100644 --- a/ragas/src/ragas/testset/transforms/engine.py +++ b/ragas/src/ragas/testset/transforms/engine.py @@ -6,7 +6,7 @@ from tqdm.auto import tqdm -from ragas.executor import as_completed, is_event_loop_running +from ragas.async_utils import as_completed, run_async_tasks from ragas.run_config import RunConfig from ragas.testset.graph import KnowledgeGraph from ragas.testset.transforms.base import BaseGraphTransformation diff --git a/ragas/tests/unit/test_async_utils.py b/ragas/tests/unit/test_async_utils.py index 2666a35ff..55d80744a 100644 --- a/ragas/tests/unit/test_async_utils.py +++ b/ragas/tests/unit/test_async_utils.py @@ -1,6 +1,29 @@ +import asyncio + import pytest -from ragas.async_utils import run_async_tasks +from ragas.async_utils import as_completed, is_event_loop_running, run_async_tasks + + +def test_is_event_loop_running_in_script(): + assert is_event_loop_running() is False + + +def test_as_completed_in_script(): + async def echo_order(index: int): + await asyncio.sleep(index) + return index + + async def _run(): + results = [] + for t in await as_completed([echo_order(1), echo_order(2), echo_order(3)], 3): + r = await t + results.append(r) + return results + + results = asyncio.run(_run()) + + assert results == [1, 2, 3] @pytest.fixture diff --git a/ragas/tests/unit/test_executor.py b/ragas/tests/unit/test_executor.py index be08caaa9..0354a8a82 100644 --- a/ragas/tests/unit/test_executor.py +++ b/ragas/tests/unit/test_executor.py @@ -66,31 +66,6 @@ async def echo_order(index: int): assert results == list(range(1, 4)) -def test_is_event_loop_running_in_script(): - from ragas.executor import is_event_loop_running - - assert is_event_loop_running() is False - - -def test_as_completed_in_script(): - from ragas.executor import as_completed - - async def echo_order(index: int): - await asyncio.sleep(index) - return index - - async def _run(): - results = [] - for t in await as_completed([echo_order(1), echo_order(2), echo_order(3)], 3): - r = await 
t
-            results.append(r)
-        return results
-
-    results = asyncio.run(_run())
-
-    assert results == [1, 2, 3]
-
-
 def test_executor_timings():
     # if we submit n tasks that take 1 second each,
     # the total time taken should be close to 1 second

From 39b38aebfdd9daba6c64ba5f9484dd5f06c20332 Mon Sep 17 00:00:00 2001
From: ahgraber
Date: Mon, 9 Jun 2025 11:18:49 -0400
Subject: [PATCH 2/8] feat: add ProgressBarManager for enhanced progress tracking

Introduces a new ProgressBarManager class to manage progress bars for both batch and non-batch execution. This includes methods for creating single and nested progress bars, as well as updating batch progress bars, improving the user experience during long-running tasks.
---
 ragas/src/ragas/utils.py | 46 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 46 insertions(+)

diff --git a/ragas/src/ragas/utils.py b/ragas/src/ragas/utils.py
index 337fdde54..323ce9c3c 100644
--- a/ragas/src/ragas/utils.py
+++ b/ragas/src/ragas/utils.py
@@ -12,6 +12,7 @@
 import numpy as np
 import tiktoken
 from datasets import Dataset
+from tqdm.auto import tqdm
 
 if t.TYPE_CHECKING:
     from ragas.metrics.base import Metric
@@ -239,6 +240,51 @@ def batched(iterable: t.Iterable, n: int) -> t.Iterator[t.Tuple]:
         yield batch
 
 
+class ProgressBarManager:
+    """Manages progress bars for batch and non-batch execution."""
+
+    def __init__(self, desc: str, show_progress: bool):
+        self.desc = desc
+        self.show_progress = show_progress
+
+    def create_single_bar(self, total: int) -> tqdm:
+        """Create a single progress bar for non-batch execution."""
+        return tqdm(
+            total=total,
+            desc=self.desc,
+            disable=not self.show_progress,
+        )
+
+    def create_nested_bars(self, total_jobs: int, batch_size: int):
+        """Create nested progress bars for batch execution."""
+        n_batches = (total_jobs + batch_size - 1) // batch_size
+
+        overall_pbar = tqdm(
+            total=total_jobs,
+            desc=self.desc,
+            disable=not self.show_progress,
+            position=0,
+            leave=True,
+        )
+
+        batch_pbar = tqdm(
+            total=min(batch_size, total_jobs),
+            desc=f"Batch 1/{n_batches}",
+            disable=not self.show_progress,
+            position=1,
+            leave=False,
+        )
+
+        return overall_pbar, batch_pbar, n_batches
+
+    def update_batch_bar(
+        self, batch_pbar: tqdm, batch_num: int, n_batches: int, batch_size: int
+    ):
+        """Update batch progress bar for new batch."""
+        batch_pbar.reset(total=batch_size)
+        batch_pbar.set_description(f"Batch {batch_num}/{n_batches}")
+
+
 _LOGGER_DATE_TIME = "%Y-%m-%d %H:%M:%S"

From 9d5a5b46def1707b0b0d4c21aef94939d9376c86 Mon Sep 17 00:00:00 2001
From: ahgraber
Date: Mon, 9 Jun 2025 11:22:21 -0400
Subject: [PATCH 3/8] feat: refactor async utilities for improved functionality

- Extracted a shared `run` helper from event-loop handling logic previously repeated across the async_utils, executor, and engine modules.
- Changed `as_completed` from an async function to a regular function for better compatibility.
- Added `process_futures` to handle futures with optional progress tracking.
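
As a usage sketch, the three utilities compose as follows (hypothetical caller code written against the signatures added in this patch, not part of the diff):

    import asyncio

    from ragas.async_utils import as_completed, process_futures, run

    async def work(i: int) -> int:
        await asyncio.sleep(0.01)
        return i * 2

    async def gather_all():
        # at most two coroutines run concurrently; results arrive as they finish
        futures = as_completed([work(i) for i in range(5)], max_workers=2)
        return [r async for r in process_futures(futures)]

    results = run(gather_all)  # works inside or outside a running event loop
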
--- ragas/src/ragas/async_utils.py | 55 +++++++++++++++++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/ragas/src/ragas/async_utils.py b/ragas/src/ragas/async_utils.py index 44110fb15..f7df5f881 100644 --- a/ragas/src/ragas/async_utils.py +++ b/ragas/src/ragas/async_utils.py @@ -21,7 +21,7 @@ def is_event_loop_running() -> bool: return loop.is_running() -async def as_completed( +def as_completed( coroutines: t.Sequence[t.Coroutine], max_workers: int = -1 ) -> t.Iterator[asyncio.Future]: """ @@ -43,6 +43,59 @@ async def sema_coro(coro): return asyncio.as_completed(tasks) +async def process_futures( + futures: t.Iterator[asyncio.Future], pbar: t.Optional[tqdm] = None +) -> t.AsyncGenerator[t.Any, None]: + """ + Process futures with optional progress tracking. + + Args: + futures: Iterator of asyncio futures to process (e.g., from asyncio.as_completed) + pbar: Optional progress bar to update + + Yields: + Results from completed futures as they finish + """ + # Process completed futures as they finish + for future in futures: + try: + result = await future + except Exception as e: + result = e + + if pbar: + pbar.update(1) + yield result + + +def run( + async_func: t.Union[ + t.Callable[[], t.Coroutine[t.Any, t.Any, t.Any]], + t.Coroutine[t.Any, t.Any, t.Any], + ], +) -> t.Any: + """Run an async function in the current event loop or a new one if not running.""" + try: + # Check if we're already in a running event loop + loop = asyncio.get_running_loop() + # If we get here, we're in a running loop - need nest_asyncio + try: + import nest_asyncio + except ImportError as e: + raise ImportError( + "It seems like you're running this in a jupyter-like environment. " + "Please install nest_asyncio with `pip install nest_asyncio` to make it work." + ) from e + + nest_asyncio.apply() + # Create the coroutine if it's a callable, otherwise use directly + coro = async_func() if callable(async_func) else async_func + return loop.run_until_complete(coro) + + except RuntimeError: + # No running event loop, so we can use asyncio.run + coro = async_func() if callable(async_func) else async_func + return asyncio.run(coro) def run_async_tasks( From dc40ab14187a6b01a06c3b88a11f424adf54b22e Mon Sep 17 00:00:00 2001 From: ahgraber Date: Mon, 9 Jun 2025 11:38:15 -0400 Subject: [PATCH 4/8] feat: enhance async task management with ProgressBarManager - Refactored Executor to utilize new async_utils and ProgressBarManager for improved composition - Refactored run_async_tasks to utilize ProgressBarManager for improved progress tracking. - Modified tests to validate the new behavior functions --- ragas/src/ragas/async_utils.py | 75 +++++----------- ragas/src/ragas/executor.py | 126 +++++++++++++++------------ ragas/tests/unit/test_async_utils.py | 69 ++++++++++----- 3 files changed, 139 insertions(+), 131 deletions(-) diff --git a/ragas/src/ragas/async_utils.py b/ragas/src/ragas/async_utils.py index f7df5f881..83a013932 100644 --- a/ragas/src/ragas/async_utils.py +++ b/ragas/src/ragas/async_utils.py @@ -114,67 +114,36 @@ def run_async_tasks( tasks: List of coroutines to execute batch_size: Optional size for batching tasks. 
If None, runs all concurrently show_progress: Whether to display progress bars + max_workers: Maximum number of concurrent tasks (-1 for unlimited) """ - from ragas.utils import batched + from ragas.utils import ProgressBarManager, batched async def _run(): total_tasks = len(tasks) results = [] + pbm = ProgressBarManager(progress_bar_desc, show_progress) - # If no batching, run all tasks concurrently with single progress bar if not batch_size: - with tqdm( - total=total_tasks, - desc=progress_bar_desc, - disable=not show_progress, - ) as pbar: - for future in asyncio.as_completed(tasks): - result = await future + with pbm.create_single_bar(total_tasks) as pbar: + async for result in process_futures( + as_completed(tasks, max_workers), pbar + ): results.append(result) - pbar.update(1) - return results - - # With batching, show nested progress bars - batches = batched(tasks, batch_size) # generator - n_batches = (total_tasks + batch_size - 1) // batch_size - with ( - tqdm( - total=total_tasks, - desc=progress_bar_desc, - disable=not show_progress, - position=0, - leave=True, - ) as overall_pbar, - tqdm( - total=batch_size, - desc=f"Batch 1/{n_batches}", - disable=not show_progress, - position=1, - leave=False, - ) as batch_pbar, - ): - for i, batch in enumerate(batches, 1): - batch_pbar.reset(total=len(batch)) - batch_pbar.set_description(f"Batch {i}/{n_batches}") - for future in asyncio.as_completed(batch): - result = await future - results.append(result) - overall_pbar.update(1) - batch_pbar.update(1) + else: + total_tasks = len(tasks) + batches = batched(tasks, batch_size) + overall_pbar, batch_pbar, n_batches = pbm.create_nested_bars( + total_tasks, batch_size + ) + with overall_pbar, batch_pbar: + for i, batch in enumerate(batches, 1): + pbm.update_batch_bar(batch_pbar, i, n_batches, len(batch)) + async for result in process_futures( + as_completed(batch, max_workers), batch_pbar + ): + results.append(result) + overall_pbar.update(len(batch)) return results - if is_event_loop_running(): - # an event loop is running so call nested_asyncio to fix this - try: - import nest_asyncio - except ImportError: - raise ImportError( - "It seems like your running this in a jupyter-like environment. " - "Please install nest_asyncio with `pip install nest_asyncio` to make it work." 
- ) - else: - nest_asyncio.apply() - - results = asyncio.run(_run()) - return results + return run(_run) diff --git a/ragas/src/ragas/executor.py b/ragas/src/ragas/executor.py index 67842db58..1aeefa04b 100644 --- a/ragas/src/ragas/executor.py +++ b/ragas/src/ragas/executor.py @@ -9,11 +9,9 @@ import numpy as np from tqdm.auto import tqdm -from ragas.async_utils import as_completed, process_futures +from ragas.async_utils import as_completed, process_futures, run from ragas.run_config import RunConfig -from ragas.utils import batched - -nest_asyncio.apply() +from ragas.utils import ProgressBarManager, batched logger = logging.getLogger(__name__) @@ -96,15 +94,12 @@ async def _process_jobs(self) -> t.List[t.Any]: """Execute jobs with optional progress tracking.""" max_workers = (self.run_config or RunConfig()).max_workers results = [] + pbm = ProgressBarManager(self.desc, self.show_progress) if not self.batch_size: # Use external progress bar if provided, otherwise create one if self.pbar is None: - with tqdm( - total=len(self.jobs), - desc=self.desc, - disable=not self.show_progress, - ) as internal_pbar: + with pbm.create_single_bar(len(self.jobs)) as internal_pbar: await self._process_coroutines( self.jobs, internal_pbar, results, max_workers ) @@ -115,71 +110,86 @@ async def _process_jobs(self) -> t.List[t.Any]: return results - # With batching, show nested progress bars - batches = batched(self.jobs, self.batch_size) # generator of job tuples - n_batches = (len(self.jobs) + self.batch_size - 1) // self.batch_size - - with ( - tqdm( - total=len(self.jobs), - desc=self.desc, - disable=not self.show_progress, - position=1, - leave=True, - ) as overall_pbar, - tqdm( - total=min(self.batch_size, len(self.jobs)), - desc=f"Batch 1/{n_batches}", - disable=not self.show_progress, - position=0, - leave=False, - ) as batch_pbar, - ): + # Process jobs in batches with nested progress bars + await self._process_batched_jobs(self.jobs, pbm, max_workers, results) + return results + + async def _process_batched_jobs( + self, jobs_to_process, progress_manager, max_workers, results + ): + """Process jobs in batches with nested progress tracking.""" + batch_size = self.batch_size or len(jobs_to_process) + batches = batched(jobs_to_process, batch_size) + overall_pbar, batch_pbar, n_batches = progress_manager.create_nested_bars( + len(jobs_to_process), batch_size + ) + + with overall_pbar, batch_pbar: for i, batch in enumerate(batches, 1): - batch_pbar.reset(total=len(batch)) - batch_pbar.set_description(f"Batch {i}/{n_batches}") + progress_manager.update_batch_bar(batch_pbar, i, n_batches, len(batch)) # Create coroutines per batch coroutines = [ afunc(*args, **kwargs) for afunc, args, kwargs, _ in batch ] - for future in await as_completed(coroutines, max_workers): - result = await future - results.append(result) - overall_pbar.update(1) - batch_pbar.update(1) - - return results + async for result in process_futures( + as_completed(coroutines, max_workers), batch_pbar + ): + # Ensure result is always a tuple (counter, value) + if isinstance(result, Exception): + # Find the counter for this failed job + idx = coroutines.index(result.__context__) + counter = ( + batch[idx][0].__closure__[1].cell_contents + ) # counter from closure + results.append((counter, result)) + else: + results.append(result) + # Update overall progress bar for all futures in this batch + overall_pbar.update(len(batch)) async def _process_coroutines(self, jobs, pbar, results, max_workers): """Helper function to process coroutines and 
update the progress bar.""" coroutines = [afunc(*args, **kwargs) for afunc, args, kwargs, _ in jobs] - for future in await as_completed(coroutines, max_workers): - result = await future - results.append(result) - pbar.update(1) + async for result in process_futures( + as_completed(coroutines, max_workers), pbar + ): + # Ensure result is always a tuple (counter, value) + if isinstance(result, Exception): + idx = coroutines.index(result.__context__) + counter = ( + jobs[idx][0].__closure__[1].cell_contents + ) # counter from closure + results.append((counter, result)) + else: + results.append(result) + + async def aresults(self) -> t.List[t.Any]: + """ + Execute all submitted jobs and return their results asynchronously. + The results are returned in the order of job submission. + + This is the async entry point for executing async jobs when already in an async context. + """ + results = await self._process_jobs() + # If raise_exceptions is True, propagate the exception + for r in results: + if self.raise_exceptions and isinstance(r, Exception): + raise r + sorted_results = sorted(results, key=lambda x: x[0]) + return [r[1] for r in sorted_results] def results(self) -> t.List[t.Any]: """ Execute all submitted jobs and return their results. The results are returned in the order of job submission. + + This is the main sync entry point for executing async jobs. """ - if is_event_loop_running(): - # an event loop is running so call nested_asyncio to fix this - try: - import nest_asyncio - except ImportError as e: - raise ImportError( - "It seems like your running this in a jupyter-like environment. " - "Please install nest_asyncio with `pip install nest_asyncio` to make it work." - ) from e - else: - if not self._nest_asyncio_applied: - nest_asyncio.apply() - self._nest_asyncio_applied = True - results = asyncio.run(self._process_jobs()) - sorted_results = sorted(results, key=lambda x: x[0]) - return [r[1] for r in sorted_results] + async def _async_wrapper(): + return await self.aresults() + + return run(_async_wrapper) def run_async_batch( diff --git a/ragas/tests/unit/test_async_utils.py b/ragas/tests/unit/test_async_utils.py index 55d80744a..477f5be05 100644 --- a/ragas/tests/unit/test_async_utils.py +++ b/ragas/tests/unit/test_async_utils.py @@ -2,28 +2,69 @@ import pytest -from ragas.async_utils import as_completed, is_event_loop_running, run_async_tasks +from ragas.async_utils import run_async_tasks def test_is_event_loop_running_in_script(): + from ragas.async_utils import is_event_loop_running + assert is_event_loop_running() is False def test_as_completed_in_script(): - async def echo_order(index: int): - await asyncio.sleep(index) + from ragas.async_utils import as_completed + + async def echo_order(index: int, delay: float): + await asyncio.sleep(delay) return index async def _run(): + # Use decreasing delays so results come out in reverse order + coros = [echo_order(1, 0.3), echo_order(2, 0.2), echo_order(3, 0.1)] results = [] - for t in await as_completed([echo_order(1), echo_order(2), echo_order(3)], 3): + for t in as_completed(coros, 3): r = await t results.append(r) return results results = asyncio.run(_run()) + # Results should be [3, 2, 1] due to decreasing delays + assert results == [3, 2, 1] + + +def test_as_completed_max_workers(): + import time - assert results == [1, 2, 3] + from ragas.async_utils import as_completed + + async def sleeper(idx): + await asyncio.sleep(0.1) + return idx + + async def _run(): + start = time.time() + coros = [sleeper(i) for i in range(5)] + 
results = [] + for t in as_completed(coros, max_workers=2): + r = await t + results.append(r) + elapsed = time.time() - start + return results, elapsed + + results, elapsed = asyncio.run(_run()) + # With max_workers=2, total time should be at least 0.2s for 5 tasks + assert len(results) == 5 + assert elapsed >= 0.2 + + +def test_run_function(): + from ragas.async_utils import run + + async def foo(): + return 42 + + result = run(foo) + assert result == 42 @pytest.fixture @@ -34,28 +75,16 @@ async def echo_order(index: int): return [echo_order(i) for i in range(1, 11)] -@pytest.mark.asyncio -async def test_run_async_tasks_unbatched(tasks): - # Act +def test_run_async_tasks_unbatched(tasks): results = run_async_tasks(tasks) - - # Assert assert sorted(results) == sorted(range(1, 11)) -@pytest.mark.asyncio -async def test_run_async_tasks_batched(tasks): - # Act +def test_run_async_tasks_batched(tasks): results = run_async_tasks(tasks, batch_size=3) - - # Assert assert sorted(results) == sorted(range(1, 11)) -@pytest.mark.asyncio -async def test_run_async_tasks_no_progress(tasks): - # Act +def test_run_async_tasks_no_progress(tasks): results = run_async_tasks(tasks, show_progress=False) - - # Assert assert sorted(results) == sorted(range(1, 11)) From ce571cf02664f08eab005c1498cb3839234a2c90 Mon Sep 17 00:00:00 2001 From: ahgraber Date: Mon, 9 Jun 2025 12:04:26 -0400 Subject: [PATCH 5/8] feat: enhance Executor with job indexing and exception handling - Introduced a locking mechanism to ensure thread-safe job processing. - Added `_jobs_processed` attribute to maintain consistent job indexing across multiple runs. - Implemented `clear_jobs` method to reset job indices. - Updated tests to verify exception handling and job indexing after clearing jobs. - Adjusted Jupyter notebook tests for consistency in results retrieval. --- ragas/src/ragas/executor.py | 48 ++++++++++---- ragas/tests/unit/test_executor.py | 64 +++++++++++++++++++ .../tests/unit/test_executor_in_jupyter.ipynb | 44 +++++++++---- 3 files changed, 131 insertions(+), 25 deletions(-) diff --git a/ragas/src/ragas/executor.py b/ragas/src/ragas/executor.py index 1aeefa04b..7a4506ad3 100644 --- a/ragas/src/ragas/executor.py +++ b/ragas/src/ragas/executor.py @@ -1,11 +1,10 @@ from __future__ import annotations -import asyncio import logging +import threading import typing as t from dataclasses import dataclass, field -import nest_asyncio import numpy as np from tqdm.auto import tqdm @@ -48,15 +47,14 @@ class Executor: raise_exceptions: bool = False batch_size: t.Optional[int] = None run_config: t.Optional[RunConfig] = field(default=None, repr=False) - _nest_asyncio_applied: bool = field(default=False, repr=False) pbar: t.Optional[tqdm] = None + _jobs_processed: int = field(default=0, repr=False) + _lock: threading.Lock = field(default_factory=threading.Lock, repr=False) def wrap_callable_with_index( self, callable: t.Callable, counter: int ) -> t.Callable: - async def wrapped_callable_async( - *args, **kwargs - ) -> t.Tuple[int, t.Callable | float]: + async def wrapped_callable_async(*args, **kwargs) -> t.Tuple[int, t.Any]: try: result = await callable(*args, **kwargs) return counter, result @@ -87,31 +85,53 @@ def submit( """ Submit a job to be executed, wrapping the callable with error handling and indexing to keep track of the job index. 
""" - callable_with_index = self.wrap_callable_with_index(callable, len(self.jobs)) - self.jobs.append((callable_with_index, args, kwargs, name)) + # Use _jobs_processed for consistent indexing across multiple runs + with self._lock: + callable_with_index = self.wrap_callable_with_index( + callable, self._jobs_processed + ) + self.jobs.append((callable_with_index, args, kwargs, name)) + self._jobs_processed += 1 + + def clear_jobs(self) -> None: + """Clear all submitted jobs and reset counter.""" + with self._lock: + self.jobs.clear() + self._jobs_processed = 0 async def _process_jobs(self) -> t.List[t.Any]: """Execute jobs with optional progress tracking.""" - max_workers = (self.run_config or RunConfig()).max_workers + with self._lock: + if not self.jobs: + return [] + + # Make a copy of jobs to process and clear the original list to prevent re-execution + jobs_to_process = self.jobs.copy() + self.jobs.clear() + + max_workers = ( + self.run_config.max_workers + if self.run_config and hasattr(self.run_config, "max_workers") + else -1 + ) results = [] pbm = ProgressBarManager(self.desc, self.show_progress) if not self.batch_size: # Use external progress bar if provided, otherwise create one if self.pbar is None: - with pbm.create_single_bar(len(self.jobs)) as internal_pbar: + with pbm.create_single_bar(len(jobs_to_process)) as internal_pbar: await self._process_coroutines( - self.jobs, internal_pbar, results, max_workers + jobs_to_process, internal_pbar, results, max_workers ) else: await self._process_coroutines( - self.jobs, self.pbar, results, max_workers + jobs_to_process, self.pbar, results, max_workers ) - return results # Process jobs in batches with nested progress bars - await self._process_batched_jobs(self.jobs, pbm, max_workers, results) + await self._process_batched_jobs(jobs_to_process, pbm, max_workers, results) return results async def _process_batched_jobs( diff --git a/ragas/tests/unit/test_executor.py b/ragas/tests/unit/test_executor.py index 0354a8a82..6c23cfea7 100644 --- a/ragas/tests/unit/test_executor.py +++ b/ragas/tests/unit/test_executor.py @@ -86,3 +86,67 @@ async def long_task(): assert len(results) == n_tasks assert all(r == 1 for r in results) assert end_time - start_time < 0.2 + + +def test_executor_exception_handling(): + """Test that exceptions are returned as np.nan when raise_exceptions is False.""" + import numpy as np + + async def fail_task(): + raise ValueError("fail") + + executor = Executor() + executor.submit(fail_task) + results = executor.results() + assert len(results) == 1 + assert np.isnan(results[0]) + + +def test_executor_exception_raises(): + """Test that exceptions are raised when raise_exceptions is True.""" + + async def fail_task(): + raise ValueError("fail") + + executor = Executor(raise_exceptions=True) + executor.submit(fail_task) + with pytest.raises(ValueError): + executor.results() + + +def test_executor_empty_jobs(): + """Test that results() returns an empty list if no jobs are submitted.""" + executor = Executor() + assert executor.results() == [] + + +def test_executor_job_index_after_clear(): + """Test that job indices reset after clearing jobs.""" + + async def echo(x): + return x + + executor = Executor() + executor.submit(echo, 1) + executor.clear_jobs() + executor.submit(echo, 42) + results = executor.results() + assert results == [42] + + +def test_executor_batch_size_edge_cases(): + """Test batch_size=1 and batch_size > number of jobs.""" + + async def echo(x): + return x + + # batch_size=1 + executor = 
Executor(batch_size=1) + for i in range(3): + executor.submit(echo, i) + assert executor.results() == [0, 1, 2] + # batch_size > jobs + executor = Executor(batch_size=10) + for i in range(3): + executor.submit(echo, i) + assert executor.results() == [0, 1, 2] diff --git a/ragas/tests/unit/test_executor_in_jupyter.ipynb b/ragas/tests/unit/test_executor_in_jupyter.ipynb index e8922e934..a4a493069 100644 --- a/ragas/tests/unit/test_executor_in_jupyter.ipynb +++ b/ragas/tests/unit/test_executor_in_jupyter.ipynb @@ -39,7 +39,7 @@ "metadata": {}, "outputs": [], "source": [ - "from ragas.executor import is_event_loop_running, as_completed" + "from ragas.async_utils import is_event_loop_running, as_completed" ] }, { @@ -59,7 +59,7 @@ "source": [ "async def _run():\n", " results = []\n", - " for t in await as_completed([echo(1), echo(2), echo(3)], 3):\n", + " for t in as_completed([echo(1), echo(2), echo(3)], 3):\n", " r = await t\n", " results.append(r)\n", " return results\n", @@ -78,6 +78,13 @@ "## Test Executor" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "_**NOTE**: Requires `ipywidgets` installed_" + ] + }, { "cell_type": "code", "execution_count": null, @@ -98,7 +105,22 @@ "for i in range(10):\n", " executor.submit(echo, i, name=f\"echo_{i}\")\n", "\n", - "results = executor.results()\n", + "results = executor.results() # await executor.aresults()\n", + "assert results == list(range(10))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# test order of results when they should return in submission order\n", + "executor = Executor(raise_exceptions=True)\n", + "for i in range(10):\n", + " executor.submit(echo, i, name=f\"echo_{i}\")\n", + "\n", + "results = executor.results() # await executor.aresults()\n", "assert results == list(range(10))" ] }, @@ -116,7 +138,7 @@ " executor.submit(echo_random_latency, i, name=f\"echo_order_{i}\")\n", "\n", "# Act\n", - "results = executor.results()\n", + "results = executor.results() # await executor.aresults()\n", "# Assert\n", "assert results == list(range(10))" ] @@ -135,7 +157,7 @@ " executor.submit(echo_random_latency, i, name=f\"echo_order_{i}\")\n", "\n", "# Act\n", - "results = executor.results()\n", + "results = executor.results() # await executor.aresults()\n", "# Assert\n", "assert results == list(range(10))" ] @@ -154,7 +176,7 @@ " executor.submit(echo_random_latency, i, name=f\"echo_order_{i}\")\n", "\n", "# Act\n", - "results = executor.results()\n", + "results = executor.results() # await executor.aresults()\n", "# Assert\n", "assert results == list(range(10))" ] @@ -170,13 +192,13 @@ "for i in range(1000):\n", " executor.submit(asyncio.sleep, 0.01)\n", "\n", - "results = executor.results()\n", + "results = executor.results() # await executor.aresults()\n", "assert results, \"Results should be list of None\"\n", "\n", "for i in range(1000):\n", " executor.submit(asyncio.sleep, 0.01)\n", "\n", - "results = executor.results()\n", + "results = executor.results() # await executor.aresults()\n", "assert results, \"Results should be list of None\"" ] }, @@ -264,7 +286,7 @@ }, { "cell_type": "code", - "execution_count": 9, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -285,7 +307,7 @@ ], "metadata": { "kernelspec": { - "display_name": "venv", + "display_name": ".venv", "language": "python", "name": "python3" }, @@ -299,7 +321,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.9" 
+   "version": "3.13.0"
   }
  },
  "nbformat": 4,

From f2a256426e658829831196be1dc02ef9d5b87d94 Mon Sep 17 00:00:00 2001
From: ahgraber
Date: Mon, 9 Jun 2025 12:12:31 -0400
Subject: [PATCH 6/8] feat: update engine's apply_transforms to recursively process nested Transformation or Parallel procedures

- Update apply_transforms() to handle each supported transforms type recursively:
  - a list: recursively apply each transform in the list
  - a Parallel instance: recursively apply the transformations contained within it
  - a BaseGraphTransformation: generate an execution plan (a list of coroutines), get a description, and run the coroutines asynchronously using run_async_tasks()
  - anything else: raise a ValueError indicating an invalid type
- Move apply_nest_asyncio to async_utils for better organization
- Update the Parallel class to support nested Parallel transformations with improved type hints
- Add unit tests
---
 ragas/src/ragas/async_utils.py               |  16 +++
 ragas/src/ragas/testset/transforms/engine.py |  74 +++++------
 ragas/tests/unit/test_engine.py              | 130 +++++++++++++++++++
 3 files changed, 176 insertions(+), 44 deletions(-)
 create mode 100644 ragas/tests/unit/test_engine.py

diff --git a/ragas/src/ragas/async_utils.py b/ragas/src/ragas/async_utils.py
index 83a013932..83c85def4 100644
--- a/ragas/src/ragas/async_utils.py
+++ b/ragas/src/ragas/async_utils.py
@@ -9,6 +9,22 @@
 logger = logging.getLogger(__name__)
 
 
+def apply_nest_asyncio():
+    NEST_ASYNCIO_APPLIED: bool = False
+    if is_event_loop_running():
+        # an event loop is running so call nest_asyncio to fix this
+        try:
+            import nest_asyncio
+        except ImportError:
+            raise ImportError(
+                "It seems like you're running this in a jupyter-like environment. Please install nest_asyncio with `pip install nest_asyncio` to make it work."
+            )
+
+        if not NEST_ASYNCIO_APPLIED:
+            nest_asyncio.apply()
+            NEST_ASYNCIO_APPLIED = True
+
+
 def is_event_loop_running() -> bool:
     """
     Check if an event loop is currently running.
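
A usage sketch for the helper above (hypothetical caller code, not part of the diff): apply_nest_asyncio() does nothing when no loop is running, and patches the running loop via nest_asyncio in notebook-like environments, so callers can invoke it unconditionally before running tasks:

    import asyncio

    from ragas.async_utils import apply_nest_asyncio, run_async_tasks

    async def noop():
        await asyncio.sleep(0)

    apply_nest_asyncio()  # safe to call unconditionally before running tasks
    run_async_tasks([noop() for _ in range(3)], show_progress=False)
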
diff --git a/ragas/src/ragas/testset/transforms/engine.py b/ragas/src/ragas/testset/transforms/engine.py index 76070cb81..20a0a5bb1 100644 --- a/ragas/src/ragas/testset/transforms/engine.py +++ b/ragas/src/ragas/testset/transforms/engine.py @@ -1,12 +1,11 @@ from __future__ import annotations -import asyncio import logging import typing as t from tqdm.auto import tqdm -from ragas.async_utils import as_completed, run_async_tasks +from ragas.async_utils import apply_nest_asyncio, as_completed, run_async_tasks from ragas.run_config import RunConfig from ragas.testset.graph import KnowledgeGraph from ragas.testset.transforms.base import BaseGraphTransformation @@ -17,7 +16,7 @@ logger = logging.getLogger(__name__) Transforms = t.Union[ - t.List[BaseGraphTransformation], + t.List[t.Union[BaseGraphTransformation, "Parallel"]], "Parallel", BaseGraphTransformation, ] @@ -32,22 +31,28 @@ class Parallel: >>> Parallel(HeadlinesExtractor(), SummaryExtractor()) """ - def __init__(self, *transformations: BaseGraphTransformation): + def __init__(self, *transformations: t.Union[BaseGraphTransformation, "Parallel"]): self.transformations = list(transformations) - def generate_execution_plan(self, kg: KnowledgeGraph) -> t.List[t.Coroutine]: + def generate_execution_plan(self, kg: KnowledgeGraph) -> t.Sequence[t.Coroutine]: coroutines = [] for transformation in self.transformations: coroutines.extend(transformation.generate_execution_plan(kg)) + class_names = [t.__class__.__name__ for t in self.transformations] + logger.debug( + f"Created {len(coroutines)} coroutines for transformations: {class_names}" + ) return coroutines -async def run_coroutines(coroutines: t.List[t.Coroutine], desc: str, max_workers: int): +async def run_coroutines( + coroutines: t.Sequence[t.Coroutine], desc: str, max_workers: int +): """ Run a list of coroutines in parallel. """ for future in tqdm( - await as_completed(coroutines, max_workers=max_workers), + as_completed(coroutines, max_workers=max_workers), desc=desc, total=len(coroutines), # whether you want to keep the progress bar after completion @@ -67,22 +72,6 @@ def get_desc(transform: BaseGraphTransformation | Parallel): return f"Applying {transform.__class__.__name__}" -def apply_nest_asyncio(): - NEST_ASYNCIO_APPLIED: bool = False - if is_event_loop_running(): - # an event loop is running so call nested_asyncio to fix this - try: - import nest_asyncio - except ImportError: - raise ImportError( - "It seems like your running this in a jupyter-like environment. Please install nest_asyncio with `pip install nest_asyncio` to make it work." - ) - - if not NEST_ASYNCIO_APPLIED: - nest_asyncio.apply() - NEST_ASYNCIO_APPLIED = True - - def apply_transforms( kg: KnowledgeGraph, transforms: Transforms, @@ -90,39 +79,36 @@ def apply_transforms( callbacks: t.Optional[Callbacks] = None, ): """ - Apply a list of transformations to a knowledge graph in place. + Recursively apply transformations to a knowledge graph in place. 
""" # apply nest_asyncio to fix the event loop issue in jupyter apply_nest_asyncio() - # if single transformation, wrap it in a list - if isinstance(transforms, BaseGraphTransformation): - transforms = [transforms] + max_workers = getattr(run_config, "max_workers", -1) - # apply the transformations - # if Sequences, apply each transformation sequentially - if isinstance(transforms, t.List): + if isinstance(transforms, list): for transform in transforms: - asyncio.run( - run_coroutines( - transform.generate_execution_plan(kg), - get_desc(transform), - run_config.max_workers, - ) - ) - # if Parallel, collect inside it and run it all + apply_transforms(kg, transform, run_config, callbacks) elif isinstance(transforms, Parallel): - asyncio.run( - run_coroutines( - transforms.generate_execution_plan(kg), - get_desc(transforms), - run_config.max_workers, - ) + apply_transforms(kg, transforms.transformations, run_config, callbacks) + elif isinstance(transforms, BaseGraphTransformation): + logger.debug( + f"Generating execution plan for transformation {transforms.__class__.__name__}" + ) + coros = transforms.generate_execution_plan(kg) + desc = get_desc(transforms) + run_async_tasks( + coros, + batch_size=None, + show_progress=True, + progress_bar_desc=desc, + max_workers=max_workers, ) else: raise ValueError( f"Invalid transforms type: {type(transforms)}. Expects a list of BaseGraphTransformations or a Parallel instance." ) + logger.debug("All transformations applied successfully.") def rollback_transforms(kg: KnowledgeGraph, transforms: Transforms): diff --git a/ragas/tests/unit/test_engine.py b/ragas/tests/unit/test_engine.py new file mode 100644 index 000000000..33aa3c910 --- /dev/null +++ b/ragas/tests/unit/test_engine.py @@ -0,0 +1,130 @@ +import asyncio +import types +import typing as t + +import pytest + +from ragas.testset.graph import KnowledgeGraph, Node, NodeType +from ragas.testset.transforms.base import BaseGraphTransformation +from ragas.testset.transforms.engine import Parallel, apply_transforms, get_desc + + +class DummyTransformation(BaseGraphTransformation): + def __init__(self, name="Dummy"): + self.name = name + + def generate_execution_plan(self, kg): + return [self.double(node) for node in kg.nodes] + + async def transform( + self, kg: KnowledgeGraph + ) -> t.List[t.Tuple[Node, t.Tuple[str, t.Any]]]: + filtered = self.filter(kg) + nodes = sorted( + filtered.nodes, key=lambda n: n.get_property("page_content") or "" + ) + return [(node, await self.double(node)) for node in nodes] + + async def double(self, node): + # Repeat the text in a single node's 'page_content' property + content = node.get_property("page_content") + if content is not None: + node.properties["page_content"] = content * 2 + return node + + +@pytest.fixture +def kg(): + import string + + kg = KnowledgeGraph() + for letter in string.ascii_uppercase[:10]: + node = Node( + properties={"page_content": letter}, + type=NodeType.DOCUMENT, + ) + kg.add(node) + return kg + + +def test_parallel_stores_transformations(): + t1 = DummyTransformation("A") + t2 = DummyTransformation("B") + p = Parallel(t1, t2) + assert p.transformations == [t1, t2] + + +def test_parallel_generate_execution_plan_aggregates(kg): + t1 = DummyTransformation("A") + t2 = DummyTransformation("B") + p = Parallel(t1, t2) + coros = p.generate_execution_plan(kg) + assert len(coros) == len(kg.nodes) * 2 # Each transformation runs on each node + assert all(isinstance(c, types.CoroutineType) for c in coros) + + # Await all coroutines to avoid 
RuntimeWarning
+    async def run_all():
+        await asyncio.gather(*coros)
+
+    asyncio.run(run_all())
+
+
+def test_parallel_nested(kg):
+    t1 = DummyTransformation("A")
+    t2 = DummyTransformation("B")
+    p_inner = Parallel(t1)
+    p_outer = Parallel(p_inner, t2)
+    coros = p_outer.generate_execution_plan(kg)
+    assert len(coros) == len(kg.nodes) * 2  # Each transformation runs on each node
+    assert all(isinstance(c, types.CoroutineType) for c in coros)
+
+    # Await all coroutines to avoid RuntimeWarning
+    async def run_all():
+        await asyncio.gather(*coros)
+
+    asyncio.run(run_all())
+
+
+def test_get_desc_parallel_and_single():
+    t1 = DummyTransformation("A")
+    p = Parallel(t1)
+    desc_p = get_desc(p)
+    desc_t = get_desc(t1)
+    assert "Parallel" not in desc_t
+    assert "DummyTransformation" in desc_p or "DummyTransformation" in desc_t
+
+
+def test_apply_transforms_single(kg):
+    t1 = DummyTransformation()
+    apply_transforms(kg, t1)
+    # All nodes' page_content should be doubled
+    for node in kg.nodes:
+        content = node.get_property("page_content")
+        assert content == (content[0] * 2)
+
+
+def test_apply_transforms_list(kg):
+    t1 = DummyTransformation()
+    t2 = DummyTransformation()
+    apply_transforms(kg, [t1, t2])
+    # Each transformation doubles the content, so after two: x -> xxxx
+    for node in kg.nodes:
+        content = node.get_property("page_content")
+        assert content == (content[0] * 2 * 2)
+
+
+def test_apply_transforms_parallel(kg):
+    t1 = DummyTransformation()
+    t2 = DummyTransformation()
+    p = Parallel(t1, t2)
+    apply_transforms(kg, p)
+    # Both transformations traverse the same nodes, so each node's content is doubled twice: x -> xxxx
+    for node in kg.nodes:
+        content = node.get_property("page_content")
+        assert content == (content[0] * 2 * 2)
+
+
+def test_apply_transforms_invalid():
+    kg = KnowledgeGraph()
+    with pytest.raises(ValueError):
+        apply_transforms(kg, 123)  # type: ignore

From 38a33911fd8b915b4137eed3417629964ae6e982 Mon Sep 17 00:00:00 2001
From: ahgraber
Date: Mon, 9 Jun 2025 13:05:15 -0400
Subject: [PATCH 7/8] chore: use Sequence over List for type checking improvements; add debug logging

---
 ragas/src/ragas/async_utils.py               |  4 +-
 ragas/src/ragas/testset/transforms/base.py   | 95 +++++++++++++-------
 ragas/src/ragas/testset/transforms/engine.py |  8 +-
 3 files changed, 69 insertions(+), 38 deletions(-)

diff --git a/ragas/src/ragas/async_utils.py b/ragas/src/ragas/async_utils.py
index 83c85def4..57fd2122f 100644
--- a/ragas/src/ragas/async_utils.py
+++ b/ragas/src/ragas/async_utils.py
@@ -115,7 +115,7 @@ def run(
 
 
 def run_async_tasks(
-    tasks: t.List[t.Coroutine],
+    tasks: t.Sequence[t.Coroutine],
     batch_size: t.Optional[int] = None,
     show_progress: bool = True,
     progress_bar_desc: str = "Running async tasks",
@@ -127,7 +127,7 @@ def run_async_tasks(
     NOTE: Order of results is not guaranteed!
 
     Args:
-        tasks: List of coroutines to execute
+        tasks: Sequence of coroutines to execute
         batch_size: Optional size for batching tasks. If None, runs all concurrently
         show_progress: Whether to display progress bars
         max_workers: Maximum number of concurrent tasks (-1 for unlimited)
diff --git a/ragas/src/ragas/testset/transforms/base.py b/ragas/src/ragas/testset/transforms/base.py
index 49945e482..e58e12bbe 100644
--- a/ragas/src/ragas/testset/transforms/base.py
+++ b/ragas/src/ragas/testset/transforms/base.py
@@ -68,20 +68,30 @@ def filter(self, kg: KnowledgeGraph) -> KnowledgeGraph:
         KnowledgeGraph
             The filtered knowledge graph.
""" - + logger.debug("Filtering KnowledgeGraph with %s", self.filter_nodes.__name__) + filtered_nodes = [node for node in kg.nodes if self.filter_nodes(node)] + node_ids = {node.id for node in filtered_nodes} + filtered_relationships = [ + rel + for rel in kg.relationships + if (rel.source.id in node_ids) and (rel.target.id in node_ids) + ] + logger.debug( + "Filter reduced KnowledgeGraph by %d/%d nodes and %d/%d relationships", + len(kg.nodes) - len(filtered_nodes), + len(kg.nodes), + len(kg.relationships) - len(filtered_relationships), + len(kg.relationships), + ) return KnowledgeGraph( - nodes=[node for node in kg.nodes if self.filter_nodes(node)], - relationships=[ - rel - for rel in kg.relationships - if rel.source in kg.nodes and rel.target in kg.nodes - ], + nodes=filtered_nodes, + relationships=filtered_relationships, ) @abstractmethod - def generate_execution_plan(self, kg: KnowledgeGraph) -> t.List[t.Coroutine]: + def generate_execution_plan(self, kg: KnowledgeGraph) -> t.Sequence[t.Coroutine]: """ - Generates a list of coroutines to be executed in sequence by the Executor. This + Generates a sequence of coroutines to be executed in sequence by the Executor. This coroutine will, upon execution, write the transformation into the KnowledgeGraph. Parameters @@ -91,8 +101,8 @@ def generate_execution_plan(self, kg: KnowledgeGraph) -> t.List[t.Coroutine]: Returns ------- - t.List[t.Coroutine] - A list of coroutines to be executed in parallel. + t.Sequence[t.Coroutine] + A sequence of coroutines to be executed in parallel. """ pass @@ -159,9 +169,9 @@ async def extract(self, node: Node) -> t.Tuple[str, t.Any]: """ pass - def generate_execution_plan(self, kg: KnowledgeGraph) -> t.List[t.Coroutine]: + def generate_execution_plan(self, kg: KnowledgeGraph) -> t.Sequence[t.Coroutine]: """ - Generates a list of coroutines to be executed in parallel by the Executor. + Generates a sequence of coroutines to be executed in parallel by the Executor. Parameters ---------- @@ -170,8 +180,8 @@ def generate_execution_plan(self, kg: KnowledgeGraph) -> t.List[t.Coroutine]: Returns ------- - t.List[t.Coroutine] - A list of coroutines to be executed in parallel. + t.Sequence[t.Coroutine] + A sequence of coroutines to be executed in parallel. """ async def apply_extract(node: Node): @@ -186,7 +196,13 @@ async def apply_extract(node: Node): ) filtered = self.filter(kg) - return [apply_extract(node) for node in filtered.nodes] + plan = [apply_extract(node) for node in filtered.nodes] + logger.debug( + "Created %d coroutines for %s", + len(plan), + self.__class__.__name__, + ) + return plan @dataclass @@ -197,7 +213,6 @@ class LLMBasedExtractor(Extractor, PromptMixin): tokenizer: Encoding = DEFAULT_TOKENIZER def split_text_by_token_limit(self, text, max_token_limit): - # Tokenize the entire input string tokens = self.tokenizer.encode(text) @@ -268,9 +283,9 @@ async def split(self, node: Node) -> t.Tuple[t.List[Node], t.List[Relationship]] """ pass - def generate_execution_plan(self, kg: KnowledgeGraph) -> t.List[t.Coroutine]: + def generate_execution_plan(self, kg: KnowledgeGraph) -> t.Sequence[t.Coroutine]: """ - Generates a list of coroutines to be executed in parallel by the Executor. + Generates a sequence of coroutines to be executed in parallel by the Executor. Parameters ---------- @@ -279,8 +294,8 @@ def generate_execution_plan(self, kg: KnowledgeGraph) -> t.List[t.Coroutine]: Returns ------- - t.List[t.Coroutine] - A list of coroutines to be executed in parallel. 
+ t.Sequence[t.Coroutine] + A sequence of coroutines to be executed in parallel. """ async def apply_split(node: Node): @@ -289,7 +304,13 @@ async def apply_split(node: Node): kg.relationships.extend(relationships) filtered = self.filter(kg) - return [apply_split(node) for node in filtered.nodes] + plan = [apply_split(node) for node in filtered.nodes] + logger.debug( + "Created %d coroutines for %s", + len(plan), + self.__class__.__name__, + ) + return plan class RelationshipBuilder(BaseGraphTransformation): @@ -319,9 +340,9 @@ async def transform(self, kg: KnowledgeGraph) -> t.List[Relationship]: """ pass - def generate_execution_plan(self, kg: KnowledgeGraph) -> t.List[t.Coroutine]: + def generate_execution_plan(self, kg: KnowledgeGraph) -> t.Sequence[t.Coroutine]: """ - Generates a list of coroutines to be executed in parallel by the Executor. + Generates a sequence of coroutines to be executed in parallel by the Executor. Parameters ---------- @@ -330,8 +351,8 @@ def generate_execution_plan(self, kg: KnowledgeGraph) -> t.List[t.Coroutine]: Returns ------- - t.List[t.Coroutine] - A list of coroutines to be executed in parallel. + t.Sequence[t.Coroutine] + A sequence of coroutines to be executed in parallel. """ async def apply_build_relationships( @@ -341,14 +362,18 @@ async def apply_build_relationships( original_kg.relationships.extend(relationships) filtered_kg = self.filter(kg) - return [apply_build_relationships(filtered_kg=filtered_kg, original_kg=kg)] + plan = [apply_build_relationships(filtered_kg=filtered_kg, original_kg=kg)] + logger.debug( + "Created %d coroutines for %s", + len(plan), + self.__class__.__name__, + ) + return plan @dataclass class NodeFilter(BaseGraphTransformation): - async def transform(self, kg: KnowledgeGraph) -> KnowledgeGraph: - filtered = self.filter(kg) for node in filtered.nodes: @@ -378,9 +403,9 @@ async def custom_filter(self, node: Node, kg: KnowledgeGraph) -> bool: """ pass - def generate_execution_plan(self, kg: KnowledgeGraph) -> t.List[t.Coroutine]: + def generate_execution_plan(self, kg: KnowledgeGraph) -> t.Sequence[t.Coroutine]: """ - Generates a list of coroutines to be executed + Generates a sequence of coroutines to be executed """ async def apply_filter(node: Node): @@ -388,7 +413,13 @@ async def apply_filter(node: Node): kg.remove_node(node) filtered = self.filter(kg) - return [apply_filter(node) for node in filtered.nodes] + plan = [apply_filter(node) for node in filtered.nodes] + logger.debug( + "Created %d coroutines for %s", + len(plan), + self.__class__.__name__, + ) + return plan @dataclass diff --git a/ragas/src/ragas/testset/transforms/engine.py b/ragas/src/ragas/testset/transforms/engine.py index 20a0a5bb1..29eec1a52 100644 --- a/ragas/src/ragas/testset/transforms/engine.py +++ b/ragas/src/ragas/testset/transforms/engine.py @@ -49,7 +49,7 @@ async def run_coroutines( coroutines: t.Sequence[t.Coroutine], desc: str, max_workers: int ): """ - Run a list of coroutines in parallel. + Run a sequence of coroutines in parallel. """ for future in tqdm( as_completed(coroutines, max_workers=max_workers), @@ -86,7 +86,7 @@ def apply_transforms( max_workers = getattr(run_config, "max_workers", -1) - if isinstance(transforms, list): + if isinstance(transforms, t.Sequence): for transform in transforms: apply_transforms(kg, transform, run_config, callbacks) elif isinstance(transforms, Parallel): @@ -106,14 +106,14 @@ def apply_transforms( ) else: raise ValueError( - f"Invalid transforms type: {type(transforms)}. 
Expects a list of BaseGraphTransformations or a Parallel instance." + f"Invalid transforms type: {type(transforms)}. Expects a sequence of BaseGraphTransformations or a Parallel instance." ) logger.debug("All transformations applied successfully.") def rollback_transforms(kg: KnowledgeGraph, transforms: Transforms): """ - Rollback a list of transformations from a knowledge graph. + Rollback a sequence of transformations from a knowledge graph. Note ---- From 5a741aefe0a1bf401fc34b49487a49a46cfd3174 Mon Sep 17 00:00:00 2001 From: ahgraber Date: Mon, 9 Jun 2025 13:59:53 -0400 Subject: [PATCH 8/8] refactor: remove threading lock from Executor class --- ragas/src/ragas/executor.py | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/ragas/src/ragas/executor.py b/ragas/src/ragas/executor.py index 7a4506ad3..47a5c108b 100644 --- a/ragas/src/ragas/executor.py +++ b/ragas/src/ragas/executor.py @@ -1,7 +1,6 @@ from __future__ import annotations import logging -import threading import typing as t from dataclasses import dataclass, field @@ -49,7 +48,6 @@ class Executor: run_config: t.Optional[RunConfig] = field(default=None, repr=False) pbar: t.Optional[tqdm] = None _jobs_processed: int = field(default=0, repr=False) - _lock: threading.Lock = field(default_factory=threading.Lock, repr=False) def wrap_callable_with_index( self, callable: t.Callable, counter: int @@ -86,28 +84,25 @@ def submit( Submit a job to be executed, wrapping the callable with error handling and indexing to keep track of the job index. """ # Use _jobs_processed for consistent indexing across multiple runs - with self._lock: - callable_with_index = self.wrap_callable_with_index( - callable, self._jobs_processed - ) - self.jobs.append((callable_with_index, args, kwargs, name)) - self._jobs_processed += 1 + callable_with_index = self.wrap_callable_with_index( + callable, self._jobs_processed + ) + self.jobs.append((callable_with_index, args, kwargs, name)) + self._jobs_processed += 1 def clear_jobs(self) -> None: """Clear all submitted jobs and reset counter.""" - with self._lock: - self.jobs.clear() - self._jobs_processed = 0 + self.jobs.clear() + self._jobs_processed = 0 async def _process_jobs(self) -> t.List[t.Any]: """Execute jobs with optional progress tracking.""" - with self._lock: - if not self.jobs: - return [] + if not self.jobs: + return [] - # Make a copy of jobs to process and clear the original list to prevent re-execution - jobs_to_process = self.jobs.copy() - self.jobs.clear() + # Make a copy of jobs to process and clear the original list to prevent re-execution + jobs_to_process = self.jobs.copy() + self.jobs.clear() max_workers = ( self.run_config.max_workers