diff --git a/check.sh b/check.sh
index b659675968..a1efa66a20 100755
--- a/check.sh
+++ b/check.sh
@@ -1,6 +1,7 @@
 #!/bin/bash
 
 set -ex
+set -o pipefail
 
 ON_GITHUB_CI=true
 EXIT_STATUS=0
@@ -55,8 +56,7 @@ MYPY=0
 echo "::group::Mypy"
 # Cleanup previous runs.
 rm -f mypy_annotate.dat
-# Pipefail makes these pipelines fail if mypy does, even if mypy_annotate.py succeeds.
-set -o pipefail
+
 mypy --show-error-end --platform linux | python ./src/trio/_tools/mypy_annotate.py --dumpfile mypy_annotate.dat --platform Linux \
     || { echo "* Mypy (Linux) found type errors." >> "$GITHUB_STEP_SUMMARY"; MYPY=1; }
 # Darwin tests FreeBSD too
@@ -64,7 +64,7 @@ mypy --show-error-end --platform darwin | python ./src/trio/_tools/mypy_annotate
     || { echo "* Mypy (Mac) found type errors." >> "$GITHUB_STEP_SUMMARY"; MYPY=1; }
 mypy --show-error-end --platform win32 | python ./src/trio/_tools/mypy_annotate.py --dumpfile mypy_annotate.dat --platform Windows \
     || { echo "* Mypy (Windows) found type errors." >> "$GITHUB_STEP_SUMMARY"; MYPY=1; }
-set +o pipefail
+
 # Re-display errors using Github's syntax, read out of mypy_annotate.dat
 python ./src/trio/_tools/mypy_annotate.py --dumpfile mypy_annotate.dat
 # Then discard.
diff --git a/src/trio/_core/__init__.py b/src/trio/_core/__init__.py
index fdef90292d..4aa096fd0b 100644
--- a/src/trio/_core/__init__.py
+++ b/src/trio/_core/__init__.py
@@ -68,7 +68,6 @@
     temporarily_detach_coroutine_object,
     wait_task_rescheduled,
 )
-from ._unbounded_queue import UnboundedQueue, UnboundedQueueStatistics
 
 # Windows imports
 if sys.platform == "win32":
diff --git a/src/trio/_core/_generated_io_kqueue.py b/src/trio/_core/_generated_io_kqueue.py
index 556d29e1f2..3618f48d73 100644
--- a/src/trio/_core/_generated_io_kqueue.py
+++ b/src/trio/_core/_generated_io_kqueue.py
@@ -14,7 +14,7 @@
     from collections.abc import Callable
     from contextlib import AbstractContextManager
 
-    from .. import _core
+    from .._channel import MemoryReceiveChannel
     from .._file_io import _HasFileNo
     from ._traps import Abort, RaiseCancelT
 
@@ -46,7 +46,7 @@ def current_kqueue() -> select.kqueue:
 @enable_ki_protection
 def monitor_kevent(
     ident: int, filter: int
-) -> AbstractContextManager[_core.UnboundedQueue[select.kevent]]:
+) -> AbstractContextManager[MemoryReceiveChannel[select.kevent]]:
     """TODO: these are implemented, but are currently more of a sketch than
     anything real. See `#26
     <https://github.com/python-trio/trio/issues/26>`__.
diff --git a/src/trio/_core/_generated_io_windows.py b/src/trio/_core/_generated_io_windows.py
index 211f81215c..b530fda2b6 100644
--- a/src/trio/_core/_generated_io_windows.py
+++ b/src/trio/_core/_generated_io_windows.py
@@ -14,8 +14,8 @@
     from typing_extensions import Buffer
 
+    from .._channel import MemoryReceiveChannel
     from .._file_io import _HasFileNo
-    from ._unbounded_queue import UnboundedQueue
     from ._windows_cffi import CData, Handle
 
 assert not TYPE_CHECKING or sys.platform == "win32"
@@ -191,7 +191,7 @@ def current_iocp() -> int:
 @enable_ki_protection
 def monitor_completion_key() -> (
-    AbstractContextManager[tuple[int, UnboundedQueue[object]]]
+    AbstractContextManager[tuple[int, MemoryReceiveChannel[object]]]
 ):
     """TODO: these are implemented, but are currently more of a sketch than
     anything real. See `#26
     <https://github.com/python-trio/trio/issues/26>`__ and `#52
     <https://github.com/python-trio/trio/issues/52>`__.
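Note on the return-type change above: callers now get a MemoryReceiveChannel instead of an UnboundedQueue, so they iterate single events rather than batches. The sketch below is not part of the patch; it assumes a kqueue platform (macOS/BSD), and watch_reads is a hypothetical consumer.

    import select

    import trio

    async def watch_reads(fd: int) -> None:
        # monitor_kevent now yields a MemoryReceiveChannel, so each iteration
        # produces one select.kevent rather than an UnboundedQueue batch (list).
        with trio.lowlevel.monitor_kevent(fd, select.KQ_FILTER_READ) as events:
            async for event in events:
                print("kevent:", event.ident, event.filter)
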
diff --git a/src/trio/_core/_io_kqueue.py b/src/trio/_core/_io_kqueue.py
index 9718c4df80..fed4da83f4 100644
--- a/src/trio/_core/_io_kqueue.py
+++ b/src/trio/_core/_io_kqueue.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import errno
+import math
 import select
 import sys
 from contextlib import contextmanager
@@ -18,7 +19,8 @@
 
     from typing_extensions import TypeAlias
 
-    from .._core import Abort, RaiseCancelT, Task, UnboundedQueue
+    from .._channel import MemoryReceiveChannel, MemorySendChannel
+    from .._core import Abort, RaiseCancelT, Task
     from .._file_io import _HasFileNo
 
 assert not TYPE_CHECKING or (sys.platform != "linux" and sys.platform != "win32")
@@ -36,8 +38,7 @@ class _KqueueStatistics:
 @attrs.define(eq=False)
 class KqueueIOManager:
     _kqueue: select.kqueue = attrs.Factory(select.kqueue)
-    # {(ident, filter): Task or UnboundedQueue}
-    _registered: dict[tuple[int, int], Task | UnboundedQueue[select.kevent]] = (
+    _registered: dict[tuple[int, int], Task | MemorySendChannel[select.kevent]] = (
         attrs.Factory(dict)
     )
     _force_wakeup: WakeupSocketpair = attrs.Factory(WakeupSocketpair)
@@ -98,7 +99,7 @@ def process_events(self, events: EventResult) -> None:
             if isinstance(receiver, _core.Task):
                 _core.reschedule(receiver, outcome.Value(event))
             else:
-                receiver.put_nowait(event)  # TODO: test this line
+                receiver.send_nowait(event)  # TODO: test this line
 
     # kevent registration is complicated -- e.g. aio submission can
     # implicitly perform a EV_ADD, and EVFILT_PROC with NOTE_TRACK will
@@ -122,25 +123,26 @@ def current_kqueue(self) -> select.kqueue:
     @contextmanager
     @_public
     def monitor_kevent(
-        self,
-        ident: int,
-        filter: int,
-    ) -> Iterator[_core.UnboundedQueue[select.kevent]]:
+        self, ident: int, filter: int
+    ) -> Iterator[MemoryReceiveChannel[select.kevent]]:
         """TODO: these are implemented, but are currently more of a sketch than
         anything real. See `#26
         <https://github.com/python-trio/trio/issues/26>`__.
         """
+        from .._channel import open_memory_channel
+
         key = (ident, filter)
         if key in self._registered:
             raise _core.BusyResourceError(
                 "attempt to register multiple listeners for same ident/filter pair",
             )
-        q = _core.UnboundedQueue[select.kevent]()
-        self._registered[key] = q
+        send, recv = open_memory_channel[select.kevent](math.inf)
+        self._registered[key] = send
         try:
-            yield q
+            yield recv
         finally:
-            del self._registered[key]
+            send.close()
+            self._registered.pop(key, None)
 
     @_public
     async def wait_kevent(
@@ -275,9 +277,9 @@ def notify_closing(self, fd: int | _HasFileNo) -> None:
 
         for filter_ in [select.KQ_FILTER_READ, select.KQ_FILTER_WRITE]:
             key = (fd, filter_)
-            receiver = self._registered.get(key)
-
-            if receiver is None:
+            try:
+                receiver = self._registered.pop(key)
+            except KeyError:
                 continue
 
             if type(receiver) is _core.Task:
@@ -285,10 +287,5 @@ def notify_closing(self, fd: int | _HasFileNo) -> None:
                 self._kqueue.control([event], 0)
                 exc = _core.ClosedResourceError("another task closed this fd")
                 _core.reschedule(receiver, outcome.Error(exc))
-                del self._registered[key]
             else:
-                # XX this is an interesting example of a case where being able
-                # to close a queue would be useful...
-                raise NotImplementedError(
-                    "can't close an fd that monitor_kevent is using",
-                )
+                receiver.close()
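Behavior note on the notify_closing() change above: instead of raising NotImplementedError when an fd under monitor_kevent is closed, the manager now closes the registered send channel, so the monitoring task's loop simply ends. A minimal sketch of those close semantics using a plain unbounded memory channel (hypothetical demo code, not part of the patch):

    import math

    import trio

    async def demo_close_ends_iteration() -> None:
        send, recv = trio.open_memory_channel[int](math.inf)
        send.send_nowait(1)
        # This mirrors what notify_closing() now does with the stored send side.
        send.close()
        # Buffered items are still delivered, then iteration stops cleanly
        # instead of blocking or raising.
        assert [item async for item in recv] == [1]

    trio.run(demo_close_ends_iteration)
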
diff --git a/src/trio/_core/_io_windows.py b/src/trio/_core/_io_windows.py
index 148253ab88..4676e6c5cf 100644
--- a/src/trio/_core/_io_windows.py
+++ b/src/trio/_core/_io_windows.py
@@ -2,6 +2,7 @@
 
 import enum
 import itertools
+import math
 import socket
 import sys
 from contextlib import contextmanager
@@ -44,9 +45,9 @@
 
     from typing_extensions import Buffer, TypeAlias
 
+    from .._channel import MemoryReceiveChannel, MemorySendChannel
     from .._file_io import _HasFileNo
     from ._traps import Abort, RaiseCancelT
-    from ._unbounded_queue import UnboundedQueue
 
 EventResult: TypeAlias = int
 T = TypeVar("T")
@@ -455,7 +456,7 @@ def __init__(self) -> None:
 
         self._overlapped_waiters: dict[CData, _core.Task] = {}
         self._posted_too_late_to_cancel: set[CData] = set()
-        self._completion_key_queues: dict[int, UnboundedQueue[object]] = {}
+        self._completion_key_queues: dict[int, MemorySendChannel[object]] = {}
         self._completion_key_counter = itertools.count(CKeys.USER_DEFINED)
 
         with socket.socket() as s:
@@ -641,7 +642,7 @@ def process_events(self, received: EventResult) -> None:
                         lpOverlapped=overlapped,
                         dwNumberOfBytesTransferred=transferred,
                     )
-                    queue.put_nowait(info)
+                    queue.send_nowait(info)
 
     def _register_with_iocp(self, handle_: int | CData, completion_key: int) -> None:
         handle = _handle(handle_)
@@ -1027,16 +1028,21 @@ def current_iocp(self) -> int:
 
     @contextmanager
     @_public
-    def monitor_completion_key(self) -> Iterator[tuple[int, UnboundedQueue[object]]]:
+    def monitor_completion_key(
+        self,
+    ) -> Iterator[tuple[int, MemoryReceiveChannel[object]]]:
         """TODO: these are implemented, but are currently more of a sketch than
         anything real. See `#26
         <https://github.com/python-trio/trio/issues/26>`__ and `#52
         <https://github.com/python-trio/trio/issues/52>`__.
         """
+        from .._channel import open_memory_channel
+
         key = next(self._completion_key_counter)
-        queue = _core.UnboundedQueue[object]()
-        self._completion_key_queues[key] = queue
+        send, recv = open_memory_channel[object](math.inf)
+        self._completion_key_queues[key] = send
         try:
-            yield (key, queue)
+            yield (key, recv)
         finally:
+            send.close()
             del self._completion_key_queues[key]
diff --git a/src/trio/_core/_parking_lot.py b/src/trio/_core/_parking_lot.py
index af6ff610ee..292012ba1e 100644
--- a/src/trio/_core/_parking_lot.py
+++ b/src/trio/_core/_parking_lot.py
@@ -21,12 +21,11 @@
 # theirs and our tasks are lighter, so for us #objects is smaller and #tasks
 # is larger.
 #
-# This is in the core because for two reasons. First, it's used by
-# UnboundedQueue, and UnboundedQueue is used for a number of things in the
-# core. And second, it's responsible for providing fairness to all of our
-# high-level synchronization primitives (locks, queues, etc.). For now with
-# our FIFO scheduler this is relatively trivial (it's just a FIFO waitqueue),
-# but in the future we ever start support task priorities or fair scheduling
+# This is in the core because it's responsible for providing fairness to all
+# of our high-level synchronization primitives (locks, queues, etc.). For now
+# with our FIFO scheduler this is relatively trivial (it's just a FIFO
+# waitqueue), but if in the future we ever start supporting task priorities
+# or fair scheduling
 #
 #    https://github.com/python-trio/trio/issues/32
 #
diff --git a/src/trio/_core/_tests/test_io.py b/src/trio/_core/_tests/test_io.py
index 379daa025e..e3f7a324fb 100644
--- a/src/trio/_core/_tests/test_io.py
+++ b/src/trio/_core/_tests/test_io.py
@@ -384,7 +384,6 @@ def check(*, expected_readers: int, expected_writers: int) -> None:
 
     check(expected_readers=1, expected_writers=0)
 
 
-@pytest.mark.filterwarnings("ignore:.*UnboundedQueue:trio.TrioDeprecationWarning")
 async def test_io_manager_kqueue_monitors_statistics() -> None:
     def check(
         *,
@@ -411,13 +410,17 @@ def check(
     # 1 for call_soon_task
     check(expected_monitors=0, expected_readers=1, expected_writers=0)
 
-    with _core.monitor_kevent(a1.fileno(), select.KQ_FILTER_READ):
+    with _core.monitor_kevent(a1.fileno(), select.KQ_FILTER_READ) as q:
         with (
             pytest.raises(_core.BusyResourceError),
             _core.monitor_kevent(a1.fileno(), select.KQ_FILTER_READ),
         ):
             pass  # pragma: no cover
         check(expected_monitors=1, expected_readers=1, expected_writers=0)
+        _core.notify_closing(a1)
+        a1.close()
+        with trio.fail_after(1):
+            assert len([v async for v in q]) == 0
 
     check(expected_monitors=0, expected_readers=1, expected_writers=0)
 
diff --git a/src/trio/_core/_tests/test_unbounded_queue.py b/src/trio/_core/_tests/test_unbounded_queue.py
deleted file mode 100644
index e483ecd30a..0000000000
--- a/src/trio/_core/_tests/test_unbounded_queue.py
+++ /dev/null
@@ -1,154 +0,0 @@
-from __future__ import annotations
-
-import itertools
-
-import pytest
-
-from ... import _core
-from ...testing import assert_checkpoints, wait_all_tasks_blocked
-
-pytestmark = pytest.mark.filterwarnings(
-    "ignore:.*UnboundedQueue:trio.TrioDeprecationWarning",
-)
-
-
-async def test_UnboundedQueue_basic() -> None:
-    q: _core.UnboundedQueue[str | int | None] = _core.UnboundedQueue()
-    q.put_nowait("hi")
-    assert await q.get_batch() == ["hi"]
-    with pytest.raises(_core.WouldBlock):
-        q.get_batch_nowait()
-    q.put_nowait(1)
-    q.put_nowait(2)
-    q.put_nowait(3)
-    assert q.get_batch_nowait() == [1, 2, 3]
-
-    assert q.empty()
-    assert q.qsize() == 0
-    q.put_nowait(None)
-    assert not q.empty()
-    assert q.qsize() == 1
-
-    stats = q.statistics()
-    assert stats.qsize == 1
-    assert stats.tasks_waiting == 0
-
-    # smoke test
-    repr(q)
-
-
-async def test_UnboundedQueue_blocking() -> None:
-    record = []
-    q = _core.UnboundedQueue[int]()
-
-    async def get_batch_consumer() -> None:
-        while True:
-            batch = await q.get_batch()
-            assert batch
-            record.append(batch)
-
-    async def aiter_consumer() -> None:
-        async for batch in q:
-            assert batch
-            record.append(batch)
-
-    for consumer in (get_batch_consumer, aiter_consumer):
-        record.clear()
-        async with _core.open_nursery() as nursery:
-            nursery.start_soon(consumer)
-            await _core.wait_all_tasks_blocked()
-            stats = q.statistics()
-            assert stats.qsize == 0
-            assert stats.tasks_waiting == 1
-            q.put_nowait(10)
-            q.put_nowait(11)
-            await _core.wait_all_tasks_blocked()
-            q.put_nowait(12)
-            await _core.wait_all_tasks_blocked()
-            assert record == [[10, 11], [12]]
-            nursery.cancel_scope.cancel()
-
-
-async def test_UnboundedQueue_fairness() -> None:
-    q = _core.UnboundedQueue[int]()
-
-    # If there's no-one else around, we can put stuff in and take it out
-    # again, no problem
-    q.put_nowait(1)
-    q.put_nowait(2)
-    assert q.get_batch_nowait() == [1, 2]
-
-    result = None
-
-    async def get_batch(q: _core.UnboundedQueue[int]) -> None:
-        nonlocal result
-        result = await q.get_batch()
-
-    # But if someone else is waiting to read, then they get dibs
-    async with _core.open_nursery() as nursery:
-        nursery.start_soon(get_batch, q)
-        await _core.wait_all_tasks_blocked()
-        q.put_nowait(3)
-        q.put_nowait(4)
-        with pytest.raises(_core.WouldBlock):
-            q.get_batch_nowait()
-    assert result == [3, 4]
-
-    # If two tasks are trying to read, they alternate
-    record = []
-
-    async def reader(name: str) -> None:
-        while True:
-            record.append((name, await q.get_batch()))
-
-    async with _core.open_nursery() as nursery:
-        nursery.start_soon(reader, "a")
-        await _core.wait_all_tasks_blocked()
-        nursery.start_soon(reader, "b")
-        await _core.wait_all_tasks_blocked()
-
-        for i in range(20):
-            q.put_nowait(i)
-            await _core.wait_all_tasks_blocked()
-
-        nursery.cancel_scope.cancel()
-
-    assert record == list(zip(itertools.cycle("ab"), [[i] for i in range(20)]))
-
-
-async def test_UnboundedQueue_trivial_yields() -> None:
-    q = _core.UnboundedQueue[None]()
-
-    q.put_nowait(None)
-    with assert_checkpoints():
-        await q.get_batch()
-
-    q.put_nowait(None)
-    with assert_checkpoints():
-        async for _ in q:  # pragma: no branch
-            break
-
-
-async def test_UnboundedQueue_no_spurious_wakeups() -> None:
-    # If we have two tasks waiting, and put two items into the queue... then
-    # only one task wakes up
-    record = []
-
-    async def getter(q: _core.UnboundedQueue[int], i: int) -> None:
-        got = await q.get_batch()
-        record.append((i, got))
-
-    async with _core.open_nursery() as nursery:
-        q = _core.UnboundedQueue[int]()
-        nursery.start_soon(getter, q, 1)
-        await wait_all_tasks_blocked()
-        nursery.start_soon(getter, q, 2)
-        await wait_all_tasks_blocked()
-
-        for i in range(10):
-            q.put_nowait(i)
-            await wait_all_tasks_blocked()
-
-        assert record == [(1, list(range(10)))]
-
-        nursery.cancel_scope.cancel()
diff --git a/src/trio/_core/_tests/test_windows.py b/src/trio/_core/_tests/test_windows.py
index e4a1bab615..e548326935 100644
--- a/src/trio/_core/_tests/test_windows.py
+++ b/src/trio/_core/_tests/test_windows.py
@@ -79,10 +79,6 @@ def test_winerror(monkeypatch: pytest.MonkeyPatch) -> None:
     assert exc.value.filename2 == "b/file"
 
 
-# The undocumented API that this is testing should be changed to stop using
-# UnboundedQueue (or just removed until we have time to redo it), but until
-# then we filter out the warning.
-@pytest.mark.filterwarnings("ignore:.*UnboundedQueue:trio.TrioDeprecationWarning")
 async def test_completion_key_listen() -> None:
     from .. import _io_windows
 
@@ -98,17 +94,13 @@ async def post(key: int) -> None:
 
     with _core.monitor_completion_key() as (key, queue):
         async with _core.open_nursery() as nursery:
             nursery.start_soon(post, key)
-            i = 0
+
             print("loop")
-            async for batch in queue:  # pragma: no branch
-                print("got some", batch)
-                for info in batch:
-                    assert isinstance(info, _io_windows.CompletionKeyEventInfo)
-                    assert info.lpOverlapped == 0
-                    assert info.dwNumberOfBytesTransferred == i
-                    i += 1
-                if i == 10:
-                    break
+            for i in range(10):
+                info = await queue.receive()
+                assert isinstance(info, _io_windows.CompletionKeyEventInfo)
+                assert info.lpOverlapped == 0
+                assert info.dwNumberOfBytesTransferred == i
             print("end loop")
 
diff --git a/src/trio/_core/_unbounded_queue.py b/src/trio/_core/_unbounded_queue.py
deleted file mode 100644
index b9e7974841..0000000000
--- a/src/trio/_core/_unbounded_queue.py
+++ /dev/null
@@ -1,163 +0,0 @@
-from __future__ import annotations
-
-from typing import TYPE_CHECKING, Generic, TypeVar
-
-import attrs
-
-from .. import _core
-from .._deprecate import deprecated
-from .._util import final
-
-T = TypeVar("T")
-
-if TYPE_CHECKING:
-    from typing_extensions import Self
-
-
-@attrs.frozen
-class UnboundedQueueStatistics:
-    """An object containing debugging information.
-
-    Currently, the following fields are defined:
-
-    * ``qsize``: The number of items currently in the queue.
-    * ``tasks_waiting``: The number of tasks blocked on this queue's
-      :meth:`get_batch` method.
-
-    """
-
-    qsize: int
-    tasks_waiting: int
-
-
-@final
-class UnboundedQueue(Generic[T]):
-    """An unbounded queue suitable for certain unusual forms of inter-task
-    communication.
-
-    This class is designed for use as a queue in cases where the producer for
-    some reason cannot be subjected to back-pressure, i.e., :meth:`put_nowait`
-    has to always succeed. In order to prevent the queue backlog from actually
-    growing without bound, the consumer API is modified to dequeue items in
-    "batches". If a consumer task processes each batch without yielding, then
-    this helps achieve (but does not guarantee) an effective bound on the
-    queue's memory use, at the cost of potentially increasing system latencies
-    in general. You should generally prefer to use a memory channel
-    instead if you can.
-
-    Currently each batch completely empties the queue, but `this may change in
-    the future `__.
-
-    A :class:`UnboundedQueue` object can be used as an asynchronous iterator,
-    where each iteration returns a new batch of items. I.e., these two loops
-    are equivalent::
-
-       async for batch in queue:
-           ...
-
-       while True:
-           obj = await queue.get_batch()
-           ...
-
-    """
-
-    @deprecated(
-        "0.9.0",
-        issue=497,
-        thing="trio.lowlevel.UnboundedQueue",
-        instead="trio.open_memory_channel(math.inf)",
-        use_triodeprecationwarning=True,
-    )
-    def __init__(self) -> None:
-        self._lot = _core.ParkingLot()
-        self._data: list[T] = []
-        # used to allow handoff from put to the first task in the lot
-        self._can_get = False
-
-    def __repr__(self) -> str:
-        return f"<UnboundedQueue holding {len(self._data)} items>"
-
-    def qsize(self) -> int:
-        """Returns the number of items currently in the queue."""
-        return len(self._data)
-
-    def empty(self) -> bool:
-        """Returns True if the queue is empty, False otherwise.
-
-        There is some subtlety to interpreting this method's return value: see
-        `issue #63 <https://github.com/python-trio/trio/issues/63>`__.
-
-        """
-        return not self._data
-
-    @_core.enable_ki_protection
-    def put_nowait(self, obj: T) -> None:
-        """Put an object into the queue, without blocking.
-
-        This always succeeds, because the queue is unbounded. We don't provide
-        a blocking ``put`` method, because it would never need to block.
-
-        Args:
-          obj (object): The object to enqueue.
-
-        """
-        if not self._data:
-            assert not self._can_get
-            if self._lot:
-                self._lot.unpark(count=1)
-            else:
-                self._can_get = True
-        self._data.append(obj)
-
-    def _get_batch_protected(self) -> list[T]:
-        data = self._data.copy()
-        self._data.clear()
-        self._can_get = False
-        return data
-
-    def get_batch_nowait(self) -> list[T]:
-        """Attempt to get the next batch from the queue, without blocking.
-
-        Returns:
-          list: A list of dequeued items, in order. On a successful call this
-            list is always non-empty; if it would be empty we raise
-            :exc:`~trio.WouldBlock` instead.
-
-        Raises:
-          ~trio.WouldBlock: if the queue is empty.
-
-        """
-        if not self._can_get:
-            raise _core.WouldBlock
-        return self._get_batch_protected()
-
-    async def get_batch(self) -> list[T]:
-        """Get the next batch from the queue, blocking as necessary.
-
-        Returns:
-          list: A list of dequeued items, in order. This list is always
-            non-empty.
-
-        """
-        await _core.checkpoint_if_cancelled()
-        if not self._can_get:
-            await self._lot.park()
-            return self._get_batch_protected()
-        else:
-            try:
-                return self._get_batch_protected()
-            finally:
-                await _core.cancel_shielded_checkpoint()
-
-    def statistics(self) -> UnboundedQueueStatistics:
-        """Return an :class:`UnboundedQueueStatistics` object containing debugging information."""
-        return UnboundedQueueStatistics(
-            qsize=len(self._data),
-            tasks_waiting=self._lot.statistics().tasks_waiting,
-        )
-
-    def __aiter__(self) -> Self:
-        return self
-
-    async def __anext__(self) -> list[T]:
-        return await self.get_batch()
diff --git a/src/trio/_tools/gen_exports.py b/src/trio/_tools/gen_exports.py
index 5b1affe24a..ae6b0293e8 100755
--- a/src/trio/_tools/gen_exports.py
+++ b/src/trio/_tools/gen_exports.py
@@ -380,6 +380,7 @@ def main() -> None:  # pragma: no cover
 from collections.abc import Callable
 from contextlib import AbstractContextManager
 
+from .._channel import MemoryReceiveChannel
 from .. import _core
 from .._file_io import _HasFileNo
 from ._traps import Abort, RaiseCancelT
@@ -393,8 +394,8 @@ def main() -> None:  # pragma: no cover
 
 from typing_extensions import Buffer
 
+from .._channel import MemoryReceiveChannel
 from .._file_io import _HasFileNo
-from ._unbounded_queue import UnboundedQueue
 from ._windows_cffi import Handle, CData
 """
 
diff --git a/src/trio/lowlevel.py b/src/trio/lowlevel.py
index 9e385a0045..2a869652ad 100644
--- a/src/trio/lowlevel.py
+++ b/src/trio/lowlevel.py
@@ -22,8 +22,6 @@
     RunVarToken as RunVarToken,
     Task as Task,
     TrioToken as TrioToken,
-    UnboundedQueue as UnboundedQueue,
-    UnboundedQueueStatistics as UnboundedQueueStatistics,
     add_instrument as add_instrument,
     add_parking_lot_breaker as add_parking_lot_breaker,
     cancel_shielded_checkpoint as cancel_shielded_checkpoint,
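Migration note for downstream code that relied on UnboundedQueue's batch API: a rough sketch under stated assumptions, not something this patch adds. The get_batch helper below is hypothetical, and handling of a closed channel (trio.EndOfChannel) is omitted for brevity. It blocks for the first item and then drains whatever is already buffered, approximating UnboundedQueue.get_batch()'s non-empty-batch contract.

    import math

    import trio

    async def get_batch(recv: trio.MemoryReceiveChannel[int]) -> list[int]:
        # Wait for at least one item, then take everything already buffered.
        batch = [await recv.receive()]
        while True:
            try:
                batch.append(recv.receive_nowait())
            except trio.WouldBlock:
                return batch

    async def main() -> None:
        send, recv = trio.open_memory_channel[int](math.inf)
        for i in range(3):
            send.send_nowait(i)  # never blocks on an unbounded channel
        assert await get_batch(recv) == [0, 1, 2]

    trio.run(main)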