#!/usr/bin/env python3
"""Live test of the T6 Mem0 MCP server.

Initializes a ``T6Mem0Server`` and calls its tool handlers directly
(``server.tools.handle_*``): add, search, get-all and delete-all, all scoped
to a dedicated test user. The text of the first item of each handler result
is printed so the run can be inspected by eye.

NOTE(review): the handlers are invoked as plain coroutines here; whether that
exercises the MCP wire protocol itself depends on the server implementation.
"""

import asyncio
import json
import sys
import traceback

from mcp_server.main import T6Mem0Server

# Dedicated user id so the test's memories can be removed in one call.
_TEST_USER_ID = "mcp_test_user"
_BANNER = "=" * 60


async def _delete_test_memories_quietly(server):
    """Best-effort removal of the test user's memories after a failed run.

    Any error raised by the delete handler is reported on stderr and then
    suppressed, so it cannot mask the failure that triggered the cleanup.
    """
    try:
        await server.tools.handle_delete_all_memories({
            "user_id": _TEST_USER_ID,
        })
    except Exception as cleanup_error:
        print(
            f"⚠ Cleanup after failure also failed: {cleanup_error}",
            file=sys.stderr,
        )


async def test_mcp_server():
    """Run the six-step live check against the MCP server.

    Steps: initialize the server, list tool definitions, add a memory,
    search memories, fetch all memories, and delete the test user's data.

    Returns:
        0 if every step completed without raising, 1 otherwise. On failure
        the error and traceback are written to stderr, and any memories that
        may have been stored for the test user are deleted on a best-effort
        basis.
    """
    print(_BANNER)
    print("T6 Mem0 v2 MCP Server Live Test")
    print(_BANNER)

    server = None
    # True from just before the add call until the delete call has returned;
    # tells the failure path whether test data may still be stored.
    test_data_may_exist = False

    try:
        # Initialize server
        print("\n[1/6] Initializing MCP server...")
        server = T6Mem0Server()
        await server.initialize()
        print("✓ Server initialized")

        # List tools
        print("\n[2/6] Listing available tools...")
        tools = server.tools.get_tool_definitions()
        print(f"✓ Found {len(tools)} tools:")
        for tool in tools:
            print(f" • {tool.name}")

        # Test 1: Add memory
        print("\n[3/6] Testing add_memory...")
        # Set before awaiting: a handler that fails part-way may still have
        # written something.
        test_data_may_exist = True
        add_result = await server.tools.handle_add_memory({
            "messages": [
                {
                    "role": "user",
                    "content": "I am a software engineer who loves Python and TypeScript",
                },
                {
                    "role": "assistant",
                    "content": "Got it! I'll remember that.",
                },
            ],
            "user_id": _TEST_USER_ID,
            "metadata": {"test": "mcp_live_test"},
        })
        print(add_result[0].text)

        # Test 2: Search memories
        print("\n[4/6] Testing search_memories...")
        search_result = await server.tools.handle_search_memories({
            "query": "What programming languages does the user know?",
            "user_id": _TEST_USER_ID,
            "limit": 5,
        })
        print(search_result[0].text)

        # Test 3: Get all memories
        print("\n[5/6] Testing get_all_memories...")
        get_all_result = await server.tools.handle_get_all_memories({
            "user_id": _TEST_USER_ID,
        })
        print(get_all_result[0].text)

        # Clean up - delete test memories
        print("\n[6/6] Cleaning up test data...")
        delete_result = await server.tools.handle_delete_all_memories({
            "user_id": _TEST_USER_ID,
        })
        test_data_may_exist = False
        print(delete_result[0].text)

        print("\n" + _BANNER)
        print("✅ All MCP server tests passed!")
        print(_BANNER)
        return 0

    except Exception as e:
        # Top-level boundary of a test script: report everything, then turn
        # the failure into a non-zero exit code instead of propagating.
        print(f"\n❌ Test failed: {e}", file=sys.stderr)
        traceback.print_exc()
        if test_data_may_exist:
            # Do not leave the test user's memories behind for the next run.
            await _delete_test_memories_quietly(server)
        return 1


if __name__ == "__main__":
    exit_code = asyncio.run(test_mcp_server())
    sys.exit(exit_code)