diff --git a/catalystwan/apigw_auth.py b/catalystwan/apigw_auth.py index 2a8266ae..8ed12450 100644 --- a/catalystwan/apigw_auth.py +++ b/catalystwan/apigw_auth.py @@ -34,7 +34,7 @@ class ApiGwAuth(AuthBase, AuthProtocol): 2. Use the token in the Authorization header for subsequent requests. """ - def __init__(self, login: ApiGwLogin, logger: Optional[logging.Logger] = None, verify: bool = False): + def __init__(self, login: ApiGwLogin, logger: Optional[logging.Logger] = None, verify: bool | str = False): self.login = login self.token = "" self.org_registered: bool = False @@ -86,7 +86,7 @@ def get_token( base_url: str, apigw_login: ApiGwLogin, logger: Optional[logging.Logger] = None, - verify: bool = False, + verify: bool | str = False, timeout: int = 10, ) -> str: try: @@ -118,7 +118,7 @@ def register_org( base_url: str, apigw_login: ApiGwLogin, logger: Optional[logging.Logger] = None, - verify: bool = False, + verify: bool | str = False, timeout: int = 10, ) -> None: try: diff --git a/catalystwan/endpoints/api_gateway.py b/catalystwan/endpoints/api_gateway.py index 513133ef..abbe430d 100644 --- a/catalystwan/endpoints/api_gateway.py +++ b/catalystwan/endpoints/api_gateway.py @@ -20,5 +20,4 @@ def configuration_reload(self) -> None: ... @post("/apigw/client/registration") - def on_board_client(self, payload: OnBoardClient) -> None: - ... + def on_board_client(self, payload: OnBoardClient) -> None: ... diff --git a/catalystwan/session.py b/catalystwan/session.py index 921af332..da9eb3e4 100644 --- a/catalystwan/session.py +++ b/catalystwan/session.py @@ -211,6 +211,7 @@ class ManagerSession(ManagerResponseAdapter, APIEndpointClient): subdomain: subdomain specifying to which view switch when creating provider as a tenant session, works only on provider user mode logger: override default module logger + verify: bool or str: bool to verify SSL certificate, or a path to a CA bundle Attributes: api: APIContainer: container for API methods @@ -222,7 +223,7 @@ class ManagerSession(ManagerResponseAdapter, APIEndpointClient): api_version: Version: API version restart_timeout: int: restart timeout in seconds session_type: SessionType: type of session - verify: bool: verify SSL certificate + verify: bool or str: bool to verify SSL certificate, or a path to a CA bundle """ @@ -233,17 +234,18 @@ def __init__( subdomain: Optional[str] = None, logger: Optional[logging.Logger] = None, request_limiter: Optional[RequestLimiter] = None, + verify: bool | str = False, ) -> None: self.base_url = base_url self.subdomain = subdomain self._session_type = SessionType.NOT_DEFINED self.server_name: Optional[str] = None self.logger = logger or logging.getLogger(__name__) - self.response_trace: Callable[ - [Optional[Response], Union[Request, PreparedRequest, None]], str - ] = response_history_debug + self.response_trace: Callable[[Optional[Response], Union[Request, PreparedRequest, None]], str] = ( + response_history_debug + ) super(ManagerSession, self).__init__() - self.verify = False + self.verify = verify self.headers.update({"User-Agent": USER_AGENT}) self._added_to_auth = False self._auth = auth @@ -429,6 +431,8 @@ def request(self, method, url, *args, **kwargs) -> ManagerResponse: _kwargs = dict(kwargs) if self.request_timeout is not None: # do not modify user provided kwargs unless property is set _kwargs.update(timeout=self.request_timeout) + if "verify" not in kwargs: + _kwargs.update(verify=self.verify) try: with self._limiter: response = super(ManagerSession, self).request(method, full_url, *args, **_kwargs) diff --git a/catalystwan/vmanage_auth.py b/catalystwan/vmanage_auth.py index 0813388c..a7c800af 100644 --- a/catalystwan/vmanage_auth.py +++ b/catalystwan/vmanage_auth.py @@ -71,7 +71,9 @@ class vManageAuth(AuthBase, AuthProtocol): 2. Get a cross-site request forgery prevention token, which is required for most POST operations. """ - def __init__(self, username: str, password: str, logger: Optional[logging.Logger] = None, verify: bool = False): + def __init__( + self, username: str, password: str, logger: Optional[logging.Logger] = None, verify: bool | str = False + ): self.username = username self.password = password self.xsrftoken: Optional[str] = None @@ -203,7 +205,7 @@ def __init__( password: str, subdomain: str, logger: Optional[logging.Logger] = None, - verify: bool = False, + verify: bool | str = False, ): super().__init__(username, password, logger, verify) self.subdomain = subdomain @@ -264,7 +266,7 @@ def create_vmanage_auth( password: str, subdomain: Optional[str] = None, logger: Optional[logging.Logger] = None, - verify: bool = False, + verify: bool | str = False, ) -> vManageAuth: if subdomain is not None: return vSessionAuth(username, password, subdomain, logger=logger, verify=verify)