import asyncio
import concurrent.futures
import gc
import hashlib
import json
import logging
import os
import uuid
import warnings
from copy import deepcopy
from datetime import datetime
from typing import Any, Dict, Optional

import pytz
from pydantic import ValidationError

from mem0.configs.base import MemoryConfig, MemoryItem
from mem0.configs.enums import MemoryType
from mem0.configs.prompts import (
    PROCEDURAL_MEMORY_SYSTEM_PROMPT,
    get_update_memory_messages,
)
from mem0.memory.base import MemoryBase
from mem0.memory.setup import mem0_dir, setup_config
from mem0.memory.storage import SQLiteManager
from mem0.memory.telemetry import capture_event
from mem0.memory.utils import (
    get_fact_retrieval_messages,
    parse_messages,
    parse_vision_messages,
    remove_code_blocks,
)
from mem0.utils.factory import EmbedderFactory, LlmFactory, VectorStoreFactory


def _build_filters_and_metadata(
    *,  # Enforce keyword-only arguments
    user_id: Optional[str] = None,
    agent_id: Optional[str] = None,
    run_id: Optional[str] = None,
    actor_id: Optional[str] = None,  # For query-time filtering
    input_metadata: Optional[Dict[str, Any]] = None,
    input_filters: Optional[Dict[str, Any]] = None,
) -> tuple[Dict[str, Any], Dict[str, Any]]:
    """
    Constructs metadata for storage and filters for querying based on session and actor identifiers.

    This helper ties every memory/query to exactly one session id (`user_id`, `agent_id`, or
    `run_id`) and optionally narrows queries to a specific `actor_id`. It returns two dicts:

    1. `base_metadata_template`: Used as a template for metadata when storing new memories.
       It includes the primary session identifier(s) and any `input_metadata`.
    2. `effective_query_filters`: Used for querying existing memories. It includes the
       primary session identifier(s), any `input_filters`, and a resolved actor
       identifier for targeted filtering if specified by any actor-related inputs.

    Actor filtering precedence: the explicit `actor_id` argument takes priority over
    `filters["actor_id"]`. The resolved actor ID is used for querying but is not added to
    `base_metadata_template`, as the actor for storage is typically derived from message
    content at a later stage.

    Args:
        user_id (Optional[str]): User identifier, primarily for Classic Mode session scoping.
        agent_id (Optional[str]): Agent identifier, for Classic Mode session scoping or
            as auxiliary information in Group Mode.
        run_id (Optional[str]): Run identifier, for Classic Mode session scoping or
            as auxiliary information in Group Mode.
        actor_id (Optional[str]): Explicit actor identifier, used as a potential source for
            actor-specific filtering. See actor resolution precedence in the main description.
        input_metadata (Optional[Dict[str, Any]]): Base dictionary to be augmented with
            session identifiers for the storage metadata template. Defaults to an empty dict.
        input_filters (Optional[Dict[str, Any]]): Base dictionary to be augmented with
            session and actor identifiers for query filters. Defaults to an empty dict.

    Returns:
        tuple[Dict[str, Any], Dict[str, Any]]: A tuple containing:
            - base_metadata_template (Dict[str, Any]): Metadata template for storing memories,
              scoped to the determined session.
            - effective_query_filters (Dict[str, Any]): Filters for querying memories,
              scoped to the determined session and potentially a resolved actor.
    """

    base_metadata_template = deepcopy(input_metadata) if input_metadata else {}
    effective_query_filters = deepcopy(input_filters) if input_filters else {}

    # ---------- resolve session id (mandatory) ----------
    session_key, session_val = None, None
    if user_id:
        session_key, session_val = "user_id", user_id
    elif agent_id:
        session_key, session_val = "agent_id", agent_id
    elif run_id:
        session_key, session_val = "run_id", run_id

    if session_key is None:
        raise ValueError("One of 'user_id', 'agent_id', or 'run_id' must be provided.")

    base_metadata_template[session_key] = session_val
    effective_query_filters[session_key] = session_val

    # ---------- optional actor filter ----------
    resolved_actor_id = actor_id or effective_query_filters.get("actor_id")
    if resolved_actor_id:
        effective_query_filters["actor_id"] = resolved_actor_id

    return base_metadata_template, effective_query_filters
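
# Illustrative example (not executed): given user_id="alice" and
# input_metadata={"topic": "food"}, _build_filters_and_metadata returns
#   base_metadata_template  -> {"topic": "food", "user_id": "alice"}
#   effective_query_filters -> {"user_id": "alice"}
# and passing actor_id="bob" additionally yields query filters of
#   {"user_id": "alice", "actor_id": "bob"}.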

setup_config()
logger = logging.getLogger(__name__)


class Memory(MemoryBase):
    def __init__(self, config: MemoryConfig = MemoryConfig()):
        self.config = config

        self.custom_fact_extraction_prompt = self.config.custom_fact_extraction_prompt
        self.custom_update_memory_prompt = self.config.custom_update_memory_prompt
        self.embedding_model = EmbedderFactory.create(
            self.config.embedder.provider,
            self.config.embedder.config,
            self.config.vector_store.config,
        )
        self.vector_store = VectorStoreFactory.create(
            self.config.vector_store.provider, self.config.vector_store.config
        )
        self.llm = LlmFactory.create(self.config.llm.provider, self.config.llm.config)
        self.db = SQLiteManager(self.config.history_db_path)
        self.collection_name = self.config.vector_store.config.collection_name
        self.api_version = self.config.version

        self.enable_graph = False

        if self.config.graph_store.config:
            if self.config.graph_store.provider == "memgraph":
                from mem0.memory.memgraph_memory import MemoryGraph
            else:
                from mem0.memory.graph_memory import MemoryGraph

            self.graph = MemoryGraph(self.config)
            self.enable_graph = True
        else:
            self.graph = None

        self.config.vector_store.config.collection_name = "mem0migrations"
        if self.config.vector_store.provider in ["faiss", "qdrant"]:
            provider_path = f"migrations_{self.config.vector_store.provider}"
            self.config.vector_store.config.path = os.path.join(mem0_dir, provider_path)
            os.makedirs(self.config.vector_store.config.path, exist_ok=True)
        self._telemetry_vector_store = VectorStoreFactory.create(
            self.config.vector_store.provider, self.config.vector_store.config
        )
        capture_event("mem0.init", self, {"sync_type": "sync"})

    @classmethod
    def from_config(cls, config_dict: Dict[str, Any]):
        try:
            config_dict = cls._process_config(config_dict)
            config = MemoryConfig(**config_dict)
        except ValidationError as e:
            logger.error(f"Configuration validation error: {e}")
            raise
        return cls(config)
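
    # Illustrative config sketch (provider names and models are assumptions,
    # not shipped defaults; adjust to your environment):
    #
    #   config = {
    #       "llm": {"provider": "openai", "config": {"model": "gpt-4o-mini"}},
    #       "embedder": {"provider": "openai", "config": {"model": "text-embedding-3-small"}},
    #   }
    #   m = Memory.from_config(config)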

    @staticmethod
    def _process_config(config_dict: Dict[str, Any]) -> Dict[str, Any]:
        if "graph_store" in config_dict:
            if "vector_store" not in config_dict and "embedder" in config_dict:
                config_dict["vector_store"] = {}
                config_dict["vector_store"]["config"] = {}
                config_dict["vector_store"]["config"]["embedding_model_dims"] = config_dict["embedder"]["config"][
                    "embedding_dims"
                ]
        return config_dict

    def add(
        self,
        messages,
        *,
        user_id: Optional[str] = None,
        agent_id: Optional[str] = None,
        run_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        infer: bool = True,
        memory_type: Optional[str] = None,
        prompt: Optional[str] = None,
    ):
        """
        Create a new memory.

        Adds new memories scoped to a single session id (`user_id`, `agent_id`, or `run_id`).
        One of those ids is required.

        Args:
            messages (str or List[Dict[str, str]]): The message content or list of messages
                (e.g., `[{"role": "user", "content": "Hello"}, {"role": "assistant", "content": "Hi"}]`)
                to be processed and stored.
            user_id (str, optional): ID of the user creating the memory. Defaults to None.
            agent_id (str, optional): ID of the agent creating the memory. Defaults to None.
            run_id (str, optional): ID of the run creating the memory. Defaults to None.
            metadata (dict, optional): Metadata to store with the memory. Defaults to None.
            infer (bool, optional): If True (default), an LLM is used to extract key facts from
                'messages' and decide whether to add, update, or delete related memories.
                If False, 'messages' are added as raw memories directly.
            memory_type (str, optional): Type of memory to create. Defaults to None, which
                creates short-term and long-term (semantic and episodic) memories. Pass
                `MemoryType.PROCEDURAL.value` ("procedural_memory") to create procedural
                memories instead (typically requires 'agent_id').
            prompt (str, optional): Prompt to use for the memory creation. Defaults to None.

        Returns:
            dict: A dictionary containing the result of the memory addition operation, typically
                including a list of memory items affected (added, updated) under a "results" key,
                and potentially "relations" if graph store is enabled.
                Example for v1.1+: `{"results": [{"id": "...", "memory": "...", "event": "ADD"}]}`
        """

        processed_metadata, effective_filters = _build_filters_and_metadata(
            user_id=user_id,
            agent_id=agent_id,
            run_id=run_id,
            input_metadata=metadata,
        )

        if memory_type is not None and memory_type != MemoryType.PROCEDURAL.value:
            raise ValueError(
                f"Invalid 'memory_type'. Please pass {MemoryType.PROCEDURAL.value} to create procedural memories."
            )

        if isinstance(messages, str):
            messages = [{"role": "user", "content": messages}]
        elif isinstance(messages, dict):
            messages = [messages]
        elif not isinstance(messages, list):
            raise ValueError("messages must be str, dict, or list[dict]")

        if agent_id is not None and memory_type == MemoryType.PROCEDURAL.value:
            results = self._create_procedural_memory(messages, metadata=processed_metadata, prompt=prompt)
            return results

        if self.config.llm.config.get("enable_vision"):
            messages = parse_vision_messages(messages, self.llm, self.config.llm.config.get("vision_details"))
        else:
            messages = parse_vision_messages(messages)

        with concurrent.futures.ThreadPoolExecutor() as executor:
            future1 = executor.submit(self._add_to_vector_store, messages, processed_metadata, effective_filters, infer)
            future2 = executor.submit(self._add_to_graph, messages, effective_filters)

            concurrent.futures.wait([future1, future2])

            vector_store_result = future1.result()
            graph_result = future2.result()

        if self.api_version == "v1.0":
            warnings.warn(
                "The current add API output format is deprecated. "
                "To use the latest format, set `api_version='v1.1'`. "
                "The current format will be removed in mem0ai 1.1.0 and later versions.",
                category=DeprecationWarning,
                stacklevel=2,
            )
            return vector_store_result

        if self.enable_graph:
            return {
                "results": vector_store_result,
                "relations": graph_result,
            }

        return {"results": vector_store_result}

    def _add_to_vector_store(self, messages, metadata, filters, infer):
        if not infer:
            returned_memories = []
            for message_dict in messages:
                if (
                    not isinstance(message_dict, dict)
                    or message_dict.get("role") is None
                    or message_dict.get("content") is None
                ):
                    logger.warning(f"Skipping invalid message format: {message_dict}")
                    continue

                if message_dict["role"] == "system":
                    continue

                per_msg_meta = deepcopy(metadata)
                per_msg_meta["role"] = message_dict["role"]

                actor_name = message_dict.get("name")
                if actor_name:
                    per_msg_meta["actor_id"] = actor_name

                msg_content = message_dict["content"]
                msg_embeddings = self.embedding_model.embed(msg_content, "add")
                mem_id = self._create_memory(msg_content, msg_embeddings, per_msg_meta)

                returned_memories.append(
                    {
                        "id": mem_id,
                        "memory": msg_content,
                        "event": "ADD",
                        "actor_id": actor_name if actor_name else None,
                        "role": message_dict["role"],
                    }
                )
            return returned_memories

        parsed_messages = parse_messages(messages)

        if self.config.custom_fact_extraction_prompt:
            system_prompt = self.config.custom_fact_extraction_prompt
            user_prompt = f"Input:\n{parsed_messages}"
        else:
            system_prompt, user_prompt = get_fact_retrieval_messages(parsed_messages)

        response = self.llm.generate_response(
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            response_format={"type": "json_object"},
        )

        try:
            response = remove_code_blocks(response)
            new_retrieved_facts = json.loads(response)["facts"]
        except Exception as e:
            logging.error(f"Error in new_retrieved_facts: {e}")
            new_retrieved_facts = []

        retrieved_old_memory = []
        new_message_embeddings = {}
        for new_mem in new_retrieved_facts:
            messages_embeddings = self.embedding_model.embed(new_mem, "add")
            new_message_embeddings[new_mem] = messages_embeddings
            existing_memories = self.vector_store.search(
                query=new_mem,
                vectors=messages_embeddings,
                limit=5,
                filters=filters,
            )
            for mem in existing_memories:
                retrieved_old_memory.append({"id": mem.id, "text": mem.payload["data"]})

        unique_data = {}
        for item in retrieved_old_memory:
            unique_data[item["id"]] = item
        retrieved_old_memory = list(unique_data.values())
        logging.info(f"Total existing memories: {len(retrieved_old_memory)}")

        # mapping UUIDs with integers for handling UUID hallucinations
        temp_uuid_mapping = {}
        for idx, item in enumerate(retrieved_old_memory):
            temp_uuid_mapping[str(idx)] = item["id"]
            retrieved_old_memory[idx]["id"] = str(idx)

        function_calling_prompt = get_update_memory_messages(
            retrieved_old_memory, new_retrieved_facts, self.config.custom_update_memory_prompt
        )

        try:
            response: str = self.llm.generate_response(
                messages=[{"role": "user", "content": function_calling_prompt}],
                response_format={"type": "json_object"},
            )
        except Exception as e:
            logging.error(f"Error in new memory actions response: {e}")
            response = ""

        try:
            response = remove_code_blocks(response)
            new_memories_with_actions = json.loads(response)
        except Exception as e:
            logging.error(f"Invalid JSON response: {e}")
            new_memories_with_actions = {}

        returned_memories = []
        try:
            for resp in new_memories_with_actions.get("memory", []):
                logging.info(resp)
                try:
                    action_text = resp.get("text")
                    if not action_text:
                        logging.info("Skipping memory entry because of empty `text` field.")
                        continue

                    event_type = resp.get("event")
                    if event_type == "ADD":
                        memory_id = self._create_memory(
                            data=action_text,
                            existing_embeddings=new_message_embeddings,
                            metadata=deepcopy(metadata),
                        )
                        returned_memories.append({"id": memory_id, "memory": action_text, "event": event_type})
                    elif event_type == "UPDATE":
                        self._update_memory(
                            memory_id=temp_uuid_mapping[resp.get("id")],
                            data=action_text,
                            existing_embeddings=new_message_embeddings,
                            metadata=deepcopy(metadata),
                        )
                        returned_memories.append(
                            {
                                "id": temp_uuid_mapping[resp.get("id")],
                                "memory": action_text,
                                "event": event_type,
                                "previous_memory": resp.get("old_memory"),
                            }
                        )
                    elif event_type == "DELETE":
                        self._delete_memory(memory_id=temp_uuid_mapping[resp.get("id")])
                        returned_memories.append(
                            {
                                "id": temp_uuid_mapping[resp.get("id")],
                                "memory": action_text,
                                "event": event_type,
                            }
                        )
                    elif event_type == "NONE":
                        logging.info("NOOP for Memory.")
                except Exception as e:
                    logging.error(f"Error processing memory action: {resp}, Error: {e}")
        except Exception as e:
            logging.error(f"Error iterating new_memories_with_actions: {e}")

        capture_event(
            "mem0.add",
            self,
            {"version": self.api_version, "keys": list(filters.keys()), "sync_type": "sync"},
        )
        return returned_memories
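
    # For reference, the update prompt asks the LLM for JSON of roughly this
    # shape (ids refer to the temporary integer mapping above; the exact
    # schema comes from get_update_memory_messages):
    #
    #   {
    #       "memory": [
    #           {"id": "0", "event": "UPDATE", "text": "Likes tea", "old_memory": "Likes coffee"},
    #           {"id": "1", "event": "NONE", "text": "Lives in Berlin"},
    #           {"event": "ADD", "text": "Works as a nurse"},
    #       ]
    #   }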

    def _add_to_graph(self, messages, filters):
        added_entities = []
        if self.enable_graph:
            if filters.get("user_id") is None:
                filters["user_id"] = "user"

            data = "\n".join([msg["content"] for msg in messages if "content" in msg and msg["role"] != "system"])
            added_entities = self.graph.add(data, filters)

        return added_entities

    def get(self, memory_id):
        """
        Retrieve a memory by ID.

        Args:
            memory_id (str): ID of the memory to retrieve.

        Returns:
            dict: Retrieved memory.
        """
        capture_event("mem0.get", self, {"memory_id": memory_id, "sync_type": "sync"})
        memory = self.vector_store.get(vector_id=memory_id)
        if not memory:
            return None

        promoted_payload_keys = [
            "user_id",
            "agent_id",
            "run_id",
            "actor_id",
            "role",
        ]

        core_and_promoted_keys = {
            "data",
            "hash",
            "created_at",
            "updated_at",
            "id",
            *promoted_payload_keys,
        }

        result_item = MemoryItem(
            id=memory.id,
            memory=memory.payload["data"],
            hash=memory.payload.get("hash"),
            created_at=memory.payload.get("created_at"),
            updated_at=memory.payload.get("updated_at"),
        ).model_dump()

        for key in promoted_payload_keys:
            if key in memory.payload:
                result_item[key] = memory.payload[key]

        additional_metadata = {k: v for k, v in memory.payload.items() if k not in core_and_promoted_keys}
        if additional_metadata:
            result_item["metadata"] = additional_metadata

        return result_item
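
    # Illustrative return shape (field values are examples, not real output):
    #
    #   {
    #       "id": "679c3a...",
    #       "memory": "Is vegetarian",
    #       "hash": "md5-of-data",
    #       "created_at": "2025-01-01T10:00:00-08:00",
    #       "updated_at": None,
    #       "user_id": "alice",
    #       "metadata": {"source": "onboarding"},
    #   }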

    def get_all(
        self,
        *,
        user_id: Optional[str] = None,
        agent_id: Optional[str] = None,
        run_id: Optional[str] = None,
        filters: Optional[Dict[str, Any]] = None,
        limit: int = 100,
    ):
        """
        List all memories.

        Args:
            user_id (str, optional): user id
            agent_id (str, optional): agent id
            run_id (str, optional): run id
            filters (dict, optional): Additional custom key-value filters to apply to the search.
                These are merged with the ID-based scoping filters. For example,
                `filters={"actor_id": "some_user"}`.
            limit (int, optional): The maximum number of memories to return. Defaults to 100.

        Returns:
            dict: A dictionary containing a list of memories under the "results" key,
                and potentially "relations" if graph store is enabled. For API v1.0,
                it might return a direct list (see deprecation warning).
                Example for v1.1+: `{"results": [{"id": "...", "memory": "...", ...}]}`
        """

        _, effective_filters = _build_filters_and_metadata(
            user_id=user_id,
            agent_id=agent_id,
            run_id=run_id,
            input_filters=filters,
        )

        if not any(key in effective_filters for key in ("user_id", "agent_id", "run_id")):
            raise ValueError("At least one of 'user_id', 'agent_id', or 'run_id' must be specified.")

        capture_event(
            "mem0.get_all",
            self,
            {"limit": limit, "keys": list(effective_filters.keys()), "sync_type": "sync"},
        )

        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_memories = executor.submit(self._get_all_from_vector_store, effective_filters, limit)
            future_graph_entities = (
                executor.submit(self.graph.get_all, effective_filters, limit) if self.enable_graph else None
            )

            concurrent.futures.wait(
                [future_memories, future_graph_entities] if future_graph_entities else [future_memories]
            )

            all_memories_result = future_memories.result()
            graph_entities_result = future_graph_entities.result() if future_graph_entities else None

        if self.enable_graph:
            return {"results": all_memories_result, "relations": graph_entities_result}

        if self.api_version == "v1.0":
            warnings.warn(
                "The current get_all API output format is deprecated. "
                "To use the latest format, set `api_version='v1.1'` (which returns a dict with a 'results' key). "
                "The current format (direct list for v1.0) will be removed in mem0ai 1.1.0 and later versions.",
                category=DeprecationWarning,
                stacklevel=2,
            )
            return all_memories_result
        else:
            return {"results": all_memories_result}

    def _get_all_from_vector_store(self, filters, limit):
        memories_result = self.vector_store.list(filters=filters, limit=limit)
        actual_memories = (
            memories_result[0]
            if isinstance(memories_result, tuple) and len(memories_result) > 0
            else memories_result
        )

        promoted_payload_keys = [
            "user_id",
            "agent_id",
            "run_id",
            "actor_id",
            "role",
        ]
        core_and_promoted_keys = {
            "data",
            "hash",
            "created_at",
            "updated_at",
            "id",
            *promoted_payload_keys,
        }

        formatted_memories = []
        for mem in actual_memories:
            memory_item_dict = MemoryItem(
                id=mem.id,
                memory=mem.payload["data"],
                hash=mem.payload.get("hash"),
                created_at=mem.payload.get("created_at"),
                updated_at=mem.payload.get("updated_at"),
            ).model_dump(exclude={"score"})

            for key in promoted_payload_keys:
                if key in mem.payload:
                    memory_item_dict[key] = mem.payload[key]

            additional_metadata = {k: v for k, v in mem.payload.items() if k not in core_and_promoted_keys}
            if additional_metadata:
                memory_item_dict["metadata"] = additional_metadata

            formatted_memories.append(memory_item_dict)

        return formatted_memories

    def search(
        self,
        query: str,
        *,
        user_id: Optional[str] = None,
        agent_id: Optional[str] = None,
        run_id: Optional[str] = None,
        limit: int = 100,
        filters: Optional[Dict[str, Any]] = None,
    ):
        """
        Search for memories based on a query.

        Args:
            query (str): Query to search for.
            user_id (str, optional): ID of the user to search for. Defaults to None.
            agent_id (str, optional): ID of the agent to search for. Defaults to None.
            run_id (str, optional): ID of the run to search for. Defaults to None.
            limit (int, optional): Limit the number of results. Defaults to 100.
            filters (dict, optional): Filters to apply to the search. Defaults to None.

        Returns:
            dict: A dictionary containing the search results, typically under a "results" key,
                and potentially "relations" if graph store is enabled.
                Example for v1.1+: `{"results": [{"id": "...", "memory": "...", "score": 0.8, ...}]}`
        """
        _, effective_filters = _build_filters_and_metadata(
            user_id=user_id,
            agent_id=agent_id,
            run_id=run_id,
            input_filters=filters,
        )

        if not any(key in effective_filters for key in ("user_id", "agent_id", "run_id")):
            raise ValueError("At least one of 'user_id', 'agent_id', or 'run_id' must be specified.")

        capture_event(
            "mem0.search",
            self,
            {"limit": limit, "version": self.api_version, "keys": list(effective_filters.keys()), "sync_type": "sync"},
        )

        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_memories = executor.submit(self._search_vector_store, query, effective_filters, limit)
            future_graph_entities = (
                executor.submit(self.graph.search, query, effective_filters, limit) if self.enable_graph else None
            )

            concurrent.futures.wait(
                [future_memories, future_graph_entities] if future_graph_entities else [future_memories]
            )

            original_memories = future_memories.result()
            graph_entities = future_graph_entities.result() if future_graph_entities else None

        if self.enable_graph:
            return {"results": original_memories, "relations": graph_entities}

        if self.api_version == "v1.0":
            warnings.warn(
                "The current search API output format is deprecated. "
                "To use the latest format, set `api_version='v1.1'`. "
                "The current format will be removed in mem0ai 1.1.0 and later versions.",
                category=DeprecationWarning,
                stacklevel=2,
            )
            return {"results": original_memories}
        else:
            return {"results": original_memories}

    def _search_vector_store(self, query, filters, limit):
        embeddings = self.embedding_model.embed(query, "search")
        memories = self.vector_store.search(query=query, vectors=embeddings, limit=limit, filters=filters)

        promoted_payload_keys = [
            "user_id",
            "agent_id",
            "run_id",
            "actor_id",
            "role",
        ]

        core_and_promoted_keys = {
            "data",
            "hash",
            "created_at",
            "updated_at",
            "id",
            *promoted_payload_keys,
        }

        original_memories = []
        for mem in memories:
            memory_item_dict = MemoryItem(
                id=mem.id,
                memory=mem.payload["data"],
                hash=mem.payload.get("hash"),
                created_at=mem.payload.get("created_at"),
                updated_at=mem.payload.get("updated_at"),
                score=mem.score,
            ).model_dump()

            for key in promoted_payload_keys:
                if key in mem.payload:
                    memory_item_dict[key] = mem.payload[key]

            additional_metadata = {k: v for k, v in mem.payload.items() if k not in core_and_promoted_keys}
            if additional_metadata:
                memory_item_dict["metadata"] = additional_metadata

            original_memories.append(memory_item_dict)

        return original_memories

    def update(self, memory_id, data):
        """
        Update a memory by ID.

        Args:
            memory_id (str): ID of the memory to update.
            data (str): New text content for the memory.

        Returns:
            dict: Update confirmation message.
        """
        capture_event("mem0.update", self, {"memory_id": memory_id, "sync_type": "sync"})

        existing_embeddings = {data: self.embedding_model.embed(data, "update")}

        self._update_memory(memory_id, data, existing_embeddings)
        return {"message": "Memory updated successfully!"}

    def delete(self, memory_id):
        """
        Delete a memory by ID.

        Args:
            memory_id (str): ID of the memory to delete.
        """
        capture_event("mem0.delete", self, {"memory_id": memory_id, "sync_type": "sync"})
        self._delete_memory(memory_id)
        return {"message": "Memory deleted successfully!"}

    def delete_all(self, user_id: Optional[str] = None, agent_id: Optional[str] = None, run_id: Optional[str] = None):
        """
        Delete all memories matching the given scope.

        Args:
            user_id (str, optional): ID of the user to delete memories for. Defaults to None.
            agent_id (str, optional): ID of the agent to delete memories for. Defaults to None.
            run_id (str, optional): ID of the run to delete memories for. Defaults to None.
        """
        filters: Dict[str, Any] = {}
        if user_id:
            filters["user_id"] = user_id
        if agent_id:
            filters["agent_id"] = agent_id
        if run_id:
            filters["run_id"] = run_id

        if not filters:
            raise ValueError(
                "At least one filter is required to delete all memories. If you want to delete all memories, use the `reset()` method."
            )

        capture_event("mem0.delete_all", self, {"keys": list(filters.keys()), "sync_type": "sync"})
        memories = self.vector_store.list(filters=filters)[0]
        for memory in memories:
            self._delete_memory(memory.id)

        logger.info(f"Deleted {len(memories)} memories")

        if self.enable_graph:
            self.graph.delete_all(filters)

        return {"message": "Memories deleted successfully!"}

    def history(self, memory_id):
        """
        Get the history of changes for a memory by ID.

        Args:
            memory_id (str): ID of the memory to get history for.

        Returns:
            list: List of changes for the memory.
        """
        capture_event("mem0.history", self, {"memory_id": memory_id, "sync_type": "sync"})
        return self.db.get_history(memory_id)

    def _create_memory(self, data, existing_embeddings, metadata=None):
        logging.debug(f"Creating memory with {data=}")
        if data in existing_embeddings:
            embeddings = existing_embeddings[data]
        else:
            embeddings = self.embedding_model.embed(data, memory_action="add")
        memory_id = str(uuid.uuid4())
        metadata = metadata or {}
        metadata["data"] = data
        metadata["hash"] = hashlib.md5(data.encode()).hexdigest()
        metadata["created_at"] = datetime.now(pytz.timezone("US/Pacific")).isoformat()

        self.vector_store.insert(
            vectors=[embeddings],
            ids=[memory_id],
            payloads=[metadata],
        )
        self.db.add_history(
            memory_id,
            None,
            data,
            "ADD",
            created_at=metadata.get("created_at"),
            actor_id=metadata.get("actor_id"),
            role=metadata.get("role"),
        )
        capture_event("mem0._create_memory", self, {"memory_id": memory_id, "sync_type": "sync"})
        return memory_id
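
    # Illustrative payload stored per vector (exact keys depend on the
    # metadata passed in):
    #
    #   {
    #       "data": "Is vegetarian",
    #       "hash": "<md5 of data>",
    #       "created_at": "2025-01-01T10:00:00-08:00",
    #       "user_id": "alice",
    #       "role": "user",  # present for raw (infer=False) adds
    #   }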

    def _create_procedural_memory(self, messages, metadata=None, prompt=None):
        """
        Create a procedural memory.

        Args:
            messages (list): List of messages to create a procedural memory from.
            metadata (dict): Metadata to attach to the procedural memory. Required,
                since it carries the session scope.
            prompt (str, optional): Prompt to use for the procedural memory creation. Defaults to None.
        """
        logger.info("Creating procedural memory")

        parsed_messages = [
            {"role": "system", "content": prompt or PROCEDURAL_MEMORY_SYSTEM_PROMPT},
            *messages,
            {
                "role": "user",
                "content": "Create procedural memory of the above conversation.",
            },
        ]

        try:
            procedural_memory = self.llm.generate_response(messages=parsed_messages)
        except Exception as e:
            logger.error(f"Error generating procedural memory summary: {e}")
            raise

        if metadata is None:
            raise ValueError("Metadata is required when creating procedural memory; it must carry the session scope.")

        metadata["memory_type"] = MemoryType.PROCEDURAL.value
        embeddings = self.embedding_model.embed(procedural_memory, memory_action="add")
        memory_id = self._create_memory(procedural_memory, {procedural_memory: embeddings}, metadata=metadata)
        capture_event("mem0._create_procedural_memory", self, {"memory_id": memory_id, "sync_type": "sync"})

        result = {"results": [{"id": memory_id, "memory": procedural_memory, "event": "ADD"}]}

        return result

    def _update_memory(self, memory_id, data, existing_embeddings, metadata=None):
        logger.info(f"Updating memory with {data=}")

        try:
            existing_memory = self.vector_store.get(vector_id=memory_id)
        except Exception:
            logger.error(f"Error getting memory with ID {memory_id} during update.")
            raise ValueError(f"Error getting memory with ID {memory_id}. Please provide a valid 'memory_id'")

        prev_value = existing_memory.payload.get("data")

        new_metadata = deepcopy(metadata) if metadata is not None else {}

        new_metadata["data"] = data
        new_metadata["hash"] = hashlib.md5(data.encode()).hexdigest()
        new_metadata["created_at"] = existing_memory.payload.get("created_at")
        new_metadata["updated_at"] = datetime.now(pytz.timezone("US/Pacific")).isoformat()

        if "user_id" in existing_memory.payload:
            new_metadata["user_id"] = existing_memory.payload["user_id"]
        if "agent_id" in existing_memory.payload:
            new_metadata["agent_id"] = existing_memory.payload["agent_id"]
        if "run_id" in existing_memory.payload:
            new_metadata["run_id"] = existing_memory.payload["run_id"]
        if "actor_id" in existing_memory.payload:
            new_metadata["actor_id"] = existing_memory.payload["actor_id"]
        if "role" in existing_memory.payload:
            new_metadata["role"] = existing_memory.payload["role"]

        if data in existing_embeddings:
            embeddings = existing_embeddings[data]
        else:
            embeddings = self.embedding_model.embed(data, "update")

        self.vector_store.update(
            vector_id=memory_id,
            vector=embeddings,
            payload=new_metadata,
        )
        logger.info(f"Updating memory with ID {memory_id=} with {data=}")

        self.db.add_history(
            memory_id,
            prev_value,
            data,
            "UPDATE",
            created_at=new_metadata["created_at"],
            updated_at=new_metadata["updated_at"],
            actor_id=new_metadata.get("actor_id"),
            role=new_metadata.get("role"),
        )
        capture_event("mem0._update_memory", self, {"memory_id": memory_id, "sync_type": "sync"})
        return memory_id

    def _delete_memory(self, memory_id):
        logging.info(f"Deleting memory with {memory_id=}")
        existing_memory = self.vector_store.get(vector_id=memory_id)
        prev_value = existing_memory.payload["data"]
        self.vector_store.delete(vector_id=memory_id)
        self.db.add_history(
            memory_id,
            prev_value,
            None,
            "DELETE",
            actor_id=existing_memory.payload.get("actor_id"),
            role=existing_memory.payload.get("role"),
            is_deleted=1,
        )
        capture_event("mem0._delete_memory", self, {"memory_id": memory_id, "sync_type": "sync"})
        return memory_id

    def reset(self):
        """
        Reset the memory store by:
            - dropping and recreating the history database table,
            - deleting the vector store collection,
            - recreating the vector store with a new client.
        """
        logger.warning("Resetting all memories")

        if hasattr(self.db, "connection") and self.db.connection:
            self.db.connection.execute("DROP TABLE IF EXISTS history")
            self.db.connection.close()

        self.db = SQLiteManager(self.config.history_db_path)

        if hasattr(self.vector_store, "reset"):
            self.vector_store = VectorStoreFactory.reset(self.vector_store)
        else:
            logger.warning("Vector store does not support reset; deleting the collection and recreating it instead.")
            self.vector_store.delete_col()
            self.vector_store = VectorStoreFactory.create(
                self.config.vector_store.provider, self.config.vector_store.config
            )
        capture_event("mem0.reset", self, {"sync_type": "sync"})

    def chat(self, query):
        raise NotImplementedError("Chat function not implemented yet.")


class AsyncMemory(MemoryBase):
    def __init__(self, config: MemoryConfig = MemoryConfig()):
        self.config = config

        self.embedding_model = EmbedderFactory.create(
            self.config.embedder.provider,
            self.config.embedder.config,
            self.config.vector_store.config,
        )
        self.vector_store = VectorStoreFactory.create(
            self.config.vector_store.provider, self.config.vector_store.config
        )
        self.llm = LlmFactory.create(self.config.llm.provider, self.config.llm.config)
        self.db = SQLiteManager(self.config.history_db_path)
        self.collection_name = self.config.vector_store.config.collection_name
        self.api_version = self.config.version

        self.enable_graph = False

        if self.config.graph_store.config:
            from mem0.memory.graph_memory import MemoryGraph

            self.graph = MemoryGraph(self.config)
            self.enable_graph = True
        else:
            self.graph = None

        capture_event("mem0.init", self, {"sync_type": "async"})

    @classmethod
    async def from_config(cls, config_dict: Dict[str, Any]):
        try:
            config_dict = cls._process_config(config_dict)
            config = MemoryConfig(**config_dict)
        except ValidationError as e:
            logger.error(f"Configuration validation error: {e}")
            raise
        return cls(config)

    @staticmethod
    def _process_config(config_dict: Dict[str, Any]) -> Dict[str, Any]:
        if "graph_store" in config_dict:
            if "vector_store" not in config_dict and "embedder" in config_dict:
                config_dict["vector_store"] = {}
                config_dict["vector_store"]["config"] = {}
                config_dict["vector_store"]["config"]["embedding_model_dims"] = config_dict["embedder"]["config"][
                    "embedding_dims"
                ]
        return config_dict

    async def add(
        self,
        messages,
        *,
        user_id: Optional[str] = None,
        agent_id: Optional[str] = None,
        run_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        infer: bool = True,
        memory_type: Optional[str] = None,
        prompt: Optional[str] = None,
        llm=None,
    ):
        """
        Create a new memory asynchronously.

        Args:
            messages (str or List[Dict[str, str]]): Messages to store in the memory.
            user_id (str, optional): ID of the user creating the memory. Defaults to None.
            agent_id (str, optional): ID of the agent creating the memory. Defaults to None.
            run_id (str, optional): ID of the run creating the memory. Defaults to None.
            metadata (dict, optional): Metadata to store with the memory. Defaults to None.
            infer (bool, optional): Whether to infer the memories. Defaults to True.
            memory_type (str, optional): Type of memory to create. Defaults to None.
                Pass "procedural_memory" to create procedural memories.
            prompt (str, optional): Prompt to use for the memory creation. Defaults to None.
            llm (BaseChatModel, optional): LLM instance to use for generating procedural
                memories. Defaults to None. Useful when the caller is using a LangChain ChatModel.

        Returns:
            dict: A dictionary containing the result of the memory addition operation.
        """
        processed_metadata, effective_filters = _build_filters_and_metadata(
            user_id=user_id,
            agent_id=agent_id,
            run_id=run_id,
            input_metadata=metadata,
        )

        if memory_type is not None and memory_type != MemoryType.PROCEDURAL.value:
            raise ValueError(
                f"Invalid 'memory_type'. Please pass {MemoryType.PROCEDURAL.value} to create procedural memories."
            )

        if isinstance(messages, str):
            messages = [{"role": "user", "content": messages}]
        elif isinstance(messages, dict):
            messages = [messages]
        elif not isinstance(messages, list):
            raise ValueError("messages must be str, dict, or list[dict]")

        if agent_id is not None and memory_type == MemoryType.PROCEDURAL.value:
            results = await self._create_procedural_memory(
                messages, metadata=processed_metadata, prompt=prompt, llm=llm
            )
            return results

        if self.config.llm.config.get("enable_vision"):
            messages = parse_vision_messages(messages, self.llm, self.config.llm.config.get("vision_details"))
        else:
            messages = parse_vision_messages(messages)

        vector_store_task = asyncio.create_task(
            self._add_to_vector_store(messages, processed_metadata, effective_filters, infer)
        )
        graph_task = asyncio.create_task(self._add_to_graph(messages, effective_filters))

        vector_store_result, graph_result = await asyncio.gather(vector_store_task, graph_task)

        if self.api_version == "v1.0":
            warnings.warn(
                "The current add API output format is deprecated. "
                "To use the latest format, set `api_version='v1.1'`. "
                "The current format will be removed in mem0ai 1.1.0 and later versions.",
                category=DeprecationWarning,
                stacklevel=2,
            )
            return vector_store_result

        if self.enable_graph:
            return {
                "results": vector_store_result,
                "relations": graph_result,
            }

        return {"results": vector_store_result}

    async def _add_to_vector_store(
        self,
        messages: list,
        metadata: dict,
        filters: dict,
        infer: bool,
    ):
        if not infer:
            returned_memories = []
            for message_dict in messages:
                if (
                    not isinstance(message_dict, dict)
                    or message_dict.get("role") is None
                    or message_dict.get("content") is None
                ):
                    logger.warning(f"Skipping invalid message format (async): {message_dict}")
                    continue

                if message_dict["role"] == "system":
                    continue

                per_msg_meta = deepcopy(metadata)
                per_msg_meta["role"] = message_dict["role"]

                actor_name = message_dict.get("name")
                if actor_name:
                    per_msg_meta["actor_id"] = actor_name

                msg_content = message_dict["content"]
                msg_embeddings = await asyncio.to_thread(self.embedding_model.embed, msg_content, "add")
                mem_id = await self._create_memory(msg_content, msg_embeddings, per_msg_meta)

                returned_memories.append(
                    {
                        "id": mem_id,
                        "memory": msg_content,
                        "event": "ADD",
                        "actor_id": actor_name if actor_name else None,
                        "role": message_dict["role"],
                    }
                )
            return returned_memories

        parsed_messages = parse_messages(messages)
        if self.config.custom_fact_extraction_prompt:
            system_prompt = self.config.custom_fact_extraction_prompt
            user_prompt = f"Input:\n{parsed_messages}"
        else:
            system_prompt, user_prompt = get_fact_retrieval_messages(parsed_messages)

        response = await asyncio.to_thread(
            self.llm.generate_response,
            messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}],
            response_format={"type": "json_object"},
        )
        try:
            response = remove_code_blocks(response)
            new_retrieved_facts = json.loads(response)["facts"]
        except Exception as e:
            logging.error(f"Error in new_retrieved_facts: {e}")
            new_retrieved_facts = []

        retrieved_old_memory = []
        new_message_embeddings = {}

        async def process_fact_for_search(new_mem_content):
            embeddings = await asyncio.to_thread(self.embedding_model.embed, new_mem_content, "add")
            new_message_embeddings[new_mem_content] = embeddings
            existing_mems = await asyncio.to_thread(
                self.vector_store.search,
                query=new_mem_content,
                vectors=embeddings,
                limit=5,
                filters=filters,  # 'filters' is query_filters_for_inference
            )
            return [{"id": mem.id, "text": mem.payload["data"]} for mem in existing_mems]

        search_tasks = [process_fact_for_search(fact) for fact in new_retrieved_facts]
        search_results_list = await asyncio.gather(*search_tasks)
        for result_group in search_results_list:
            retrieved_old_memory.extend(result_group)

        unique_data = {}
        for item in retrieved_old_memory:
            unique_data[item["id"]] = item
        retrieved_old_memory = list(unique_data.values())
        logging.info(f"Total existing memories: {len(retrieved_old_memory)}")

        # mapping UUIDs with integers for handling UUID hallucinations
        temp_uuid_mapping = {}
        for idx, item in enumerate(retrieved_old_memory):
            temp_uuid_mapping[str(idx)] = item["id"]
            retrieved_old_memory[idx]["id"] = str(idx)

        function_calling_prompt = get_update_memory_messages(
            retrieved_old_memory, new_retrieved_facts, self.config.custom_update_memory_prompt
        )
        try:
            response = await asyncio.to_thread(
                self.llm.generate_response,
                messages=[{"role": "user", "content": function_calling_prompt}],
                response_format={"type": "json_object"},
            )
        except Exception as e:
            logging.error(f"Error in new memory actions response: {e}")
            response = ""

        try:
            response = remove_code_blocks(response)
            new_memories_with_actions = json.loads(response)
        except Exception as e:
            logging.error(f"Invalid JSON response: {e}")
            new_memories_with_actions = {}

        returned_memories = []
        try:
            memory_tasks = []
            for resp in new_memories_with_actions.get("memory", []):
                logging.info(resp)
                try:
                    action_text = resp.get("text")
                    if not action_text:
                        continue
                    event_type = resp.get("event")

                    if event_type == "ADD":
                        task = asyncio.create_task(
                            self._create_memory(
                                data=action_text,
                                existing_embeddings=new_message_embeddings,
                                metadata=deepcopy(metadata),
                            )
                        )
                        memory_tasks.append((task, resp, "ADD", None))
                    elif event_type == "UPDATE":
                        task = asyncio.create_task(
                            self._update_memory(
                                memory_id=temp_uuid_mapping[resp["id"]],
                                data=action_text,
                                existing_embeddings=new_message_embeddings,
                                metadata=deepcopy(metadata),
                            )
                        )
                        memory_tasks.append((task, resp, "UPDATE", temp_uuid_mapping[resp["id"]]))
                    elif event_type == "DELETE":
                        task = asyncio.create_task(self._delete_memory(memory_id=temp_uuid_mapping[resp.get("id")]))
                        memory_tasks.append((task, resp, "DELETE", temp_uuid_mapping[resp.get("id")]))
                    elif event_type == "NONE":
                        logging.info("NOOP for Memory (async).")
                except Exception as e:
                    logging.error(f"Error processing memory action (async): {resp}, Error: {e}")

            for task, resp, event_type, mem_id in memory_tasks:
                try:
                    result_id = await task
                    if event_type == "ADD":
                        returned_memories.append({"id": result_id, "memory": resp.get("text"), "event": event_type})
                    elif event_type == "UPDATE":
                        returned_memories.append(
                            {
                                "id": mem_id,
                                "memory": resp.get("text"),
                                "event": event_type,
                                "previous_memory": resp.get("old_memory"),
                            }
                        )
                    elif event_type == "DELETE":
                        returned_memories.append({"id": mem_id, "memory": resp.get("text"), "event": event_type})
                except Exception as e:
                    logging.error(f"Error awaiting memory task (async): {e}")
        except Exception as e:
            logging.error(f"Error in memory processing loop (async): {e}")

        capture_event(
            "mem0.add",
            self,
            {"version": self.api_version, "keys": list(filters.keys()), "sync_type": "async"},
        )
        return returned_memories

    async def _add_to_graph(self, messages, filters):
        added_entities = []
        if self.enable_graph:
            if filters.get("user_id") is None:
                filters["user_id"] = "user"

            data = "\n".join([msg["content"] for msg in messages if "content" in msg and msg["role"] != "system"])
            added_entities = await asyncio.to_thread(self.graph.add, data, filters)

        return added_entities

    async def get(self, memory_id):
        """
        Retrieve a memory by ID asynchronously.

        Args:
            memory_id (str): ID of the memory to retrieve.

        Returns:
            dict: Retrieved memory.
        """
        capture_event("mem0.get", self, {"memory_id": memory_id, "sync_type": "async"})
        memory = await asyncio.to_thread(self.vector_store.get, vector_id=memory_id)
        if not memory:
            return None

        promoted_payload_keys = [
            "user_id",
            "agent_id",
            "run_id",
            "actor_id",
            "role",
        ]

        core_and_promoted_keys = {
            "data",
            "hash",
            "created_at",
            "updated_at",
            "id",
            *promoted_payload_keys,
        }

        result_item = MemoryItem(
            id=memory.id,
            memory=memory.payload["data"],
            hash=memory.payload.get("hash"),
            created_at=memory.payload.get("created_at"),
            updated_at=memory.payload.get("updated_at"),
        ).model_dump()

        for key in promoted_payload_keys:
            if key in memory.payload:
                result_item[key] = memory.payload[key]

        additional_metadata = {k: v for k, v in memory.payload.items() if k not in core_and_promoted_keys}
        if additional_metadata:
            result_item["metadata"] = additional_metadata

        return result_item

    async def get_all(
        self,
        *,
        user_id: Optional[str] = None,
        agent_id: Optional[str] = None,
        run_id: Optional[str] = None,
        filters: Optional[Dict[str, Any]] = None,
        limit: int = 100,
    ):
        """
        List all memories.

        Args:
            user_id (str, optional): user id
            agent_id (str, optional): agent id
            run_id (str, optional): run id
            filters (dict, optional): Additional custom key-value filters to apply to the search.
                These are merged with the ID-based scoping filters. For example,
                `filters={"actor_id": "some_user"}`.
            limit (int, optional): The maximum number of memories to return. Defaults to 100.

        Returns:
            dict: A dictionary containing a list of memories under the "results" key,
                and potentially "relations" if graph store is enabled. For API v1.0,
                it might return a direct list (see deprecation warning).
                Example for v1.1+: `{"results": [{"id": "...", "memory": "...", ...}]}`
        """

        _, effective_filters = _build_filters_and_metadata(
            user_id=user_id,
            agent_id=agent_id,
            run_id=run_id,
            input_filters=filters,
        )

        if not any(key in effective_filters for key in ("user_id", "agent_id", "run_id")):
            raise ValueError("At least one of 'user_id', 'agent_id', or 'run_id' must be specified for get_all.")

        capture_event(
            "mem0.get_all",
            self,
            {"limit": limit, "keys": list(effective_filters.keys()), "sync_type": "async"},
        )

        # _get_all_from_vector_store is a coroutine, so it is awaited as a task
        # here rather than submitted to a thread pool (submitting a coroutine
        # function to ThreadPoolExecutor would return an un-awaited coroutine
        # object instead of results).
        vector_store_task = asyncio.create_task(self._get_all_from_vector_store(effective_filters, limit))
        graph_task = (
            asyncio.create_task(asyncio.to_thread(self.graph.get_all, effective_filters, limit))
            if self.enable_graph
            else None
        )

        if graph_task:
            all_memories_result, graph_entities_result = await asyncio.gather(vector_store_task, graph_task)
        else:
            all_memories_result = await vector_store_task
            graph_entities_result = None

        if self.enable_graph:
            return {"results": all_memories_result, "relations": graph_entities_result}

        if self.api_version == "v1.0":
            warnings.warn(
                "The current get_all API output format is deprecated. "
                "To use the latest format, set `api_version='v1.1'` (which returns a dict with a 'results' key). "
                "The current format (direct list for v1.0) will be removed in mem0ai 1.1.0 and later versions.",
                category=DeprecationWarning,
                stacklevel=2,
            )
            return all_memories_result
        else:
            return {"results": all_memories_result}

    async def _get_all_from_vector_store(self, filters, limit):
        memories_result = await asyncio.to_thread(self.vector_store.list, filters=filters, limit=limit)
        actual_memories = (
            memories_result[0]
            if isinstance(memories_result, tuple) and len(memories_result) > 0
            else memories_result
        )

        promoted_payload_keys = [
            "user_id",
            "agent_id",
            "run_id",
            "actor_id",
            "role",
        ]
        core_and_promoted_keys = {
            "data",
            "hash",
            "created_at",
            "updated_at",
            "id",
            *promoted_payload_keys,
        }

        formatted_memories = []
        for mem in actual_memories:
            memory_item_dict = MemoryItem(
                id=mem.id,
                memory=mem.payload["data"],
                hash=mem.payload.get("hash"),
                created_at=mem.payload.get("created_at"),
                updated_at=mem.payload.get("updated_at"),
            ).model_dump(exclude={"score"})

            for key in promoted_payload_keys:
                if key in mem.payload:
                    memory_item_dict[key] = mem.payload[key]

            additional_metadata = {k: v for k, v in mem.payload.items() if k not in core_and_promoted_keys}
            if additional_metadata:
                memory_item_dict["metadata"] = additional_metadata

            formatted_memories.append(memory_item_dict)

        return formatted_memories

    async def search(
        self,
        query: str,
        *,
        user_id: Optional[str] = None,
        agent_id: Optional[str] = None,
        run_id: Optional[str] = None,
        limit: int = 100,
        filters: Optional[Dict[str, Any]] = None,
    ):
        """
        Search for memories based on a query.

        Args:
            query (str): Query to search for.
            user_id (str, optional): ID of the user to search for. Defaults to None.
            agent_id (str, optional): ID of the agent to search for. Defaults to None.
            run_id (str, optional): ID of the run to search for. Defaults to None.
            limit (int, optional): Limit the number of results. Defaults to 100.
            filters (dict, optional): Filters to apply to the search. Defaults to None.

        Returns:
            dict: A dictionary containing the search results, typically under a "results" key,
                and potentially "relations" if graph store is enabled.
                Example for v1.1+: `{"results": [{"id": "...", "memory": "...", "score": 0.8, ...}]}`
        """

        _, effective_filters = _build_filters_and_metadata(
            user_id=user_id,
            agent_id=agent_id,
            run_id=run_id,
            input_filters=filters,
        )

        if not any(key in effective_filters for key in ("user_id", "agent_id", "run_id")):
            raise ValueError("At least one of 'user_id', 'agent_id', or 'run_id' must be specified.")

        capture_event(
            "mem0.search",
            self,
            {"limit": limit, "version": self.api_version, "keys": list(effective_filters.keys()), "sync_type": "async"},
        )

        vector_store_task = asyncio.create_task(self._search_vector_store(query, effective_filters, limit))

        graph_task = None
        if self.enable_graph:
            if asyncio.iscoroutinefunction(self.graph.search):  # graph search may itself be async
                graph_task = asyncio.create_task(self.graph.search(query, effective_filters, limit))
            else:
                graph_task = asyncio.create_task(
                    asyncio.to_thread(self.graph.search, query, effective_filters, limit)
                )

        if graph_task:
            original_memories, graph_entities = await asyncio.gather(vector_store_task, graph_task)
        else:
            original_memories = await vector_store_task
            graph_entities = None

        if self.enable_graph:
            return {"results": original_memories, "relations": graph_entities}

        if self.api_version == "v1.0":
            warnings.warn(
                "The current search API output format is deprecated. "
                "To use the latest format, set `api_version='v1.1'`. "
                "The current format will be removed in mem0ai 1.1.0 and later versions.",
                category=DeprecationWarning,
                stacklevel=2,
            )
            return {"results": original_memories}
        else:
            return {"results": original_memories}

    async def _search_vector_store(self, query, filters, limit):
        embeddings = await asyncio.to_thread(self.embedding_model.embed, query, "search")
        memories = await asyncio.to_thread(
            self.vector_store.search, query=query, vectors=embeddings, limit=limit, filters=filters
        )

        promoted_payload_keys = [
            "user_id",
            "agent_id",
            "run_id",
            "actor_id",
            "role",
        ]

        core_and_promoted_keys = {
            "data",
            "hash",
            "created_at",
            "updated_at",
            "id",
            *promoted_payload_keys,
        }

        original_memories = []
        for mem in memories:
            memory_item_dict = MemoryItem(
                id=mem.id,
                memory=mem.payload["data"],
                hash=mem.payload.get("hash"),
                created_at=mem.payload.get("created_at"),
                updated_at=mem.payload.get("updated_at"),
                score=mem.score,
            ).model_dump()

            for key in promoted_payload_keys:
                if key in mem.payload:
                    memory_item_dict[key] = mem.payload[key]

            additional_metadata = {k: v for k, v in mem.payload.items() if k not in core_and_promoted_keys}
            if additional_metadata:
                memory_item_dict["metadata"] = additional_metadata

            original_memories.append(memory_item_dict)

        return original_memories

    async def update(self, memory_id, data):
        """
        Update a memory by ID asynchronously.

        Args:
            memory_id (str): ID of the memory to update.
            data (str): New text content for the memory.

        Returns:
            dict: Update confirmation message.
        """
        capture_event("mem0.update", self, {"memory_id": memory_id, "sync_type": "async"})

        embeddings = await asyncio.to_thread(self.embedding_model.embed, data, "update")
        existing_embeddings = {data: embeddings}

        await self._update_memory(memory_id, data, existing_embeddings)
        return {"message": "Memory updated successfully!"}

    async def delete(self, memory_id):
        """
        Delete a memory by ID asynchronously.

        Args:
            memory_id (str): ID of the memory to delete.
        """
        capture_event("mem0.delete", self, {"memory_id": memory_id, "sync_type": "async"})
        await self._delete_memory(memory_id)
        return {"message": "Memory deleted successfully!"}

    async def delete_all(self, user_id=None, agent_id=None, run_id=None):
        """
        Delete all memories matching the given scope asynchronously.

        Args:
            user_id (str, optional): ID of the user to delete memories for. Defaults to None.
            agent_id (str, optional): ID of the agent to delete memories for. Defaults to None.
            run_id (str, optional): ID of the run to delete memories for. Defaults to None.
        """
        filters = {}
        if user_id:
            filters["user_id"] = user_id
        if agent_id:
            filters["agent_id"] = agent_id
        if run_id:
            filters["run_id"] = run_id

        if not filters:
            raise ValueError(
                "At least one filter is required to delete all memories. If you want to delete all memories, use the `reset()` method."
            )

        capture_event("mem0.delete_all", self, {"keys": list(filters.keys()), "sync_type": "async"})
        memories = await asyncio.to_thread(self.vector_store.list, filters=filters)

        delete_tasks = []
        for memory in memories[0]:
            delete_tasks.append(self._delete_memory(memory.id))

        await asyncio.gather(*delete_tasks)

        logger.info(f"Deleted {len(memories[0])} memories")

        if self.enable_graph:
            await asyncio.to_thread(self.graph.delete_all, filters)

        return {"message": "Memories deleted successfully!"}

    async def history(self, memory_id):
        """
        Get the history of changes for a memory by ID asynchronously.

        Args:
            memory_id (str): ID of the memory to get history for.

        Returns:
            list: List of changes for the memory.
        """
        capture_event("mem0.history", self, {"memory_id": memory_id, "sync_type": "async"})
        return await asyncio.to_thread(self.db.get_history, memory_id)

    async def _create_memory(self, data, existing_embeddings, metadata=None):
        logger.debug(f"Creating memory with {data=}")
        if data in existing_embeddings:
            embeddings = existing_embeddings[data]
        else:
            embeddings = await asyncio.to_thread(self.embedding_model.embed, data, memory_action="add")

        memory_id = str(uuid.uuid4())
        metadata = metadata or {}
        metadata["data"] = data
        metadata["hash"] = hashlib.md5(data.encode()).hexdigest()
        metadata["created_at"] = datetime.now(pytz.timezone("US/Pacific")).isoformat()

        await asyncio.to_thread(
            self.vector_store.insert,
            vectors=[embeddings],
            ids=[memory_id],
            payloads=[metadata],
        )

        await asyncio.to_thread(
            self.db.add_history,
            memory_id,
            None,
            data,
            "ADD",
            created_at=metadata.get("created_at"),
            actor_id=metadata.get("actor_id"),
            role=metadata.get("role"),
        )

        capture_event("mem0._create_memory", self, {"memory_id": memory_id, "sync_type": "async"})
        return memory_id

    async def _create_procedural_memory(self, messages, metadata=None, llm=None, prompt=None):
        """
        Create a procedural memory asynchronously.

        Args:
            messages (list): List of messages to create a procedural memory from.
            metadata (dict): Metadata to attach to the procedural memory. Required.
            llm (optional): LangChain-compatible chat model used to generate the
                summary; falls back to `self.llm` when omitted. Defaults to None.
            prompt (str, optional): Prompt to use for the procedural memory creation. Defaults to None.
        """
        try:
            from langchain_core.messages.utils import (
                convert_to_messages,  # type: ignore
            )
        except ImportError:
            logger.error(
                "Import error while loading langchain-core. Please install 'langchain-core' to use procedural memory."
            )
            raise

        logger.info("Creating procedural memory")

        parsed_messages = [
            {"role": "system", "content": prompt or PROCEDURAL_MEMORY_SYSTEM_PROMPT},
            *messages,
            {"role": "user", "content": "Create procedural memory of the above conversation."},
        ]
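
        # The model therefore sees the procedural-memory system prompt, then the
        # raw conversation, then a final user turn asking for the summary.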

        try:
            if llm is not None:
                parsed_messages = convert_to_messages(parsed_messages)
                response = await asyncio.to_thread(llm.invoke, input=parsed_messages)
                procedural_memory = response.content
            else:
                procedural_memory = await asyncio.to_thread(self.llm.generate_response, messages=parsed_messages)
        except Exception as e:
            logger.error(f"Error generating procedural memory summary: {e}")
            raise

        if metadata is None:
            raise ValueError("Metadata is required to store a procedural memory.")

        metadata["memory_type"] = MemoryType.PROCEDURAL.value
        embeddings = await asyncio.to_thread(self.embedding_model.embed, procedural_memory, memory_action="add")
        memory_id = await self._create_memory(procedural_memory, {procedural_memory: embeddings}, metadata=metadata)
        capture_event("mem0._create_procedural_memory", self, {"memory_id": memory_id, "sync_type": "async"})

        result = {"results": [{"id": memory_id, "memory": procedural_memory, "event": "ADD"}]}

        return result

    async def _update_memory(self, memory_id, data, existing_embeddings, metadata=None):
        logger.info(f"Updating memory with {data=}")

        try:
            existing_memory = await asyncio.to_thread(self.vector_store.get, vector_id=memory_id)
        except Exception:
            logger.error(f"Error getting memory with ID {memory_id} during update.")
            raise ValueError(f"Error getting memory with ID {memory_id}. Please provide a valid 'memory_id'")

        prev_value = existing_memory.payload.get("data")

        new_metadata = deepcopy(metadata) if metadata is not None else {}

        new_metadata["data"] = data
        new_metadata["hash"] = hashlib.md5(data.encode()).hexdigest()
        new_metadata["created_at"] = existing_memory.payload.get("created_at")
        new_metadata["updated_at"] = datetime.now(pytz.timezone("US/Pacific")).isoformat()
        if "user_id" in existing_memory.payload:
            new_metadata["user_id"] = existing_memory.payload["user_id"]
        if "agent_id" in existing_memory.payload:
            new_metadata["agent_id"] = existing_memory.payload["agent_id"]
        if "run_id" in existing_memory.payload:
            new_metadata["run_id"] = existing_memory.payload["run_id"]
        if "actor_id" in existing_memory.payload:
            new_metadata["actor_id"] = existing_memory.payload["actor_id"]
        if "role" in existing_memory.payload:
            new_metadata["role"] = existing_memory.payload["role"]

        if data in existing_embeddings:
            embeddings = existing_embeddings[data]
        else:
            embeddings = await asyncio.to_thread(self.embedding_model.embed, data, "update")

        await asyncio.to_thread(
            self.vector_store.update,
            vector_id=memory_id,
            vector=embeddings,
            payload=new_metadata,
        )
        logger.info(f"Updated memory {memory_id=} with {data=}")

        await asyncio.to_thread(
            self.db.add_history,
            memory_id,
            prev_value,
            data,
            "UPDATE",
            created_at=new_metadata["created_at"],
            updated_at=new_metadata["updated_at"],
            actor_id=new_metadata.get("actor_id"),
            role=new_metadata.get("role"),
        )
        capture_event("mem0._update_memory", self, {"memory_id": memory_id, "sync_type": "async"})
        return memory_id

    async def _delete_memory(self, memory_id):
        logger.info(f"Deleting memory with {memory_id=}")
        existing_memory = await asyncio.to_thread(self.vector_store.get, vector_id=memory_id)
        prev_value = existing_memory.payload["data"]

        await asyncio.to_thread(self.vector_store.delete, vector_id=memory_id)
        await asyncio.to_thread(
            self.db.add_history,
            memory_id,
            prev_value,
            None,
            "DELETE",
            actor_id=existing_memory.payload.get("actor_id"),
            role=existing_memory.payload.get("role"),
            is_deleted=1,
        )
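
        # The vector itself is hard-deleted above; the history table keeps a
        # tombstone row (is_deleted=1) so the change log stays complete.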

        capture_event("mem0._delete_memory", self, {"memory_id": memory_id, "sync_type": "async"})
        return memory_id

    async def reset(self):
        """
        Reset the memory store asynchronously by:
            Deleting the vector store collection
            Resetting the history database
            Recreating the vector store with a new client
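
        Example:
            A destructive operation; a minimal illustrative sketch, assuming an
            initialized AsyncMemory instance:

                await memory.reset()  # drops all vectors and history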
        """
        logger.warning("Resetting all memories")
        await asyncio.to_thread(self.vector_store.delete_col)

        gc.collect()

        if hasattr(self.vector_store, "client") and hasattr(self.vector_store.client, "close"):
            await asyncio.to_thread(self.vector_store.client.close)

        if hasattr(self.db, "connection") and self.db.connection:
            await asyncio.to_thread(lambda: self.db.connection.execute("DROP TABLE IF EXISTS history"))
            await asyncio.to_thread(self.db.connection.close)

        self.db = SQLiteManager(self.config.history_db_path)

        self.vector_store = VectorStoreFactory.create(
            self.config.vector_store.provider, self.config.vector_store.config
        )
        capture_event("mem0.reset", self, {"sync_type": "async"})

    async def chat(self, query):
        raise NotImplementedError("Chat function not implemented yet.")