#!/usr/bin/env python3
"""
Send Matrix messages as Home Assistant user for testing messaging functionality.

The script posts a test message to two Matrix rooms (the Home Assistant room
and the Signal bridge room), then prints the most recent messages of each room
and a short summary of which rooms accepted the message.

The access token is read from the ``MATRIX_ACCESS_TOKEN`` environment variable
when it is set; otherwise the value embedded below is used.
"""

import asyncio
import httpx
import json
import os
import uuid
from urllib.parse import quote

MATRIX_HOMESERVER = "https://matrix.klas.chat"
# SECURITY: a credential committed to source control should be treated as
# leaked. Prefer supplying MATRIX_ACCESS_TOKEN through the environment and
# rotate/remove the embedded fallback, which is kept only so existing
# invocations keep working.
CLAUDE_ACCESS_TOKEN = os.environ.get(
    "MATRIX_ACCESS_TOKEN",
    "syt_Y2xhdWRl_CoBgPoHbtMOxhvOUcMnz_2WRPZJ",
)
HOME_ASSISTANT_ROOM_ID = "!xZkScMybPseErYMJDz:matrix.klas.chat"
SIGNAL_BRIDGE_ROOM_ID = "!oBnnfKDprgMEHNhNjL:matrix.klas.chat"


def _room_url(room_id: str, suffix: str) -> str:
    """Build a client-server API URL for *room_id*, percent-encoding the ID."""
    # Room IDs are opaque strings (they start with "!" and usually contain
    # ":"), so they are quoted before being placed in the URL path.
    return (
        f"{MATRIX_HOMESERVER}/_matrix/client/v3/rooms/"
        f"{quote(room_id, safe='')}/{suffix}"
    )


async def send_matrix_message(room_id, message, sender_name="Claude") -> bool:
    """Send a text message to a Matrix room.

    Args:
        room_id: ID of the target room.
        message: Message text; it is prefixed with ``[sender_name]``.
        sender_name: Label placed in front of the message body.

    Returns:
        True when the homeserver answered with HTTP 200, False on any other
        status code or on a transport error (the error is printed).
    """
    headers = {
        "Authorization": f"Bearer {CLAUDE_ACCESS_TOKEN}",
        "Content-Type": "application/json",
    }
    # Format message with sender identification
    formatted_message = f"[{sender_name}] {message}"
    # The spec'd send endpoint is PUT .../send/{eventType}/{txnId}; a unique
    # transaction ID per call lets the server de-duplicate retried requests.
    url = _room_url(room_id, f"send/m.room.message/{uuid.uuid4().hex}")

    try:
        async with httpx.AsyncClient() as client:
            response = await client.put(
                url,
                headers=headers,
                json={"msgtype": "m.text", "body": formatted_message},
            )
    except (httpx.HTTPError, httpx.InvalidURL) as e:
        print(f"❌ Error sending message: {e}")
        return False

    if response.status_code == 200:
        print(f"✅ Message sent to room {room_id}")
        print(f"📨 Message: {formatted_message}")
        return True

    print(f"❌ Failed to send message: {response.status_code}")
    print(f"Response: {response.text}")
    return False


async def get_room_messages(room_id, limit=10) -> list:
    """Get recent messages from a Matrix room.

    Args:
        room_id: ID of the room to read.
        limit: Maximum number of events to request.

    Returns:
        The ``chunk`` list of the ``/messages`` response, requested with
        ``dir=b``. An empty list is returned on a non-200 status, a transport
        error, or an undecodable response body (the problem is printed).
    """
    headers = {"Authorization": f"Bearer {CLAUDE_ACCESS_TOKEN}"}

    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                _room_url(room_id, "messages"),
                headers=headers,
                params={"limit": limit, "dir": "b"},
            )
        if response.status_code != 200:
            print(f"❌ Failed to get messages: {response.status_code}")
            return []
        # response.json() raises a ValueError subclass on a malformed body.
        return response.json().get("chunk", [])
    except (httpx.HTTPError, httpx.InvalidURL, ValueError) as e:
        print(f"❌ Error getting messages: {e}")
        return []


async def _send_and_show_recent(room_id, test_message, sender_name, room_label) -> bool:
    """Send *test_message* to a room and, on success, print its latest messages.

    Shared implementation of the two room tests; *room_label* is the
    human-readable room name used in the printed output.
    """
    success = await send_matrix_message(room_id, test_message, sender_name)

    if success:
        print(f"✅ Successfully sent message to {room_label}")

        # Wait a moment and check for any responses
        await asyncio.sleep(2)

        messages = await get_room_messages(room_id, 5)
        print(f"📬 Recent messages in {room_label}:")
        # Only the first three returned events are considered, and of those
        # only actual room messages are printed.
        for msg in messages[:3]:
            if msg.get("type") == "m.room.message":
                sender = msg.get("sender", "")
                body = msg.get("content", {}).get("body", "")
                print(f" [{sender}]: {body}")

    return success


async def test_home_assistant_messaging() -> bool:
    """Test messaging capability through Home Assistant room.

    Returns:
        True when the test message was accepted by the homeserver.
    """
    print("🏠 Testing Home Assistant messaging...")

    # Send a test message to Home Assistant room
    return await _send_and_show_recent(
        HOME_ASSISTANT_ROOM_ID,
        "Hello from Claude! Testing messaging system connectivity.",
        "Claude-Test",
        "Home Assistant room",
    )


async def test_bridge_room_messaging() -> bool:
    """Test messaging capability through Signal bridge room.

    Returns:
        True when the test message was accepted by the homeserver.
    """
    print("\n🌉 Testing Signal bridge room messaging...")

    # Send a test message to Signal bridge room
    return await _send_and_show_recent(
        SIGNAL_BRIDGE_ROOM_ID,
        "Test message from Claude - checking Matrix messaging without Signal authentication",
        "Claude-Matrix",
        "Signal bridge room",
    )


async def main():
    """Run both room tests in sequence and print a summary of the results."""
    print("📱 Matrix Messaging Test")
    print("=" * 40)

    # Test Home Assistant messaging
    ha_success = await test_home_assistant_messaging()

    # Test Signal bridge room messaging
    bridge_success = await test_bridge_room_messaging()

    print("\n" + "=" * 40)
    print("📊 Test Results:")
    print(f"🏠 Home Assistant room: {'✅ Working' if ha_success else '❌ Failed'}")
    print(f"🌉 Signal bridge room: {'✅ Working' if bridge_success else '❌ Failed'}")

    if ha_success or bridge_success:
        print("\n✅ Matrix messaging is functional!")
        print("💡 You can receive messages through the working rooms")
    else:
        print("\n❌ Matrix messaging issues detected")


if __name__ == "__main__":
    asyncio.run(main())