Files
t66_langmem/tests/test_api.py
Docker Config Backup f0db3e5546 Clean and organize project structure
Major reorganization:
- Created scripts/ directory for all utility scripts
- Created config/ directory for configuration files
- Moved all test files to tests/ directory
- Updated all script paths to work with new structure
- Updated README.md with new project structure diagram

New structure:
├── src/          # Source code (API + MCP)
├── scripts/      # Utility scripts (start-*.sh, docs_server.py, etc.)
├── tests/        # All test files and debug utilities
├── config/       # Configuration files (JSON, Caddy config)
├── docs/         # Documentation website
└── logs/         # Log files

All scripts updated to use relative paths from project root.
Documentation updated with new folder structure.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-17 14:11:08 +02:00

169 lines
6.2 KiB
Python

#!/usr/bin/env python3
"""
Test the LangMem API endpoints
"""
import asyncio
import httpx
import json
from uuid import uuid4
# Configuration
API_BASE_URL = "http://localhost:8765"
API_KEY = "langmem_api_key_2025"
TEST_USER_ID = f"test_user_{uuid4()}"
async def test_api_endpoints():
"""Test all API endpoints"""
print("🧪 Testing LangMem API Endpoints")
print("=" * 50)
headers = {"Authorization": f"Bearer {API_KEY}"}
async with httpx.AsyncClient() as client:
# Test 1: Root endpoint
print("\n1. Testing root endpoint...")
try:
response = await client.get(f"{API_BASE_URL}/")
print(f"✅ Root endpoint: {response.status_code}")
print(f" Response: {response.json()}")
except Exception as e:
print(f"❌ Root endpoint failed: {e}")
# Test 2: Health check
print("\n2. Testing health check...")
try:
response = await client.get(f"{API_BASE_URL}/health")
print(f"✅ Health check: {response.status_code}")
data = response.json()
print(f" Overall status: {data.get('status')}")
for service, status in data.get('services', {}).items():
print(f" {service}: {status}")
except Exception as e:
print(f"❌ Health check failed: {e}")
# Test 3: Store memory
print("\n3. Testing memory storage...")
try:
memory_data = {
"content": "FastAPI is a modern web framework for building APIs with Python",
"user_id": TEST_USER_ID,
"session_id": "test_session_1",
"metadata": {
"category": "programming",
"language": "python",
"framework": "fastapi"
}
}
response = await client.post(
f"{API_BASE_URL}/v1/memories/store",
json=memory_data,
headers=headers,
timeout=30.0
)
if response.status_code == 200:
data = response.json()
memory_id = data["id"]
print(f"✅ Memory stored successfully: {memory_id}")
print(f" Status: {data['status']}")
else:
print(f"❌ Memory storage failed: {response.status_code}")
print(f" Response: {response.text}")
return
except Exception as e:
print(f"❌ Memory storage failed: {e}")
return
# Test 4: Search memories
print("\n4. Testing memory search...")
try:
search_data = {
"query": "Python web framework",
"user_id": TEST_USER_ID,
"limit": 5,
"threshold": 0.5
}
response = await client.post(
f"{API_BASE_URL}/v1/memories/search",
json=search_data,
headers=headers,
timeout=30.0
)
if response.status_code == 200:
data = response.json()
print(f"✅ Memory search successful")
print(f" Found {data['total_count']} memories")
for memory in data['memories']:
print(f" - {memory['content'][:50]}... (similarity: {memory['similarity']:.3f})")
else:
print(f"❌ Memory search failed: {response.status_code}")
print(f" Response: {response.text}")
except Exception as e:
print(f"❌ Memory search failed: {e}")
# Test 5: Retrieve memories for conversation
print("\n5. Testing memory retrieval...")
try:
retrieve_data = {
"messages": [
{"role": "user", "content": "I want to learn about web development"},
{"role": "assistant", "content": "Great! What technology are you interested in?"},
{"role": "user", "content": "I heard Python is good for web APIs"}
],
"user_id": TEST_USER_ID,
"session_id": "test_session_1"
}
response = await client.post(
f"{API_BASE_URL}/v1/memories/retrieve",
json=retrieve_data,
headers=headers,
timeout=30.0
)
if response.status_code == 200:
data = response.json()
print(f"✅ Memory retrieval successful")
print(f" Retrieved {data['total_count']} relevant memories")
for memory in data['memories']:
print(f" - {memory['content'][:50]}... (similarity: {memory['similarity']:.3f})")
else:
print(f"❌ Memory retrieval failed: {response.status_code}")
print(f" Response: {response.text}")
except Exception as e:
print(f"❌ Memory retrieval failed: {e}")
# Test 6: Get user memories
print("\n6. Testing user memory listing...")
try:
response = await client.get(
f"{API_BASE_URL}/v1/memories/users/{TEST_USER_ID}",
headers=headers,
timeout=30.0
)
if response.status_code == 200:
data = response.json()
print(f"✅ User memory listing successful")
print(f" User has {data['total_count']} memories")
for memory in data['memories']:
print(f" - {memory['content'][:50]}... (created: {memory['created_at'][:19]})")
else:
print(f"❌ User memory listing failed: {response.status_code}")
print(f" Response: {response.text}")
except Exception as e:
print(f"❌ User memory listing failed: {e}")
print("\n" + "=" * 50)
print("🎉 API Testing Complete!")
if __name__ == "__main__":
asyncio.run(test_api_endpoints())