#!/usr/bin/env python3
"""
Store basic personal information in LangMem.

Posts a small, fixed set of family-related memories for one user to the
LangMem HTTP API configured below, then issues a few search requests and one
conversation-retrieval request against the same API and prints the results.
"""

import asyncio
from datetime import datetime

import httpx

# Configuration
API_BASE_URL = "http://localhost:8765"
API_KEY = "langmem_api_key_2025"

# Identity shared by every request this script sends.
USER_ID = "ondrej"
SESSION_ID = "personal_info_session"

# Per-request timeout (seconds) passed to every POST.
REQUEST_TIMEOUT = 30.0

# Pause (seconds) between storing and searching, so that server-side
# background work triggered by the store requests has time to finish.
BACKGROUND_SETTLE_SECONDS = 3

# Personal information to store
personal_memories = [
    {
        "content": "Ondrej has a son named Cyril",
        "user_id": USER_ID,
        "session_id": SESSION_ID,
        "metadata": {
            "category": "family",
            "subcategory": "relationships",
            "importance": "high",
            "privacy_level": "personal",
            "tags": ["family", "son", "relationship", "personal"],
        },
        "relationships": [
            {
                "entity_name": "Cyril",
                "entity_type": "Person",
                "relationship": "HAS_SON",
                "confidence": 1.0,
                "properties": {
                    "relationship_type": "father_son",
                    "family_role": "son",
                    "person_type": "child",
                },
            },
            {
                "entity_name": "Ondrej",
                "entity_type": "Person",
                "relationship": "IS_FATHER_OF",
                "confidence": 1.0,
                "properties": {
                    "relationship_type": "father_son",
                    "family_role": "father",
                    "person_type": "parent",
                },
            },
        ],
    },
    {
        "content": "Cyril is Ondrej's son",
        "user_id": USER_ID,
        "session_id": SESSION_ID,
        "metadata": {
            "category": "family",
            "subcategory": "relationships",
            "importance": "high",
            "privacy_level": "personal",
            "tags": ["family", "father", "relationship", "personal"],
        },
        "relationships": [
            {
                "entity_name": "Ondrej",
                "entity_type": "Person",
                "relationship": "HAS_FATHER",
                "confidence": 1.0,
                "properties": {
                    "relationship_type": "father_son",
                    "family_role": "father",
                    "person_type": "parent",
                },
            },
        ],
    },
    {
        "content": "Ondrej is a person with family members",
        "user_id": USER_ID,
        "session_id": SESSION_ID,
        "metadata": {
            "category": "personal",
            "subcategory": "identity",
            "importance": "medium",
            "privacy_level": "personal",
            "tags": ["identity", "person", "family_member"],
        },
        "relationships": [
            {
                "entity_name": "Family",
                "entity_type": "Group",
                "relationship": "BELONGS_TO",
                "confidence": 1.0,
                "properties": {
                    "group_type": "family_unit",
                    "role": "parent",
                },
            },
        ],
    },
    {
        "content": "Cyril is a young person who is part of a family",
        "user_id": USER_ID,
        "session_id": SESSION_ID,
        "metadata": {
            "category": "personal",
            "subcategory": "identity",
            "importance": "medium",
            "privacy_level": "personal",
            "tags": ["identity", "person", "family_member", "young"],
        },
        "relationships": [
            {
                "entity_name": "Family",
                "entity_type": "Group",
                "relationship": "BELONGS_TO",
                "confidence": 1.0,
                "properties": {
                    "group_type": "family_unit",
                    "role": "child",
                },
            },
        ],
    },
]


async def _post_json(client, path, payload, headers):
    """POST *payload* as JSON to *path* under API_BASE_URL; return the response."""
    return await client.post(
        f"{API_BASE_URL}{path}",
        json=payload,
        headers=headers,
        timeout=REQUEST_TIMEOUT,
    )


async def _store_memories(client, headers):
    """Send every entry of ``personal_memories`` to the store endpoint.

    Returns the list of decoded JSON bodies of the requests answered with
    HTTP 200. Failures are printed and skipped so one bad entry does not
    stop the rest.
    """
    stored = []
    for i, memory in enumerate(personal_memories, 1):
        # Deliberately broad: this is a best-effort demo script, and any
        # failure (transport, JSON decoding, missing key) is reported and
        # the loop moves on to the next memory.
        try:
            print(f"\n{i}. Storing: {memory['content']}")
            response = await _post_json(
                client, "/v1/memories/store", memory, headers
            )
            if response.status_code == 200:
                data = response.json()
                stored.append(data)
                print(f" ✅ Stored with ID: {data['id']}")
                print(f" 📊 Relationships: {len(memory.get('relationships', []))}")
            else:
                print(f" ❌ Failed: {response.status_code}")
                print(f" Response: {response.text}")
        except Exception as e:
            print(f" ❌ Error: {e}")
    return stored


