""" MCP tools for T6 Mem0 v2 Tool definitions and handlers """ import logging from typing import Any, Dict, List from mcp.types import Tool, TextContent from mem0 import Memory from memory_cleanup import MemoryCleanup logger = logging.getLogger(__name__) class MemoryTools: """MCP tools for memory operations""" def __init__(self, memory: Memory): """ Initialize memory tools Args: memory: Mem0 instance """ self.memory = memory self.cleanup = MemoryCleanup(memory) def get_tool_definitions(self) -> List[Tool]: """ Get MCP tool definitions Returns: List of Tool definitions """ return [ Tool( name="add_memory", description="Add new memory from messages. Extracts and stores important information from conversation.", inputSchema={ "type": "object", "properties": { "messages": { "type": "array", "items": { "type": "object", "properties": { "role": {"type": "string", "enum": ["user", "assistant", "system"]}, "content": {"type": "string"} }, "required": ["role", "content"] }, "description": "Conversation messages to extract memory from" }, "user_id": { "type": "string", "description": "User identifier (optional)" }, "agent_id": { "type": "string", "description": "Agent identifier (optional)" }, "metadata": { "type": "object", "description": "Additional metadata (optional)" } }, "required": ["messages"] } ), Tool( name="search_memories", description="Search memories by semantic similarity. Find relevant memories based on a query.", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "Search query" }, "user_id": { "type": "string", "description": "Filter by user ID (optional)" }, "agent_id": { "type": "string", "description": "Filter by agent ID (optional)" }, "limit": { "type": "integer", "minimum": 1, "maximum": 50, "default": 10, "description": "Maximum number of results" } }, "required": ["query"] } ), Tool( name="get_memory", description="Get a specific memory by its ID", inputSchema={ "type": "object", "properties": { "memory_id": { "type": "string", "description": "Memory identifier" } }, "required": ["memory_id"] } ), Tool( name="get_all_memories", description="Get all memories for a user or agent", inputSchema={ "type": "object", "properties": { "user_id": { "type": "string", "description": "User identifier (optional)" }, "agent_id": { "type": "string", "description": "Agent identifier (optional)" } } } ), Tool( name="update_memory", description="Update an existing memory's content", inputSchema={ "type": "object", "properties": { "memory_id": { "type": "string", "description": "Memory identifier" }, "data": { "type": "string", "description": "New memory content" } }, "required": ["memory_id", "data"] } ), Tool( name="delete_memory", description="Delete a specific memory by ID", inputSchema={ "type": "object", "properties": { "memory_id": { "type": "string", "description": "Memory identifier" } }, "required": ["memory_id"] } ), Tool( name="delete_all_memories", description="Delete all memories for a user or agent. Use with caution!", inputSchema={ "type": "object", "properties": { "user_id": { "type": "string", "description": "User identifier (optional)" }, "agent_id": { "type": "string", "description": "Agent identifier (optional)" } } } ) ] async def handle_add_memory(self, arguments: Dict[str, Any]) -> List[TextContent]: """Handle add_memory tool call""" try: messages = arguments.get("messages", []) user_id = arguments.get("user_id") agent_id = arguments.get("agent_id") metadata = arguments.get("metadata", {}) result = self.memory.add( messages=messages, user_id=user_id, agent_id=agent_id, metadata=metadata ) memories = result.get('results', []) response = f"Successfully added {len(memories)} memory(ies):\n\n" for mem in memories: response += f"- {mem.get('memory', mem.get('data', 'N/A'))}\n" response += f" ID: {mem.get('id', 'N/A')}\n\n" return [TextContent(type="text", text=response)] except Exception as e: logger.error(f"Error adding memory: {e}") return [TextContent(type="text", text=f"Error: {str(e)}")] async def handle_search_memories(self, arguments: Dict[str, Any]) -> List[TextContent]: """Handle search_memories tool call""" try: query = arguments.get("query") user_id = arguments.get("user_id") agent_id = arguments.get("agent_id") limit = arguments.get("limit", 10) result = self.memory.search( query=query, user_id=user_id, agent_id=agent_id, limit=limit ) # In mem0 v0.1.118+, search returns dict with 'results' key memories = result.get('results', []) if isinstance(result, dict) else result if not memories: return [TextContent(type="text", text="No memories found matching your query.")] response = f"Found {len(memories)} relevant memory(ies):\n\n" for i, mem in enumerate(memories, 1): # Handle both string and dict responses if isinstance(mem, str): response += f"{i}. {mem}\n\n" elif isinstance(mem, dict): response += f"{i}. {mem.get('memory', mem.get('data', 'N/A'))}\n" response += f" ID: {mem.get('id', 'N/A')}\n" if 'score' in mem: response += f" Relevance: {mem['score']:.2%}\n" response += "\n" else: response += f"{i}. {str(mem)}\n\n" return [TextContent(type="text", text=response)] except Exception as e: logger.error(f"Error searching memories: {e}") return [TextContent(type="text", text=f"Error: {str(e)}")] async def handle_get_memory(self, arguments: Dict[str, Any]) -> List[TextContent]: """Handle get_memory tool call""" try: memory_id = arguments.get("memory_id") memory = self.memory.get(memory_id=memory_id) if not memory: return [TextContent(type="text", text=f"Memory not found: {memory_id}")] response = f"Memory Details:\n\n" response += f"ID: {memory.get('id', 'N/A')}\n" response += f"Content: {memory.get('memory', memory.get('data', 'N/A'))}\n" if memory.get('user_id'): response += f"User ID: {memory['user_id']}\n" if memory.get('agent_id'): response += f"Agent ID: {memory['agent_id']}\n" if memory.get('metadata'): response += f"Metadata: {memory['metadata']}\n" return [TextContent(type="text", text=response)] except Exception as e: logger.error(f"Error getting memory: {e}") return [TextContent(type="text", text=f"Error: {str(e)}")] async def handle_get_all_memories(self, arguments: Dict[str, Any]) -> List[TextContent]: """Handle get_all_memories tool call""" try: user_id = arguments.get("user_id") agent_id = arguments.get("agent_id") result = self.memory.get_all( user_id=user_id, agent_id=agent_id ) # In mem0 v0.1.118+, get_all returns dict with 'results' key memories = result.get('results', []) if isinstance(result, dict) else result if not memories: return [TextContent(type="text", text="No memories found.")] response = f"Retrieved {len(memories)} memory(ies):\n\n" for i, mem in enumerate(memories, 1): # Handle both string and dict responses if isinstance(mem, str): response += f"{i}. {mem}\n\n" elif isinstance(mem, dict): response += f"{i}. {mem.get('memory', mem.get('data', 'N/A'))}\n" response += f" ID: {mem.get('id', 'N/A')}\n\n" else: response += f"{i}. {str(mem)}\n\n" return [TextContent(type="text", text=response)] except Exception as e: logger.error(f"Error getting all memories: {e}") return [TextContent(type="text", text=f"Error: {str(e)}")] async def handle_update_memory(self, arguments: Dict[str, Any]) -> List[TextContent]: """Handle update_memory tool call""" try: memory_id = arguments.get("memory_id") data = arguments.get("data") result = self.memory.update( memory_id=memory_id, data=data ) response = f"Memory updated successfully:\n\n" response += f"ID: {result.get('id', memory_id)}\n" response += f"New Content: {data}\n" return [TextContent(type="text", text=response)] except Exception as e: logger.error(f"Error updating memory: {e}") return [TextContent(type="text", text=f"Error: {str(e)}")] async def handle_delete_memory(self, arguments: Dict[str, Any]) -> List[TextContent]: """Handle delete_memory tool call""" try: memory_id = arguments.get("memory_id") self.memory.delete(memory_id=memory_id) return [TextContent(type="text", text=f"Memory {memory_id} deleted successfully.")] except Exception as e: logger.error(f"Error deleting memory: {e}") return [TextContent(type="text", text=f"Error: {str(e)}")] async def handle_delete_all_memories(self, arguments: Dict[str, Any]) -> List[TextContent]: """ Handle delete_all_memories tool call IMPORTANT: Uses synchronized deletion to ensure both Supabase (vector store) and Neo4j (graph store) are cleaned up. """ try: user_id = arguments.get("user_id") agent_id = arguments.get("agent_id") # Use synchronized deletion to clean up both Supabase and Neo4j result = self.cleanup.delete_all_synchronized( user_id=user_id, agent_id=agent_id ) filter_str = f"user_id={user_id}" if user_id else f"agent_id={agent_id}" if agent_id else "all filters" response = f"All memories deleted for {filter_str}.\n" response += f"Supabase: {'✓' if result['supabase_success'] else '✗'}, " response += f"Neo4j: {result['neo4j_nodes_deleted']} nodes deleted" return [TextContent(type="text", text=response)] except Exception as e: logger.error(f"Error deleting all memories: {e}") return [TextContent(type="text", text=f"Error: {str(e)}")]