From ab8b90622ea4f96188bdf0ab5d5d946fffc8ebca Mon Sep 17 00:00:00 2001 From: namoshizun Date: Mon, 5 May 2025 21:30:21 +0800 Subject: [PATCH] feat: add new Handler kwarg `multiprocessing_queue`, when set to false, will enqueue messages to a queue.Queue instance --- .pre-commit-config.yaml | 10 +++++----- loguru/__init__.pyi | 27 ++++++++++++++++++--------- loguru/_better_exceptions.py | 1 - loguru/_defaults.py | 13 +++++++------ loguru/_file_sink.py | 2 +- loguru/_handler.py | 22 +++++++++++++++------- loguru/_logger.py | 17 +++++++++++------ tests/typesafety/test_logger.yml | 6 +++--- 8 files changed, 60 insertions(+), 38 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1e72d8df5..8aad9f464 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -26,8 +26,8 @@ repos: rev: 24.10.0 hooks: - id: black -- repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.8.0 - hooks: - - id: ruff - args: [--fix, --exit-non-zero-on-fix] +# - repo: https://github.com/astral-sh/ruff-pre-commit +# rev: v0.8.0 +# hooks: +# - id: ruff +# args: [--fix, --exit-non-zero-on-fix, --unsafe-fixes] diff --git a/loguru/__init__.pyi b/loguru/__init__.pyi index cd4f7a114..9cbaf70ff 100644 --- a/loguru/__init__.pyi +++ b/loguru/__init__.pyi @@ -134,6 +134,7 @@ class BasicHandlerConfig(TypedDict, total=False): backtrace: bool diagnose: bool enqueue: bool + multiprocessing_queue: bool catch: bool class FileHandlerConfig(TypedDict, total=False): @@ -146,6 +147,7 @@ class FileHandlerConfig(TypedDict, total=False): backtrace: bool diagnose: bool enqueue: bool + multiprocessing_queue: bool catch: bool rotation: Optional[ Union[ @@ -179,6 +181,7 @@ class AsyncHandlerConfig(TypedDict, total=False): backtrace: bool diagnose: bool enqueue: bool + multiprocessing_queue: bool catch: bool context: Optional[Union[str, BaseContext]] loop: Optional[AbstractEventLoop] @@ -207,8 +210,9 @@ class Logger: backtrace: bool = ..., diagnose: bool = ..., enqueue: bool = ..., + multiprocessing_queue: bool = ..., context: Optional[Union[str, BaseContext]] = ..., - catch: bool = ... + catch: bool = ..., ) -> int: ... @overload def add( @@ -223,9 +227,9 @@ class Logger: backtrace: bool = ..., diagnose: bool = ..., enqueue: bool = ..., - catch: bool = ..., + multiprocessing_queue: bool = ..., context: Optional[Union[str, BaseContext]] = ..., - loop: Optional[AbstractEventLoop] = ... + loop: Optional[AbstractEventLoop] = ..., ) -> int: ... @overload def add( @@ -240,6 +244,7 @@ class Logger: backtrace: bool = ..., diagnose: bool = ..., enqueue: bool = ..., + multiprocessing_queue: bool = ..., context: Optional[Union[str, BaseContext]] = ..., catch: bool = ..., rotation: Optional[ @@ -276,7 +281,7 @@ class Logger: onerror: Optional[Callable[[BaseException], None]] = ..., exclude: Optional[Union[Type[BaseException], Tuple[Type[BaseException], ...]]] = ..., default: Any = ..., - message: str = ... + message: str = ..., ) -> Catcher: ... @overload def catch(self, function: _F) -> _F: ... @@ -290,7 +295,7 @@ class Logger: raw: bool = ..., capture: bool = ..., depth: int = ..., - ansi: bool = ... + ansi: bool = ..., ) -> Logger: ... def bind(__self, **kwargs: Any) -> Logger: ... # noqa: N805 def contextualize(__self, **kwargs: Any) -> Contextualizer: ... # noqa: N805 @@ -318,7 +323,7 @@ class Logger: levels: Optional[Sequence[LevelConfig]] = ..., extra: Optional[Dict[Any, Any]] = ..., patcher: Optional[PatcherFunction] = ..., - activation: Optional[Sequence[ActivationConfig]] = ... + activation: Optional[Sequence[ActivationConfig]] = ..., ) -> List[int]: ... def reinstall(self) -> None: ... # @staticmethod cannot be used with @overload in mypy (python/mypy#7781). @@ -332,7 +337,7 @@ class Logger: pattern: Union[str, Pattern[str]], *, cast: Union[Dict[str, Callable[[str], Any]], Callable[[Dict[str, str]], None]] = ..., - chunk: int = ... + chunk: int = ..., ) -> Generator[Dict[str, Any], None, None]: ... @overload def parse( @@ -341,7 +346,7 @@ class Logger: pattern: Union[bytes, Pattern[bytes]], *, cast: Union[Dict[str, Callable[[bytes], Any]], Callable[[Dict[str, bytes]], None]] = ..., - chunk: int = ... + chunk: int = ..., ) -> Generator[Dict[str, Any], None, None]: ... @overload def trace(__self, __message: str, *args: Any, **kwargs: Any) -> None: ... # noqa: N805 @@ -377,7 +382,11 @@ class Logger: def exception(__self, __message: Any) -> None: ... # noqa: N805 @overload def log( - __self, __level: Union[int, str], __message: str, *args: Any, **kwargs: Any # noqa: N805 + __self, + __level: Union[int, str], + __message: str, + *args: Any, + **kwargs: Any, # noqa: N805 ) -> None: ... @overload def log(__self, __level: Union[int, str], __message: Any) -> None: ... # noqa: N805 diff --git a/loguru/_better_exceptions.py b/loguru/_better_exceptions.py index 658fc7f66..c14524067 100644 --- a/loguru/_better_exceptions.py +++ b/loguru/_better_exceptions.py @@ -534,7 +534,6 @@ def _format_exception( yield from self._indent("-" * 35, group_nesting + 1, prefix="+-") def _format_list(self, frames): - def source_message(filename, lineno, name, line): message = ' File "%s", line %d, in %s\n' % (filename, lineno, name) if line: diff --git a/loguru/_defaults.py b/loguru/_defaults.py index cc0bf422a..a075d08c3 100644 --- a/loguru/_defaults.py +++ b/loguru/_defaults.py @@ -43,20 +43,21 @@ def env(key, type_, default=None): LOGURU_BACKTRACE = env("LOGURU_BACKTRACE", bool, True) LOGURU_DIAGNOSE = env("LOGURU_DIAGNOSE", bool, True) LOGURU_ENQUEUE = env("LOGURU_ENQUEUE", bool, False) +LOGURU_MULTIPROCESSING_QUEUE = env("LOGURU_MULTIPROCESSING_QUEUE", bool, True) LOGURU_CONTEXT = env("LOGURU_CONTEXT", str, None) LOGURU_CATCH = env("LOGURU_CATCH", bool, True) LOGURU_TRACE_NO = env("LOGURU_TRACE_NO", int, 5) LOGURU_TRACE_COLOR = env("LOGURU_TRACE_COLOR", str, "") -LOGURU_TRACE_ICON = env("LOGURU_TRACE_ICON", str, "\u270F\uFE0F") # Pencil +LOGURU_TRACE_ICON = env("LOGURU_TRACE_ICON", str, "\u270f\ufe0f") # Pencil LOGURU_DEBUG_NO = env("LOGURU_DEBUG_NO", int, 10) LOGURU_DEBUG_COLOR = env("LOGURU_DEBUG_COLOR", str, "") -LOGURU_DEBUG_ICON = env("LOGURU_DEBUG_ICON", str, "\U0001F41E") # Lady Beetle +LOGURU_DEBUG_ICON = env("LOGURU_DEBUG_ICON", str, "\U0001f41e") # Lady Beetle LOGURU_INFO_NO = env("LOGURU_INFO_NO", int, 20) LOGURU_INFO_COLOR = env("LOGURU_INFO_COLOR", str, "") -LOGURU_INFO_ICON = env("LOGURU_INFO_ICON", str, "\u2139\uFE0F") # Information +LOGURU_INFO_ICON = env("LOGURU_INFO_ICON", str, "\u2139\ufe0f") # Information LOGURU_SUCCESS_NO = env("LOGURU_SUCCESS_NO", int, 25) LOGURU_SUCCESS_COLOR = env("LOGURU_SUCCESS_COLOR", str, "") @@ -64,12 +65,12 @@ def env(key, type_, default=None): LOGURU_WARNING_NO = env("LOGURU_WARNING_NO", int, 30) LOGURU_WARNING_COLOR = env("LOGURU_WARNING_COLOR", str, "") -LOGURU_WARNING_ICON = env("LOGURU_WARNING_ICON", str, "\u26A0\uFE0F") # Warning +LOGURU_WARNING_ICON = env("LOGURU_WARNING_ICON", str, "\u26a0\ufe0f") # Warning LOGURU_ERROR_NO = env("LOGURU_ERROR_NO", int, 40) LOGURU_ERROR_COLOR = env("LOGURU_ERROR_COLOR", str, "") -LOGURU_ERROR_ICON = env("LOGURU_ERROR_ICON", str, "\u274C") # Cross Mark +LOGURU_ERROR_ICON = env("LOGURU_ERROR_ICON", str, "\u274c") # Cross Mark LOGURU_CRITICAL_NO = env("LOGURU_CRITICAL_NO", int, 50) LOGURU_CRITICAL_COLOR = env("LOGURU_CRITICAL_COLOR", str, "") -LOGURU_CRITICAL_ICON = env("LOGURU_CRITICAL_ICON", str, "\u2620\uFE0F") # Skull and Crossbones +LOGURU_CRITICAL_ICON = env("LOGURU_CRITICAL_ICON", str, "\u2620\ufe0f") # Skull and Crossbones diff --git a/loguru/_file_sink.py b/loguru/_file_sink.py index 25d63f946..e6a198cd0 100644 --- a/loguru/_file_sink.py +++ b/loguru/_file_sink.py @@ -174,7 +174,7 @@ def __init__( mode="a", buffering=1, encoding="utf8", - **kwargs + **kwargs, ): self.encoding = encoding diff --git a/loguru/_handler.py b/loguru/_handler.py index 81a3dca08..8807753e4 100644 --- a/loguru/_handler.py +++ b/loguru/_handler.py @@ -4,6 +4,7 @@ import os import threading from contextlib import contextmanager +from queue import Queue from threading import Thread from ._colorizer import Colorizer @@ -45,7 +46,8 @@ def __init__( error_interceptor, exception_formatter, id_, - levels_ansi_codes + levels_ansi_codes, + multiprocessing_queue, ): self._name = name self._sink = sink @@ -90,13 +92,19 @@ def __init__( if self._enqueue: if self._multiprocessing_context is None: - self._queue = multiprocessing.SimpleQueue() - self._confirmation_event = multiprocessing.Event() - self._confirmation_lock = multiprocessing.Lock() + mp = multiprocessing else: - self._queue = self._multiprocessing_context.SimpleQueue() - self._confirmation_event = self._multiprocessing_context.Event() - self._confirmation_lock = self._multiprocessing_context.Lock() + mp = self._multiprocessing_context + + if multiprocessing_queue: + self._queue = mp.SimpleQueue() + self._confirmation_event = mp.Event() + self._confirmation_lock = mp.Lock() + else: + self._queue = Queue() + self._confirmation_event = threading.Event() + self._confirmation_lock = threading.Lock() + self._queue_lock = create_handler_lock() self._owner_process_pid = os.getpid() self._thread = Thread( diff --git a/loguru/_logger.py b/loguru/_logger.py index 3d246de08..cad91cb74 100644 --- a/loguru/_logger.py +++ b/loguru/_logger.py @@ -268,9 +268,10 @@ def add( backtrace=_defaults.LOGURU_BACKTRACE, diagnose=_defaults.LOGURU_DIAGNOSE, enqueue=_defaults.LOGURU_ENQUEUE, + multiprocessing_queue=_defaults.LOGURU_MULTIPROCESSING_QUEUE, context=_defaults.LOGURU_CONTEXT, catch=_defaults.LOGURU_CATCH, - **kwargs + **kwargs, ): r"""Add a handler sending log messages to a sink adequately configured. @@ -300,9 +301,13 @@ def add( Whether the exception trace should display the variables values to eases the debugging. This should be set to ``False`` in production to avoid leaking sensitive data. enqueue : |bool|, optional - Whether the messages to be logged should first pass through a multiprocessing-safe queue - before reaching the sink. This is useful while logging to a file through multiple + Whether the messages to be logged should first pass through a multiprocessing/threading + queue before reaching the sink. This is useful while logging to a file through multiple processes. This also has the advantage of making logging calls non-blocking. + multiprocessing_queue : |bool|, optional + Whether to enqueue messages to a multiprocessing-safe queue or a simple queue. + If you program will never spawn child processes, you may choose to use the simple queue + to reduce the enqueue overhead. context : |multiprocessing.Context| or |str|, optional A context object or name that will be used for all tasks involving internally the |multiprocessing| module, in particular when ``enqueue=True``. If ``None``, the default @@ -1029,6 +1034,7 @@ def add( error_interceptor=error_interceptor, exception_formatter=exception_formatter, levels_ansi_codes=self._core.levels_ansi_codes, + multiprocessing_queue=multiprocessing_queue, ) handlers = self._core.handlers.copy() @@ -1163,7 +1169,7 @@ def catch( default=None, message="An error has been caught in function '{record[function]}', " "process '{record[process].name}' ({record[process].id}), " - "thread '{record[thread].name}' ({record[thread].id}):" + "thread '{record[thread].name}' ({record[thread].id}):", ): """Return a decorator to automatically log possibly caught error in wrapped function. @@ -1315,7 +1321,6 @@ def catch_wrapper(*args, **kwargs): elif isasyncgenfunction(function): class AsyncGenCatchWrapper(AsyncGenerator): - def __init__(self, gen): self._gen = gen @@ -1364,7 +1369,7 @@ def opt( raw=False, capture=True, depth=0, - ansi=False + ansi=False, ): r"""Parametrize a logging call to slightly change generated log message. diff --git a/tests/typesafety/test_logger.yml b/tests/typesafety/test_logger.yml index 5ab5e5532..a9dea5a05 100644 --- a/tests/typesafety/test_logger.yml +++ b/tests/typesafety/test_logger.yml @@ -285,9 +285,9 @@ out: | main:2: error: No overload variant of "add" of "Logger" matches argument types "Callable[[Any], None]", "int" main:2: note: Possible overload variants: - main:2: note: def add(self, sink: Union[TextIO, Writable, Callable[[Message], None], Handler], *, level: Union[str, int] = ..., format: Union[str, Callable[[Record], str]] = ..., filter: Union[str, Callable[[Record], bool], Dict[Optional[str], Union[str, int, bool]], None] = ..., colorize: Optional[bool] = ..., serialize: bool = ..., backtrace: bool = ..., diagnose: bool = ..., enqueue: bool = ..., context: Union[str, BaseContext, None] = ..., catch: bool = ...) -> int - main:2: note: def add(self, sink: Callable[[Message], Awaitable[None]], *, level: Union[str, int] = ..., format: Union[str, Callable[[Record], str]] = ..., filter: Union[str, Callable[[Record], bool], Dict[Optional[str], Union[str, int, bool]], None] = ..., colorize: Optional[bool] = ..., serialize: bool = ..., backtrace: bool = ..., diagnose: bool = ..., enqueue: bool = ..., catch: bool = ..., context: Union[str, BaseContext, None] = ..., loop: Optional[AbstractEventLoop] = ...) -> int - main:2: note: def add(self, sink: Union[str, PathLike[str]], *, level: Union[str, int] = ..., format: Union[str, Callable[[Record], str]] = ..., filter: Union[str, Callable[[Record], bool], Dict[Optional[str], Union[str, int, bool]], None] = ..., colorize: Optional[bool] = ..., serialize: bool = ..., backtrace: bool = ..., diagnose: bool = ..., enqueue: bool = ..., context: Union[str, BaseContext, None] = ..., catch: bool = ..., rotation: Union[str, int, time, timedelta, Callable[[Message, TextIO], bool], List[Union[str, int, time, timedelta, Callable[[Message, TextIO], bool]]], None] = ..., retention: Union[str, int, timedelta, Callable[[List[str]], None], None] = ..., compression: Union[str, Callable[[str], None], None] = ..., delay: bool = ..., watch: bool = ..., mode: str = ..., buffering: int = ..., encoding: str = ..., errors: Optional[str] = ..., newline: Optional[str] = ..., closefd: bool = ..., opener: Optional[Callable[[str, int], int]] = ...) -> int + main:2: note: def add(self, sink: Union[TextIO, Writable, Callable[[Message], None], Handler], *, level: Union[str, int] = ..., format: Union[str, Callable[[Record], str]] = ..., filter: Union[str, Callable[[Record], bool], Dict[Optional[str], Union[str, int, bool]], None] = ..., colorize: Optional[bool] = ..., serialize: bool = ..., backtrace: bool = ..., diagnose: bool = ..., enqueue: bool = ..., multiprocessing_queue: bool = ..., context: Union[str, BaseContext, None] = ..., catch: bool = ...) -> int + main:2: note: def add(self, sink: Callable[[Message], Awaitable[None]], *, level: Union[str, int] = ..., format: Union[str, Callable[[Record], str]] = ..., filter: Union[str, Callable[[Record], bool], Dict[Optional[str], Union[str, int, bool]], None] = ..., colorize: Optional[bool] = ..., serialize: bool = ..., backtrace: bool = ..., diagnose: bool = ..., enqueue: bool = ..., multiprocessing_queue: bool = ..., context: Union[str, BaseContext, None] = ..., loop: Optional[AbstractEventLoop] = ...) -> int + main:2: note: def add(self, sink: Union[str, PathLike[str]], *, level: Union[str, int] = ..., format: Union[str, Callable[[Record], str]] = ..., filter: Union[str, Callable[[Record], bool], Dict[Optional[str], Union[str, int, bool]], None] = ..., colorize: Optional[bool] = ..., serialize: bool = ..., backtrace: bool = ..., diagnose: bool = ..., enqueue: bool = ..., multiprocessing_queue: bool = ..., context: Union[str, BaseContext, None] = ..., catch: bool = ..., rotation: Union[str, int, time, timedelta, Callable[[Message, TextIO], bool], List[Union[str, int, time, timedelta, Callable[[Message, TextIO], bool]]], None] = ..., retention: Union[str, int, timedelta, Callable[[List[str]], None], None] = ..., compression: Union[str, Callable[[str], None], None] = ..., delay: bool = ..., watch: bool = ..., mode: str = ..., buffering: int = ..., encoding: str = ..., errors: Optional[str] = ..., newline: Optional[str] = ..., closefd: bool = ..., opener: Optional[Callable[[str, int], int]] = ...) -> int - case: invalid_logged_object_formatting main: |