async def _run_search_tests(client, headers):
    """Run a fixed list of queries against the search endpoint and print hits."""
    print("\n🔍 Testing search for family information...")
    search_tests = [
        "Who is Ondrej's son?",
        "Tell me about Cyril",
        "Family relationships",
        "Who is Cyril's father?",
    ]
    for query in search_tests:
        payload = {
            "query": query,
            "user_id": USER_ID,
            "limit": 3,
            "threshold": 0.4,
            "include_graph": True,
        }
        try:
            response = await _post_json(
                client, "/v1/memories/search", payload, headers
            )
            if response.status_code != 200:
                print(f" Query: '{query}' -> Failed ({response.status_code})")
                continue

            data = response.json()
            print(f"\n Query: '{query}'")
            print(f" Results: {data['total_count']}")
            for hit in data['memories']:
                print(f" - {hit['content']} (similarity: {hit['similarity']:.3f})")
                # "relationships" may be absent or empty on a hit.
                relationships = hit.get('relationships')
                if relationships:
                    print(f" Relationships: {len(relationships)}")
                    for rel in relationships:
                        print(
                            f" → {rel['relationship']} {rel['entity_name']} "
                            f"({rel['confidence']})"
                        )
        except Exception as e:
            print(f" Query: '{query}' -> Error: {e}")


async def _test_conversation_retrieval(client, headers):
    """Send a short sample conversation to the retrieve endpoint and print hits."""
    print("\n💬 Testing conversation retrieval...")
    payload = {
        "messages": [
            {"role": "user", "content": "Tell me about my family"},
            {
                "role": "assistant",
                "content": (
                    "I'd be happy to help with family information. "
                    "What would you like to know?"
                ),
            },
            {"role": "user", "content": "Who are my children?"},
        ],
        "user_id": USER_ID,
        "session_id": SESSION_ID,
    }
    try:
        response = await _post_json(
            client, "/v1/memories/retrieve", payload, headers
        )
        if response.status_code == 200:
            data = response.json()
            print(f" ✅ Retrieved {data['total_count']} relevant memories for conversation")
            for hit in data['memories']:
                print(f" - {hit['content']} (similarity: {hit['similarity']:.3f})")
        else:
            print(f" ❌ Conversation retrieval failed: {response.status_code}")
    except Exception as e:
        print(f" ❌ Conversation retrieval error: {e}")


def _print_summary(stored_count):
    """Print the closing summary, including how many memories were stored."""
    print("\n" + "=" * 50)
    print("🎉 Personal Information Storage Complete!")
    print("📊 Summary:")
    print(f" - User: {USER_ID}")
    print(f" - Memories stored: {stored_count}")
    print(" - Family relationships: Father-Son (Ondrej-Cyril)")
    print(" - Graph relationships: Stored in Neo4j")
    print(" - Vector embeddings: Stored in PostgreSQL")
    print("\n🌐 You can now view the data in:")
    print(" - Supabase: http://localhost:8000 (langmem_documents table)")
    print(" - Neo4j: http://localhost:7474 (Memory and Entity nodes)")


async def store_personal_memories():
    """Store personal memories in LangMem, then exercise search and retrieval.

    Steps, all sharing a single ``httpx.AsyncClient``:

    1. POST each entry of ``personal_memories`` to the store endpoint.
    2. Sleep briefly so background processing can complete.
    3. Run the sample search queries.
    4. Run the sample conversation retrieval.
    5. Print a summary.

    Request failures are printed rather than raised. Returns ``None``.
    """
    print("👨‍👦 Storing Personal Information in LangMem")
    print("=" * 50)

    headers = {"Authorization": f"Bearer {API_KEY}"}

    async with httpx.AsyncClient() as client:
        stored_memories = await _store_memories(client, headers)

        # Wait for background tasks to complete
        await asyncio.sleep(BACKGROUND_SETTLE_SECONDS)

        # Only claim success when every memory was accepted; otherwise say
        # how many made it so a partial or total failure is not hidden.
        stored_count = len(stored_memories)
        total = len(personal_memories)
        if stored_count == total:
            print(f"\n✅ Successfully stored {stored_count} personal memories!")
        else:
            print(
                f"\n⚠️ Stored {stored_count} of {total} personal memories "
                "(see errors above)"
            )

        # Test search functionality
        await _run_search_tests(client, headers)

        # Test conversation retrieval
        await _test_conversation_retrieval(client, headers)

    _print_summary(stored_count)


if __name__ == "__main__":
    asyncio.run(store_personal_memories())