Files
t66_langmem/test_api.py
Docker Config Backup 46faa78237 Initial commit: LangMem fact-based AI memory system with docs and MCP integration
- Complete fact-based memory API with mem0-inspired approach
- Individual fact extraction and deduplication
- ADD/UPDATE/DELETE memory actions
- Precision search with 0.86+ similarity scores
- MCP server for Claude Code integration
- Neo4j graph relationships and PostgreSQL vector storage
- Comprehensive documentation with architecture and API docs
- Matrix communication integration
- Production-ready Docker setup with Ollama and Supabase

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-17 13:16:19 +02:00

169 lines
6.2 KiB
Python

#!/usr/bin/env python3
"""
Test the LangMem API endpoints
"""
import asyncio
import httpx
import json
from uuid import uuid4
# Configuration
API_BASE_URL = "http://localhost:8765"
API_KEY = "langmem_api_key_2025"
TEST_USER_ID = f"test_user_{uuid4()}"
async def test_api_endpoints():
"""Test all API endpoints"""
print("🧪 Testing LangMem API Endpoints")
print("=" * 50)
headers = {"Authorization": f"Bearer {API_KEY}"}
async with httpx.AsyncClient() as client:
# Test 1: Root endpoint
print("\n1. Testing root endpoint...")
try:
response = await client.get(f"{API_BASE_URL}/")
print(f"✅ Root endpoint: {response.status_code}")
print(f" Response: {response.json()}")
except Exception as e:
print(f"❌ Root endpoint failed: {e}")
# Test 2: Health check
print("\n2. Testing health check...")
try:
response = await client.get(f"{API_BASE_URL}/health")
print(f"✅ Health check: {response.status_code}")
data = response.json()
print(f" Overall status: {data.get('status')}")
for service, status in data.get('services', {}).items():
print(f" {service}: {status}")
except Exception as e:
print(f"❌ Health check failed: {e}")
# Test 3: Store memory
print("\n3. Testing memory storage...")
try:
memory_data = {
"content": "FastAPI is a modern web framework for building APIs with Python",
"user_id": TEST_USER_ID,
"session_id": "test_session_1",
"metadata": {
"category": "programming",
"language": "python",
"framework": "fastapi"
}
}
response = await client.post(
f"{API_BASE_URL}/v1/memories/store",
json=memory_data,
headers=headers,
timeout=30.0
)
if response.status_code == 200:
data = response.json()
memory_id = data["id"]
print(f"✅ Memory stored successfully: {memory_id}")
print(f" Status: {data['status']}")
else:
print(f"❌ Memory storage failed: {response.status_code}")
print(f" Response: {response.text}")
return
except Exception as e:
print(f"❌ Memory storage failed: {e}")
return
# Test 4: Search memories
print("\n4. Testing memory search...")
try:
search_data = {
"query": "Python web framework",
"user_id": TEST_USER_ID,
"limit": 5,
"threshold": 0.5
}
response = await client.post(
f"{API_BASE_URL}/v1/memories/search",
json=search_data,
headers=headers,
timeout=30.0
)
if response.status_code == 200:
data = response.json()
print(f"✅ Memory search successful")
print(f" Found {data['total_count']} memories")
for memory in data['memories']:
print(f" - {memory['content'][:50]}... (similarity: {memory['similarity']:.3f})")
else:
print(f"❌ Memory search failed: {response.status_code}")
print(f" Response: {response.text}")
except Exception as e:
print(f"❌ Memory search failed: {e}")
# Test 5: Retrieve memories for conversation
print("\n5. Testing memory retrieval...")
try:
retrieve_data = {
"messages": [
{"role": "user", "content": "I want to learn about web development"},
{"role": "assistant", "content": "Great! What technology are you interested in?"},
{"role": "user", "content": "I heard Python is good for web APIs"}
],
"user_id": TEST_USER_ID,
"session_id": "test_session_1"
}
response = await client.post(
f"{API_BASE_URL}/v1/memories/retrieve",
json=retrieve_data,
headers=headers,
timeout=30.0
)
if response.status_code == 200:
data = response.json()
print(f"✅ Memory retrieval successful")
print(f" Retrieved {data['total_count']} relevant memories")
for memory in data['memories']:
print(f" - {memory['content'][:50]}... (similarity: {memory['similarity']:.3f})")
else:
print(f"❌ Memory retrieval failed: {response.status_code}")
print(f" Response: {response.text}")
except Exception as e:
print(f"❌ Memory retrieval failed: {e}")
# Test 6: Get user memories
print("\n6. Testing user memory listing...")
try:
response = await client.get(
f"{API_BASE_URL}/v1/memories/users/{TEST_USER_ID}",
headers=headers,
timeout=30.0
)
if response.status_code == 200:
data = response.json()
print(f"✅ User memory listing successful")
print(f" User has {data['total_count']} memories")
for memory in data['memories']:
print(f" - {memory['content'][:50]}... (created: {memory['created_at'][:19]})")
else:
print(f"❌ User memory listing failed: {response.status_code}")
print(f" Response: {response.text}")
except Exception as e:
print(f"❌ User memory listing failed: {e}")
print("\n" + "=" * 50)
print("🎉 API Testing Complete!")
if __name__ == "__main__":
asyncio.run(test_api_endpoints())