Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions catalystwan/apigw_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class ApiGwAuth(AuthBase, AuthProtocol):
2. Use the token in the Authorization header for subsequent requests.
"""

def __init__(self, login: ApiGwLogin, logger: Optional[logging.Logger] = None, verify: bool = False):
def __init__(self, login: ApiGwLogin, logger: Optional[logging.Logger] = None, verify: bool | str = False):
self.login = login
self.token = ""
self.org_registered: bool = False
Expand Down Expand Up @@ -86,7 +86,7 @@ def get_token(
base_url: str,
apigw_login: ApiGwLogin,
logger: Optional[logging.Logger] = None,
verify: bool = False,
verify: bool | str = False,
timeout: int = 10,
) -> str:
try:
Expand Down Expand Up @@ -118,7 +118,7 @@ def register_org(
base_url: str,
apigw_login: ApiGwLogin,
logger: Optional[logging.Logger] = None,
verify: bool = False,
verify: bool | str = False,
timeout: int = 10,
) -> None:
try:
Expand Down
3 changes: 1 addition & 2 deletions catalystwan/endpoints/api_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,4 @@ def configuration_reload(self) -> None:
...

@post("/apigw/client/registration")
def on_board_client(self, payload: OnBoardClient) -> None:
...
def on_board_client(self, payload: OnBoardClient) -> None: ...
14 changes: 9 additions & 5 deletions catalystwan/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ class ManagerSession(ManagerResponseAdapter, APIEndpointClient):
subdomain: subdomain specifying to which view switch when creating provider as a tenant session,
works only on provider user mode
logger: override default module logger
verify: bool or str: bool to verify SSL certificate, or a path to a CA bundle

Attributes:
api: APIContainer: container for API methods
Expand All @@ -222,7 +223,7 @@ class ManagerSession(ManagerResponseAdapter, APIEndpointClient):
api_version: Version: API version
restart_timeout: int: restart timeout in seconds
session_type: SessionType: type of session
verify: bool: verify SSL certificate
verify: bool or str: bool to verify SSL certificate, or a path to a CA bundle

"""

Expand All @@ -233,17 +234,18 @@ def __init__(
subdomain: Optional[str] = None,
logger: Optional[logging.Logger] = None,
request_limiter: Optional[RequestLimiter] = None,
verify: bool | str = False,
) -> None:
self.base_url = base_url
self.subdomain = subdomain
self._session_type = SessionType.NOT_DEFINED
self.server_name: Optional[str] = None
self.logger = logger or logging.getLogger(__name__)
self.response_trace: Callable[
[Optional[Response], Union[Request, PreparedRequest, None]], str
] = response_history_debug
self.response_trace: Callable[[Optional[Response], Union[Request, PreparedRequest, None]], str] = (
response_history_debug
)
super(ManagerSession, self).__init__()
self.verify = False
self.verify = verify
self.headers.update({"User-Agent": USER_AGENT})
self._added_to_auth = False
self._auth = auth
Expand Down Expand Up @@ -429,6 +431,8 @@ def request(self, method, url, *args, **kwargs) -> ManagerResponse:
_kwargs = dict(kwargs)
if self.request_timeout is not None: # do not modify user provided kwargs unless property is set
_kwargs.update(timeout=self.request_timeout)
if "verify" not in kwargs:
_kwargs.update(verify=self.verify)
try:
with self._limiter:
response = super(ManagerSession, self).request(method, full_url, *args, **_kwargs)
Expand Down
8 changes: 5 additions & 3 deletions catalystwan/vmanage_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ class vManageAuth(AuthBase, AuthProtocol):
2. Get a cross-site request forgery prevention token, which is required for most POST operations.
"""

def __init__(self, username: str, password: str, logger: Optional[logging.Logger] = None, verify: bool = False):
def __init__(
self, username: str, password: str, logger: Optional[logging.Logger] = None, verify: bool | str = False
):
self.username = username
self.password = password
self.xsrftoken: Optional[str] = None
Expand Down Expand Up @@ -203,7 +205,7 @@ def __init__(
password: str,
subdomain: str,
logger: Optional[logging.Logger] = None,
verify: bool = False,
verify: bool | str = False,
):
super().__init__(username, password, logger, verify)
self.subdomain = subdomain
Expand Down Expand Up @@ -264,7 +266,7 @@ def create_vmanage_auth(
password: str,
subdomain: Optional[str] = None,
logger: Optional[logging.Logger] = None,
verify: bool = False,
verify: bool | str = False,
) -> vManageAuth:
if subdomain is not None:
return vSessionAuth(username, password, subdomain, logger=logger, verify=verify)
Expand Down