Feature (OpenMemory): Add support for LLM and Embedding Providers in OpenMemory (#2794)
This commit is contained in:
@@ -1,3 +1,20 @@
|
||||
"""
|
||||
MCP Server for OpenMemory with resilient memory client handling.
|
||||
|
||||
This module implements an MCP (Model Context Protocol) server that provides
|
||||
memory operations for OpenMemory. The memory client is initialized lazily
|
||||
to prevent server crashes when external dependencies (like Ollama) are
|
||||
unavailable. If the memory client cannot be initialized, the server will
|
||||
continue running with limited functionality and appropriate error messages.
|
||||
|
||||
Key features:
|
||||
- Lazy memory client initialization
|
||||
- Graceful error handling for unavailable dependencies
|
||||
- Fallback to database-only mode when vector store is unavailable
|
||||
- Proper logging for debugging connection issues
|
||||
- Environment variable parsing for API keys
|
||||
"""
|
||||
|
||||
import logging
|
||||
import json
|
||||
from mcp.server.fastmcp import FastMCP
|
||||
@@ -19,14 +36,17 @@ from qdrant_client import models as qdrant_models
|
||||
# Load environment variables
|
||||
load_dotenv()
|
||||
|
||||
# Initialize MCP and memory client
|
||||
# Initialize MCP
|
||||
mcp = FastMCP("mem0-mcp-server")
|
||||
|
||||
# Check if OpenAI API key is set
|
||||
if not os.getenv("OPENAI_API_KEY"):
|
||||
raise Exception("OPENAI_API_KEY is not set in .env file")
|
||||
|
||||
memory_client = get_memory_client()
|
||||
# Don't initialize memory client at import time - do it lazily when needed
|
||||
def get_memory_client_safe():
|
||||
"""Get memory client with error handling. Returns None if client cannot be initialized."""
|
||||
try:
|
||||
return get_memory_client()
|
||||
except Exception as e:
|
||||
logging.warning(f"Failed to get memory client: {e}")
|
||||
return None
|
||||
|
||||
# Context variables for user_id and client_name
|
||||
user_id_var: contextvars.ContextVar[str] = contextvars.ContextVar("user_id")
|
||||
@@ -48,6 +68,11 @@ async def add_memories(text: str) -> str:
|
||||
if not client_name:
|
||||
return "Error: client_name not provided"
|
||||
|
||||
# Get memory client safely
|
||||
memory_client = get_memory_client_safe()
|
||||
if not memory_client:
|
||||
return "Error: Memory system is currently unavailable. Please try again later."
|
||||
|
||||
try:
|
||||
db = SessionLocal()
|
||||
try:
|
||||
@@ -113,6 +138,7 @@ async def add_memories(text: str) -> str:
|
||||
finally:
|
||||
db.close()
|
||||
except Exception as e:
|
||||
logging.exception(f"Error adding to memory: {e}")
|
||||
return f"Error adding to memory: {e}"
|
||||
|
||||
|
||||
@@ -124,6 +150,12 @@ async def search_memory(query: str) -> str:
|
||||
return "Error: user_id not provided"
|
||||
if not client_name:
|
||||
return "Error: client_name not provided"
|
||||
|
||||
# Get memory client safely
|
||||
memory_client = get_memory_client_safe()
|
||||
if not memory_client:
|
||||
return "Error: Memory system is currently unavailable. Please try again later."
|
||||
|
||||
try:
|
||||
db = SessionLocal()
|
||||
try:
|
||||
@@ -216,6 +248,12 @@ async def list_memories() -> str:
|
||||
return "Error: user_id not provided"
|
||||
if not client_name:
|
||||
return "Error: client_name not provided"
|
||||
|
||||
# Get memory client safely
|
||||
memory_client = get_memory_client_safe()
|
||||
if not memory_client:
|
||||
return "Error: Memory system is currently unavailable. Please try again later."
|
||||
|
||||
try:
|
||||
db = SessionLocal()
|
||||
try:
|
||||
@@ -267,6 +305,7 @@ async def list_memories() -> str:
|
||||
finally:
|
||||
db.close()
|
||||
except Exception as e:
|
||||
logging.exception(f"Error getting memories: {e}")
|
||||
return f"Error getting memories: {e}"
|
||||
|
||||
|
||||
@@ -278,6 +317,12 @@ async def delete_all_memories() -> str:
|
||||
return "Error: user_id not provided"
|
||||
if not client_name:
|
||||
return "Error: client_name not provided"
|
||||
|
||||
# Get memory client safely
|
||||
memory_client = get_memory_client_safe()
|
||||
if not memory_client:
|
||||
return "Error: Memory system is currently unavailable. Please try again later."
|
||||
|
||||
try:
|
||||
db = SessionLocal()
|
||||
try:
|
||||
@@ -289,7 +334,10 @@ async def delete_all_memories() -> str:
|
||||
|
||||
# delete the accessible memories only
|
||||
for memory_id in accessible_memory_ids:
|
||||
memory_client.delete(memory_id)
|
||||
try:
|
||||
memory_client.delete(memory_id)
|
||||
except Exception as delete_error:
|
||||
logging.warning(f"Failed to delete memory {memory_id} from vector store: {delete_error}")
|
||||
|
||||
# Update each memory's state and create history entries
|
||||
now = datetime.datetime.now(datetime.UTC)
|
||||
@@ -322,6 +370,7 @@ async def delete_all_memories() -> str:
|
||||
finally:
|
||||
db.close()
|
||||
except Exception as e:
|
||||
logging.exception(f"Error deleting memories: {e}")
|
||||
return f"Error deleting memories: {e}"
|
||||
|
||||
|
||||
|
||||
@@ -56,6 +56,17 @@ class App(Base):
|
||||
memories = relationship("Memory", back_populates="app")
|
||||
|
||||
|
||||
class Config(Base):
|
||||
__tablename__ = "configs"
|
||||
id = Column(UUID, primary_key=True, default=lambda: uuid.uuid4())
|
||||
key = Column(String, unique=True, nullable=False, index=True)
|
||||
value = Column(JSON, nullable=False)
|
||||
created_at = Column(DateTime, default=get_current_utc_time)
|
||||
updated_at = Column(DateTime,
|
||||
default=get_current_utc_time,
|
||||
onupdate=get_current_utc_time)
|
||||
|
||||
|
||||
class Memory(Base):
|
||||
__tablename__ = "memories"
|
||||
id = Column(UUID, primary_key=True, default=lambda: uuid.uuid4())
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from .memories import router as memories_router
|
||||
from .apps import router as apps_router
|
||||
from .stats import router as stats_router
|
||||
from .config import router as config_router
|
||||
|
||||
__all__ = ["memories_router", "apps_router", "stats_router"]
|
||||
__all__ = ["memories_router", "apps_router", "stats_router", "config_router"]
|
||||
240
openmemory/api/app/routers/config.py
Normal file
240
openmemory/api/app/routers/config.py
Normal file
@@ -0,0 +1,240 @@
|
||||
import os
|
||||
import json
|
||||
from typing import Dict, Any, Optional
|
||||
from fastapi import APIRouter, HTTPException, Depends
|
||||
from pydantic import BaseModel, Field
|
||||
from sqlalchemy.orm import Session
|
||||
from app.database import get_db
|
||||
from app.models import Config as ConfigModel
|
||||
from app.utils.memory import reset_memory_client
|
||||
|
||||
router = APIRouter(prefix="/api/v1/config", tags=["config"])
|
||||
|
||||
class LLMConfig(BaseModel):
|
||||
model: str = Field(..., description="LLM model name")
|
||||
temperature: float = Field(..., description="Temperature setting for the model")
|
||||
max_tokens: int = Field(..., description="Maximum tokens to generate")
|
||||
api_key: Optional[str] = Field(None, description="API key or 'env:API_KEY' to use environment variable")
|
||||
ollama_base_url: Optional[str] = Field(None, description="Base URL for Ollama server (e.g., http://host.docker.internal:11434)")
|
||||
|
||||
class LLMProvider(BaseModel):
|
||||
provider: str = Field(..., description="LLM provider name")
|
||||
config: LLMConfig
|
||||
|
||||
class EmbedderConfig(BaseModel):
|
||||
model: str = Field(..., description="Embedder model name")
|
||||
api_key: Optional[str] = Field(None, description="API key or 'env:API_KEY' to use environment variable")
|
||||
ollama_base_url: Optional[str] = Field(None, description="Base URL for Ollama server (e.g., http://host.docker.internal:11434)")
|
||||
|
||||
class EmbedderProvider(BaseModel):
|
||||
provider: str = Field(..., description="Embedder provider name")
|
||||
config: EmbedderConfig
|
||||
|
||||
class OpenMemoryConfig(BaseModel):
|
||||
custom_instructions: Optional[str] = Field(None, description="Custom instructions for memory management and fact extraction")
|
||||
|
||||
class Mem0Config(BaseModel):
|
||||
llm: Optional[LLMProvider] = None
|
||||
embedder: Optional[EmbedderProvider] = None
|
||||
|
||||
class ConfigSchema(BaseModel):
|
||||
openmemory: Optional[OpenMemoryConfig] = None
|
||||
mem0: Mem0Config
|
||||
|
||||
def get_default_configuration():
|
||||
"""Get the default configuration with sensible defaults for LLM and embedder."""
|
||||
return {
|
||||
"openmemory": {
|
||||
"custom_instructions": None
|
||||
},
|
||||
"mem0": {
|
||||
"llm": {
|
||||
"provider": "openai",
|
||||
"config": {
|
||||
"model": "gpt-4o-mini",
|
||||
"temperature": 0.1,
|
||||
"max_tokens": 2000,
|
||||
"api_key": "env:OPENAI_API_KEY"
|
||||
}
|
||||
},
|
||||
"embedder": {
|
||||
"provider": "openai",
|
||||
"config": {
|
||||
"model": "text-embedding-3-small",
|
||||
"api_key": "env:OPENAI_API_KEY"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def get_config_from_db(db: Session, key: str = "main"):
|
||||
"""Get configuration from database."""
|
||||
config = db.query(ConfigModel).filter(ConfigModel.key == key).first()
|
||||
|
||||
if not config:
|
||||
# Create default config with proper provider configurations
|
||||
default_config = get_default_configuration()
|
||||
db_config = ConfigModel(key=key, value=default_config)
|
||||
db.add(db_config)
|
||||
db.commit()
|
||||
db.refresh(db_config)
|
||||
return default_config
|
||||
|
||||
# Ensure the config has all required sections with defaults
|
||||
config_value = config.value
|
||||
default_config = get_default_configuration()
|
||||
|
||||
# Merge with defaults to ensure all required fields exist
|
||||
if "openmemory" not in config_value:
|
||||
config_value["openmemory"] = default_config["openmemory"]
|
||||
|
||||
if "mem0" not in config_value:
|
||||
config_value["mem0"] = default_config["mem0"]
|
||||
else:
|
||||
# Ensure LLM config exists with defaults
|
||||
if "llm" not in config_value["mem0"] or config_value["mem0"]["llm"] is None:
|
||||
config_value["mem0"]["llm"] = default_config["mem0"]["llm"]
|
||||
|
||||
# Ensure embedder config exists with defaults
|
||||
if "embedder" not in config_value["mem0"] or config_value["mem0"]["embedder"] is None:
|
||||
config_value["mem0"]["embedder"] = default_config["mem0"]["embedder"]
|
||||
|
||||
# Save the updated config back to database if it was modified
|
||||
if config_value != config.value:
|
||||
config.value = config_value
|
||||
db.commit()
|
||||
db.refresh(config)
|
||||
|
||||
return config_value
|
||||
|
||||
def save_config_to_db(db: Session, config: Dict[str, Any], key: str = "main"):
|
||||
"""Save configuration to database."""
|
||||
db_config = db.query(ConfigModel).filter(ConfigModel.key == key).first()
|
||||
|
||||
if db_config:
|
||||
db_config.value = config
|
||||
db_config.updated_at = None # Will trigger the onupdate to set current time
|
||||
else:
|
||||
db_config = ConfigModel(key=key, value=config)
|
||||
db.add(db_config)
|
||||
|
||||
db.commit()
|
||||
db.refresh(db_config)
|
||||
return db_config.value
|
||||
|
||||
@router.get("/", response_model=ConfigSchema)
|
||||
async def get_configuration(db: Session = Depends(get_db)):
|
||||
"""Get the current configuration."""
|
||||
config = get_config_from_db(db)
|
||||
return config
|
||||
|
||||
@router.put("/", response_model=ConfigSchema)
|
||||
async def update_configuration(config: ConfigSchema, db: Session = Depends(get_db)):
|
||||
"""Update the configuration."""
|
||||
current_config = get_config_from_db(db)
|
||||
|
||||
# Convert to dict for processing
|
||||
updated_config = current_config.copy()
|
||||
|
||||
# Update openmemory settings if provided
|
||||
if config.openmemory is not None:
|
||||
if "openmemory" not in updated_config:
|
||||
updated_config["openmemory"] = {}
|
||||
updated_config["openmemory"].update(config.openmemory.dict(exclude_none=True))
|
||||
|
||||
# Update mem0 settings
|
||||
updated_config["mem0"] = config.mem0.dict(exclude_none=True)
|
||||
|
||||
# Save the configuration to database
|
||||
save_config_to_db(db, updated_config)
|
||||
reset_memory_client()
|
||||
return updated_config
|
||||
|
||||
@router.post("/reset", response_model=ConfigSchema)
|
||||
async def reset_configuration(db: Session = Depends(get_db)):
|
||||
"""Reset the configuration to default values."""
|
||||
try:
|
||||
# Get the default configuration with proper provider setups
|
||||
default_config = get_default_configuration()
|
||||
|
||||
# Save it as the current configuration in the database
|
||||
save_config_to_db(db, default_config)
|
||||
reset_memory_client()
|
||||
return default_config
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Failed to reset configuration: {str(e)}"
|
||||
)
|
||||
|
||||
@router.get("/mem0/llm", response_model=LLMProvider)
|
||||
async def get_llm_configuration(db: Session = Depends(get_db)):
|
||||
"""Get only the LLM configuration."""
|
||||
config = get_config_from_db(db)
|
||||
llm_config = config.get("mem0", {}).get("llm", {})
|
||||
return llm_config
|
||||
|
||||
@router.put("/mem0/llm", response_model=LLMProvider)
|
||||
async def update_llm_configuration(llm_config: LLMProvider, db: Session = Depends(get_db)):
|
||||
"""Update only the LLM configuration."""
|
||||
current_config = get_config_from_db(db)
|
||||
|
||||
# Ensure mem0 key exists
|
||||
if "mem0" not in current_config:
|
||||
current_config["mem0"] = {}
|
||||
|
||||
# Update the LLM configuration
|
||||
current_config["mem0"]["llm"] = llm_config.dict(exclude_none=True)
|
||||
|
||||
# Save the configuration to database
|
||||
save_config_to_db(db, current_config)
|
||||
reset_memory_client()
|
||||
return current_config["mem0"]["llm"]
|
||||
|
||||
@router.get("/mem0/embedder", response_model=EmbedderProvider)
|
||||
async def get_embedder_configuration(db: Session = Depends(get_db)):
|
||||
"""Get only the Embedder configuration."""
|
||||
config = get_config_from_db(db)
|
||||
embedder_config = config.get("mem0", {}).get("embedder", {})
|
||||
return embedder_config
|
||||
|
||||
@router.put("/mem0/embedder", response_model=EmbedderProvider)
|
||||
async def update_embedder_configuration(embedder_config: EmbedderProvider, db: Session = Depends(get_db)):
|
||||
"""Update only the Embedder configuration."""
|
||||
current_config = get_config_from_db(db)
|
||||
|
||||
# Ensure mem0 key exists
|
||||
if "mem0" not in current_config:
|
||||
current_config["mem0"] = {}
|
||||
|
||||
# Update the Embedder configuration
|
||||
current_config["mem0"]["embedder"] = embedder_config.dict(exclude_none=True)
|
||||
|
||||
# Save the configuration to database
|
||||
save_config_to_db(db, current_config)
|
||||
reset_memory_client()
|
||||
return current_config["mem0"]["embedder"]
|
||||
|
||||
@router.get("/openmemory", response_model=OpenMemoryConfig)
|
||||
async def get_openmemory_configuration(db: Session = Depends(get_db)):
|
||||
"""Get only the OpenMemory configuration."""
|
||||
config = get_config_from_db(db)
|
||||
openmemory_config = config.get("openmemory", {})
|
||||
return openmemory_config
|
||||
|
||||
@router.put("/openmemory", response_model=OpenMemoryConfig)
|
||||
async def update_openmemory_configuration(openmemory_config: OpenMemoryConfig, db: Session = Depends(get_db)):
|
||||
"""Update only the OpenMemory configuration."""
|
||||
current_config = get_config_from_db(db)
|
||||
|
||||
# Ensure openmemory key exists
|
||||
if "openmemory" not in current_config:
|
||||
current_config["openmemory"] = {}
|
||||
|
||||
# Update the OpenMemory configuration
|
||||
current_config["openmemory"].update(openmemory_config.dict(exclude_none=True))
|
||||
|
||||
# Save the configuration to database
|
||||
save_config_to_db(db, current_config)
|
||||
reset_memory_client()
|
||||
return current_config["openmemory"]
|
||||
@@ -2,6 +2,7 @@ from datetime import datetime, UTC
|
||||
from typing import List, Optional, Set
|
||||
from uuid import UUID, uuid4
|
||||
import logging
|
||||
import os
|
||||
from fastapi import APIRouter, Depends, HTTPException, Query
|
||||
from sqlalchemy.orm import Session, joinedload
|
||||
from fastapi_pagination import Page, Params
|
||||
@@ -13,13 +14,11 @@ from app.utils.memory import get_memory_client
|
||||
from app.database import get_db
|
||||
from app.models import (
|
||||
Memory, MemoryState, MemoryAccessLog, App,
|
||||
MemoryStatusHistory, User, Category, AccessControl
|
||||
MemoryStatusHistory, User, Category, AccessControl, Config as ConfigModel
|
||||
)
|
||||
from app.schemas import MemoryResponse, PaginatedMemoryResponse
|
||||
from app.utils.permissions import check_memory_access_permissions
|
||||
|
||||
memory_client = get_memory_client()
|
||||
|
||||
router = APIRouter(prefix="/api/v1/memories", tags=["memories"])
|
||||
|
||||
|
||||
@@ -227,100 +226,79 @@ async def create_memory(
|
||||
# Log what we're about to do
|
||||
logging.info(f"Creating memory for user_id: {request.user_id} with app: {request.app}")
|
||||
|
||||
# Save to Qdrant via memory_client
|
||||
qdrant_response = memory_client.add(
|
||||
request.text,
|
||||
user_id=request.user_id, # Use string user_id to match search
|
||||
metadata={
|
||||
"source_app": "openmemory",
|
||||
"mcp_client": request.app,
|
||||
}
|
||||
)
|
||||
|
||||
# Log the response for debugging
|
||||
logging.info(f"Qdrant response: {qdrant_response}")
|
||||
|
||||
# Process Qdrant response
|
||||
if isinstance(qdrant_response, dict) and 'results' in qdrant_response:
|
||||
for result in qdrant_response['results']:
|
||||
if result['event'] == 'ADD':
|
||||
# Get the Qdrant-generated ID
|
||||
memory_id = UUID(result['id'])
|
||||
|
||||
# Check if memory already exists
|
||||
existing_memory = db.query(Memory).filter(Memory.id == memory_id).first()
|
||||
|
||||
if existing_memory:
|
||||
# Update existing memory
|
||||
existing_memory.state = MemoryState.active
|
||||
existing_memory.content = result['memory']
|
||||
memory = existing_memory
|
||||
else:
|
||||
# Create memory with the EXACT SAME ID from Qdrant
|
||||
memory = Memory(
|
||||
id=memory_id, # Use the same ID that Qdrant generated
|
||||
user_id=user.id,
|
||||
app_id=app_obj.id,
|
||||
content=result['memory'],
|
||||
metadata_=request.metadata,
|
||||
state=MemoryState.active
|
||||
)
|
||||
db.add(memory)
|
||||
|
||||
# Create history entry
|
||||
history = MemoryStatusHistory(
|
||||
memory_id=memory_id,
|
||||
changed_by=user.id,
|
||||
old_state=MemoryState.deleted if existing_memory else MemoryState.deleted,
|
||||
new_state=MemoryState.active
|
||||
)
|
||||
db.add(history)
|
||||
|
||||
db.commit()
|
||||
db.refresh(memory)
|
||||
return memory
|
||||
|
||||
# Fallback to traditional DB-only approach if Qdrant integration fails
|
||||
# Generate a random UUID for the memory
|
||||
memory_id = uuid4()
|
||||
memory = Memory(
|
||||
id=memory_id,
|
||||
user_id=user.id,
|
||||
app_id=app_obj.id,
|
||||
content=request.text,
|
||||
metadata_=request.metadata
|
||||
)
|
||||
db.add(memory)
|
||||
|
||||
# Create history entry
|
||||
history = MemoryStatusHistory(
|
||||
memory_id=memory_id,
|
||||
changed_by=user.id,
|
||||
old_state=MemoryState.deleted,
|
||||
new_state=MemoryState.active
|
||||
)
|
||||
db.add(history)
|
||||
|
||||
db.commit()
|
||||
db.refresh(memory)
|
||||
|
||||
# Attempt to add to Qdrant with the same ID we just created
|
||||
# Try to get memory client safely
|
||||
try:
|
||||
# Try to add with our specific ID
|
||||
memory_client.add(
|
||||
memory_client = get_memory_client()
|
||||
if not memory_client:
|
||||
raise Exception("Memory client is not available")
|
||||
except Exception as client_error:
|
||||
logging.warning(f"Memory client unavailable: {client_error}. Creating memory in database only.")
|
||||
# Return a json response with the error
|
||||
return {
|
||||
"error": str(client_error)
|
||||
}
|
||||
|
||||
# Try to save to Qdrant via memory_client
|
||||
try:
|
||||
qdrant_response = memory_client.add(
|
||||
request.text,
|
||||
memory_id=str(memory_id), # Specify the ID
|
||||
user_id=request.user_id,
|
||||
user_id=request.user_id, # Use string user_id to match search
|
||||
metadata={
|
||||
"source_app": "openmemory",
|
||||
"mcp_client": request.app,
|
||||
}
|
||||
)
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to add to Qdrant in fallback path: {e}")
|
||||
# Continue anyway, the DB record is created
|
||||
|
||||
return memory
|
||||
|
||||
# Log the response for debugging
|
||||
logging.info(f"Qdrant response: {qdrant_response}")
|
||||
|
||||
# Process Qdrant response
|
||||
if isinstance(qdrant_response, dict) and 'results' in qdrant_response:
|
||||
for result in qdrant_response['results']:
|
||||
if result['event'] == 'ADD':
|
||||
# Get the Qdrant-generated ID
|
||||
memory_id = UUID(result['id'])
|
||||
|
||||
# Check if memory already exists
|
||||
existing_memory = db.query(Memory).filter(Memory.id == memory_id).first()
|
||||
|
||||
if existing_memory:
|
||||
# Update existing memory
|
||||
existing_memory.state = MemoryState.active
|
||||
existing_memory.content = result['memory']
|
||||
memory = existing_memory
|
||||
else:
|
||||
# Create memory with the EXACT SAME ID from Qdrant
|
||||
memory = Memory(
|
||||
id=memory_id, # Use the same ID that Qdrant generated
|
||||
user_id=user.id,
|
||||
app_id=app_obj.id,
|
||||
content=result['memory'],
|
||||
metadata_=request.metadata,
|
||||
state=MemoryState.active
|
||||
)
|
||||
db.add(memory)
|
||||
|
||||
# Create history entry
|
||||
history = MemoryStatusHistory(
|
||||
memory_id=memory_id,
|
||||
changed_by=user.id,
|
||||
old_state=MemoryState.deleted if existing_memory else MemoryState.deleted,
|
||||
new_state=MemoryState.active
|
||||
)
|
||||
db.add(history)
|
||||
|
||||
db.commit()
|
||||
db.refresh(memory)
|
||||
return memory
|
||||
except Exception as qdrant_error:
|
||||
logging.warning(f"Qdrant operation failed: {qdrant_error}.")
|
||||
# Return a json response with the error
|
||||
return {
|
||||
"error": str(qdrant_error)
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
# Get memory by ID
|
||||
|
||||
@@ -1,9 +1,193 @@
|
||||
"""
|
||||
Memory client utilities for OpenMemory.
|
||||
|
||||
This module provides functionality to initialize and manage the Mem0 memory client
|
||||
with automatic configuration management and Docker environment support.
|
||||
|
||||
Docker Ollama Configuration:
|
||||
When running inside a Docker container and using Ollama as the LLM or embedder provider,
|
||||
the system automatically detects the Docker environment and adjusts localhost URLs
|
||||
to properly reach the host machine where Ollama is running.
|
||||
|
||||
Supported Docker host resolution (in order of preference):
|
||||
1. OLLAMA_HOST environment variable (if set)
|
||||
2. host.docker.internal (Docker Desktop for Mac/Windows)
|
||||
3. Docker bridge gateway IP (typically 172.17.0.1 on Linux)
|
||||
4. Fallback to 172.17.0.1
|
||||
|
||||
Example configuration that will be automatically adjusted:
|
||||
{
|
||||
"llm": {
|
||||
"provider": "ollama",
|
||||
"config": {
|
||||
"model": "llama3.1:latest",
|
||||
"ollama_base_url": "http://localhost:11434" # Auto-adjusted in Docker
|
||||
}
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
import os
|
||||
import json
|
||||
import hashlib
|
||||
import socket
|
||||
import platform
|
||||
|
||||
from mem0 import Memory
|
||||
from app.database import SessionLocal
|
||||
from app.models import Config as ConfigModel
|
||||
|
||||
|
||||
memory_client = None
|
||||
_memory_client = None
|
||||
_config_hash = None
|
||||
|
||||
|
||||
def _get_config_hash(config_dict):
|
||||
"""Generate a hash of the config to detect changes."""
|
||||
config_str = json.dumps(config_dict, sort_keys=True)
|
||||
return hashlib.md5(config_str.encode()).hexdigest()
|
||||
|
||||
|
||||
def _get_docker_host_url():
|
||||
"""
|
||||
Determine the appropriate host URL to reach host machine from inside Docker container.
|
||||
Returns the best available option for reaching the host from inside a container.
|
||||
"""
|
||||
# Check for custom environment variable first
|
||||
custom_host = os.environ.get('OLLAMA_HOST')
|
||||
if custom_host:
|
||||
print(f"Using custom Ollama host from OLLAMA_HOST: {custom_host}")
|
||||
return custom_host.replace('http://', '').replace('https://', '').split(':')[0]
|
||||
|
||||
# Check if we're running inside Docker
|
||||
if not os.path.exists('/.dockerenv'):
|
||||
# Not in Docker, return localhost as-is
|
||||
return "localhost"
|
||||
|
||||
print("Detected Docker environment, adjusting host URL for Ollama...")
|
||||
|
||||
# Try different host resolution strategies
|
||||
host_candidates = []
|
||||
|
||||
# 1. host.docker.internal (works on Docker Desktop for Mac/Windows)
|
||||
try:
|
||||
socket.gethostbyname('host.docker.internal')
|
||||
host_candidates.append('host.docker.internal')
|
||||
print("Found host.docker.internal")
|
||||
except socket.gaierror:
|
||||
pass
|
||||
|
||||
# 2. Docker bridge gateway (typically 172.17.0.1 on Linux)
|
||||
try:
|
||||
with open('/proc/net/route', 'r') as f:
|
||||
for line in f:
|
||||
fields = line.strip().split()
|
||||
if fields[1] == '00000000': # Default route
|
||||
gateway_hex = fields[2]
|
||||
gateway_ip = socket.inet_ntoa(bytes.fromhex(gateway_hex)[::-1])
|
||||
host_candidates.append(gateway_ip)
|
||||
print(f"Found Docker gateway: {gateway_ip}")
|
||||
break
|
||||
except (FileNotFoundError, IndexError, ValueError):
|
||||
pass
|
||||
|
||||
# 3. Fallback to common Docker bridge IP
|
||||
if not host_candidates:
|
||||
host_candidates.append('172.17.0.1')
|
||||
print("Using fallback Docker bridge IP: 172.17.0.1")
|
||||
|
||||
# Return the first available candidate
|
||||
return host_candidates[0]
|
||||
|
||||
|
||||
def _fix_ollama_urls(config_section):
|
||||
"""
|
||||
Fix Ollama URLs for Docker environment.
|
||||
Replaces localhost URLs with appropriate Docker host URLs.
|
||||
Sets default ollama_base_url if not provided.
|
||||
"""
|
||||
if not config_section or "config" not in config_section:
|
||||
return config_section
|
||||
|
||||
ollama_config = config_section["config"]
|
||||
|
||||
# Set default ollama_base_url if not provided
|
||||
if "ollama_base_url" not in ollama_config:
|
||||
ollama_config["ollama_base_url"] = "http://host.docker.internal:11434"
|
||||
else:
|
||||
# Check for ollama_base_url and fix if it's localhost
|
||||
url = ollama_config["ollama_base_url"]
|
||||
if "localhost" in url or "127.0.0.1" in url:
|
||||
docker_host = _get_docker_host_url()
|
||||
if docker_host != "localhost":
|
||||
new_url = url.replace("localhost", docker_host).replace("127.0.0.1", docker_host)
|
||||
ollama_config["ollama_base_url"] = new_url
|
||||
print(f"Adjusted Ollama URL from {url} to {new_url}")
|
||||
|
||||
return config_section
|
||||
|
||||
|
||||
def reset_memory_client():
|
||||
"""Reset the global memory client to force reinitialization with new config."""
|
||||
global _memory_client, _config_hash
|
||||
_memory_client = None
|
||||
_config_hash = None
|
||||
|
||||
|
||||
def get_default_memory_config():
|
||||
"""Get default memory client configuration with sensible defaults."""
|
||||
return {
|
||||
"vector_store": {
|
||||
"provider": "qdrant",
|
||||
"config": {
|
||||
"collection_name": "openmemory",
|
||||
"host": "mem0_store",
|
||||
"port": 6333,
|
||||
}
|
||||
},
|
||||
"llm": {
|
||||
"provider": "openai",
|
||||
"config": {
|
||||
"model": "gpt-4o-mini",
|
||||
"temperature": 0.1,
|
||||
"max_tokens": 2000,
|
||||
"api_key": "env:OPENAI_API_KEY"
|
||||
}
|
||||
},
|
||||
"embedder": {
|
||||
"provider": "openai",
|
||||
"config": {
|
||||
"model": "text-embedding-3-small",
|
||||
"api_key": "env:OPENAI_API_KEY"
|
||||
}
|
||||
},
|
||||
"version": "v1.1"
|
||||
}
|
||||
|
||||
|
||||
def _parse_environment_variables(config_dict):
|
||||
"""
|
||||
Parse environment variables in config values.
|
||||
Converts 'env:VARIABLE_NAME' to actual environment variable values.
|
||||
"""
|
||||
if isinstance(config_dict, dict):
|
||||
parsed_config = {}
|
||||
for key, value in config_dict.items():
|
||||
if isinstance(value, str) and value.startswith("env:"):
|
||||
env_var = value.split(":", 1)[1]
|
||||
env_value = os.environ.get(env_var)
|
||||
if env_value:
|
||||
parsed_config[key] = env_value
|
||||
print(f"Loaded {env_var} from environment for {key}")
|
||||
else:
|
||||
print(f"Warning: Environment variable {env_var} not found, keeping original value")
|
||||
parsed_config[key] = value
|
||||
elif isinstance(value, dict):
|
||||
parsed_config[key] = _parse_environment_variables(value)
|
||||
else:
|
||||
parsed_config[key] = value
|
||||
return parsed_config
|
||||
return config_dict
|
||||
|
||||
|
||||
def get_memory_client(custom_instructions: str = None):
|
||||
@@ -14,37 +198,94 @@ def get_memory_client(custom_instructions: str = None):
|
||||
custom_instructions: Optional instructions for the memory project.
|
||||
|
||||
Returns:
|
||||
Initialized Mem0 client instance.
|
||||
Initialized Mem0 client instance or None if initialization fails.
|
||||
|
||||
Raises:
|
||||
Exception: If required API keys are not set.
|
||||
Exception: If required API keys are not set or critical configuration is missing.
|
||||
"""
|
||||
global memory_client
|
||||
|
||||
if memory_client is not None:
|
||||
return memory_client
|
||||
global _memory_client, _config_hash
|
||||
|
||||
try:
|
||||
config = {
|
||||
"vector_store": {
|
||||
"provider": "qdrant",
|
||||
"config": {
|
||||
"collection_name": "openmemory",
|
||||
"host": "mem0_store",
|
||||
"port": 6333,
|
||||
}
|
||||
}
|
||||
}
|
||||
# Start with default configuration
|
||||
config = get_default_memory_config()
|
||||
|
||||
# Variable to track custom instructions
|
||||
db_custom_instructions = None
|
||||
|
||||
# Load configuration from database
|
||||
try:
|
||||
db = SessionLocal()
|
||||
db_config = db.query(ConfigModel).filter(ConfigModel.key == "main").first()
|
||||
|
||||
if db_config:
|
||||
json_config = db_config.value
|
||||
|
||||
# Extract custom instructions from openmemory settings
|
||||
if "openmemory" in json_config and "custom_instructions" in json_config["openmemory"]:
|
||||
db_custom_instructions = json_config["openmemory"]["custom_instructions"]
|
||||
|
||||
# Override defaults with configurations from the database
|
||||
if "mem0" in json_config:
|
||||
mem0_config = json_config["mem0"]
|
||||
|
||||
# Update LLM configuration if available
|
||||
if "llm" in mem0_config and mem0_config["llm"] is not None:
|
||||
config["llm"] = mem0_config["llm"]
|
||||
|
||||
# Fix Ollama URLs for Docker if needed
|
||||
if config["llm"].get("provider") == "ollama":
|
||||
config["llm"] = _fix_ollama_urls(config["llm"])
|
||||
|
||||
# Update Embedder configuration if available
|
||||
if "embedder" in mem0_config and mem0_config["embedder"] is not None:
|
||||
config["embedder"] = mem0_config["embedder"]
|
||||
|
||||
# Fix Ollama URLs for Docker if needed
|
||||
if config["embedder"].get("provider") == "ollama":
|
||||
config["embedder"] = _fix_ollama_urls(config["embedder"])
|
||||
else:
|
||||
print("No configuration found in database, using defaults")
|
||||
|
||||
db.close()
|
||||
|
||||
except Exception as e:
|
||||
print(f"Warning: Error loading configuration from database: {e}")
|
||||
print("Using default configuration")
|
||||
# Continue with default configuration if database config can't be loaded
|
||||
|
||||
memory_client = Memory.from_config(config_dict=config)
|
||||
except Exception:
|
||||
raise Exception("Exception occurred while initializing memory client")
|
||||
# Use custom_instructions parameter first, then fall back to database value
|
||||
instructions_to_use = custom_instructions or db_custom_instructions
|
||||
if instructions_to_use:
|
||||
config["custom_fact_extraction_prompt"] = instructions_to_use
|
||||
|
||||
# Update project with custom instructions if provided
|
||||
if custom_instructions:
|
||||
memory_client.update_project(custom_instructions=custom_instructions)
|
||||
# ALWAYS parse environment variables in the final config
|
||||
# This ensures that even default config values like "env:OPENAI_API_KEY" get parsed
|
||||
print("Parsing environment variables in final config...")
|
||||
config = _parse_environment_variables(config)
|
||||
|
||||
return memory_client
|
||||
# Check if config has changed by comparing hashes
|
||||
current_config_hash = _get_config_hash(config)
|
||||
|
||||
# Only reinitialize if config changed or client doesn't exist
|
||||
if _memory_client is None or _config_hash != current_config_hash:
|
||||
print(f"Initializing memory client with config hash: {current_config_hash}")
|
||||
try:
|
||||
_memory_client = Memory.from_config(config_dict=config)
|
||||
_config_hash = current_config_hash
|
||||
print("Memory client initialized successfully")
|
||||
except Exception as init_error:
|
||||
print(f"Warning: Failed to initialize memory client: {init_error}")
|
||||
print("Server will continue running with limited memory functionality")
|
||||
_memory_client = None
|
||||
_config_hash = None
|
||||
return None
|
||||
|
||||
return _memory_client
|
||||
|
||||
except Exception as e:
|
||||
print(f"Warning: Exception occurred while initializing memory client: {e}")
|
||||
print("Server will continue running with limited memory functionality")
|
||||
return None
|
||||
|
||||
|
||||
def get_default_user_id():
|
||||
|
||||
Reference in New Issue
Block a user