From b51e5380f8d79830412abdd85c2c136e0ef35e77 Mon Sep 17 00:00:00 2001 From: Kallil Souza dos Santos Date: Tue, 30 Sep 2025 17:35:51 -0300 Subject: [PATCH 1/2] feat: implement JWT authentication and permissions with corresponding tests --- chats/apps/api/authentication/classes.py | 34 ++++ chats/apps/api/authentication/permissions.py | 11 ++ .../api/authentication/tests/test_classes.py | 145 ++++++++++++++++++ .../api/authentication/tests/test_urls.py | 6 + 4 files changed, 196 insertions(+) create mode 100644 chats/apps/api/authentication/classes.py create mode 100644 chats/apps/api/authentication/permissions.py create mode 100644 chats/apps/api/authentication/tests/test_classes.py create mode 100644 chats/apps/api/authentication/tests/test_urls.py diff --git a/chats/apps/api/authentication/classes.py b/chats/apps/api/authentication/classes.py new file mode 100644 index 000000000..cdb709b72 --- /dev/null +++ b/chats/apps/api/authentication/classes.py @@ -0,0 +1,34 @@ +from chats.apps.api.authentication.token import JWTTokenGenerator +from rest_framework.authentication import BaseAuthentication +from rest_framework.exceptions import AuthenticationFailed + + +class JWTAuthentication(BaseAuthentication): + """ + Authentication class for the JWT token. + """ + + def authenticate(self, request): + """ + Authenticate the request using the JWT token. + """ + token = request.headers.get("Authorization") + + if not token: + raise AuthenticationFailed("No authentication token provided.") + + return self.authenticate_credentials(token) + + def authenticate_credentials(self, token): + """ + Authenticate the credentials using the JWT token. + """ + token_generator = JWTTokenGenerator() + try: + payload = token_generator.verify_token(token) + return (None, payload) + except Exception as e: + raise AuthenticationFailed(f"Invalid authentication token: {str(e)}") + + def authenticate_header(self, request): + return "Token" diff --git a/chats/apps/api/authentication/permissions.py b/chats/apps/api/authentication/permissions.py new file mode 100644 index 000000000..7dc7e6bb2 --- /dev/null +++ b/chats/apps/api/authentication/permissions.py @@ -0,0 +1,11 @@ +from rest_framework.permissions import BasePermission + + +class JWTRequiredPermission(BasePermission): + """ + Custom permission class that requires JWT authentication. + Returns 401 Unauthorized when authentication fails. + """ + + def has_permission(self, request, view): + return request.auth is not None diff --git a/chats/apps/api/authentication/tests/test_classes.py b/chats/apps/api/authentication/tests/test_classes.py new file mode 100644 index 000000000..d04c489c7 --- /dev/null +++ b/chats/apps/api/authentication/tests/test_classes.py @@ -0,0 +1,145 @@ +from datetime import timedelta +import uuid +import jwt + +from django.utils import timezone +from django.test import TestCase +from django.http import HttpRequest +from django.test import override_settings +from rest_framework.views import APIView +from rest_framework.test import APITestCase, APIClient +from rest_framework.response import Response +from rest_framework import status + +from chats.apps.api.authentication.classes import JWTAuthentication +from chats.apps.api.authentication.token import JWTTokenGenerator + + +from chats.apps.api.authentication.permissions import JWTRequiredPermission + + +class JWTAuthenticationTests(TestCase): + def setUp(self): + self.token_generator = JWTTokenGenerator() + self.valid_token = self.token_generator.generate_token( + {"user_id": 1, "username": "testuser"} + ) + + def test_authenticate(self): + authentication = JWTAuthentication() + request = HttpRequest() + request.META["HTTP_AUTHORIZATION"] = self.valid_token + result = authentication.authenticate(request) + self.assertIsNotNone(result) + self.assertEqual(len(result), 2) + self.assertIsNone(result[0]) + self.assertIsNotNone(result[1]) + + def test_authenticate_credentials(self): + authentication = JWTAuthentication() + result = authentication.authenticate_credentials(self.valid_token) + self.assertIsNotNone(result) + self.assertEqual(len(result), 2) + self.assertIsNone(result[0]) + self.assertIsNotNone(result[1]) + + +class MockView(APIView): + authentication_classes = [JWTAuthentication] + permission_classes = [JWTRequiredPermission] + + def get(self, request): + """Test endpoint that returns the authenticated data from the token.""" + + return Response( + { + "authenticated": request.auth is not None, + "token_data": request.auth if request.auth else None, + } + ) + + +@override_settings(ROOT_URLCONF="chats.apps.api.authentication.tests.test_urls") +class MockViewAPITestCase(APITestCase): + """Test cases for MockView using JWTAuthentication.""" + + def setUp(self): + self.client = APIClient() + self.token_generator = JWTTokenGenerator() + self.test_payload = { + "room": str(uuid.uuid4()), + } + self.valid_token = self.token_generator.generate_token(self.test_payload) + + def test_authenticated_request_with_valid_token(self): + """Test that a request with a valid JWT token is authenticated and token data is available.""" + response = self.client.get("/mock/", HTTP_AUTHORIZATION=self.valid_token) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertTrue(response.data["authenticated"]) + self.assertIsNotNone(response.data["token_data"]) + + # Verify that the token data contains the original payload + token_data = response.data["token_data"] + self.assertEqual(token_data["room"], self.test_payload["room"]) + + # Verify JWT standard claims are present + self.assertIn("iat", token_data) + self.assertIn("exp", token_data) + self.assertIn("nbf", token_data) + + def test_unauthenticated_request_without_token(self): + """Test that a request without a token is not authenticated.""" + response = self.client.get("/mock/") + + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + def test_unauthenticated_request_with_invalid_token(self): + """Test that a request with an invalid token is not authenticated.""" + invalid_token = "invalid.jwt.token" + response = self.client.get("/mock/", HTTP_AUTHORIZATION=invalid_token) + + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + def test_unauthenticated_request_with_malformed_token(self): + """Test that a request with a malformed token is not authenticated.""" + malformed_token = "Bearer invalid.jwt.token" + response = self.client.get("/mock/", HTTP_AUTHORIZATION=malformed_token) + + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + def test_unauthenticated_request_with_expired_token(self): + """Test that a request with an expired token is not authenticated.""" + expired_payload = { + **self.test_payload, + "iat": int((timezone.now() - timedelta(hours=25)).timestamp()), + "exp": int((timezone.now() - timedelta(hours=1)).timestamp()), + "nbf": int((timezone.now() - timedelta(hours=25)).timestamp()), + } + + expired_token = jwt.encode( + expired_payload, + self.token_generator.secret_key, + algorithm=self.token_generator.algorithm, + ) + + response = self.client.get("/mock/", HTTP_AUTHORIZATION=expired_token) + + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + def test_token_data_structure(self): + """Test that the token data structure is correct and complete.""" + response = self.client.get("/mock/", HTTP_AUTHORIZATION=self.valid_token) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + token_data = response.data["token_data"] + + for key, value in self.test_payload.items(): + self.assertEqual(token_data[key], value) + + self.assertIsInstance(token_data["iat"], int) + self.assertIsInstance(token_data["exp"], int) + self.assertIsInstance(token_data["nbf"], int) + + current_time = int(timezone.now().timestamp()) + self.assertGreater(token_data["exp"], current_time) diff --git a/chats/apps/api/authentication/tests/test_urls.py b/chats/apps/api/authentication/tests/test_urls.py new file mode 100644 index 000000000..57fa3fcc6 --- /dev/null +++ b/chats/apps/api/authentication/tests/test_urls.py @@ -0,0 +1,6 @@ +from django.urls import path +from chats.apps.api.authentication.tests.test_classes import MockView + +urlpatterns = [ + path("mock/", MockView.as_view(), name="mock_view"), +] From bc9f539df808f1fed2a3767713b9265ffbf481fe Mon Sep 17 00:00:00 2001 From: Kallil Souza dos Santos Date: Wed, 1 Oct 2025 11:30:02 -0300 Subject: [PATCH 2/2] fix: enhance JWT token validation by ensuring proper token format and handling errors in authentication --- chats/apps/api/authentication/classes.py | 7 +++++- .../api/authentication/tests/test_classes.py | 22 ++++++++++++++----- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/chats/apps/api/authentication/classes.py b/chats/apps/api/authentication/classes.py index cdb709b72..52734e8e9 100644 --- a/chats/apps/api/authentication/classes.py +++ b/chats/apps/api/authentication/classes.py @@ -14,9 +14,14 @@ def authenticate(self, request): """ token = request.headers.get("Authorization") - if not token: + if not token or not token.startswith("Token "): raise AuthenticationFailed("No authentication token provided.") + try: + token = token.split(" ")[1] + except IndexError: + raise AuthenticationFailed("Invalid authentication token.") + return self.authenticate_credentials(token) def authenticate_credentials(self, token): diff --git a/chats/apps/api/authentication/tests/test_classes.py b/chats/apps/api/authentication/tests/test_classes.py index d04c489c7..95107b9a3 100644 --- a/chats/apps/api/authentication/tests/test_classes.py +++ b/chats/apps/api/authentication/tests/test_classes.py @@ -28,7 +28,7 @@ def setUp(self): def test_authenticate(self): authentication = JWTAuthentication() request = HttpRequest() - request.META["HTTP_AUTHORIZATION"] = self.valid_token + request.META["HTTP_AUTHORIZATION"] = f"Token {self.valid_token}" result = authentication.authenticate(request) self.assertIsNotNone(result) self.assertEqual(len(result), 2) @@ -73,7 +73,9 @@ def setUp(self): def test_authenticated_request_with_valid_token(self): """Test that a request with a valid JWT token is authenticated and token data is available.""" - response = self.client.get("/mock/", HTTP_AUTHORIZATION=self.valid_token) + response = self.client.get( + "/mock/", HTTP_AUTHORIZATION=f"Token {self.valid_token}" + ) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertTrue(response.data["authenticated"]) @@ -97,14 +99,18 @@ def test_unauthenticated_request_without_token(self): def test_unauthenticated_request_with_invalid_token(self): """Test that a request with an invalid token is not authenticated.""" invalid_token = "invalid.jwt.token" - response = self.client.get("/mock/", HTTP_AUTHORIZATION=invalid_token) + response = self.client.get( + "/mock/", HTTP_AUTHORIZATION=f"Token {invalid_token}" + ) self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) def test_unauthenticated_request_with_malformed_token(self): """Test that a request with a malformed token is not authenticated.""" malformed_token = "Bearer invalid.jwt.token" - response = self.client.get("/mock/", HTTP_AUTHORIZATION=malformed_token) + response = self.client.get( + "/mock/", HTTP_AUTHORIZATION=f"Token {malformed_token}" + ) self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) @@ -123,13 +129,17 @@ def test_unauthenticated_request_with_expired_token(self): algorithm=self.token_generator.algorithm, ) - response = self.client.get("/mock/", HTTP_AUTHORIZATION=expired_token) + response = self.client.get( + "/mock/", HTTP_AUTHORIZATION=f"Token {expired_token}" + ) self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) def test_token_data_structure(self): """Test that the token data structure is correct and complete.""" - response = self.client.get("/mock/", HTTP_AUTHORIZATION=self.valid_token) + response = self.client.get( + "/mock/", HTTP_AUTHORIZATION=f"Token {self.valid_token}" + ) self.assertEqual(response.status_code, status.HTTP_200_OK) token_data = response.data["token_data"]