diff --git a/mem0/memory/graph_memory.py b/mem0/memory/graph_memory.py index 20218684..000fe2e7 100644 --- a/mem0/memory/graph_memory.py +++ b/mem0/memory/graph_memory.py @@ -165,7 +165,7 @@ class MemoryGraph: try: for tool_call in search_results["tool_calls"]: - if tool_call['name'] != "extract_entities": + if tool_call["name"] != "extract_entities": continue for item in tool_call["arguments"]["entities"]: entity_type_map[item["entity"]] = item["entity_type"] diff --git a/mem0/memory/main.py b/mem0/memory/main.py index 6c502309..64c0907d 100644 --- a/mem0/memory/main.py +++ b/mem0/memory/main.py @@ -783,7 +783,7 @@ class AsyncMemory(MemoryBase): self.graph = MemoryGraph(self.config) self.enable_graph = True - capture_event("mem0.init", self) + capture_event("async_mem0.init", self) @classmethod async def from_config(cls, config_dict: Dict[str, Any]): @@ -882,7 +882,7 @@ class AsyncMemory(MemoryBase): # Run vector store and graph operations concurrently vector_store_task = asyncio.create_task(self._add_to_vector_store(messages, metadata, filters, infer)) graph_task = asyncio.create_task(self._add_to_graph(messages, filters)) - + vector_store_result, graph_result = await asyncio.gather(vector_store_task, graph_task) if self.api_version == "v1.0": @@ -939,7 +939,7 @@ class AsyncMemory(MemoryBase): retrieved_old_memory = [] new_message_embeddings = {} - + # Process all facts concurrently async def process_fact(new_mem): messages_embeddings = await asyncio.to_thread(self.embedding_model.embed, new_mem, "add") @@ -952,15 +952,15 @@ class AsyncMemory(MemoryBase): filters=filters, ) return [(mem.id, mem.payload["data"]) for mem in existing_memories] - + fact_tasks = [process_fact(fact) for fact in new_retrieved_facts] fact_results = await asyncio.gather(*fact_tasks) - + # Flatten results and build retrieved_old_memory for result in fact_results: for mem_id, mem_data in result: retrieved_old_memory.append({"id": mem_id, "text": mem_data}) - + unique_data = {} for item in retrieved_old_memory: unique_data[item["id"]] = item @@ -1004,60 +1004,66 @@ class AsyncMemory(MemoryBase): logging.info("Skipping memory entry because of empty `text` field.") continue elif resp.get("event") == "ADD": - task = asyncio.create_task(self._create_memory( - data=resp.get("text"), - existing_embeddings=new_message_embeddings, - metadata=metadata - )) + task = asyncio.create_task( + self._create_memory( + data=resp.get("text"), existing_embeddings=new_message_embeddings, metadata=metadata + ) + ) memory_tasks.append((task, resp, "ADD", None)) elif resp.get("event") == "UPDATE": - task = asyncio.create_task(self._update_memory( - memory_id=temp_uuid_mapping[resp["id"]], - data=resp.get("text"), - existing_embeddings=new_message_embeddings, - metadata=metadata, - )) + task = asyncio.create_task( + self._update_memory( + memory_id=temp_uuid_mapping[resp["id"]], + data=resp.get("text"), + existing_embeddings=new_message_embeddings, + metadata=metadata, + ) + ) memory_tasks.append((task, resp, "UPDATE", temp_uuid_mapping[resp["id"]])) elif resp.get("event") == "DELETE": - task = asyncio.create_task(self._delete_memory( - memory_id=temp_uuid_mapping[resp.get("id")] - )) + task = asyncio.create_task(self._delete_memory(memory_id=temp_uuid_mapping[resp.get("id")])) memory_tasks.append((task, resp, "DELETE", temp_uuid_mapping[resp["id"]])) elif resp.get("event") == "NONE": logging.info("NOOP for Memory.") except Exception as e: logging.error(f"Error in new_memories_with_actions: {e}") - + # Wait for all memory operations to complete for task, resp, event_type, mem_id in memory_tasks: try: result_id = await task if event_type == "ADD": - returned_memories.append({ - "id": result_id, - "memory": resp.get("text"), - "event": resp.get("event"), - }) + returned_memories.append( + { + "id": result_id, + "memory": resp.get("text"), + "event": resp.get("event"), + } + ) elif event_type == "UPDATE": - returned_memories.append({ - "id": mem_id, - "memory": resp.get("text"), - "event": resp.get("event"), - "previous_memory": resp.get("old_memory"), - }) + returned_memories.append( + { + "id": mem_id, + "memory": resp.get("text"), + "event": resp.get("event"), + "previous_memory": resp.get("old_memory"), + } + ) elif event_type == "DELETE": - returned_memories.append({ - "id": mem_id, - "memory": resp.get("text"), - "event": resp.get("event"), - }) + returned_memories.append( + { + "id": mem_id, + "memory": resp.get("text"), + "event": resp.get("event"), + } + ) except Exception as e: logging.error(f"Error processing memory task: {e}") - + except Exception as e: logging.error(f"Error in new_memories_with_actions: {e}") - capture_event("mem0.add", self, {"version": self.api_version, "keys": list(filters.keys())}) + capture_event("async_mem0.add", self, {"version": self.api_version, "keys": list(filters.keys())}) return returned_memories @@ -1082,7 +1088,7 @@ class AsyncMemory(MemoryBase): Returns: dict: Retrieved memory. """ - capture_event("mem0.get", self, {"memory_id": memory_id}) + capture_event("async_mem0.get", self, {"memory_id": memory_id}) memory = await asyncio.to_thread(self.vector_store.get, vector_id=memory_id) if not memory: return None @@ -1123,11 +1129,11 @@ class AsyncMemory(MemoryBase): if run_id: filters["run_id"] = run_id - capture_event("mem0.get_all", self, {"limit": limit, "keys": list(filters.keys())}) + capture_event("async_mem0.get_all", self, {"limit": limit, "keys": list(filters.keys())}) # Run vector store and graph operations concurrently vector_store_task = asyncio.create_task(self._get_all_from_vector_store(filters, limit)) - + if self.enable_graph: graph_task = asyncio.create_task(asyncio.to_thread(self.graph.get_all, filters, limit)) all_memories, graph_entities = await asyncio.gather(vector_store_task, graph_task) @@ -1210,14 +1216,14 @@ class AsyncMemory(MemoryBase): raise ValueError("One of the filters: user_id, agent_id or run_id is required!") capture_event( - "mem0.search", + "async_mem0.search", self, {"limit": limit, "version": self.api_version, "keys": list(filters.keys())}, ) # Run vector store and graph operations concurrently vector_store_task = asyncio.create_task(self._search_vector_store(query, filters, limit)) - + if self.enable_graph: graph_task = asyncio.create_task(asyncio.to_thread(self.graph.search, query, filters, limit)) original_memories, graph_entities = await asyncio.gather(vector_store_task, graph_task) @@ -1243,11 +1249,7 @@ class AsyncMemory(MemoryBase): async def _search_vector_store(self, query, filters, limit): embeddings = await asyncio.to_thread(self.embedding_model.embed, query, "search") memories = await asyncio.to_thread( - self.vector_store.search, - query=query, - vectors=embeddings, - limit=limit, - filters=filters + self.vector_store.search, query=query, vectors=embeddings, limit=limit, filters=filters ) excluded_keys = { @@ -1294,7 +1296,7 @@ class AsyncMemory(MemoryBase): Returns: dict: Updated memory. """ - capture_event("mem0.update", self, {"memory_id": memory_id}) + capture_event("async_mem0.update", self, {"memory_id": memory_id}) embeddings = await asyncio.to_thread(self.embedding_model.embed, data, "update") existing_embeddings = {data: embeddings} @@ -1309,7 +1311,7 @@ class AsyncMemory(MemoryBase): Args: memory_id (str): ID of the memory to delete. """ - capture_event("mem0.delete", self, {"memory_id": memory_id}) + capture_event("async_mem0.delete", self, {"memory_id": memory_id}) await self._delete_memory(memory_id) return {"message": "Memory deleted successfully!"} @@ -1335,15 +1337,15 @@ class AsyncMemory(MemoryBase): "At least one filter is required to delete all memories. If you want to delete all memories, use the `reset()` method." ) - capture_event("mem0.delete_all", self, {"keys": list(filters.keys())}) + capture_event("async_mem0.delete_all", self, {"keys": list(filters.keys())}) memories = await asyncio.to_thread(self.vector_store.list, filters=filters) - + delete_tasks = [] for memory in memories[0]: delete_tasks.append(self._delete_memory(memory.id)) - + await asyncio.gather(*delete_tasks) - + logger.info(f"Deleted {len(memories[0])} memories") if self.enable_graph: @@ -1361,7 +1363,7 @@ class AsyncMemory(MemoryBase): Returns: list: List of changes for the memory. """ - capture_event("mem0.history", self, {"memory_id": memory_id}) + capture_event("async_mem0.history", self, {"memory_id": memory_id}) return await asyncio.to_thread(self.db.get_history, memory_id) async def _create_memory(self, data, existing_embeddings, metadata=None): @@ -1370,7 +1372,7 @@ class AsyncMemory(MemoryBase): embeddings = existing_embeddings[data] else: embeddings = await asyncio.to_thread(self.embedding_model.embed, data, memory_action="add") - + memory_id = str(uuid.uuid4()) metadata = metadata or {} metadata["data"] = data @@ -1383,17 +1385,10 @@ class AsyncMemory(MemoryBase): ids=[memory_id], payloads=[metadata], ) - - await asyncio.to_thread( - self.db.add_history, - memory_id, - None, - data, - "ADD", - created_at=metadata["created_at"] - ) - - capture_event("mem0._create_memory", self, {"memory_id": memory_id}) + + await asyncio.to_thread(self.db.add_history, memory_id, None, data, "ADD", created_at=metadata["created_at"]) + + capture_event("async_mem0._create_memory", self, {"memory_id": memory_id}) return memory_id async def _create_procedural_memory(self, messages, metadata=None, llm=None, prompt=None): @@ -1411,7 +1406,9 @@ class AsyncMemory(MemoryBase): convert_to_messages, # type: ignore ) except Exception: - logger.error("Import error while loading langchain-core. Please install 'langchain-core' to use procedural memory.") + logger.error( + "Import error while loading langchain-core. Please install 'langchain-core' to use procedural memory." + ) raise logger.info("Creating procedural memory") @@ -1428,10 +1425,7 @@ class AsyncMemory(MemoryBase): response = await asyncio.to_thread(llm.invoke, input=parsed_messages) procedural_memory = response.content else: - procedural_memory = await asyncio.to_thread( - self.llm.generate_response, - messages=parsed_messages - ) + procedural_memory = await asyncio.to_thread(self.llm.generate_response, messages=parsed_messages) except Exception as e: logger.error(f"Error generating procedural memory summary: {e}") raise @@ -1444,7 +1438,7 @@ class AsyncMemory(MemoryBase): embeddings = await asyncio.to_thread(self.embedding_model.embed, procedural_memory, memory_action="add") # Create the memory memory_id = await self._create_memory(procedural_memory, {procedural_memory: embeddings}, metadata=metadata) - capture_event("mem0._create_procedural_memory", self, {"memory_id": memory_id}) + capture_event("async_mem0._create_procedural_memory", self, {"memory_id": memory_id}) # Return results in the same format as add() result = {"results": [{"id": memory_id, "memory": procedural_memory, "event": "ADD"}]} @@ -1458,7 +1452,7 @@ class AsyncMemory(MemoryBase): existing_memory = await asyncio.to_thread(self.vector_store.get, vector_id=memory_id) except Exception: raise ValueError(f"Error getting memory with ID {memory_id}. Please provide a valid 'memory_id'") - + prev_value = existing_memory.payload.get("data") new_metadata = metadata or {} @@ -1478,16 +1472,16 @@ class AsyncMemory(MemoryBase): embeddings = existing_embeddings[data] else: embeddings = await asyncio.to_thread(self.embedding_model.embed, data, "update") - + await asyncio.to_thread( self.vector_store.update, vector_id=memory_id, vector=embeddings, payload=new_metadata, ) - + logger.info(f"Updating memory with ID {memory_id=} with {data=}") - + await asyncio.to_thread( self.db.add_history, memory_id, @@ -1497,19 +1491,19 @@ class AsyncMemory(MemoryBase): created_at=new_metadata["created_at"], updated_at=new_metadata["updated_at"], ) - - capture_event("mem0._update_memory", self, {"memory_id": memory_id}) + + capture_event("async_mem0._update_memory", self, {"memory_id": memory_id}) return memory_id async def _delete_memory(self, memory_id): logging.info(f"Deleting memory with {memory_id=}") existing_memory = await asyncio.to_thread(self.vector_store.get, vector_id=memory_id) prev_value = existing_memory.payload["data"] - + await asyncio.to_thread(self.vector_store.delete, vector_id=memory_id) await asyncio.to_thread(self.db.add_history, memory_id, prev_value, None, "DELETE", is_deleted=1) - - capture_event("mem0._delete_memory", self, {"memory_id": memory_id}) + + capture_event("async_mem0._delete_memory", self, {"memory_id": memory_id}) return memory_id async def reset(self): @@ -1522,7 +1516,7 @@ class AsyncMemory(MemoryBase): self.config.vector_store.provider, self.config.vector_store.config ) await asyncio.to_thread(self.db.reset) - capture_event("mem0.reset", self) + capture_event("async_mem0.reset", self) async def chat(self, query): raise NotImplementedError("Chat function not implemented yet.")