Implementation Guide

Complete step-by-step guide to implementing the LangMem long-term memory system from scratch to production.

Prerequisites

🛠️ Development Environment

  • Docker & Docker Compose
  • Python 3.11+
  • Node.js 18+ (for n8n integration)
  • Git version control

📦 System Requirements

  • 8GB+ RAM (16GB recommended)
  • 50GB+ storage space
  • GPU optional (for better Ollama performance)
  • Access to the external localai_network Docker network (created by the deployment script if missing)

Phase 1: Core API Development

1. Project Setup & Structure (Week 1, In Progress)

1.1 Create Project Structure

# Create project directory
mkdir -p /home/klas/langmem
cd /home/klas/langmem

# Initialize project structure
mkdir -p {src,tests,docs,scripts,config}
mkdir -p src/{api,core,storage,models}

# Create virtual environment
python -m venv venv
source venv/bin/activate

# Create requirements.txt
cat > requirements.txt << 'EOF'
fastapi==0.104.1
uvicorn==0.24.0
langchain==0.1.0
langmem==0.1.0
neo4j==5.15.0
psycopg2-binary==2.9.9
pgvector==0.2.4
ollama==0.1.7
pydantic==2.5.0
python-dotenv==1.0.0
pytest==7.4.3
pytest-asyncio==0.21.1
httpx==0.25.2
EOF

# Install dependencies
pip install -r requirements.txt

1.2 Docker Infrastructure Setup

# docker-compose.yml
version: '3.8'
services:
  ollama:
    image: ollama/ollama:latest
    container_name: langmem_ollama
    ports:
      - "11434:11434"
    volumes:
      - ollama_data:/root/.ollama
    networks:
      - localai_network
    environment:
      - OLLAMA_HOST=0.0.0.0
    restart: unless-stopped

  supabase:
    image: supabase/postgres:latest
    container_name: langmem_supabase
    environment:
      - POSTGRES_DB=langmem_db
      - POSTGRES_USER=langmem_user
      - POSTGRES_PASSWORD=secure_password_123
      - POSTGRES_HOST_AUTH_METHOD=trust
    volumes:
      - supabase_data:/var/lib/postgresql/data
      - ./scripts/init_db.sql:/docker-entrypoint-initdb.d/init_db.sql
    networks:
      - localai_network
    restart: unless-stopped

  neo4j:
    image: neo4j:5.15-community
    container_name: langmem_neo4j
    environment:
      - NEO4J_AUTH=neo4j/secure_password_123
      - NEO4J_server_memory_heap_max__size=1G
      - NEO4J_server_memory_pagecache_size=1G
    volumes:
      - neo4j_data:/data
      - neo4j_logs:/logs
      - neo4j_plugins:/plugins
    networks:
      - localai_network
    restart: unless-stopped

  memory_service:
    build: .
    container_name: langmem_api
    ports:
      - "8000:8000"
    depends_on:
      - supabase
      - neo4j
      - ollama
    environment:
      - SUPABASE_URL=postgresql://langmem_user:secure_password_123@supabase:5432/langmem_db
      - NEO4J_URI=bolt://neo4j:7687
      - NEO4J_USERNAME=neo4j
      - NEO4J_PASSWORD=secure_password_123
      - OLLAMA_URL=http://ollama:11434
      - API_KEY=langmem_api_key_2025
    volumes:
      - ./src:/app/src
      - ./config:/app/config
    networks:
      - localai_network
    restart: unless-stopped

networks:
  localai_network:
    external: true

volumes:
  ollama_data:
  supabase_data:
  neo4j_data:
  neo4j_logs:
  neo4j_plugins:

1.3 Database Schema Setup

-- scripts/init_db.sql
-- Enable pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;

-- Create documents table with vector embeddings
CREATE TABLE documents (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    content TEXT NOT NULL,
    embedding VECTOR(768),  -- dimension must match the embedding model; nomic-embed-text produces 768-dim vectors
    metadata JSONB DEFAULT '{}',
    source_url TEXT,
    document_type VARCHAR(50) DEFAULT 'text',
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

-- Create vector index for similarity search
CREATE INDEX documents_embedding_idx ON documents 
USING ivfflat (embedding vector_cosine_ops) 
WITH (lists = 100);

-- Create metadata index for filtering
CREATE INDEX documents_metadata_idx ON documents 
USING gin (metadata);

-- Create search index for full-text search
CREATE INDEX documents_content_idx ON documents 
USING gin (to_tsvector('english', content));

-- Create function to update updated_at timestamp
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Create trigger to automatically update updated_at
CREATE TRIGGER update_documents_updated_at 
    BEFORE UPDATE ON documents 
    FOR EACH ROW 
    EXECUTE FUNCTION update_updated_at_column();
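
To confirm that the extension and schema were applied when the Postgres container started, a quick check can be run before moving on. This is a minimal sketch (the file name verify_schema.py is just a suggestion); it assumes it runs inside the compose network, for example from the memory_service container, where the hostname supabase resolves:

# verify_schema.py -- sanity-check pgvector and the documents table (sketch)
import asyncio
import asyncpg

async def main():
    conn = await asyncpg.connect(
        host="supabase", port=5432,
        database="langmem_db", user="langmem_user", password="secure_password_123",
    )
    # init_db.sql should have installed the extension
    version = await conn.fetchval(
        "SELECT extversion FROM pg_extension WHERE extname = 'vector'"
    )
    print("pgvector version:", version)
    # the documents table should exist and be empty at this point
    count = await conn.fetchval("SELECT count(*) FROM documents")
    print("documents rows:", count)
    await conn.close()

asyncio.run(main())
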
2. Core API Implementation (Week 2, In Progress)

2.1 FastAPI Application Structure

# src/api/main.py
from fastapi import FastAPI, HTTPException, Depends, Header
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
import os
from typing import Optional

from .routers import memory, health
from .middleware import authentication
from ..core.config import settings
from ..core.database import init_databases

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialize databases
    await init_databases()
    yield
    # Cleanup if needed

app = FastAPI(
    title="LangMem API",
    description="Long-term memory system for LLM projects",
    version="1.0.0",
    lifespan=lifespan
)

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Add authentication middleware
app.add_middleware(authentication.APIKeyMiddleware)

# Include routers
app.include_router(memory.router, prefix="/v1", tags=["memory"])
app.include_router(health.router, prefix="/v1", tags=["health"])

@app.get("/")
async def root():
    return {"message": "LangMem API", "version": "1.0.0"}

2.2 Memory Router Implementation

# src/api/routers/memory.py
from fastapi import APIRouter, HTTPException, Depends
from typing import Dict, Any, List, Optional
from pydantic import BaseModel
import uuid

from ...core.memory_manager import MemoryManager

router = APIRouter()

async def get_memory_manager() -> MemoryManager:
    """Dependency that provides a MemoryManager instance to the routes below."""
    # A production setup would create and initialize() a single instance at startup and reuse it.
    return MemoryManager()

class IngestRequest(BaseModel):
    content: str
    document_type: str = "text"
    metadata: Dict[str, Any] = {}
    source_url: Optional[str] = None

class QueryRequest(BaseModel):
    query: str
    max_tokens: int = 4000
    conversation_history: List[Dict[str, str]] = []
    filters: Dict[str, Any] = {}

class QueryResponse(BaseModel):
    context: str
    sources: List[Dict[str, Any]]
    relevant_facts: List[Dict[str, Any]]
    confidence_score: float

@router.post("/ingest", response_model=Dict[str, Any])
async def ingest_document(
    request: IngestRequest,
    memory_manager: MemoryManager = Depends(get_memory_manager)
):
    """Ingest a new document into the memory system."""
    try:
        result = await memory_manager.ingest_document(
            content=request.content,
            document_type=request.document_type,
            metadata=request.metadata,
            source_url=request.source_url
        )
        return {
            "status": "success",
            "document_id": str(result.document_id),
            "chunks_created": result.chunks_created,
            "entities_extracted": result.entities_extracted
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@router.post("/context/retrieve", response_model=QueryResponse)
async def retrieve_context(
    request: QueryRequest,
    memory_manager: MemoryManager = Depends(get_memory_manager)
):
    """Retrieve augmented context for a query."""
    try:
        result = await memory_manager.retrieve_context(
            query=request.query,
            max_tokens=request.max_tokens,
            conversation_history=request.conversation_history,
            filters=request.filters
        )
        return QueryResponse(
            context=result.context,
            sources=result.sources,
            relevant_facts=result.relevant_facts,
            confidence_score=result.confidence_score
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@router.post("/tools/{tool_name}/execute")
async def execute_tool(
    tool_name: str,
    params: Dict[str, Any],
    memory_manager: MemoryManager = Depends(get_memory_manager)
):
    """Execute a specific memory tool."""
    try:
        result = await memory_manager.execute_tool(tool_name, params)
        return {"result": result}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

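
The MemoryManager shown next, as well as the test suite, imports IngestResult and QueryResult from src/core/models.py, which this guide never lists. A minimal sketch with just the fields used here:

# src/core/models.py -- result models returned by MemoryManager (sketch)
from typing import Any, Dict, List
from pydantic import BaseModel

class IngestResult(BaseModel):
    """Returned by MemoryManager.ingest_document()."""
    document_id: str
    chunks_created: int
    entities_extracted: int

class QueryResult(BaseModel):
    """Returned by MemoryManager.retrieve_context()."""
    context: str
    sources: List[Dict[str, Any]]
    relevant_facts: List[Dict[str, Any]]
    confidence_score: float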

2.3 Memory Manager Core Logic

# src/core/memory_manager.py
import asyncio
from typing import Dict, Any, List, Optional
from langchain.embeddings import OllamaEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter

from .storage.supabase_store import SupabaseStore
from .storage.neo4j_store import Neo4jStore
from .models import IngestResult, QueryResult

class MemoryManager:
    def __init__(self):
        self.supabase_store = SupabaseStore()
        self.neo4j_store = Neo4jStore()
        # Embedding model: nomic-embed-text (pulled in deploy.sh) produces
        # 768-dimensional vectors, matching the VECTOR(768) column in init_db.sql.
        self.embeddings = OllamaEmbeddings(
            model="nomic-embed-text",
            base_url="http://ollama:11434"
        )
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )

    async def initialize(self):
        """Open connections to the backing stores; call once before ingesting or querying."""
        await self.supabase_store.initialize()
        await self.neo4j_store.initialize()

    async def cleanup(self):
        """Close connections to the backing stores."""
        await self.neo4j_store.close()
        await self.supabase_store.close()
    
    async def ingest_document(
        self,
        content: str,
        document_type: str = "text",
        metadata: Dict[str, Any] = None,
        source_url: Optional[str] = None
    ) -> IngestResult:
        """Ingest a document into the memory system."""
        metadata = metadata or {}
        
        # Step 1: Split document into chunks
        chunks = self.text_splitter.split_text(content)
        
        # Step 2: Generate embeddings for chunks
        embeddings = await self._generate_embeddings(chunks)
        
        # Step 3: Store chunks and embeddings in Supabase
        chunk_ids = await self.supabase_store.store_chunks(
            chunks=chunks,
            embeddings=embeddings,
            metadata=metadata,
            source_url=source_url,
            document_type=document_type
        )
        
        # Step 4: Extract entities and relationships
        entities = await self._extract_entities(chunks)
        
        # Step 5: Store entities and relationships in Neo4j
        await self.neo4j_store.store_entities(
            entities=entities,
            chunk_ids=chunk_ids
        )
        
        return IngestResult(
            document_id=chunk_ids[0],  # Use first chunk ID as document ID
            chunks_created=len(chunks),
            entities_extracted=len(entities)
        )
    
    async def retrieve_context(
        self,
        query: str,
        max_tokens: int = 4000,
        conversation_history: List[Dict[str, str]] = None,
        filters: Dict[str, Any] = None
    ) -> QueryResult:
        """Retrieve augmented context for a query."""
        conversation_history = conversation_history or []
        filters = filters or {}
        
        # Step 1: Generate query embedding
        query_embedding = await self._generate_embeddings([query])
        
        # Step 2: Semantic search in Supabase
        similar_chunks = await self.supabase_store.similarity_search(
            query_embedding=query_embedding[0],
            limit=20,
            filters=filters
        )
        
        # Step 3: Extract entities from similar chunks
        entities = await self._extract_entities_from_chunks(similar_chunks)
        
        # Step 4: Graph traversal in Neo4j
        related_facts = await self.neo4j_store.find_related_facts(
            entities=entities,
            max_depth=2
        )
        
        # Step 5: Assemble context
        context = await self._assemble_context(
            query=query,
            similar_chunks=similar_chunks,
            related_facts=related_facts,
            max_tokens=max_tokens
        )
        
        return QueryResult(
            context=context["text"],
            sources=context["sources"],
            relevant_facts=related_facts,
            confidence_score=context["confidence"]
        )
    
    async def _generate_embeddings(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings for a list of texts."""
        return await self.embeddings.aembed_documents(texts)
    
    async def _extract_entities(self, chunks: List[str]) -> List[Dict[str, Any]]:
        """Extract entities from text chunks."""
        # This would use Ollama to extract entities
        # For now, return empty list
        return []
    
    async def _extract_entities_from_chunks(self, chunks: List[Dict[str, Any]]) -> List[str]:
        """Extract entities from retrieved chunks."""
        # Extract entities from chunk metadata or content
        entities = []
        for chunk in chunks:
            if "entities" in chunk.get("metadata", {}):
                entities.extend(chunk["metadata"]["entities"])
        return list(set(entities))
    
    async def _assemble_context(
        self,
        query: str,
        similar_chunks: List[Dict[str, Any]],
        related_facts: List[Dict[str, Any]],
        max_tokens: int
    ) -> Dict[str, Any]:
        """Assemble the final context from chunks and facts."""
        context_parts = []
        sources = []
        
        # Add similar chunks
        for chunk in similar_chunks[:10]:  # Limit to top 10
            context_parts.append(chunk["content"])
            sources.append({
                "id": chunk["id"],
                "type": "chunk",
                "source_url": chunk.get("source_url"),
                "confidence": chunk.get("similarity", 0.0)
            })
        
        # Add related facts
        for fact in related_facts[:5]:  # Limit to top 5
            context_parts.append(fact["description"])
            sources.append({
                "id": fact["id"],
                "type": "fact",
                "relationship": fact.get("relationship"),
                "confidence": fact.get("confidence", 0.0)
            })
        
        # Combine context parts
        full_context = "\n\n".join(context_parts)
        
        # Truncate if too long (rough token estimation)
        if len(full_context) > max_tokens * 4:  # Rough 4 chars per token
            full_context = full_context[:max_tokens * 4]
        
        return {
            "text": full_context,
            "sources": sources,
            "confidence": 0.8  # Calculate based on similarity scores
        }
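
The _extract_entities method above is deliberately left as a stub. One way to fill it in is to ask the Ollama-hosted model for a JSON list of entities per chunk. The sketch below calls Ollama's /api/generate endpoint with httpx (both are already in requirements.txt); the prompt wording, the helper name, and the use of llama3 for extraction are assumptions, not part of the original design:

# Sketch: entity extraction via the Ollama HTTP API
import json
import httpx

OLLAMA_URL = "http://ollama:11434"

async def extract_entities_with_ollama(chunks: list) -> list:
    """Ask the LLM for entities in each chunk; returns a list of {"name", "type"} dicts."""
    entities = []
    async with httpx.AsyncClient(base_url=OLLAMA_URL, timeout=120) as client:
        for chunk in chunks:
            prompt = (
                "Extract the named entities (people, organizations, technologies, places) "
                "from the following text. Respond with a JSON array of objects that have "
                '"name" and "type" keys, and nothing else.\n\n' + chunk
            )
            resp = await client.post(
                "/api/generate",
                json={"model": "llama3", "prompt": prompt, "stream": False, "format": "json"},
            )
            resp.raise_for_status()
            try:
                parsed = json.loads(resp.json()["response"])
            except (json.JSONDecodeError, KeyError):
                continue  # skip chunks the model could not format cleanly
            if isinstance(parsed, list):
                entities.extend(e for e in parsed if isinstance(e, dict) and "name" in e)
    return entities
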
3. Storage Layer Implementation (Week 3, Planning)

3.1 Supabase Store Implementation

# src/core/storage/supabase_store.py
import asyncpg
from pgvector.asyncpg import register_vector
from typing import List, Dict, Any, Optional
import json
import uuid

class SupabaseStore:
    def __init__(self):
        self.connection_pool = None

    async def initialize(self):
        """Initialize database connection pool with the pgvector codec registered."""
        self.connection_pool = await asyncpg.create_pool(
            host="supabase",
            port=5432,
            database="langmem_db",
            user="langmem_user",
            password="secure_password_123",
            min_size=2,
            max_size=10,
            init=register_vector  # register the VECTOR type codec on every new connection
        )

    async def close(self):
        """Close the connection pool."""
        if self.connection_pool:
            await self.connection_pool.close()
    
    async def store_chunks(
        self,
        chunks: List[str],
        embeddings: List[List[float]],
        metadata: Dict[str, Any],
        source_url: Optional[str] = None,
        document_type: str = "text"
    ) -> List[str]:
        """Store document chunks with embeddings."""
        chunk_ids = []
        
        async with self.connection_pool.acquire() as conn:
            for chunk, embedding in zip(chunks, embeddings):
                chunk_id = str(uuid.uuid4())
                
                await conn.execute(
                    """
                    INSERT INTO documents (id, content, embedding, metadata, source_url, document_type)
                    VALUES ($1, $2, $3, $4, $5, $6)
                    """,
                    chunk_id,
                    chunk,
                    embedding,
                    json.dumps(metadata),
                    source_url,
                    document_type
                )
                
                chunk_ids.append(chunk_id)
        
        return chunk_ids
    
    async def similarity_search(
        self,
        query_embedding: List[float],
        limit: int = 10,
        filters: Dict[str, Any] = None
    ) -> List[Dict[str, Any]]:
        """Perform vector similarity search."""
        filters = filters or {}
        
        query = """
        SELECT id, content, metadata, source_url, document_type,
               1 - (embedding <=> $1) as similarity
        FROM documents
        WHERE 1 - (embedding <=> $1) > 0.7
        ORDER BY embedding <=> $1
        LIMIT $2
        """
        
        async with self.connection_pool.acquire() as conn:
            rows = await conn.fetch(query, query_embedding, limit)
            
            results = []
            for row in rows:
                results.append({
                    "id": row["id"],
                    "content": row["content"],
                    "metadata": json.loads(row["metadata"]),
                    "source_url": row["source_url"],
                    "document_type": row["document_type"],
                    "similarity": row["similarity"]
                })
            
            return results
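
The similarity_search method accepts a filters argument but does not yet apply it. Because init_db.sql creates a GIN index on the metadata column, simple equality filters can be pushed into SQL with the JSONB containment operator (@>). The method below is a sketch of that idea and would live on SupabaseStore; the name similarity_search_filtered is ours, not part of the original API:

    async def similarity_search_filtered(
        self,
        query_embedding: List[float],
        limit: int = 10,
        filters: Dict[str, Any] = None
    ) -> List[Dict[str, Any]]:
        """Like similarity_search, but only returns rows whose metadata contains `filters`."""
        filters = filters or {}
        query = """
        SELECT id, content, metadata, source_url, document_type,
               1 - (embedding <=> $1) AS similarity
        FROM documents
        WHERE metadata @> $3::jsonb            -- served by documents_metadata_idx
          AND 1 - (embedding <=> $1) > 0.7
        ORDER BY embedding <=> $1
        LIMIT $2
        """
        async with self.connection_pool.acquire() as conn:
            rows = await conn.fetch(query, query_embedding, limit, json.dumps(filters))
            results = []
            for row in rows:
                item = dict(row)
                item["metadata"] = json.loads(item["metadata"])
                results.append(item)
            return results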

3.2 Neo4j Store Implementation

# src/core/storage/neo4j_store.py
from neo4j import AsyncGraphDatabase
from typing import List, Dict, Any, Optional
import logging

class Neo4jStore:
    def __init__(self):
        self.driver = None
    
    async def initialize(self):
        """Initialize Neo4j connection."""
        self.driver = AsyncGraphDatabase.driver(
            "bolt://neo4j:7687",
            auth=("neo4j", "secure_password_123")
        )
    
    async def store_entities(
        self,
        entities: List[Dict[str, Any]],
        chunk_ids: List[str]
    ):
        """Store entities and their relationships."""
        async with self.driver.session() as session:
            for i, chunk_id in enumerate(chunk_ids):
                # Create DocumentChunk node
                await session.run(
                    """
                    CREATE (doc:DocumentChunk {
                        id: $chunk_id,
                        supabase_id: $chunk_id,
                        created_at: datetime()
                    })
                    """,
                    chunk_id=chunk_id
                )
                
                # Create entity nodes and relationships
                for entity in entities:
                    await self._create_entity_node(session, entity, chunk_id)
    
    async def _create_entity_node(
        self,
        session,
        entity: Dict[str, Any],
        chunk_id: str
    ):
        """Create an entity node and link it to document chunk."""
        entity_type = entity.get("type", "Entity")
        entity_name = entity.get("name", "Unknown")
        
        # Create entity node
        await session.run(
            f"""
            MERGE (entity:{entity_type} {{name: $name}})
            ON CREATE SET entity.created_at = datetime()
            """,
            name=entity_name
        )
        
        # Create relationship between document and entity
        await session.run(
            f"""
            MATCH (doc:DocumentChunk {{id: $chunk_id}})
            MATCH (entity:{entity_type} {{name: $name}})
            CREATE (doc)-[:MENTIONS]->(entity)
            """,
            chunk_id=chunk_id,
            name=entity_name
        )
    
    async def find_related_facts(
        self,
        entities: List[str],
        max_depth: int = 2
    ) -> List[Dict[str, Any]]:
        """Find facts related to given entities."""
        if not entities:
            return []
        
        query = """
        MATCH (entity)
        WHERE entity.name IN $entities
        MATCH (entity)-[r*1..%d]-(related)
        RETURN DISTINCT related.name as name, 
               labels(related) as labels,
               type(r[0]) as relationship,
               length(r) as depth
        ORDER BY depth, name
        LIMIT 50
        """ % max_depth
        
        async with self.driver.session() as session:
            result = await session.run(query, entities=entities)
            
            facts = []
            async for record in result:
                facts.append({
                    "id": f"fact_{len(facts)}",
                    "name": record["name"],
                    "type": record["labels"][0] if record["labels"] else "Entity",
                    "relationship": record["relationship"],
                    "depth": record["depth"],
                    "confidence": 1.0 - (record["depth"] * 0.2),
                    "description": f"Related {record['labels'][0] if record['labels'] else 'entity'}: {record['name']}"
                })
            
            return facts
    
    async def close(self):
        """Close Neo4j connection."""
        if self.driver:
            await self.driver.close()
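
store_entities matches DocumentChunk nodes by id, so a uniqueness constraint (which also backs an index) keeps those lookups fast and prevents duplicate chunk nodes. A small sketch that could be called once after initialize(); the method name ensure_constraints is ours, not part of the original class:

    async def ensure_constraints(self):
        """Create a uniqueness constraint on DocumentChunk.id if it does not already exist."""
        async with self.driver.session() as session:
            await session.run(
                """
                CREATE CONSTRAINT document_chunk_id IF NOT EXISTS
                FOR (doc:DocumentChunk) REQUIRE doc.id IS UNIQUE
                """
            )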

Phase 2: Testing & Validation

4. Comprehensive Testing (Weeks 4-5, Planning)

4.1 Test Suite Setup

# tests/test_memory_manager.py
import pytest
import pytest_asyncio
from src.core.memory_manager import MemoryManager
from src.core.models import IngestResult, QueryResult

@pytest_asyncio.fixture
async def memory_manager():
    """Create memory manager instance for testing."""
    manager = MemoryManager()
    await manager.initialize()
    yield manager
    await manager.cleanup()

@pytest.mark.asyncio
async def test_document_ingestion(memory_manager):
    """Test document ingestion pipeline."""
    content = "This is a test document about machine learning."
    
    result = await memory_manager.ingest_document(
        content=content,
        document_type="text",
        metadata={"source": "test"}
    )
    
    assert isinstance(result, IngestResult)
    assert result.chunks_created > 0
    assert result.document_id is not None

@pytest.mark.asyncio
async def test_context_retrieval(memory_manager):
    """Test context retrieval functionality."""
    # First ingest a document
    await memory_manager.ingest_document(
        content="Python is a programming language used for AI development.",
        document_type="text"
    )
    
    # Then query for context
    result = await memory_manager.retrieve_context(
        query="What is Python?",
        max_tokens=1000
    )
    
    assert isinstance(result, QueryResult)
    assert "Python" in result.context
    assert len(result.sources) > 0

@pytest.mark.asyncio
async def test_hybrid_retrieval(memory_manager):
    """Test hybrid retrieval combining vector and graph search."""
    # Ingest related documents
    documents = [
        "John works at OpenAI on machine learning research.",
        "OpenAI developed GPT models for natural language processing.",
        "Machine learning is a subset of artificial intelligence."
    ]
    
    for doc in documents:
        await memory_manager.ingest_document(content=doc, document_type="text")
    
    # Query should find related information
    result = await memory_manager.retrieve_context(
        query="Tell me about John's work",
        max_tokens=2000
    )
    
    assert "John" in result.context
    assert "OpenAI" in result.context
    assert len(result.relevant_facts) > 0

4.2 Integration Testing

# tests/test_integration.py
import pytest
import httpx
from fastapi.testclient import TestClient
from src.api.main import app

client = TestClient(app)

def test_health_endpoint():
    """Test health check endpoint."""
    response = client.get("/v1/health")
    assert response.status_code == 200
    assert response.json()["status"] == "healthy"

def test_ingest_endpoint():
    """Test document ingestion endpoint."""
    response = client.post(
        "/v1/ingest",
        json={
            "content": "Test document content",
            "document_type": "text",
            "metadata": {"test": True}
        },
        headers={"X-API-Key": "langmem_api_key_2025"}
    )
    
    assert response.status_code == 200
    data = response.json()
    assert data["status"] == "success"
    assert "document_id" in data

def test_retrieve_endpoint():
    """Test context retrieval endpoint."""
    # First ingest a document
    client.post(
        "/v1/ingest",
        json={"content": "Machine learning is fascinating"},
        headers={"X-API-Key": "langmem_api_key_2025"}
    )
    
    # Then retrieve context
    response = client.post(
        "/v1/context/retrieve",
        json={
            "query": "What is machine learning?",
            "max_tokens": 1000
        },
        headers={"X-API-Key": "langmem_api_key_2025"}
    )
    
    assert response.status_code == 200
    data = response.json()
    assert "context" in data
    assert "sources" in data

Phase 3: MCP Server Transformation

5. MCP Protocol Implementation (Weeks 6-7, Planning)

5.1 MCP Server Setup

# src/mcp/server.py
from typing import Dict, Any, List, Optional
import asyncio
import json
from dataclasses import dataclass

@dataclass
class MCPMessage:
    """MCP message structure."""
    id: str
    method: str
    params: Dict[str, Any]
    
@dataclass
class MCPResponse:
    """MCP response structure."""
    id: str
    result: Optional[Dict[str, Any]] = None
    error: Optional[Dict[str, Any]] = None

class MCPServer:
    """Model Context Protocol server implementation."""
    
    def __init__(self, memory_manager):
        self.memory_manager = memory_manager
        self.capabilities = {
            "tools": {
                "memory_ingest": {
                    "description": "Ingest documents into memory",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "content": {"type": "string"},
                            "document_type": {"type": "string", "default": "text"},
                            "metadata": {"type": "object"}
                        },
                        "required": ["content"]
                    }
                },
                "memory_retrieve": {
                    "description": "Retrieve context from memory",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "query": {"type": "string"},
                            "max_tokens": {"type": "integer", "default": 4000}
                        },
                        "required": ["query"]
                    }
                }
            },
            "resources": {
                "memory_stats": {
                    "description": "Memory system statistics",
                    "uri": "memory://stats"
                }
            }
        }
    
    async def handle_message(self, message: MCPMessage) -> MCPResponse:
        """Handle incoming MCP message."""
        try:
            if message.method == "initialize":
                return await self._handle_initialize(message)
            elif message.method == "tools/list":
                return await self._handle_tools_list(message)
            elif message.method == "tools/call":
                return await self._handle_tools_call(message)
            elif message.method == "resources/list":
                return await self._handle_resources_list(message)
            elif message.method == "resources/read":
                return await self._handle_resources_read(message)
            else:
                return MCPResponse(
                    id=message.id,
                    error={"code": -32601, "message": "Method not found"}
                )
        except Exception as e:
            return MCPResponse(
                id=message.id,
                error={"code": -32603, "message": str(e)}
            )
    
    async def _handle_initialize(self, message: MCPMessage) -> MCPResponse:
        """Handle initialization request."""
        return MCPResponse(
            id=message.id,
            result={
                "protocolVersion": "2024-11-05",
                "capabilities": self.capabilities,
                "serverInfo": {
                    "name": "langmem",
                    "version": "1.0.0"
                }
            }
        )

    async def _handle_tools_list(self, message: MCPMessage) -> MCPResponse:
        """List the available tools from the capability map."""
        tools = [
            {"name": name, "description": spec["description"], "inputSchema": spec["parameters"]}
            for name, spec in self.capabilities["tools"].items()
        ]
        return MCPResponse(id=message.id, result={"tools": tools})

    async def _handle_resources_list(self, message: MCPMessage) -> MCPResponse:
        """List the available resources from the capability map."""
        resources = [
            {"uri": spec["uri"], "name": name, "description": spec["description"]}
            for name, spec in self.capabilities["resources"].items()
        ]
        return MCPResponse(id=message.id, result={"resources": resources})

    async def _handle_resources_read(self, message: MCPMessage) -> MCPResponse:
        """Read a resource; only memory://stats is defined here."""
        uri = message.params.get("uri")
        if uri != "memory://stats":
            return MCPResponse(id=message.id, error={"code": -32602, "message": "Unknown resource"})
        stats = {"status": "ok"}  # placeholder; real stats would come from the memory manager
        return MCPResponse(
            id=message.id,
            result={"contents": [{"uri": uri, "mimeType": "application/json", "text": json.dumps(stats)}]}
        )
    
    async def _handle_tools_call(self, message: MCPMessage) -> MCPResponse:
        """Handle tool execution request."""
        tool_name = message.params.get("name")
        arguments = message.params.get("arguments", {})
        
        if tool_name == "memory_ingest":
            result = await self.memory_manager.ingest_document(
                content=arguments["content"],
                document_type=arguments.get("document_type", "text"),
                metadata=arguments.get("metadata", {})
            )
            return MCPResponse(
                id=message.id,
                result={"content": [{"type": "text", "text": f"Ingested document: {result.document_id}"}]}
            )
        
        elif tool_name == "memory_retrieve":
            result = await self.memory_manager.retrieve_context(
                query=arguments["query"],
                max_tokens=arguments.get("max_tokens", 4000)
            )
            return MCPResponse(
                id=message.id,
                result={"content": [{"type": "text", "text": result.context}]}
            )
        
        return MCPResponse(
            id=message.id,
            error={"code": -32602, "message": "Unknown tool"}
        )
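
MCPServer only dispatches messages; it still needs a transport to receive them. MCP clients such as Claude Code typically speak newline-delimited JSON-RPC over stdio, so a run loop could look like the sketch below. The module path src/mcp/run_stdio.py and the exact framing are assumptions, not part of the original code:

# src/mcp/run_stdio.py -- minimal stdio loop feeding messages into MCPServer (sketch)
import asyncio
import json
import sys

from .server import MCPServer, MCPMessage
from ..core.memory_manager import MemoryManager

async def main():
    manager = MemoryManager()
    await manager.initialize()
    server = MCPServer(manager)

    loop = asyncio.get_running_loop()
    while True:
        # One JSON-RPC message per line on stdin; read without blocking the event loop
        line = await loop.run_in_executor(None, sys.stdin.readline)
        if not line:
            break  # EOF: the client closed the connection
        request = json.loads(line)
        message = MCPMessage(
            id=request.get("id"),
            method=request.get("method"),
            params=request.get("params", {}),
        )
        response = await server.handle_message(message)
        payload = {"jsonrpc": "2.0", "id": response.id}
        if response.error is not None:
            payload["error"] = response.error
        else:
            payload["result"] = response.result
        sys.stdout.write(json.dumps(payload) + "\n")
        sys.stdout.flush()

if __name__ == "__main__":
    asyncio.run(main())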

Phase 4: Client Integration

6. n8n & Claude Code Integration (Weeks 8-9, Planning)

6.1 n8n Workflow Example

{
  "nodes": [
    {
      "parameters": {
        "httpMethod": "POST",
        "path": "webhook-langmem",
        "responseMode": "responseNode",
        "options": {}
      },
      "name": "Webhook",
      "type": "n8n-nodes-base.webhook",
      "typeVersion": 1,
      "position": [200, 300]
    },
    {
      "parameters": {
        "requestMethod": "POST",
        "url": "http://memory_service:8000/v1/ingest",
        "jsonParameters": true,
        "parametersJson": "{\n  \"content\": \"{{ $json.content }}\",\n  \"document_type\": \"{{ $json.type || 'text' }}\",\n  \"metadata\": {\n    \"source\": \"n8n\",\n    \"timestamp\": \"{{ $now }}\"\n  }\n}",
        "options": {},
        "headerParametersJson": "{\n  \"X-API-Key\": \"langmem_api_key_2025\"\n}"
      },
      "name": "Ingest to Memory",
      "type": "n8n-nodes-base.httpRequest",
      "typeVersion": 1,
      "position": [400, 300]
    },
    {
      "parameters": {
        "requestMethod": "POST",
        "url": "http://memory_service:8000/v1/context/retrieve",
        "jsonParameters": true,
        "parametersJson": "{\n  \"query\": \"{{ $json.query }}\",\n  \"max_tokens\": 4000\n}",
        "options": {},
        "headerParametersJson": "{\n  \"X-API-Key\": \"langmem_api_key_2025\"\n}"
      },
      "name": "Query Memory",
      "type": "n8n-nodes-base.httpRequest",
      "typeVersion": 1,
      "position": [600, 300]
    },
    {
      "parameters": {
        "respondWith": "json",
        "responseBody": "{{ $json }}"
      },
      "name": "Response",
      "type": "n8n-nodes-base.respondToWebhook",
      "typeVersion": 1,
      "position": [800, 300]
    }
  ],
  "connections": {
    "Webhook": {
      "main": [
        [
          {
            "node": "Ingest to Memory",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Ingest to Memory": {
      "main": [
        [
          {
            "node": "Query Memory",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Query Memory": {
      "main": [
        [
          {
            "node": "Response",
            "type": "main",
            "index": 0
          }
        ]
      ]
    }
  }
}
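
Once the workflow is imported and activated, the webhook can be exercised from any script. A sketch using httpx; the base URL assumes n8n's default port (5678) and production webhook path, which may differ in your installation:

# Sketch: trigger the n8n workflow from Python
import httpx

N8N_WEBHOOK_URL = "http://localhost:5678/webhook/webhook-langmem"  # assumed default; adjust as needed

payload = {
    "content": "LangMem stores long-term memory for LLM projects.",
    "type": "text",
    "query": "What does LangMem do?",
}

response = httpx.post(N8N_WEBHOOK_URL, json=payload, timeout=60)
response.raise_for_status()
print(response.json())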

6.2 Deployment Script

#!/bin/bash
# deploy.sh

set -e

echo "๐Ÿš€ Starting LangMem deployment..."

# Create network if it doesn't exist
docker network ls | grep -q localai_network || docker network create localai_network

# Build and start services
docker-compose up -d --build

# Wait for services to be ready
echo "โณ Waiting for services to be ready..."
sleep 30

# Initialize Ollama models
echo "๐Ÿฆ™ Initializing Ollama models..."
docker exec langmem_ollama ollama pull llama3
docker exec langmem_ollama ollama pull nomic-embed-text

# Test the API
echo "๐Ÿงช Testing API endpoints..."
curl -X POST "http://localhost:8000/v1/ingest" \
  -H "Content-Type: application/json" \
  -H "X-API-Key: langmem_api_key_2025" \
  -d '{"content": "Hello, LangMem!", "document_type": "text"}'

curl -X POST "http://localhost:8000/v1/context/retrieve" \
  -H "Content-Type: application/json" \
  -H "X-API-Key: langmem_api_key_2025" \
  -d '{"query": "What is LangMem?", "max_tokens": 1000}'

echo "โœ… LangMem deployment complete!"
echo "๐Ÿ“ก API available at: http://localhost:8000"
echo "๐Ÿ”ง n8n integration ready on localai_network"

Success Metrics

📊 Performance Metrics (see the latency smoke test after these lists)

  • Context retrieval < 2 seconds
  • Document ingestion < 5 seconds
  • Memory search accuracy > 80%
  • System uptime > 99%

🎯 Functional Metrics

  • Successful n8n integration
  • MCP protocol compliance
  • Data consistency across stores
  • Comprehensive test coverage
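
A quick way to check the latency targets listed above is a small smoke test against the running API. The sketch below reuses the port and API key from docker-compose; the thresholds mirror the performance metrics:

# latency_check.py -- rough smoke test for the performance targets (sketch)
import time
import httpx

BASE_URL = "http://localhost:8000"
HEADERS = {"X-API-Key": "langmem_api_key_2025"}

def timed_post(path, body):
    """POST to the API and return the elapsed time in seconds."""
    start = time.perf_counter()
    resp = httpx.post(f"{BASE_URL}/v1{path}", json=body, headers=HEADERS, timeout=30)
    resp.raise_for_status()
    return time.perf_counter() - start

ingest_s = timed_post("/ingest", {"content": "Latency probe document.", "document_type": "text"})
retrieve_s = timed_post("/context/retrieve", {"query": "latency probe", "max_tokens": 500})

print(f"ingest:   {ingest_s:.2f}s (target < 5s)")
print(f"retrieve: {retrieve_s:.2f}s (target < 2s)")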

Ready to Begin?

Start with Phase 1 and follow the step-by-step guide to build your LangMem system.