""" Memory service wrapper for Mem0 core library """ import logging from typing import List, Dict, Any, Optional from mem0 import Memory from config import mem0_config from memory_cleanup import MemoryCleanup logger = logging.getLogger(__name__) class MemoryService: """Singleton service for memory operations using Mem0""" _instance: Optional['MemoryService'] = None _memory: Optional[Memory] = None _cleanup: Optional[MemoryCleanup] = None def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance def __init__(self): """Initialize memory service with Mem0""" if self._memory is None: logger.info("Initializing Mem0 with configuration") try: self._memory = Memory.from_config(config_dict=mem0_config) self._cleanup = MemoryCleanup(self._memory) logger.info("Mem0 initialized successfully") except Exception as e: logger.error(f"Failed to initialize Mem0: {e}") raise @property def memory(self) -> Memory: """Get Mem0 instance""" if self._memory is None: raise RuntimeError("MemoryService not initialized") return self._memory async def add_memory( self, messages: List[Dict[str, str]], user_id: Optional[str] = None, agent_id: Optional[str] = None, run_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None ) -> List[Dict[str, Any]]: """ Add new memory from messages Args: messages: List of chat messages user_id: User identifier agent_id: Agent identifier run_id: Run identifier metadata: Additional metadata Returns: List of created memories Raises: Exception: If memory creation fails """ try: logger.info(f"Adding memory for user_id={user_id}, agent_id={agent_id}") result = self.memory.add( messages=messages, user_id=user_id, agent_id=agent_id, run_id=run_id, metadata=metadata or {} ) logger.info(f"Successfully added {len(result.get('results', []))} memories") return result.get('results', []) except Exception as e: logger.error(f"Failed to add memory: {e}") raise async def search_memories( self, query: str, user_id: Optional[str] = None, agent_id: Optional[str] = None, run_id: Optional[str] = None, limit: int = 10 ) -> List[Dict[str, Any]]: """ Search memories by query Args: query: Search query user_id: User identifier filter agent_id: Agent identifier filter run_id: Run identifier filter limit: Maximum results Returns: List of matching memories with scores Raises: Exception: If search fails """ try: logger.info(f"Searching memories: query='{query}', user_id={user_id}, limit={limit}") result = self.memory.search( query=query, user_id=user_id, agent_id=agent_id, run_id=run_id, limit=limit ) # In mem0 v0.1.118+, search returns dict with 'results' key memories_list = result.get('results', []) if isinstance(result, dict) else result # Handle both string and dict responses from mem0 formatted_results = [] for item in memories_list: if isinstance(item, str): # Convert string memory to dict format formatted_results.append({ 'id': '', 'memory': item, 'user_id': user_id, 'agent_id': agent_id, 'run_id': run_id, 'metadata': {}, 'created_at': None, 'updated_at': None, 'score': None }) elif isinstance(item, dict): # Already in dict format formatted_results.append(item) else: logger.warning(f"Unexpected memory format: {type(item)}") logger.info(f"Found {len(formatted_results)} matching memories") return formatted_results except Exception as e: logger.error(f"Failed to search memories: {e}") raise async def get_memory( self, memory_id: str ) -> Optional[Dict[str, Any]]: """ Get specific memory by ID Args: memory_id: Memory identifier Returns: Memory data or None if not found Raises: Exception: If retrieval fails """ try: logger.info(f"Getting memory: id={memory_id}") result = self.memory.get(memory_id=memory_id) if result: logger.info(f"Retrieved memory: {memory_id}") else: logger.warning(f"Memory not found: {memory_id}") return result except Exception as e: logger.error(f"Failed to get memory: {e}") raise async def get_all_memories( self, user_id: Optional[str] = None, agent_id: Optional[str] = None, run_id: Optional[str] = None ) -> List[Dict[str, Any]]: """ Get all memories for a user/agent/run Args: user_id: User identifier filter agent_id: Agent identifier filter run_id: Run identifier filter Returns: List of all matching memories Raises: Exception: If retrieval fails """ try: logger.info(f"Getting all memories: user_id={user_id}, agent_id={agent_id}") result = self.memory.get_all( user_id=user_id, agent_id=agent_id, run_id=run_id ) # In mem0 v0.1.118+, get_all returns dict with 'results' key memories_list = result.get('results', []) if isinstance(result, dict) else result # Handle both string and dict responses from mem0 formatted_results = [] for item in memories_list: if isinstance(item, str): # Convert string memory to dict format formatted_results.append({ 'id': '', 'memory': item, 'user_id': user_id, 'agent_id': agent_id, 'run_id': run_id, 'metadata': {}, 'created_at': None, 'updated_at': None, 'score': None }) elif isinstance(item, dict): # Already in dict format formatted_results.append(item) else: logger.warning(f"Unexpected memory format: {type(item)}") logger.info(f"Retrieved {len(formatted_results)} memories") return formatted_results except Exception as e: logger.error(f"Failed to get all memories: {e}") raise async def update_memory( self, memory_id: str, data: str ) -> Dict[str, Any]: """ Update existing memory Args: memory_id: Memory identifier data: Updated memory text Returns: Updated memory data Raises: Exception: If update fails """ try: logger.info(f"Updating memory: id={memory_id}") result = self.memory.update( memory_id=memory_id, data=data ) logger.info(f"Successfully updated memory: {memory_id}") return result except Exception as e: logger.error(f"Failed to update memory: {e}") raise async def delete_memory( self, memory_id: str ) -> bool: """ Delete memory by ID Args: memory_id: Memory identifier Returns: True if deleted successfully Raises: Exception: If deletion fails """ try: logger.info(f"Deleting memory: id={memory_id}") self.memory.delete(memory_id=memory_id) logger.info(f"Successfully deleted memory: {memory_id}") return True except Exception as e: logger.error(f"Failed to delete memory: {e}") raise async def delete_all_memories( self, user_id: Optional[str] = None, agent_id: Optional[str] = None, run_id: Optional[str] = None ) -> bool: """ Delete all memories for a user/agent/run IMPORTANT: This uses synchronized deletion to ensure both Supabase (vector store) and Neo4j (graph store) are cleaned up. Args: user_id: User identifier filter agent_id: Agent identifier filter run_id: Run identifier filter Returns: True if deleted successfully Raises: Exception: If deletion fails """ try: logger.info(f"Deleting all memories (synchronized): user_id={user_id}, agent_id={agent_id}") # Use synchronized deletion to clean up both Supabase and Neo4j result = self._cleanup.delete_all_synchronized( user_id=user_id, agent_id=agent_id, run_id=run_id ) logger.info(f"Successfully deleted all matching memories: {result}") return True except Exception as e: logger.error(f"Failed to delete all memories: {e}") raise async def get_memory_history( self, memory_id: str ) -> List[Dict[str, Any]]: """ Get history of a memory Args: memory_id: Memory identifier Returns: List of memory history entries Raises: Exception: If retrieval fails """ try: logger.info(f"Getting memory history: id={memory_id}") result = self.memory.history(memory_id=memory_id) logger.info(f"Retrieved history for memory: {memory_id}") return result except Exception as e: logger.error(f"Failed to get memory history: {e}") raise async def health_check(self) -> Dict[str, str]: """ Check health of memory service Returns: Dict with component health status """ health = {} try: # Test Mem0 access self.memory health['mem0'] = 'healthy' except Exception as e: logger.error(f"Mem0 health check failed: {e}") health['mem0'] = 'unhealthy' return health # Global service instance _memory_service: Optional[MemoryService] = None def get_memory_service() -> MemoryService: """ Get or create memory service singleton Returns: MemoryService instance """ global _memory_service if _memory_service is None: _memory_service = MemoryService() return _memory_service