Update capture_event (#2527)

This commit is contained in:
Dev Khant
2025-04-16 10:42:36 +05:30
committed by GitHub
parent f77a084d1b
commit 541030d69c
2 changed files with 75 additions and 75 deletions

View File

@@ -62,7 +62,7 @@ class Memory(MemoryBase):
self.graph = MemoryGraph(self.config)
self.enable_graph = True
capture_event("mem0.init", self)
capture_event("mem0.init", self, {"sync_type": "sync"})
@classmethod
def from_config(cls, config_dict: Dict[str, Any]):
@@ -318,7 +318,7 @@ class Memory(MemoryBase):
capture_event(
"mem0.add",
self,
{"version": self.api_version, "keys": list(filters.keys())},
{"version": self.api_version, "keys": list(filters.keys()), "sync_type": "sync"},
)
return returned_memories
@@ -344,7 +344,7 @@ class Memory(MemoryBase):
Returns:
dict: Retrieved memory.
"""
capture_event("mem0.get", self, {"memory_id": memory_id})
capture_event("mem0.get", self, {"memory_id": memory_id, "sync_type": "sync"})
memory = self.vector_store.get(vector_id=memory_id)
if not memory:
return None
@@ -394,7 +394,7 @@ class Memory(MemoryBase):
if run_id:
filters["run_id"] = run_id
capture_event("mem0.get_all", self, {"limit": limit, "keys": list(filters.keys())})
capture_event("mem0.get_all", self, {"limit": limit, "keys": list(filters.keys()), "sync_type": "sync"})
with concurrent.futures.ThreadPoolExecutor() as executor:
future_memories = executor.submit(self._get_all_from_vector_store, filters, limit)
@@ -484,7 +484,7 @@ class Memory(MemoryBase):
capture_event(
"mem0.search",
self,
{"limit": limit, "version": self.api_version, "keys": list(filters.keys())},
{"limit": limit, "version": self.api_version, "keys": list(filters.keys()), "sync_type": "sync"},
)
with concurrent.futures.ThreadPoolExecutor() as executor:
@@ -563,7 +563,7 @@ class Memory(MemoryBase):
Returns:
dict: Updated memory.
"""
capture_event("mem0.update", self, {"memory_id": memory_id})
capture_event("mem0.update", self, {"memory_id": memory_id, "sync_type": "sync"})
existing_embeddings = {data: self.embedding_model.embed(data, "update")}
@@ -577,7 +577,7 @@ class Memory(MemoryBase):
Args:
memory_id (str): ID of the memory to delete.
"""
capture_event("mem0.delete", self, {"memory_id": memory_id})
capture_event("mem0.delete", self, {"memory_id": memory_id, "sync_type": "sync"})
self._delete_memory(memory_id)
return {"message": "Memory deleted successfully!"}
@@ -603,7 +603,7 @@ class Memory(MemoryBase):
"At least one filter is required to delete all memories. If you want to delete all memories, use the `reset()` method."
)
capture_event("mem0.delete_all", self, {"keys": list(filters.keys())})
capture_event("mem0.delete_all", self, {"keys": list(filters.keys()), "sync_type": "sync"})
memories = self.vector_store.list(filters=filters)[0]
for memory in memories:
self._delete_memory(memory.id)
@@ -625,7 +625,7 @@ class Memory(MemoryBase):
Returns:
list: List of changes for the memory.
"""
capture_event("mem0.history", self, {"memory_id": memory_id})
capture_event("mem0.history", self, {"memory_id": memory_id, "sync_type": "sync"})
return self.db.get_history(memory_id)
def _create_memory(self, data, existing_embeddings, metadata=None):
@@ -646,7 +646,7 @@ class Memory(MemoryBase):
payloads=[metadata],
)
self.db.add_history(memory_id, None, data, "ADD", created_at=metadata["created_at"])
capture_event("mem0._create_memory", self, {"memory_id": memory_id})
capture_event("mem0._create_memory", self, {"memory_id": memory_id, "sync_type": "sync"})
return memory_id
def _create_procedural_memory(self, messages, metadata=None, prompt=None):
@@ -683,7 +683,7 @@ class Memory(MemoryBase):
embeddings = self.embedding_model.embed(procedural_memory, memory_action="add")
# Create the memory
memory_id = self._create_memory(procedural_memory, {procedural_memory: embeddings}, metadata=metadata)
capture_event("mem0._create_procedural_memory", self, {"memory_id": memory_id})
capture_event("mem0._create_procedural_memory", self, {"memory_id": memory_id, "sync_type": "sync"})
# Return results in the same format as add()
result = {"results": [{"id": memory_id, "memory": procedural_memory, "event": "ADD"}]}
@@ -730,7 +730,7 @@ class Memory(MemoryBase):
created_at=new_metadata["created_at"],
updated_at=new_metadata["updated_at"],
)
capture_event("mem0._update_memory", self, {"memory_id": memory_id})
capture_event("mem0._update_memory", self, {"memory_id": memory_id, "sync_type": "sync"})
return memory_id
def _delete_memory(self, memory_id):
@@ -739,7 +739,7 @@ class Memory(MemoryBase):
prev_value = existing_memory.payload["data"]
self.vector_store.delete(vector_id=memory_id)
self.db.add_history(memory_id, prev_value, None, "DELETE", is_deleted=1)
capture_event("mem0._delete_memory", self, {"memory_id": memory_id})
capture_event("mem0._delete_memory", self, {"memory_id": memory_id, "sync_type": "sync"})
return memory_id
def reset(self):
@@ -752,7 +752,7 @@ class Memory(MemoryBase):
self.config.vector_store.provider, self.config.vector_store.config
)
self.db.reset()
capture_event("mem0.reset", self)
capture_event("mem0.reset", self, {"sync_type": "sync"})
def chat(self, query):
raise NotImplementedError("Chat function not implemented yet.")
@@ -783,7 +783,7 @@ class AsyncMemory(MemoryBase):
self.graph = MemoryGraph(self.config)
self.enable_graph = True
capture_event("async_mem0.init", self)
capture_event("mem0.init", self, {"sync_type": "async"})
@classmethod
async def from_config(cls, config_dict: Dict[str, Any]):
@@ -1063,7 +1063,7 @@ class AsyncMemory(MemoryBase):
except Exception as e:
logging.error(f"Error in new_memories_with_actions: {e}")
capture_event("async_mem0.add", self, {"version": self.api_version, "keys": list(filters.keys())})
capture_event("mem0.add", self, {"version": self.api_version, "keys": list(filters.keys()), "sync_type": "async"})
return returned_memories
@@ -1088,7 +1088,7 @@ class AsyncMemory(MemoryBase):
Returns:
dict: Retrieved memory.
"""
capture_event("async_mem0.get", self, {"memory_id": memory_id})
capture_event("mem0.get", self, {"memory_id": memory_id, "sync_type": "async"})
memory = await asyncio.to_thread(self.vector_store.get, vector_id=memory_id)
if not memory:
return None
@@ -1129,7 +1129,7 @@ class AsyncMemory(MemoryBase):
if run_id:
filters["run_id"] = run_id
capture_event("async_mem0.get_all", self, {"limit": limit, "keys": list(filters.keys())})
capture_event("mem0.get_all", self, {"limit": limit, "keys": list(filters.keys()), "sync_type": "async"})
# Run vector store and graph operations concurrently
vector_store_task = asyncio.create_task(self._get_all_from_vector_store(filters, limit))
@@ -1216,9 +1216,9 @@ class AsyncMemory(MemoryBase):
raise ValueError("One of the filters: user_id, agent_id or run_id is required!")
capture_event(
"async_mem0.search",
"mem0.search",
self,
{"limit": limit, "version": self.api_version, "keys": list(filters.keys())},
{"limit": limit, "version": self.api_version, "keys": list(filters.keys()), "sync_type": "async"},
)
# Run vector store and graph operations concurrently
@@ -1296,7 +1296,7 @@ class AsyncMemory(MemoryBase):
Returns:
dict: Updated memory.
"""
capture_event("async_mem0.update", self, {"memory_id": memory_id})
capture_event("mem0.update", self, {"memory_id": memory_id, "sync_type": "async"})
embeddings = await asyncio.to_thread(self.embedding_model.embed, data, "update")
existing_embeddings = {data: embeddings}
@@ -1311,7 +1311,7 @@ class AsyncMemory(MemoryBase):
Args:
memory_id (str): ID of the memory to delete.
"""
capture_event("async_mem0.delete", self, {"memory_id": memory_id})
capture_event("mem0.delete", self, {"memory_id": memory_id, "sync_type": "async"})
await self._delete_memory(memory_id)
return {"message": "Memory deleted successfully!"}
@@ -1337,7 +1337,7 @@ class AsyncMemory(MemoryBase):
"At least one filter is required to delete all memories. If you want to delete all memories, use the `reset()` method."
)
capture_event("async_mem0.delete_all", self, {"keys": list(filters.keys())})
capture_event("mem0.delete_all", self, {"keys": list(filters.keys()), "sync_type": "async"})
memories = await asyncio.to_thread(self.vector_store.list, filters=filters)
delete_tasks = []
@@ -1363,7 +1363,7 @@ class AsyncMemory(MemoryBase):
Returns:
list: List of changes for the memory.
"""
capture_event("async_mem0.history", self, {"memory_id": memory_id})
capture_event("mem0.history", self, {"memory_id": memory_id, "sync_type": "async"})
return await asyncio.to_thread(self.db.get_history, memory_id)
async def _create_memory(self, data, existing_embeddings, metadata=None):
@@ -1388,7 +1388,7 @@ class AsyncMemory(MemoryBase):
await asyncio.to_thread(self.db.add_history, memory_id, None, data, "ADD", created_at=metadata["created_at"])
capture_event("async_mem0._create_memory", self, {"memory_id": memory_id})
capture_event("mem0._create_memory", self, {"memory_id": memory_id, "sync_type": "async"})
return memory_id
async def _create_procedural_memory(self, messages, metadata=None, llm=None, prompt=None):
@@ -1438,7 +1438,7 @@ class AsyncMemory(MemoryBase):
embeddings = await asyncio.to_thread(self.embedding_model.embed, procedural_memory, memory_action="add")
# Create the memory
memory_id = await self._create_memory(procedural_memory, {procedural_memory: embeddings}, metadata=metadata)
capture_event("async_mem0._create_procedural_memory", self, {"memory_id": memory_id})
capture_event("mem0._create_procedural_memory", self, {"memory_id": memory_id, "sync_type": "async"})
# Return results in the same format as add()
result = {"results": [{"id": memory_id, "memory": procedural_memory, "event": "ADD"}]}
@@ -1492,7 +1492,7 @@ class AsyncMemory(MemoryBase):
updated_at=new_metadata["updated_at"],
)
capture_event("async_mem0._update_memory", self, {"memory_id": memory_id})
capture_event("mem0._update_memory", self, {"memory_id": memory_id, "sync_type": "async"})
return memory_id
async def _delete_memory(self, memory_id):
@@ -1503,7 +1503,7 @@ class AsyncMemory(MemoryBase):
await asyncio.to_thread(self.vector_store.delete, vector_id=memory_id)
await asyncio.to_thread(self.db.add_history, memory_id, prev_value, None, "DELETE", is_deleted=1)
capture_event("async_mem0._delete_memory", self, {"memory_id": memory_id})
capture_event("mem0._delete_memory", self, {"memory_id": memory_id, "sync_type": "async"})
return memory_id
async def reset(self):
@@ -1516,7 +1516,7 @@ class AsyncMemory(MemoryBase):
self.config.vector_store.provider, self.config.vector_store.config
)
await asyncio.to_thread(self.db.reset)
capture_event("async_mem0.reset", self)
capture_event("mem0.reset", self, {"sync_type": "async"})
async def chat(self, query):
raise NotImplementedError("Chat function not implemented yet.")