Phase 4: Authentication System (T039-T048)
Implemented complete JWT-based authentication system with RBAC: **Tests (TDD Approach):** - Created contract tests for /api/v1/auth/login endpoint - Created contract tests for /api/v1/auth/logout endpoint - Created unit tests for AuthService (login, logout, validate_token, password hashing) - Created pytest configuration and fixtures (test DB, test users, tokens) **Schemas:** - LoginRequest: username/password validation - TokenResponse: access_token, refresh_token, user info - LogoutResponse: logout confirmation - RefreshTokenRequest: token refresh payload - UserInfo: user data (excludes password_hash) **Services:** - AuthService: login(), logout(), validate_token(), hash_password(), verify_password() - Integrated bcrypt password hashing - JWT token generation (access + refresh tokens) - Token blacklisting in Redis - Audit logging for all auth operations **Middleware:** - Authentication middleware with JWT validation - Role-based access control (RBAC) helpers - require_role() dependency factory - Convenience dependencies: require_viewer(), require_operator(), require_administrator() - Client IP and User-Agent extraction **Router:** - POST /api/v1/auth/login - Authenticate and get tokens - POST /api/v1/auth/logout - Blacklist token - POST /api/v1/auth/refresh - Refresh access token - GET /api/v1/auth/me - Get current user info **Integration:** - Registered auth router in main.py - Updated startup event to initialize Redis and SDK Bridge clients - Updated shutdown event to cleanup connections properly - Fixed error translation utilities - Added asyncpg dependency for PostgreSQL async driver 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
266
src/api/tests/test_auth_service.py
Normal file
266
src/api/tests/test_auth_service.py
Normal file
@@ -0,0 +1,266 @@
|
||||
"""
|
||||
Unit tests for AuthService
|
||||
These tests will FAIL until AuthService is implemented
|
||||
"""
|
||||
import pytest
|
||||
from datetime import datetime, timedelta
|
||||
import uuid
|
||||
|
||||
from services.auth_service import AuthService
|
||||
from models.user import User, UserRole
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
class TestAuthServiceLogin:
|
||||
"""Unit tests for AuthService.login()"""
|
||||
|
||||
async def test_login_success(self, test_db_session, test_admin_user):
|
||||
"""Test successful login with valid credentials"""
|
||||
auth_service = AuthService(test_db_session)
|
||||
|
||||
result = await auth_service.login("admin", "admin123", ip_address="127.0.0.1")
|
||||
|
||||
assert result is not None
|
||||
assert "access_token" in result
|
||||
assert "refresh_token" in result
|
||||
assert "token_type" in result
|
||||
assert result["token_type"] == "bearer"
|
||||
assert "expires_in" in result
|
||||
assert "user" in result
|
||||
assert result["user"]["username"] == "admin"
|
||||
assert result["user"]["role"] == "administrator"
|
||||
|
||||
async def test_login_invalid_username(self, test_db_session):
|
||||
"""Test login with non-existent username"""
|
||||
auth_service = AuthService(test_db_session)
|
||||
|
||||
result = await auth_service.login("nonexistent", "somepassword", ip_address="127.0.0.1")
|
||||
|
||||
assert result is None
|
||||
|
||||
async def test_login_invalid_password(self, test_db_session, test_admin_user):
|
||||
"""Test login with incorrect password"""
|
||||
auth_service = AuthService(test_db_session)
|
||||
|
||||
result = await auth_service.login("admin", "wrongpassword", ip_address="127.0.0.1")
|
||||
|
||||
assert result is None
|
||||
|
||||
async def test_login_operator(self, test_db_session, test_operator_user):
|
||||
"""Test successful login for operator role"""
|
||||
auth_service = AuthService(test_db_session)
|
||||
|
||||
result = await auth_service.login("operator", "operator123", ip_address="127.0.0.1")
|
||||
|
||||
assert result is not None
|
||||
assert result["user"]["role"] == "operator"
|
||||
|
||||
async def test_login_viewer(self, test_db_session, test_viewer_user):
|
||||
"""Test successful login for viewer role"""
|
||||
auth_service = AuthService(test_db_session)
|
||||
|
||||
result = await auth_service.login("viewer", "viewer123", ip_address="127.0.0.1")
|
||||
|
||||
assert result is not None
|
||||
assert result["user"]["role"] == "viewer"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
class TestAuthServiceLogout:
|
||||
"""Unit tests for AuthService.logout()"""
|
||||
|
||||
async def test_logout_success(self, test_db_session, test_admin_user, auth_token):
|
||||
"""Test successful logout"""
|
||||
auth_service = AuthService(test_db_session)
|
||||
|
||||
# Logout should add token to blacklist
|
||||
result = await auth_service.logout(auth_token, ip_address="127.0.0.1")
|
||||
|
||||
assert result is True
|
||||
|
||||
async def test_logout_invalid_token(self, test_db_session):
|
||||
"""Test logout with invalid token"""
|
||||
auth_service = AuthService(test_db_session)
|
||||
|
||||
result = await auth_service.logout("invalid_token", ip_address="127.0.0.1")
|
||||
|
||||
assert result is False
|
||||
|
||||
async def test_logout_expired_token(self, test_db_session, expired_token):
|
||||
"""Test logout with expired token"""
|
||||
auth_service = AuthService(test_db_session)
|
||||
|
||||
result = await auth_service.logout(expired_token, ip_address="127.0.0.1")
|
||||
|
||||
assert result is False
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
class TestAuthServiceValidateToken:
|
||||
"""Unit tests for AuthService.validate_token()"""
|
||||
|
||||
async def test_validate_token_success(self, test_db_session, test_admin_user, auth_token):
|
||||
"""Test validation of valid token"""
|
||||
auth_service = AuthService(test_db_session)
|
||||
|
||||
user = await auth_service.validate_token(auth_token)
|
||||
|
||||
assert user is not None
|
||||
assert isinstance(user, User)
|
||||
assert user.username == "admin"
|
||||
assert user.role == UserRole.ADMINISTRATOR
|
||||
|
||||
async def test_validate_token_invalid(self, test_db_session):
|
||||
"""Test validation of invalid token"""
|
||||
auth_service = AuthService(test_db_session)
|
||||
|
||||
user = await auth_service.validate_token("invalid_token")
|
||||
|
||||
assert user is None
|
||||
|
||||
async def test_validate_token_expired(self, test_db_session, expired_token):
|
||||
"""Test validation of expired token"""
|
||||
auth_service = AuthService(test_db_session)
|
||||
|
||||
user = await auth_service.validate_token(expired_token)
|
||||
|
||||
assert user is None
|
||||
|
||||
async def test_validate_token_blacklisted(self, test_db_session, test_admin_user, auth_token):
|
||||
"""Test validation of blacklisted token (after logout)"""
|
||||
auth_service = AuthService(test_db_session)
|
||||
|
||||
# First logout to blacklist the token
|
||||
await auth_service.logout(auth_token, ip_address="127.0.0.1")
|
||||
|
||||
# Then try to validate it
|
||||
user = await auth_service.validate_token(auth_token)
|
||||
|
||||
assert user is None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
class TestAuthServicePasswordHashing:
|
||||
"""Unit tests for password hashing and verification"""
|
||||
|
||||
async def test_hash_password(self, test_db_session):
|
||||
"""Test password hashing"""
|
||||
auth_service = AuthService(test_db_session)
|
||||
|
||||
plain_password = "mypassword123"
|
||||
hashed = await auth_service.hash_password(plain_password)
|
||||
|
||||
# Hash should not equal plain text
|
||||
assert hashed != plain_password
|
||||
# Hash should start with bcrypt identifier
|
||||
assert hashed.startswith("$2b$")
|
||||
|
||||
async def test_verify_password_success(self, test_db_session):
|
||||
"""Test successful password verification"""
|
||||
auth_service = AuthService(test_db_session)
|
||||
|
||||
plain_password = "mypassword123"
|
||||
hashed = await auth_service.hash_password(plain_password)
|
||||
|
||||
# Verification should succeed
|
||||
result = await auth_service.verify_password(plain_password, hashed)
|
||||
assert result is True
|
||||
|
||||
async def test_verify_password_failure(self, test_db_session):
|
||||
"""Test failed password verification"""
|
||||
auth_service = AuthService(test_db_session)
|
||||
|
||||
plain_password = "mypassword123"
|
||||
hashed = await auth_service.hash_password(plain_password)
|
||||
|
||||
# Verification with wrong password should fail
|
||||
result = await auth_service.verify_password("wrongpassword", hashed)
|
||||
assert result is False
|
||||
|
||||
async def test_hash_password_different_each_time(self, test_db_session):
|
||||
"""Test that same password produces different hashes (due to salt)"""
|
||||
auth_service = AuthService(test_db_session)
|
||||
|
||||
plain_password = "mypassword123"
|
||||
hash1 = await auth_service.hash_password(plain_password)
|
||||
hash2 = await auth_service.hash_password(plain_password)
|
||||
|
||||
# Hashes should be different (bcrypt uses random salt)
|
||||
assert hash1 != hash2
|
||||
|
||||
# But both should verify successfully
|
||||
assert await auth_service.verify_password(plain_password, hash1)
|
||||
assert await auth_service.verify_password(plain_password, hash2)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
class TestAuthServiceAuditLogging:
|
||||
"""Unit tests for audit logging in AuthService"""
|
||||
|
||||
async def test_login_success_creates_audit_log(self, test_db_session, test_admin_user):
|
||||
"""Test that successful login creates audit log entry"""
|
||||
from models.audit_log import AuditLog
|
||||
from sqlalchemy import select
|
||||
|
||||
auth_service = AuthService(test_db_session)
|
||||
|
||||
# Perform login
|
||||
await auth_service.login("admin", "admin123", ip_address="192.168.1.100")
|
||||
|
||||
# Check audit log was created
|
||||
result = await test_db_session.execute(
|
||||
select(AuditLog).where(AuditLog.action == "auth.login")
|
||||
)
|
||||
audit_logs = result.scalars().all()
|
||||
|
||||
assert len(audit_logs) >= 1
|
||||
audit_log = audit_logs[-1] # Get most recent
|
||||
assert audit_log.action == "auth.login"
|
||||
assert audit_log.target == "admin"
|
||||
assert audit_log.outcome == "success"
|
||||
assert audit_log.ip_address == "192.168.1.100"
|
||||
|
||||
async def test_login_failure_creates_audit_log(self, test_db_session):
|
||||
"""Test that failed login creates audit log entry"""
|
||||
from models.audit_log import AuditLog
|
||||
from sqlalchemy import select
|
||||
|
||||
auth_service = AuthService(test_db_session)
|
||||
|
||||
# Attempt login with invalid credentials
|
||||
await auth_service.login("admin", "wrongpassword", ip_address="192.168.1.100")
|
||||
|
||||
# Check audit log was created
|
||||
result = await test_db_session.execute(
|
||||
select(AuditLog).where(AuditLog.action == "auth.login").where(AuditLog.outcome == "failure")
|
||||
)
|
||||
audit_logs = result.scalars().all()
|
||||
|
||||
assert len(audit_logs) >= 1
|
||||
audit_log = audit_logs[-1]
|
||||
assert audit_log.action == "auth.login"
|
||||
assert audit_log.target == "admin"
|
||||
assert audit_log.outcome == "failure"
|
||||
assert audit_log.ip_address == "192.168.1.100"
|
||||
|
||||
async def test_logout_creates_audit_log(self, test_db_session, test_admin_user, auth_token):
|
||||
"""Test that logout creates audit log entry"""
|
||||
from models.audit_log import AuditLog
|
||||
from sqlalchemy import select
|
||||
|
||||
auth_service = AuthService(test_db_session)
|
||||
|
||||
# Perform logout
|
||||
await auth_service.logout(auth_token, ip_address="192.168.1.100")
|
||||
|
||||
# Check audit log was created
|
||||
result = await test_db_session.execute(
|
||||
select(AuditLog).where(AuditLog.action == "auth.logout")
|
||||
)
|
||||
audit_logs = result.scalars().all()
|
||||
|
||||
assert len(audit_logs) >= 1
|
||||
audit_log = audit_logs[-1]
|
||||
assert audit_log.action == "auth.logout"
|
||||
assert audit_log.outcome == "success"
|
||||
assert audit_log.ip_address == "192.168.1.100"
|
||||
Reference in New Issue
Block a user