# feat: Add Group Chat Memory Feature support to Python SDK enhancing mem0 (#2669)

This commit is contained in:
Chaithanya Kumar
2025-05-16 23:38:36 +05:30
committed by GitHub
parent 931df14e25
commit a1c9a63074
4 changed files with 1121 additions and 593 deletions

View File

@@ -203,6 +203,7 @@
"examples/aws_example", "examples/aws_example",
"examples/mem0-demo", "examples/mem0-demo",
"examples/ai_companion_js", "examples/ai_companion_js",
"examples/collaborative-task-agent",
"examples/eliza_os", "examples/eliza_os",
"examples/mem0-mastra", "examples/mem0-mastra",
"examples/mem0-with-ollama", "examples/mem0-with-ollama",

View File

@@ -0,0 +1,273 @@
---
title: Collaborative Task Agent
---
<Snippet file="paper-release.mdx" />
# Building a Collaborative Task Management System with Mem0
## Overview
Mem0's advanced attribution capabilities now allow you to create multi-user, multi-agent collaborative or chat systems by attaching an **`actor_id`** to each memory. By setting the user's name in `message["name"]`, you can build powerful team collaboration tools where contributions are properly attributed to their authors.
When using `infer=False`, messages are stored exactly as provided while still preserving actor metadata—making this approach ideal for:
- Multi-user chat applications
- Team brainstorming sessions
- Any collaborative "shared canvas" scenario
> **Note**
> Actor attribution works today with `infer=False` mode.
> Full attribution support for the fact-extraction pipeline (`infer=True`) will be available in an upcoming release.
## Key Concepts
### Session Context
Session context is defined by one of three identifiers:
- **`user_id`**: Ideal for personal memory or user-specific data
- **`agent_id`**: Used for agent-specific memory storage
- **`run_id`**: Best for shared task contexts or collaborative spaces
Developers choose which identifier best represents their use case. In this example, we use `run_id` to create a shared project space where all team members can collaborate.
### Actor Attribution
Actor attribution is derived internally from:
- **`message["name"]`**: Becomes the `actor_id` in the memory's metadata
- **`message["role"]`**: Stored as the `role` in the memory's metadata
Note that `actor_id` is not a top-level parameter for the `add()` method, but is instead extracted from the message itself.
### Memory Filtering
When retrieving memories, you can filter by actor using the `filters` parameter:
```python
# Get all memories from a specific actor
memories = mem.search("query", run_id="landing-v1", filters={"actor_id": "alice"})
# Get all memories from all team members
all_memories = mem.get_all(run_id="landing-v1")
```
## Upcoming Features
Mem0 will soon support full actor attribution with `infer=True`, enabling automatic extraction of actor names during the fact extraction process. This enhancement will allow the system to:
1. Maintain attribution information when converting raw messages to semantic facts
2. Associate extracted knowledge with its original source
3. Track the provenance of information across complex interactions
Mem0's actor attribution system can power a wide range of advanced conversation and agent scenarios:
### Conversation Scenarios
| Scenario | Description | Implementation |
|----------|-------------|----------------|
| **Simple Chat** | One-to-one conversation between user and assistant | |
| **Multi-User Chat** | Multiple users conversing with a single assistant | |
| **Multi-Agent Chat** | Multiple AI assistants with distinct personas or capabilities | |
| **Group Chat** | Complex interactions between multiple humans and assistants | |
### Agent-Based Applications
The collaborative task agent uses a simple but powerful architecture:
* A **shared project space** identified by a single `run_id`
* Each participant (user or AI) writes with their own **unique name** which becomes the `actor_id` in Mem0
* All memories can be searched, filtered, or visualized by actor
## Implementation
Below is a complete implementation of a collaborative task agent that demonstrates how to build team-oriented applications with Mem0.
```python
from openai import OpenAI
from mem0 import Memory
import os
from datetime import datetime # For parsing and formatting timestamps
# Configuration
# NOTE(review): placeholder secret for the docs example only — replace with a
# real key before running and never commit one.
os.environ["OPENAI_API_KEY"] = "sk-your-key" # Replace with your key
client = OpenAI()

RUN_ID = "landing-v1" # Shared project context
APP_ID = "task-agent-demo" # Application identifier

# Initialize Mem0 with default settings (local Qdrant + SQLite)
# Ensure the path is writable if not using in-memory
mem = Memory()
class TaskAgent:
    """Collaborative task agent backed by a shared Mem0 project space.

    All messages are stored under one ``run_id``; each speaker's name
    (``message["name"]``) becomes the memory's ``actor_id``, so every
    contribution stays attributed to its author.
    """

    def __init__(self, run_id: str):
        """
        Initialize a collaborative task agent for a specific project.

        Args:
            run_id: Unique identifier for this project workspace
        """
        self.run_id = run_id
        # Shares the module-level Memory instance: every TaskAgent created
        # with the same run_id sees the same project memory.
        self.mem = mem

    def add_message(self, role: str, speaker: str, content: str) -> None:
        """
        Store a chat message with proper attribution.

        Args:
            role: Message role (user, assistant, system)
            speaker: Name of the person/agent speaking (becomes actor_id)
            content: The actual message content
        """
        msg = {"role": role, "name": speaker, "content": content}
        # infer=False stores the message verbatim (no fact extraction) while
        # still recording actor metadata.
        # Ensure created_at is stored. Mem0 does this by default.
        self.mem.add(
            [msg],
            run_id=self.run_id,
            metadata={"app_id": APP_ID},
            infer=False
        )

    def brainstorm(self, prompt: str, speaker: str = "assistant", search_limit: int = 10, exclude_assistant_context: bool = False) -> str:
        """
        Generate a response based on project context and team input.

        Args:
            prompt: The question or task to address
            speaker: Name to attribute the assistant's response to
            search_limit: Max number of memories to retrieve for context
            exclude_assistant_context: If True, filters out assistant's own messages from context

        Returns:
            str: The assistant's response (also stored back into memory)
        """
        # Retrieve relevant context from team's shared memory.
        # Fetch a bit more if we plan to filter, to ensure we still get enough relevant user messages.
        fetch_limit = search_limit + 5 if exclude_assistant_context else search_limit
        retrieved_memories = self.mem.search(prompt, run_id=self.run_id, limit=fetch_limit)["results"]

        # Client-side sort by 'created_at' (newest first) to prioritize
        # recent memories for context. Mem0 stores created_at as ISO-format
        # strings, which compare correctly as plain strings — assumes a
        # single consistent timezone offset; TODO confirm for mixed offsets.
        retrieved_memories.sort(key=lambda m: m.get('created_at', ''), reverse=True)

        ctx_for_llm = []
        if exclude_assistant_context:
            # Keep only non-assistant messages until we have search_limit of them.
            for m in retrieved_memories:
                if m.get("role") != "assistant":
                    ctx_for_llm.append(m)
                if len(ctx_for_llm) >= search_limit:
                    break
        else:
            ctx_for_llm = retrieved_memories[:search_limit]

        context_parts = []
        for m in ctx_for_llm:
            actor = m.get('actor_id') or "Unknown"
            # Attempt to parse and format the timestamp for better readability.
            try:
                ts_iso = m.get('created_at', '')
                if ts_iso:
                    ts_obj = datetime.fromisoformat(ts_iso.replace('Z', '+00:00')) # Handle Zulu time
                    # NOTE(review): %Z renders empty for offset-only datetimes.
                    formatted_ts = ts_obj.strftime('%Y-%m-%d %H:%M:%S %Z')
                else:
                    formatted_ts = "Timestamp N/A"
            except ValueError:
                formatted_ts = ts_iso # Fallback to raw string if parsing fails
            context_parts.append(f"- {m['memory']} (by {actor} at {formatted_ts})")
        context_str = "\n".join(context_parts)

        # Generate response with context-aware prompting.
        sys_prompt = "You are the team's project assistant. Use the provided memory context, paying attention to timestamps for recency, to answer the user's query or perform the task."
        user_prompt_with_context = f"Query: {prompt}\n\nRelevant Context (most recent first):\n{context_str}"
        msgs = [
            {"role": "system", "content": sys_prompt},
            {"role": "user", "content": user_prompt_with_context}
        ]
        reply = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=msgs
        ).choices[0].message.content.strip()

        # Store the assistant's response with attribution so it becomes part
        # of the shared project memory.
        self.add_message("assistant", speaker, reply)
        return reply

    def dump(self, sort_by_time: bool = True, group_by_speaker: bool = False) -> None:
        """
        Display all messages in the shared project space with attribution.
        Can be sorted by time and/or grouped by speaker.

        Args:
            sort_by_time: Sort memories chronologically before printing.
            group_by_speaker: Print memories grouped per actor_id.
        """
        results = self.mem.get_all(run_id=self.run_id)["results"]
        if not results:
            print("No memories found for this run.")
            return

        # Sort by 'created_at' if requested (ISO strings sort chronologically).
        if sort_by_time:
            results.sort(key=lambda m: m.get('created_at', ''))
            print(f"\n--- Project memory (run_id: {self.run_id}, sorted by time) ---")
        else:
            print(f"\n--- Project memory (run_id: {self.run_id}) ---")

        if group_by_speaker:
            from collections import defaultdict
            grouped_memories = defaultdict(list)
            for m in results: # Use already potentially sorted results
                grouped_memories[m.get("actor_id") or "Unknown"].append(m)

            for speaker, mem_list in grouped_memories.items():
                print(f"\n=== Speaker: {speaker} ===")
                # If sort_by_time was true the global sort already ordered each
                # group chronologically, so no per-group re-sort is needed.
                for m_item in mem_list:
                    timestamp_str = m_item.get('created_at', 'Timestamp N/A')
                    try:
                        # Basic parsing for display, adjust as needed.
                        dt_obj = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
                        formatted_time = dt_obj.strftime('%Y-%m-%d %H:%M:%S')
                    except ValueError:
                        formatted_time = timestamp_str # Fallback
                    print(f"[{formatted_time:19}] {m_item['memory']}")
        else: # Not grouping by speaker
            for m in results:
                who = m.get("actor_id") or "Unknown"
                timestamp_str = m.get('created_at', 'Timestamp N/A')
                try:
                    dt_obj = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
                    formatted_time = dt_obj.strftime('%Y-%m-%d %H:%M:%S')
                except ValueError:
                    formatted_time = timestamp_str # Fallback
                print(f"[{formatted_time:19}][{who:8}] {m['memory']}")
# Demo Usage
agent = TaskAgent(RUN_ID)

# Team collaboration session: each speaker's name becomes the actor_id.
agent.add_message("user", "alice", "Let's list tasks for the new landing page.")
agent.add_message("user", "bob", "I'll own the hero section copy. Maybe tomorrow.")
agent.add_message("user", "carol", "I'll choose three product screenshots later today.")
agent.add_message("user", "alice", "Actually, I will work on the hero section copy today.")

print("\nAssistant brainstorm reply (default settings):\n")
print(agent.brainstorm("What are the current open tasks related to the hero section?"))

# Second query drops the assistant's own earlier reply from the context.
print("\nAssistant brainstorm reply (excluding its own prior context):\n")
print(agent.brainstorm("Summarize what Alice is working on.", exclude_assistant_context=True))

print("\n--- Dump (sorted by time by default) ---")
agent.dump()

print("\n--- Dump (grouped by speaker, also sorted by time globally) ---")
agent.dump(group_by_speaker=True)

print("\n--- Dump (default order, not sorted by time explicitly by dump) ---")
agent.dump(sort_by_time=False)
```

