355 lines
11 KiB
Plaintext
355 lines
11 KiB
Plaintext
---
|
|
title: Livekit
|
|
---
|
|
|
|
<Snippet file="paper-release.mdx" />
|
|
|
|
This guide demonstrates how to create a memory-enabled voice assistant using LiveKit, Deepgram, OpenAI, and Mem0, focusing on creating an intelligent, context-aware travel planning agent.
|
|
|
|
## Prerequisites
|
|
|
|
Before you begin, make sure you have:
|
|
|
|
1. Installed Livekit Agents SDK with voice dependencies of silero and deepgram:
|
|
```bash
|
|
pip install livekit \
|
|
livekit-agents \
|
|
livekit-plugins-silero \
|
|
livekit-plugins-deepgram \
|
|
livekit-plugins-openai
|
|
```
|
|
|
|
2. Installed Mem0 SDK:
|
|
```bash
|
|
pip install mem0ai
|
|
```
|
|
|
|
3. Set up your API keys in a `.env` file:
|
|
```sh
|
|
LIVEKIT_URL=your_livekit_url
|
|
LIVEKIT_API_KEY=your_livekit_api_key
|
|
LIVEKIT_API_SECRET=your_livekit_api_secret
|
|
DEEPGRAM_API_KEY=your_deepgram_api_key
|
|
MEM0_API_KEY=your_mem0_api_key
|
|
OPENAI_API_KEY=your_openai_api_key
|
|
```
|
|
|
|
> **Note**: Make sure to have a Livekit and Deepgram account. You can find these variables `LIVEKIT_URL` , `LIVEKIT_API_KEY` and `LIVEKIT_API_SECRET` from [LiveKit Cloud Console](https://cloud.livekit.io/) and for more information you can refer this website [LiveKit Documentation](https://docs.livekit.io/home/cloud/keys-and-tokens/). For `DEEPGRAM_API_KEY` you can get from [Deepgram Console](https://console.deepgram.com/) refer this website [Deepgram Documentation](https://developers.deepgram.com/docs/create-additional-api-keys) for more details.
|
|
|
|
## Code Breakdown
|
|
|
|
Let's break down the key components of this implementation:
|
|
|
|
### 1. Setting Up Dependencies and Environment
|
|
|
|
```python
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
from typing import List, Dict, Any, Annotated
|
|
|
|
import aiohttp
|
|
from dotenv import load_dotenv
|
|
from livekit.agents import (
|
|
AutoSubscribe,
|
|
JobContext,
|
|
JobProcess,
|
|
WorkerOptions,
|
|
cli,
|
|
llm,
|
|
metrics,
|
|
)
|
|
from livekit import rtc, api
|
|
from livekit.agents.pipeline import VoicePipelineAgent
|
|
from livekit.plugins import deepgram, openai, silero
|
|
from mem0 import AsyncMemoryClient
|
|
|
|
# Load environment variables
|
|
load_dotenv()
|
|
|
|
# Configure logging
|
|
logger = logging.getLogger("memory-assistant")
|
|
logger.setLevel(logging.INFO)
|
|
|
|
# Define a global user ID for simplicity
|
|
USER_ID = "voice_user"
|
|
|
|
# Initialize Mem0 client
|
|
mem0 = AsyncMemoryClient()
|
|
```
|
|
|
|
This section handles:
|
|
- Importing required modules
|
|
- Loading environment variables
|
|
- Setting up logging
|
|
- Extracting user identification
|
|
- Initializing the Mem0 client
|
|
|
|
### 2. Memory Enrichment Function
|
|
|
|
```python
|
|
async def _enrich_with_memory(agent: VoicePipelineAgent, chat_ctx: llm.ChatContext):
|
|
"""Add memories and Augment chat context with relevant memories"""
|
|
if not chat_ctx.messages:
|
|
return
|
|
|
|
# Store user message in Mem0
|
|
user_msg = chat_ctx.messages[-1]
|
|
await mem0.add(
|
|
[{"role": "user", "content": user_msg.content}],
|
|
user_id=USER_ID
|
|
)
|
|
|
|
# Search for relevant memories
|
|
results = await mem0.search(
|
|
user_msg.content,
|
|
user_id=USER_ID,
|
|
)
|
|
|
|
# Augment context with retrieved memories
|
|
if results:
|
|
memories = ' '.join([result["memory"] for result in results])
|
|
logger.info(f"Enriching with memory: {memories}")
|
|
|
|
rag_msg = llm.ChatMessage.create(
|
|
text=f"Relevant Memory: {memories}\n",
|
|
role="assistant",
|
|
)
|
|
|
|
# Modify chat context with retrieved memories
|
|
chat_ctx.messages[-1] = rag_msg
|
|
chat_ctx.messages.append(user_msg)
|
|
```
|
|
|
|
This function:
|
|
- Stores user messages in Mem0
|
|
- Performs semantic search for relevant memories
|
|
- Augments the chat context with retrieved memories
|
|
- Enables contextually aware responses
|
|
|
|
### 3. Prewarm and Entrypoint Functions
|
|
|
|
```python
|
|
def prewarm_process(proc: JobProcess):
|
|
# Preload silero VAD in memory to speed up session start
|
|
proc.userdata["vad"] = silero.VAD.load()
|
|
|
|
async def entrypoint(ctx: JobContext):
|
|
# Connect to LiveKit room
|
|
await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
|
|
|
|
# Wait for participant
|
|
participant = await ctx.wait_for_participant()
|
|
|
|
# Initialize Mem0 client
|
|
mem0 = AsyncMemoryClient()
|
|
|
|
# Define initial system context
|
|
initial_ctx = llm.ChatContext().append(
|
|
role="system",
|
|
text=(
|
|
"""
|
|
You are a helpful voice assistant.
|
|
You are a travel guide named George and will help the user to plan a travel trip of their dreams.
|
|
You should help the user plan for various adventures like work retreats, family vacations or solo backpacking trips.
|
|
You should be careful to not suggest anything that would be dangerous, illegal or inappropriate.
|
|
You can remember past interactions and use them to inform your answers.
|
|
Use semantic memory retrieval to provide contextually relevant responses.
|
|
"""
|
|
),
|
|
)
|
|
|
|
# Create VoicePipelineAgent with memory capabilities
|
|
agent = VoicePipelineAgent(
|
|
chat_ctx=initial_ctx,
|
|
vad=silero.VAD.load(),
|
|
stt=deepgram.STT(),
|
|
llm=openai.LLM(model="gpt-4o-mini"),
|
|
tts=openai.TTS(),
|
|
before_llm_cb=_enrich_with_memory,
|
|
)
|
|
|
|
# Start agent and initial greeting
|
|
agent.start(ctx.room, participant)
|
|
await agent.say(
|
|
"Hello! I'm George. Can I help you plan an upcoming trip? ",
|
|
allow_interruptions=True
|
|
)
|
|
|
|
# Run the application
|
|
if __name__ == "__main__":
|
|
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, prewarm_fnc=prewarm_process))
|
|
```
|
|
|
|
The entrypoint function:
|
|
- Connects to LiveKit room
|
|
- Initializes Mem0 memory client
|
|
- Sets up initial system context
|
|
- Creates a VoicePipelineAgent with memory enrichment
|
|
- Starts the agent with an initial greeting
|
|
|
|
## Create a Memory-Enabled Voice Agent
|
|
|
|
Now that we've explained each component, here's the complete implementation that combines OpenAI Agents SDK for voice with Mem0's memory capabilities:
|
|
|
|
```python
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
from typing import List, Dict, Any, Annotated
|
|
|
|
import aiohttp
|
|
from dotenv import load_dotenv
|
|
from livekit.agents import (
|
|
AutoSubscribe,
|
|
JobContext,
|
|
JobProcess,
|
|
WorkerOptions,
|
|
cli,
|
|
llm,
|
|
metrics,
|
|
)
|
|
from livekit import rtc, api
|
|
from livekit.agents.pipeline import VoicePipelineAgent
|
|
from livekit.plugins import deepgram, openai, silero
|
|
from mem0 import AsyncMemoryClient
|
|
|
|
# Load environment variables
|
|
load_dotenv()
|
|
|
|
# Configure logging
|
|
logger = logging.getLogger("memory-assistant")
|
|
logger.setLevel(logging.INFO)
|
|
|
|
# Define a global user ID for simplicity
|
|
USER_ID = "voice_user"
|
|
|
|
# Initialize Mem0 memory client
|
|
mem0 = AsyncMemoryClient()
|
|
|
|
def prewarm_process(proc: JobProcess):
|
|
# Preload silero VAD in memory to speed up session start
|
|
proc.userdata["vad"] = silero.VAD.load()
|
|
|
|
async def entrypoint(ctx: JobContext):
|
|
# Connect to LiveKit room
|
|
await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
|
|
|
|
# Wait for participant
|
|
participant = await ctx.wait_for_participant()
|
|
|
|
async def _enrich_with_memory(agent: VoicePipelineAgent, chat_ctx: llm.ChatContext):
|
|
"""Add memories and Augment chat context with relevant memories"""
|
|
if not chat_ctx.messages:
|
|
return
|
|
|
|
# Store user message in Mem0
|
|
user_msg = chat_ctx.messages[-1]
|
|
await mem0.add(
|
|
[{"role": "user", "content": user_msg.content}],
|
|
user_id=USER_ID
|
|
)
|
|
|
|
# Search for relevant memories
|
|
results = await mem0.search(
|
|
user_msg.content,
|
|
user_id=USER_ID,
|
|
)
|
|
|
|
# Augment context with retrieved memories
|
|
if results:
|
|
memories = ' '.join([result["memory"] for result in results])
|
|
logger.info(f"Enriching with memory: {memories}")
|
|
|
|
rag_msg = llm.ChatMessage.create(
|
|
text=f"Relevant Memory: {memories}\n",
|
|
role="assistant",
|
|
)
|
|
|
|
# Modify chat context with retrieved memories
|
|
chat_ctx.messages[-1] = rag_msg
|
|
chat_ctx.messages.append(user_msg)
|
|
|
|
# Define initial system context
|
|
initial_ctx = llm.ChatContext().append(
|
|
role="system",
|
|
text=(
|
|
"""
|
|
You are a helpful voice assistant.
|
|
You are a travel guide named George and will help the user to plan a travel trip of their dreams.
|
|
You should help the user plan for various adventures like work retreats, family vacations or solo backpacking trips.
|
|
You should be careful to not suggest anything that would be dangerous, illegal or inappropriate.
|
|
You can remember past interactions and use them to inform your answers.
|
|
Use semantic memory retrieval to provide contextually relevant responses.
|
|
"""
|
|
),
|
|
)
|
|
|
|
# Create VoicePipelineAgent with memory capabilities
|
|
agent = VoicePipelineAgent(
|
|
chat_ctx=initial_ctx,
|
|
vad=silero.VAD.load(),
|
|
stt=deepgram.STT(),
|
|
llm=openai.LLM(model="gpt-4o-mini"),
|
|
tts=openai.TTS(),
|
|
before_llm_cb=_enrich_with_memory,
|
|
)
|
|
|
|
# Start agent and initial greeting
|
|
agent.start(ctx.room, participant)
|
|
await agent.say(
|
|
"Hello! I'm George. Can I help you plan an upcoming trip? ",
|
|
allow_interruptions=True
|
|
)
|
|
|
|
# Run the application
|
|
if __name__ == "__main__":
|
|
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, prewarm_fnc=prewarm_process))
|
|
```
|
|
|
|
## Key Features of This Implementation
|
|
|
|
1. **Semantic Memory Retrieval**: Uses Mem0 to store and retrieve contextually relevant memories
|
|
2. **Voice Interaction**: Leverages LiveKit for voice communication
|
|
3. **Intelligent Context Management**: Augments conversations with past interactions
|
|
4. **Travel Planning Specialization**: Focused on creating a helpful travel guide assistant
|
|
|
|
## Running the Example
|
|
|
|
To run this example:
|
|
|
|
1. Install all required dependencies
|
|
2. Set up your `.env` file with the necessary API keys
|
|
3. Ensure your microphone and audio setup are configured
|
|
4. Run the script with Python 3.11 or newer and with the following command:
|
|
```sh
|
|
python mem0-livekit-voice-agent.py start
|
|
```
|
|
5. After the script starts, you can interact with the voice agent using [Livekit's Agent Platform](https://agents-playground.livekit.io/) and Connect to the agent inorder to start conversations.
|
|
|
|
## Best Practices for Voice Agents with Memory
|
|
|
|
1. **Context Preservation**: Store enough context with each memory for effective retrieval
|
|
2. **Privacy Considerations**: Implement secure memory management
|
|
3. **Relevant Memory Filtering**: Use semantic search to retrieve only the most pertinent memories
|
|
4. **Error Handling**: Implement robust error handling for memory operations
|
|
|
|
## Debugging Function Tools
|
|
|
|
- To run the script in debug mode simply start the assistant with `dev` mode:
|
|
```sh
|
|
python mem0-livekit-voice-agent.py dev
|
|
```
|
|
|
|
- When working with memory-enabled voice agents, use Python's `logging` module for effective debugging:
|
|
|
|
```python
|
|
import logging
|
|
|
|
# Set up logging
|
|
logging.basicConfig(
|
|
level=logging.DEBUG,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
)
|
|
logger = logging.getLogger("memory_voice_agent")
|
|
``` |