From 931df14e257c0b78e5218d37f0bf524e40dacbfc Mon Sep 17 00:00:00 2001 From: Saket Aryan Date: Fri, 16 May 2025 22:11:22 +0530 Subject: [PATCH] fix(OMM): Memories not appearing in MCP clients added from Dashboard (#2704) Co-authored-by: Deshraj Yadav --- openmemory/api/app/mcp_server.py | 20 ++++-- openmemory/api/app/routers/memories.py | 96 +++++++++++++++++++++++++- 2 files changed, 108 insertions(+), 8 deletions(-) diff --git a/openmemory/api/app/mcp_server.py b/openmemory/api/app/mcp_server.py index fa6863a4..672a9a52 100644 --- a/openmemory/api/app/mcp_server.py +++ b/openmemory/api/app/mcp_server.py @@ -38,7 +38,7 @@ mcp_router = APIRouter(prefix="/mcp") # Initialize SSE transport sse = SseServerTransport("/mcp/messages/") -@mcp.tool(description="Add new memories to the user's memory") +@mcp.tool(description="Add a new memory. This method is called everytime the user informs anything about themselves, their preferences, or anything that has any relevent information whcih can be useful in the future conversation. This can also be called when the user asks you to remember something.") async def add_memories(text: str) -> str: uid = user_id_var.get(None) client_name = client_name_var.get(None) @@ -116,7 +116,7 @@ async def add_memories(text: str) -> str: return f"Error adding to memory: {e}" -@mcp.tool(description="Search the user's memory for memories that match the query") +@mcp.tool(description="Search through stored memories. This method is called EVERYTIME the user asks anything.") async def search_memory(query: str) -> str: uid = user_id_var.get(None) client_name = client_name_var.get(None) @@ -133,16 +133,17 @@ async def search_memory(query: str) -> str: # Get accessible memory IDs based on ACL user_memories = db.query(Memory).filter(Memory.user_id == user.id).all() accessible_memory_ids = [memory.id for memory in user_memories if check_memory_access_permissions(db, memory, app.id)] + conditions = [qdrant_models.FieldCondition(key="user_id", match=qdrant_models.MatchValue(value=uid))] - logging.info(f"Accessible memory IDs: {accessible_memory_ids}") - logging.info(f"Conditions: {conditions}") + if accessible_memory_ids: # Convert UUIDs to strings for Qdrant accessible_memory_ids_str = [str(memory_id) for memory_id in accessible_memory_ids] conditions.append(qdrant_models.HasIdCondition(has_id=accessible_memory_ids_str)) + filters = qdrant_models.Filter(must=conditions) - logging.info(f"Filters: {filters}") embeddings = memory_client.embedding_model.embed(query, "search") + hits = memory_client.vector_store.client.query_points( collection_name=memory_client.vector_store.collection_name, query=embeddings, @@ -150,6 +151,7 @@ async def search_memory(query: str) -> str: limit=10, ) + # Process search results memories = hits.points memories = [ { @@ -350,7 +352,15 @@ async def handle_sse(request: Request): client_name_var.reset(client_token) +@mcp_router.post("/messages/") +async def handle_get_message(request: Request): + return await handle_post_message(request) + + @mcp_router.post("/{client_name}/sse/{user_id}/messages/") +async def handle_post_message(request: Request): + return await handle_post_message(request) + async def handle_post_message(request: Request): """Handle POST messages for SSE""" try: diff --git a/openmemory/api/app/routers/memories.py b/openmemory/api/app/routers/memories.py index 29a942db..66500352 100644 --- a/openmemory/api/app/routers/memories.py +++ b/openmemory/api/app/routers/memories.py @@ -1,12 +1,14 @@ from datetime import datetime, UTC from typing import List, Optional, Set -from uuid import UUID +from uuid import UUID, uuid4 +import logging from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy.orm import Session, joinedload from fastapi_pagination import Page, Params from fastapi_pagination.ext.sqlalchemy import paginate as sqlalchemy_paginate from pydantic import BaseModel from sqlalchemy import or_, func +from app.utils.memory import get_memory_client from app.database import get_db from app.models import ( @@ -16,6 +18,8 @@ from app.models import ( from app.schemas import MemoryResponse, PaginatedMemoryResponse from app.utils.permissions import check_memory_access_permissions +memory_client = get_memory_client() + router = APIRouter(prefix="/api/v1/memories", tags=["memories"]) @@ -220,16 +224,102 @@ async def create_memory( if not app_obj.is_active: raise HTTPException(status_code=403, detail=f"App {request.app} is currently paused on OpenMemory. Cannot create new memories.") - # Create memory + # Log what we're about to do + logging.info(f"Creating memory for user_id: {request.user_id} with app: {request.app}") + + # Save to Qdrant via memory_client + qdrant_response = memory_client.add( + request.text, + user_id=request.user_id, # Use string user_id to match search + metadata={ + "source_app": "openmemory", + "mcp_client": request.app, + } + ) + + # Log the response for debugging + logging.info(f"Qdrant response: {qdrant_response}") + + # Process Qdrant response + if isinstance(qdrant_response, dict) and 'results' in qdrant_response: + for result in qdrant_response['results']: + if result['event'] == 'ADD': + # Get the Qdrant-generated ID + memory_id = UUID(result['id']) + + # Check if memory already exists + existing_memory = db.query(Memory).filter(Memory.id == memory_id).first() + + if existing_memory: + # Update existing memory + existing_memory.state = MemoryState.active + existing_memory.content = result['memory'] + memory = existing_memory + else: + # Create memory with the EXACT SAME ID from Qdrant + memory = Memory( + id=memory_id, # Use the same ID that Qdrant generated + user_id=user.id, + app_id=app_obj.id, + content=result['memory'], + metadata_=request.metadata, + state=MemoryState.active + ) + db.add(memory) + + # Create history entry + history = MemoryStatusHistory( + memory_id=memory_id, + changed_by=user.id, + old_state=MemoryState.deleted if existing_memory else MemoryState.deleted, + new_state=MemoryState.active + ) + db.add(history) + + db.commit() + db.refresh(memory) + return memory + + # Fallback to traditional DB-only approach if Qdrant integration fails + # Generate a random UUID for the memory + memory_id = uuid4() memory = Memory( + id=memory_id, user_id=user.id, app_id=app_obj.id, content=request.text, metadata_=request.metadata ) db.add(memory) + + # Create history entry + history = MemoryStatusHistory( + memory_id=memory_id, + changed_by=user.id, + old_state=MemoryState.deleted, + new_state=MemoryState.active + ) + db.add(history) + db.commit() db.refresh(memory) + + # Attempt to add to Qdrant with the same ID we just created + try: + # Try to add with our specific ID + memory_client.add( + request.text, + memory_id=str(memory_id), # Specify the ID + user_id=request.user_id, + metadata={ + "source_app": "openmemory", + "mcp_client": request.app, + } + ) + except Exception as e: + logging.error(f"Failed to add to Qdrant in fallback path: {e}") + # Continue anyway, the DB record is created + return memory @@ -572,4 +662,4 @@ async def get_related_memories( ) for memory in items ] - ) + ) \ No newline at end of file