Complete implementation: REST API, MCP server, and documentation

Implementation Summary:
- REST API with FastAPI (complete CRUD operations)
- MCP Server with Python MCP SDK (7 tools)
- Supabase migrations (pgvector setup)
- Docker Compose orchestration
- Mintlify documentation site
- Environment configuration
- Shared config module

REST API Features:
- POST /v1/memories/ - Add memory
- GET /v1/memories/search - Semantic search
- GET /v1/memories/{id} - Get memory
- GET /v1/memories/user/{user_id} - User memories
- PATCH /v1/memories/{id} - Update memory
- DELETE /v1/memories/{id} - Delete memory
- GET /v1/health - Health check
- GET /v1/stats - Statistics
- Bearer token authentication
- OpenAPI documentation

MCP Server Tools:
- add_memory - Add from messages
- search_memories - Semantic search
- get_memory - Retrieve by ID
- get_all_memories - List all
- update_memory - Update content
- delete_memory - Delete by ID
- delete_all_memories - Bulk delete

Infrastructure:
- Neo4j 5.26 with APOC/GDS
- Supabase pgvector integration
- Docker network: localai
- Health checks and monitoring
- Structured logging

Documentation:
- Introduction page
- Quickstart guide
- Architecture deep dive
- Mintlify configuration

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Claude Code
2025-10-14 08:44:16 +02:00
parent cfa7abd23d
commit 61a4050a8e
26 changed files with 3248 additions and 0 deletions

0
api/__init__.py Normal file
View File

66
api/auth.py Normal file
View File

@@ -0,0 +1,66 @@
"""
Authentication middleware for T6 Mem0 v2 REST API
"""
from fastapi import HTTPException, Security, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from config import settings
# Security scheme
security = HTTPBearer()
async def verify_api_key(
credentials: HTTPAuthorizationCredentials = Security(security)
) -> str:
"""
Verify API key from Authorization header
Args:
credentials: HTTP Bearer credentials
Returns:
The verified API key
Raises:
HTTPException: If authentication fails
"""
if not credentials:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Missing authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
token = credentials.credentials
# Verify token matches configured API key
if token != settings.api_key:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired API key",
headers={"WWW-Authenticate": "Bearer"},
)
return token
async def optional_api_key(
credentials: HTTPAuthorizationCredentials = Security(security)
) -> str | None:
"""
Optional API key verification (for public endpoints)
Args:
credentials: HTTP Bearer credentials
Returns:
The API key if provided, None otherwise
"""
if not credentials:
return None
try:
return await verify_api_key(credentials)
except HTTPException:
return None

158
api/main.py Normal file
View File

