""" API routes for T6 Mem0 v2 REST API """ import logging from typing import List from datetime import datetime from fastapi import APIRouter, Depends, HTTPException, status, Query from fastapi.responses import JSONResponse from api.auth import verify_api_key from api.models import ( AddMemoryRequest, AddMemoryResponse, SearchMemoryRequest, SearchMemoryResponse, UpdateMemoryRequest, MemoryResponse, DeleteMemoryResponse, HealthResponse, StatsResponse, ErrorResponse ) from api.memory_service import get_memory_service, MemoryService from config import settings logger = logging.getLogger(__name__) # Create router router = APIRouter(prefix="/v1", tags=["memories"]) def format_memory_response(mem: dict) -> MemoryResponse: """Format memory data to response model""" return MemoryResponse( id=mem.get('id', ''), memory=mem.get('memory', mem.get('data', '')), user_id=mem.get('user_id'), agent_id=mem.get('agent_id'), run_id=mem.get('run_id'), metadata=mem.get('metadata', {}), created_at=mem.get('created_at'), updated_at=mem.get('updated_at'), score=mem.get('score') ) @router.post( "/memories/", response_model=AddMemoryResponse, status_code=status.HTTP_201_CREATED, summary="Add new memory", description="Add new memory from conversation messages", responses={ 201: {"description": "Memory added successfully"}, 401: {"model": ErrorResponse, "description": "Unauthorized"}, 500: {"model": ErrorResponse, "description": "Internal server error"} } ) async def add_memory( request: AddMemoryRequest, _api_key: str = Depends(verify_api_key), service: MemoryService = Depends(get_memory_service) ): """Add new memory from messages""" try: messages = [msg.model_dump() for msg in request.messages] memories = await service.add_memory( messages=messages, user_id=request.user_id, agent_id=request.agent_id, run_id=request.run_id, metadata=request.metadata ) formatted_memories = [format_memory_response(mem) for mem in memories] return AddMemoryResponse( status="success", memories=formatted_memories, message=f"Successfully added {len(formatted_memories)} memory(ies)" ) except Exception as e: logger.error(f"Error adding memory: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e) ) @router.get( "/memories/search", response_model=SearchMemoryResponse, summary="Search memories", description="Search memories by semantic similarity", responses={ 200: {"description": "Search completed successfully"}, 401: {"model": ErrorResponse, "description": "Unauthorized"}, 500: {"model": ErrorResponse, "description": "Internal server error"} } ) async def search_memories( query: str = Query(..., description="Search query"), user_id: str | None = Query(None, description="Filter by user ID"), agent_id: str | None = Query(None, description="Filter by agent ID"), run_id: str | None = Query(None, description="Filter by run ID"), limit: int = Query(10, ge=1, le=100, description="Maximum results"), _api_key: str = Depends(verify_api_key), service: MemoryService = Depends(get_memory_service) ): """Search memories by query""" try: memories = await service.search_memories( query=query, user_id=user_id, agent_id=agent_id, run_id=run_id, limit=limit ) formatted_memories = [format_memory_response(mem) for mem in memories] return SearchMemoryResponse( status="success", memories=formatted_memories, count=len(formatted_memories) ) except Exception as e: logger.error(f"Error searching memories: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e) ) @router.get( "/memories/{memory_id}", response_model=MemoryResponse, summary="Get specific memory", description="Retrieve a specific memory by ID", responses={ 200: {"description": "Memory retrieved successfully"}, 404: {"model": ErrorResponse, "description": "Memory not found"}, 401: {"model": ErrorResponse, "description": "Unauthorized"}, 500: {"model": ErrorResponse, "description": "Internal server error"} } ) async def get_memory( memory_id: str, _api_key: str = Depends(verify_api_key), service: MemoryService = Depends(get_memory_service) ): """Get specific memory by ID""" try: memory = await service.get_memory(memory_id) if not memory: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Memory not found: {memory_id}" ) return format_memory_response(memory) except HTTPException: raise except Exception as e: logger.error(f"Error getting memory: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e) ) @router.get( "/memories/user/{user_id}", response_model=List[MemoryResponse], summary="Get user memories", description="Get all memories for a specific user", responses={ 200: {"description": "Memories retrieved successfully"}, 401: {"model": ErrorResponse, "description": "Unauthorized"}, 500: {"model": ErrorResponse, "description": "Internal server error"} } ) async def get_user_memories( user_id: str, _api_key: str = Depends(verify_api_key), service: MemoryService = Depends(get_memory_service) ): """Get all memories for a user""" try: memories = await service.get_all_memories(user_id=user_id) logger.info(f"Received {len(memories)} memories from service, types: {[type(m) for m in memories]}") return [format_memory_response(mem) for mem in memories] except Exception as e: logger.error(f"Error getting user memories: {e}", exc_info=True) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e) ) @router.patch( "/memories/{memory_id}", response_model=MemoryResponse, summary="Update memory", description="Update an existing memory", responses={ 200: {"description": "Memory updated successfully"}, 404: {"model": ErrorResponse, "description": "Memory not found"}, 401: {"model": ErrorResponse, "description": "Unauthorized"}, 500: {"model": ErrorResponse, "description": "Internal server error"} } ) async def update_memory( memory_id: str, request: UpdateMemoryRequest, _api_key: str = Depends(verify_api_key), service: MemoryService = Depends(get_memory_service) ): """Update existing memory""" try: if not request.memory_text: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="memory_text is required for update" ) updated = await service.update_memory( memory_id=memory_id, data=request.memory_text ) return format_memory_response(updated) except HTTPException: raise except Exception as e: logger.error(f"Error updating memory: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e) ) @router.delete( "/memories/{memory_id}", response_model=DeleteMemoryResponse, summary="Delete memory", description="Delete a specific memory", responses={ 200: {"description": "Memory deleted successfully"}, 404: {"model": ErrorResponse, "description": "Memory not found"}, 401: {"model": ErrorResponse, "description": "Unauthorized"}, 500: {"model": ErrorResponse, "description": "Internal server error"} } ) async def delete_memory( memory_id: str, _api_key: str = Depends(verify_api_key), service: MemoryService = Depends(get_memory_service) ): """Delete specific memory""" try: await service.delete_memory(memory_id) return DeleteMemoryResponse( status="success", message=f"Memory {memory_id} deleted successfully" ) except Exception as e: logger.error(f"Error deleting memory: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e) ) @router.delete( "/memories/user/{user_id}", response_model=DeleteMemoryResponse, summary="Delete user memories", description="Delete all memories for a specific user", responses={ 200: {"description": "Memories deleted successfully"}, 401: {"model": ErrorResponse, "description": "Unauthorized"}, 500: {"model": ErrorResponse, "description": "Internal server error"} } ) async def delete_user_memories( user_id: str, _api_key: str = Depends(verify_api_key), service: MemoryService = Depends(get_memory_service) ): """Delete all memories for a user""" try: await service.delete_all_memories(user_id=user_id) return DeleteMemoryResponse( status="success", message=f"All memories for user {user_id} deleted successfully" ) except Exception as e: logger.error(f"Error deleting user memories: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e) ) @router.get( "/health", response_model=HealthResponse, summary="Health check", description="Check API health status", responses={ 200: {"description": "Service is healthy"} } ) async def health_check( service: MemoryService = Depends(get_memory_service) ): """Health check endpoint""" try: health = await service.health_check() return HealthResponse( status="healthy" if all(v == "healthy" for v in health.values()) else "degraded", version="0.1.0", timestamp=datetime.utcnow(), dependencies=health ) except Exception as e: logger.error(f"Health check failed: {e}") return JSONResponse( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, content={ "status": "unhealthy", "version": "0.1.0", "timestamp": datetime.utcnow().isoformat(), "error": str(e) } ) @router.get( "/stats", response_model=StatsResponse, summary="Memory statistics", description="Get system-wide memory statistics", responses={ 200: {"description": "Statistics retrieved successfully"}, 401: {"model": ErrorResponse, "description": "Unauthorized"}, 500: {"model": ErrorResponse, "description": "Internal server error"} } ) async def get_stats( _api_key: str = Depends(verify_api_key) ): """Get memory statistics""" # Note: This would require direct database access or extension of mem0 API # For now, return placeholder return StatsResponse( total_memories=0, total_users=0, total_agents=0, avg_memories_per_user=0.0, oldest_memory=None, newest_memory=None )