""" Main FastAPI application for mem0 Memory System API """ import os import time import logging from datetime import datetime from typing import List, Optional from fastapi import FastAPI, HTTPException, Depends, Request, Response from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse from starlette.status import HTTP_500_INTERNAL_SERVER_ERROR # Import our modules from api.models import * from api.auth import get_api_key, get_admin_api_key, check_rate_limit, rate_limiter from api.service import memory_service, MemoryServiceError # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Create FastAPI app app = FastAPI( title="Mem0 Memory System API", description="REST API for the Mem0 Memory System with Supabase and Ollama integration", version="1.0.0", docs_url="/docs", redoc_url="/redoc" ) # Add CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["http://localhost:3000", "http://localhost:8080"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Store startup time for uptime calculation startup_time = time.time() # Middleware for logging and rate limit headers @app.middleware("http") async def add_process_time_header(request: Request, call_next): """Add processing time and rate limit headers""" start_time = time.time() # Process request response = await call_next(request) # Add processing time header process_time = time.time() - start_time response.headers["X-Process-Time"] = str(process_time) # Add rate limit headers if API key is present auth_header = request.headers.get("authorization") if auth_header and auth_header.startswith("Bearer "): api_key = auth_header.replace("Bearer ", "") try: _, rate_info = rate_limiter.check_rate_limit(api_key) response.headers["X-RateLimit-Limit"] = str(rate_info["limit"]) response.headers["X-RateLimit-Remaining"] = str(rate_info["remaining"]) response.headers["X-RateLimit-Reset"] = str(rate_info["reset"]) except: pass # Ignore rate limit header errors return response # Exception handlers @app.exception_handler(MemoryServiceError) async def memory_service_exception_handler(request: Request, exc: MemoryServiceError): """Handle memory service errors""" logger.error(f"Memory service error: {exc}") return JSONResponse( status_code=HTTP_500_INTERNAL_SERVER_ERROR, content=ErrorResponse( error=ErrorDetail( code="MEMORY_SERVICE_ERROR", message="Memory service error occurred", details={"error": str(exc)} ).model_dump() ).model_dump() ) @app.exception_handler(HTTPException) async def http_exception_handler(request: Request, exc: HTTPException): """Handle HTTP exceptions with proper format""" error_detail = exc.detail # If detail is already a dict (from our auth), use it directly if isinstance(error_detail, dict): return JSONResponse( status_code=exc.status_code, content=ErrorResponse(error=error_detail).model_dump() ) # Otherwise, create proper error format return JSONResponse( status_code=exc.status_code, content=ErrorResponse( error=ErrorDetail( code="HTTP_ERROR", message=str(error_detail), details={} ).model_dump() ).model_dump() ) # Health endpoints @app.get("/health", response_model=HealthResponse, tags=["Health"]) async def health_check(): """Basic health check endpoint""" uptime = time.time() - startup_time return HealthResponse( status="healthy", uptime=uptime ) @app.get("/status", response_model=SystemStatusResponse, tags=["Health"]) async def system_status(api_key: str = Depends(get_api_key)): """Detailed system status (requires API key)""" try: # Check memory service health health = await memory_service.health_check() # Get mem0 version import mem0 mem0_version = getattr(mem0, '__version__', 'unknown') services_status = { "memory_service": health.get("status", "unknown"), "database": "healthy" if health.get("mem0_initialized") else "unhealthy", "authentication": "healthy", "rate_limiting": "healthy" } overall_status = "healthy" if all(s == "healthy" for s in services_status.values()) else "degraded" return SystemStatusResponse( status=overall_status, version="1.0.0", mem0_version=mem0_version, services=services_status, database={ "provider": "supabase", "status": "connected" if health.get("mem0_initialized") else "disconnected" } ) except Exception as e: logger.error(f"Status check failed: {e}") raise HTTPException( status_code=HTTP_500_INTERNAL_SERVER_ERROR, detail={ "code": "STATUS_CHECK_FAILED", "message": "Failed to retrieve system status", "details": {"error": str(e)} } ) # Memory endpoints @app.post("/v1/memories", response_model=StandardResponse, tags=["Memories"]) async def add_memory( memory_request: AddMemoryRequest, api_key: str = Depends(check_rate_limit) ): """Add new memory from messages""" try: logger.info(f"Adding memory for user: {memory_request.user_id}") # Convert to dict for service messages = [msg.model_dump() for msg in memory_request.messages] # Add memory result = await memory_service.add_memory( messages=messages, user_id=memory_request.user_id, metadata=memory_request.metadata ) return StandardResponse( success=True, data=result, message="Memory added successfully" ) except MemoryServiceError as e: logger.error(f"Failed to add memory: {e}") raise HTTPException( status_code=HTTP_500_INTERNAL_SERVER_ERROR, detail={ "code": "MEMORY_ADD_FAILED", "message": "Failed to add memory", "details": {"error": str(e)} } ) @app.get("/v1/memories/search", response_model=StandardResponse, tags=["Memories"]) async def search_memories( query: str, user_id: str, limit: int = 10, threshold: float = 0.0, api_key: str = Depends(check_rate_limit) ): """Search memories by content""" try: # Validate parameters if not query.strip(): raise HTTPException( status_code=400, detail={ "code": "INVALID_REQUEST", "message": "Query cannot be empty", "details": {} } ) if not user_id.strip(): raise HTTPException( status_code=400, detail={ "code": "INVALID_REQUEST", "message": "User ID cannot be empty", "details": {} } ) # Validate limits if limit < 1 or limit > 100: limit = min(max(limit, 1), 100) if threshold < 0.0 or threshold > 1.0: threshold = max(min(threshold, 1.0), 0.0) logger.info(f"Searching memories for user: {user_id}, query: {query}") # Search memories result = await memory_service.search_memories( query=query, user_id=user_id, limit=limit, threshold=threshold ) return StandardResponse( success=True, data=result, message=f"Found {result['total_results']} memories" ) except HTTPException: raise except MemoryServiceError as e: logger.error(f"Failed to search memories: {e}") raise HTTPException( status_code=HTTP_500_INTERNAL_SERVER_ERROR, detail={ "code": "MEMORY_SEARCH_FAILED", "message": "Failed to search memories", "details": {"error": str(e)} } ) @app.get("/v1/memories/{memory_id}", response_model=StandardResponse, tags=["Memories"]) async def get_memory( memory_id: str, user_id: str, api_key: str = Depends(check_rate_limit) ): """Get specific memory by ID""" try: logger.info(f"Getting memory {memory_id} for user: {user_id}") memory = await memory_service.get_memory(memory_id, user_id) if not memory: raise HTTPException( status_code=404, detail={ "code": "MEMORY_NOT_FOUND", "message": f"Memory with ID '{memory_id}' not found", "details": {"memory_id": memory_id, "user_id": user_id} } ) return StandardResponse( success=True, data=memory, message="Memory retrieved successfully" ) except HTTPException: raise except MemoryServiceError as e: logger.error(f"Failed to get memory: {e}") raise HTTPException( status_code=HTTP_500_INTERNAL_SERVER_ERROR, detail={ "code": "MEMORY_GET_FAILED", "message": "Failed to retrieve memory", "details": {"error": str(e)} } ) @app.delete("/v1/memories/{memory_id}", response_model=StandardResponse, tags=["Memories"]) async def delete_memory( memory_id: str, user_id: str, api_key: str = Depends(check_rate_limit) ): """Delete specific memory""" try: logger.info(f"Deleting memory {memory_id} for user: {user_id}") deleted = await memory_service.delete_memory(memory_id, user_id) if not deleted: raise HTTPException( status_code=404, detail={ "code": "MEMORY_NOT_FOUND", "message": f"Memory with ID '{memory_id}' not found", "details": {"memory_id": memory_id, "user_id": user_id} } ) return StandardResponse( success=True, data={ "deleted": True, "memory_id": memory_id, "user_id": user_id }, message="Memory deleted successfully" ) except HTTPException: raise except MemoryServiceError as e: logger.error(f"Failed to delete memory: {e}") raise HTTPException( status_code=HTTP_500_INTERNAL_SERVER_ERROR, detail={ "code": "MEMORY_DELETE_FAILED", "message": "Failed to delete memory", "details": {"error": str(e)} } ) @app.get("/v1/memories/user/{user_id}", response_model=StandardResponse, tags=["Memories"]) async def get_user_memories( user_id: str, limit: Optional[int] = None, offset: Optional[int] = None, api_key: str = Depends(check_rate_limit) ): """Get all memories for a user""" try: logger.info(f"Getting memories for user: {user_id}") result = await memory_service.get_user_memories( user_id=user_id, limit=limit, offset=offset ) return StandardResponse( success=True, data=result, message=f"Retrieved {result['total_count']} memories" ) except MemoryServiceError as e: logger.error(f"Failed to get user memories: {e}") raise HTTPException( status_code=HTTP_500_INTERNAL_SERVER_ERROR, detail={ "code": "USER_MEMORIES_FAILED", "message": "Failed to retrieve user memories", "details": {"error": str(e)} } ) @app.get("/v1/users/{user_id}/stats", response_model=StandardResponse, tags=["Users"]) async def get_user_stats( user_id: str, api_key: str = Depends(check_rate_limit) ): """Get user memory statistics""" try: logger.info(f"Getting stats for user: {user_id}") stats = await memory_service.get_user_stats(user_id) return StandardResponse( success=True, data=stats, message="User statistics retrieved successfully" ) except MemoryServiceError as e: logger.error(f"Failed to get user stats: {e}") raise HTTPException( status_code=HTTP_500_INTERNAL_SERVER_ERROR, detail={ "code": "USER_STATS_FAILED", "message": "Failed to retrieve user statistics", "details": {"error": str(e)} } ) @app.delete("/v1/users/{user_id}/memories", response_model=StandardResponse, tags=["Users"]) async def delete_user_memories( user_id: str, api_key: str = Depends(check_rate_limit) ): """Delete all memories for a user""" try: logger.info(f"Deleting all memories for user: {user_id}") deleted_count = await memory_service.delete_user_memories(user_id) return StandardResponse( success=True, data={ "deleted_count": deleted_count, "user_id": user_id }, message=f"Deleted {deleted_count} memories" ) except MemoryServiceError as e: logger.error(f"Failed to delete user memories: {e}") raise HTTPException( status_code=HTTP_500_INTERNAL_SERVER_ERROR, detail={ "code": "USER_DELETE_FAILED", "message": "Failed to delete user memories", "details": {"error": str(e)} } ) # Admin endpoints @app.get("/v1/metrics", response_model=StandardResponse, tags=["Admin"]) async def get_metrics(admin_key: str = Depends(get_admin_api_key)): """Get API metrics (admin only)""" try: # This is a simplified metrics implementation # In production, you'd want to use proper metrics collection metrics = { "total_requests": 0, # Would track in middleware "requests_per_minute": 0.0, "average_response_time": 0.0, "error_rate": 0.0, "active_users": 0, "top_endpoints": [], "uptime": time.time() - startup_time } return StandardResponse( success=True, data=metrics, message="Metrics retrieved successfully" ) except Exception as e: logger.error(f"Failed to get metrics: {e}") raise HTTPException( status_code=HTTP_500_INTERNAL_SERVER_ERROR, detail={ "code": "METRICS_FAILED", "message": "Failed to retrieve metrics", "details": {"error": str(e)} } ) if __name__ == "__main__": import uvicorn host = os.getenv("API_HOST", "localhost") port = int(os.getenv("API_PORT", "8080")) logger.info(f"🚀 Starting Mem0 API server on {host}:{port}") uvicorn.run( "api.main:app", host=host, port=port, reload=True, log_level="info" )