Skip to content

feature: improve async / executor functionality #2070

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
1 change: 0 additions & 1 deletion docs/references/executor.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,3 @@
members:
- Executor
- run_async_batch
- is_event_loop_running
194 changes: 135 additions & 59 deletions ragas/src/ragas/async_utils.py
Original file line number Diff line number Diff line change
@@ -1,89 +1,165 @@
"""Async utils."""

import asyncio
from typing import Any, Coroutine, List, Optional
import logging
import typing as t

from tqdm.auto import tqdm

from ragas.executor import is_event_loop_running
from ragas.utils import batched
logger = logging.getLogger(__name__)


def apply_nest_asyncio():
NEST_ASYNCIO_APPLIED: bool = False
if is_event_loop_running():
# an event loop is running so call nested_asyncio to fix this
try:
import nest_asyncio
except ImportError:
raise ImportError(
"It seems like your running this in a jupyter-like environment. Please install nest_asyncio with `pip install nest_asyncio` to make it work."
)

if not NEST_ASYNCIO_APPLIED:
nest_asyncio.apply()
NEST_ASYNCIO_APPLIED = True


def is_event_loop_running() -> bool:
"""
Check if an event loop is currently running.
"""
try:
loop = asyncio.get_running_loop()
except RuntimeError:
return False
else:
return loop.is_running()


def as_completed(
coroutines: t.Sequence[t.Coroutine], max_workers: int = -1
) -> t.Iterator[asyncio.Future]:
"""
Wrap coroutines with a semaphore if max_workers is specified.

Returns an iterator of futures that completes as tasks finish.
"""
if max_workers == -1:
tasks = [asyncio.create_task(coro) for coro in coroutines]
else:
semaphore = asyncio.Semaphore(max_workers)

async def sema_coro(coro):
async with semaphore:
return await coro

tasks = [asyncio.create_task(sema_coro(coro)) for coro in coroutines]

return asyncio.as_completed(tasks)


async def process_futures(
futures: t.Iterator[asyncio.Future], pbar: t.Optional[tqdm] = None
) -> t.AsyncGenerator[t.Any, None]:
"""
Process futures with optional progress tracking.

Args:
futures: Iterator of asyncio futures to process (e.g., from asyncio.as_completed)
pbar: Optional progress bar to update

Yields:
Results from completed futures as they finish
"""
# Process completed futures as they finish
for future in futures:
try:
result = await future
except Exception as e:
result = e

if pbar:
pbar.update(1)
yield result


def run(
async_func: t.Union[
t.Callable[[], t.Coroutine[t.Any, t.Any, t.Any]],
t.Coroutine[t.Any, t.Any, t.Any],
],
) -> t.Any:
"""Run an async function in the current event loop or a new one if not running."""
try:
# Check if we're already in a running event loop
loop = asyncio.get_running_loop()
# If we get here, we're in a running loop - need nest_asyncio
try:
import nest_asyncio
except ImportError as e:
raise ImportError(
"It seems like you're running this in a jupyter-like environment. "
"Please install nest_asyncio with `pip install nest_asyncio` to make it work."
) from e

nest_asyncio.apply()
# Create the coroutine if it's a callable, otherwise use directly
coro = async_func() if callable(async_func) else async_func
return loop.run_until_complete(coro)

except RuntimeError:
# No running event loop, so we can use asyncio.run
coro = async_func() if callable(async_func) else async_func
return asyncio.run(coro)


def run_async_tasks(
tasks: List[Coroutine],
batch_size: Optional[int] = None,
tasks: t.Sequence[t.Coroutine],
batch_size: t.Optional[int] = None,
show_progress: bool = True,
progress_bar_desc: str = "Running async tasks",
) -> List[Any]:
max_workers: int = -1,
) -> t.List[t.Any]:
"""
Execute async tasks with optional batching and progress tracking.

NOTE: Order of results is not guaranteed!

Args:
tasks: List of coroutines to execute
tasks: Sequence of coroutines to execute
batch_size: Optional size for batching tasks. If None, runs all concurrently
show_progress: Whether to display progress bars
max_workers: Maximum number of concurrent tasks (-1 for unlimited)
"""
from ragas.utils import ProgressBarManager, batched

async def _run():
total_tasks = len(tasks)
results = []
pbm = ProgressBarManager(progress_bar_desc, show_progress)

# If no batching, run all tasks concurrently with single progress bar
if not batch_size:
with tqdm(
total=total_tasks,
desc=progress_bar_desc,
disable=not show_progress,
) as pbar:
for future in asyncio.as_completed(tasks):
result = await future
results.append(result)
pbar.update(1)
return results

# With batching, show nested progress bars
batches = batched(tasks, batch_size) # generator
n_batches = (total_tasks + batch_size - 1) // batch_size
with (
tqdm(
total=total_tasks,
desc=progress_bar_desc,
disable=not show_progress,
position=0,
leave=True,
) as overall_pbar,
tqdm(
total=batch_size,
desc=f"Batch 1/{n_batches}",
disable=not show_progress,
position=1,
leave=False,
) as batch_pbar,
):
for i, batch in enumerate(batches, 1):
batch_pbar.reset(total=len(batch))
batch_pbar.set_description(f"Batch {i}/{n_batches}")
for future in asyncio.as_completed(batch):
result = await future
with pbm.create_single_bar(total_tasks) as pbar:
async for result in process_futures(
as_completed(tasks, max_workers), pbar
):
results.append(result)
overall_pbar.update(1)
batch_pbar.update(1)
else:
total_tasks = len(tasks)
batches = batched(tasks, batch_size)
overall_pbar, batch_pbar, n_batches = pbm.create_nested_bars(
total_tasks, batch_size
)
with overall_pbar, batch_pbar:
for i, batch in enumerate(batches, 1):
pbm.update_batch_bar(batch_pbar, i, n_batches, len(batch))
async for result in process_futures(
as_completed(batch, max_workers), batch_pbar
):
results.append(result)
overall_pbar.update(len(batch))

return results

if is_event_loop_running():
# an event loop is running so call nested_asyncio to fix this
try:
import nest_asyncio
except ImportError:
raise ImportError(
"It seems like your running this in a jupyter-like environment. "
"Please install nest_asyncio with `pip install nest_asyncio` to make it work."
)
else:
nest_asyncio.apply()

results = asyncio.run(_run())
return results
return run(_run)
Loading
Loading