Files
t6_mem0_v2/api/routes.py
Claude Code 1998bef6f4 Add MCP HTTP/SSE server and complete n8n integration
Major Changes:
- Implemented MCP HTTP/SSE transport server for n8n and web clients
- Created mcp_server/http_server.py with FastAPI for JSON-RPC 2.0 over HTTP
- Added health check endpoint (/health) for container monitoring
- Refactored mcp-server/ to mcp_server/ (Python module structure)
- Updated Dockerfile.mcp to run HTTP server with health checks

MCP Server Features:
- 7 memory tools exposed via MCP (add, search, get, get all, update, delete, delete all)
- HTTP/SSE transport on port 8765 for n8n integration
- stdio transport for Claude Code integration
- JSON-RPC 2.0 protocol implementation
- CORS support for web clients

n8n Integration:
- Successfully tested with AI Agent workflows
- MCP Client Tool configuration documented
- Working webhook endpoint tested and verified
- System prompt optimized for automatic user_id usage

Documentation:
- Created comprehensive Mintlify documentation site
- Added docs/mcp/introduction.mdx - MCP server overview
- Added docs/mcp/installation.mdx - Installation guide
- Added docs/mcp/tools.mdx - Complete tool reference
- Added docs/examples/n8n.mdx - n8n integration guide
- Added docs/examples/claude-code.mdx - Claude Code setup
- Updated README.md with MCP HTTP server info
- Updated roadmap to mark Phase 1 as complete

Bug Fixes:
- Fixed synchronized delete operations across Supabase and Neo4j
- Updated memory_service.py with proper error handling
- Fixed Neo4j connection issues in delete operations

Configuration:
- Added MCP_HOST and MCP_PORT environment variables
- Updated .env.example with MCP server configuration
- Updated docker-compose.yml with MCP container health checks

Testing:
- Added test scripts for MCP HTTP endpoint verification
- Created test workflows in n8n
- Verified all 7 memory tools working correctly
- Tested synchronized operations across both stores

Version: 1.0.0
Status: Phase 1 Complete - Production Ready

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-15 13:56:41 +02:00

379 lines
11 KiB
Python

"""
API routes for T6 Mem0 v2 REST API
"""
import logging
from typing import List
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status, Query
from fastapi.responses import JSONResponse
from api.auth import verify_api_key
from api.models import (
AddMemoryRequest,
AddMemoryResponse,
SearchMemoryRequest,
SearchMemoryResponse,
UpdateMemoryRequest,
MemoryResponse,
DeleteMemoryResponse,
HealthResponse,
StatsResponse,
ErrorResponse
)
from api.memory_service import get_memory_service, MemoryService
from config import settings
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router holding every endpoint declared in this module; all paths registered
# on it are served under the "/v1" prefix and carry the "memories" tag.
router = APIRouter(prefix="/v1", tags=["memories"])
def format_memory_response(mem: dict) -> MemoryResponse:
    """Build a ``MemoryResponse`` from a raw memory mapping.

    Every field is read with ``mem.get``, so absent keys never raise:
    ``id`` falls back to ``''``, ``metadata`` to ``{}`` and the remaining
    fields to ``None``. The memory text comes from the ``'memory'`` key and,
    when that key is absent, from ``'data'`` (empty string if neither is set).

    Args:
        mem: Mapping describing one memory record.

    Returns:
        The populated ``MemoryResponse``.
    """
    # 'data' is the fallback key for the text when 'memory' is missing.
    fallback_text = mem.get('data', '')

    fields = {
        'id': mem.get('id', ''),
        'memory': mem.get('memory', fallback_text),
        'metadata': mem.get('metadata', {}),
    }
    # These fields share the same "None when absent" behaviour.
    for key in ('user_id', 'agent_id', 'run_id', 'created_at', 'updated_at', 'score'):
        fields[key] = mem.get(key)

    return MemoryResponse(**fields)
