""" Memory service layer - abstraction over mem0 core functionality """ import logging import time from typing import List, Dict, Any, Optional from datetime import datetime from mem0 import Memory from config import load_config, get_mem0_config logger = logging.getLogger(__name__) class MemoryServiceError(Exception): """Base exception for memory service errors""" pass class MemoryService: """Service layer for memory operations""" def __init__(self): self._memory = None self._config = None self._initialize_memory() def _initialize_memory(self): """Initialize mem0 Memory instance""" try: logger.info("Initializing mem0 Memory service...") system_config = load_config() self._config = get_mem0_config(system_config, "ollama") self._memory = Memory.from_config(self._config) logger.info("✅ Memory service initialized successfully") except Exception as e: logger.error(f"❌ Failed to initialize memory service: {e}") raise MemoryServiceError(f"Failed to initialize memory service: {e}") @property def memory(self) -> Memory: """Get mem0 Memory instance""" if self._memory is None: self._initialize_memory() return self._memory async def add_memory(self, messages: List[Dict[str, str]], user_id: str, metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """Add new memory from messages""" try: logger.info(f"Adding memory for user {user_id}") # Convert messages to content string content = self._messages_to_content(messages) # Add metadata if metadata is None: metadata = {} metadata.update({ "source": "api", "timestamp": datetime.now().isoformat(), "message_count": len(messages) }) # Add memory using mem0 result = self.memory.add(content, user_id=user_id, metadata=metadata) logger.info(f"✅ Memory added for user {user_id}: {result}") return result except Exception as e: logger.error(f"❌ Failed to add memory for user {user_id}: {e}") raise MemoryServiceError(f"Failed to add memory: {e}") async def search_memories(self, query: str, user_id: str, limit: int = 10, threshold: float = 0.0) -> Dict[str, Any]: """Search memories for a user""" try: logger.info(f"Searching memories for user {user_id} with query: {query}") start_time = time.time() # Search using mem0 result = self.memory.search(query, user_id=user_id, limit=limit) execution_time = time.time() - start_time # Process results if isinstance(result, dict) and 'results' in result: results = result['results'] # Filter by threshold if specified if threshold > 0.0: results = [r for r in results if r.get('score', 0) >= threshold] else: results = [] search_response = { "results": results, "query": query, "total_results": len(results), "execution_time": execution_time } logger.info(f"✅ Search completed for user {user_id}: {len(results)} results in {execution_time:.3f}s") return search_response except Exception as e: logger.error(f"❌ Failed to search memories for user {user_id}: {e}") raise MemoryServiceError(f"Failed to search memories: {e}") async def get_memory(self, memory_id: str, user_id: str) -> Optional[Dict[str, Any]]: """Get specific memory by ID""" try: logger.info(f"Getting memory {memory_id} for user {user_id}") # Get all user memories and find the specific one all_memories = self.memory.get_all(user_id=user_id) if isinstance(all_memories, dict) and 'results' in all_memories: for memory in all_memories['results']: if memory.get('id') == memory_id: logger.info(f"✅ Found memory {memory_id} for user {user_id}") return memory logger.warning(f"Memory {memory_id} not found for user {user_id}") return None except Exception as e: logger.error(f"❌ Failed to get memory {memory_id} for user {user_id}: {e}") raise MemoryServiceError(f"Failed to get memory: {e}") async def update_memory(self, memory_id: str, user_id: str, content: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]: """Update existing memory""" try: logger.info(f"Updating memory {memory_id} for user {user_id}") # First check if memory exists existing_memory = await self.get_memory(memory_id, user_id) if not existing_memory: return None # mem0 doesn't have direct update, so we'll delete and re-add # This is a simplified implementation if content: # Delete old memory self.memory.delete(memory_id) # Add new memory with updated content updated_metadata = existing_memory.get('metadata', {}) if metadata: updated_metadata.update(metadata) result = self.memory.add(content, user_id=user_id, metadata=updated_metadata) logger.info(f"✅ Memory updated for user {user_id}: {result}") return result logger.warning(f"No content provided for updating memory {memory_id}") return existing_memory except Exception as e: logger.error(f"❌ Failed to update memory {memory_id} for user {user_id}: {e}") raise MemoryServiceError(f"Failed to update memory: {e}") async def delete_memory(self, memory_id: str, user_id: str) -> bool: """Delete specific memory""" try: logger.info(f"Deleting memory {memory_id} for user {user_id}") # Check if memory exists first existing_memory = await self.get_memory(memory_id, user_id) if not existing_memory: logger.warning(f"Memory {memory_id} not found for user {user_id}") return False # Delete using mem0 self.memory.delete(memory_id) logger.info(f"✅ Memory {memory_id} deleted for user {user_id}") return True except Exception as e: logger.error(f"❌ Failed to delete memory {memory_id} for user {user_id}: {e}") raise MemoryServiceError(f"Failed to delete memory: {e}") async def get_user_memories(self, user_id: str, limit: Optional[int] = None, offset: Optional[int] = None) -> Dict[str, Any]: """Get all memories for a user""" try: logger.info(f"Getting all memories for user {user_id}") # Get all user memories result = self.memory.get_all(user_id=user_id) if isinstance(result, dict) and 'results' in result: all_memories = result['results'] else: all_memories = [] # Apply pagination if specified if offset is not None: all_memories = all_memories[offset:] if limit is not None: all_memories = all_memories[:limit] response = { "results": all_memories, "user_id": user_id, "total_count": len(all_memories) } logger.info(f"✅ Retrieved {len(all_memories)} memories for user {user_id}") return response except Exception as e: logger.error(f"❌ Failed to get memories for user {user_id}: {e}") raise MemoryServiceError(f"Failed to get user memories: {e}") async def delete_user_memories(self, user_id: str) -> int: """Delete all memories for a user""" try: logger.info(f"Deleting all memories for user {user_id}") # Get all user memories user_memories = await self.get_user_memories(user_id) memories = user_memories.get('results', []) deleted_count = 0 for memory in memories: memory_id = memory.get('id') if memory_id: if await self.delete_memory(memory_id, user_id): deleted_count += 1 logger.info(f"✅ Deleted {deleted_count} memories for user {user_id}") return deleted_count except Exception as e: logger.error(f"❌ Failed to delete memories for user {user_id}: {e}") raise MemoryServiceError(f"Failed to delete user memories: {e}") async def get_user_stats(self, user_id: str) -> Dict[str, Any]: """Get statistics for a user""" try: logger.info(f"Getting stats for user {user_id}") # Get all user memories user_memories = await self.get_user_memories(user_id) memories = user_memories.get('results', []) if not memories: return { "user_id": user_id, "total_memories": 0, "recent_memories": 0, "oldest_memory": None, "newest_memory": None, "storage_usage": {"estimated_size": 0} } # Calculate statistics now = datetime.now() recent_count = 0 oldest_time = None newest_time = None for memory in memories: created_at_str = memory.get('created_at') if created_at_str: try: created_at = datetime.fromisoformat(created_at_str.replace('Z', '+00:00')) # Check if recent (last 24 hours) if (now - created_at).total_seconds() < 86400: recent_count += 1 # Track oldest and newest if oldest_time is None or created_at < oldest_time: oldest_time = created_at if newest_time is None or created_at > newest_time: newest_time = created_at except (ValueError, TypeError): continue stats = { "user_id": user_id, "total_memories": len(memories), "recent_memories": recent_count, "oldest_memory": oldest_time, "newest_memory": newest_time, "storage_usage": { "estimated_size": sum(len(str(m)) for m in memories), "memory_count": len(memories) } } logger.info(f"✅ Retrieved stats for user {user_id}: {stats['total_memories']} memories") return stats except Exception as e: logger.error(f"❌ Failed to get stats for user {user_id}: {e}") raise MemoryServiceError(f"Failed to get user stats: {e}") def _messages_to_content(self, messages: List[Dict[str, str]]) -> str: """Convert messages list to content string""" if not messages: return "" if len(messages) == 1: return messages[0].get('content', '') # Combine multiple messages content_parts = [] for msg in messages: role = msg.get('role', 'user') content = msg.get('content', '') if content.strip(): content_parts.append(f"{role}: {content}") return " | ".join(content_parts) async def health_check(self) -> Dict[str, Any]: """Check service health""" try: # Simple health check - try to access the memory instance if self._memory is not None: return {"status": "healthy", "mem0_initialized": True} else: return {"status": "unhealthy", "mem0_initialized": False} except Exception as e: logger.error(f"Health check failed: {e}") return {"status": "unhealthy", "error": str(e)} # Global service instance memory_service = MemoryService()