Files
t6_mem0_v2/mcp-server/tools.py
Claude Code 61a4050a8e Complete implementation: REST API, MCP server, and documentation
Implementation Summary:
- REST API with FastAPI (complete CRUD operations)
- MCP Server with Python MCP SDK (7 tools)
- Supabase migrations (pgvector setup)
- Docker Compose orchestration
- Mintlify documentation site
- Environment configuration
- Shared config module

REST API Features:
- POST /v1/memories/ - Add memory
- GET /v1/memories/search - Semantic search
- GET /v1/memories/{id} - Get memory
- GET /v1/memories/user/{user_id} - User memories
- PATCH /v1/memories/{id} - Update memory
- DELETE /v1/memories/{id} - Delete memory
- GET /v1/health - Health check
- GET /v1/stats - Statistics
- Bearer token authentication
- OpenAPI documentation

MCP Server Tools:
- add_memory - Add from messages
- search_memories - Semantic search
- get_memory - Retrieve by ID
- get_all_memories - List all
- update_memory - Update content
- delete_memory - Delete by ID
- delete_all_memories - Bulk delete

Infrastructure:
- Neo4j 5.26 with APOC/GDS
- Supabase pgvector integration
- Docker network: localai
- Health checks and monitoring
- Structured logging

Documentation:
- Introduction page
- Quickstart guide
- Architecture deep dive
- Mintlify configuration

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-14 08:44:16 +02:00

344 lines
13 KiB
Python

