Providing a way to wrap blocking iterators #1344

Open
@vxgmichel

It is currently non-trivial to integrate blocking iterators into trio; see issues #501 and #1308 about trio.Path.iterdir, for instance.

The main problem is that there are many kinds of iterators:

  • fast vs slow
  • small/finite vs infinite/large
  • ordered vs sporadic

Delegating to a thread is not free. In particular, there are two costly operations:

  • spawning a thread
  • switching contexts (i.e. giving control back to the trio event loop)

On my machine, for instance, the full round trip takes about 200 µs. This means that calling trio.to_thread.run_sync to produce each item of range(5000) would add an overhead of about one second.

I've written a small benchmark comparing several approaches, and I came up with one that (I think) comes close to ticking all the boxes:
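The estimate above is a single multiplication, and the same arithmetic shows why batching helps. A minimal sketch, assuming a fixed batch size purely for illustration (the proposal in this issue batches by elapsed time, not by count); `per_item_overhead` and `batched_overhead` are hypothetical helper names, and 200 µs is the machine-dependent figure quoted above:

```python
def per_item_overhead(n_items: int, round_trip_s: float) -> float:
    """Total added latency when every item costs one thread round trip."""
    return n_items * round_trip_s


def batched_overhead(n_items: int, round_trip_s: float, batch_size: int) -> float:
    """Total added latency when items are fetched batch_size at a time."""
    n_round_trips = -(-n_items // batch_size)  # ceiling division
    return n_round_trips * round_trip_s


ROUND_TRIP_S = 200e-6  # measured round trip quoted above; varies per machine

# One round trip per item of range(5000): about one second in total.
print(per_item_overhead(5000, ROUND_TRIP_S))

# Hypothetical batches of 100 items: 50 round trips, about 10 ms in total.
print(batched_overhead(5000, ROUND_TRIP_S, batch_size=100))
```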

import trio
import time
import outcome

async def to_thread_iter_sync(fn, *args, cancellable=False, limiter=None):
    """Convert a blocking iteration into an async iteration using a thread.

    In order to attenuate the overhead of spawning threads and switching
    contexts, values from the blocking iteration are batched for a time one
    order of magnitude greater than the spawn time of a thread.
    """

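    def run_batch(items_iter, start_time):
        # Runs in the worker thread. The delay since start_time approximates
        # the cost of handing control over to the thread.
        now = time.monotonic()
        spawn_time = now - start_time
        # Keep batching for ten times that cost to amortize the round trip.
        deadline = now + 10 * spawn_time

        # Create the blocking iterator lazily, inside the thread.
        if items_iter is None:
            items_iter = iter(fn(*args))

        batch = []
        while True:
            try:
                item = next(items_iter)
            except Exception as exc:
                # Also catches StopIteration, which marks the end of iteration.
                batch.append(outcome.Error(exc))
                break
            else:
                batch.append(outcome.Value(item))

            if time.monotonic() > deadline:
                break

        return items_iter, batch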

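    items_iter = None
    while True:
        # One thread round trip per batch rather than per item.
        items_iter, batch = await trio.to_thread.run_sync(
            run_batch,
            items_iter,
            time.monotonic(),
            cancellable=cancellable,
            limiter=limiter,
        )

        for result in batch:
            # Unwrap outside the yield so that only the iterator's own
            # StopIteration ends the async generator.
            try:
                value = result.unwrap()
            except StopIteration:
                return
            yield value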

I'd like to submit a PR to add this function as trio.to_thread.iter_sync, if you think the idea is worth considering.
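The time-bounded batching step can also be looked at in isolation, without trio or threads. The following is a rough sketch rather than the proposed implementation: `take_until_deadline` and `FakeClock` are hypothetical names, the clock is injectable so the behaviour is deterministic, and exceptions other than StopIteration are not captured here. In the function above, the budget would be ten times the measured spawn time.

```python
import time
from typing import Callable, Iterator, List, Tuple, TypeVar

T = TypeVar("T")


def take_until_deadline(
    items: Iterator[T],
    budget_s: float,
    clock: Callable[[], float] = time.monotonic,
) -> Tuple[List[T], bool]:
    """Pull items until the time budget is spent or the iterator ends.

    Returns (batch, exhausted). One item is always attempted before the
    deadline is checked, so a slow iterator still makes progress.
    """
    deadline = clock() + budget_s
    batch: List[T] = []
    while True:
        try:
            batch.append(next(items))
        except StopIteration:
            return batch, True
        if clock() > deadline:
            return batch, False


class FakeClock:
    """Hypothetical test clock that advances by step_s on every reading."""

    def __init__(self, step_s: float) -> None:
        self.now = 0.0
        self.step_s = step_s

    def __call__(self) -> float:
        current = self.now
        self.now += self.step_s
        return current


# Each call gets a fresh clock, mimicking one thread round trip per batch.
numbers = iter(range(10))
while True:
    batch, exhausted = take_until_deadline(numbers, 3.0, clock=FakeClock(1.0))
    print(batch)
    if exhausted:
        break
```

With a real clock, a fast iterator fills each batch with many items while a slow one returns after a single item, which is how the same wrapper can serve both kinds of iterators.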
