<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Implementation Guide - LangMem Documentation</title>
<meta name="description" content="Step-by-step implementation guide for building the LangMem long-term memory system with detailed phases and instructions.">
<link rel="stylesheet" href="../assets/css/style.css">
<script src="../auth.js"></script>
<script src="https://cdn.jsdelivr.net/npm/mermaid/dist/mermaid.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/prismjs@1.29.0/components/prism-core.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/prismjs@1.29.0/plugins/autoloader/prism-autoloader.min.js"></script>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/prismjs@1.29.0/themes/prism.css">
</head>
<body>
<header>
<nav>
<a href="../" class="logo">🧠 LangMem</a>
<ul class="nav-links">
<li><a href="../">Home</a></li>
<li><a href="../architecture/">Architecture</a></li>
<li><a href="../implementation/">Implementation</a></li>
<li><a href="../api/">API Docs</a></li>
</ul>
</nav>
</header>
<main>
<section class="hero">
<h1>Implementation Guide</h1>
<p>Complete step-by-step guide to implementing the LangMem long-term memory system from scratch to production.</p>
</section>
<section class="mb-4">
<h2>Prerequisites</h2>
<div class="grid grid-2">
<div class="card">
<h3>🛠️ Development Environment</h3>
<ul>
<li>Docker & Docker Compose</li>
<li>Python 3.11+</li>
<li>Node.js 18+ (for n8n integration)</li>
<li>Git version control</li>
</ul>
</div>
<div class="card">
<h3>📦 System Requirements</h3>
<ul>
<li>8GB+ RAM (16GB recommended)</li>
<li>50GB+ storage space</li>
<li>GPU optional (for better Ollama performance)</li>
<li>Access to the shared localai_network Docker network</li>
</ul>
</div>
</div>
</section>
<section class="mb-4">
<h2>Phase 1: Core API Development</h2>
<div class="phase-card">
<span class="phase-number">1</span>
<h3 class="phase-title">Project Setup & Structure</h3>
<div class="phase-timeline">Week 1</div>
<span class="status status-in-progress">In Progress</span>
<h4>1.1 Create Project Structure</h4>
<div class="code-block">
<pre><code class="language-bash"># Create project directory
mkdir -p /home/klas/langmem
cd /home/klas/langmem
# Initialize project structure
mkdir -p {src,tests,docs,scripts,config}
mkdir -p src/{api,core,storage,models}
# Create virtual environment
python -m venv venv
source venv/bin/activate
# Create requirements.txt
cat > requirements.txt << 'EOF'
fastapi==0.104.1
uvicorn==0.24.0
langchain==0.1.0
langmem==0.1.0
neo4j==5.15.0
psycopg2-binary==2.9.9
pgvector==0.2.4
ollama==0.1.7
pydantic==2.5.0
python-dotenv==1.0.0
pytest==7.4.3
pytest-asyncio==0.21.1
httpx==0.25.2
EOF
# Install dependencies
pip install -r requirements.txt</code></pre>
</div>
<h4>1.2 Docker Infrastructure Setup</h4>
<div class="code-block">
<pre><code class="language-yaml"># docker-compose.yml
version: '3.8'
services:
ollama:
image: ollama/ollama:latest
container_name: langmem_ollama
ports:
- "11434:11434"
volumes:
- ollama_data:/root/.ollama
networks:
- localai_network
environment:
- OLLAMA_HOST=0.0.0.0
restart: unless-stopped
supabase:
image: supabase/postgres:latest
container_name: langmem_supabase
environment:
- POSTGRES_DB=langmem_db
- POSTGRES_USER=langmem_user
- POSTGRES_PASSWORD=secure_password_123
- POSTGRES_HOST_AUTH_METHOD=trust
volumes:
- supabase_data:/var/lib/postgresql/data
- ./scripts/init_db.sql:/docker-entrypoint-initdb.d/init_db.sql
networks:
- localai_network
restart: unless-stopped
neo4j:
image: neo4j:5.15-community
container_name: langmem_neo4j
environment:
- NEO4J_AUTH=neo4j/secure_password_123
- NEO4J_HEAP_SIZE=1G
- NEO4J_PAGECACHE_SIZE=1G
volumes:
- neo4j_data:/data
- neo4j_logs:/logs
- neo4j_plugins:/plugins
networks:
- localai_network
restart: unless-stopped
memory_service:
build: .
container_name: langmem_api
ports:
- "8000:8000"
depends_on:
- supabase
- neo4j
- ollama
environment:
- SUPABASE_URL=postgresql://langmem_user:secure_password_123@supabase:5432/langmem_db
- NEO4J_URI=bolt://neo4j:7687
- NEO4J_USERNAME=neo4j
- NEO4J_PASSWORD=secure_password_123
- OLLAMA_URL=http://ollama:11434
- API_KEY=langmem_api_key_2025
volumes:
- ./src:/app/src
- ./config:/app/config
networks:
- localai_network
restart: unless-stopped
networks:
localai_network:
external: true
volumes:
ollama_data:
supabase_data:
neo4j_data:
neo4j_logs:
neo4j_plugins:</code></pre>
</div>
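<p>The <code>memory_service</code> entry builds from a local Dockerfile that is not shown elsewhere in this guide. A minimal sketch, written with the same heredoc convention as step 1.1, might look like the following; the uvicorn entrypoint and the copied paths are assumptions, not part of the original configuration.</p>
<div class="code-block">
<pre><code class="language-bash"># Sketch of a Dockerfile for the memory_service container (assumed)
cat > Dockerfile << 'EOF'
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY src/ src/
COPY config/ config/
EXPOSE 8000
CMD ["uvicorn", "src.api.main:app", "--host", "0.0.0.0", "--port", "8000"]
EOF</code></pre>
</div>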
<h4>1.3 Database Schema Setup</h4>
<div class="code-block">
<pre><code class="language-sql">-- scripts/init_db.sql
-- Enable pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;
-- Create documents table with vector embeddings
CREATE TABLE documents (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
content TEXT NOT NULL,
embedding VECTOR(1536),
metadata JSONB DEFAULT '{}',
source_url TEXT,
document_type VARCHAR(50) DEFAULT 'text',
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
-- Create vector index for similarity search
CREATE INDEX documents_embedding_idx ON documents
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);
-- Create metadata index for filtering
CREATE INDEX documents_metadata_idx ON documents
USING gin (metadata);
-- Create search index for full-text search
CREATE INDEX documents_content_idx ON documents
USING gin (to_tsvector('english', content));
-- Create function to update updated_at timestamp
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Create trigger to automatically update updated_at
CREATE TRIGGER update_documents_updated_at
BEFORE UPDATE ON documents
FOR EACH ROW
EXECUTE FUNCTION update_updated_at_column();</code></pre>
</div>
</div>
</section>
<section class="mb-4">
<div class="phase-card">
<span class="phase-number">2</span>
<h3 class="phase-title">Core API Implementation</h3>
<div class="phase-timeline">Week 2</div>
<span class="status status-in-progress">In Progress</span>
<h4>2.1 FastAPI Application Structure</h4>
<div class="code-block">
<pre><code class="language-python"># src/api/main.py
from fastapi import FastAPI, HTTPException, Depends, Header
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
import os
from typing import Optional
from .routers import memory, health
from .middleware import authentication
from ..core.config import settings
from ..core.database import init_databases
@asynccontextmanager
async def lifespan(app: FastAPI):
# Initialize databases
await init_databases()
yield
# Cleanup if needed
app = FastAPI(
title="LangMem API",
description="Long-term memory system for LLM projects",
version="1.0.0",
lifespan=lifespan
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Add authentication middleware
app.add_middleware(authentication.APIKeyMiddleware)
# Include routers
app.include_router(memory.router, prefix="/v1", tags=["memory"])
app.include_router(health.router, prefix="/v1", tags=["health"])
@app.get("/")
async def root():
return {"message": "LangMem API", "version": "1.0.0"}</code></pre>
</div>
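<p>The imports at the top of <code>main.py</code> reference three small modules that this guide does not list in full. Minimal sketches follow; the settings field names and the paths exempted from authentication are assumptions rather than part of the original design.</p>
<div class="code-block">
<pre><code class="language-python"># src/core/config.py (sketch)
import os
from pydantic import BaseModel

class Settings(BaseModel):
    supabase_url: str = os.getenv("SUPABASE_URL", "")
    neo4j_uri: str = os.getenv("NEO4J_URI", "bolt://neo4j:7687")
    ollama_url: str = os.getenv("OLLAMA_URL", "http://ollama:11434")
    api_key: str = os.getenv("API_KEY", "langmem_api_key_2025")

settings = Settings()


# src/core/database.py (sketch)
async def init_databases():
    """Warm up the shared memory manager and its connection pools at startup."""
    # Imported locally to avoid a circular import with the API package
    from ..api.routers.memory import get_memory_manager
    await get_memory_manager()


# src/api/middleware/authentication.py (sketch)
import os
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse

class APIKeyMiddleware(BaseHTTPMiddleware):
    """Reject any request whose X-API-Key header does not match the configured key."""
    async def dispatch(self, request: Request, call_next):
        # Leave the root and OpenAPI endpoints open (an assumption)
        if request.url.path in ("/", "/docs", "/openapi.json"):
            return await call_next(request)
        if request.headers.get("X-API-Key") != os.getenv("API_KEY", "langmem_api_key_2025"):
            return JSONResponse(status_code=401, content={"detail": "Invalid or missing API key"})
        return await call_next(request)</code></pre>
</div>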
<h4>2.2 Memory Router Implementation</h4>
<div class="code-block">
<pre><code class="language-python"># src/api/routers/memory.py
from fastapi import APIRouter, HTTPException, Depends
from typing import Dict, Any, List, Optional
from pydantic import BaseModel
import uuid
from ...core.memory_manager import MemoryManager
from ...core.models import IngestRequest, QueryRequest, QueryResponse
router = APIRouter()
class IngestRequest(BaseModel):
content: str
document_type: str = "text"
metadata: Dict[str, Any] = {}
source_url: Optional[str] = None
class QueryRequest(BaseModel):
query: str
max_tokens: int = 4000
conversation_history: List[Dict[str, str]] = []
filters: Dict[str, Any] = {}
class QueryResponse(BaseModel):
context: str
sources: List[Dict[str, Any]]
relevant_facts: List[Dict[str, Any]]
confidence_score: float
@router.post("/ingest", response_model=Dict[str, Any])
async def ingest_document(
request: IngestRequest,
memory_manager: MemoryManager = Depends(get_memory_manager)
):
"""Ingest a new document into the memory system."""
try:
result = await memory_manager.ingest_document(
content=request.content,
document_type=request.document_type,
metadata=request.metadata,
source_url=request.source_url
)
return {
"status": "success",
"document_id": str(result.document_id),
"chunks_created": result.chunks_created,
"entities_extracted": result.entities_extracted
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/context/retrieve", response_model=QueryResponse)
async def retrieve_context(
request: QueryRequest,
memory_manager: MemoryManager = Depends(get_memory_manager)
):
"""Retrieve augmented context for a query."""
try:
result = await memory_manager.retrieve_context(
query=request.query,
max_tokens=request.max_tokens,
conversation_history=request.conversation_history,
filters=request.filters
)
return QueryResponse(
context=result.context,
sources=result.sources,
relevant_facts=result.relevant_facts,
confidence_score=result.confidence_score
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/tools/{tool_name}/execute")
async def execute_tool(
tool_name: str,
params: Dict[str, Any],
memory_manager: MemoryManager = Depends(get_memory_manager)
):
"""Execute a specific memory tool."""
try:
result = await memory_manager.execute_tool(tool_name, params)
return {"result": result}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
async def get_memory_manager() -> MemoryManager:
"""Dependency to get memory manager instance."""
return MemoryManager()</code></pre>
</div>
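<p>Two more modules referenced above are not listed in this guide: the health router mounted in <code>main.py</code> and the result models used by <code>MemoryManager</code>. Minimal sketches that satisfy the Phase 2 tests might look like this:</p>
<div class="code-block">
<pre><code class="language-python"># src/api/routers/health.py (sketch)
from fastapi import APIRouter

router = APIRouter()

@router.get("/health")
async def health_check():
    """Liveness probe; the integration tests expect status == "healthy"."""
    return {"status": "healthy"}


# src/core/models.py (sketch)
from dataclasses import dataclass, field
from typing import Any, Dict, List

@dataclass
class IngestResult:
    document_id: str
    chunks_created: int
    entities_extracted: int

@dataclass
class QueryResult:
    context: str
    sources: List[Dict[str, Any]] = field(default_factory=list)
    relevant_facts: List[Dict[str, Any]] = field(default_factory=list)
    confidence_score: float = 0.0</code></pre>
</div>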
<h4>2.3 Memory Manager Core Logic</h4>
<div class="code-block">
<pre><code class="language-python"># src/core/memory_manager.py
import asyncio
from typing import Dict, Any, List, Optional
from langchain.memory import ConversationBufferMemory
from langchain.embeddings import OllamaEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from .storage.supabase_store import SupabaseStore
from .storage.neo4j_store import Neo4jStore
from .models import IngestResult, QueryResult
class MemoryManager:
def __init__(self):
self.supabase_store = SupabaseStore()
self.neo4j_store = Neo4jStore()
self.embeddings = OllamaEmbeddings(
model="llama3",
base_url="http://ollama:11434"
)
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200
)
async def ingest_document(
self,
content: str,
document_type: str = "text",
metadata: Dict[str, Any] = None,
source_url: Optional[str] = None
) -> IngestResult:
"""Ingest a document into the memory system."""
metadata = metadata or {}
# Step 1: Split document into chunks
chunks = self.text_splitter.split_text(content)
# Step 2: Generate embeddings for chunks
embeddings = await self._generate_embeddings(chunks)
# Step 3: Store chunks and embeddings in Supabase
chunk_ids = await self.supabase_store.store_chunks(
chunks=chunks,
embeddings=embeddings,
metadata=metadata,
source_url=source_url,
document_type=document_type
)
# Step 4: Extract entities and relationships
entities = await self._extract_entities(chunks)
# Step 5: Store entities and relationships in Neo4j
await self.neo4j_store.store_entities(
entities=entities,
chunk_ids=chunk_ids
)
return IngestResult(
document_id=chunk_ids[0], # Use first chunk ID as document ID
chunks_created=len(chunks),
entities_extracted=len(entities)
)
async def retrieve_context(
self,
query: str,
max_tokens: int = 4000,
conversation_history: List[Dict[str, str]] = None,
filters: Dict[str, Any] = None
) -> QueryResult:
"""Retrieve augmented context for a query."""
conversation_history = conversation_history or []
filters = filters or {}
# Step 1: Generate query embedding
query_embedding = await self._generate_embeddings([query])
# Step 2: Semantic search in Supabase
similar_chunks = await self.supabase_store.similarity_search(
query_embedding=query_embedding[0],
limit=20,
filters=filters
)
# Step 3: Extract entities from similar chunks
entities = await self._extract_entities_from_chunks(similar_chunks)
# Step 4: Graph traversal in Neo4j
related_facts = await self.neo4j_store.find_related_facts(
entities=entities,
max_depth=2
)
# Step 5: Assemble context
context = await self._assemble_context(
query=query,
similar_chunks=similar_chunks,
related_facts=related_facts,
max_tokens=max_tokens
)
return QueryResult(
context=context["text"],
sources=context["sources"],
relevant_facts=related_facts,
confidence_score=context["confidence"]
)
async def _generate_embeddings(self, texts: List[str]) -> List[List[float]]:
"""Generate embeddings for a list of texts."""
return await self.embeddings.aembed_documents(texts)
async def _extract_entities(self, chunks: List[str]) -> List[Dict[str, Any]]:
"""Extract entities from text chunks."""
# This would use Ollama to extract entities
# For now, return empty list
return []
async def _extract_entities_from_chunks(self, chunks: List[Dict[str, Any]]) -> List[str]:
"""Extract entities from retrieved chunks."""
# Extract entities from chunk metadata or content
entities = []
for chunk in chunks:
if "entities" in chunk.get("metadata", {}):
entities.extend(chunk["metadata"]["entities"])
return list(set(entities))
async def _assemble_context(
self,
query: str,
similar_chunks: List[Dict[str, Any]],
related_facts: List[Dict[str, Any]],
max_tokens: int
) -> Dict[str, Any]:
"""Assemble the final context from chunks and facts."""
context_parts = []
sources = []
# Add similar chunks
for chunk in similar_chunks[:10]: # Limit to top 10
context_parts.append(chunk["content"])
sources.append({
"id": chunk["id"],
"type": "chunk",
"source_url": chunk.get("source_url"),
"confidence": chunk.get("similarity", 0.0)
})
# Add related facts
for fact in related_facts[:5]: # Limit to top 5
context_parts.append(fact["description"])
sources.append({
"id": fact["id"],
"type": "fact",
"relationship": fact.get("relationship"),
"confidence": fact.get("confidence", 0.0)
})
# Combine context parts
full_context = "\n\n".join(context_parts)
# Truncate if too long (rough token estimation)
if len(full_context) > max_tokens * 4: # Rough 4 chars per token
full_context = full_context[:max_tokens * 4]
return {
"text": full_context,
"sources": sources,
"confidence": 0.8 # Calculate based on similarity scores
}</code></pre>
</div>
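<p>The <code>_extract_entities</code> method above is stubbed. One way to fill it in, using the <code>ollama</code> client pinned in requirements.txt, is sketched below; the prompt wording and the JSON-parsing strategy are assumptions, not part of the original design.</p>
<div class="code-block">
<pre><code class="language-python"># Sketch of the stubbed _extract_entities (assumed implementation)
import json
from typing import Any, Dict, List

from ollama import AsyncClient

async def _extract_entities(self, chunks: List[str]) -> List[Dict[str, Any]]:
    """Ask the local Ollama model for named entities in each chunk."""
    client = AsyncClient(host="http://ollama:11434")
    entities: List[Dict[str, Any]] = []
    for chunk in chunks:
        response = await client.chat(
            model="llama3",
            messages=[{
                "role": "user",
                "content": (
                    "Extract named entities from the text below. Respond with a JSON "
                    'array of objects like {"name": ..., "type": ...} and nothing else.\n\n'
                    + chunk
                ),
            }],
        )
        try:
            entities.extend(json.loads(response["message"]["content"]))
        except (json.JSONDecodeError, TypeError):
            # The model may not return valid JSON; skip unparseable chunks
            continue
    return entities</code></pre>
</div>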
</div>
</section>
<section class="mb-4">
<div class="phase-card">
<span class="phase-number">3</span>
<h3 class="phase-title">Storage Layer Implementation</h3>
<div class="phase-timeline">Week 3</div>
<span class="status status-planning">Planning</span>
<h4>3.1 Supabase Store Implementation</h4>
<div class="code-block">
<pre><code class="language-python"># src/core/storage/supabase_store.py
import asyncio
import asyncpg
from typing import List, Dict, Any, Optional
import json
import uuid
class SupabaseStore:
def __init__(self):
self.connection_pool = None
async def initialize(self):
"""Initialize database connection pool."""
self.connection_pool = await asyncpg.create_pool(
host="supabase",
port=5432,
database="langmem_db",
user="langmem_user",
password="secure_password_123",
min_size=2,
max_size=10
)
async def store_chunks(
self,
chunks: List[str],
embeddings: List[List[float]],
metadata: Dict[str, Any],
source_url: Optional[str] = None,
document_type: str = "text"
) -> List[str]:
"""Store document chunks with embeddings."""
chunk_ids = []
async with self.connection_pool.acquire() as conn:
for chunk, embedding in zip(chunks, embeddings):
chunk_id = str(uuid.uuid4())
await conn.execute(
"""
INSERT INTO documents (id, content, embedding, metadata, source_url, document_type)
VALUES ($1, $2, $3, $4, $5, $6)
""",
chunk_id,
chunk,
embedding,
json.dumps(metadata),
source_url,
document_type
)
chunk_ids.append(chunk_id)
return chunk_ids
async def similarity_search(
self,
query_embedding: List[float],
limit: int = 10,
filters: Dict[str, Any] = None
) -> List[Dict[str, Any]]:
"""Perform vector similarity search."""
filters = filters or {}
query = """
SELECT id, content, metadata, source_url, document_type,
1 - (embedding <=> $1) as similarity
FROM documents
WHERE 1 - (embedding <=> $1) > 0.7
ORDER BY embedding <=> $1
LIMIT $2
"""
async with self.connection_pool.acquire() as conn:
rows = await conn.fetch(query, query_embedding, limit)
results = []
for row in rows:
results.append({
"id": row["id"],
"content": row["content"],
"metadata": json.loads(row["metadata"]),
"source_url": row["source_url"],
"document_type": row["document_type"],
"similarity": row["similarity"]
})
return results</code></pre>
</div>
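<p>Instead of serializing embeddings to text literals, the <code>pgvector</code> package pinned in requirements.txt ships an asyncpg codec, so Python lists can be bound directly. A short sketch of that alternative:</p>
<div class="code-block">
<pre><code class="language-python"># Alternative: register the pgvector codec on each pooled connection
import asyncpg
from pgvector.asyncpg import register_vector

async def _init_connection(conn):
    # Teach asyncpg to encode/decode the Postgres vector type natively
    await register_vector(conn)

# Pass the initializer when creating the pool:
# pool = await asyncpg.create_pool(dsn=..., init=_init_connection)
# After this, embeddings can be bound as plain lists without ::vector casts.</code></pre>
</div>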
<h4>3.2 Neo4j Store Implementation</h4>
<div class="code-block">
<pre><code class="language-python"># src/core/storage/neo4j_store.py
from neo4j import AsyncGraphDatabase
from typing import List, Dict, Any, Optional
import logging
class Neo4jStore:
def __init__(self):
self.driver = None
async def initialize(self):
"""Initialize Neo4j connection."""
self.driver = AsyncGraphDatabase.driver(
"bolt://neo4j:7687",
auth=("neo4j", "secure_password_123")
)
async def store_entities(
self,
entities: List[Dict[str, Any]],
chunk_ids: List[str]
):
"""Store entities and their relationships."""
async with self.driver.session() as session:
for i, chunk_id in enumerate(chunk_ids):
# Create DocumentChunk node
await session.run(
"""
CREATE (doc:DocumentChunk {
id: $chunk_id,
supabase_id: $chunk_id,
created_at: datetime()
})
""",
chunk_id=chunk_id
)
# Create entity nodes and relationships
for entity in entities:
await self._create_entity_node(session, entity, chunk_id)
async def _create_entity_node(
self,
session,
entity: Dict[str, Any],
chunk_id: str
):
"""Create an entity node and link it to document chunk."""
entity_type = entity.get("type", "Entity")
entity_name = entity.get("name", "Unknown")
# Create entity node
await session.run(
f"""
MERGE (entity:{entity_type} {{name: $name}})
ON CREATE SET entity.created_at = datetime()
""",
name=entity_name
)
# Create relationship between document and entity
await session.run(
f"""
MATCH (doc:DocumentChunk {{id: $chunk_id}})
MATCH (entity:{entity_type} {{name: $name}})
CREATE (doc)-[:MENTIONS]->(entity)
""",
chunk_id=chunk_id,
name=entity_name
)
async def find_related_facts(
self,
entities: List[str],
max_depth: int = 2
) -> List[Dict[str, Any]]:
"""Find facts related to given entities."""
if not entities:
return []
query = """
MATCH (entity)
WHERE entity.name IN $entities
MATCH (entity)-[r*1..%d]-(related)
RETURN DISTINCT related.name as name,
labels(related) as labels,
type(r[0]) as relationship,
length(r) as depth
ORDER BY depth, name
LIMIT 50
""" % max_depth
async with self.driver.session() as session:
result = await session.run(query, entities=entities)
facts = []
async for record in result:
facts.append({
"id": f"fact_{len(facts)}",
"name": record["name"],
"type": record["labels"][0] if record["labels"] else "Entity",
"relationship": record["relationship"],
"depth": record["depth"],
"confidence": 1.0 - (record["depth"] * 0.2),
"description": f"Related {record['labels'][0] if record['labels'] else 'entity'}: {record['name']}"
})
return facts
async def close(self):
"""Close Neo4j connection."""
if self.driver:
await self.driver.close()</code></pre>
</div>
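<p>Because <code>MERGE</code> on <code>{name}</code> can create duplicate nodes under concurrent writes, a uniqueness constraint is worth adding. This is an optional hardening step, not part of the original design; a sketch for the default <code>Entity</code> label:</p>
<div class="code-block">
<pre><code class="language-python"># Optional: ensure entity names are unique so concurrent MERGEs cannot
# create duplicates (run once at startup; sketch for the Entity label only)
async def ensure_constraints(self):
    async with self.driver.session() as session:
        await session.run(
            "CREATE CONSTRAINT entity_name IF NOT EXISTS "
            "FOR (e:Entity) REQUIRE e.name IS UNIQUE"
        )</code></pre>
</div>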
</div>
</section>
<section class="mb-4">
<h2>Phase 2: Testing & Validation</h2>
<div class="phase-card">
<span class="phase-number">4</span>
<h3 class="phase-title">Comprehensive Testing</h3>
<div class="phase-timeline">Weeks 4-5</div>
<span class="status status-planning">Planning</span>
<h4>4.1 Test Suite Setup</h4>
<div class="code-block">
<pre><code class="language-python"># tests/test_memory_manager.py
import pytest
import asyncio
from unittest.mock import Mock, patch
from src.core.memory_manager import MemoryManager
from src.core.models import IngestResult, QueryResult
@pytest.fixture
async def memory_manager():
"""Create memory manager instance for testing."""
manager = MemoryManager()
await manager.initialize()
yield manager
await manager.cleanup()
@pytest.mark.asyncio
async def test_document_ingestion(memory_manager):
"""Test document ingestion pipeline."""
content = "This is a test document about machine learning."
result = await memory_manager.ingest_document(
content=content,
document_type="text",
metadata={"source": "test"}
)
assert isinstance(result, IngestResult)
assert result.chunks_created > 0
assert result.document_id is not None
@pytest.mark.asyncio
async def test_context_retrieval(memory_manager):
"""Test context retrieval functionality."""
# First ingest a document
await memory_manager.ingest_document(
content="Python is a programming language used for AI development.",
document_type="text"
)
# Then query for context
result = await memory_manager.retrieve_context(
query="What is Python?",
max_tokens=1000
)
assert isinstance(result, QueryResult)
assert "Python" in result.context
assert len(result.sources) > 0
@pytest.mark.asyncio
async def test_hybrid_retrieval(memory_manager):
"""Test hybrid retrieval combining vector and graph search."""
# Ingest related documents
documents = [
"John works at OpenAI on machine learning research.",
"OpenAI developed GPT models for natural language processing.",
"Machine learning is a subset of artificial intelligence."
]
for doc in documents:
await memory_manager.ingest_document(content=doc, document_type="text")
# Query should find related information
result = await memory_manager.retrieve_context(
query="Tell me about John's work",
max_tokens=2000
)
assert "John" in result.context
assert "OpenAI" in result.context
assert len(result.relevant_facts) > 0</code></pre>
</div>
<h4>4.2 Integration Testing</h4>
<div class="code-block">
<pre><code class="language-python"># tests/test_integration.py
import pytest
import httpx
from fastapi.testclient import TestClient
from src.api.main import app
client = TestClient(app)
def test_health_endpoint():
"""Test health check endpoint."""
response = client.get("/v1/health")
assert response.status_code == 200
assert response.json()["status"] == "healthy"
def test_ingest_endpoint():
"""Test document ingestion endpoint."""
response = client.post(
"/v1/ingest",
json={
"content": "Test document content",
"document_type": "text",
"metadata": {"test": True}
},
headers={"X-API-Key": "langmem_api_key_2025"}
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "success"
assert "document_id" in data
def test_retrieve_endpoint():
"""Test context retrieval endpoint."""
# First ingest a document
client.post(
"/v1/ingest",
json={"content": "Machine learning is fascinating"},
headers={"X-API-Key": "langmem_api_key_2025"}
)
# Then retrieve context
response = client.post(
"/v1/context/retrieve",
json={
"query": "What is machine learning?",
"max_tokens": 1000
},
headers={"X-API-Key": "langmem_api_key_2025"}
)
assert response.status_code == 200
data = response.json()
assert "context" in data
assert "sources" in data</code></pre>
</div>
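<p>As written, these tests exercise the live stores through the app. For isolated runs, FastAPI's <code>dependency_overrides</code> mechanism can swap the memory manager for a stub; the sketch below shows the pattern (the helper name is illustrative).</p>
<div class="code-block">
<pre><code class="language-python"># Sketch: replacing the real MemoryManager in tests via dependency_overrides
from unittest.mock import AsyncMock

from src.api.main import app
from src.api.routers.memory import get_memory_manager

def use_fake_memory_manager() -> AsyncMock:
    """Route every Depends(get_memory_manager) to an AsyncMock."""
    fake = AsyncMock()
    app.dependency_overrides[get_memory_manager] = lambda: fake
    return fake

# In a test:
#   fake = use_fake_memory_manager()
#   fake.retrieve_context.return_value = QueryResult(context="stub")
# and clear with app.dependency_overrides.clear() in teardown.</code></pre>
</div>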
</div>
</section>
<section class="mb-4">
<h2>Phase 3: MCP Server Transformation</h2>
<div class="phase-card">
<span class="phase-number">5</span>
<h3 class="phase-title">MCP Protocol Implementation</h3>
<div class="phase-timeline">Weeks 6-7</div>
<span class="status status-planning">Planning</span>
<h4>5.1 MCP Server Setup</h4>
<div class="code-block">
<pre><code class="language-python"># src/mcp/server.py
from typing import Dict, Any, List, Optional
import asyncio
import json
from dataclasses import dataclass
@dataclass
class MCPMessage:
"""MCP message structure."""
id: str
method: str
params: Dict[str, Any]
@dataclass
class MCPResponse:
"""MCP response structure."""
id: str
result: Optional[Dict[str, Any]] = None
error: Optional[Dict[str, Any]] = None
class MCPServer:
"""Model Context Protocol server implementation."""
def __init__(self, memory_manager):
self.memory_manager = memory_manager
self.capabilities = {
"tools": {
"memory_ingest": {
"description": "Ingest documents into memory",
"parameters": {
"type": "object",
"properties": {
"content": {"type": "string"},
"document_type": {"type": "string", "default": "text"},
"metadata": {"type": "object"}
},
"required": ["content"]
}
},
"memory_retrieve": {
"description": "Retrieve context from memory",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string"},
"max_tokens": {"type": "integer", "default": 4000}
},
"required": ["query"]
}
}
},
"resources": {
"memory_stats": {
"description": "Memory system statistics",
"uri": "memory://stats"
}
}
}
async def handle_message(self, message: MCPMessage) -> MCPResponse:
"""Handle incoming MCP message."""
try:
if message.method == "initialize":
return await self._handle_initialize(message)
elif message.method == "tools/list":
return await self._handle_tools_list(message)
elif message.method == "tools/call":
return await self._handle_tools_call(message)
elif message.method == "resources/list":
return await self._handle_resources_list(message)
elif message.method == "resources/read":
return await self._handle_resources_read(message)
else:
return MCPResponse(
id=message.id,
error={"code": -32601, "message": "Method not found"}
)
except Exception as e:
return MCPResponse(
id=message.id,
error={"code": -32603, "message": str(e)}
)
async def _handle_initialize(self, message: MCPMessage) -> MCPResponse:
"""Handle initialization request."""
return MCPResponse(
id=message.id,
result={
"protocolVersion": "2024-11-05",
"capabilities": self.capabilities,
"serverInfo": {
"name": "langmem",
"version": "1.0.0"
}
}
)
async def _handle_tools_call(self, message: MCPMessage) -> MCPResponse:
"""Handle tool execution request."""
tool_name = message.params.get("name")
arguments = message.params.get("arguments", {})
if tool_name == "memory_ingest":
result = await self.memory_manager.ingest_document(
content=arguments["content"],
document_type=arguments.get("document_type", "text"),
metadata=arguments.get("metadata", {})
)
return MCPResponse(
id=message.id,
result={"content": [{"type": "text", "text": f"Ingested document: {result.document_id}"}]}
)
elif tool_name == "memory_retrieve":
result = await self.memory_manager.retrieve_context(
query=arguments["query"],
max_tokens=arguments.get("max_tokens", 4000)
)
return MCPResponse(
id=message.id,
result={"content": [{"type": "text", "text": result.context}]}
)
return MCPResponse(
id=message.id,
error={"code": -32602, "message": "Unknown tool"}
)</code></pre>
</div>
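<p>The handlers above still need a transport to receive messages. MCP servers commonly run over stdio; a minimal line-delimited JSON-RPC loop is sketched below. The transport choice and the message framing are assumptions, since this guide does not specify them.</p>
<div class="code-block">
<pre><code class="language-python"># Minimal stdio transport sketch for MCPServer (assumed, not prescribed above)
import asyncio
import json
import sys
from dataclasses import asdict

async def run_stdio(server: MCPServer):
    loop = asyncio.get_running_loop()
    while True:
        # Read one JSON-RPC message per line without blocking the event loop
        line = await loop.run_in_executor(None, sys.stdin.readline)
        if not line:
            break  # EOF: the client closed the pipe
        raw = json.loads(line)
        message = MCPMessage(
            id=raw.get("id", ""),
            method=raw["method"],
            params=raw.get("params", {})
        )
        response = await server.handle_message(message)
        # Drop unset result/error fields before serializing
        payload = {k: v for k, v in asdict(response).items() if v is not None}
        print(json.dumps(payload), flush=True)</code></pre>
</div>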
</div>
</section>
<section class="mb-4">
<h2>Phase 4: Client Integration</h2>
<div class="phase-card">
<span class="phase-number">6</span>
<h3 class="phase-title">n8n & Claude Code Integration</h3>
<div class="phase-timeline">Weeks 8-9</div>
<span class="status status-planning">Planning</span>
<h4>6.1 n8n Workflow Example</h4>
<div class="code-block">
<pre><code class="language-json">{
"nodes": [
{
"parameters": {
"httpMethod": "POST",
"path": "webhook-langmem",
"responseMode": "responseNode",
"options": {}
},
"name": "Webhook",
"type": "n8n-nodes-base.webhook",
"typeVersion": 1,
"position": [200, 300]
},
{
"parameters": {
"requestMethod": "POST",
"url": "http://memory_service:8000/v1/ingest",
"jsonParameters": true,
"parametersJson": "{\n \"content\": \"{{ $json.content }}\",\n \"document_type\": \"{{ $json.type || 'text' }}\",\n \"metadata\": {\n \"source\": \"n8n\",\n \"timestamp\": \"{{ $now }}\"\n }\n}",
"options": {},
"headerParametersJson": "{\n \"X-API-Key\": \"langmem_api_key_2025\"\n}"
},
"name": "Ingest to Memory",
"type": "n8n-nodes-base.httpRequest",
"typeVersion": 1,
"position": [400, 300]
},
{
"parameters": {
"requestMethod": "POST",
"url": "http://memory_service:8000/v1/context/retrieve",
"jsonParameters": true,
"parametersJson": "{\n \"query\": \"{{ $json.query }}\",\n \"max_tokens\": 4000\n}",
"options": {},
"headerParametersJson": "{\n \"X-API-Key\": \"langmem_api_key_2025\"\n}"
},
"name": "Query Memory",
"type": "n8n-nodes-base.httpRequest",
"typeVersion": 1,
"position": [600, 300]
},
{
"parameters": {
"respondWith": "json",
"responseBody": "{{ $json }}"
},
"name": "Response",
"type": "n8n-nodes-base.respondToWebhook",
"typeVersion": 1,
"position": [800, 300]
}
],
"connections": {
"Webhook": {
"main": [
[
{
"node": "Ingest to Memory",
"type": "main",
"index": 0
}
]
]
},
"Ingest to Memory": {
"main": [
[
{
"node": "Query Memory",
"type": "main",
"index": 0
}
]
]
},
"Query Memory": {
"main": [
[
{
"node": "Response",
"type": "main",
"index": 0
}
]
]
}
}
}</code></pre>
</div>
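<p>For the Claude Code side of this phase, the MCP server from Phase 3 must be registered in the client's MCP configuration. A sketch of such an entry follows; the config file location, launch command, and environment variable are assumptions that depend on how the stdio entrypoint is packaged.</p>
<div class="code-block">
<pre><code class="language-json">{
  "mcpServers": {
    "langmem": {
      "command": "python",
      "args": ["-m", "src.mcp.server"],
      "env": {
        "LANGMEM_API_URL": "http://localhost:8000"
      }
    }
  }
}</code></pre>
</div>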
<h4>6.2 Deployment Script</h4>
<div class="code-block">
<pre><code class="language-bash">#!/bin/bash
# deploy.sh
set -e
echo "🚀 Starting LangMem deployment..."
# Create network if it doesn't exist
docker network ls | grep -q localai_network || docker network create localai_network
# Build and start services
docker-compose up -d --build
# Wait for services to be ready
echo "⏳ Waiting for services to be ready..."
sleep 30
# Initialize Ollama models
echo "🦙 Initializing Ollama models..."
docker exec langmem_ollama ollama pull llama3
docker exec langmem_ollama ollama pull nomic-embed-text
# Test the API
echo "🧪 Testing API endpoints..."
curl -X POST "http://localhost:8000/v1/ingest" \
-H "Content-Type: application/json" \
-H "X-API-Key: langmem_api_key_2025" \
-d '{"content": "Hello, LangMem!", "document_type": "text"}'
curl -X POST "http://localhost:8000/v1/context/retrieve" \
-H "Content-Type: application/json" \
-H "X-API-Key: langmem_api_key_2025" \
-d '{"query": "What is LangMem?", "max_tokens": 1000}'
echo "✅ LangMem deployment complete!"
echo "📡 API available at: http://localhost:8000"
echo "🔧 n8n integration ready on localai_network"</code></pre>
</div>
</div>
</section>
<section class="mb-4">
<h2>Success Metrics</h2>
<div class="grid grid-2">
<div class="card">
<h3>📊 Performance Metrics</h3>
<ul>
<li>Context retrieval &lt; 2 seconds</li>
<li>Document ingestion &lt; 5 seconds</li>
<li>Memory search accuracy &gt; 80%</li>
<li>System uptime &gt; 99%</li>
</ul>
</div>
<div class="card">
<h3>🎯 Functional Metrics</h3>
<ul>
<li>Successful n8n integration</li>
<li>MCP protocol compliance</li>
<li>Data consistency across stores</li>
<li>Comprehensive test coverage</li>
</ul>
</div>
</div>
</section>
<section class="text-center">
<h2>Ready to Begin?</h2>
<p class="mb-3">Start with Phase 1 and follow the step-by-step guide to build your LangMem system.</p>
<div class="cta-buttons">
<a href="../api/" class="btn btn-primary">📡 API Reference</a>
<a href="../architecture/" class="btn btn-secondary">🏗️ Architecture Details</a>
</div>
</section>
</main>
<footer style="text-align: center; padding: 2rem; margin-top: 4rem; color: var(--text-secondary);">
<p>&copy; 2025 LangMem Documentation. Built with modern web technologies.</p>
</footer>
<script src="../assets/js/main.js"></script>
</body>
</html>