Files
geutebruck-api/src/api/tests/test_auth_service.py
Geutebruck API Developer fbebe10711 Phase 4: Authentication System (T039-T048)
Implemented complete JWT-based authentication system with RBAC:

**Tests (TDD Approach):**
- Created contract tests for /api/v1/auth/login endpoint
- Created contract tests for /api/v1/auth/logout endpoint
- Created unit tests for AuthService (login, logout, validate_token, password hashing)
- Created pytest configuration and fixtures (test DB, test users, tokens)

**Schemas:**
- LoginRequest: username/password validation
- TokenResponse: access_token, refresh_token, user info
- LogoutResponse: logout confirmation
- RefreshTokenRequest: token refresh payload
- UserInfo: user data (excludes password_hash)

**Services:**
- AuthService: login(), logout(), validate_token(), hash_password(), verify_password()
- Integrated bcrypt password hashing
- JWT token generation (access + refresh tokens)
- Token blacklisting in Redis
- Audit logging for all auth operations

**Middleware:**
- Authentication middleware with JWT validation
- Role-based access control (RBAC) helpers
- require_role() dependency factory
- Convenience dependencies: require_viewer(), require_operator(), require_administrator()
- Client IP and User-Agent extraction

**Router:**
- POST /api/v1/auth/login - Authenticate and get tokens
- POST /api/v1/auth/logout - Blacklist token
- POST /api/v1/auth/refresh - Refresh access token
- GET /api/v1/auth/me - Get current user info

**Integration:**
- Registered auth router in main.py
- Updated startup event to initialize Redis and SDK Bridge clients
- Updated shutdown event to clean up connections properly
- Fixed error translation utilities
- Added asyncpg dependency for PostgreSQL async driver

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-09 09:04:16 +01:00

267 lines
9.5 KiB
Python

"""
Unit tests for AuthService
These tests will FAIL until AuthService is implemented
"""
import pytest
from datetime import datetime, timedelta
import uuid
from services.auth_service import AuthService
from models.user import User, UserRole
@pytest.mark.asyncio
class TestAuthServiceLogin:
    """Login behaviour of AuthService for each role and for bad credentials."""

    async def test_login_success(self, test_db_session, test_admin_user):
        """Valid admin credentials produce a complete token payload."""
        service = AuthService(test_db_session)

        payload = await service.login("admin", "admin123", ip_address="127.0.0.1")

        assert payload is not None
        # Every top-level field of the token response must be present.
        for field in ("access_token", "refresh_token", "token_type", "expires_in", "user"):
            assert field in payload
        assert payload["token_type"] == "bearer"

        user_info = payload["user"]
        assert user_info["username"] == "admin"
        assert user_info["role"] == "administrator"

    async def test_login_invalid_username(self, test_db_session):
        """An unknown username makes login() return None."""
        service = AuthService(test_db_session)

        payload = await service.login("nonexistent", "somepassword", ip_address="127.0.0.1")

        assert payload is None

    async def test_login_invalid_password(self, test_db_session, test_admin_user):
        """A wrong password for an existing user makes login() return None."""
        service = AuthService(test_db_session)

        payload = await service.login("admin", "wrongpassword", ip_address="127.0.0.1")

        assert payload is None

    async def test_login_operator(self, test_db_session, test_operator_user):
        """Operator credentials log in and report the operator role."""
        service = AuthService(test_db_session)

        payload = await service.login("operator", "operator123", ip_address="127.0.0.1")

        assert payload is not None
        assert payload["user"]["role"] == "operator"

    async def test_login_viewer(self, test_db_session, test_viewer_user):
        """Viewer credentials log in and report the viewer role."""
        service = AuthService(test_db_session)

        payload = await service.login("viewer", "viewer123", ip_address="127.0.0.1")

        assert payload is not None
        assert payload["user"]["role"] == "viewer"
@pytest.mark.asyncio
class TestAuthServiceLogout:
    """Logout behaviour of AuthService for valid, malformed and expired tokens."""

    async def test_logout_success(self, test_db_session, test_admin_user, auth_token):
        """Logging out with a valid token reports True."""
        service = AuthService(test_db_session)

        # A successful logout is expected to blacklist the token.
        logged_out = await service.logout(auth_token, ip_address="127.0.0.1")

        assert logged_out is True

    async def test_logout_invalid_token(self, test_db_session):
        """Logging out with a malformed token reports False."""
        service = AuthService(test_db_session)

        logged_out = await service.logout("invalid_token", ip_address="127.0.0.1")

        assert logged_out is False

    async def test_logout_expired_token(self, test_db_session, expired_token):
        """Logging out with an expired token reports False."""
        service = AuthService(test_db_session)

        logged_out = await service.logout(expired_token, ip_address="127.0.0.1")

        assert logged_out is False
