#!/usr/bin/env python3
"""Test mem0 integration with self-hosted Supabase + Ollama.

Run as a script: loads environment variables from ``.env``, exercises
add / search / get_all / delete against a live mem0 ``Memory`` instance,
and exits with status 0 on success or 1 on failure.
"""

import os
import sys
import traceback

from dotenv import load_dotenv
from mem0 import Memory

from config import load_config, get_mem0_config

# Every memory written by this script is stored under this user id so the
# cleanup step can find and remove exactly what the test created.
TEST_USER_ID = "test_user_supabase_ollama"


def _memory_items(response):
    """Return the list of memory records contained in a ``get_all`` response.

    Accepts both a bare list of records and a dict wrapping the records under
    a ``"results"`` key (the shape newer mem0 releases appear to return —
    confirm against the installed mem0 version).
    """
    if isinstance(response, dict):
        return response.get("results") or []
    return list(response or [])


def _delete_test_memories(memory):
    """Delete every memory stored under ``TEST_USER_ID``; return the count."""
    deleted = 0
    for record in _memory_items(memory.get_all(user_id=TEST_USER_ID)):
        # Records without an id cannot be addressed by ``delete``; skip them.
        if isinstance(record, dict) and "id" in record:
            memory.delete(memory_id=record["id"])
            deleted += 1
    return deleted


def test_supabase_ollama_integration():
    """Test mem0 with Supabase vector store + Ollama.

    Returns:
        bool: ``True`` when every operation (add, search, get_all, cleanup)
        completes without raising; ``False`` when the Supabase settings are
        missing or any operation raises.
    """
    print("🧪 Testing mem0 with Supabase + Ollama integration...")

    # Load configuration
    config = load_config()

    if not config.database.supabase_url or not config.database.supabase_key:
        print("❌ Supabase configuration not found")
        return False

    memory = None
    needs_cleanup = False
    try:
        # Get mem0 configuration for Ollama
        mem0_config = get_mem0_config(config, "ollama")
        # NOTE(review): this prints the full config, which may include
        # credentials (e.g. the Supabase key) — confirm that is acceptable
        # wherever these test logs end up.
        print(f"📋 Configuration: {mem0_config}")

        # Create memory instance
        memory = Memory.from_config(mem0_config)

        # Test basic operations
        print("💾 Testing memory addition...")
        messages = [
            {
                "role": "user",
                "content": "I love programming in Python and building AI applications",
            },
            {
                "role": "assistant",
                "content": (
                    "That's excellent! Python is perfect for AI development "
                    "with libraries like mem0, Neo4j, and Supabase."
                ),
            },
        ]

        # From here on the store may contain test data, even if ``add`` fails
        # part-way through.
        needs_cleanup = True
        result = memory.add(messages, user_id=TEST_USER_ID)
        print(f"✅ Memory added: {result}")

        print("🔍 Testing memory search...")
        search_results = memory.search(
            query="Python programming AI", user_id=TEST_USER_ID
        )
        print(f"✅ Search results: {search_results}")

        print("📜 Testing memory retrieval...")
        all_memories = _memory_items(memory.get_all(user_id=TEST_USER_ID))
        print(f"✅ Retrieved {len(all_memories)} memories")

        # Test with different content
        print("💾 Testing additional memory...")
        messages2 = [
            {
                "role": "user",
                "content": "I'm working on a memory system using Neo4j for graph storage",
            },
            {
                "role": "assistant",
                "content": (
                    "Neo4j is excellent for graph-based memory systems. "
                    "It allows for complex relationship mapping."
                ),
            },
        ]

        result2 = memory.add(messages2, user_id=TEST_USER_ID)
        print(f"✅ Additional memory added: {result2}")

        # Search for related memories
        print("🔍 Testing semantic search...")
        search_results2 = memory.search(
            query="graph database memory", user_id=TEST_USER_ID
        )
        print(f"✅ Semantic search results: {search_results2}")

        # Cleanup. A failure here still fails the test, as it always has.
        print("🧹 Cleaning up test data...")
        _delete_test_memories(memory)
        needs_cleanup = False

        print("✅ Supabase + Ollama integration test successful!")
        return True

    except Exception as e:
        # Top-level boundary of the test: report everything and signal
        # failure through the return value instead of propagating.
        print(f"❌ Supabase + Ollama integration test failed: {e}")
        traceback.print_exc()
        return False

    finally:
        # Best-effort removal of test data when the run aborted before the
        # regular cleanup step finished; never mask the original outcome.
        if memory is not None and needs_cleanup:
            try:
                _delete_test_memories(memory)
            except Exception as cleanup_error:
                print(f"Cleanup after failure did not complete: {cleanup_error}")


def main():
    """Run the integration test and exit with 0 on success, 1 on failure."""
    print("=" * 60)
    print("MEM0 + SUPABASE + OLLAMA INTEGRATION TEST")
    print("=" * 60)

    # Load environment
    load_dotenv()

    # Test integration
    success = test_supabase_ollama_integration()

    # Summary
    print("\n" + "=" * 60)
    print("TEST SUMMARY")
    print("=" * 60)

    if success:
        print("✅ PASS mem0 + Supabase + Ollama Integration")
        print("🎉 All integration tests passed!")
        sys.exit(0)
    else:
        print("❌ FAIL mem0 + Supabase + Ollama Integration")
        print("💥 Integration test failed - check configuration")
        sys.exit(1)


if __name__ == "__main__":
    main()