Authentication fixes: - Removed auth.js from all HTML pages (was causing double prompts) - Removed .htaccess and .htpasswd files (redundant with Caddy) - Now using only Caddy basic auth: langmem / langmem2025 Signal bridge fixes: - Found Signal bridge bot: @signalbot:matrix.klas.chat - Created DM room between Claude user and bridge bot - Sent login command to register Claude with Signal bridge - Claude should now be able to bridge messages to Signal 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
1243 lines
40 KiB
HTML
1243 lines
40 KiB
HTML
<!DOCTYPE html>
|
|
<html lang="en">
|
|
<head>
|
|
<meta charset="UTF-8">
|
|
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
|
<title>Implementation Guide - LangMem Documentation</title>
|
|
<meta name="description" content="Step-by-step implementation guide for building the LangMem long-term memory system with detailed phases and instructions.">
|
|
<link rel="stylesheet" href="../assets/css/style.css">
|
|
<script src="https://cdn.jsdelivr.net/npm/mermaid/dist/mermaid.min.js"></script>
|
|
<script src="https://cdn.jsdelivr.net/npm/prismjs@1.29.0/components/prism-core.min.js"></script>
|
|
<script src="https://cdn.jsdelivr.net/npm/prismjs@1.29.0/plugins/autoloader/prism-autoloader.min.js"></script>
|
|
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/prismjs@1.29.0/themes/prism.css">
|
|
</head>
|
|
<body>
|
|
<header>
|
|
<nav>
|
|
<a href="../" class="logo">🧠 LangMem</a>
|
|
<ul class="nav-links">
|
|
<li><a href="../">Home</a></li>
|
|
<li><a href="../architecture/">Architecture</a></li>
|
|
<li><a href="../implementation/">Implementation</a></li>
|
|
<li><a href="../api/">API Docs</a></li>
|
|
</ul>
|
|
</nav>
|
|
</header>
|
|
|
|
<main>
|
|
<section class="hero">
|
|
<h1>Implementation Guide</h1>
|
|
<p>Complete step-by-step guide to implementing the LangMem long-term memory system from scratch to production.</p>
|
|
</section>
|
|
|
|
<section class="mb-4">
|
|
<h2>Prerequisites</h2>
|
|
<div class="grid grid-2">
|
|
<div class="card">
|
|
<h3>🛠️ Development Environment</h3>
|
|
<ul>
|
|
<li>Docker & Docker Compose</li>
|
|
<li>Python 3.11+</li>
|
|
<li>Node.js 18+ (for n8n integration)</li>
|
|
<li>Git version control</li>
|
|
</ul>
|
|
</div>
|
|
<div class="card">
|
|
<h3>📦 System Requirements</h3>
|
|
<ul>
|
|
<li>8GB+ RAM (16GB recommended)</li>
|
|
<li>50GB+ storage space</li>
|
|
<li>GPU optional (for better Ollama performance)</li>
|
|
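<li>Access to the existing <code>localai_network</code> Docker network (declared <code>external</code> in docker-compose.yml; create it with <code>docker network create localai_network</code> if it does not exist yet)</li>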
|
|
</ul>
|
|
</div>
|
|
</div>
|
|
</section>
|
|
|
|
<section class="mb-4">
|
|
<h2>Phase 1: Core API Development</h2>
|
|
<div class="phase-card">
|
|
<span class="phase-number">1</span>
|
|
<h3 class="phase-title">Project Setup & Structure</h3>
|
|
<div class="phase-timeline">Week 1</div>
|
|
<span class="status status-in-progress">In Progress</span>
|
|
|
|
<h4>1.1 Create Project Structure</h4>
|
|
<div class="code-block">
|
|
<pre><code class="language-bash"># Create project directory
|
|
# Project root; export PROJECT_DIR beforehand to use another location
PROJECT_DIR="${PROJECT_DIR:-$HOME/langmem}"
mkdir -p "$PROJECT_DIR"
cd "$PROJECT_DIR"

# Initialize project structure. The source directories match the module
# paths used later in this guide (src/api/routers/memory.py,
# src/core/storage/supabase_store.py, src/mcp/server.py, ...)
mkdir -p {src,tests,docs,scripts,config}
mkdir -p src/{api/routers,core/storage,mcp}

# Create virtual environment (python3: many distributions ship no bare "python")
python3 -m venv venv
source venv/bin/activate

# Create requirements.txt
# asyncpg is required by src/core/storage/supabase_store.py
cat > requirements.txt << 'EOF'
fastapi==0.104.1
uvicorn==0.24.0
langchain==0.1.0
langmem==0.1.0
neo4j==5.15.0
psycopg2-binary==2.9.9
asyncpg==0.29.0
pgvector==0.2.4
ollama==0.1.7
pydantic==2.5.0
python-dotenv==1.0.0
pytest==7.4.3
pytest-asyncio==0.21.1
httpx==0.25.2
EOF
|
|
|
|
# Install dependencies
|
|
pip install -r requirements.txt</code></pre>
|
|
</div>
|
|
|
|
<h4>1.2 Docker Infrastructure Setup</h4>
|
|
<div class="code-block">
|
|
<pre><code class="language-yaml"># docker-compose.yml
|
|
# The top-level "version" key is obsolete in Compose v2 (it only produces a
# warning), so it is omitted. Credentials are read from the environment or an
# .env file next to this file; the fallbacks after ":-" are for local dev only.
services:
  ollama:
    image: ollama/ollama:latest
    container_name: langmem_ollama
    ports:
      - "11434:11434"
    volumes:
      - ollama_data:/root/.ollama
    networks:
      - localai_network
    environment:
      - OLLAMA_HOST=0.0.0.0
    restart: unless-stopped

  supabase:
    # NOTE(review): pin a specific image tag for reproducible deployments
    image: supabase/postgres:latest
    container_name: langmem_supabase
    environment:
      - POSTGRES_DB=${POSTGRES_DB:-langmem_db}
      - POSTGRES_USER=${POSTGRES_USER:-langmem_user}
      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-secure_password_123}
      # Do not set POSTGRES_HOST_AUTH_METHOD=trust: it lets any client on the
      # network connect as any role without a password.
    volumes:
      - supabase_data:/var/lib/postgresql/data
      - ./scripts/init_db.sql:/docker-entrypoint-initdb.d/init_db.sql:ro
    networks:
      - localai_network
    healthcheck:
      # $$ escapes Compose interpolation so the container's own env is used
      test: ["CMD-SHELL", "pg_isready -U $${POSTGRES_USER} -d $${POSTGRES_DB}"]
      interval: 10s
      timeout: 5s
      retries: 5
    restart: unless-stopped

  neo4j:
    image: neo4j:5.15-community
    container_name: langmem_neo4j
    environment:
      - NEO4J_AUTH=neo4j/${NEO4J_PASSWORD:-secure_password_123}
      # The image maps NEO4J_* variables onto neo4j.conf keys ("_" becomes
      # ".", "__" becomes "_"). NEO4J_HEAP_SIZE / NEO4J_PAGECACHE_SIZE do not
      # correspond to any Neo4j 5 setting; these do:
      - NEO4J_server_memory_heap_max__size=1G
      - NEO4J_server_memory_pagecache_size=1G
    volumes:
      - neo4j_data:/data
      - neo4j_logs:/logs
      - neo4j_plugins:/plugins
    networks:
      - localai_network
    restart: unless-stopped

  memory_service:
    build: .
    container_name: langmem_api
    ports:
      - "8000:8000"
    depends_on:
      # Wait until Postgres accepts connections, not just until it started
      supabase:
        condition: service_healthy
      neo4j:
        condition: service_started
      ollama:
        condition: service_started
    environment:
      - SUPABASE_URL=postgresql://${POSTGRES_USER:-langmem_user}:${POSTGRES_PASSWORD:-secure_password_123}@supabase:5432/${POSTGRES_DB:-langmem_db}
      - NEO4J_URI=bolt://neo4j:7687
      - NEO4J_USERNAME=neo4j
      - NEO4J_PASSWORD=${NEO4J_PASSWORD:-secure_password_123}
      - OLLAMA_URL=http://ollama:11434
      - API_KEY=${LANGMEM_API_KEY:-langmem_api_key_2025}
    volumes:
      - ./src:/app/src
      - ./config:/app/config
    networks:
      - localai_network
    restart: unless-stopped

networks:
  localai_network:
    external: true

volumes:
  ollama_data:
  supabase_data:
  neo4j_data:
  neo4j_logs:
  neo4j_plugins:</code></pre>
|
|
</div>
|
|
|
|
<h4>1.3 Database Schema Setup</h4>
|
|
<div class="code-block">
|
|
<pre><code class="language-sql">-- scripts/init_db.sql
|
|
-- Enable pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;

-- Documents table: one row per text chunk plus its embedding.
-- VECTOR(n) must equal the output dimension of the embedding model the API
-- uses; inserts of any other width are rejected. 768 matches Ollama's
-- nomic-embed-text (1536 is the width of OpenAI's text-embedding-ada-002).
CREATE TABLE IF NOT EXISTS documents (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    content TEXT NOT NULL,
    embedding VECTOR(768),
    metadata JSONB DEFAULT '{}',
    source_url TEXT,
    document_type VARCHAR(50) DEFAULT 'text',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Vector index for cosine similarity search. HNSW is used instead of IVFFlat
-- because IVFFlat derives its lists from the rows present at CREATE INDEX
-- time, and this script runs against an empty table. HNSW has no training
-- step (requires pgvector 0.5.0 or newer).
CREATE INDEX IF NOT EXISTS documents_embedding_idx ON documents
    USING hnsw (embedding vector_cosine_ops);

-- Metadata index for JSONB containment filters (metadata @> ...)
CREATE INDEX IF NOT EXISTS documents_metadata_idx ON documents
    USING gin (metadata);

-- Full-text search index
CREATE INDEX IF NOT EXISTS documents_content_idx ON documents
    USING gin (to_tsvector('english', content));

-- Keep updated_at current on every UPDATE
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

DROP TRIGGER IF EXISTS update_documents_updated_at ON documents;
CREATE TRIGGER update_documents_updated_at
    BEFORE UPDATE ON documents
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at_column();</code></pre>
|
|
</div>
|
|
</div>
|
|
</section>
|
|
|
|
<section class="mb-4">
|
|
<div class="phase-card">
|
|
<span class="phase-number">2</span>
|
|
<h3 class="phase-title">Core API Implementation</h3>
|
|
<div class="phase-timeline">Week 2</div>
|
|
<span class="status status-in-progress">In Progress</span>
|
|
|
|
<h4>2.1 FastAPI Application Structure</h4>
|
|
<div class="code-block">
|
|
<pre><code class="language-python"># src/api/main.py
|
|
from fastapi import FastAPI, HTTPException, Depends, Header
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from contextlib import asynccontextmanager
|
|
import os
|
|
from typing import Optional
|
|
|
|
from .routers import memory, health
|
|
from .middleware import authentication
|
|
from ..core.config import settings
|
|
from ..core.database import init_databases
|
|
|
|
@asynccontextmanager
|
|
async def lifespan(app: FastAPI):
|
|
# Initialize databases
|
|
await init_databases()
|
|
yield
|
|
# Cleanup if needed
|
|
|
|
app = FastAPI(
|
|
title="LangMem API",
|
|
description="Long-term memory system for LLM projects",
|
|
version="1.0.0",
|
|
lifespan=lifespan
|
|
)
|
|
|
|
# Add CORS middleware
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["*"],
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
# Add authentication middleware
|
|
app.add_middleware(authentication.APIKeyMiddleware)
|
|
|
|
# Include routers
|
|
app.include_router(memory.router, prefix="/v1", tags=["memory"])
|
|
app.include_router(health.router, prefix="/v1", tags=["health"])
|
|
|
|
@app.get("/")
|
|
async def root():
|
|
return {"message": "LangMem API", "version": "1.0.0"}</code></pre>
|
|
</div>
|
|
|
|
<h4>2.2 Memory Router Implementation</h4>
|
|
<div class="code-block">
|
|
<pre><code class="language-python"># src/api/routers/memory.py
|
|
from fastapi import APIRouter, HTTPException, Depends
|
|
from typing import Dict, Any, List, Optional
|
|
from pydantic import BaseModel
|
|
import uuid
|
|
|
|
from ...core.memory_manager import MemoryManager
|
|
from ...core.models import IngestRequest, QueryRequest, QueryResponse
|
|
|
|
router = APIRouter()
|
|
|
|
class IngestRequest(BaseModel):
|
|
content: str
|
|
document_type: str = "text"
|
|
metadata: Dict[str, Any] = {}
|
|
source_url: Optional[str] = None
|
|
|
|
class QueryRequest(BaseModel):
|
|
query: str
|
|
max_tokens: int = 4000
|
|
conversation_history: List[Dict[str, str]] = []
|
|
filters: Dict[str, Any] = {}
|
|
|
|
class QueryResponse(BaseModel):
|
|
context: str
|
|
sources: List[Dict[str, Any]]
|
|
relevant_facts: List[Dict[str, Any]]
|
|
confidence_score: float
|
|
|
|
@router.post("/ingest", response_model=Dict[str, Any])
|
|
async def ingest_document(
|
|
request: IngestRequest,
|
|
memory_manager: MemoryManager = Depends(get_memory_manager)
|
|
):
|
|
"""Ingest a new document into the memory system."""
|
|
try:
|
|
result = await memory_manager.ingest_document(
|
|
content=request.content,
|
|
document_type=request.document_type,
|
|
metadata=request.metadata,
|
|
source_url=request.source_url
|
|
)
|
|
return {
|
|
"status": "success",
|
|
"document_id": str(result.document_id),
|
|
"chunks_created": result.chunks_created,
|
|
"entities_extracted": result.entities_extracted
|
|
}
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@router.post("/context/retrieve", response_model=QueryResponse)
|
|
async def retrieve_context(
|
|
request: QueryRequest,
|
|
memory_manager: MemoryManager = Depends(get_memory_manager)
|
|
):
|
|
"""Retrieve augmented context for a query."""
|
|
try:
|
|
result = await memory_manager.retrieve_context(
|
|
query=request.query,
|
|
max_tokens=request.max_tokens,
|
|
conversation_history=request.conversation_history,
|
|
filters=request.filters
|
|
)
|
|
return QueryResponse(
|
|
context=result.context,
|
|
sources=result.sources,
|
|
relevant_facts=result.relevant_facts,
|
|
confidence_score=result.confidence_score
|
|
)
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@router.post("/tools/{tool_name}/execute")
|
|
async def execute_tool(
|
|
tool_name: str,
|
|
params: Dict[str, Any],
|
|
memory_manager: MemoryManager = Depends(get_memory_manager)
|
|
):
|
|
"""Execute a specific memory tool."""
|
|
try:
|
|
result = await memory_manager.execute_tool(tool_name, params)
|
|
return {"result": result}
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
async def get_memory_manager() -> MemoryManager:
|
|
"""Dependency to get memory manager instance."""
|
|
return MemoryManager()</code></pre>
|
|
</div>
|
|
|
|
<h4>2.3 Memory Manager Core Logic</h4>
|
|
<div class="code-block">
|
|
<pre><code class="language-python"># src/core/memory_manager.py
|
|
import asyncio
|
|
from typing import Dict, Any, List, Optional
|
|
from langchain.memory import ConversationBufferMemory
|
|
from langchain.embeddings import OllamaEmbeddings
|
|
from langchain.text_splitter import RecursiveCharacterTextSplitter
|
|
|
|
from .storage.supabase_store import SupabaseStore
|
|
from .storage.neo4j_store import Neo4jStore
|
|
from .models import IngestResult, QueryResult
|
|
|
|
class MemoryManager:
|
|
def __init__(self):
|
|
self.supabase_store = SupabaseStore()
|
|
self.neo4j_store = Neo4jStore()
|
|
self.embeddings = OllamaEmbeddings(
|
|
model="llama3",
|
|
base_url="http://ollama:11434"
|
|
)
|
|
self.text_splitter = RecursiveCharacterTextSplitter(
|
|
chunk_size=1000,
|
|
chunk_overlap=200
|
|
)
|
|
|
|
async def ingest_document(
|
|
self,
|
|
content: str,
|
|
document_type: str = "text",
|
|
metadata: Dict[str, Any] = None,
|
|
source_url: Optional[str] = None
|
|
) -> IngestResult:
|
|
"""Ingest a document into the memory system."""
|
|
metadata = metadata or {}
|
|
|
|
# Step 1: Split document into chunks
|
|
chunks = self.text_splitter.split_text(content)
|
|
|
|
# Step 2: Generate embeddings for chunks
|
|
embeddings = await self._generate_embeddings(chunks)
|
|
|
|
# Step 3: Store chunks and embeddings in Supabase
|
|
chunk_ids = await self.supabase_store.store_chunks(
|
|
chunks=chunks,
|
|
embeddings=embeddings,
|
|
metadata=metadata,
|
|
source_url=source_url,
|
|
document_type=document_type
|
|
)
|
|
|
|
# Step 4: Extract entities and relationships
|
|
entities = await self._extract_entities(chunks)
|
|
|
|
# Step 5: Store entities and relationships in Neo4j
|
|
await self.neo4j_store.store_entities(
|
|
entities=entities,
|
|
chunk_ids=chunk_ids
|
|
)
|
|
|
|
return IngestResult(
|
|
document_id=chunk_ids[0], # Use first chunk ID as document ID
|
|
chunks_created=len(chunks),
|
|
entities_extracted=len(entities)
|
|
)
|
|
|
|
async def retrieve_context(
|
|
self,
|
|
query: str,
|
|
max_tokens: int = 4000,
|
|
conversation_history: List[Dict[str, str]] = None,
|
|
filters: Dict[str, Any] = None
|
|
) -> QueryResult:
|
|
"""Retrieve augmented context for a query."""
|
|
conversation_history = conversation_history or []
|
|
filters = filters or {}
|
|
|
|
# Step 1: Generate query embedding
|
|
query_embedding = await self._generate_embeddings([query])
|
|
|
|
# Step 2: Semantic search in Supabase
|
|
similar_chunks = await self.supabase_store.similarity_search(
|
|
query_embedding=query_embedding[0],
|
|
limit=20,
|
|
filters=filters
|
|
)
|
|
|
|
# Step 3: Extract entities from similar chunks
|
|
entities = await self._extract_entities_from_chunks(similar_chunks)
|
|
|
|
# Step 4: Graph traversal in Neo4j
|
|
related_facts = await self.neo4j_store.find_related_facts(
|
|
entities=entities,
|
|
max_depth=2
|
|
)
|
|
|
|
# Step 5: Assemble context
|
|
context = await self._assemble_context(
|
|
query=query,
|
|
similar_chunks=similar_chunks,
|
|
related_facts=related_facts,
|
|
max_tokens=max_tokens
|
|
)
|
|
|
|
return QueryResult(
|
|
context=context["text"],
|
|
sources=context["sources"],
|
|
relevant_facts=related_facts,
|
|
confidence_score=context["confidence"]
|
|
)
|
|
|
|
async def _generate_embeddings(self, texts: List[str]) -> List[List[float]]:
|
|
"""Generate embeddings for a list of texts."""
|
|
return await self.embeddings.aembed_documents(texts)
|
|
|
|
async def _extract_entities(self, chunks: List[str]) -> List[Dict[str, Any]]:
|
|
"""Extract entities from text chunks."""
|
|
# This would use Ollama to extract entities
|
|
# For now, return empty list
|
|
return []
|
|
|
|
async def _extract_entities_from_chunks(self, chunks: List[Dict[str, Any]]) -> List[str]:
|
|
"""Extract entities from retrieved chunks."""
|
|
# Extract entities from chunk metadata or content
|
|
entities = []
|
|
for chunk in chunks:
|
|
if "entities" in chunk.get("metadata", {}):
|
|
entities.extend(chunk["metadata"]["entities"])
|
|
return list(set(entities))
|
|
|
|
async def _assemble_context(
|
|
self,
|
|
query: str,
|
|
similar_chunks: List[Dict[str, Any]],
|
|
related_facts: List[Dict[str, Any]],
|
|
max_tokens: int
|
|
) -> Dict[str, Any]:
|
|
"""Assemble the final context from chunks and facts."""
|
|
context_parts = []
|
|
sources = []
|
|
|
|
# Add similar chunks
|
|
for chunk in similar_chunks[:10]: # Limit to top 10
|
|
context_parts.append(chunk["content"])
|
|
sources.append({
|
|
"id": chunk["id"],
|
|
"type": "chunk",
|
|
"source_url": chunk.get("source_url"),
|
|
"confidence": chunk.get("similarity", 0.0)
|
|
})
|
|
|
|
# Add related facts
|
|
for fact in related_facts[:5]: # Limit to top 5
|
|
context_parts.append(fact["description"])
|
|
sources.append({
|
|
"id": fact["id"],
|
|
"type": "fact",
|
|
"relationship": fact.get("relationship"),
|
|
"confidence": fact.get("confidence", 0.0)
|
|
})
|
|
|
|
# Combine context parts
|
|
full_context = "\n\n".join(context_parts)
|
|
|
|
# Truncate if too long (rough token estimation)
|
|
if len(full_context) > max_tokens * 4: # Rough 4 chars per token
|
|
full_context = full_context[:max_tokens * 4]
|
|
|
|
return {
|
|
"text": full_context,
|
|
"sources": sources,
|
|
"confidence": 0.8 # Calculate based on similarity scores
|
|
}</code></pre>
|
|
</div>
|
|
</div>
|
|
</section>
|
|
|
|
<section class="mb-4">
|
|
<div class="phase-card">
|
|
<span class="phase-number">3</span>
|
|
<h3 class="phase-title">Storage Layer Implementation</h3>
|
|
<div class="phase-timeline">Week 3</div>
|
|
<span class="status status-planning">Planning</span>
|
|
|
|
<h4>3.1 Supabase Store Implementation</h4>
|
|
<div class="code-block">
|
|
<pre><code class="language-python"># src/core/storage/supabase_store.py
|
|
import asyncio
|
|
import asyncpg
|
|
from typing import List, Dict, Any, Optional
|
|
import json
|
|
import uuid
|
|
|
|
class SupabaseStore:
|
|
def __init__(self):
|
|
self.connection_pool = None
|
|
|
|
async def initialize(self):
|
|
"""Initialize database connection pool."""
|
|
self.connection_pool = await asyncpg.create_pool(
|
|
host="supabase",
|
|
port=5432,
|
|
database="langmem_db",
|
|
user="langmem_user",
|
|
password="secure_password_123",
|
|
min_size=2,
|
|
max_size=10
|
|
)
|
|
|
|
async def store_chunks(
|
|
self,
|
|
chunks: List[str],
|
|
embeddings: List[List[float]],
|
|
metadata: Dict[str, Any],
|
|
source_url: Optional[str] = None,
|
|
document_type: str = "text"
|
|
) -> List[str]:
|
|
"""Store document chunks with embeddings."""
|
|
chunk_ids = []
|
|
|
|
async with self.connection_pool.acquire() as conn:
|
|
for chunk, embedding in zip(chunks, embeddings):
|
|
chunk_id = str(uuid.uuid4())
|
|
|
|
await conn.execute(
|
|
"""
|
|
INSERT INTO documents (id, content, embedding, metadata, source_url, document_type)
|
|
VALUES ($1, $2, $3, $4, $5, $6)
|
|
""",
|
|
chunk_id,
|
|
chunk,
|
|
embedding,
|
|
json.dumps(metadata),
|
|
source_url,
|
|
document_type
|
|
)
|
|
|
|
chunk_ids.append(chunk_id)
|
|
|
|
return chunk_ids
|
|
|
|
async def similarity_search(
|
|
self,
|
|
query_embedding: List[float],
|
|
limit: int = 10,
|
|
filters: Dict[str, Any] = None
|
|
) -> List[Dict[str, Any]]:
|
|
"""Perform vector similarity search."""
|
|
filters = filters or {}
|
|
|
|
query = """
|
|
SELECT id, content, metadata, source_url, document_type,
|
|
1 - (embedding <=> $1) as similarity
|
|
FROM documents
|
|
WHERE 1 - (embedding <=> $1) > 0.7
|
|
ORDER BY embedding <=> $1
|
|
LIMIT $2
|
|
"""
|
|
|
|
async with self.connection_pool.acquire() as conn:
|
|
rows = await conn.fetch(query, query_embedding, limit)
|
|
|
|
results = []
|
|
for row in rows:
|
|
results.append({
|
|
"id": row["id"],
|
|
"content": row["content"],
|
|
"metadata": json.loads(row["metadata"]),
|
|
"source_url": row["source_url"],
|
|
"document_type": row["document_type"],
|
|
"similarity": row["similarity"]
|
|
})
|
|
|
|
return results</code></pre>
|
|
</div>
|
|
|
|
<h4>3.2 Neo4j Store Implementation</h4>
|
|
<div class="code-block">
|
|
<pre><code class="language-python"># src/core/storage/neo4j_store.py
|
|
from neo4j import AsyncGraphDatabase
|
|
from typing import List, Dict, Any, Optional
|
|
import logging
|
|
|
|
class Neo4jStore:
|
|
def __init__(self):
|
|
self.driver = None
|
|
|
|
async def initialize(self):
|
|
"""Initialize Neo4j connection."""
|
|
self.driver = AsyncGraphDatabase.driver(
|
|
"bolt://neo4j:7687",
|
|
auth=("neo4j", "secure_password_123")
|
|
)
|
|
|
|
async def store_entities(
|
|
self,
|
|
entities: List[Dict[str, Any]],
|
|
chunk_ids: List[str]
|
|
):
|
|
"""Store entities and their relationships."""
|
|
async with self.driver.session() as session:
|
|
for i, chunk_id in enumerate(chunk_ids):
|
|
# Create DocumentChunk node
|
|
await session.run(
|
|
"""
|
|
CREATE (doc:DocumentChunk {
|
|
id: $chunk_id,
|
|
supabase_id: $chunk_id,
|
|
created_at: datetime()
|
|
})
|
|
""",
|
|
chunk_id=chunk_id
|
|
)
|
|
|
|
# Create entity nodes and relationships
|
|
for entity in entities:
|
|
await self._create_entity_node(session, entity, chunk_id)
|
|
|
|
async def _create_entity_node(
|
|
self,
|
|
session,
|
|
entity: Dict[str, Any],
|
|
chunk_id: str
|
|
):
|
|
"""Create an entity node and link it to document chunk."""
|
|
entity_type = entity.get("type", "Entity")
|
|
entity_name = entity.get("name", "Unknown")
|
|
|
|
# Create entity node
|
|
await session.run(
|
|
f"""
|
|
MERGE (entity:{entity_type} {{name: $name}})
|
|
ON CREATE SET entity.created_at = datetime()
|
|
""",
|
|
name=entity_name
|
|
)
|
|
|
|
# Create relationship between document and entity
|
|
await session.run(
|
|
f"""
|
|
MATCH (doc:DocumentChunk {{id: $chunk_id}})
|
|
MATCH (entity:{entity_type} {{name: $name}})
|
|
CREATE (doc)-[:MENTIONS]->(entity)
|
|
""",
|
|
chunk_id=chunk_id,
|
|
name=entity_name
|
|
)
|
|
|
|
async def find_related_facts(
|
|
self,
|
|
entities: List[str],
|
|
max_depth: int = 2
|
|
) -> List[Dict[str, Any]]:
|
|
"""Find facts related to given entities."""
|
|
if not entities:
|
|
return []
|
|
|
|
query = """
|
|
MATCH (entity)
|
|
WHERE entity.name IN $entities
|
|
MATCH (entity)-[r*1..%d]-(related)
|
|
RETURN DISTINCT related.name as name,
|
|
labels(related) as labels,
|
|
type(r[0]) as relationship,
|
|
length(r) as depth
|
|
ORDER BY depth, name
|
|
LIMIT 50
|
|
""" % max_depth
|
|
|
|
async with self.driver.session() as session:
|
|
result = await session.run(query, entities=entities)
|
|
|
|
facts = []
|
|
async for record in result:
|
|
facts.append({
|
|
"id": f"fact_{len(facts)}",
|
|
"name": record["name"],
|
|
"type": record["labels"][0] if record["labels"] else "Entity",
|
|
"relationship": record["relationship"],
|
|
"depth": record["depth"],
|
|
"confidence": 1.0 - (record["depth"] * 0.2),
|
|
"description": f"Related {record['labels'][0] if record['labels'] else 'entity'}: {record['name']}"
|
|
})
|
|
|
|
return facts
|
|
|
|
async def close(self):
|
|
"""Close Neo4j connection."""
|
|
if self.driver:
|
|
await self.driver.close()</code></pre>
|
|
</div>
|
|
</div>
|
|
</section>
|
|
|
|
<section class="mb-4">
|
|
<h2>Phase 2: Testing & Validation</h2>
|
|
<div class="phase-card">
|
|
<span class="phase-number">4</span>
|
|
<h3 class="phase-title">Comprehensive Testing</h3>
|
|
<div class="phase-timeline">Weeks 4-5</div>
|
|
<span class="status status-planning">Planning</span>
|
|
|
|
<h4>4.1 Test Suite Setup</h4>
|
|
<div class="code-block">
|
|
<pre><code class="language-python"># tests/test_memory_manager.py
|
|
import pytest
|
|
import asyncio
|
|
from unittest.mock import Mock, patch
|
|
from src.core.memory_manager import MemoryManager
|
|
from src.core.models import IngestResult, QueryResult
|
|
|
|
@pytest.fixture
|
|
async def memory_manager():
|
|
"""Create memory manager instance for testing."""
|
|
manager = MemoryManager()
|
|
await manager.initialize()
|
|
yield manager
|
|
await manager.cleanup()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_document_ingestion(memory_manager):
|
|
"""Test document ingestion pipeline."""
|
|
content = "This is a test document about machine learning."
|
|
|
|
result = await memory_manager.ingest_document(
|
|
content=content,
|
|
document_type="text",
|
|
metadata={"source": "test"}
|
|
)
|
|
|
|
assert isinstance(result, IngestResult)
|
|
assert result.chunks_created > 0
|
|
assert result.document_id is not None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_context_retrieval(memory_manager):
|
|
"""Test context retrieval functionality."""
|
|
# First ingest a document
|
|
await memory_manager.ingest_document(
|
|
content="Python is a programming language used for AI development.",
|
|
document_type="text"
|
|
)
|
|
|
|
# Then query for context
|
|
result = await memory_manager.retrieve_context(
|
|
query="What is Python?",
|
|
max_tokens=1000
|
|
)
|
|
|
|
assert isinstance(result, QueryResult)
|
|
assert "Python" in result.context
|
|
assert len(result.sources) > 0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_hybrid_retrieval(memory_manager):
|
|
"""Test hybrid retrieval combining vector and graph search."""
|
|
# Ingest related documents
|
|
documents = [
|
|
"John works at OpenAI on machine learning research.",
|
|
"OpenAI developed GPT models for natural language processing.",
|
|
"Machine learning is a subset of artificial intelligence."
|
|
]
|
|
|
|
for doc in documents:
|
|
await memory_manager.ingest_document(content=doc, document_type="text")
|
|
|
|
# Query should find related information
|
|
result = await memory_manager.retrieve_context(
|
|
query="Tell me about John's work",
|
|
max_tokens=2000
|
|
)
|
|
|
|
assert "John" in result.context
|
|
assert "OpenAI" in result.context
|
|
assert len(result.relevant_facts) > 0</code></pre>
|
|
</div>
|
|
|
|
<h4>4.2 Integration Testing</h4>
|
|
<div class="code-block">
|
|
<pre><code class="language-python"># tests/test_integration.py
|
|
import pytest
|
|
import httpx
|
|
from fastapi.testclient import TestClient
|
|
from src.api.main import app
|
|
|
|
client = TestClient(app)
|
|
|
|
def test_health_endpoint():
|
|
"""Test health check endpoint."""
|
|
response = client.get("/v1/health")
|
|
assert response.status_code == 200
|
|
assert response.json()["status"] == "healthy"
|
|
|
|
def test_ingest_endpoint():
|
|
"""Test document ingestion endpoint."""
|
|
response = client.post(
|
|
"/v1/ingest",
|
|
json={
|
|
"content": "Test document content",
|
|
"document_type": "text",
|
|
"metadata": {"test": True}
|
|
},
|
|
headers={"X-API-Key": "langmem_api_key_2025"}
|
|
)
|
|
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["status"] == "success"
|
|
assert "document_id" in data
|
|
|
|
def test_retrieve_endpoint():
|
|
"""Test context retrieval endpoint."""
|
|
# First ingest a document
|
|
client.post(
|
|
"/v1/ingest",
|
|
json={"content": "Machine learning is fascinating"},
|
|
headers={"X-API-Key": "langmem_api_key_2025"}
|
|
)
|
|
|
|
# Then retrieve context
|
|
response = client.post(
|
|
"/v1/context/retrieve",
|
|
json={
|
|
"query": "What is machine learning?",
|
|
"max_tokens": 1000
|
|
},
|
|
headers={"X-API-Key": "langmem_api_key_2025"}
|
|
)
|
|
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert "context" in data
|
|
assert "sources" in data</code></pre>
|
|
</div>
|
|
</div>
|
|
</section>
|
|
|
|
<section class="mb-4">
|
|
<h2>Phase 3: MCP Server Transformation</h2>
|
|
<div class="phase-card">
|
|
<span class="phase-number">5</span>
|
|
<h3 class="phase-title">MCP Protocol Implementation</h3>
|
|
<div class="phase-timeline">Weeks 6-7</div>
|
|
<span class="status status-planning">Planning</span>
|
|
|
|
<h4>5.1 MCP Server Setup</h4>
|
|
<div class="code-block">
|
|
<pre><code class="language-python"># src/mcp/server.py
|
|
import json
from dataclasses import dataclass
from typing import Dict, Any, List, Optional


@dataclass
class MCPMessage:
    """Incoming MCP (JSON-RPC 2.0) request."""

    id: str
    method: str
    # May be None when the client omits "params"; handlers treat that as {}.
    params: Dict[str, Any]


@dataclass
class MCPResponse:
    """Outgoing MCP response; exactly one of result / error is set."""

    id: str
    result: Optional[Dict[str, Any]] = None
    error: Optional[Dict[str, Any]] = None


# JSON-RPC 2.0 error codes, plus MCP's "resource not found".
METHOD_NOT_FOUND = -32601
INVALID_PARAMS = -32602
INTERNAL_ERROR = -32603
RESOURCE_NOT_FOUND = -32002


class MCPServer:
    """Model Context Protocol server exposing the memory system as tools and resources."""

    def __init__(self, memory_manager):
        """Keep the memory manager and declare the available tools and resources.

        capabilities["tools"] maps tool name to description plus JSON Schema
        ("parameters"); capabilities["resources"] maps resource name to
        description plus URI.
        """
        self.memory_manager = memory_manager
        self.capabilities = {
            "tools": {
                "memory_ingest": {
                    "description": "Ingest documents into memory",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "content": {"type": "string"},
                            "document_type": {"type": "string", "default": "text"},
                            "metadata": {"type": "object"},
                        },
                        "required": ["content"],
                    },
                },
                "memory_retrieve": {
                    "description": "Retrieve context from memory",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "query": {"type": "string"},
                            "max_tokens": {"type": "integer", "default": 4000},
                        },
                        "required": ["query"],
                    },
                },
            },
            "resources": {
                "memory_stats": {
                    "description": "Memory system statistics",
                    "uri": "memory://stats",
                },
            },
        }

    @staticmethod
    def _error(message: MCPMessage, code: int, text: str) -> MCPResponse:
        """Build an error response for ``message``."""
        return MCPResponse(id=message.id, error={"code": code, "message": text})

    async def handle_message(self, message: MCPMessage) -> MCPResponse:
        """Dispatch a request to its handler.

        Unknown methods get -32601. Any exception raised by a handler is
        returned as -32603 instead of propagating.
        """
        handlers = {
            "initialize": self._handle_initialize,
            "tools/list": self._handle_tools_list,
            "tools/call": self._handle_tools_call,
            "resources/list": self._handle_resources_list,
            "resources/read": self._handle_resources_read,
        }
        handler = handlers.get(message.method)
        if handler is None:
            return self._error(message, METHOD_NOT_FOUND, "Method not found")
        try:
            return await handler(message)
        except Exception as e:
            return self._error(message, INTERNAL_ERROR, str(e))

    async def _handle_initialize(self, message: MCPMessage) -> MCPResponse:
        """Answer the handshake with protocol version, capability flags and server info.

        The MCP capabilities object only announces which features exist. The
        tool and resource definitions are served by tools/list and
        resources/list.
        """
        return MCPResponse(
            id=message.id,
            result={
                "protocolVersion": "2024-11-05",
                "capabilities": {
                    "tools": {"listChanged": False},
                    "resources": {"subscribe": False, "listChanged": False},
                },
                "serverInfo": {
                    "name": "langmem",
                    "version": "1.0.0",
                },
            },
        )

    async def _handle_tools_list(self, message: MCPMessage) -> MCPResponse:
        """List the tools as MCP Tool objects (name, description, inputSchema)."""
        tools = [
            {"name": name, "description": spec["description"], "inputSchema": spec["parameters"]}
            for name, spec in self.capabilities["tools"].items()
        ]
        return MCPResponse(id=message.id, result={"tools": tools})

    async def _handle_resources_list(self, message: MCPMessage) -> MCPResponse:
        """List the resources as MCP Resource objects."""
        resources = [
            {
                "uri": spec["uri"],
                "name": name,
                "description": spec["description"],
                "mimeType": "application/json",
            }
            for name, spec in self.capabilities["resources"].items()
        ]
        return MCPResponse(id=message.id, result={"resources": resources})

    async def _handle_resources_read(self, message: MCPMessage) -> MCPResponse:
        """Read a resource by URI; unknown URIs get -32002."""
        uri = (message.params or {}).get("uri")
        known_uris = {spec["uri"] for spec in self.capabilities["resources"].values()}
        if uri not in known_uris:
            return self._error(message, RESOURCE_NOT_FOUND, f"Resource not found: {uri}")

        # NOTE(review): the MemoryManager in this guide defines no get_stats();
        # memory://stats reports an error until such a method is added.
        get_stats = getattr(self.memory_manager, "get_stats", None)
        if get_stats is None:
            return self._error(message, INTERNAL_ERROR, "memory statistics are not implemented")
        stats = await get_stats()
        return MCPResponse(
            id=message.id,
            result={"contents": [{"uri": uri, "mimeType": "application/json", "text": json.dumps(stats)}]},
        )

    async def _handle_tools_call(self, message: MCPMessage) -> MCPResponse:
        """Execute a tool call.

        Unknown tools and missing required arguments are reported as -32602.
        """
        params = message.params or {}
        tool_name = params.get("name")
        arguments = params.get("arguments") or {}

        tool = self.capabilities["tools"].get(tool_name)
        if tool is None:
            return self._error(message, INVALID_PARAMS, f"Unknown tool: {tool_name}")

        missing = [key for key in tool["parameters"].get("required", []) if key not in arguments]
        if missing:
            return self._error(
                message, INVALID_PARAMS, "Missing required argument(s): " + ", ".join(missing)
            )

        if tool_name == "memory_ingest":
            result = await self.memory_manager.ingest_document(
                content=arguments["content"],
                document_type=arguments.get("document_type", "text"),
                metadata=arguments.get("metadata", {}),
            )
            return MCPResponse(
                id=message.id,
                result={"content": [{"type": "text", "text": f"Ingested document: {result.document_id}"}]},
            )

        if tool_name == "memory_retrieve":
            result = await self.memory_manager.retrieve_context(
                query=arguments["query"],
                max_tokens=arguments.get("max_tokens", 4000),
            )
            return MCPResponse(
                id=message.id,
                result={"content": [{"type": "text", "text": result.context}]},
            )

        return self._error(message, INVALID_PARAMS, f"Unknown tool: {tool_name}")

# End of src/mcp/server.py</code></pre>
|
|
</div>
|
|
</div>
|
|
</section>
|
|
|
|
<section class="mb-4">
|
|
<h2>Phase 4: Client Integration</h2>
|
|
<div class="phase-card">
|
|
<span class="phase-number">6</span>
|
|
<h3 class="phase-title">n8n & Claude Code Integration</h3>
|
|
<div class="phase-timeline">Weeks 8-9</div>
|
|
<span class="status status-planning">Planning</span>
|
|
|
|
<h4>6.1 n8n Workflow Example</h4>
|
|
<div class="code-block">
|
|
<pre><code class="language-json">{
|
|
"nodes": [
|
|
{
|
|
"parameters": {
|
|
"httpMethod": "POST",
|
|
"path": "webhook-langmem",
|
|
"responseMode": "responseNode",
|
|
"options": {}
|
|
},
|
|
"name": "Webhook",
|
|
"type": "n8n-nodes-base.webhook",
|
|
"typeVersion": 1,
|
|
"position": [200, 300]
|
|
},
|
|
{
|
|
"parameters": {
|
|
"requestMethod": "POST",
|
|
"url": "http://memory_service:8000/v1/ingest",
|
|
"jsonParameters": true,
|
|
"parametersJson": "={{ JSON.stringify({ content: $json.body.content, document_type: $json.body.type || 'text', metadata: { source: 'n8n', timestamp: $now.toISO() } }) }}",
|
|
"options": {},
|
|
"headerParametersJson": "{\n \"X-API-Key\": \"langmem_api_key_2025\"\n}"
|
|
},
|
|
"name": "Ingest to Memory",
|
|
"type": "n8n-nodes-base.httpRequest",
|
|
"typeVersion": 1,
|
|
"position": [400, 300]
|
|
},
|
|
{
|
|
"parameters": {
|
|
"requestMethod": "POST",
|
|
"url": "http://memory_service:8000/v1/context/retrieve",
|
|
"jsonParameters": true,
|
|
"parametersJson": "={{ JSON.stringify({ query: $node['Webhook'].json.body.query, max_tokens: 4000 }) }}",
|
|
"options": {},
|
|
"headerParametersJson": "{\n \"X-API-Key\": \"langmem_api_key_2025\"\n}"
|
|
},
|
|
"name": "Query Memory",
|
|
"type": "n8n-nodes-base.httpRequest",
|
|
"typeVersion": 1,
|
|
"position": [600, 300]
|
|
},
|
|
{
|
|
"parameters": {
|
|
"respondWith": "json",
|
|
"responseBody": "={{ JSON.stringify($json) }}"
|
|
},
|
|
"name": "Response",
|
|
"type": "n8n-nodes-base.respondToWebhook",
|
|
"typeVersion": 1,
|
|
"position": [800, 300]
|
|
}
|
|
],
|
|
"connections": {
|
|
"Webhook": {
|
|
"main": [
|
|
[
|
|
{
|
|
"node": "Ingest to Memory",
|
|
"type": "main",
|
|
"index": 0
|
|
}
|
|
]
|
|
]
|
|
},
|
|
"Ingest to Memory": {
|
|
"main": [
|
|
[
|
|
{
|
|
"node": "Query Memory",
|
|
"type": "main",
|
|
"index": 0
|
|
}
|
|
]
|
|
]
|
|
},
|
|
"Query Memory": {
|
|
"main": [
|
|
[
|
|
{
|
|
"node": "Response",
|
|
"type": "main",
|
|
"index": 0
|
|
}
|
|
]
|
|
]
|
|
}
|
|
}
|
|
}</code></pre>
|
|
</div>
|
|
|
|
<h4>6.2 Deployment Script</h4>
|
|
<div class="code-block">
|
|
<pre><code class="language-bash">#!/bin/bash
|
|
# deploy.sh
#
# Builds and starts the LangMem stack, pulls the Ollama models it uses and
# smoke-tests the ingest/retrieve endpoints. LANGMEM_API_KEY and WAIT_TIMEOUT
# (seconds) can be overridden from the environment.

set -euo pipefail

LANGMEM_API_KEY="${LANGMEM_API_KEY:-langmem_api_key_2025}"
WAIT_TIMEOUT="${WAIT_TIMEOUT:-120}"

# Retry a command every 2 seconds until it succeeds or WAIT_TIMEOUT elapses.
# Usage: wait_for DESCRIPTION COMMAND [ARGS...]
wait_for() {
  local description="$1"
  shift
  local waited=0
  until "$@" >/dev/null 2>/dev/null; do
    if [ "$waited" -ge "$WAIT_TIMEOUT" ]; then
      echo "❌ Timed out after ${WAIT_TIMEOUT}s waiting for ${description}"
      exit 1
    fi
    sleep 2
    waited=$((waited + 2))
  done
}

echo "🚀 Starting LangMem deployment..."

# Create the network unless one with exactly this name exists
# (docker network inspect matches the full name, unlike grepping `ls` output).
docker network inspect localai_network >/dev/null 2>/dev/null || docker network create localai_network

# Build and start services
docker-compose up -d --build

# Poll for readiness instead of sleeping a fixed amount of time.
echo "⏳ Waiting for services to be ready..."
wait_for "Ollama" docker exec langmem_ollama ollama list
# Any HTTP response (even 404) means the API is accepting connections.
wait_for "LangMem API" curl -s -o /dev/null "http://localhost:8000/"

# Initialize Ollama models
echo "🦙 Initializing Ollama models..."
docker exec langmem_ollama ollama pull llama3
docker exec langmem_ollama ollama pull nomic-embed-text

# Test the API. --fail makes curl exit non-zero on HTTP 4xx/5xx, so set -e
# aborts instead of reporting a successful deployment.
echo "🧪 Testing API endpoints..."
curl -fsS -X POST "http://localhost:8000/v1/ingest" \
  -H "Content-Type: application/json" \
  -H "X-API-Key: ${LANGMEM_API_KEY}" \
  -d '{"content": "Hello, LangMem!", "document_type": "text"}'
echo

curl -fsS -X POST "http://localhost:8000/v1/context/retrieve" \
  -H "Content-Type: application/json" \
  -H "X-API-Key: ${LANGMEM_API_KEY}" \
  -d '{"query": "What is LangMem?", "max_tokens": 1000}'
echo

echo "✅ LangMem deployment complete!"
echo "📡 API available at: http://localhost:8000"
|
|
echo "🔧 n8n integration ready on localai_network"</code></pre>
|
|
</div>
|
|
</div>
|
|
</section>
|
|
|
|
<section class="mb-4">
|
|
<h2>Success Metrics</h2>
|
|
<div class="grid grid-2">
|
|
<div class="card">
|
|
<h3>📊 Performance Metrics</h3>
|
|
<ul>
|
|
<li>Context retrieval &lt; 2 seconds</li>
|
|
<li>Document ingestion &lt; 5 seconds</li>
|
|
<li>Memory search accuracy > 80%</li>
|
|
<li>System uptime > 99%</li>
|
|
</ul>
|
|
</div>
|
|
<div class="card">
|
|
<h3>🎯 Functional Metrics</h3>
|
|
<ul>
|
|
<li>Successful n8n integration</li>
|
|
<li>MCP protocol compliance</li>
|
|
<li>Data consistency across stores</li>
|
|
<li>Comprehensive test coverage</li>
|
|
</ul>
|
|
</div>
|
|
</div>
|
|
</section>
|
|
|
|
<section class="text-center">
|
|
<h2>Ready to Begin?</h2>
|
|
<p class="mb-3">Start with Phase 1 and follow the step-by-step guide to build your LangMem system.</p>
|
|
<div class="cta-buttons">
|
|
<a href="../api/" class="btn btn-primary">📡 API Reference</a>
|
|
<a href="../architecture/" class="btn btn-secondary">🏗️ Architecture Details</a>
|
|
</div>
|
|
</section>
|
|
</main>
|
|
|
|
<footer style="text-align: center; padding: 2rem; margin-top: 4rem; color: var(--text-secondary);">
|
|
<p>© 2025 LangMem Documentation. Built with modern web technologies.</p>
|
|
</footer>
|
|
|
|
<script src="../assets/js/main.js"></script>
|
|
</body>
|
|
</html> |