@@ -0,0 +1,158 @@
"""
T6 Mem0 v2 REST API
Main FastAPI application
"""
import logging
import sys
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from api.routes import router
from api.memory_service import get_memory_service
from config import settings
# Configure logging
logging.basicConfig(
level=getattr(logging, settings.log_level.upper()),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
Lifespan context manager for startup and shutdown events
"""
# Startup
logger.info("Starting T6 Mem0 v2 REST API")
logger.info(f"Environment: {settings.environment}")
logger.info(f"API Host: {settings.api_host}:{settings.api_port}")
try:
# Initialize memory service
service = get_memory_service()
logger.info("Memory service initialized successfully")
yield
except Exception as e:
logger.error(f"Failed to initialize application: {e}")
raise
finally:
# Shutdown
logger.info("Shutting down T6 Mem0 v2 REST API")
# Create FastAPI application
app = FastAPI(
title="T6 Mem0 v2 API",
description="""
Memory system for LLM applications based on mem0.ai
## Features
- **Add Memory**: Store conversation memories with semantic understanding
- **Search Memory**: Find relevant memories using semantic search
- **Manage Memory**: Update and delete memories
- **Multi-Agent**: Support for user, agent, and run-specific memories
- **Graph Relationships**: Neo4j integration for memory relationships
## Authentication
All endpoints (except /health) require Bearer token authentication.
Include your API key in the Authorization header:
```
Authorization: Bearer YOUR_API_KEY
```
""",
version="0.1.0",
docs_url="/docs",
redoc_url="/redoc",
openapi_url="/openapi.json",
lifespan=lifespan
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=[
"http://localhost:3000",
"http://localhost:8080",
"http://localhost:5678", # n8n
],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Exception handlers
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
"""Handle validation errors"""
logger.warning(f"Validation error: {exc.errors()}")
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content={
"status": "error",
"error": "Validation failed",
"detail": exc.errors()
}
)
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
"""Handle general exceptions"""
logger.error(f"Unhandled exception: {exc}", exc_info=True)
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={
"status": "error",
"error": "Internal server error",
"detail": str(exc) if settings.environment == "development" else "An error occurred"
}
)
# Include routers
app.include_router(router)
# Root endpoint
@app.get(
"/",
summary="API Root",
description="Get API information",
tags=["info"]
)
async def root():
"""API root endpoint"""
return {
"name": "T6 Mem0 v2 API",
"version": "0.1.0",
"status": "running",
"docs": "/docs",
"health": "/v1/health"
}
# Run with uvicorn
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"api.main:app",
host=settings.api_host,
port=settings.api_port,
reload=settings.environment == "development",
log_level=settings.log_level.lower()
)

353
api/memory_service.py Normal file
View File

@@ -0,0 +1,353 @@
"""
Memory service wrapper for Mem0 core library
"""
import logging
from typing import List, Dict, Any, Optional
from mem0 import Memory
from config import mem0_config
logger = logging.getLogger(__name__)
class MemoryService:
"""Singleton service for memory operations using Mem0"""
_instance: Optional['MemoryService'] = None
_memory: Optional[Memory] = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
"""Initialize memory service with Mem0"""
if self._memory is None:
logger.info("Initializing Mem0 with configuration")
try:
self._memory = Memory.from_config(config_dict=mem0_config)
logger.info("Mem0 initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize Mem0: {e}")
raise
@property
def memory(self) -> Memory:
"""Get Mem0 instance"""
if self._memory is None:
raise RuntimeError("MemoryService not initialized")
return self._memory
async def add_memory(
self,
messages: List[Dict[str, str]],
user_id: Optional[str] = None,
agent_id: Optional[str] = None,
run_id: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None
) -> List[Dict[str, Any]]:
"""
Add new memory from messages
Args:
messages: List of chat messages
user_id: User identifier
agent_id: Agent identifier
run_id: Run identifier
metadata: Additional metadata
Returns:
List of created memories
Raises:
Exception: If memory creation fails
"""
try:
logger.info(f"Adding memory for user_id={user_id}, agent_id={agent_id}")
result = self.memory.add(
messages=messages,
user_id=user_id,
agent_id=agent_id,
run_id=run_id,
metadata=metadata or {}
)
logger.info(f"Successfully added {len(result.get('results', []))} memories")
return result.get('results', [])
except Exception as e:
logger.error(f"Failed to add memory: {e}")
raise
async def search_memories(
self,
query: str,
user_id: Optional[str] = None,
agent_id: Optional[str] = None,
run_id: Optional[str] = None,
limit: int = 10
) -> List[Dict[str, Any]]:
"""
Search memories by query
Args:
query: Search query
user_id: User identifier filter
agent_id: Agent identifier filter
run_id: Run identifier filter
limit: Maximum results
Returns:
List of matching memories with scores
Raises:
Exception: If search fails
"""
try:
logger.info(f"Searching memories: query='{query}', user_id={user_id}, limit={limit}")
results = self.memory.search(
query=query,
user_id=user_id,
agent_id=agent_id,
run_id=run_id,
limit=limit
)
logger.info(f"Found {len(results)} matching memories")
return results
except Exception as e:
logger.error(f"Failed to search memories: {e}")
raise
async def get_memory(
self,
memory_id: str
) -> Optional[Dict[str, Any]]:
"""
Get specific memory by ID
Args:
memory_id: Memory identifier
Returns:
Memory data or None if not found
Raises:
Exception: If retrieval fails
"""
try:
logger.info(f"Getting memory: id={memory_id}")
result = self.memory.get(memory_id=memory_id)
if result:
logger.info(f"Retrieved memory: {memory_id}")
else:
logger.warning(f"Memory not found: {memory_id}")
return result
except Exception as e:
logger.error(f"Failed to get memory: {e}")
raise
async def get_all_memories(
self,
user_id: Optional[str] = None,
agent_id: Optional[str] = None,
run_id: Optional[str] = None
) -> List[Dict[str, Any]]:
"""
Get all memories for a user/agent/run
Args:
user_id: User identifier filter
agent_id: Agent identifier filter
run_id: Run identifier filter
Returns:
List of all matching memories
Raises:
Exception: If retrieval fails
"""
try:
logger.info(f"Getting all memories: user_id={user_id}, agent_id={agent_id}")
results = self.memory.get_all(
user_id=user_id,
agent_id=agent_id,
run_id=run_id
)
logger.info(f"Retrieved {len(results)} memories")
return results
except Exception as e:
logger.error(f"Failed to get all memories: {e}")
raise
async def update_memory(
self,
memory_id: str,
data: str
) -> Dict[str, Any]:
"""
Update existing memory
Args:
memory_id: Memory identifier
data: Updated memory text
Returns:
Updated memory data
Raises:
Exception: If update fails
"""
try:
logger.info(f"Updating memory: id={memory_id}")
result = self.memory.update(
memory_id=memory_id,
data=data
)
logger.info(f"Successfully updated memory: {memory_id}")
return result
except Exception as e:
logger.error(f"Failed to update memory: {e}")
raise
async def delete_memory(
self,
memory_id: str
) -> bool:
"""
Delete memory by ID
Args:
memory_id: Memory identifier
Returns:
True if deleted successfully
Raises:
Exception: If deletion fails
"""
try:
logger.info(f"Deleting memory: id={memory_id}")
self.memory.delete(memory_id=memory_id)
logger.info(f"Successfully deleted memory: {memory_id}")
return True
except Exception as e:
logger.error(f"Failed to delete memory: {e}")
raise
async def delete_all_memories(
self,
user_id: Optional[str] = None,
agent_id: Optional[str] = None,
run_id: Optional[str] = None
) -> bool:
"""
Delete all memories for a user/agent/run
Args:
user_id: User identifier filter
agent_id: Agent identifier filter
run_id: Run identifier filter
Returns:
True if deleted successfully
Raises:
Exception: If deletion fails
"""
try:
logger.info(f"Deleting all memories: user_id={user_id}, agent_id={agent_id}")
self.memory.delete_all(
user_id=user_id,
agent_id=agent_id,
run_id=run_id
)
logger.info("Successfully deleted all matching memories")
return True
except Exception as e:
logger.error(f"Failed to delete all memories: {e}")
raise
async def get_memory_history(
self,
memory_id: str
) -> List[Dict[str, Any]]:
"""
Get history of a memory
Args:
memory_id: Memory identifier
Returns:
List of memory history entries
Raises:
Exception: If retrieval fails
"""
try:
logger.info(f"Getting memory history: id={memory_id}")
result = self.memory.history(memory_id=memory_id)
logger.info(f"Retrieved history for memory: {memory_id}")
return result
except Exception as e:
logger.error(f"Failed to get memory history: {e}")
raise
async def health_check(self) -> Dict[str, str]:
"""
Check health of memory service
Returns:
Dict with component health status
"""
health = {}
try:
# Test Mem0 access
self.memory
health['mem0'] = 'healthy'
except Exception as e:
logger.error(f"Mem0 health check failed: {e}")
health['mem0'] = 'unhealthy'
return health
# Global service instance
_memory_service: Optional[MemoryService] = None
def get_memory_service() -> MemoryService:
"""
Get or create memory service singleton
Returns:
MemoryService instance
"""
global _memory_service
if _memory_service is None:
_memory_service = MemoryService()
return _memory_service

227
api/models.py Normal file
View File

@@ -0,0 +1,227 @@
"""
Pydantic models for T6 Mem0 v2 REST API
"""
from typing import Optional, List, Dict, Any
from datetime import datetime
from pydantic import BaseModel, Field, ConfigDict
from uuid import UUID
class Message(BaseModel):
"""Chat message for memory extraction"""
role: str = Field(..., description="Message role (user, assistant, system)")
content: str = Field(..., description="Message content")
model_config = ConfigDict(
json_schema_extra={
"example": {
"role": "user",
"content": "I love pizza with extra cheese"
}
}
)
class AddMemoryRequest(BaseModel):
"""Request to add new memory"""
messages: List[Message] = Field(..., description="Conversation messages")
user_id: Optional[str] = Field(None, description="User identifier")
agent_id: Optional[str] = Field(None, description="Agent identifier")
run_id: Optional[str] = Field(None, description="Run identifier")
metadata: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Additional metadata")
model_config = ConfigDict(
json_schema_extra={
"example": {
"messages": [
{"role": "user", "content": "I love pizza"},
{"role": "assistant", "content": "Great! What toppings?"},
{"role": "user", "content": "Extra cheese please"}
],
"user_id": "alice",
"metadata": {"session": "chat_123", "source": "web"}
}
}
)
class SearchMemoryRequest(BaseModel):
"""Request to search memories"""
query: str = Field(..., description="Search query")
user_id: Optional[str] = Field(None, description="User identifier filter")
agent_id: Optional[str] = Field(None, description="Agent identifier filter")
run_id: Optional[str] = Field(None, description="Run identifier filter")
limit: int = Field(default=10, ge=1, le=100, description="Maximum results")
model_config = ConfigDict(
json_schema_extra={
"example": {
"query": "What food does the user like?",
"user_id": "alice",
"limit": 5
}
}
)
class UpdateMemoryRequest(BaseModel):
"""Request to update existing memory"""
memory_text: Optional[str] = Field(None, description="Updated memory text")
metadata: Optional[Dict[str, Any]] = Field(None, description="Updated metadata")
model_config = ConfigDict(
json_schema_extra={
"example": {
"memory_text": "User loves pizza with mushrooms and olives",
"metadata": {"verified": True, "updated_by": "admin"}
}
}
)
class MemoryResponse(BaseModel):
"""Memory response model"""
id: str = Field(..., description="Memory unique identifier")
memory: str = Field(..., description="Memory text content")
user_id: Optional[str] = Field(None, description="User identifier")
agent_id: Optional[str] = Field(None, description="Agent identifier")
run_id: Optional[str] = Field(None, description="Run identifier")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Memory metadata")
created_at: Optional[datetime] = Field(None, description="Creation timestamp")
updated_at: Optional[datetime] = Field(None, description="Last update timestamp")
score: Optional[float] = Field(None, description="Relevance score (for search results)")
model_config = ConfigDict(
json_schema_extra={
"example": {
"id": "mem_abc123",
"memory": "User loves pizza with extra cheese",
"user_id": "alice",
"metadata": {"session": "chat_123"},
"created_at": "2025-10-13T12:00:00Z",
"score": 0.95
}
}
)
class AddMemoryResponse(BaseModel):
"""Response from adding memory"""
status: str = Field(..., description="Operation status")
memories: List[MemoryResponse] = Field(..., description="Created memories")
message: str = Field(..., description="Result message")
model_config = ConfigDict(
json_schema_extra={
"example": {
"status": "success",
"memories": [{
"id": "mem_abc123",
"memory": "User loves pizza with extra cheese",
"user_id": "alice"
}],
"message": "Successfully added 1 memory"
}
}
)
class SearchMemoryResponse(BaseModel):
"""Response from searching memories"""
status: str = Field(..., description="Operation status")
memories: List[MemoryResponse] = Field(..., description="Matching memories")
count: int = Field(..., description="Number of results")
model_config = ConfigDict(
json_schema_extra={
"example": {
"status": "success",
"memories": [{
"id": "mem_abc123",
"memory": "User loves pizza",
"user_id": "alice",
"score": 0.95
}],
"count": 1
}
}
)
class DeleteMemoryResponse(BaseModel):
"""Response from deleting memory"""
status: str = Field(..., description="Operation status")
message: str = Field(..., description="Result message")
model_config = ConfigDict(
json_schema_extra={
"example": {
"status": "success",
"message": "Memory deleted successfully"
}
}
)
class HealthResponse(BaseModel):
"""Health check response"""
status: str = Field(..., description="Service status")
version: str = Field(..., description="API version")
timestamp: datetime = Field(..., description="Check timestamp")
dependencies: Dict[str, str] = Field(..., description="Dependency status")
model_config = ConfigDict(
json_schema_extra={
"example": {
"status": "healthy",
"version": "0.1.0",
"timestamp": "2025-10-13T12:00:00Z",
"dependencies": {
"supabase": "connected",
"neo4j": "connected",
"openai": "available"
}
}
}
)
class StatsResponse(BaseModel):
"""Memory statistics response"""
total_memories: int = Field(..., description="Total memory count")
total_users: int = Field(..., description="Unique user count")
total_agents: int = Field(..., description="Unique agent count")
avg_memories_per_user: float = Field(..., description="Average memories per user")
oldest_memory: Optional[datetime] = Field(None, description="Oldest memory timestamp")
newest_memory: Optional[datetime] = Field(None, description="Newest memory timestamp")
model_config = ConfigDict(
json_schema_extra={
"example": {
"total_memories": 1523,
"total_users": 42,
"total_agents": 5,
"avg_memories_per_user": 36.26,
"oldest_memory": "2025-01-01T00:00:00Z",
"newest_memory": "2025-10-13T12:00:00Z"
}
}
)
class ErrorResponse(BaseModel):
"""Error response model"""
status: str = Field(default="error", description="Error status")
error: str = Field(..., description="Error message")
detail: Optional[str] = Field(None, description="Detailed error information")
model_config = ConfigDict(
json_schema_extra={
"example": {
"status": "error",
"error": "Authentication failed",
"detail": "Invalid or missing API key"
}
}
)

377
api/routes.py Normal file
View File

@@ -0,0 +1,377 @@
"""
API routes for T6 Mem0 v2 REST API
"""
import logging
from typing import List
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status, Query
from fastapi.responses import JSONResponse
from api.auth import verify_api_key
from api.models import (
AddMemoryRequest,
AddMemoryResponse,
SearchMemoryRequest,
SearchMemoryResponse,
UpdateMemoryRequest,
MemoryResponse,
DeleteMemoryResponse,
HealthResponse,
StatsResponse,
ErrorResponse
)
from api.memory_service import get_memory_service, MemoryService
from config import settings
logger = logging.getLogger(__name__)
# Create router
router = APIRouter(prefix="/v1", tags=["memories"])
def format_memory_response(mem: dict) -> MemoryResponse:
"""Format memory data to response model"""
return MemoryResponse(
id=mem.get('id', ''),
memory=mem.get('memory', mem.get('data', '')),
user_id=mem.get('user_id'),
agent_id=mem.get('agent_id'),
run_id=mem.get('run_id'),
metadata=mem.get('metadata', {}),
created_at=mem.get('created_at'),
updated_at=mem.get('updated_at'),
score=mem.get('score')
)
@router.post(
"/memories/",
response_model=AddMemoryResponse,
status_code=status.HTTP_201_CREATED,
summary="Add new memory",
description="Add new memory from conversation messages",
responses={
201: {"description": "Memory added successfully"},
401: {"model": ErrorResponse, "description": "Unauthorized"},
500: {"model": ErrorResponse, "description": "Internal server error"}
}
)
async def add_memory(
request: AddMemoryRequest,
_api_key: str = Depends(verify_api_key),
service: MemoryService = Depends(get_memory_service)
):
"""Add new memory from messages"""
try:
messages = [msg.model_dump() for msg in request.messages]
memories = await service.add_memory(
messages=messages,
user_id=request.user_id,
agent_id=request.agent_id,
run_id=request.run_id,
metadata=request.metadata
)
formatted_memories = [format_memory_response(mem) for mem in memories]
return AddMemoryResponse(
status="success",
memories=formatted_memories,
message=f"Successfully added {len(formatted_memories)} memory(ies)"
)
except Exception as e:
logger.error(f"Error adding memory: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(e)
)
@router.get(
"/memories/search",
response_model=SearchMemoryResponse,
summary="Search memories",
description="Search memories by semantic similarity",
responses={
200: {"description": "Search completed successfully"},
401: {"model": ErrorResponse, "description": "Unauthorized"},
500: {"model": ErrorResponse, "description": "Internal server error"}
}
)
async def search_memories(
query: str = Query(..., description="Search query"),
user_id: str | None = Query(None, description="Filter by user ID"),
agent_id: str | None = Query(None, description="Filter by agent ID"),
run_id: str | None = Query(None, description="Filter by run ID"),
limit: int = Query(10, ge=1, le=100, description="Maximum results"),
_api_key: str = Depends(verify_api_key),
service: MemoryService = Depends(get_memory_service)
):
"""Search memories by query"""
try:
memories = await service.search_memories(
query=query,
user_id=user_id,
agent_id=agent_id,
run_id=run_id,
limit=limit
)
formatted_memories = [format_memory_response(mem) for mem in memories]
return SearchMemoryResponse(
status="success",
memories=formatted_memories,
count=len(formatted_memories)
)
except Exception as e:
logger.error(f"Error searching memories: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(e)
)
@router.get(
"/memories/{memory_id}",
response_model=MemoryResponse,
summary="Get specific memory",
description="Retrieve a specific memory by ID",
responses={
200: {"description": "Memory retrieved successfully"},
404: {"model": ErrorResponse, "description": "Memory not found"},
401: {"model": ErrorResponse, "description": "Unauthorized"},
500: {"model": ErrorResponse, "description": "Internal server error"}
}
)
async def get_memory(
memory_id: str,
_api_key: str = Depends(verify_api_key),
service: MemoryService = Depends(get_memory_service)
):
"""Get specific memory by ID"""
try:
memory = await service.get_memory(memory_id)
if not memory:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Memory not found: {memory_id}"
)
return format_memory_response(memory)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting memory: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(e)
)
@router.get(
"/memories/user/{user_id}",
response_model=List[MemoryResponse],
summary="Get user memories",
description="Get all memories for a specific user",
responses={
200: {"description": "Memories retrieved successfully"},
401: {"model": ErrorResponse, "description": "Unauthorized"},
500: {"model": ErrorResponse, "description": "Internal server error"}
}
)
async def get_user_memories(
user_id: str,
_api_key: str = Depends(verify_api_key),
service: MemoryService = Depends(get_memory_service)
):
"""Get all memories for a user"""
try:
memories = await service.get_all_memories(user_id=user_id)
return [format_memory_response(mem) for mem in memories]
except Exception as e:
logger.error(f"Error getting user memories: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(e)
)
@router.patch(
"/memories/{memory_id}",
response_model=MemoryResponse,
summary="Update memory",
description="Update an existing memory",
responses={
200: {"description": "Memory updated successfully"},
404: {"model": ErrorResponse, "description": "Memory not found"},
401: {"model": ErrorResponse, "description": "Unauthorized"},
500: {"model": ErrorResponse, "description": "Internal server error"}
}
)
async def update_memory(
memory_id: str,
request: UpdateMemoryRequest,
_api_key: str = Depends(verify_api_key),
service: MemoryService = Depends(get_memory_service)
):
"""Update existing memory"""
try:
if not request.memory_text:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="memory_text is required for update"
)
updated = await service.update_memory(
memory_id=memory_id,
data=request.memory_text
)
return format_memory_response(updated)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error updating memory: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(e)
)
@router.delete(
"/memories/{memory_id}",
response_model=DeleteMemoryResponse,
summary="Delete memory",
description="Delete a specific memory",
responses={
200: {"description": "Memory deleted successfully"},
404: {"model": ErrorResponse, "description": "Memory not found"},
401: {"model": ErrorResponse, "description": "Unauthorized"},
500: {"model": ErrorResponse, "description": "Internal server error"}
}
)
async def delete_memory(
memory_id: str,
_api_key: str = Depends(verify_api_key),
service: MemoryService = Depends(get_memory_service)
):
"""Delete specific memory"""
try:
await service.delete_memory(memory_id)
return DeleteMemoryResponse(
status="success",
message=f"Memory {memory_id} deleted successfully"
)
except Exception as e:
logger.error(f"Error deleting memory: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(e)
)
@router.delete(
"/memories/user/{user_id}",
response_model=DeleteMemoryResponse,
summary="Delete user memories",
description="Delete all memories for a specific user",
responses={
200: {"description": "Memories deleted successfully"},
401: {"model": ErrorResponse, "description": "Unauthorized"},
500: {"model": ErrorResponse, "description": "Internal server error"}
}
)
async def delete_user_memories(
user_id: str,
_api_key: str = Depends(verify_api_key),
service: MemoryService = Depends(get_memory_service)
):
"""Delete all memories for a user"""
try:
await service.delete_all_memories(user_id=user_id)
return DeleteMemoryResponse(
status="success",
message=f"All memories for user {user_id} deleted successfully"
)
except Exception as e:
logger.error(f"Error deleting user memories: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(e)
)
@router.get(
"/health",
response_model=HealthResponse,
summary="Health check",
description="Check API health status",
responses={
200: {"description": "Service is healthy"}
}
)
async def health_check(
service: MemoryService = Depends(get_memory_service)
):
"""Health check endpoint"""
try:
health = await service.health_check()
return HealthResponse(
status="healthy" if all(v == "healthy" for v in health.values()) else "degraded",
version="0.1.0",
timestamp=datetime.utcnow(),
dependencies=health
)
except Exception as e:
logger.error(f"Health check failed: {e}")
return JSONResponse(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
content={
"status": "unhealthy",
"version": "0.1.0",
"timestamp": datetime.utcnow().isoformat(),
"error": str(e)
}
)
@router.get(
"/stats",
response_model=StatsResponse,
summary="Memory statistics",
description="Get system-wide memory statistics",
responses={
200: {"description": "Statistics retrieved successfully"},
401: {"model": ErrorResponse, "description": "Unauthorized"},
500: {"model": ErrorResponse, "description": "Internal server error"}
}
)
async def get_stats(
_api_key: str = Depends(verify_api_key)
):
"""Get memory statistics"""
# Note: This would require direct database access or extension of mem0 API
# For now, return placeholder
return StatsResponse(
total_memories=0,
total_users=0,
total_agents=0,
avg_memories_per_user=0.0,
oldest_memory=None,
newest_memory=None
)