diff --git a/src/browsergym/workarena/api/utils.py b/src/browsergym/workarena/api/utils.py index 47a6b2a..69b8609 100644 --- a/src/browsergym/workarena/api/utils.py +++ b/src/browsergym/workarena/api/utils.py @@ -2,6 +2,12 @@ from ..instance import SNowInstance + +def table_api_call(instance: SNowInstance, *args, **kwargs) -> dict: + """Wrapper around SNowInstance.table_api_call for backwards compatibility""" + return instance.table_api_call(*args, **kwargs) + + from requests.exceptions import HTTPError from time import sleep @@ -9,101 +15,6 @@ SNOW_API_HEADERS = {"Content-Type": "application/json", "Accept": "application/json"} -def table_api_call( - instance: SNowInstance, - table: str, - data: dict = {}, - params: dict = {}, - json: dict = {}, - method: str = "GET", - wait_for_record: bool = False, - max_retries: int = 5, - raise_on_wait_expired: bool = True, -) -> dict: - """ - Make a call to the ServiceNow Table API - - Parameters: - ----------- - instance: SNowInstance - The ServiceNow instance to interact with - table: str - The name of the table to interact with - data: dict - The data to send with the request - params: dict - The parameters to pass to the API - json: dict - The JSON data to send with the request - method: str - The HTTP method to use (GET, POST, PUT, DELETE). - wait_for_record: bool - If True, will wait up to 2 seconds for the record to be present before returning - max_retries: int - The number of retries to attempt before failing - raise_on_wait_expired: bool - If True, will raise an exception if the record is not found after max_retries. - Otherwise, will return an empty result. - - Returns: - -------- - dict - The JSON response from the API - - """ - - # Query API - response = requests.request( - method=method, - url=instance.snow_url + f"/api/now/table/{table}", - auth=instance.snow_credentials, - headers=SNOW_API_HEADERS, - data=data, - params=params, - json=json, - ) - if method == "POST": - sys_id = response.json()["result"]["sys_id"] - data = {} - params = {"sysparm_query": f"sys_id={sys_id}"} - - # Check for HTTP success code (fail otherwise) - response.raise_for_status() - - record_exists = False - num_retries = 0 - if method == "POST" or wait_for_record: - while not record_exists: - sleep(0.5) - get_response = table_api_call( - instance=instance, - table=table, - params=params, - json=json, - data=data, - method="GET", - ) - record_exists = len(get_response["result"]) > 0 - num_retries += 1 - if num_retries > max_retries: - if raise_on_wait_expired: - raise HTTPError(f"Record not found after {max_retries} retries") - else: - return {"result": []} - if method == "GET": - response = get_response - - if method != "DELETE": - # Decode the JSON response into a dictionary if necessary - # When using wait_for_record=True, the response is already a dict as it is a recursive call - if type(response) == dict: - return response - else: - return response.json() - else: - return response - - def table_column_info(instance: SNowInstance, table: str) -> dict: """ Get the column information for a ServiceNow table diff --git a/src/browsergym/workarena/install.py b/src/browsergym/workarena/install.py index 9d82493..cbd8e5f 100644 --- a/src/browsergym/workarena/install.py +++ b/src/browsergym/workarena/install.py @@ -56,7 +56,7 @@ def _set_sys_property(property_name: str, value: str): Set a sys_property in the instance. """ - instance = SNowInstance() + instance = SNowInstance(check_installed=False) property = table_api_call( instance=instance, @@ -82,20 +82,7 @@ def _set_sys_property(property_name: str, value: str): assert property["result"]["value"] == value, f"Error setting {property_name}." -def _get_sys_property(property_name: str) -> str: - """ - Get a sys_property from the instance. - - """ - instance = SNowInstance() - - property_value = table_api_call( - instance=instance, - table="sys_properties", - params={"sysparm_query": f"name={property_name}", "sysparm_fields": "value"}, - )["result"][0]["value"] - - return property_value +# Remove the _get_sys_property function as it's now in instance.py def _install_update_set(path: str, name: str): @@ -113,7 +100,7 @@ def _install_update_set(path: str, name: str): """ with sync_playwright() as playwright: - instance = SNowInstance() + instance = SNowInstance(check_installed=False) browser = playwright.chromium.launch(headless=True, slow_mo=1000) page = browser.new_page() url_login(instance, page) @@ -352,7 +339,7 @@ def setup_knowledge_bases(): """ # Get the ServiceNow instance - instance = SNowInstance() + instance = SNowInstance(check_installed=False) # Mapping between knowledge base name and filepath + whether or not to disable comments + whether or not to add article name knowledge_bases = { KB_NAME: (KB_FILEPATH, True, False), @@ -419,7 +406,7 @@ def check_workflows_installed(): """ expected_workflow_names = [x["name"] for x in WORKFLOWS.values()] workflows = table_api_call( - instance=SNowInstance(), + instance=SNowInstance(check_installed=False), table="wf_workflow", params={ "sysparm_query": "nameIN" + ",".join(expected_workflow_names), @@ -611,9 +598,9 @@ def setup_list_columns(): } logging.info("... Creating a new user account to validate list columns") - admin_instance = SNowInstance() + admin_instance = SNowInstance(check_installed=False) username, password, usysid = create_user(instance=admin_instance) - user_instance = SNowInstance(snow_credentials=(username, password)) + user_instance = SNowInstance(check_installed=False, snow_credentials=(username, password)) for task, task_info in list_mappings.items(): expected_columns_path = task_info["expected_columns_path"] @@ -717,9 +704,9 @@ def setup_form_fields(): } logging.info("... Creating a new user account to validate form fields") - admin_instance = SNowInstance() + admin_instance = SNowInstance(check_installed=False) username, password, usysid = create_user(instance=admin_instance) - user_instance = SNowInstance(snow_credentials=(username, password)) + user_instance = SNowInstance(check_installed=False, snow_credentials=(username, password)) for task, task_info in task_mapping.items(): expected_fields_path = task_info["expected_fields_path"] @@ -779,7 +766,7 @@ def check_instance_release_support(): bool: True if the version is supported, False otherwise. """ - instance = SNowInstance() + instance = SNowInstance(check_installed=False) version_info = instance.release_version if version_info["build name"] not in SNOW_SUPPORTED_RELEASES: logging.error( @@ -816,7 +803,11 @@ def disable_welcome_help_popup(): Disable the welcome help popup """ - set_user_preference(instance=SNowInstance(), key="overview_help.visited.navui", value="true") + set_user_preference( + instance=SNowInstance(check_installed=False), + key="overview_help.visited.navui", + value="true", + ) logging.info("Welcome help popup disabled.") @@ -841,24 +832,24 @@ def setup_ui_themes(): logging.info("Setting default UI theme") _set_sys_property( property_name="glide.ui.polaris.theme.custom", - value=get_workarena_theme_variants(SNowInstance())[0]["theme.sys_id"], + value=get_workarena_theme_variants(SNowInstance(check_installed=False))[0]["theme.sys_id"], ) # Set admin user's theme variant # ... get user's sysid admin_user = table_api_call( - instance=SNowInstance(), + instance=SNowInstance(check_installed=False), table="sys_user", params={"sysparm_query": "user_name=admin", "sysparm_fields": "sys_id"}, )["result"][0] # ... set user preference set_user_preference( - instance=SNowInstance(), + instance=SNowInstance(check_installed=False), user=admin_user["sys_id"], key="glide.ui.polaris.theme.variant", value=[ x["style.sys_id"] - for x in get_workarena_theme_variants(SNowInstance()) + for x in get_workarena_theme_variants(SNowInstance(check_installed=False)) if x["style.name"] == "Workarena" ][0], ) @@ -870,7 +861,7 @@ def check_ui_themes_installed(): """ expected_variants = set([v.lower() for v in UI_THEMES_UPDATE_SET["variants"]]) - installed_themes = get_workarena_theme_variants(SNowInstance()) + installed_themes = get_workarena_theme_variants(SNowInstance(check_installed=False)) installed_themes = set([t["style.name"].lower() for t in installed_themes]) assert ( @@ -893,7 +884,7 @@ def wipe_system_admin_preferences(): """ logging.info("Wiping all system admin preferences") sys_admin_prefs = table_api_call( - instance=SNowInstance(), + instance=SNowInstance(check_installed=False), table="sys_user_preference", params={"sysparm_query": "user.user_name=admin", "sysparm_fields": "sys_id,name"}, )["result"] @@ -903,7 +894,9 @@ def wipe_system_admin_preferences(): for pref in sys_admin_prefs: logging.info(f"...... deleting {pref['name']}") table_api_call( - instance=SNowInstance(), table=f"sys_user_preference/{pref['sys_id']}", method="DELETE" + instance=SNowInstance(check_installed=False), + table=f"sys_user_preference/{pref['sys_id']}", + method="DELETE", ) @@ -926,7 +919,7 @@ def patch_report_filters(): """ logging.info("Patching reports with date filter...") - instance = SNowInstance() + instance = SNowInstance(check_installed=False) # Get all reports that are not already patched reports = table_api_call( @@ -1054,7 +1047,8 @@ def main(): logging.basicConfig(level=logging.INFO) try: - past_install_date = _get_sys_property("workarena.installation.date") + instance = SNowInstance(check_installed=False) + past_install_date = instance._get_sys_property("workarena.installation.date") logging.info(f"Detected previous installation on {past_install_date}. Reinstalling...") except: past_install_date = "never" @@ -1068,7 +1062,7 @@ def main(): ██ ███ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ███ ███ ██████ ██ ██ ██ ██ ██ ██ ██ ██ ███████ ██ ████ ██ ██ -Instance: {SNowInstance().snow_url} +Instance: {SNowInstance(check_installed=False).snow_url} Previous installation: {past_install_date} """ diff --git a/src/browsergym/workarena/instance.py b/src/browsergym/workarena/instance.py index db7023a..c635eaa 100644 --- a/src/browsergym/workarena/instance.py +++ b/src/browsergym/workarena/instance.py @@ -1,12 +1,16 @@ import os import requests import re - from playwright.sync_api import sync_playwright from typing import Optional +from requests.exceptions import HTTPError +from time import sleep from .config import SNOW_BROWSER_TIMEOUT +# ServiceNow API configuration +SNOW_API_HEADERS = {"Content-Type": "application/json", "Accept": "application/json"} + class SNowInstance: """ @@ -18,6 +22,7 @@ def __init__( self, snow_url: Optional[str] = None, snow_credentials: Optional[tuple[str, str]] = None, + check_installed: bool = True, ) -> None: """ Set up a ServiceNow instance API @@ -55,6 +60,96 @@ def __init__( self.snow_url = snow_url.rstrip("/") self.snow_credentials = snow_credentials self.check_status() + if check_installed: + self.check_is_installed() + + def table_api_call( + self, + table: str, + data: dict = {}, + params: dict = {}, + json: dict = {}, + method: str = "GET", + wait_for_record: bool = False, + max_retries: int = 5, + raise_on_wait_expired: bool = True, + ) -> dict: + """ + Make a call to the ServiceNow Table API + """ + # Query API + response = requests.request( + method=method, + url=self.snow_url + f"/api/now/table/{table}", + auth=self.snow_credentials, + headers=SNOW_API_HEADERS, + data=data, + params=params, + json=json, + ) + if method == "POST": + sys_id = response.json()["result"]["sys_id"] + data = {} + params = {"sysparm_query": f"sys_id={sys_id}"} + + # Check for HTTP success code (fail otherwise) + response.raise_for_status() + + record_exists = False + num_retries = 0 + if method == "POST" or wait_for_record: + while not record_exists: + sleep(0.5) + get_response = self.table_api_call( + table=table, + params=params, + json=json, + data=data, + method="GET", + ) + record_exists = len(get_response["result"]) > 0 + num_retries += 1 + if num_retries > max_retries: + if raise_on_wait_expired: + raise HTTPError(f"Record not found after {max_retries} retries") + else: + return {"result": []} + if method == "GET": + response = get_response + + if method != "DELETE": + if type(response) == dict: + return response + else: + return response.json() + else: + return response + + def _get_sys_property(self, property_name: str) -> str: + """ + Get a sys_property from the instance. + """ + property_value = self.table_api_call( + table="sys_properties", + params={"sysparm_query": f"name={property_name}", "sysparm_fields": "value"}, + )["result"][0]["value"] + + return property_value + + def check_is_installed(self): + """ + Check if the ServiceNow instance is installed. + Bascally, check if if the installation date is set. + """ + property_name = "workarena.installation.date" + try: + self._get_sys_property(property_name) + except Exception: + raise RuntimeError( + f"ServiceNow instance is most likey not installed. " + "Please install the WorkArena plugin by running `workarena-install`.\n" + "Alternatively, your credentials might not be correct. Please check them." + ) def check_status(self): """