Major Changes: - Implemented MCP HTTP/SSE transport server for n8n and web clients - Created mcp_server/http_server.py with FastAPI for JSON-RPC 2.0 over HTTP - Added health check endpoint (/health) for container monitoring - Refactored mcp-server/ to mcp_server/ (Python module structure) - Updated Dockerfile.mcp to run HTTP server with health checks MCP Server Features: - 7 memory tools exposed via MCP (add, search, get, update, delete) - HTTP/SSE transport on port 8765 for n8n integration - stdio transport for Claude Code integration - JSON-RPC 2.0 protocol implementation - CORS support for web clients n8n Integration: - Successfully tested with AI Agent workflows - MCP Client Tool configuration documented - Working webhook endpoint tested and verified - System prompt optimized for automatic user_id usage Documentation: - Created comprehensive Mintlify documentation site - Added docs/mcp/introduction.mdx - MCP server overview - Added docs/mcp/installation.mdx - Installation guide - Added docs/mcp/tools.mdx - Complete tool reference - Added docs/examples/n8n.mdx - n8n integration guide - Added docs/examples/claude-code.mdx - Claude Code setup - Updated README.md with MCP HTTP server info - Updated roadmap to mark Phase 1 as complete Bug Fixes: - Fixed synchronized delete operations across Supabase and Neo4j - Updated memory_service.py with proper error handling - Fixed Neo4j connection issues in delete operations Configuration: - Added MCP_HOST and MCP_PORT environment variables - Updated .env.example with MCP server configuration - Updated docker-compose.yml with MCP container health checks Testing: - Added test scripts for MCP HTTP endpoint verification - Created test workflows in n8n - Verified all 7 memory tools working correctly - Tested synchronized operations across both stores Version: 1.0.0 Status: Phase 1 Complete - Production Ready 🤖 Generated with Claude Code Co-Authored-By: Claude <noreply@anthropic.com>
379 lines
11 KiB
Python
379 lines
11 KiB
Python
"""
|
|
API routes for T6 Mem0 v2 REST API
|
|
"""
|
|
|
|
import logging
|
|
from typing import List
|
|
from datetime import datetime
|
|
from fastapi import APIRouter, Depends, HTTPException, status, Query
|
|
from fastapi.responses import JSONResponse
|
|
|
|
from api.auth import verify_api_key
|
|
from api.models import (
|
|
AddMemoryRequest,
|
|
AddMemoryResponse,
|
|
SearchMemoryRequest,
|
|
SearchMemoryResponse,
|
|
UpdateMemoryRequest,
|
|
MemoryResponse,
|
|
DeleteMemoryResponse,
|
|
HealthResponse,
|
|
StatsResponse,
|
|
ErrorResponse
|
|
)
|
|
from api.memory_service import get_memory_service, MemoryService
|
|
from config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Create router
|
|
router = APIRouter(prefix="/v1", tags=["memories"])
|
|
|
|
|
|
def format_memory_response(mem: dict) -> MemoryResponse:
|
|
"""Format memory data to response model"""
|
|
return MemoryResponse(
|
|
id=mem.get('id', ''),
|
|
memory=mem.get('memory', mem.get('data', '')),
|
|
user_id=mem.get('user_id'),
|
|
agent_id=mem.get('agent_id'),
|
|
run_id=mem.get('run_id'),
|
|
metadata=mem.get('metadata', {}),
|
|
created_at=mem.get('created_at'),
|
|
updated_at=mem.get('updated_at'),
|
|
score=mem.get('score')
|
|
)
|
|
|
|
|
|
@router.post(
|
|
"/memories/",
|
|
response_model=AddMemoryResponse,
|
|
status_code=status.HTTP_201_CREATED,
|
|
summary="Add new memory",
|
|
description="Add new memory from conversation messages",
|
|
responses={
|
|
201: {"description": "Memory added successfully"},
|
|
401: {"model": ErrorResponse, "description": "Unauthorized"},
|
|
500: {"model": ErrorResponse, "description": "Internal server error"}
|
|
}
|
|
)
|
|
async def add_memory(
|
|
request: AddMemoryRequest,
|
|
_api_key: str = Depends(verify_api_key),
|
|
service: MemoryService = Depends(get_memory_service)
|
|
):
|
|
"""Add new memory from messages"""
|
|
try:
|
|
messages = [msg.model_dump() for msg in request.messages]
|
|
|
|
memories = await service.add_memory(
|
|
messages=messages,
|
|
user_id=request.user_id,
|
|
agent_id=request.agent_id,
|
|
run_id=request.run_id,
|
|
metadata=request.metadata
|
|
)
|
|
|
|
formatted_memories = [format_memory_response(mem) for mem in memories]
|
|
|
|
return AddMemoryResponse(
|
|
status="success",
|
|
memories=formatted_memories,
|
|
message=f"Successfully added {len(formatted_memories)} memory(ies)"
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error adding memory: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=str(e)
|
|
)
|
|
|
|
|
|
@router.get(
|
|
"/memories/search",
|
|
response_model=SearchMemoryResponse,
|
|
summary="Search memories",
|
|
description="Search memories by semantic similarity",
|
|
responses={
|
|
200: {"description": "Search completed successfully"},
|
|
401: {"model": ErrorResponse, "description": "Unauthorized"},
|
|
500: {"model": ErrorResponse, "description": "Internal server error"}
|
|
}
|
|
)
|
|
async def search_memories(
|
|
query: str = Query(..., description="Search query"),
|
|
user_id: str | None = Query(None, description="Filter by user ID"),
|
|
agent_id: str | None = Query(None, description="Filter by agent ID"),
|
|
run_id: str | None = Query(None, description="Filter by run ID"),
|
|
limit: int = Query(10, ge=1, le=100, description="Maximum results"),
|
|
_api_key: str = Depends(verify_api_key),
|
|
service: MemoryService = Depends(get_memory_service)
|
|
):
|
|
"""Search memories by query"""
|
|
try:
|
|
memories = await service.search_memories(
|
|
query=query,
|
|
user_id=user_id,
|
|
agent_id=agent_id,
|
|
run_id=run_id,
|
|
limit=limit
|
|
)
|
|
|
|
formatted_memories = [format_memory_response(mem) for mem in memories]
|
|
|
|
return SearchMemoryResponse(
|
|
status="success",
|
|
memories=formatted_memories,
|
|
count=len(formatted_memories)
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error searching memories: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=str(e)
|
|
)
|
|
|
|
|
|
@router.get(
|
|
"/memories/{memory_id}",
|
|
response_model=MemoryResponse,
|
|
summary="Get specific memory",
|
|
description="Retrieve a specific memory by ID",
|
|
responses={
|
|
200: {"description": "Memory retrieved successfully"},
|
|
404: {"model": ErrorResponse, "description": "Memory not found"},
|
|
401: {"model": ErrorResponse, "description": "Unauthorized"},
|
|
500: {"model": ErrorResponse, "description": "Internal server error"}
|
|
}
|
|
)
|
|
async def get_memory(
|
|
memory_id: str,
|
|
_api_key: str = Depends(verify_api_key),
|
|
service: MemoryService = Depends(get_memory_service)
|
|
):
|
|
"""Get specific memory by ID"""
|
|
try:
|
|
memory = await service.get_memory(memory_id)
|
|
|
|
if not memory:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Memory not found: {memory_id}"
|
|
)
|
|
|
|
return format_memory_response(memory)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error getting memory: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=str(e)
|
|
)
|
|
|
|
|
|
@router.get(
|
|
"/memories/user/{user_id}",
|
|
response_model=List[MemoryResponse],
|
|
summary="Get user memories",
|
|
description="Get all memories for a specific user",
|
|
responses={
|
|
200: {"description": "Memories retrieved successfully"},
|
|
401: {"model": ErrorResponse, "description": "Unauthorized"},
|
|
500: {"model": ErrorResponse, "description": "Internal server error"}
|
|
}
|
|
)
|
|
async def get_user_memories(
|
|
user_id: str,
|
|
_api_key: str = Depends(verify_api_key),
|
|
service: MemoryService = Depends(get_memory_service)
|
|
):
|
|
"""Get all memories for a user"""
|
|
try:
|
|
memories = await service.get_all_memories(user_id=user_id)
|
|
logger.info(f"Received {len(memories)} memories from service, types: {[type(m) for m in memories]}")
|
|
return [format_memory_response(mem) for mem in memories]
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting user memories: {e}", exc_info=True)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=str(e)
|
|
)
|
|
|
|
|
|
@router.patch(
|
|
"/memories/{memory_id}",
|
|
response_model=MemoryResponse,
|
|
summary="Update memory",
|
|
description="Update an existing memory",
|
|
responses={
|
|
200: {"description": "Memory updated successfully"},
|
|
404: {"model": ErrorResponse, "description": "Memory not found"},
|
|
401: {"model": ErrorResponse, "description": "Unauthorized"},
|
|
500: {"model": ErrorResponse, "description": "Internal server error"}
|
|
}
|
|
)
|
|
async def update_memory(
|
|
memory_id: str,
|
|
request: UpdateMemoryRequest,
|
|
_api_key: str = Depends(verify_api_key),
|
|
service: MemoryService = Depends(get_memory_service)
|
|
):
|
|
"""Update existing memory"""
|
|
try:
|
|
if not request.memory_text:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="memory_text is required for update"
|
|
)
|
|
|
|
updated = await service.update_memory(
|
|
memory_id=memory_id,
|
|
data=request.memory_text
|
|
)
|
|
|
|
return format_memory_response(updated)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error updating memory: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=str(e)
|
|
)
|
|
|
|
|
|
@router.delete(
|
|
"/memories/{memory_id}",
|
|
response_model=DeleteMemoryResponse,
|
|
summary="Delete memory",
|
|
description="Delete a specific memory",
|
|
responses={
|
|
200: {"description": "Memory deleted successfully"},
|
|
404: {"model": ErrorResponse, "description": "Memory not found"},
|
|
401: {"model": ErrorResponse, "description": "Unauthorized"},
|
|
500: {"model": ErrorResponse, "description": "Internal server error"}
|
|
}
|
|
)
|
|
async def delete_memory(
|
|
memory_id: str,
|
|
_api_key: str = Depends(verify_api_key),
|
|
service: MemoryService = Depends(get_memory_service)
|
|
):
|
|
"""Delete specific memory"""
|
|
try:
|
|
await service.delete_memory(memory_id)
|
|
|
|
return DeleteMemoryResponse(
|
|
status="success",
|
|
message=f"Memory {memory_id} deleted successfully"
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error deleting memory: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=str(e)
|
|
)
|
|
|
|
|
|
@router.delete(
|
|
"/memories/user/{user_id}",
|
|
response_model=DeleteMemoryResponse,
|
|
summary="Delete user memories",
|
|
description="Delete all memories for a specific user",
|
|
responses={
|
|
200: {"description": "Memories deleted successfully"},
|
|
401: {"model": ErrorResponse, "description": "Unauthorized"},
|
|
500: {"model": ErrorResponse, "description": "Internal server error"}
|
|
}
|
|
)
|
|
async def delete_user_memories(
|
|
user_id: str,
|
|
_api_key: str = Depends(verify_api_key),
|
|
service: MemoryService = Depends(get_memory_service)
|
|
):
|
|
"""Delete all memories for a user"""
|
|
try:
|
|
await service.delete_all_memories(user_id=user_id)
|
|
|
|
return DeleteMemoryResponse(
|
|
status="success",
|
|
message=f"All memories for user {user_id} deleted successfully"
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error deleting user memories: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=str(e)
|
|
)
|
|
|
|
|
|
@router.get(
|
|
"/health",
|
|
response_model=HealthResponse,
|
|
summary="Health check",
|
|
description="Check API health status",
|
|
responses={
|
|
200: {"description": "Service is healthy"}
|
|
}
|
|
)
|
|
async def health_check(
|
|
service: MemoryService = Depends(get_memory_service)
|
|
):
|
|
"""Health check endpoint"""
|
|
try:
|
|
health = await service.health_check()
|
|
|
|
return HealthResponse(
|
|
status="healthy" if all(v == "healthy" for v in health.values()) else "degraded",
|
|
version="0.1.0",
|
|
timestamp=datetime.utcnow(),
|
|
dependencies=health
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Health check failed: {e}")
|
|
return JSONResponse(
|
|
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
|
|
content={
|
|
"status": "unhealthy",
|
|
"version": "0.1.0",
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"error": str(e)
|
|
}
|
|
)
|
|
|
|
|
|
@router.get(
|
|
"/stats",
|
|
response_model=StatsResponse,
|
|
summary="Memory statistics",
|
|
description="Get system-wide memory statistics",
|
|
responses={
|
|
200: {"description": "Statistics retrieved successfully"},
|
|
401: {"model": ErrorResponse, "description": "Unauthorized"},
|
|
500: {"model": ErrorResponse, "description": "Internal server error"}
|
|
}
|
|
)
|
|
async def get_stats(
|
|
_api_key: str = Depends(verify_api_key)
|
|
):
|
|
"""Get memory statistics"""
|
|
# Note: This would require direct database access or extension of mem0 API
|
|
# For now, return placeholder
|
|
return StatsResponse(
|
|
total_memories=0,
|
|
total_users=0,
|
|
total_agents=0,
|
|
avg_memories_per_user=0.0,
|
|
oldest_memory=None,
|
|
newest_memory=None
|
|
)
|