diff --git a/mem0/memory/storage.py b/mem0/memory/storage.py index 7df0e000..967dc0c8 100644 --- a/mem0/memory/storage.py +++ b/mem0/memory/storage.py @@ -21,59 +21,107 @@ class SQLiteManager: rename it, create the new schema, copy the intersecting data, then drop the old table. """ - with self._lock, self.connection: - cur = self.connection.cursor() - cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='history'") - if cur.fetchone() is None: - return # nothing to migrate + with self._lock: + try: + # Start a transaction + self.connection.execute("BEGIN") + cur = self.connection.cursor() - cur.execute("PRAGMA table_info(history)") - old_cols = {row[1] for row in cur.fetchall()} + cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='history'") + if cur.fetchone() is None: + self.connection.execute("COMMIT") + return # nothing to migrate - expected_cols = { - "id", - "memory_id", - "old_memory", - "new_memory", - "event", - "created_at", - "updated_at", - "is_deleted", - "actor_id", - "role", - } + cur.execute("PRAGMA table_info(history)") + old_cols = {row[1] for row in cur.fetchall()} - if old_cols == expected_cols: - return + expected_cols = { + "id", + "memory_id", + "old_memory", + "new_memory", + "event", + "created_at", + "updated_at", + "is_deleted", + "actor_id", + "role", + } - logger.info("Migrating history table to new schema (no convo columns).") - cur.execute("ALTER TABLE history RENAME TO history_old") + if old_cols == expected_cols: + self.connection.execute("COMMIT") + return - self._create_history_table() + logger.info("Migrating history table to new schema (no convo columns).") - intersecting = list(expected_cols & old_cols) - cols_csv = ", ".join(intersecting) - cur.execute(f"INSERT INTO history ({cols_csv}) SELECT {cols_csv} FROM history_old") - cur.execute("DROP TABLE history_old") + # Clean up any existing history_old table from previous failed migration + cur.execute("DROP TABLE IF EXISTS history_old") + + # Rename the current history table + cur.execute("ALTER TABLE history RENAME TO history_old") + + # Create the new history table with updated schema + cur.execute( + """ + CREATE TABLE history ( + id TEXT PRIMARY KEY, + memory_id TEXT, + old_memory TEXT, + new_memory TEXT, + event TEXT, + created_at DATETIME, + updated_at DATETIME, + is_deleted INTEGER, + actor_id TEXT, + role TEXT + ) + """ + ) + + # Copy data from old table to new table + intersecting = list(expected_cols & old_cols) + if intersecting: + cols_csv = ", ".join(intersecting) + cur.execute(f"INSERT INTO history ({cols_csv}) SELECT {cols_csv} FROM history_old") + + # Drop the old table + cur.execute("DROP TABLE history_old") + + # Commit the transaction + self.connection.execute("COMMIT") + logger.info("History table migration completed successfully.") + + except Exception as e: + # Rollback the transaction on any error + self.connection.execute("ROLLBACK") + logger.error(f"History table migration failed: {e}") + raise def _create_history_table(self) -> None: - with self._lock, self.connection: - self.connection.execute( + with self._lock: + try: + self.connection.execute("BEGIN") + self.connection.execute( + """ + CREATE TABLE IF NOT EXISTS history ( + id TEXT PRIMARY KEY, + memory_id TEXT, + old_memory TEXT, + new_memory TEXT, + event TEXT, + created_at DATETIME, + updated_at DATETIME, + is_deleted INTEGER, + actor_id TEXT, + role TEXT + ) """ - CREATE TABLE IF NOT EXISTS history ( - id TEXT PRIMARY KEY, - memory_id TEXT, - old_memory TEXT, - new_memory TEXT, - event TEXT, - created_at DATETIME, - updated_at DATETIME, - is_deleted INTEGER, - actor_id TEXT, - role TEXT ) - """ - ) + self.connection.execute("COMMIT") + except Exception as e: + self.connection.execute("ROLLBACK") + logger.error(f"Failed to create history table: {e}") + raise def add_history( self, @@ -88,28 +136,35 @@ class SQLiteManager: actor_id: Optional[str] = None, role: Optional[str] = None, ) -> None: - with self._lock, self.connection: - self.connection.execute( - """ - INSERT INTO history ( - id, memory_id, old_memory, new_memory, event, - created_at, updated_at, is_deleted, actor_id, role + with self._lock: + try: + self.connection.execute("BEGIN") + self.connection.execute( + """ + INSERT INTO history ( + id, memory_id, old_memory, new_memory, event, + created_at, updated_at, is_deleted, actor_id, role + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + str(uuid.uuid4()), + memory_id, + old_memory, + new_memory, + event, + created_at, + updated_at, + is_deleted, + actor_id, + role, + ), ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, - ( - str(uuid.uuid4()), - memory_id, - old_memory, - new_memory, - event, - created_at, - updated_at, - is_deleted, - actor_id, - role, - ), - ) + self.connection.execute("COMMIT") + except Exception as e: + self.connection.execute("ROLLBACK") + logger.error(f"Failed to add history record: {e}") + raise def get_history(self, memory_id: str) -> List[Dict[str, Any]]: with self._lock: @@ -143,9 +198,16 @@ class SQLiteManager: def reset(self) -> None: """Drop and recreate the history table.""" - with self._lock, self.connection: - self.connection.execute("DROP TABLE IF EXISTS history") - self._create_history_table() + with self._lock: + try: + self.connection.execute("BEGIN") + self.connection.execute("DROP TABLE IF EXISTS history") + self.connection.execute("COMMIT") + self._create_history_table() + except Exception as e: + self.connection.execute("ROLLBACK") + logger.error(f"Failed to reset history table: {e}") + raise def close(self) -> None: if self.connection: diff --git a/pyproject.toml b/pyproject.toml index 7ea01c1f..1bed7579 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "mem0ai" -version = "0.1.106" +version = "0.1.107" description = "Long-term memory for AI Agents" authors = [ { name = "Mem0", email = "founders@mem0.ai" }