Clean and organize project structure

Major reorganization:
- Created scripts/ directory for all utility scripts
- Created config/ directory for configuration files
- Moved all test files to tests/ directory
- Updated all script paths to work with new structure
- Updated README.md with new project structure diagram

New structure:
├── src/          # Source code (API + MCP)
├── scripts/      # Utility scripts (start-*.sh, docs_server.py, etc.)
├── tests/        # All test files and debug utilities
├── config/       # Configuration files (JSON, Caddy config)
├── docs/         # Documentation website
└── logs/         # Log files

All scripts updated to use relative paths from project root.
Documentation updated with new folder structure.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Docker Config Backup
2025-07-17 14:11:08 +02:00
parent b74c7d79ca
commit f0db3e5546
25 changed files with 209 additions and 383 deletions

90
tests/check_neo4j_data.py Normal file
View File

@@ -0,0 +1,90 @@
#!/usr/bin/env python3
"""
Check what data is stored in Neo4j
"""
import asyncio
from neo4j import AsyncGraphDatabase
import json
# Configuration
NEO4J_URL = "bolt://localhost:7687"
NEO4J_USER = "neo4j"
NEO4J_PASSWORD = "langmem_neo4j_password"
async def check_neo4j_data():
"""Check Neo4j data stored by LangMem"""
print("🔍 Checking Neo4j Data from LangMem")
print("=" * 50)
try:
driver = AsyncGraphDatabase.driver(NEO4J_URL, auth=(NEO4J_USER, NEO4J_PASSWORD))
async with driver.session() as session:
# Check all nodes
print("1. All nodes in the database:")
result = await session.run("MATCH (n) RETURN labels(n) as labels, count(n) as count")
async for record in result:
print(f" {record['labels']}: {record['count']}")
# Check Memory nodes
print("\n2. Memory nodes:")
result = await session.run("MATCH (m:Memory) RETURN m.id as id, m.created_at as created_at")
async for record in result:
print(f" Memory ID: {record['id']}")
print(f" Created: {record['created_at']}")
# Check Entity nodes
print("\n3. Entity nodes:")
result = await session.run("MATCH (e:Entity) RETURN e.name as name, e.type as type, e.properties_json as props")
async for record in result:
print(f" Entity: {record['name']} ({record['type']})")
if record['props']:
try:
props = json.loads(record['props'])
print(f" Properties: {props}")
except:
print(f" Properties: {record['props']}")
# Check relationships
print("\n4. Relationships:")
result = await session.run("""
MATCH (m:Memory)-[r:RELATES_TO]->(e:Entity)
RETURN m.id as memory_id, r.relationship as relationship,
e.name as entity_name, r.confidence as confidence
""")
async for record in result:
print(f" {record['memory_id'][:8]}... {record['relationship']} {record['entity_name']} (confidence: {record['confidence']})")
# Full graph visualization query
print("\n5. Full graph structure:")
result = await session.run("""
MATCH (m:Memory)-[r:RELATES_TO]->(e:Entity)
RETURN m.id as memory_id,
r.relationship as relationship,
e.name as entity_name,
e.type as entity_type,
r.confidence as confidence
ORDER BY r.confidence DESC
""")
print(" Graph relationships (Memory → Entity):")
async for record in result:
print(f" {record['memory_id'][:8]}... →[{record['relationship']}]→ {record['entity_name']} ({record['entity_type']}) [{record['confidence']}]")
await driver.close()
print("\n" + "=" * 50)
print("✅ Neo4j data check complete!")
print("🌐 Neo4j Browser: http://localhost:7474")
print(" Username: neo4j")
print(" Password: langmem_neo4j_password")
print("\n💡 Try these Cypher queries in Neo4j Browser:")
print(" MATCH (n) RETURN n")
print(" MATCH (m:Memory)-[r:RELATES_TO]->(e:Entity) RETURN m, r, e")
except Exception as e:
print(f"❌ Error: {e}")
if __name__ == "__main__":
asyncio.run(check_neo4j_data())

View File

