|
6 | 6 | from typing import Any, TypeVar
|
7 | 7 |
|
8 | 8 | from models_library.rabbitmq_basic_types import RPCMethodName
|
9 |
| -from pydantic import SecretStr |
10 | 9 |
|
11 | 10 | from ..logging_utils import log_context
|
12 | 11 | from ._errors import RPCServerError
|
13 | 12 |
|
14 | 13 | DecoratedCallable = TypeVar("DecoratedCallable", bound=Callable[..., Any])
|
15 | 14 |
|
| 15 | +# NOTE: this is equivalent to http access logs |
16 | 16 | _logger = logging.getLogger("rpc.access")
|
17 | 17 |
|
18 |
| -_RPC_CUSTOM_ENCODER: dict[Any, Callable[[Any], Any]] = { |
19 |
| - SecretStr: SecretStr.get_secret_value |
20 |
| -} |
| 18 | + |
| 19 | +def _create_func_msg(func, args: tuple[Any, ...], kwargs: dict[str, Any]) -> str: |
| 20 | + msg = f"{func.__name__}(" |
| 21 | + |
| 22 | + if args_msg := ", ".join(map(str, args)): |
| 23 | + msg += args_msg |
| 24 | + |
| 25 | + if kwargs_msg := ", ".join({f"{name}={value}" for name, value in kwargs.items()}): |
| 26 | + if args: |
| 27 | + msg += ", " |
| 28 | + msg += kwargs_msg |
| 29 | + |
| 30 | + return f"{msg})" |
21 | 31 |
|
22 | 32 |
|
23 | 33 | @dataclass
|
24 | 34 | class RPCRouter:
|
25 | 35 | routes: dict[RPCMethodName, Callable] = field(default_factory=dict)
|
26 | 36 |
|
27 |
| - def expose(self) -> Callable[[DecoratedCallable], DecoratedCallable]: |
28 |
| - def decorator(func: DecoratedCallable) -> DecoratedCallable: |
| 37 | + def expose( |
| 38 | + self, |
| 39 | + *, |
| 40 | + reraise_if_error_type: tuple[type[Exception], ...] | None = None, |
| 41 | + ) -> Callable[[DecoratedCallable], DecoratedCallable]: |
| 42 | + def _decorator(func: DecoratedCallable) -> DecoratedCallable: |
29 | 43 | @functools.wraps(func)
|
30 |
| - async def wrapper(*args, **kwargs): |
| 44 | + async def _wrapper(*args, **kwargs): |
| 45 | + |
31 | 46 | with log_context(
|
| 47 | + # NOTE: this is intentionally analogous to the http access log traces. |
| 48 | + # To change log-level use getLogger("rpc.access").set_level(...) |
32 | 49 | _logger,
|
33 | 50 | logging.INFO,
|
34 |
| - msg=f"calling {func.__name__} with {args}, {kwargs}", |
| 51 | + msg=f"RPC call {_create_func_msg(func, args, kwargs)}", |
| 52 | + log_duration=True, |
35 | 53 | ):
|
36 | 54 | try:
|
37 | 55 | return await func(*args, **kwargs)
|
| 56 | + |
38 | 57 | except asyncio.CancelledError:
|
39 | 58 | _logger.debug("call was cancelled")
|
40 | 59 | raise
|
| 60 | + |
41 | 61 | except Exception as exc: # pylint: disable=broad-except
|
| 62 | + if reraise_if_error_type and isinstance( |
| 63 | + exc, reraise_if_error_type |
| 64 | + ): |
| 65 | + raise |
| 66 | + |
42 | 67 | _logger.exception("Unhandled exception:")
|
43 | 68 | # NOTE: we do not return internal exceptions over RPC
|
44 | 69 | raise RPCServerError(
|
45 | 70 | method_name=func.__name__,
|
46 |
| - exc_type=f"{type(exc)}", |
| 71 | + exc_type=f"{exc.__class__.__module__}.{exc.__class__.__name__}", |
47 | 72 | msg=f"{exc}",
|
48 | 73 | ) from None
|
49 | 74 |
|
50 |
| - self.routes[RPCMethodName(func.__name__)] = wrapper |
| 75 | + self.routes[RPCMethodName(func.__name__)] = _wrapper |
51 | 76 | return func
|
52 | 77 |
|
53 |
| - return decorator |
| 78 | + return _decorator |
0 commit comments