diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index d598e92..552c0ef 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -36,6 +36,15 @@ jobs:
           make install
           make test
 
+      - name: Set up Terraform
+        uses: hashicorp/setup-terraform@v3
+
+      - name: Run Lambda integration tests
+        run: |
+          pip install terraform-local
+          make tf-deploy
+          make test-lambda
+
       - name: Print logs
         if: always()
         run: |
diff --git a/Makefile b/Makefile
index 70543fc..74ce70d 100644
--- a/Makefile
+++ b/Makefile
@@ -21,10 +21,10 @@ tf-deploy: ## Deploy the app locally via Terraform
 	mkdir -p build/lambda
 	cp -r app/lambda/* build/lambda/
-	docker run -it --platform=linux/amd64 --rm --entrypoint= -v $(PWD)/build/lambda:/tmp/lambda public.ecr.aws/lambda/python:3.11 pip install --upgrade --target /tmp/lambda -r /tmp/lambda/requirements.txt
+	docker run --platform=linux/amd64 --rm --entrypoint= -v $(PWD)/build/lambda:/tmp/lambda public.ecr.aws/lambda/python:3.11 pip install --upgrade --target /tmp/lambda -r /tmp/lambda/requirements.txt
 	##### NOTE: SOMETIMES THE ARM64 VERSION WORKS, SOMETIMES THE AMD64 VERSION WORKS? #####
-	#docker run -it --platform=linux/arm64/v8 --rm --entrypoint= -v $(PWD)/build/lambda:/tmp/lambda public.ecr.aws/lambda/python:3.11 pip install --upgrade --target /tmp/lambda -r /tmp/lambda/requirements.txt
+	#docker run --platform=linux/arm64/v8 --rm --entrypoint= -v $(PWD)/build/lambda:/tmp/lambda public.ecr.aws/lambda/python:3.11 pip install --upgrade --target /tmp/lambda -r /tmp/lambda/requirements.txt
 	$(VENV_RUN); tflocal init; tflocal apply -auto-approve
 
@@ -34,6 +34,10 @@ test-lambda: ## Run Lambda API tests
 test-extension: ## Run integration tests (requires LocalStack running with the Extension installed)
 	$(VENV_RUN); pytest tests/test_extension.py -v -s
 
+web-ui: ## Run the Web UI on localhost port 3000
+	which serve || npm i -g serve
+	serve ./app/web
+
 format: ## Run ruff to format the whole codebase
 	$(VENV_RUN); python -m ruff format .; python -m ruff check --output-format=full --fix .
 
diff --git a/README.md b/README.md
index afc89de..b2c876c 100644
--- a/README.md
+++ b/README.md
@@ -8,6 +8,7 @@ Sample app that demonstrates how to use TypeDB + LocalStack, to develop and test
 * LocalStack Pro (free trial available [here](https://app.localstack.cloud))
 * `localstack` CLI
 * `terraform` CLI
+* `node`/`npm` (for Web UI)
 
 ## Enable the TypeDB Extension
 
@@ -16,7 +17,7 @@ To enable the TypeDB extension in LocalStack, use this command:
 $ localstack extensions install "git+https://github.com/whummer/localstack-utils.git#egg=localstack-typedb&subdirectory=localstack-typedb"
 ```
 
-## Start localstack
+## Start LocalStack
 
 ```
 $ DOCKER_FLAGS='-e TYPEDB_FLAGS=--development-mode.enabled=true' localstack start
@@ -41,6 +42,13 @@ Once the app is deployed, we can run some HTTP requests against the local API Ga
 $ make requests
 ```
 
+## Run the Web UI
+
+The project ships with a simple Web application, which can be spun up on `http://localhost:3000` via this command:
+```
+$ make web-ui
+```
+
 ## License
 
 The code in this repo is available under the Apache 2.0 license.
diff --git a/app/lambda/handler.py b/app/lambda/handler.py
index 977bbb0..3ec2860 100644
--- a/app/lambda/handler.py
+++ b/app/lambda/handler.py
@@ -1,15 +1,19 @@
 import json
-import sys
 import time
 import logging
-import os
-from datetime import datetime
-from typedb.driver import TypeDB, Credentials, DriverOptions, TransactionType, TransactionOptions
+from typedb.driver import (
+    TypeDB,
+    Credentials,
+    DriverOptions,
+    TransactionType,
+    TransactionOptions,
+)
 
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.DEBUG)
 
+
 # Timing decorator for functions
 def log_execution_time(func_name):
     def decorator(func):
@@ -24,7 +28,9 @@ def wrapper(*args, **kwargs):
                 duration = (time.time() - start_time) * 1000
                 logger.debug(f"Failed {func_name} after {duration:.2f}ms: {str(e)}")
                 raise
+
         return wrapper
+
     return decorator
@@ -36,10 +42,12 @@ def wrapper(*args, **kwargs):
 _driver_created_at = None
 _driver_timeout = 300  # 5 minutes timeout for driver reuse
 
+
 def _transaction_options():
     """Get transaction options with configured timeout"""
     return TransactionOptions(transaction_timeout_millis=20_000)
 
+
 def _cors_response(status_code, body):
     """Create a response with CORS headers"""
     return {
@@ -47,20 +55,21 @@ def _cors_response(status_code, body):
         "headers": {
             "Access-Control-Allow-Origin": "*",
             "Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, OPTIONS",
-            "Access-Control-Allow-Headers": "Content-Type, Authorization"
+            "Access-Control-Allow-Headers": "Content-Type, Authorization",
         },
-        "body": json.dumps(body) if isinstance(body, (dict, list)) else str(body)
+        "body": json.dumps(body) if isinstance(body, (dict, list)) else str(body),
     }
 
+
 def handler(event, context):
     handler_start = time.time()
     logger.debug(f"Lambda invoked with event: {json.dumps(event, default=str)}")
-
+
     _create_database_and_schema()
-
+
     method = event["httpMethod"]
     path = event.get("path", "")
-
+
     # Handle CORS preflight requests
     if method == "OPTIONS":
         response = _cors_response(200, "")
@@ -69,11 +78,12 @@ def handler(event, context):
             response = handle_request(event, method, path)
         except Exception as e:
             response = _cors_response(400, {"error": str(e)})
-
+
     handler_duration = (time.time() - handler_start) * 1000
     logger.debug(f"✅ Lambda handler completed in {handler_duration:.2f}ms")
     return response
 
+
 def handle_request(event, method, path):
     # Route based on path and method
     if path == "/users":
@@ -140,48 +150,55 @@ def handle_request(event, method, path):
         logger.debug(f"No route found for {method} request to {path}")
         return _cors_response(404, {"error": "Not found"})
 
+
 @log_execution_time("create_user")
 def create_user(payload: dict):
-    logger.debug(f"Creating user")
+    logger.debug("Creating user")
     # Validate required fields
     if "username" not in payload:
         raise ValueError("Username is required")
     if "email" not in payload or not payload["email"]:
         raise ValueError("At least one email is required")
-
+
     username = payload["username"]
     emails = payload["email"]
-
+
     # Ensure emails is a list
     if isinstance(emails, str):
         emails = [emails]
-
+
     # Optional fields
     profile_picture_uri = payload.get("profile_picture_uri", "")
-
+
     try:
-        with _driver().transaction(db_name, TransactionType.WRITE, _transaction_options()) as tx:
+        with _driver().transaction(
+            db_name, TransactionType.WRITE, _transaction_options()
+        ) as tx:
             # Create user with username
             query = f"insert $u isa user, has user-name '{username}'"
-
+
             # Add all emails
             for email in emails:
                 query += f", has email '{email}'"
-
+
             # Add profile picture if provided - check if it's HTTP URL or S3 identifier
             if profile_picture_uri:
                 if profile_picture_uri.startswith("http"):
                     query += f", has profile-picture-url '{profile_picture_uri}'"
                 else:
                     query += f", has profile-picture-s3-uri '{profile_picture_uri}'"
-
+
             query += ";"
-
+
             tx.query(query).resolve()
             tx.commit()
-
-        return {"message": "User created successfully", "username": username, "email": emails}
-
+
+        return {
+            "message": "User created successfully",
+            "username": username,
+            "email": emails,
+        }
+
     except Exception as e:
         error_msg = str(e)
         if "DVL9" in error_msg and "key constraint violation" in error_msg:
@@ -192,23 +209,25 @@ def create_group(payload: dict):
-    logger.debug(f"Creating group")
+    logger.debug("Creating group")
     # Validate required fields
     if "group_name" not in payload:
         raise ValueError("Group name is required")
-
+
     group_name = payload["group_name"]
-
+
     try:
-        with _driver().transaction(db_name, TransactionType.WRITE, _transaction_options()) as tx:
+        with _driver().transaction(
+            db_name, TransactionType.WRITE, _transaction_options()
+        ) as tx:
             # Create group with group name
             query = f"insert $g isa group, has group-name '{group_name}';"
-
+
             tx.query(query).resolve()
             tx.commit()
-
+
         return {"message": "Group created successfully", "group_name": group_name}
-
+
     except Exception as e:
         error_msg = str(e)
         if "DVL9" in error_msg and "key constraint violation" in error_msg:
@@ -220,130 +239,149 @@ def list_users():
     logger.debug("Listing users")
-    driver_start = time.time()
-    logger.debug("Listing users - opened driver")
-
-    tx_start = time.time()
-    with _driver().transaction(db_name, TransactionType.READ, _transaction_options()) as tx:
-
+
+    with _driver().transaction(
+        db_name, TransactionType.READ, _transaction_options()
+    ) as tx:
         query_start = time.time()
-        result = tx.query(
-            'match $u isa user; '
-            'fetch {'
-            ' "username": $u.user-name, '
-            ' "email": [$u.email], '
-            ' "profile_picture_url": $u.profile-picture-url, '
-            ' "profile_picture_s3_uri": $u.profile-picture-s3-uri'
-            '};'
-        ).resolve().as_concept_documents()
-        query_duration = (time.time() - query_start) * 1000
-
+        result = (
+            tx.query(
+                "match $u isa user; "
+                "fetch {"
+                ' "username": $u.user-name, '
+                ' "email": [$u.email], '
+                ' "profile_picture_url": $u.profile-picture-url, '
+                ' "profile_picture_s3_uri": $u.profile-picture-s3-uri'
+                "};"
+            )
+            .resolve()
+            .as_concept_documents()
+        )
+        query_duration = (time.time() - query_start) * 1000  # noqa
+
         list_start = time.time()
         result = list(result)
-        list_duration = (time.time() - list_start) * 1000
-
+        list_duration = (time.time() - list_start) * 1000  # noqa
+
         return result
 
 
 @log_execution_time("list_groups")
 def list_groups():
     logger.debug("Listing groups")
-    with _driver().transaction(db_name, TransactionType.READ, _transaction_options()) as tx:
-        result = tx.query(
-            'match $g isa group; '
-            'fetch {'
-            ' "group_name": $g.group-name'
-            '};'
-        ).resolve().as_concept_documents()
+    with _driver().transaction(
+        db_name, TransactionType.READ, _transaction_options()
+    ) as tx:
+        result = (
+            tx.query('match $g isa group; fetch { "group_name": $g.group-name};')
+            .resolve()
+            .as_concept_documents()
+        )
         result = list(result)
-
+
     return result
 
 
 @log_execution_time("add_member_to_group")
 def add_member_to_group(group_name: str, payload: dict):
-    logger.debug(f"Adding member to group")
+    logger.debug("Adding member to group")
     # Validate required fields - either username or group_name must be provided
     if "username" not in payload and "group_name" not in payload:
         raise ValueError("Either 'username' or 'group_name' is required")
-
+
     if "username" in payload and "group_name" in payload:
         raise ValueError("Provide either 'username' or 'group_name', not both")
-
-    with _driver().transaction(db_name, TransactionType.WRITE, _transaction_options()) as tx:
-            if "username" in payload:
-                # Adding a user to the group
-                username = payload["username"]
-                query = (
-                    f"match "
-                    f" $member isa user, has user-name '{username}'; "
-                    f" $group isa group, has group-name '{group_name}'; "
-                    f"put "
-                    f" $membership isa membership (container: $group, member: $member);"
-                )
-                member_type = "user"
-                member_name = username
-            else:
-                # Adding a group to the group
-                member_group_name = payload["group_name"]
-                query = (
-                    f"match "
-                    f" $member isa group, has group-name '{member_group_name}'; "
-                    f" $group isa group, has group-name '{group_name}'; "
-                    f"put "
-                    f" $membership isa membership (container: $group, member: $member);"
-                )
-                member_type = "group"
-                member_name = member_group_name
-
-            tx.query(query).resolve()
-            tx.commit()
-
+
+    with _driver().transaction(
+        db_name, TransactionType.WRITE, _transaction_options()
+    ) as tx:
+        if "username" in payload:
+            # Adding a user to the group
+            username = payload["username"]
+            query = (
+                f"match "
+                f" $member isa user, has user-name '{username}'; "
+                f" $group isa group, has group-name '{group_name}'; "
+                f"put "
+                f" $membership isa membership (container: $group, member: $member);"
+            )
+            member_type = "user"
+            member_name = username
+        else:
+            # Adding a group to the group
+            member_group_name = payload["group_name"]
+            query = (
+                f"match "
+                f" $member isa group, has group-name '{member_group_name}'; "
+                f" $group isa group, has group-name '{group_name}'; "
+                f"put "
+                f" $membership isa membership (container: $group, member: $member);"
+            )
+            member_type = "group"
+            member_name = member_group_name
+
+        tx.query(query).resolve()
+        tx.commit()
+
     return {
-        "message": f"{member_type.capitalize()} added to group successfully",
-        "group_name": group_name,
+        "message": f"{member_type.capitalize()} added to group successfully",
+        "group_name": group_name,
         "member_type": member_type,
-        "member_name": member_name
+        "member_name": member_name,
     }
 
+
 @log_execution_time("list_direct_group_members")
 def list_direct_group_members(group_name: str):
     logger.debug(f"Listing direct group members for {group_name}")
-    with _driver().transaction(db_name, TransactionType.READ, _transaction_options()) as tx:
-        result = tx.query(
-            f'match '
+    with _driver().transaction(
+        db_name, TransactionType.READ, _transaction_options()
+    ) as tx:
+        result = (
+            tx.query(
+                f"match "
                 f' $group isa group, has group-name "{group_name}"; '
-                f' $membership isa membership (container: $group, member: $member); '
-                f' $member isa! $member-type; '
-                f'fetch {{'
+                f" $membership isa membership (container: $group, member: $member); "
+                f" $member isa! $member-type; "
+                f"fetch {{"
                 f' "member_name": $member.name, '
                 f' "group_name": $group.group-name,'
                 f' "member_type": $member-type'
-                f'}};'
-        ).resolve().as_concept_documents()
-        result = list(result)
-
+                f"}};"
+            )
+            .resolve()
+            .as_concept_documents()
+        )
+        result = list(result)
+
     return result
 
+
 @log_execution_time("list_all_group_members")
 def list_all_group_members(group_name: str):
     logger.debug(f"Listing all group members for {group_name}")
-    with _driver().transaction(db_name, TransactionType.READ, _transaction_options()) as tx:
+    with _driver().transaction(
+        db_name, TransactionType.READ, _transaction_options()
+    ) as tx:
         # Use the group-members function from the schema to get all members recursively
-        result = tx.query(
-            f'match '
-            f' $group isa group, has group-name "{group_name}"; '
-            f' let $members in group-members($group); '
-            f' $member isa! $member-type; '
-            f'fetch {{'
-            f' "member_type": $member-type, '
-            f' "member_name": $members.name, '
-            f' "group_name": $group.group-name'
-            f'}};'
-        ).resolve().as_concept_documents()
+        result = (
+            tx.query(
+                f"match "
+                f' $group isa group, has group-name "{group_name}"; '
+                f" let $members in group-members($group); "
+                f" $member isa! $member-type; "
+                f"fetch {{"
+                f' "member_type": $member-type, '
+                f' "member_name": $members.name, '
+                f' "group_name": $group.group-name'
+                f"}};"
+            )
+            .resolve()
+            .as_concept_documents()
+        )
         result = list(result)
-
+
     return result
 
 
@@ -351,23 +389,29 @@ def list_all_group_members(group_name: str):
 def list_principal_groups(principal_name: str, principal_type: str):
     """List direct groups for either a user or group principal"""
     logger.debug(f"Listing direct groups for {principal_name} of type {principal_type}")
-    with _driver().transaction(db_name, TransactionType.READ, _transaction_options()) as tx:
+    with _driver().transaction(
+        db_name, TransactionType.READ, _transaction_options()
+    ) as tx:
         if principal_type == "user":
             name_attr = "user-name"
         else:  # group
             name_attr = "group-name"
-
-        result = tx.query(
-            f'match '
-            f' $principal isa {principal_type}, has {name_attr} "{principal_name}"; '
-            f' membership (member: $principal, container: $group); '
-            f' $group isa group; '
-            f'fetch {{'
-            f' "group_name": $group.group-name'
-            f'}};'
-        ).resolve().as_concept_documents()
+
+        result = (
+            tx.query(
+                f"match "
+                f' $principal isa {principal_type}, has {name_attr} "{principal_name}"; '
+                f" membership (member: $principal, container: $group); "
+                f" $group isa group; "
+                f"fetch {{"
+                f' "group_name": $group.group-name'
+                f"}};"
+            )
+            .resolve()
+            .as_concept_documents()
+        )
         result = list(result)
-
+
     return result
 
 
@@ -375,23 +419,29 @@ def list_all_principal_groups(principal_name: str, principal_type: str):
     """List all groups (transitive) for either a user or group principal"""
     logger.debug(f"Listing all groups for {principal_name} of type {principal_type}")
-    with _driver().transaction(db_name, TransactionType.READ, _transaction_options()) as tx:
+    with _driver().transaction(
+        db_name, TransactionType.READ, _transaction_options()
+    ) as tx:
         if principal_type == "user":
             name_attr = "user-name"
         else:  # group
             name_attr = "group-name"
-
+
         # Use the get-groups function from the schema to get all groups transitively
-        result = tx.query(
-            f'match '
-            f' $principal isa {principal_type}, has {name_attr} "{principal_name}"; '
-            f' let $groups in get-groups($principal); '
-            f'fetch {{'
-            f' "group_name": $groups.group-name'
-            f'}};'
-        ).resolve().as_concept_documents()
+        result = (
+            tx.query(
+                f"match "
+                f' $principal isa {principal_type}, has {name_attr} "{principal_name}"; '
+                f" let $groups in get-groups($principal); "
+                f"fetch {{"
+                f' "group_name": $groups.group-name'
+                f"}};"
+            )
+            .resolve()
+            .as_concept_documents()
+        )
         result = list(result)
-
+
     return result
 
 
@@ -399,7 +449,7 @@ def list_all_principal_groups(principal_name: str, principal_type: str):
 def reset_database():
     """Reset the database by deleting it and recreating it with schema"""
     logger.debug("Resetting database")
-
+
     driver = _driver()
     # Delete database if it exists
     if driver.databases.contains(db_name):
@@ -407,7 +457,7 @@
         logger.debug(f"Database '{db_name}' deleted")
 
     _create_database_and_schema()
-
+
     return {"message": "Database reset successfully"}
 
 
@@ -417,34 +467,44 @@ def _create_database_and_schema():
     # Check if database exists, create only if it doesn't
     if db_name not in [db.name for db in driver.databases.all()]:
         driver.databases.create(db_name)
-
-    entity_type_count = 0
+
     # Check if schema already exists by looking for user type
     schema_check_start = time.time()
-    with _driver().transaction(db_name, TransactionType.READ, _transaction_options()) as tx:
+    with _driver().transaction(
+        db_name, TransactionType.READ, _transaction_options()
+    ) as tx:
         check_start = time.time()
-        row = list(tx.query("match entity $t; reduce $count = count;").resolve().as_concept_rows())[0]
-        check_duration = (time.time() - check_start) * 1000
+        row = list(
+            tx.query("match entity $t; reduce $count = count;")
+            .resolve()
+            .as_concept_rows()
+        )[0]
+        check_duration = (time.time() - check_start) * 1000  # noqa
     schema_check_duration = (time.time() - schema_check_start) * 1000
-    logger.debug(f"📋 Schema check transaction completed in {schema_check_duration:.2f}ms")
+    logger.debug(
+        f"📋 Schema check transaction completed in {schema_check_duration:.2f}ms"
+    )
 
     if row.get("count").get() == 0:
         logger.debug("Loading schema from file")
         schema_load_start = time.time()
-        with _driver().transaction(db_name, TransactionType.SCHEMA, _transaction_options()) as schema_tx:
+        with _driver().transaction(
+            db_name, TransactionType.SCHEMA, _transaction_options()
+        ) as schema_tx:
             # Load schema from file
             file_start = time.time()
             with open("schema.tql", "r") as f:
                 schema_content = f.read()
-            file_duration = (time.time() - file_start) * 1000
-
+            file_duration = (time.time() - file_start) * 1000  # noqa
+
             query_start = time.time()
             schema_tx.query(schema_content).resolve()
-            query_duration = (time.time() - query_start) * 1000
-
+            query_duration = (time.time() - query_start) * 1000  # noqa
+
             commit_start = time.time()
             schema_tx.commit()
-            commit_duration = (time.time() - commit_start) * 1000
+            commit_duration = (time.time() - commit_start) * 1000  # noqa
+
         schema_load_duration = (time.time() - schema_load_start) * 1000
         logger.debug(f" --> Schema loaded successfully in {schema_load_duration:.2f}ms")
     else:
@@ -454,17 +514,20 @@
 def _driver():
     """Get or create a reusable TypeDB driver with connection pooling"""
     global _global_driver, _driver_created_at
-
+
     current_time = time.time()
-
-    expired = _driver_created_at is not None and (current_time - _driver_created_at) > _driver_timeout
+
+    expired = (
+        _driver_created_at is not None
+        and (current_time - _driver_created_at) > _driver_timeout
+    )
     # Check if we have a valid driver and it's not expired
     if _global_driver is not None and not expired:
-        logger.debug(f"♻️ Reusing existing driver")
+        logger.debug("♻️ Reusing existing driver")
         return _global_driver
     elif expired:
         _cleanup_driver()
-
+
     # Create new driver or existing one is expired
     driver_start = time.time()
     try:
@@ -477,10 +540,12 @@ def _driver():
         driver_duration = (time.time() - driver_start) * 1000
         logger.debug(f"✅ New driver created in {driver_duration:.2f}ms")
         return _global_driver
-
+
     except Exception as e:
         driver_duration = (time.time() - driver_start) * 1000
-        logger.debug(f"❌ Failed to create driver after {driver_duration:.2f}ms: {str(e)}")
+        logger.debug(
+            f"❌ Failed to create driver after {driver_duration:.2f}ms: {str(e)}"
+        )
         # Clean up on failure
         _global_driver = None
         _driver_created_at = None
@@ -490,7 +555,7 @@
 def _cleanup_driver():
     """Clean up the global driver - useful for testing or forced cleanup"""
     global _global_driver, _driver_created_at
-
+
     if _global_driver is not None:
         logger.debug("🧹 Cleaning up global driver")
         try:
diff --git a/index.html b/app/web/index.html
similarity index 100%
rename from index.html
rename to app/web/index.html
diff --git a/script.js b/app/web/script.js
similarity index 100%
rename from script.js
rename to app/web/script.js
diff --git a/style.css b/app/web/style.css
similarity index 100%
rename from style.css
rename to app/web/style.css
diff --git a/tests/test_lambda.py b/tests/test_lambda.py
index 884efc0..9425250 100644
--- a/tests/test_lambda.py
+++ b/tests/test_lambda.py
@@ -1,7 +1,6 @@
 import requests
-import json
 import pytest
-from time import sleep
+
 
 class TestLambdaAPI:
     """Test the Lambda-based HTTP API endpoints"""
     @pytest.fixture(autouse=True)
     def setup(self):
         """Setup for each test - ensure clean state"""
-        self.base_url = "http://users-api.execute-api.localhost.localstack.cloud:4566/test"
+        self.base_url = (
+            "http://users-api.execute-api.localhost.localstack.cloud:4566/test"
+        )
         self.users_endpoint = f"{self.base_url}/users"
         self.groups_endpoint = f"{self.base_url}/groups"
         self.reset_endpoint = f"{self.base_url}/reset"
-
+
         # Reset database before each test
         self.reset_database()
-
+
     def reset_database(self):
         """Reset the database to ensure clean test state"""
         # Resetting database...
         response = requests.post(self.reset_endpoint)
         assert response.status_code == 200  # Reset response processed
-
+
     def test_create_and_list_users(self):
-        """Test creating users and listing them """
+        """Test creating users and listing them"""
         # Creating user alice...
         alice_data = {
             "username": "alice",
             "email": ["alice@example.com", "alice.work@company.com"],
-            "profile_picture_uri": "https://example.com/alice.jpg"
+            "profile_picture_uri": "https://example.com/alice.jpg",
         }
         response = requests.post(
             self.users_endpoint,
-            headers={'content-type': 'application/json'},
-            json=alice_data
+            headers={"content-type": "application/json"},
+            json=alice_data,
         )
         # Response processed
         assert response.status_code == 201
         assert response.json()["username"] == "alice"
-
+
         # Creating user bob...
-        bob_data = {
-            "username": "bob",
-            "email": "bob@example.com"
-        }
+        bob_data = {"username": "bob", "email": "bob@example.com"}
         response = requests.post(
             self.users_endpoint,
-            headers={'content-type': 'application/json'},
-            json=bob_data
+            headers={"content-type": "application/json"},
+            json=bob_data,
         )
         # Response processed
         assert response.status_code == 201
         assert response.json()["username"] == "bob"
-
+
         # Listing all users...
         response = requests.get(self.users_endpoint)
         # Response processed
         assert response.status_code == 200
-
+
         users = response.json()
         assert len(users) == 2
-
+
         # Check alice
         alice = next(u for u in users if u["username"] == "alice")
         assert "alice@example.com" in alice["email"]
         assert "alice.work@company.com" in alice["email"]
         assert alice["profile_picture_url"] == "https://example.com/alice.jpg"
         assert alice["profile_picture_s3_uri"] is None
-
+
         # Check bob
         bob = next(u for u in users if u["username"] == "bob")
         assert bob["email"] == ["bob@example.com"]
@@ -78,24 +76,21 @@ def test_create_and_list_users(self):
 
     def test_duplicate_user_error(self):
         """Test that creating a duplicate user returns a proper error message"""
-        user_data = {
-            "username": "testuser",
-            "email": "test@example.com"
-        }
-
+        user_data = {"username": "testuser", "email": "test@example.com"}
+
         # Create user first time - should succeed
         response = requests.post(
             self.users_endpoint,
-            headers={'content-type': 'application/json'},
-            json=user_data
+            headers={"content-type": "application/json"},
+            json=user_data,
         )
         assert response.status_code == 201
-
+
         # Try to create same user again - should fail with proper error
         response = requests.post(
             self.users_endpoint,
-            headers={'content-type': 'application/json'},
-            json=user_data
+            headers={"content-type": "application/json"},
+            json=user_data,
         )
         assert response.status_code == 400
         error_response = response.json()
@@ -108,28 +103,28 @@ def test_create_and_list_groups(self):
         group_data = {"group_name": "developers"}
         response = requests.post(
             self.groups_endpoint,
-            headers={'content-type': 'application/json'},
-            json=group_data
+            headers={"content-type": "application/json"},
+            json=group_data,
         )
         # Response processed
         assert response.status_code == 201
         assert response.json()["group_name"] == "developers"
-
+
         # Creating group managers...
         group_data = {"group_name": "managers"}
         response = requests.post(
             self.groups_endpoint,
-            headers={'content-type': 'application/json'},
-            json=group_data
+            headers={"content-type": "application/json"},
+            json=group_data,
         )
         # Response processed
         assert response.status_code == 201
-
+
         # Listing all groups...
         response = requests.get(self.groups_endpoint)
         # Response processed
         assert response.status_code == 200
-
+
         groups = response.json()
         assert len(groups) == 2
         group_names = [g["group_name"] for g in groups]
@@ -139,20 +134,20 @@ def test_create_and_list_groups(self):
     def test_duplicate_group_error(self):
         """Test that creating a duplicate group returns a proper error message"""
         group_data = {"group_name": "testgroup"}
-
+
         # Create group first time - should succeed
         response = requests.post(
             self.groups_endpoint,
-            headers={'content-type': 'application/json'},
-            json=group_data
+            headers={"content-type": "application/json"},
+            json=group_data,
        )
         assert response.status_code == 201
-
+
         # Try to create same group again - should fail with proper error
         response = requests.post(
             self.groups_endpoint,
-            headers={'content-type': 'application/json'},
-            json=group_data
+            headers={"content-type": "application/json"},
+            json=group_data,
         )
         assert response.status_code == 400
         error_response = response.json()
@@ -163,21 +158,29 @@ def test_add_users_to_group(self):
         """Test adding users to groups"""
         # First create a user and group
         user_data = {"username": "groupuser", "email": "groupuser@example.com"}
-        requests.post(self.users_endpoint, json=user_data, headers={'content-type': 'application/json'})
-
+        requests.post(
+            self.users_endpoint,
+            json=user_data,
+            headers={"content-type": "application/json"},
+        )
+
         group_data = {"group_name": "testgroup"}
-        requests.post(self.groups_endpoint, json=group_data, headers={'content-type': 'application/json'})
-
+        requests.post(
+            self.groups_endpoint,
+            json=group_data,
+            headers={"content-type": "application/json"},
+        )
+
         # Add user to group
         member_data = {"username": "groupuser"}
         response = requests.post(
             f"{self.base_url}/groups/testgroup/members",
-            headers={'content-type': 'application/json'},
-            json=member_data
+            headers={"content-type": "application/json"},
+            json=member_data,
         )
         assert response.status_code == 201
         assert "added to group successfully" in response.json()["message"]
-
+
         # List group members
         response = requests.get(f"{self.base_url}/groups/testgroup/members")
         assert response.status_code == 200
@@ -189,56 +192,68 @@ def test_add_users_to_group(self):
 
     def test_complex_workflow(self):
         """Test a more complex workflow: create users, groups, add members, check relationships"""
         # Complex Workflow Test
-
+
         # Create users
         users_data = [
             {"username": "alice", "email": "alice@company.com"},
             {"username": "bob", "email": "bob@company.com"},
-            {"username": "charlie", "email": "charlie@company.com"}
+            {"username": "charlie", "email": "charlie@company.com"},
         ]
-
+
         for user_data in users_data:
-            response = requests.post(self.users_endpoint, json=user_data, headers={'content-type': 'application/json'})
+            response = requests.post(
+                self.users_endpoint,
+                json=user_data,
+                headers={"content-type": "application/json"},
+            )
             assert response.status_code == 201
             # User created
-
+
         # Create groups
         groups_data = [
             {"group_name": "developers"},
             {"group_name": "managers"},
-            {"group_name": "seniors"}
+            {"group_name": "seniors"},
         ]
-
+
         for group_data in groups_data:
-            response = requests.post(self.groups_endpoint, json=group_data, headers={'content-type': 'application/json'})
+            response = requests.post(
+                self.groups_endpoint,
+                json=group_data,
+                headers={"content-type": "application/json"},
+            )
             assert response.status_code == 201
             # Group created
-
+
         # Add users to groups
         memberships = [
             ("developers", "alice"),
             ("developers", "bob"),
             ("managers", "charlie"),
-            ("seniors", "alice")
+            ("seniors", "alice"),
("seniors", "alice"), ] - + for group_name, username in memberships: member_data = {"username": username} response = requests.post( f"{self.base_url}/groups/{group_name}/members", json=member_data, - headers={'content-type': 'application/json'} + headers={"content-type": "application/json"}, ) assert response.status_code == 201 # User added to group - + # Verify memberships - for group_name, expected_members in [("developers", ["alice", "bob"]), ("managers", ["charlie"]), ("seniors", ["alice"])]: + for group_name, expected_members in [ + ("developers", ["alice", "bob"]), + ("managers", ["charlie"]), + ("seniors", ["alice"]), + ]: response = requests.get(f"{self.base_url}/groups/{group_name}/members") assert response.status_code == 200 members = response.json() member_names = [m["member_name"] for m in members] assert set(member_names) == set(expected_members) # Membership verified - + # Complex Workflow Test Complete