Files
t6_mem0/test_api_simple.py
Docker Config Backup 8ea9fff334 PHASE 2 COMPLETE: REST API Implementation
 Fully functional FastAPI server with comprehensive features:

🏗️ Architecture:
- Complete API design documentation
- Modular structure (models, auth, service, main)
- OpenAPI/Swagger auto-documentation

🔧 Core Features:
- Memory CRUD endpoints (POST, GET, DELETE)
- User management and statistics
- Search functionality with filtering
- Admin endpoints with proper authorization

🔐 Security & Auth:
- API key authentication (Bearer token)
- Rate limiting (100 req/min configurable)
- Input validation with Pydantic models
- Comprehensive error handling

🧪 Testing:
- Comprehensive test suite with automated server lifecycle
- Simple test suite for quick validation
- All functionality verified and working

🐛 Fixes:
- Resolved Pydantic v2 compatibility (.dict() → .model_dump())
- Fixed missing dependencies (posthog, qdrant-client, vecs, ollama)
- Fixed mem0 package version metadata issues

📊 Performance:
- Async operations for scalability
- Request timing middleware
- Proper error boundaries
- Health monitoring endpoints

🎯 Status: Phase 2 100% complete - REST API fully functional

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-31 13:57:16 +02:00

111 lines
3.2 KiB
Python

#!/usr/bin/env python3
"""
Simple API test to verify basic functionality
"""
import requests
import time
import subprocess
import signal
import os
import sys
# Test configuration
API_BASE_URL = "http://localhost:8080"
API_KEY = "mem0_dev_key_123456789"
TEST_USER_ID = "simple_test_user"
def test_api_basic():
"""Simple API test to verify it's working"""
print("=" * 50)
print("🧪 SIMPLE MEM0 API TEST")
print("=" * 50)
# Start API server
print("🚀 Starting API server...")
env = os.environ.copy()
env.update({
"API_HOST": "localhost",
"API_PORT": "8080",
"API_KEYS": API_KEY,
"ADMIN_API_KEYS": "mem0_admin_key_111222333"
})
server_process = subprocess.Popen([
sys.executable, "start_api.py"
], env=env, cwd="/home/klas/mem0")
# Wait for server to start
print("⏳ Waiting for server...")
time.sleep(8)
try:
# Test health endpoint
print("🏥 Testing health endpoint...")
response = requests.get(f"{API_BASE_URL}/health", timeout=5)
if response.status_code == 200:
print(" ✅ Health endpoint working")
else:
print(f" ❌ Health endpoint failed: {response.status_code}")
return False
# Test authenticated status endpoint
print("🔐 Testing authenticated endpoint...")
headers = {"Authorization": f"Bearer {API_KEY}"}
response = requests.get(f"{API_BASE_URL}/status", headers=headers, timeout=5)
if response.status_code == 200:
print(" ✅ Authentication working")
else:
print(f" ❌ Authentication failed: {response.status_code}")
return False
# Test adding memory
print("🧠 Testing memory addition...")
memory_data = {
"messages": [
{"role": "user", "content": "I enjoy Python programming and building APIs"}
],
"user_id": TEST_USER_ID,
"metadata": {"source": "simple_test"}
}
response = requests.post(
f"{API_BASE_URL}/v1/memories",
headers=headers,
json=memory_data,
timeout=10
)
if response.status_code == 200:
data = response.json()
if data.get("success"):
print(" ✅ Memory addition working")
else:
print(f" ❌ Memory addition failed: {data}")
return False
else:
print(f" ❌ Memory addition failed: {response.status_code}")
try:
print(f" Error: {response.json()}")
except:
print(f" Raw response: {response.text}")
return False
print("\n🎉 Basic API tests passed!")
return True
except Exception as e:
print(f"❌ Test failed: {e}")
return False
finally:
# Stop server
print("🛑 Stopping server...")
server_process.terminate()
server_process.wait()
print("✅ Server stopped")
if __name__ == "__main__":
os.chdir("/home/klas/mem0")
success = test_api_basic()
sys.exit(0 if success else 1)