diff --git a/chats/apps/api/authentication/classes.py b/chats/apps/api/authentication/classes.py new file mode 100644 index 000000000..52734e8e9 --- /dev/null +++ b/chats/apps/api/authentication/classes.py @@ -0,0 +1,39 @@ +from chats.apps.api.authentication.token import JWTTokenGenerator +from rest_framework.authentication import BaseAuthentication +from rest_framework.exceptions import AuthenticationFailed + + +class JWTAuthentication(BaseAuthentication): + """ + Authentication class for the JWT token. + """ + + def authenticate(self, request): + """ + Authenticate the request using the JWT token. + """ + token = request.headers.get("Authorization") + + if not token or not token.startswith("Token "): + raise AuthenticationFailed("No authentication token provided.") + + try: + token = token.split(" ")[1] + except IndexError: + raise AuthenticationFailed("Invalid authentication token.") + + return self.authenticate_credentials(token) + + def authenticate_credentials(self, token): + """ + Authenticate the credentials using the JWT token. + """ + token_generator = JWTTokenGenerator() + try: + payload = token_generator.verify_token(token) + return (None, payload) + except Exception as e: + raise AuthenticationFailed(f"Invalid authentication token: {str(e)}") + + def authenticate_header(self, request): + return "Token" diff --git a/chats/apps/api/authentication/permissions.py b/chats/apps/api/authentication/permissions.py new file mode 100644 index 000000000..7dc7e6bb2 --- /dev/null +++ b/chats/apps/api/authentication/permissions.py @@ -0,0 +1,11 @@ +from rest_framework.permissions import BasePermission + + +class JWTRequiredPermission(BasePermission): + """ + Custom permission class that requires JWT authentication. + Returns 401 Unauthorized when authentication fails. + """ + + def has_permission(self, request, view): + return request.auth is not None diff --git a/chats/apps/api/authentication/tests/test_classes.py b/chats/apps/api/authentication/tests/test_classes.py new file mode 100644 index 000000000..95107b9a3 --- /dev/null +++ b/chats/apps/api/authentication/tests/test_classes.py @@ -0,0 +1,155 @@ +from datetime import timedelta +import uuid +import jwt + +from django.utils import timezone +from django.test import TestCase +from django.http import HttpRequest +from django.test import override_settings +from rest_framework.views import APIView +from rest_framework.test import APITestCase, APIClient +from rest_framework.response import Response +from rest_framework import status + +from chats.apps.api.authentication.classes import JWTAuthentication +from chats.apps.api.authentication.token import JWTTokenGenerator + + +from chats.apps.api.authentication.permissions import JWTRequiredPermission + + +class JWTAuthenticationTests(TestCase): + def setUp(self): + self.token_generator = JWTTokenGenerator() + self.valid_token = self.token_generator.generate_token( + {"user_id": 1, "username": "testuser"} + ) + + def test_authenticate(self): + authentication = JWTAuthentication() + request = HttpRequest() + request.META["HTTP_AUTHORIZATION"] = f"Token {self.valid_token}" + result = authentication.authenticate(request) + self.assertIsNotNone(result) + self.assertEqual(len(result), 2) + self.assertIsNone(result[0]) + self.assertIsNotNone(result[1]) + + def test_authenticate_credentials(self): + authentication = JWTAuthentication() + result = authentication.authenticate_credentials(self.valid_token) + self.assertIsNotNone(result) + self.assertEqual(len(result), 2) + self.assertIsNone(result[0]) + self.assertIsNotNone(result[1]) + + +class MockView(APIView): + authentication_classes = [JWTAuthentication] + permission_classes = [JWTRequiredPermission] + + def get(self, request): + """Test endpoint that returns the authenticated data from the token.""" + + return Response( + { + "authenticated": request.auth is not None, + "token_data": request.auth if request.auth else None, + } + ) + + +@override_settings(ROOT_URLCONF="chats.apps.api.authentication.tests.test_urls") +class MockViewAPITestCase(APITestCase): + """Test cases for MockView using JWTAuthentication.""" + + def setUp(self): + self.client = APIClient() + self.token_generator = JWTTokenGenerator() + self.test_payload = { + "room": str(uuid.uuid4()), + } + self.valid_token = self.token_generator.generate_token(self.test_payload) + + def test_authenticated_request_with_valid_token(self): + """Test that a request with a valid JWT token is authenticated and token data is available.""" + response = self.client.get( + "/mock/", HTTP_AUTHORIZATION=f"Token {self.valid_token}" + ) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertTrue(response.data["authenticated"]) + self.assertIsNotNone(response.data["token_data"]) + + # Verify that the token data contains the original payload + token_data = response.data["token_data"] + self.assertEqual(token_data["room"], self.test_payload["room"]) + + # Verify JWT standard claims are present + self.assertIn("iat", token_data) + self.assertIn("exp", token_data) + self.assertIn("nbf", token_data) + + def test_unauthenticated_request_without_token(self): + """Test that a request without a token is not authenticated.""" + response = self.client.get("/mock/") + + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + def test_unauthenticated_request_with_invalid_token(self): + """Test that a request with an invalid token is not authenticated.""" + invalid_token = "invalid.jwt.token" + response = self.client.get( + "/mock/", HTTP_AUTHORIZATION=f"Token {invalid_token}" + ) + + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + def test_unauthenticated_request_with_malformed_token(self): + """Test that a request with a malformed token is not authenticated.""" + malformed_token = "Bearer invalid.jwt.token" + response = self.client.get( + "/mock/", HTTP_AUTHORIZATION=f"Token {malformed_token}" + ) + + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + def test_unauthenticated_request_with_expired_token(self): + """Test that a request with an expired token is not authenticated.""" + expired_payload = { + **self.test_payload, + "iat": int((timezone.now() - timedelta(hours=25)).timestamp()), + "exp": int((timezone.now() - timedelta(hours=1)).timestamp()), + "nbf": int((timezone.now() - timedelta(hours=25)).timestamp()), + } + + expired_token = jwt.encode( + expired_payload, + self.token_generator.secret_key, + algorithm=self.token_generator.algorithm, + ) + + response = self.client.get( + "/mock/", HTTP_AUTHORIZATION=f"Token {expired_token}" + ) + + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + def test_token_data_structure(self): + """Test that the token data structure is correct and complete.""" + response = self.client.get( + "/mock/", HTTP_AUTHORIZATION=f"Token {self.valid_token}" + ) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + token_data = response.data["token_data"] + + for key, value in self.test_payload.items(): + self.assertEqual(token_data[key], value) + + self.assertIsInstance(token_data["iat"], int) + self.assertIsInstance(token_data["exp"], int) + self.assertIsInstance(token_data["nbf"], int) + + current_time = int(timezone.now().timestamp()) + self.assertGreater(token_data["exp"], current_time) diff --git a/chats/apps/api/authentication/tests/test_urls.py b/chats/apps/api/authentication/tests/test_urls.py new file mode 100644 index 000000000..57fa3fcc6 --- /dev/null +++ b/chats/apps/api/authentication/tests/test_urls.py @@ -0,0 +1,6 @@ +from django.urls import path +from chats.apps.api.authentication.tests.test_classes import MockView + +urlpatterns = [ + path("mock/", MockView.as_view(), name="mock_view"), +]