Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions chats/apps/api/authentication/classes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from chats.apps.api.authentication.token import JWTTokenGenerator
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed


class JWTAuthentication(BaseAuthentication):
"""
Authentication class for the JWT token.
"""

def authenticate(self, request):
"""
Authenticate the request using the JWT token.
"""
token = request.headers.get("Authorization")

if not token or not token.startswith("Token "):
raise AuthenticationFailed("No authentication token provided.")

try:
token = token.split(" ")[1]
except IndexError:
raise AuthenticationFailed("Invalid authentication token.")

return self.authenticate_credentials(token)

def authenticate_credentials(self, token):
"""
Authenticate the credentials using the JWT token.
"""
token_generator = JWTTokenGenerator()
try:
payload = token_generator.verify_token(token)
return (None, payload)
except Exception as e:
raise AuthenticationFailed(f"Invalid authentication token: {str(e)}")

def authenticate_header(self, request):
return "Token"
11 changes: 11 additions & 0 deletions chats/apps/api/authentication/permissions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from rest_framework.permissions import BasePermission


class JWTRequiredPermission(BasePermission):
"""
Custom permission class that requires JWT authentication.
Returns 401 Unauthorized when authentication fails.
"""

def has_permission(self, request, view):
return request.auth is not None
155 changes: 155 additions & 0 deletions chats/apps/api/authentication/tests/test_classes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
from datetime import timedelta
import uuid
import jwt

from django.utils import timezone
from django.test import TestCase
from django.http import HttpRequest
from django.test import override_settings
from rest_framework.views import APIView
from rest_framework.test import APITestCase, APIClient
from rest_framework.response import Response
from rest_framework import status

from chats.apps.api.authentication.classes import JWTAuthentication
from chats.apps.api.authentication.token import JWTTokenGenerator


from chats.apps.api.authentication.permissions import JWTRequiredPermission


class JWTAuthenticationTests(TestCase):
def setUp(self):
self.token_generator = JWTTokenGenerator()
self.valid_token = self.token_generator.generate_token(
{"user_id": 1, "username": "testuser"}
)

def test_authenticate(self):
authentication = JWTAuthentication()
request = HttpRequest()
request.META["HTTP_AUTHORIZATION"] = f"Token {self.valid_token}"
result = authentication.authenticate(request)
self.assertIsNotNone(result)
self.assertEqual(len(result), 2)
self.assertIsNone(result[0])
self.assertIsNotNone(result[1])

def test_authenticate_credentials(self):
authentication = JWTAuthentication()
result = authentication.authenticate_credentials(self.valid_token)
self.assertIsNotNone(result)
self.assertEqual(len(result), 2)
self.assertIsNone(result[0])
self.assertIsNotNone(result[1])


class MockView(APIView):
authentication_classes = [JWTAuthentication]
permission_classes = [JWTRequiredPermission]

def get(self, request):
"""Test endpoint that returns the authenticated data from the token."""

return Response(
{
"authenticated": request.auth is not None,
"token_data": request.auth if request.auth else None,
}
)


@override_settings(ROOT_URLCONF="chats.apps.api.authentication.tests.test_urls")
class MockViewAPITestCase(APITestCase):
"""Test cases for MockView using JWTAuthentication."""

def setUp(self):
self.client = APIClient()
self.token_generator = JWTTokenGenerator()
self.test_payload = {
"room": str(uuid.uuid4()),
}
self.valid_token = self.token_generator.generate_token(self.test_payload)

def test_authenticated_request_with_valid_token(self):
"""Test that a request with a valid JWT token is authenticated and token data is available."""
response = self.client.get(
"/mock/", HTTP_AUTHORIZATION=f"Token {self.valid_token}"
)

self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertTrue(response.data["authenticated"])
self.assertIsNotNone(response.data["token_data"])

# Verify that the token data contains the original payload
token_data = response.data["token_data"]
self.assertEqual(token_data["room"], self.test_payload["room"])

# Verify JWT standard claims are present
self.assertIn("iat", token_data)
self.assertIn("exp", token_data)
self.assertIn("nbf", token_data)

def test_unauthenticated_request_without_token(self):
"""Test that a request without a token is not authenticated."""
response = self.client.get("/mock/")

self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)

def test_unauthenticated_request_with_invalid_token(self):
"""Test that a request with an invalid token is not authenticated."""
invalid_token = "invalid.jwt.token"
response = self.client.get(
"/mock/", HTTP_AUTHORIZATION=f"Token {invalid_token}"
)

self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)

def test_unauthenticated_request_with_malformed_token(self):
"""Test that a request with a malformed token is not authenticated."""
malformed_token = "Bearer invalid.jwt.token"
response = self.client.get(
"/mock/", HTTP_AUTHORIZATION=f"Token {malformed_token}"
)

self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)

def test_unauthenticated_request_with_expired_token(self):
"""Test that a request with an expired token is not authenticated."""
expired_payload = {
**self.test_payload,
"iat": int((timezone.now() - timedelta(hours=25)).timestamp()),
"exp": int((timezone.now() - timedelta(hours=1)).timestamp()),
"nbf": int((timezone.now() - timedelta(hours=25)).timestamp()),
}

expired_token = jwt.encode(
expired_payload,
self.token_generator.secret_key,
algorithm=self.token_generator.algorithm,
)

response = self.client.get(
"/mock/", HTTP_AUTHORIZATION=f"Token {expired_token}"
)

self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)

def test_token_data_structure(self):
"""Test that the token data structure is correct and complete."""
response = self.client.get(
"/mock/", HTTP_AUTHORIZATION=f"Token {self.valid_token}"
)

self.assertEqual(response.status_code, status.HTTP_200_OK)
token_data = response.data["token_data"]

for key, value in self.test_payload.items():
self.assertEqual(token_data[key], value)

self.assertIsInstance(token_data["iat"], int)
self.assertIsInstance(token_data["exp"], int)
self.assertIsInstance(token_data["nbf"], int)

current_time = int(timezone.now().timestamp())
self.assertGreater(token_data["exp"], current_time)
6 changes: 6 additions & 0 deletions chats/apps/api/authentication/tests/test_urls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from django.urls import path
from chats.apps.api.authentication.tests.test_classes import MockView

urlpatterns = [
path("mock/", MockView.as_view(), name="mock_view"),
]