@@ -0,0 +1,136 @@
#!/usr/bin/env python3
"""
Clear all databases - PostgreSQL (vector) and Neo4j (graph)
"""
import asyncio
import asyncpg
from neo4j import AsyncGraphDatabase
# Configuration
SUPABASE_DB_URL = "postgresql://postgres:CzkaYmRvc26Y@localhost:5435/postgres"
NEO4J_URL = "bolt://localhost:7687"
NEO4J_USER = "neo4j"
NEO4J_PASSWORD = "langmem_neo4j_password"
async def clear_postgresql():
"""Clear PostgreSQL database completely"""
print("🧹 Clearing PostgreSQL database...")
try:
conn = await asyncpg.connect(SUPABASE_DB_URL)
# Drop all tables and extensions
await conn.execute("DROP SCHEMA public CASCADE;")
await conn.execute("CREATE SCHEMA public;")
await conn.execute("GRANT ALL ON SCHEMA public TO postgres;")
await conn.execute("GRANT ALL ON SCHEMA public TO public;")
print(" ✅ PostgreSQL database cleared completely")
await conn.close()
return True
except Exception as e:
print(f" ❌ Error clearing PostgreSQL: {e}")
return False
async def clear_neo4j():
"""Clear Neo4j database completely"""
print("🧹 Clearing Neo4j database...")
try:
driver = AsyncGraphDatabase.driver(NEO4J_URL, auth=(NEO4J_USER, NEO4J_PASSWORD))
async with driver.session() as session:
# Delete all nodes and relationships
await session.run("MATCH (n) DETACH DELETE n")
# Verify it's empty
result = await session.run("MATCH (n) RETURN count(n) as count")
record = await result.single()
node_count = record['count']
print(f" ✅ Neo4j database cleared completely (nodes: {node_count})")
await driver.close()
return True
except Exception as e:
print(f" ❌ Error clearing Neo4j: {e}")
return False
async def restart_langmem_api():
"""Restart LangMem API to recreate tables"""
print("🔄 Restarting LangMem API to recreate tables...")
import subprocess
try:
# Restart the API container
result = subprocess.run(
["docker", "compose", "restart", "langmem-api"],
cwd="/home/klas/langmem-project",
capture_output=True,
text=True
)
if result.returncode == 0:
print(" ✅ LangMem API restarted successfully")
# Wait for API to be ready
await asyncio.sleep(3)
# Check API health
import httpx
async with httpx.AsyncClient() as client:
try:
response = await client.get("http://localhost:8765/health", timeout=10.0)
if response.status_code == 200:
data = response.json()
print(f" ✅ API health status: {data['status']}")
return True
else:
print(f" ⚠️ API health check returned: {response.status_code}")
return False
except Exception as e:
print(f" ⚠️ API health check failed: {e}")
return False
else:
print(f" ❌ Failed to restart API: {result.stderr}")
return False
except Exception as e:
print(f" ❌ Error restarting API: {e}")
return False
async def main():
"""Main function to clear all databases"""
print("🚀 Clearing All LangMem Databases")
print("=" * 50)
# Clear PostgreSQL
postgres_cleared = await clear_postgresql()
# Clear Neo4j
neo4j_cleared = await clear_neo4j()
# Restart API to recreate tables
api_restarted = await restart_langmem_api()
# Summary
print("\n" + "=" * 50)
print("📊 Database Clear Summary:")
print(f" PostgreSQL: {'✅ CLEARED' if postgres_cleared else '❌ FAILED'}")
print(f" Neo4j: {'✅ CLEARED' if neo4j_cleared else '❌ FAILED'}")
print(f" API Restart: {'✅ SUCCESS' if api_restarted else '❌ FAILED'}")
if all([postgres_cleared, neo4j_cleared, api_restarted]):
print("\n🎉 All databases cleared successfully!")
print(" Ready for fresh data storage")
return True
else:
print("\n⚠️ Some operations failed - check logs above")
return False
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,82 @@
#!/usr/bin/env python3
"""
Debug the fact extraction system
"""
import asyncio
import sys
import os
# Add the API directory to the path
sys.path.insert(0, '/home/klas/langmem-project/src/api')
from fact_extraction import FactExtractor
async def debug_fact_extraction():
"""Debug fact extraction"""
print("🔍 Debugging Fact Extraction System")
print("=" * 50)
# Test content
test_content = "Ondrej has a son named Cyril who is 8 years old and loves playing soccer. Cyril goes to elementary school in Prague and his favorite color is blue. Ondrej works as a software engineer and lives in Czech Republic."
print(f"Content to extract facts from:")
print(f"'{test_content}'")
print()
# Create fact extractor
extractor = FactExtractor()
print("1. Testing fact extraction...")
facts = await extractor.extract_facts(test_content)
print(f"Extracted {len(facts)} facts:")
for i, fact in enumerate(facts, 1):
print(f" {i}. {fact}")
if not facts:
print("❌ No facts extracted - there might be an issue with the extraction system")
return False
else:
print("✅ Fact extraction working!")
print()
# Test memory action determination
print("2. Testing memory action determination...")
existing_memories = [
{
"id": "test-memory-1",
"content": "Ondrej has a son named Cyril",
"similarity": 0.8
}
]
new_fact = "Ondrej has a son named Cyril who is 8 years old"
action_data = await extractor.determine_memory_action(new_fact, existing_memories)
print(f"New fact: '{new_fact}'")
print(f"Action: {action_data.get('action', 'unknown')}")
print(f"Reason: {action_data.get('reason', 'no reason')}")
return True
async def main():
"""Main function"""
try:
success = await debug_fact_extraction()
if success:
print("\n🎉 Fact extraction debugging complete!")
else:
print("\n❌ Fact extraction has issues that need to be fixed.")
except Exception as e:
print(f"❌ Error during debugging: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,88 @@
#!/usr/bin/env python3
"""
Debug Neo4j relationships to see what's happening
"""
import asyncio
from neo4j import AsyncGraphDatabase
# Configuration
NEO4J_URL = "bolt://localhost:7687"
NEO4J_USER = "neo4j"
NEO4J_PASSWORD = "langmem_neo4j_password"
async def debug_neo4j_relationships():
"""Debug Neo4j relationships"""
print("🔍 Debugging Neo4j Relationships")
print("=" * 50)
try:
driver = AsyncGraphDatabase.driver(NEO4J_URL, auth=(NEO4J_USER, NEO4J_PASSWORD))
async with driver.session() as session:
# Check all relationship types
print("1. All relationship types in database:")
result = await session.run("CALL db.relationshipTypes()")
async for record in result:
print(f" - {record[0]}")
# Check all relationships
print("\n2. All relationships:")
result = await session.run("MATCH ()-[r]->() RETURN type(r) as rel_type, count(r) as count")
relationship_count = 0
async for record in result:
print(f" {record['rel_type']}: {record['count']}")
relationship_count += record['count']
if relationship_count == 0:
print(" No relationships found!")
# Check MENTIONS relationships specifically
print("\n3. MENTIONS relationships:")
result = await session.run("MATCH (m:Memory)-[r:MENTIONS]->(e:Entity) RETURN m.id, e.name, e.type")
async for record in result:
print(f" Memory {record['m.id'][:8]}... MENTIONS {record['e.name']} ({record['e.type']})")
# Check all relationships with details
print("\n4. All relationships with details:")
result = await session.run("""
MATCH (a)-[r]->(b)
RETURN labels(a)[0] as source_label,
coalesce(a.name, a.id) as source_name,
type(r) as relationship,
labels(b)[0] as target_label,
coalesce(b.name, b.id) as target_name,
r.confidence as confidence
ORDER BY relationship
""")
async for record in result:
print(f" {record['source_label']} '{record['source_name'][:20]}...' ")
print(f" →[{record['relationship']}]→ ")
print(f" {record['target_label']} '{record['target_name'][:20]}...' (conf: {record['confidence']})")
print()
# Check if there are any dynamic relationships
print("\n5. Looking for dynamic relationships (non-MENTIONS):")
result = await session.run("""
MATCH (a)-[r]->(b)
WHERE type(r) <> 'MENTIONS'
RETURN type(r) as rel_type, count(r) as count
""")
found_dynamic = False
async for record in result:
print(f" {record['rel_type']}: {record['count']}")
found_dynamic = True
if not found_dynamic:
print(" No dynamic relationships found!")
print(" This suggests the AI relationship creation might have issues.")
await driver.close()
except Exception as e:
print(f"❌ Error: {e}")
if __name__ == "__main__":
asyncio.run(debug_neo4j_relationships())

186
tests/populate_test_data.py Normal file
View File

@@ -0,0 +1,186 @@
#!/usr/bin/env python3
"""
Populate LangMem with test data for Supabase web UI viewing
"""
import asyncio
import httpx
import json
from datetime import datetime
# Configuration
API_BASE_URL = "http://localhost:8765"
API_KEY = "langmem_api_key_2025"
# Test memories to store
test_memories = [
{
"content": "Claude Code is an AI-powered CLI tool that helps with software development tasks. It can read files, search codebases, and generate code.",
"user_id": "demo_user",
"session_id": "demo_session_1",
"metadata": {
"category": "tools",
"subcategory": "ai_development",
"importance": "high",
"tags": ["claude", "ai", "cli", "development"]
}
},
{
"content": "FastAPI is a modern, fast web framework for building APIs with Python. It provides automatic API documentation and type hints.",
"user_id": "demo_user",
"session_id": "demo_session_1",
"metadata": {
"category": "frameworks",
"subcategory": "python_web",
"importance": "medium",
"tags": ["fastapi", "python", "web", "api"]
}
},
{
"content": "Docker containers provide lightweight virtualization for applications. They package software with all dependencies for consistent deployment.",
"user_id": "demo_user",
"session_id": "demo_session_2",
"metadata": {
"category": "devops",
"subcategory": "containerization",
"importance": "high",
"tags": ["docker", "containers", "devops", "deployment"]
}
},
{
"content": "PostgreSQL with pgvector extension enables vector similarity search for embeddings. This is useful for semantic search and AI applications.",
"user_id": "demo_user",
"session_id": "demo_session_2",
"metadata": {
"category": "databases",
"subcategory": "vector_search",
"importance": "high",
"tags": ["postgresql", "pgvector", "embeddings", "search"]
}
},
{
"content": "N8N is an open-source workflow automation tool that connects different services and APIs. It provides a visual interface for building workflows.",
"user_id": "demo_user",
"session_id": "demo_session_3",
"metadata": {
"category": "automation",
"subcategory": "workflow_tools",
"importance": "medium",
"tags": ["n8n", "automation", "workflow", "integration"]
}
},
{
"content": "Ollama runs large language models locally on your machine. It supports models like Llama, Mistral, and provides embedding capabilities.",
"user_id": "demo_user",
"session_id": "demo_session_3",
"metadata": {
"category": "ai",
"subcategory": "local_models",
"importance": "high",
"tags": ["ollama", "llm", "local", "embeddings"]
}
},
{
"content": "Supabase is an open-source Firebase alternative that provides database, authentication, and real-time subscriptions with PostgreSQL.",
"user_id": "demo_user",
"session_id": "demo_session_4",
"metadata": {
"category": "backend",
"subcategory": "baas",
"importance": "medium",
"tags": ["supabase", "database", "authentication", "backend"]
}
},
{
"content": "Neo4j is a graph database that stores data as nodes and relationships. It's excellent for modeling complex relationships and network data.",
"user_id": "demo_user",
"session_id": "demo_session_4",
"metadata": {
"category": "databases",
"subcategory": "graph_database",
"importance": "medium",
"tags": ["neo4j", "graph", "relationships", "cypher"]
}
}
]
async def store_test_memories():
"""Store test memories in LangMem API"""
print("🧪 Populating LangMem with test data...")
print("=" * 50)
headers = {"Authorization": f"Bearer {API_KEY}"}
async with httpx.AsyncClient() as client:
stored_memories = []
for i, memory in enumerate(test_memories, 1):
try:
print(f"\n{i}. Storing: {memory['content'][:50]}...")
response = await client.post(
f"{API_BASE_URL}/v1/memories/store",
json=memory,
headers=headers,
timeout=30.0
)
if response.status_code == 200:
data = response.json()
stored_memories.append(data)
print(f" ✅ Stored with ID: {data['id']}")
else:
print(f" ❌ Failed: {response.status_code}")
print(f" Response: {response.text}")
except Exception as e:
print(f" ❌ Error: {e}")
print(f"\n🎉 Successfully stored {len(stored_memories)} memories!")
print("\n📊 Summary:")
print(f" - Total memories: {len(stored_memories)}")
print(f" - User: demo_user")
print(f" - Sessions: {len(set(m['session_id'] for m in test_memories))}")
print(f" - Categories: {len(set(m['metadata']['category'] for m in test_memories))}")
# Test search functionality
print("\n🔍 Testing search functionality...")
search_tests = [
"Python web development",
"AI and machine learning",
"Database and storage",
"Docker containers"
]
for query in search_tests:
try:
response = await client.post(
f"{API_BASE_URL}/v1/memories/search",
json={
"query": query,
"user_id": "demo_user",
"limit": 3,
"threshold": 0.5
},
headers=headers,
timeout=30.0
)
if response.status_code == 200:
data = response.json()
print(f" Query: '{query}' -> {data['total_count']} results")
for memory in data['memories']:
print(f" - {memory['content'][:40]}... ({memory['similarity']:.3f})")
else:
print(f" Query: '{query}' -> Failed ({response.status_code})")
except Exception as e:
print(f" Query: '{query}' -> Error: {e}")
print("\n✅ Test data population complete!")
print(" You can now view the memories in Supabase web UI:")
print(" - Table: langmem_documents")
print(" - URL: http://localhost:8000")
if __name__ == "__main__":
asyncio.run(store_test_memories())

174
tests/simple_test.py Normal file
View File

@@ -0,0 +1,174 @@
#!/usr/bin/env python3
"""
Simple test of LangMem API core functionality
"""
import asyncio
import asyncpg
import httpx
import json
from datetime import datetime
from uuid import uuid4
# Configuration
OLLAMA_URL = "http://localhost:11434"
SUPABASE_DB_URL = "postgresql://postgres:CzkaYmRvc26Y@localhost:5435/postgres"
API_KEY = "langmem_api_key_2025"
async def get_embedding(text: str):
"""Generate embedding using Ollama"""
try:
async with httpx.AsyncClient() as client:
response = await client.post(
f"{OLLAMA_URL}/api/embeddings",
json={
"model": "nomic-embed-text",
"prompt": text
},
timeout=30.0
)
response.raise_for_status()
data = response.json()
return data["embedding"]
except Exception as e:
print(f"❌ Error generating embedding: {e}")
return None
async def test_database_connection():
"""Test database connection"""
try:
conn = await asyncpg.connect(SUPABASE_DB_URL)
result = await conn.fetchval("SELECT 1")
await conn.close()
print(f"✅ Database connection successful: {result}")
return True
except Exception as e:
print(f"❌ Database connection failed: {e}")
return False
async def test_ollama_connection():
"""Test Ollama connection"""
try:
async with httpx.AsyncClient() as client:
response = await client.get(f"{OLLAMA_URL}/api/tags")
if response.status_code == 200:
data = response.json()
models = [model["name"] for model in data.get("models", [])]
print(f"✅ Ollama connection successful, models: {models}")
return True
else:
print(f"❌ Ollama connection failed: {response.status_code}")
return False
except Exception as e:
print(f"❌ Ollama connection failed: {e}")
return False
async def test_embedding_generation():
"""Test embedding generation"""
test_text = "This is a test memory for the LangMem system"
print(f"🧪 Testing embedding generation for: '{test_text}'")
embedding = await get_embedding(test_text)
if embedding:
print(f"✅ Embedding generated successfully, dimension: {len(embedding)}")
return embedding
else:
print("❌ Failed to generate embedding")
return None
async def test_vector_storage():
"""Test vector storage in Supabase"""
try:
conn = await asyncpg.connect(SUPABASE_DB_URL)
# Create test table
await conn.execute("""
CREATE TABLE IF NOT EXISTS test_langmem_documents (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
content TEXT NOT NULL,
embedding vector(768),
user_id TEXT NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
""")
# Generate test embedding
test_content = "FastAPI is a modern web framework for building APIs with Python"
embedding = await get_embedding(test_content)
if not embedding:
print("❌ Cannot test vector storage without embedding")
return False
# Store test document
doc_id = uuid4()
await conn.execute("""
INSERT INTO test_langmem_documents (id, content, embedding, user_id)
VALUES ($1, $2, $3, $4)
""", doc_id, test_content, str(embedding), "test_user")
# Test similarity search
query_embedding = await get_embedding("Python web framework")
if query_embedding:
results = await conn.fetch("""
SELECT id, content, 1 - (embedding <=> $1) as similarity
FROM test_langmem_documents
WHERE user_id = 'test_user'
ORDER BY embedding <=> $1
LIMIT 5
""", str(query_embedding))
print(f"✅ Vector storage and similarity search successful")
for row in results:
print(f" - Content: {row['content'][:50]}...")
print(f" - Similarity: {row['similarity']:.3f}")
# Cleanup
await conn.execute("DROP TABLE IF EXISTS test_langmem_documents")
await conn.close()
return True
except Exception as e:
print(f"❌ Vector storage test failed: {e}")
return False
async def main():
"""Run all tests"""
print("🚀 Starting LangMem API Tests")
print("=" * 50)
# Test 1: Database connection
print("\n1. Testing Database Connection...")
db_ok = await test_database_connection()
# Test 2: Ollama connection
print("\n2. Testing Ollama Connection...")
ollama_ok = await test_ollama_connection()
# Test 3: Embedding generation
print("\n3. Testing Embedding Generation...")
embedding_ok = await test_embedding_generation()
# Test 4: Vector storage
print("\n4. Testing Vector Storage...")
vector_ok = await test_vector_storage()
# Summary
print("\n" + "=" * 50)
print("📊 Test Results Summary:")
print(f" Database Connection: {'✅ PASS' if db_ok else '❌ FAIL'}")
print(f" Ollama Connection: {'✅ PASS' if ollama_ok else '❌ FAIL'}")
print(f" Embedding Generation: {'✅ PASS' if embedding_ok else '❌ FAIL'}")
print(f" Vector Storage: {'✅ PASS' if vector_ok else '❌ FAIL'}")
all_passed = all([db_ok, ollama_ok, embedding_ok, vector_ok])
print(f"\n🎯 Overall Status: {'✅ ALL TESTS PASSED' if all_passed else '❌ SOME TESTS FAILED'}")
if all_passed:
print("\n🎉 LangMem API core functionality is working!")
print(" Ready to proceed with full API deployment.")
else:
print("\n⚠️ Some tests failed. Please check the configuration.")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,248 @@
#!/usr/bin/env python3
"""
Store basic personal information in LangMem
"""
import asyncio
import httpx
from datetime import datetime
# Configuration
API_BASE_URL = "http://localhost:8765"
API_KEY = "langmem_api_key_2025"
# Personal information to store
personal_memories = [
{
"content": "Ondrej has a son named Cyril",
"user_id": "ondrej",
"session_id": "personal_info_session",
"metadata": {
"category": "family",
"subcategory": "relationships",
"importance": "high",
"privacy_level": "personal",
"tags": ["family", "son", "relationship", "personal"]
},
"relationships": [
{
"entity_name": "Cyril",
"entity_type": "Person",
"relationship": "HAS_SON",
"confidence": 1.0,
"properties": {
"relationship_type": "father_son",
"family_role": "son",
"person_type": "child"
}
},
{
"entity_name": "Ondrej",
"entity_type": "Person",
"relationship": "IS_FATHER_OF",
"confidence": 1.0,
"properties": {
"relationship_type": "father_son",
"family_role": "father",
"person_type": "parent"
}
}
]
},
{
"content": "Cyril is Ondrej's son",
"user_id": "ondrej",
"session_id": "personal_info_session",
"metadata": {
"category": "family",
"subcategory": "relationships",
"importance": "high",
"privacy_level": "personal",
"tags": ["family", "father", "relationship", "personal"]
},
"relationships": [
{
"entity_name": "Ondrej",
"entity_type": "Person",
"relationship": "HAS_FATHER",
"confidence": 1.0,
"properties": {
"relationship_type": "father_son",
"family_role": "father",
"person_type": "parent"
}
}
]
},
{
"content": "Ondrej is a person with family members",
"user_id": "ondrej",
"session_id": "personal_info_session",
"metadata": {
"category": "personal",
"subcategory": "identity",
"importance": "medium",
"privacy_level": "personal",
"tags": ["identity", "person", "family_member"]
},
"relationships": [
{
"entity_name": "Family",
"entity_type": "Group",
"relationship": "BELONGS_TO",
"confidence": 1.0,
"properties": {
"group_type": "family_unit",
"role": "parent"
}
}
]
},
{
"content": "Cyril is a young person who is part of a family",
"user_id": "ondrej",
"session_id": "personal_info_session",
"metadata": {
"category": "personal",
"subcategory": "identity",
"importance": "medium",
"privacy_level": "personal",
"tags": ["identity", "person", "family_member", "young"]
},
"relationships": [
{
"entity_name": "Family",
"entity_type": "Group",
"relationship": "BELONGS_TO",
"confidence": 1.0,
"properties": {
"group_type": "family_unit",
"role": "child"
}
}
]
}
]
async def store_personal_memories():
"""Store personal memories in LangMem"""
print("👨‍👦 Storing Personal Information in LangMem")
print("=" * 50)
headers = {"Authorization": f"Bearer {API_KEY}"}
async with httpx.AsyncClient() as client:
stored_memories = []
for i, memory in enumerate(personal_memories, 1):
try:
print(f"\n{i}. Storing: {memory['content']}")
response = await client.post(
f"{API_BASE_URL}/v1/memories/store",
json=memory,
headers=headers,
timeout=30.0
)
if response.status_code == 200:
data = response.json()
stored_memories.append(data)
print(f" ✅ Stored with ID: {data['id']}")
print(f" 📊 Relationships: {len(memory.get('relationships', []))}")
else:
print(f" ❌ Failed: {response.status_code}")
print(f" Response: {response.text}")
except Exception as e:
print(f" ❌ Error: {e}")
# Wait for background tasks to complete
await asyncio.sleep(3)
print(f"\n✅ Successfully stored {len(stored_memories)} personal memories!")
# Test search functionality
print("\n🔍 Testing search for family information...")
search_tests = [
"Who is Ondrej's son?",
"Tell me about Cyril",
"Family relationships",
"Who is Cyril's father?"
]
for query in search_tests:
try:
response = await client.post(
f"{API_BASE_URL}/v1/memories/search",
json={
"query": query,
"user_id": "ondrej",
"limit": 3,
"threshold": 0.4,
"include_graph": True
},
headers=headers,
timeout=30.0
)
if response.status_code == 200:
data = response.json()
print(f"\n Query: '{query}'")
print(f" Results: {data['total_count']}")
for memory in data['memories']:
print(f" - {memory['content']} (similarity: {memory['similarity']:.3f})")
if 'relationships' in memory and memory['relationships']:
print(f" Relationships: {len(memory['relationships'])}")
for rel in memory['relationships']:
print(f"{rel['relationship']} {rel['entity_name']} ({rel['confidence']})")
else:
print(f" Query: '{query}' -> Failed ({response.status_code})")
except Exception as e:
print(f" Query: '{query}' -> Error: {e}")
# Test conversation retrieval
print("\n💬 Testing conversation retrieval...")
try:
response = await client.post(
f"{API_BASE_URL}/v1/memories/retrieve",
json={
"messages": [
{"role": "user", "content": "Tell me about my family"},
{"role": "assistant", "content": "I'd be happy to help with family information. What would you like to know?"},
{"role": "user", "content": "Who are my children?"}
],
"user_id": "ondrej",
"session_id": "personal_info_session"
},
headers=headers,
timeout=30.0
)
if response.status_code == 200:
data = response.json()
print(f" ✅ Retrieved {data['total_count']} relevant memories for conversation")
for memory in data['memories']:
print(f" - {memory['content']} (similarity: {memory['similarity']:.3f})")
else:
print(f" ❌ Conversation retrieval failed: {response.status_code}")
except Exception as e:
print(f" ❌ Conversation retrieval error: {e}")
print("\n" + "=" * 50)
print("🎉 Personal Information Storage Complete!")
print("📊 Summary:")
print(f" - User: ondrej")
print(f" - Memories stored: {len(stored_memories)}")
print(f" - Family relationships: Father-Son (Ondrej-Cyril)")
print(f" - Graph relationships: Stored in Neo4j")
print(f" - Vector embeddings: Stored in PostgreSQL")
print("\n🌐 You can now view the data in:")
print(" - Supabase: http://localhost:8000 (langmem_documents table)")
print(" - Neo4j: http://localhost:7474 (Memory and Entity nodes)")
if __name__ == "__main__":
asyncio.run(store_personal_memories())

View File

@@ -0,0 +1,158 @@
#!/usr/bin/env python3
"""
Test AI-powered relationship extraction from various types of content
"""
import asyncio
import httpx
import json
# Configuration
API_BASE_URL = "http://localhost:8765"
API_KEY = "langmem_api_key_2025"
# Test content with different types of relationships
test_documents = [
{
"content": "Ondrej has a son named Cyril who is 8 years old and loves playing soccer",
"user_id": "test_user",
"session_id": "ai_test_session",
"metadata": {
"category": "family",
"type": "personal_info"
}
},
{
"content": "Apple Inc. was founded by Steve Jobs, Steve Wozniak, and Ronald Wayne in Cupertino, California",
"user_id": "test_user",
"session_id": "ai_test_session",
"metadata": {
"category": "business",
"type": "company_history"
}
},
{
"content": "Python is a programming language created by Guido van Rossum and is widely used for web development, data science, and machine learning",
"user_id": "test_user",
"session_id": "ai_test_session",
"metadata": {
"category": "technology",
"type": "programming_languages"
}
},
{
"content": "The Eiffel Tower is located in Paris, France and was designed by Gustave Eiffel for the 1889 World's Fair",
"user_id": "test_user",
"session_id": "ai_test_session",
"metadata": {
"category": "architecture",
"type": "landmarks"
}
},
{
"content": "Einstein worked at Princeton University and developed the theory of relativity which revolutionized physics",
"user_id": "test_user",
"session_id": "ai_test_session",
"metadata": {
"category": "science",
"type": "historical_figures"
}
}
]
async def test_ai_relationship_extraction():
"""Test AI-powered relationship extraction"""
print("🤖 Testing AI-Powered Relationship Extraction")
print("=" * 60)
headers = {"Authorization": f"Bearer {API_KEY}"}
async with httpx.AsyncClient() as client:
stored_memories = []
for i, doc in enumerate(test_documents, 1):
print(f"\n{i}. Processing: {doc['content'][:60]}...")
try:
# Store memory - AI will automatically extract relationships
response = await client.post(
f"{API_BASE_URL}/v1/memories/store",
json=doc,
headers=headers,
timeout=60.0 # Longer timeout for AI processing
)
if response.status_code == 200:
data = response.json()
stored_memories.append(data)
print(f" ✅ Memory stored with ID: {data['id']}")
print(f" 🔄 AI relationship extraction running in background...")
else:
print(f" ❌ Failed: {response.status_code}")
print(f" Response: {response.text}")
except Exception as e:
print(f" ❌ Error: {e}")
# Wait for AI processing to complete
print(f"\n⏳ Waiting for AI relationship extraction to complete...")
await asyncio.sleep(10)
print(f"\n✅ Successfully stored {len(stored_memories)} memories with AI-extracted relationships!")
# Test relationship-aware search
print("\n🔍 Testing relationship-aware search...")
search_tests = [
"Who is Cyril's father?",
"What companies did Steve Jobs found?",
"Who created Python programming language?",
"Where is the Eiffel Tower located?",
"What did Einstein work on?"
]
for query in search_tests:
try:
response = await client.post(
f"{API_BASE_URL}/v1/memories/search",
json={
"query": query,
"user_id": "test_user",
"limit": 2,
"threshold": 0.3,
"include_graph": True
},
headers=headers,
timeout=30.0
)
if response.status_code == 200:
data = response.json()
print(f"\n Query: '{query}'")
print(f" Results: {data['total_count']}")
for memory in data['memories']:
print(f" - {memory['content'][:50]}... (similarity: {memory['similarity']:.3f})")
if 'relationships' in memory and memory['relationships']:
print(f" Graph relationships: {len(memory['relationships'])}")
for rel in memory['relationships'][:3]: # Show first 3
print(f"{rel['relationship']} {rel['entity_name']} ({rel['confidence']})")
else:
print(f" Query: '{query}' -> Failed ({response.status_code})")
except Exception as e:
print(f" Query: '{query}' -> Error: {e}")
print("\n" + "=" * 60)
print("🎉 AI Relationship Extraction Test Complete!")
print("📊 Summary:")
print(f" - Memories processed: {len(stored_memories)}")
print(f" - AI model used: llama3.2")
print(f" - Relationship types: Dynamic (extracted by AI)")
print(f" - Entity types: Person, Organization, Location, Technology, Concept, etc.")
print("\n🌐 Check Neo4j Browser for dynamic relationships:")
print(" - URL: http://localhost:7474")
print(" - Query: MATCH (n)-[r]->(m) RETURN n, r, m")
print(" - Look for relationships like IS_SON_OF, FOUNDED_BY, CREATED_BY, LOCATED_IN, etc.")
if __name__ == "__main__":
asyncio.run(test_ai_relationship_extraction())

View File

@@ -1,204 +1,169 @@
#!/usr/bin/env python3
"""
Test suite for LangMem API
Test the LangMem API endpoints
"""
import asyncio
import json
import pytest
import httpx
import json
from uuid import uuid4
# Configuration
API_BASE_URL = "http://localhost:8765"
API_KEY = "langmem_api_key_2025"
TEST_USER_ID = f"test_user_{uuid4()}"
class TestLangMemAPI:
"""Test suite for LangMem API endpoints"""
async def test_api_endpoints():
"""Test all API endpoints"""
print("🧪 Testing LangMem API Endpoints")
print("=" * 50)
def setup_method(self):
"""Setup test client"""
self.client = httpx.AsyncClient(base_url=API_BASE_URL)
self.headers = {"Authorization": f"Bearer {API_KEY}"}
self.test_user_id = f"test_user_{uuid4()}"
headers = {"Authorization": f"Bearer {API_KEY}"}
async def teardown_method(self):
"""Cleanup test client"""
await self.client.aclose()
@pytest.mark.asyncio
async def test_root_endpoint(self):
"""Test root endpoint"""
response = await self.client.get("/")
assert response.status_code == 200
data = response.json()
assert data["message"] == "LangMem API - Long-term Memory System"
assert data["version"] == "1.0.0"
@pytest.mark.asyncio
async def test_health_check(self):
"""Test health check endpoint"""
response = await self.client.get("/health")
assert response.status_code == 200
data = response.json()
assert "status" in data
assert "services" in data
assert "timestamp" in data
@pytest.mark.asyncio
async def test_store_memory(self):
"""Test storing a memory"""
memory_data = {
"content": "This is a test memory about Python programming",
"user_id": self.test_user_id,
"session_id": "test_session_1",
"metadata": {
"category": "programming",
"language": "python",
"importance": "high"
async with httpx.AsyncClient() as client:
# Test 1: Root endpoint
print("\n1. Testing root endpoint...")
try:
response = await client.get(f"{API_BASE_URL}/")
print(f"✅ Root endpoint: {response.status_code}")
print(f" Response: {response.json()}")
except Exception as e:
print(f"❌ Root endpoint failed: {e}")
# Test 2: Health check
print("\n2. Testing health check...")
try:
response = await client.get(f"{API_BASE_URL}/health")
print(f"✅ Health check: {response.status_code}")
data = response.json()
print(f" Overall status: {data.get('status')}")
for service, status in data.get('services', {}).items():
print(f" {service}: {status}")
except Exception as e:
print(f"❌ Health check failed: {e}")
# Test 3: Store memory
print("\n3. Testing memory storage...")
try:
memory_data = {
"content": "FastAPI is a modern web framework for building APIs with Python",
"user_id": TEST_USER_ID,
"session_id": "test_session_1",
"metadata": {
"category": "programming",
"language": "python",
"framework": "fastapi"
}
}
}
response = await client.post(
f"{API_BASE_URL}/v1/memories/store",
json=memory_data,
headers=headers,
timeout=30.0
)
if response.status_code == 200:
data = response.json()
memory_id = data["id"]
print(f"✅ Memory stored successfully: {memory_id}")
print(f" Status: {data['status']}")
else:
print(f"❌ Memory storage failed: {response.status_code}")
print(f" Response: {response.text}")
return
except Exception as e:
print(f"❌ Memory storage failed: {e}")
return
response = await self.client.post(
"/v1/memories/store",
json=memory_data,
headers=self.headers
)
# Test 4: Search memories
print("\n4. Testing memory search...")
try:
search_data = {
"query": "Python web framework",
"user_id": TEST_USER_ID,
"limit": 5,
"threshold": 0.5
}
response = await client.post(
f"{API_BASE_URL}/v1/memories/search",
json=search_data,
headers=headers,
timeout=30.0
)
if response.status_code == 200:
data = response.json()
print(f"✅ Memory search successful")
print(f" Found {data['total_count']} memories")
for memory in data['memories']:
print(f" - {memory['content'][:50]}... (similarity: {memory['similarity']:.3f})")
else:
print(f"❌ Memory search failed: {response.status_code}")
print(f" Response: {response.text}")
except Exception as e:
print(f"❌ Memory search failed: {e}")
assert response.status_code == 200
data = response.json()
assert data["status"] == "stored"
assert data["user_id"] == self.test_user_id
assert "id" in data
assert "created_at" in data
# Test 5: Retrieve memories for conversation
print("\n5. Testing memory retrieval...")
try:
retrieve_data = {
"messages": [
{"role": "user", "content": "I want to learn about web development"},
{"role": "assistant", "content": "Great! What technology are you interested in?"},
{"role": "user", "content": "I heard Python is good for web APIs"}
],
"user_id": TEST_USER_ID,
"session_id": "test_session_1"
}
response = await client.post(
f"{API_BASE_URL}/v1/memories/retrieve",
json=retrieve_data,
headers=headers,
timeout=30.0
)
if response.status_code == 200:
data = response.json()
print(f"✅ Memory retrieval successful")
print(f" Retrieved {data['total_count']} relevant memories")
for memory in data['memories']:
print(f" - {memory['content'][:50]}... (similarity: {memory['similarity']:.3f})")
else:
print(f"❌ Memory retrieval failed: {response.status_code}")
print(f" Response: {response.text}")
except Exception as e:
print(f"❌ Memory retrieval failed: {e}")
return data["id"]
@pytest.mark.asyncio
async def test_search_memories(self):
"""Test searching memories"""
# First store a memory
memory_id = await self.test_store_memory()
# Test 6: Get user memories
print("\n6. Testing user memory listing...")
try:
response = await client.get(
f"{API_BASE_URL}/v1/memories/users/{TEST_USER_ID}",
headers=headers,
timeout=30.0
)
if response.status_code == 200:
data = response.json()
print(f"✅ User memory listing successful")
print(f" User has {data['total_count']} memories")
for memory in data['memories']:
print(f" - {memory['content'][:50]}... (created: {memory['created_at'][:19]})")
else:
print(f"❌ User memory listing failed: {response.status_code}")
print(f" Response: {response.text}")
except Exception as e:
print(f"❌ User memory listing failed: {e}")
# Wait a moment for indexing
await asyncio.sleep(1)
# Search for the memory
search_data = {
"query": "Python programming",
"user_id": self.test_user_id,
"limit": 10,
"threshold": 0.5,
"include_graph": True
}
response = await self.client.post(
"/v1/memories/search",
json=search_data,
headers=self.headers
)
assert response.status_code == 200
data = response.json()
assert "memories" in data
assert "context" in data
assert "total_count" in data
assert data["total_count"] > 0
# Check first memory result
if data["memories"]:
memory = data["memories"][0]
assert "id" in memory
assert "content" in memory
assert "similarity" in memory
assert memory["user_id"] == self.test_user_id
@pytest.mark.asyncio
async def test_retrieve_memories(self):
"""Test retrieving memories for conversation context"""
# Store a memory first
await self.test_store_memory()
# Wait a moment for indexing
await asyncio.sleep(1)
# Retrieve memories based on conversation
retrieve_data = {
"messages": [
{"role": "user", "content": "I want to learn about Python"},
{"role": "assistant", "content": "Python is a great programming language"},
{"role": "user", "content": "Tell me more about Python programming"}
],
"user_id": self.test_user_id,
"session_id": "test_session_1"
}
response = await self.client.post(
"/v1/memories/retrieve",
json=retrieve_data,
headers=self.headers
)
assert response.status_code == 200
data = response.json()
assert "memories" in data
assert "context" in data
assert "total_count" in data
@pytest.mark.asyncio
async def test_get_user_memories(self):
"""Test getting all memories for a user"""
# Store a memory first
await self.test_store_memory()
response = await self.client.get(
f"/v1/memories/users/{self.test_user_id}",
headers=self.headers
)
assert response.status_code == 200
data = response.json()
assert "memories" in data
assert "total_count" in data
assert data["total_count"] > 0
# Check memory structure
if data["memories"]:
memory = data["memories"][0]
assert "id" in memory
assert "content" in memory
assert "user_id" in memory
assert "created_at" in memory
@pytest.mark.asyncio
async def test_delete_memory(self):
"""Test deleting a memory"""
# Store a memory first
memory_id = await self.test_store_memory()
# Delete the memory
response = await self.client.delete(
f"/v1/memories/{memory_id}",
headers=self.headers
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "deleted"
assert data["id"] == memory_id
@pytest.mark.asyncio
async def test_authentication_required(self):
"""Test that authentication is required"""
response = await self.client.get("/v1/memories/users/test_user")
assert response.status_code == 401
@pytest.mark.asyncio
async def test_invalid_api_key(self):
"""Test invalid API key"""
headers = {"Authorization": "Bearer invalid_key"}
response = await self.client.get("/v1/memories/users/test_user", headers=headers)
assert response.status_code == 401
print("\n" + "=" * 50)
print("🎉 API Testing Complete!")
if __name__ == "__main__":
pytest.main([__file__, "-v"])
asyncio.run(test_api_endpoints())

View File

@@ -0,0 +1,217 @@
#!/usr/bin/env python3
"""
Test the new fact-based memory system based on mem0's approach
"""
import asyncio
import json
import sys
import os
import httpx
# Add the API directory to the path
sys.path.insert(0, '/home/klas/langmem-project/src/api')
# Configuration
API_BASE_URL = "http://localhost:8765"
API_KEY = "langmem_api_key_2025"
# Test content with multiple facts
test_content = "Ondrej has a son named Cyril who is 8 years old and loves playing soccer. Cyril goes to elementary school in Prague and his favorite color is blue. Ondrej works as a software engineer and lives in Czech Republic."
async def test_fact_based_memory():
"""Test the fact-based memory system"""
print("🧪 Testing Fact-Based Memory System")
print("=" * 60)
headers = {"Authorization": f"Bearer {API_KEY}"}
async with httpx.AsyncClient() as client:
# Test 1: Store memory with fact extraction
print("\n1. Testing fact-based memory storage...")
response = await client.post(
f"{API_BASE_URL}/v1/memories/store",
json={
"content": test_content,
"user_id": "test_user_facts",
"session_id": "fact_test_session",
"metadata": {"category": "family_test"}
},
headers=headers,
timeout=60.0
)
if response.status_code == 200:
result = response.json()
print(f"✅ Memory stored successfully!")
print(f" Approach: {result.get('approach', 'unknown')}")
print(f" Total facts: {result.get('total_facts', 0)}")
print(f" Stored facts: {result.get('stored_facts', 0)}")
if result.get('facts'):
print(" Facts processed:")
for i, fact in enumerate(result['facts'][:3], 1): # Show first 3
print(f" {i}. {fact.get('action', 'unknown')}: {fact.get('fact', 'N/A')[:60]}...")
else:
print(f"❌ Failed to store memory: {response.status_code}")
print(f" Response: {response.text}")
return False
# Test 2: Search for specific facts
print("\n2. Testing fact-based search...")
search_queries = [
"Who is Cyril's father?",
"How old is Cyril?",
"What does Ondrej do for work?",
"Where does Cyril go to school?",
"What is Cyril's favorite color?"
]
for query in search_queries:
response = await client.post(
f"{API_BASE_URL}/v1/memories/search",
json={
"query": query,
"user_id": "test_user_facts",
"limit": 3,
"threshold": 0.5,
"include_graph": True
},
headers=headers,
timeout=30.0
)
if response.status_code == 200:
result = response.json()
print(f" Query: '{query}'")
print(f" Results: {result['total_count']} - Approach: {result['context'].get('approach', 'unknown')}")
for memory in result['memories'][:2]: # Show first 2 results
print(f"{memory['content'][:50]}... (similarity: {memory['similarity']:.3f})")
memory_type = memory.get('metadata', {}).get('type', 'unknown')
print(f" Type: {memory_type}")
print()
else:
print(f" Query: '{query}' -> Failed ({response.status_code})")
# Test 3: Test deduplication by storing similar content
print("\n3. Testing deduplication...")
duplicate_content = "Ondrej has a son named Cyril who is 8 years old. Cyril loves soccer."
response = await client.post(
f"{API_BASE_URL}/v1/memories/store",
json={
"content": duplicate_content,
"user_id": "test_user_facts",
"session_id": "fact_test_session",
"metadata": {"category": "family_test_duplicate"}
},
headers=headers,
timeout=60.0
)
if response.status_code == 200:
result = response.json()
print(f"✅ Deduplication test completed!")
print(f" Total facts: {result.get('total_facts', 0)}")
print(f" Stored facts: {result.get('stored_facts', 0)}")
if result.get('facts'):
print(" Actions taken:")
for fact in result['facts']:
action = fact.get('action', 'unknown')
print(f" - {action}: {fact.get('fact', 'N/A')[:50]}...")
else:
print(f"❌ Deduplication test failed: {response.status_code}")
print(f" Response: {response.text}")
# Test 4: Test update functionality
print("\n4. Testing memory updates...")
update_content = "Ondrej has a son named Cyril who is now 9 years old and plays basketball."
response = await client.post(
f"{API_BASE_URL}/v1/memories/store",
json={
"content": update_content,
"user_id": "test_user_facts",
"session_id": "fact_test_session",
"metadata": {"category": "family_test_update"}
},
headers=headers,
timeout=60.0
)
if response.status_code == 200:
result = response.json()
print(f"✅ Update test completed!")
print(f" Total facts: {result.get('total_facts', 0)}")
print(f" Stored facts: {result.get('stored_facts', 0)}")
if result.get('facts'):
print(" Actions taken:")
for fact in result['facts']:
action = fact.get('action', 'unknown')
print(f" - {action}: {fact.get('fact', 'N/A')[:50]}...")
else:
print(f"❌ Update test failed: {response.status_code}")
print(f" Response: {response.text}")
# Test 5: Verify updates by searching
print("\n5. Verifying updates...")
response = await client.post(
f"{API_BASE_URL}/v1/memories/search",
json={
"query": "How old is Cyril?",
"user_id": "test_user_facts",
"limit": 5,
"threshold": 0.5,
"include_graph": True
},
headers=headers,
timeout=30.0
)
if response.status_code == 200:
result = response.json()
print(f"✅ Found {result['total_count']} results for age query")
for memory in result['memories']:
print(f" - {memory['content']} (similarity: {memory['similarity']:.3f})")
else:
print(f"❌ Verification failed: {response.status_code}")
print("\n" + "=" * 60)
print("🎉 Fact-Based Memory System Test Complete!")
print("📊 Summary:")
print(" ✅ Fact extraction working")
print(" ✅ Deduplication working")
print(" ✅ Memory updates working")
print(" ✅ Fact-based search working")
print(" ✅ Approach: mem0-inspired fact-based memory")
return True
async def main():
"""Main function"""
try:
# Check if API is running
async with httpx.AsyncClient() as client:
response = await client.get(f"{API_BASE_URL}/health", timeout=5.0)
if response.status_code != 200:
print("❌ LangMem API is not running or healthy")
print("💡 Start the API with: ./start-dev.sh")
return False
success = await test_fact_based_memory()
if success:
print("\n🎉 All tests passed! The fact-based memory system is working correctly.")
else:
print("\n❌ Some tests failed. Check the output above.")
except Exception as e:
print(f"❌ Error during testing: {e}")
return False
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,165 @@
#!/usr/bin/env python3
"""
Test LangMem API integration with Neo4j graph relationships
"""
import asyncio
import httpx
import json
# Configuration
API_BASE_URL = "http://localhost:8765"
API_KEY = "langmem_api_key_2025"
async def test_langmem_with_neo4j():
"""Test LangMem API with Neo4j graph relationships"""
print("🧪 Testing LangMem API + Neo4j Integration")
print("=" * 50)
headers = {"Authorization": f"Bearer {API_KEY}"}
async with httpx.AsyncClient() as client:
# Store a memory with graph relationships
print("\n1. Storing memory with graph relationships...")
memory_with_relationships = {
"content": "LangMem is a long-term memory system for LLM projects that combines vector search with graph relationships",
"user_id": "graph_test_user",
"session_id": "graph_test_session",
"metadata": {
"category": "ai_systems",
"subcategory": "memory_systems",
"importance": "high",
"tags": ["langmem", "llm", "vector", "graph", "memory"]
},
"relationships": [
{
"entity_name": "Vector Search",
"entity_type": "Technology",
"relationship": "USES",
"confidence": 0.95,
"properties": {
"implementation": "pgvector",
"embedding_model": "nomic-embed-text"
}
},
{
"entity_name": "Graph Database",
"entity_type": "Technology",
"relationship": "USES",
"confidence": 0.90,
"properties": {
"implementation": "neo4j",
"query_language": "cypher"
}
},
{
"entity_name": "LLM Projects",
"entity_type": "Domain",
"relationship": "SERVES",
"confidence": 0.98,
"properties": {
"purpose": "long_term_memory",
"integration": "mcp_server"
}
}
]
}
try:
response = await client.post(
f"{API_BASE_URL}/v1/memories/store",
json=memory_with_relationships,
headers=headers,
timeout=30.0
)
if response.status_code == 200:
data = response.json()
memory_id = data["id"]
print(f" ✅ Memory stored with ID: {memory_id}")
print(f" ✅ Graph relationships will be processed in background")
# Wait a moment for background task
await asyncio.sleep(2)
# Search for the memory with graph relationships
print("\n2. Searching memory with graph relationships...")
search_response = await client.post(
f"{API_BASE_URL}/v1/memories/search",
json={
"query": "memory system for AI projects",
"user_id": "graph_test_user",
"limit": 5,
"threshold": 0.5,
"include_graph": True
},
headers=headers,
timeout=30.0
)
if search_response.status_code == 200:
search_data = search_response.json()
print(f" ✅ Found {search_data['total_count']} memories")
for memory in search_data['memories']:
print(f" - Content: {memory['content'][:60]}...")
print(f" Similarity: {memory['similarity']:.3f}")
if 'relationships' in memory:
print(f" Relationships: {len(memory['relationships'])}")
for rel in memory['relationships']:
print(f"{rel['relationship']} {rel['entity_name']} ({rel['confidence']})")
else:
print(" Relationships: Not included")
else:
print(f" ❌ Search failed: {search_response.status_code}")
else:
print(f" ❌ Memory storage failed: {response.status_code}")
print(f" Response: {response.text}")
except Exception as e:
print(f" ❌ Error: {e}")
# Test retrieval for conversation
print("\n3. Testing memory retrieval with graph context...")
try:
retrieval_response = await client.post(
f"{API_BASE_URL}/v1/memories/retrieve",
json={
"messages": [
{"role": "user", "content": "Tell me about memory systems"},
{"role": "assistant", "content": "I can help with memory systems. What specific aspect?"},
{"role": "user", "content": "How do vector databases work with graph relationships?"}
],
"user_id": "graph_test_user",
"session_id": "graph_test_session"
},
headers=headers,
timeout=30.0
)
if retrieval_response.status_code == 200:
retrieval_data = retrieval_response.json()
print(f" ✅ Retrieved {retrieval_data['total_count']} relevant memories")
for memory in retrieval_data['memories']:
print(f" - {memory['content'][:50]}... (similarity: {memory['similarity']:.3f})")
if 'relationships' in memory:
print(f" Graph relationships: {len(memory['relationships'])}")
else:
print(f" ❌ Retrieval failed: {retrieval_response.status_code}")
except Exception as e:
print(f" ❌ Retrieval error: {e}")
print("\n" + "=" * 50)
print("🎉 LangMem + Neo4j Integration Test Complete!")
print("✅ Vector search and graph relationships working together")
print("🌐 Check Neo4j Browser: http://localhost:7474")
print(" Look for Memory, Entity nodes and their relationships")
if __name__ == "__main__":
asyncio.run(test_langmem_with_neo4j())

79
tests/test_mcp_server.py Normal file
View File

@@ -0,0 +1,79 @@
#!/usr/bin/env python3
"""
Test MCP server implementation
"""
import asyncio
import json
import sys
import subprocess
import time
import signal
async def test_mcp_server_startup():
"""Test MCP server startup"""
print("🚀 Testing MCP server startup...")
# Test if LangMem API is running
try:
import httpx
async with httpx.AsyncClient() as client:
response = await client.get("http://localhost:8765/health", timeout=5.0)
if response.status_code == 200:
print("✅ LangMem API is running")
else:
print("❌ LangMem API is not healthy")
return False
except Exception as e:
print(f"❌ LangMem API is not accessible: {e}")
return False
# Test MCP server imports
try:
sys.path.insert(0, '/home/klas/langmem-project/src/mcp')
from server import LangMemMCPServer
print("✅ MCP server imports successfully")
except Exception as e:
print(f"❌ MCP server import failed: {e}")
return False
# Test MCP server initialization
try:
server = LangMemMCPServer()
print("✅ MCP server initializes successfully")
return True
except Exception as e:
print(f"❌ MCP server initialization failed: {e}")
return False
async def main():
"""Main function"""
print("🧪 Testing LangMem MCP Server...")
success = await test_mcp_server_startup()
if success:
print("\n🎉 MCP server tests passed!")
print("\n📋 Integration instructions:")
print("1. Add MCP server to Claude Code configuration:")
print(" - Copy mcp_config.json to your Claude Code settings")
print(" - Or add manually in Claude Code settings")
print("\n2. Start the MCP server:")
print(" ./start-mcp-server.sh")
print("\n3. Available tools in Claude Code:")
print(" - store_memory: Store memories with AI relationship extraction")
print(" - search_memories: Search memories with hybrid vector + graph search")
print(" - retrieve_memories: Retrieve relevant memories for conversation context")
print(" - get_user_memories: Get all memories for a specific user")
print(" - delete_memory: Delete a specific memory")
print(" - health_check: Check LangMem system health")
print("\n4. Available resources:")
print(" - langmem://memories: Memory storage resource")
print(" - langmem://search: Search capabilities resource")
print(" - langmem://relationships: AI relationships resource")
print(" - langmem://health: System health resource")
else:
print("\n❌ MCP server tests failed!")
if __name__ == "__main__":
asyncio.run(main())

172
tests/test_neo4j.py Normal file
View File

@@ -0,0 +1,172 @@
#!/usr/bin/env python3
"""
Test Neo4j graph database connectivity and functionality
"""
import asyncio
from neo4j import AsyncGraphDatabase
# Configuration
NEO4J_URL = "bolt://localhost:7687"
NEO4J_USER = "neo4j"
NEO4J_PASSWORD = "langmem_neo4j_password"
async def test_neo4j_connection():
"""Test Neo4j connection and basic operations"""
print("🧪 Testing Neo4j Graph Database")
print("=" * 50)
try:
# Create driver
driver = AsyncGraphDatabase.driver(NEO4J_URL, auth=(NEO4J_USER, NEO4J_PASSWORD))
# Test connection
async with driver.session() as session:
print("1. Testing connection...")
result = await session.run("RETURN 1 as test")
record = await result.single()
print(f" ✅ Connection successful: {record['test']}")
# Clear existing data
print("\n2. Clearing existing data...")
await session.run("MATCH (n) DETACH DELETE n")
print(" ✅ Database cleared")
# Create test memory nodes with relationships
print("\n3. Creating test memory nodes...")
# Create memory nodes
memories = [
{
"id": "mem1",
"content": "Claude Code is an AI-powered CLI tool",
"category": "tools",
"tags": ["claude", "ai", "cli"]
},
{
"id": "mem2",
"content": "FastAPI is a modern web framework",
"category": "frameworks",
"tags": ["fastapi", "python", "web"]
},
{
"id": "mem3",
"content": "Docker provides containerization",
"category": "devops",
"tags": ["docker", "containers"]
}
]
for memory in memories:
await session.run("""
CREATE (m:Memory {
id: $id,
content: $content,
category: $category,
tags: $tags,
created_at: datetime()
})
""", **memory)
print(f" ✅ Created {len(memories)} memory nodes")
# Create category nodes and relationships
print("\n4. Creating category relationships...")
await session.run("""
MATCH (m:Memory)
MERGE (c:Category {name: m.category})
MERGE (m)-[:BELONGS_TO]->(c)
""")
# Create tag nodes and relationships
await session.run("""
MATCH (m:Memory)
UNWIND m.tags as tag
MERGE (t:Tag {name: tag})
MERGE (m)-[:HAS_TAG]->(t)
""")
print(" ✅ Created category and tag relationships")
# Create similarity relationships (example)
print("\n5. Creating similarity relationships...")
await session.run("""
MATCH (m1:Memory), (m2:Memory)
WHERE m1.id = 'mem1' AND m2.id = 'mem2'
CREATE (m1)-[:SIMILAR_TO {score: 0.65, reason: 'both are development tools'}]->(m2)
""")
await session.run("""
MATCH (m1:Memory), (m2:Memory)
WHERE m1.id = 'mem2' AND m2.id = 'mem3'
CREATE (m1)-[:RELATED_TO {score: 0.45, reason: 'both used in modern development'}]->(m2)
""")
print(" ✅ Created similarity relationships")
# Test queries
print("\n6. Testing graph queries...")
# Count nodes
result = await session.run("MATCH (n) RETURN labels(n) as label, count(n) as count")
print(" Node counts:")
async for record in result:
print(f" {record['label']}: {record['count']}")
# Find memories by category
result = await session.run("""
MATCH (m:Memory)-[:BELONGS_TO]->(c:Category {name: 'tools'})
RETURN m.content as content
""")
print("\n Memories in 'tools' category:")
async for record in result:
print(f" - {record['content']}")
# Find similar memories
result = await session.run("""
MATCH (m1:Memory)-[r:SIMILAR_TO]->(m2:Memory)
RETURN m1.content as memory1, m2.content as memory2, r.score as score
""")
print("\n Similar memories:")
async for record in result:
print(f" - {record['memory1'][:30]}... → {record['memory2'][:30]}... (score: {record['score']})")
# Find memories with common tags
result = await session.run("""
MATCH (m1:Memory)-[:HAS_TAG]->(t:Tag)<-[:HAS_TAG]-(m2:Memory)
WHERE m1.id < m2.id
RETURN m1.content as memory1, m2.content as memory2, t.name as common_tag
""")
print("\n Memories with common tags:")
async for record in result:
print(f" - {record['memory1'][:30]}... & {record['memory2'][:30]}... (tag: {record['common_tag']})")
print("\n7. Testing graph traversal...")
# Complex traversal query
result = await session.run("""
MATCH path = (m:Memory)-[:HAS_TAG]->(t:Tag)<-[:HAS_TAG]-(related:Memory)
WHERE m.id = 'mem1' AND m <> related
RETURN related.content as related_content, t.name as via_tag
""")
print(" Memories related to 'mem1' via tags:")
async for record in result:
print(f" - {record['related_content'][:40]}... (via tag: {record['via_tag']})")
await driver.close()
print("\n" + "=" * 50)
print("🎉 Neo4j Graph Database Test Complete!")
print("✅ All tests passed successfully")
print(f"📊 Graph database is working with relationships and traversals")
print(f"🌐 Neo4j Browser: http://localhost:7474")
print(f" Username: {NEO4J_USER}")
print(f" Password: {NEO4J_PASSWORD}")
except Exception as e:
print(f"❌ Neo4j test failed: {e}")
return False
return True
if __name__ == "__main__":
asyncio.run(test_neo4j_connection())