@pytest.mark.asyncio
class TestAuthServiceValidateToken:
    """Token validation behaviour of AuthService.validate_token()."""

    async def test_validate_token_success(self, test_db_session, test_admin_user, auth_token):
        """A valid token resolves to the admin User it was issued for."""
        service = AuthService(test_db_session)

        resolved = await service.validate_token(auth_token)

        assert resolved is not None
        assert isinstance(resolved, User)
        assert resolved.username == "admin"
        assert resolved.role == UserRole.ADMINISTRATOR

    async def test_validate_token_invalid(self, test_db_session):
        """A malformed token resolves to None."""
        service = AuthService(test_db_session)

        resolved = await service.validate_token("invalid_token")

        assert resolved is None

    async def test_validate_token_expired(self, test_db_session, expired_token):
        """An expired token resolves to None."""
        service = AuthService(test_db_session)

        resolved = await service.validate_token(expired_token)

        assert resolved is None

    async def test_validate_token_blacklisted(self, test_db_session, test_admin_user, auth_token):
        """A token that has been logged out no longer validates."""
        service = AuthService(test_db_session)

        # Step 1: log out so the token is blacklisted.
        await service.logout(auth_token, ip_address="127.0.0.1")

        # Step 2: the same token must now be rejected.
        resolved = await service.validate_token(auth_token)
        assert resolved is None
@pytest.mark.asyncio
class TestAuthServicePasswordHashing:
    """Checks for AuthService.hash_password() and AuthService.verify_password()."""

    async def test_hash_password(self, test_db_session):
        """Hashing yields a bcrypt-prefixed string that differs from the input."""
        service = AuthService(test_db_session)
        secret = "mypassword123"

        digest = await service.hash_password(secret)

        # The stored value must never be the plain text itself.
        assert digest != secret
        # "$2b$" is the bcrypt identifier expected at the start of the hash.
        assert digest.startswith("$2b$")

    async def test_verify_password_success(self, test_db_session):
        """The original password verifies against its own hash."""
        service = AuthService(test_db_session)
        secret = "mypassword123"
        digest = await service.hash_password(secret)

        matches = await service.verify_password(secret, digest)

        assert matches is True

    async def test_verify_password_failure(self, test_db_session):
        """A different password does not verify against the hash."""
        service = AuthService(test_db_session)
        secret = "mypassword123"
        digest = await service.hash_password(secret)

        matches = await service.verify_password("wrongpassword", digest)

        assert matches is False

    async def test_hash_password_different_each_time(self, test_db_session):
        """Hashing the same password twice gives two distinct, valid hashes."""
        service = AuthService(test_db_session)
        secret = "mypassword123"

        first_digest = await service.hash_password(secret)
        second_digest = await service.hash_password(secret)

        # bcrypt salts randomly, so the two hashes must differ...
        assert first_digest != second_digest
        # ...while each one still verifies against the original password.
        for digest in (first_digest, second_digest):
            assert await service.verify_password(secret, digest)
@pytest.mark.asyncio
class TestAuthServiceAuditLogging:
    """Unit tests for audit logging in AuthService.

    Each test performs one auth operation and then looks for a matching
    ``AuditLog`` row. Rows are matched by their field values rather than by
    position in the result list: the SELECT carries no ORDER BY, so the last
    row returned is not guaranteed to be the most recently written one.
    """

    @staticmethod
    async def _find_audit_logs(session, action, **expected):
        """Return the AuditLog rows for ``action`` whose attributes equal ``expected``.

        Args:
            session: Database session; its ``execute()`` is awaited to run the query.
            action: Value the row's ``action`` column must equal.
            **expected: Attribute name/value pairs every returned row must match.

        Returns:
            A list of the matching AuditLog rows (empty when none match).
        """
        # Imported locally, as the original tests did, so that importing this
        # module does not require the audit-log model.
        from models.audit_log import AuditLog
        from sqlalchemy import select

        result = await session.execute(
            select(AuditLog).where(AuditLog.action == action)
        )
        return [
            row
            for row in result.scalars().all()
            if all(getattr(row, name) == value for name, value in expected.items())
        ]

    async def test_login_success_creates_audit_log(self, test_db_session, test_admin_user):
        """Test that successful login creates audit log entry"""
        auth_service = AuthService(test_db_session)

        # Perform login
        await auth_service.login("admin", "admin123", ip_address="192.168.1.100")

        # Check that a matching audit log entry exists, regardless of row order
        matches = await self._find_audit_logs(
            test_db_session,
            "auth.login",
            target="admin",
            outcome="success",
            ip_address="192.168.1.100",
        )
        assert matches, "no successful auth.login audit entry found for 'admin'"

    async def test_login_failure_creates_audit_log(self, test_db_session):
        """Test that failed login creates audit log entry"""
        # NOTE(review): unlike test_login_invalid_password, this test does not
        # request the test_admin_user fixture, so it presumably exercises the
        # unknown-user path rather than the wrong-password path - confirm intent.
        auth_service = AuthService(test_db_session)

        # Attempt login with invalid credentials
        await auth_service.login("admin", "wrongpassword", ip_address="192.168.1.100")

        # Check that a matching audit log entry exists, regardless of row order
        matches = await self._find_audit_logs(
            test_db_session,
            "auth.login",
            target="admin",
            outcome="failure",
            ip_address="192.168.1.100",
        )
        assert matches, "no failed auth.login audit entry found for 'admin'"

    async def test_logout_creates_audit_log(self, test_db_session, test_admin_user, auth_token):
        """Test that logout creates audit log entry"""
        auth_service = AuthService(test_db_session)

        # Perform logout
        await auth_service.logout(auth_token, ip_address="192.168.1.100")

        # Check that a matching audit log entry exists, regardless of row order
        matches = await self._find_audit_logs(
            test_db_session,
            "auth.logout",
            outcome="success",
            ip_address="192.168.1.100",
        )
        assert matches, "no successful auth.logout audit entry found"