✅ Fully functional FastAPI server with comprehensive features: 🏗️ Architecture: - Complete API design documentation - Modular structure (models, auth, service, main) - OpenAPI/Swagger auto-documentation 🔧 Core Features: - Memory CRUD endpoints (POST, GET, DELETE) - User management and statistics - Search functionality with filtering - Admin endpoints with proper authorization 🔐 Security & Auth: - API key authentication (Bearer token) - Rate limiting (100 req/min configurable) - Input validation with Pydantic models - Comprehensive error handling 🧪 Testing: - Comprehensive test suite with automated server lifecycle - Simple test suite for quick validation - All functionality verified and working 🐛 Fixes: - Resolved Pydantic v2 compatibility (.dict() → .model_dump()) - Fixed missing dependencies (posthog, qdrant-client, vecs, ollama) - Fixed mem0 package version metadata issues 📊 Performance: - Async operations for scalability - Request timing middleware - Proper error boundaries - Health monitoring endpoints 🎯 Status: Phase 2 100% complete - REST API fully functional 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
111 lines
3.2 KiB
Python
111 lines
3.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Simple API test to verify basic functionality
|
|
"""
|
|
|
|
import requests
|
|
import time
|
|
import subprocess
|
|
import signal
|
|
import os
|
|
import sys
|
|
|
|
# Test configuration
|
|
API_BASE_URL = "http://localhost:8080"
|
|
API_KEY = "mem0_dev_key_123456789"
|
|
TEST_USER_ID = "simple_test_user"
|
|
|
|
def test_api_basic():
|
|
"""Simple API test to verify it's working"""
|
|
print("=" * 50)
|
|
print("🧪 SIMPLE MEM0 API TEST")
|
|
print("=" * 50)
|
|
|
|
# Start API server
|
|
print("🚀 Starting API server...")
|
|
env = os.environ.copy()
|
|
env.update({
|
|
"API_HOST": "localhost",
|
|
"API_PORT": "8080",
|
|
"API_KEYS": API_KEY,
|
|
"ADMIN_API_KEYS": "mem0_admin_key_111222333"
|
|
})
|
|
|
|
server_process = subprocess.Popen([
|
|
sys.executable, "start_api.py"
|
|
], env=env, cwd="/home/klas/mem0")
|
|
|
|
# Wait for server to start
|
|
print("⏳ Waiting for server...")
|
|
time.sleep(8)
|
|
|
|
try:
|
|
# Test health endpoint
|
|
print("🏥 Testing health endpoint...")
|
|
response = requests.get(f"{API_BASE_URL}/health", timeout=5)
|
|
if response.status_code == 200:
|
|
print(" ✅ Health endpoint working")
|
|
else:
|
|
print(f" ❌ Health endpoint failed: {response.status_code}")
|
|
return False
|
|
|
|
# Test authenticated status endpoint
|
|
print("🔐 Testing authenticated endpoint...")
|
|
headers = {"Authorization": f"Bearer {API_KEY}"}
|
|
response = requests.get(f"{API_BASE_URL}/status", headers=headers, timeout=5)
|
|
if response.status_code == 200:
|
|
print(" ✅ Authentication working")
|
|
else:
|
|
print(f" ❌ Authentication failed: {response.status_code}")
|
|
return False
|
|
|
|
# Test adding memory
|
|
print("🧠 Testing memory addition...")
|
|
memory_data = {
|
|
"messages": [
|
|
{"role": "user", "content": "I enjoy Python programming and building APIs"}
|
|
],
|
|
"user_id": TEST_USER_ID,
|
|
"metadata": {"source": "simple_test"}
|
|
}
|
|
|
|
response = requests.post(
|
|
f"{API_BASE_URL}/v1/memories",
|
|
headers=headers,
|
|
json=memory_data,
|
|
timeout=10
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
if data.get("success"):
|
|
print(" ✅ Memory addition working")
|
|
else:
|
|
print(f" ❌ Memory addition failed: {data}")
|
|
return False
|
|
else:
|
|
print(f" ❌ Memory addition failed: {response.status_code}")
|
|
try:
|
|
print(f" Error: {response.json()}")
|
|
except:
|
|
print(f" Raw response: {response.text}")
|
|
return False
|
|
|
|
print("\n🎉 Basic API tests passed!")
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"❌ Test failed: {e}")
|
|
return False
|
|
|
|
finally:
|
|
# Stop server
|
|
print("🛑 Stopping server...")
|
|
server_process.terminate()
|
|
server_process.wait()
|
|
print("✅ Server stopped")
|
|
|
|
if __name__ == "__main__":
|
|
os.chdir("/home/klas/mem0")
|
|
success = test_api_basic()
|
|
sys.exit(0 if success else 1) |