"""
MCP tools for T6 Mem0 v2
Tool definitions and handlers
"""
import logging
from typing import Any, Dict, List
from mcp.types import Tool, TextContent
from mem0 import Memory
logger = logging.getLogger(__name__)
class MemoryTools:
"""MCP tools for memory operations"""
def __init__(self, memory: Memory):
"""
Initialize memory tools
Args:
memory: Mem0 instance
"""
self.memory = memory
def get_tool_definitions(self) -> List[Tool]:
"""
Get MCP tool definitions
Returns:
List of Tool definitions
"""
return [
Tool(
name="add_memory",
description="Add new memory from messages. Extracts and stores important information from conversation.",
inputSchema={
"type": "object",
"properties": {
"messages": {
"type": "array",
"items": {
"type": "object",
"properties": {
"role": {"type": "string", "enum": ["user", "assistant", "system"]},
"content": {"type": "string"}
},
"required": ["role", "content"]
},
"description": "Conversation messages to extract memory from"
},
"user_id": {
"type": "string",
"description": "User identifier (optional)"
},
"agent_id": {
"type": "string",
"description": "Agent identifier (optional)"
},
"metadata": {
"type": "object",
"description": "Additional metadata (optional)"
}
},
"required": ["messages"]
}
),
Tool(
name="search_memories",
description="Search memories by semantic similarity. Find relevant memories based on a query.",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query"
},
"user_id": {
"type": "string",
"description": "Filter by user ID (optional)"
},
"agent_id": {
"type": "string",
"description": "Filter by agent ID (optional)"
},
"limit": {
"type": "integer",
"minimum": 1,
"maximum": 50,
"default": 10,
"description": "Maximum number of results"
}
},
"required": ["query"]
}
),
Tool(
name="get_memory",
description="Get a specific memory by its ID",
inputSchema={
"type": "object",
"properties": {
"memory_id": {
"type": "string",
"description": "Memory identifier"
}
},
"required": ["memory_id"]
}
),
Tool(
name="get_all_memories",
description="Get all memories for a user or agent",
inputSchema={
"type": "object",
"properties": {
"user_id": {
"type": "string",
"description": "User identifier (optional)"
},
"agent_id": {
"type": "string",
"description": "Agent identifier (optional)"
}
}
}
),
Tool(
name="update_memory",
description="Update an existing memory's content",
inputSchema={
"type": "object",
"properties": {
"memory_id": {
"type": "string",
"description": "Memory identifier"
},
"data": {
"type": "string",
"description": "New memory content"
}
},
"required": ["memory_id", "data"]
}
),
Tool(
name="delete_memory",
description="Delete a specific memory by ID",
inputSchema={
"type": "object",
"properties": {
"memory_id": {
"type": "string",
"description": "Memory identifier"
}
},
"required": ["memory_id"]
}
),
Tool(
name="delete_all_memories",
description="Delete all memories for a user or agent. Use with caution!",
inputSchema={
"type": "object",
"properties": {
"user_id": {
"type": "string",
"description": "User identifier (optional)"
},
"agent_id": {
"type": "string",
"description": "Agent identifier (optional)"
}
}
}
)
]
async def handle_add_memory(self, arguments: Dict[str, Any]) -> List[TextContent]:
"""Handle add_memory tool call"""
try:
messages = arguments.get("messages", [])
user_id = arguments.get("user_id")
agent_id = arguments.get("agent_id")
metadata = arguments.get("metadata", {})
result = self.memory.add(
messages=messages,
user_id=user_id,
agent_id=agent_id,
metadata=metadata
)
memories = result.get('results', [])
response = f"Successfully added {len(memories)} memory(ies):\n\n"
for mem in memories:
response += f"- {mem.get('memory', mem.get('data', 'N/A'))}\n"
response += f" ID: {mem.get('id', 'N/A')}\n\n"
return [TextContent(type="text", text=response)]
except Exception as e:
logger.error(f"Error adding memory: {e}")
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def handle_search_memories(self, arguments: Dict[str, Any]) -> List[TextContent]:
"""Handle search_memories tool call"""
try:
query = arguments.get("query")
user_id = arguments.get("user_id")
agent_id = arguments.get("agent_id")
limit = arguments.get("limit", 10)
memories = self.memory.search(
query=query,
user_id=user_id,
agent_id=agent_id,
limit=limit
)
if not memories:
return [TextContent(type="text", text="No memories found matching your query.")]
response = f"Found {len(memories)} relevant memory(ies):\n\n"
for i, mem in enumerate(memories, 1):
response += f"{i}. {mem.get('memory', mem.get('data', 'N/A'))}\n"
response += f" ID: {mem.get('id', 'N/A')}\n"
if 'score' in mem:
response += f" Relevance: {mem['score']:.2%}\n"
response += "\n"
return [TextContent(type="text", text=response)]
except Exception as e:
logger.error(f"Error searching memories: {e}")
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def handle_get_memory(self, arguments: Dict[str, Any]) -> List[TextContent]:
"""Handle get_memory tool call"""
try:
memory_id = arguments.get("memory_id")
memory = self.memory.get(memory_id=memory_id)
if not memory:
return [TextContent(type="text", text=f"Memory not found: {memory_id}")]
response = f"Memory Details:\n\n"
response += f"ID: {memory.get('id', 'N/A')}\n"
response += f"Content: {memory.get('memory', memory.get('data', 'N/A'))}\n"
if memory.get('user_id'):
response += f"User ID: {memory['user_id']}\n"
if memory.get('agent_id'):
response += f"Agent ID: {memory['agent_id']}\n"
if memory.get('metadata'):
response += f"Metadata: {memory['metadata']}\n"
return [TextContent(type="text", text=response)]
except Exception as e:
logger.error(f"Error getting memory: {e}")
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def handle_get_all_memories(self, arguments: Dict[str, Any]) -> List[TextContent]:
"""Handle get_all_memories tool call"""
try:
user_id = arguments.get("user_id")
agent_id = arguments.get("agent_id")
memories = self.memory.get_all(
user_id=user_id,
agent_id=agent_id
)
if not memories:
return [TextContent(type="text", text="No memories found.")]
response = f"Retrieved {len(memories)} memory(ies):\n\n"
for i, mem in enumerate(memories, 1):
response += f"{i}. {mem.get('memory', mem.get('data', 'N/A'))}\n"
response += f" ID: {mem.get('id', 'N/A')}\n\n"
return [TextContent(type="text", text=response)]
except Exception as e:
logger.error(f"Error getting all memories: {e}")
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def handle_update_memory(self, arguments: Dict[str, Any]) -> List[TextContent]:
"""Handle update_memory tool call"""
try:
memory_id = arguments.get("memory_id")
data = arguments.get("data")
result = self.memory.update(
memory_id=memory_id,
data=data
)
response = f"Memory updated successfully:\n\n"
response += f"ID: {result.get('id', memory_id)}\n"
response += f"New Content: {data}\n"
return [TextContent(type="text", text=response)]
except Exception as e:
logger.error(f"Error updating memory: {e}")
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def handle_delete_memory(self, arguments: Dict[str, Any]) -> List[TextContent]:
"""Handle delete_memory tool call"""
try:
memory_id = arguments.get("memory_id")
self.memory.delete(memory_id=memory_id)
return [TextContent(type="text", text=f"Memory {memory_id} deleted successfully.")]
except Exception as e:
logger.error(f"Error deleting memory: {e}")
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def handle_delete_all_memories(self, arguments: Dict[str, Any]) -> List[TextContent]:
"""Handle delete_all_memories tool call"""
try:
user_id = arguments.get("user_id")
agent_id = arguments.get("agent_id")
self.memory.delete_all(
user_id=user_id,
agent_id=agent_id
)
filter_str = f"user_id={user_id}" if user_id else f"agent_id={agent_id}" if agent_id else "all filters"
return [TextContent(type="text", text=f"All memories deleted for {filter_str}.")]
except Exception as e:
logger.error(f"Error deleting all memories: {e}")
return [TextContent(type="text", text=f"Error: {str(e)}")]