@router.post(
    "/memories/",
    response_model=AddMemoryResponse,
    status_code=status.HTTP_201_CREATED,
    summary="Add new memory",
    description="Add new memory from conversation messages",
    responses={
        201: {"description": "Memory added successfully"},
        401: {"model": ErrorResponse, "description": "Unauthorized"},
        500: {"model": ErrorResponse, "description": "Internal server error"},
    },
)
async def add_memory(
    request: AddMemoryRequest,
    _api_key: str = Depends(verify_api_key),
    service: MemoryService = Depends(get_memory_service),
):
    """Add new memory from conversation messages.

    Each message in ``request.messages`` is dumped to a plain dict and handed
    to ``service.add_memory`` together with the request's ``user_id``,
    ``agent_id``, ``run_id`` and ``metadata``.

    Args:
        request: Request body with the messages and their identifiers.
        _api_key: Value produced by the ``verify_api_key`` dependency; it is
            not used in the body.
        service: Memory service supplied by ``get_memory_service``.

    Returns:
        An ``AddMemoryResponse`` with status ``"success"``, the formatted
        memories and a message stating how many were added.

    Raises:
        HTTPException: 500 with ``str(error)`` as detail when any step fails.
    """
    try:
        messages = [msg.model_dump() for msg in request.messages]
        memories = await service.add_memory(
            messages=messages,
            user_id=request.user_id,
            agent_id=request.agent_id,
            run_id=request.run_id,
            metadata=request.metadata,
        )
        formatted_memories = [format_memory_response(mem) for mem in memories]
        return AddMemoryResponse(
            status="success",
            memories=formatted_memories,
            message=f"Successfully added {len(formatted_memories)} memory(ies)",
        )
    except Exception as e:
        # Keep the traceback in the log; the message alone rarely identifies
        # which call failed.
        logger.error(f"Error adding memory: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e
@router.get(
    "/memories/search",
    response_model=SearchMemoryResponse,
    summary="Search memories",
    description="Search memories by semantic similarity",
    responses={
        200: {"description": "Search completed successfully"},
        401: {"model": ErrorResponse, "description": "Unauthorized"},
        500: {"model": ErrorResponse, "description": "Internal server error"},
    },
)
async def search_memories(
    query: str = Query(..., description="Search query"),
    user_id: str | None = Query(None, description="Filter by user ID"),
    agent_id: str | None = Query(None, description="Filter by agent ID"),
    run_id: str | None = Query(None, description="Filter by run ID"),
    limit: int = Query(10, ge=1, le=100, description="Maximum results"),
    _api_key: str = Depends(verify_api_key),
    service: MemoryService = Depends(get_memory_service),
):
    """Search memories by query.

    Args:
        query: Required search text.
        user_id: Optional user filter passed through to the service.
        agent_id: Optional agent filter passed through to the service.
        run_id: Optional run filter passed through to the service.
        limit: Maximum number of results; validated to 1..100, default 10.
        _api_key: Value produced by the ``verify_api_key`` dependency; it is
            not used in the body.
        service: Memory service supplied by ``get_memory_service``.

    Returns:
        A ``SearchMemoryResponse`` with status ``"success"``, the formatted
        memories and their count.

    Raises:
        HTTPException: 500 with ``str(error)`` as detail when the search or
            the response formatting fails.
    """
    try:
        memories = await service.search_memories(
            query=query,
            user_id=user_id,
            agent_id=agent_id,
            run_id=run_id,
            limit=limit,
        )
        formatted_memories = [format_memory_response(mem) for mem in memories]
        return SearchMemoryResponse(
            status="success",
            memories=formatted_memories,
            count=len(formatted_memories),
        )
    except Exception as e:
        # Keep the traceback in the log so the failing call can be located.
        logger.error(f"Error searching memories: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e
@router.get(
    "/memories/{memory_id}",
    response_model=MemoryResponse,
    summary="Get specific memory",
    description="Retrieve a specific memory by ID",
    responses={
        200: {"description": "Memory retrieved successfully"},
        404: {"model": ErrorResponse, "description": "Memory not found"},
        401: {"model": ErrorResponse, "description": "Unauthorized"},
        500: {"model": ErrorResponse, "description": "Internal server error"},
    },
)
async def get_memory(
    memory_id: str,
    _api_key: str = Depends(verify_api_key),
    service: MemoryService = Depends(get_memory_service),
):
    """Get a specific memory by ID.

    Args:
        memory_id: Identifier taken from the URL path.
        _api_key: Value produced by the ``verify_api_key`` dependency; it is
            not used in the body.
        service: Memory service supplied by ``get_memory_service``.

    Returns:
        The memory formatted as a ``MemoryResponse``.

    Raises:
        HTTPException: 404 when the service returns a falsy result, 500 with
            ``str(error)`` as detail when the lookup itself fails.
    """
    try:
        memory = await service.get_memory(memory_id)
    except Exception as e:
        # Keep the traceback in the log so the failing call can be located.
        logger.error(f"Error getting memory: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e

    # Raised outside the try block, so the 404 can never be rewrapped as 500.
    if not memory:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Memory not found: {memory_id}",
        )

    try:
        return format_memory_response(memory)
    except Exception as e:
        logger.error(f"Error getting memory: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e
@router.get(
    "/memories/user/{user_id}",
    response_model=List[MemoryResponse],
    summary="Get user memories",
    description="Get all memories for a specific user",
    responses={
        200: {"description": "Memories retrieved successfully"},
        401: {"model": ErrorResponse, "description": "Unauthorized"},
        500: {"model": ErrorResponse, "description": "Internal server error"},
    },
)
async def get_user_memories(
    user_id: str,
    _api_key: str = Depends(verify_api_key),
    service: MemoryService = Depends(get_memory_service),
):
    """Return every memory the service holds for ``user_id``.

    The raw records come from ``service.get_all_memories``; their number and
    Python types are logged at INFO level before each one is converted with
    ``format_memory_response``.

    Raises:
        HTTPException: 500 with ``str(error)`` as detail on any failure; the
            traceback is written to the log.
    """
    try:
        memories = await service.get_all_memories(user_id=user_id)
        logger.info(f"Received {len(memories)} memories from service, types: {[type(m) for m in memories]}")

        formatted = []
        for record in memories:
            formatted.append(format_memory_response(record))
        return formatted
    except Exception as e:
        logger.error(f"Error getting user memories: {e}", exc_info=True)
        failure = HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        )
        raise failure
@router.patch(
    "/memories/{memory_id}",
    response_model=MemoryResponse,
    summary="Update memory",
    description="Update an existing memory",
    responses={
        200: {"description": "Memory updated successfully"},
        400: {"model": ErrorResponse, "description": "memory_text is missing"},
        404: {"model": ErrorResponse, "description": "Memory not found"},
        401: {"model": ErrorResponse, "description": "Unauthorized"},
        500: {"model": ErrorResponse, "description": "Internal server error"},
    },
)
async def update_memory(
    memory_id: str,
    request: UpdateMemoryRequest,
    _api_key: str = Depends(verify_api_key),
    service: MemoryService = Depends(get_memory_service),
):
    """Update the text of an existing memory.

    Args:
        memory_id: Identifier taken from the URL path.
        request: Request body; ``memory_text`` must be non-empty.
        _api_key: Value produced by the ``verify_api_key`` dependency; it is
            not used in the body.
        service: Memory service supplied by ``get_memory_service``.

    Returns:
        The updated memory formatted as a ``MemoryResponse``.

    Raises:
        HTTPException: 400 when ``memory_text`` is empty or missing, 404 when
            the service returns ``None``, 500 with ``str(error)`` as detail on
            any other failure.
    """
    # Validate before touching the service.
    if not request.memory_text:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="memory_text is required for update",
        )

    try:
        updated = await service.update_memory(
            memory_id=memory_id,
            data=request.memory_text,
        )
        if updated is None:
            # A None result would otherwise crash in format_memory_response
            # and surface as a 500.
            # NOTE(review): assumes None means "no such memory" - confirm
            # against MemoryService.update_memory.
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Memory not found: {memory_id}",
            )
        return format_memory_response(updated)
    except HTTPException:
        raise
    except Exception as e:
        # Keep the traceback in the log so the failing call can be located.
        logger.error(f"Error updating memory: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e
@router.delete(
    "/memories/{memory_id}",
    response_model=DeleteMemoryResponse,
    summary="Delete memory",
    description="Delete a specific memory",
    responses={
        200: {"description": "Memory deleted successfully"},
        404: {"model": ErrorResponse, "description": "Memory not found"},
        401: {"model": ErrorResponse, "description": "Unauthorized"},
        500: {"model": ErrorResponse, "description": "Internal server error"},
    },
)
async def delete_memory(
    memory_id: str,
    _api_key: str = Depends(verify_api_key),
    service: MemoryService = Depends(get_memory_service),
):
    """Delete a specific memory.

    Args:
        memory_id: Identifier taken from the URL path.
        _api_key: Value produced by the ``verify_api_key`` dependency; it is
            not used in the body.
        service: Memory service supplied by ``get_memory_service``.

    Returns:
        A ``DeleteMemoryResponse`` with status ``"success"`` and a
        confirmation message naming ``memory_id``.

    Raises:
        HTTPException: 500 with ``str(error)`` as detail when the service
            call fails.
    """
    # NOTE(review): a 404 response is declared above, but this handler never
    # produces one; whether the service signals a missing memory is not
    # visible here - confirm against MemoryService.delete_memory.
    try:
        await service.delete_memory(memory_id)
        return DeleteMemoryResponse(
            status="success",
            message=f"Memory {memory_id} deleted successfully",
        )
    except Exception as e:
        # Keep the traceback in the log so the failing call can be located.
        logger.error(f"Error deleting memory: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e
@router.delete(
    "/memories/user/{user_id}",
    response_model=DeleteMemoryResponse,
    summary="Delete user memories",
    description="Delete all memories for a specific user",
    responses={
        200: {"description": "Memories deleted successfully"},
        401: {"model": ErrorResponse, "description": "Unauthorized"},
        500: {"model": ErrorResponse, "description": "Internal server error"},
    },
)
async def delete_user_memories(
    user_id: str,
    _api_key: str = Depends(verify_api_key),
    service: MemoryService = Depends(get_memory_service),
):
    """Delete all memories for a user.

    Args:
        user_id: User identifier taken from the URL path.
        _api_key: Value produced by the ``verify_api_key`` dependency; it is
            not used in the body.
        service: Memory service supplied by ``get_memory_service``.

    Returns:
        A ``DeleteMemoryResponse`` with status ``"success"`` and a
        confirmation message naming ``user_id``.

    Raises:
        HTTPException: 500 with ``str(error)`` as detail when the service
            call fails.
    """
    try:
        await service.delete_all_memories(user_id=user_id)
        return DeleteMemoryResponse(
            status="success",
            message=f"All memories for user {user_id} deleted successfully",
        )
    except Exception as e:
        # Keep the traceback in the log so the failing call can be located.
        logger.error(f"Error deleting user memories: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e
@router.get(
    "/health",
    response_model=HealthResponse,
    summary="Health check",
    description="Check API health status",
    responses={
        200: {"description": "Service is healthy"},
        503: {"description": "Health check failed"},
    },
)
async def health_check(
    service: MemoryService = Depends(get_memory_service),
):
    """Report the health of the API and its dependencies.

    The overall status is ``"healthy"`` only when every value returned by
    ``service.health_check()`` equals ``"healthy"``; otherwise it is
    ``"degraded"``. This endpoint does not require an API key.

    Args:
        service: Memory service supplied by ``get_memory_service``.

    Returns:
        A ``HealthResponse`` on success, or a 503 ``JSONResponse`` with status
        ``"unhealthy"`` and the error text when the check itself raises.
        Timestamps are timezone-aware UTC.
    """
    # Imported here because the module only imports `datetime` itself.
    from datetime import timezone

    version = "0.1.0"
    try:
        health = await service.health_check()
        all_healthy = all(v == "healthy" for v in health.values())
        return HealthResponse(
            status="healthy" if all_healthy else "degraded",
            version=version,
            # datetime.utcnow() is deprecated and returns a naive value.
            timestamp=datetime.now(timezone.utc),
            dependencies=health,
        )
    except Exception as e:
        # A failed probe is reported to the caller, not raised, so monitoring
        # gets a structured 503 body; the traceback goes to the log.
        logger.error(f"Health check failed: {e}", exc_info=True)
        return JSONResponse(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            content={
                "status": "unhealthy",
                "version": version,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "error": str(e),
            },
        )
@router.get(
    "/stats",
    response_model=StatsResponse,
    summary="Memory statistics",
    description="Get system-wide memory statistics",
    responses={
        200: {"description": "Statistics retrieved successfully"},
        401: {"model": ErrorResponse, "description": "Unauthorized"},
        500: {"model": ErrorResponse, "description": "Internal server error"},
    },
)
async def get_stats(
    _api_key: str = Depends(verify_api_key),
):
    """Return memory statistics (currently a fixed placeholder).

    Real figures would need direct database access or an extension of the
    mem0 API, so every counter is zero and both timestamps are ``None``.
    """
    placeholder = {
        "total_memories": 0,
        "total_users": 0,
        "total_agents": 0,
        "avg_memories_per_user": 0.0,
        "oldest_memory": None,
        "newest_memory": None,
    }
    return StatsResponse(**placeholder)