File diff suppressed because it is too large Load Diff

View File

@@ -1,144 +1,160 @@
import sqlite3
import threading
import uuid
import logging
from typing import List, Dict, Any, Optional

logger = logging.getLogger(__name__)
class SQLiteManager:
    """Thread-safe SQLite store for the memory change-history log."""

    def __init__(self, db_path: str = ":memory:"):
        """
        Open (or create) the history database.

        Args:
            db_path: SQLite file path, or ":memory:" for an in-memory DB.
        """
        self.db_path = db_path
        # check_same_thread=False: the single connection is shared across
        # threads; every statement runs under self._lock instead.
        self.connection = sqlite3.connect(self.db_path, check_same_thread=False)
        self._lock = threading.Lock()
        self._migrate_history_table()
        self._create_history_table()

    def _migrate_history_table(self) -> None:
        """
        If a pre-existing history table has an outdated schema, rename it,
        create the new schema, copy the intersecting columns, then drop the
        old table.
        """
        with self._lock, self.connection:
            cur = self.connection.cursor()
            cur.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='history'"
            )
            if cur.fetchone() is None:
                return  # nothing to migrate

            cur.execute("PRAGMA table_info(history)")
            old_cols = {row[1] for row in cur.fetchall()}
            expected_cols = {
                "id",
                "memory_id",
                "old_memory",
                "new_memory",
                "event",
                "created_at",
                "updated_at",
                "is_deleted",
                "actor_id",
                "role",
            }
            if old_cols == expected_cols:
                return

            logger.info("Migrating history table to new schema (no convo columns).")
            cur.execute("ALTER TABLE history RENAME TO history_old")
            # BUGFIX: we already hold self._lock here and threading.Lock is
            # NOT reentrant — calling _create_history_table() (which takes the
            # lock) would deadlock. Use the unlocked helper instead.
            self._create_history_table_unlocked()

            # Copy only the columns both schemas share; same column list on
            # both sides of the INSERT, so ordering of the set is irrelevant.
            intersecting = list(expected_cols & old_cols)
            cols_csv = ", ".join(intersecting)
            cur.execute(
                f"INSERT INTO history ({cols_csv}) SELECT {cols_csv} FROM history_old"
            )
            cur.execute("DROP TABLE history_old")

    def _create_history_table(self) -> None:
        """Create the history table if it does not exist (thread-safe)."""
        with self._lock, self.connection:
            self._create_history_table_unlocked()

    def _create_history_table_unlocked(self) -> None:
        """Create the history table; the caller MUST already hold self._lock."""
        self.connection.execute(
            """
            CREATE TABLE IF NOT EXISTS history (
                id TEXT PRIMARY KEY,
                memory_id TEXT,
                old_memory TEXT,
                new_memory TEXT,
                event TEXT,
                created_at DATETIME,
                updated_at DATETIME,
                is_deleted INTEGER,
                actor_id TEXT,
                role TEXT
            )
            """
        )

    def add_history(
        self,
        memory_id: str,
        old_memory: Optional[str],
        new_memory: Optional[str],
        event: str,
        *,
        created_at: Optional[str] = None,
        updated_at: Optional[str] = None,
        is_deleted: int = 0,
        actor_id: Optional[str] = None,
        role: Optional[str] = None,
    ) -> None:
        """
        Append one change record for *memory_id*.

        Args:
            memory_id: The memory the event belongs to.
            old_memory: Previous text (None for ADD events).
            new_memory: New text (None for DELETE events).
            event: Event name, e.g. "ADD" / "UPDATE" / "DELETE".
            created_at: ISO timestamp of creation (stored as given).
            updated_at: ISO timestamp of the update (stored as given).
            is_deleted: Tombstone flag, stored as an integer.
            actor_id: Attributed author of the change, if any.
            role: Message role associated with the change, if any.
        """
        with self._lock, self.connection:
            self.connection.execute(
                """
                INSERT INTO history (
                    id, memory_id, old_memory, new_memory, event,
                    created_at, updated_at, is_deleted, actor_id, role
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    str(uuid.uuid4()),  # fresh surrogate primary key
                    memory_id,
                    old_memory,
                    new_memory,
                    event,
                    created_at,
                    updated_at,
                    is_deleted,
                    actor_id,
                    role,
                ),
            )

    def get_history(self, memory_id: str) -> List[Dict[str, Any]]:
        """
        Return all change records for *memory_id*, oldest first.

        Returns:
            List of dicts with keys id, memory_id, old_memory, new_memory,
            event, created_at, updated_at, is_deleted (bool), actor_id, role.
        """
        with self._lock:
            cur = self.connection.execute(
                """
                SELECT id, memory_id, old_memory, new_memory, event,
                       created_at, updated_at, is_deleted, actor_id, role
                FROM history
                WHERE memory_id = ?
                ORDER BY created_at ASC, DATETIME(updated_at) ASC
                """,
                (memory_id,),
            )
            rows = cur.fetchall()

        return [
            {
                "id": r[0],
                "memory_id": r[1],
                "old_memory": r[2],
                "new_memory": r[3],
                "event": r[4],
                "created_at": r[5],
                "updated_at": r[6],
                "is_deleted": bool(r[7]),
                "actor_id": r[8],
                "role": r[9],
            }
            for r in rows
        ]

    def reset(self) -> None:
        """Drop and recreate the history table."""
        with self._lock, self.connection:
            self.connection.execute("DROP TABLE IF EXISTS history")
            # BUGFIX: same non-reentrant-lock hazard as in migration — use the
            # unlocked helper while holding self._lock.
            self._create_history_table_unlocked()

    def close(self) -> None:
        """Close the connection; safe to call more than once."""
        # getattr guards __del__ on partially-constructed instances where
        # __init__ raised before self.connection was assigned.
        conn = getattr(self, "connection", None)
        if conn is not None:
            conn.close()
            self.connection = None

    def __del__(self):
        self.close()