""" Comprehensive CRUD Test with Verification Tests full create → read → update → read → delete → read cycle """ import asyncio import sys sys.path.insert(0, r'C:\DEV\COPILOT\geutebruck-api\src\api\protos') import grpc import configuration_pb2 import configuration_pb2_grpc async def get_all_servers(): """Read and return all servers""" channel = grpc.aio.insecure_channel('localhost:50051') stub = configuration_pb2_grpc.ConfigurationServiceStub(channel) try: request = configuration_pb2.ReadConfigurationTreeRequest() response = await stub.ReadConfigurationTree(request, timeout=10.0) servers = [] for child in response.root.children: if child.name == "GeViGCoreServer": for server in child.children: if server.type == "folder": # Get enabled value - might be bool_value, int_value, or value depending on protobuf definition enabled_node = next((c for c in server.children if c.name == "Enabled"), None) if enabled_node: if hasattr(enabled_node, 'bool_value'): enabled = enabled_node.bool_value elif hasattr(enabled_node, 'int_value'): enabled = bool(enabled_node.int_value) else: enabled = False else: enabled = False server_data = { 'id': server.name, 'alias': next((c.string_value for c in server.children if c.name == "Alias"), ""), 'host': next((c.string_value for c in server.children if c.name == "Host"), ""), 'enabled': enabled } servers.append(server_data) break await channel.close() return servers except grpc.RpcError as e: print(f"Error reading servers: {e.details()}") await channel.close() return [] async def get_all_mappings(): """Read and return all action mappings""" channel = grpc.aio.insecure_channel('localhost:50051') stub = configuration_pb2_grpc.ConfigurationServiceStub(channel) try: request = configuration_pb2.ReadActionMappingsRequest() response = await stub.ReadActionMappings(request, timeout=10.0) mappings = [] for i, mapping in enumerate(response.mappings, 1): mapping_data = { 'id': i, 'name': mapping.name, 'input_actions': [ { 'action': action.action, 'parameters': {p.name: p.value for p in action.parameters} } for action in mapping.input_actions ], 'output_actions': [ { 'action': action.action, 'parameters': {p.name: p.value for p in action.parameters} } for action in mapping.output_actions ] } mappings.append(mapping_data) await channel.close() return mappings except grpc.RpcError as e: print(f"Error reading mappings: {e.details()}") await channel.close() return [] def get_next_server_id(servers): """Find highest numeric ID and increment""" numeric_ids = [] for server in servers: try: numeric_ids.append(int(server['id'])) except ValueError: pass return str(max(numeric_ids) + 1) if numeric_ids else "1" async def create_server(server_id, alias, host): """Create a server""" channel = grpc.aio.insecure_channel('localhost:50051') stub = configuration_pb2_grpc.ConfigurationServiceStub(channel) request = configuration_pb2.CreateServerRequest( server=configuration_pb2.ServerData( id=server_id, alias=alias, host=host, user="testuser", password="testpass", enabled=True, deactivate_echo=False, deactivate_live_check=False ) ) try: response = await stub.CreateServer(request, timeout=10.0) await channel.close() return response.server except grpc.RpcError as e: print(f" [ERROR] Create server failed: {e.details()}") await channel.close() return None async def update_server(server_id, alias, host): """Update a server""" channel = grpc.aio.insecure_channel('localhost:50051') stub = configuration_pb2_grpc.ConfigurationServiceStub(channel) request = configuration_pb2.UpdateServerRequest( server=configuration_pb2.ServerData( id=server_id, alias=alias, host=host, user="updateduser", password="updatedpass", enabled=False, deactivate_echo=True, deactivate_live_check=True ) ) try: response = await stub.UpdateServer(request, timeout=10.0) await channel.close() return response.server except grpc.RpcError as e: print(f" [ERROR] Update server failed: {e.details()}") await channel.close() return None async def delete_server(server_id): """Delete a server""" channel = grpc.aio.insecure_channel('localhost:50051') stub = configuration_pb2_grpc.ConfigurationServiceStub(channel) try: request = configuration_pb2.DeleteServerRequest(server_id=server_id) response = await stub.DeleteServer(request, timeout=10.0) await channel.close() return response.success except grpc.RpcError as e: print(f" [ERROR] Delete server failed: {e.details()}") await channel.close() return False async def create_action_mapping(name, input_action, input_params, output_action, output_params): """Create an action mapping""" channel = grpc.aio.insecure_channel('localhost:50051') stub = configuration_pb2_grpc.ConfigurationServiceStub(channel) # Build input action parameters input_action_params = [ configuration_pb2.ActionParameter(name=k, value=v) for k, v in input_params.items() ] # Build output action parameters output_action_params = [ configuration_pb2.ActionParameter(name=k, value=v) for k, v in output_params.items() ] mapping_request = configuration_pb2.CreateActionMappingRequest( mapping=configuration_pb2.ActionMappingInput( name=name, input_actions=[ configuration_pb2.ActionDefinition( action=input_action, parameters=input_action_params ) ], output_actions=[ configuration_pb2.ActionDefinition( action=output_action, parameters=output_action_params ) ], video_input=101027 ) ) try: response = await stub.CreateActionMapping(mapping_request, timeout=10.0) await channel.close() return response.mapping except grpc.RpcError as e: print(f" [ERROR] Create mapping failed: {e.details()}") await channel.close() return None async def update_action_mapping(mapping_id, name, output_action, output_params): """Update an action mapping""" channel = grpc.aio.insecure_channel('localhost:50051') stub = configuration_pb2_grpc.ConfigurationServiceStub(channel) output_action_params = [ configuration_pb2.ActionParameter(name=k, value=v) for k, v in output_params.items() ] mapping_request = configuration_pb2.UpdateActionMappingRequest( mapping_id=mapping_id, mapping=configuration_pb2.ActionMappingInput( name=name, output_actions=[ configuration_pb2.ActionDefinition( action=output_action, parameters=output_action_params ) ] ) ) try: response = await stub.UpdateActionMapping(mapping_request, timeout=10.0) await channel.close() return response.mapping except grpc.RpcError as e: print(f" [ERROR] Update mapping failed: {e.details()}") await channel.close() return None async def delete_action_mapping(mapping_id): """Delete an action mapping""" channel = grpc.aio.insecure_channel('localhost:50051') stub = configuration_pb2_grpc.ConfigurationServiceStub(channel) try: request = configuration_pb2.DeleteActionMappingRequest(mapping_id=mapping_id) response = await stub.DeleteActionMapping(request, timeout=10.0) await channel.close() return response.success except grpc.RpcError as e: print(f" [ERROR] Delete mapping failed: {e.details()}") await channel.close() return False def print_servers(servers, title): """Print server list""" print(f"\n{title}") print("-" * 80) if not servers: print(" (No servers)") for s in servers: print(f" ID: {s['id']:5s} | Alias: {s['alias']:30s} | Host: {s['host']:15s} | Enabled: {s['enabled']}") def print_mappings(mappings, title, filter_prefix=None): """Print mapping list""" print(f"\n{title}") print("-" * 80) filtered = [m for m in mappings if not filter_prefix or m['name'].startswith(filter_prefix)] if not filtered: print(f" (No mappings{' with prefix ' + filter_prefix if filter_prefix else ''})") for m in filtered: print(f" #{m['id']:3d}: {m['name']}") if m['input_actions']: print(f" Input: {m['input_actions'][0]['action']} {m['input_actions'][0]['parameters']}") if m['output_actions']: print(f" Output: {m['output_actions'][0]['action']} {m['output_actions'][0]['parameters']}") async def main(): print("=" * 80) print("COMPREHENSIVE CRUD TEST WITH VERIFICATION") print("=" * 80) # ========== SERVER CRUD TESTS ========== print("\n" + "=" * 80) print("PART 1: SERVER CRUD TEST") print("=" * 80) # 1. Initial state servers = await get_all_servers() print_servers(servers, "1A. INITIAL STATE - Servers before test") # 2. CREATE - Add test server print("\n1B. CREATE - Adding test server") next_id = get_next_server_id(servers) created_server = await create_server(next_id, "TEST Server CRUD", "192.168.99.99") if created_server: print(f" [OK] Created server ID={created_server.id}, Alias={created_server.alias}") # 3. READ - Verify creation servers = await get_all_servers() print_servers(servers, "1C. READ - Servers after creation (verify new server exists)") test_server = next((s for s in servers if s['id'] == next_id), None) if test_server: print(f" [PASS] VERIFIED: Server {next_id} exists with alias '{test_server['alias']}'") else: print(f" [FAIL] VERIFICATION FAILED: Server {next_id} not found!") # 4. UPDATE - Modify the server print("\n1D. UPDATE - Modifying test server") updated_server = await update_server(next_id, "TEST Server CRUD UPDATED", "192.168.99.100") if updated_server: print(f" [OK] Updated server ID={updated_server.id}, New Alias={updated_server.alias}") print(f" Enabled: {updated_server.enabled}, DeactivateEcho: {updated_server.deactivate_echo}") # 5. READ - Verify update servers = await get_all_servers() print_servers(servers, "1E. READ - Servers after update (verify changes)") test_server = next((s for s in servers if s['id'] == next_id), None) if test_server and "UPDATED" in test_server['alias']: print(f" [PASS] VERIFIED: Server {next_id} updated to '{test_server['alias']}'") print(f" [PASS] VERIFIED: Enabled={test_server['enabled']} (should be False)") else: print(f" [FAIL] VERIFICATION FAILED: Server {next_id} update not reflected!") # 6. DELETE - Remove the server print("\n1F. DELETE - Removing test server") deleted = await delete_server(next_id) if deleted: print(f" [OK] Deleted server ID={next_id}") # 7. READ - Verify deletion servers = await get_all_servers() print_servers(servers, "1G. READ - Servers after deletion (verify removal)") test_server = next((s for s in servers if s['id'] == next_id), None) if not test_server: print(f" [PASS] VERIFIED: Server {next_id} successfully deleted") else: print(f" [FAIL] VERIFICATION FAILED: Server {next_id} still exists!") # ========== ACTION MAPPING CRUD TESTS ========== print("\n" + "=" * 80) print("PART 2: ACTION MAPPING CRUD TEST") print("=" * 80) # 1. Initial state mappings = await get_all_mappings() print_mappings(mappings, "2A. INITIAL STATE - All action mappings", filter_prefix="TEST") # 2. CREATE - Add test mapping similar to "GeVi PanLeft_101027" print("\n2B. CREATE - Adding test mapping (Camera Pan Left)") created_mapping = await create_action_mapping( name="TEST Mapping - Pan Left Camera", input_action="DigitalContactChanged", input_params={"Contact": "5", "State": "closed"}, output_action="CameraPanLeft", output_params={"Channel": "101027", "Speed": "50"} ) if created_mapping: print(f" [OK] Created mapping: {created_mapping.name}") # 3. READ - Verify creation mappings = await get_all_mappings() print_mappings(mappings, "2C. READ - Mappings after creation (verify new mapping)", filter_prefix="TEST") test_mapping = next((m for m in mappings if "TEST Mapping - Pan Left" in m['name']), None) if test_mapping: print(f" [PASS] VERIFIED: Mapping #{test_mapping['id']} exists: '{test_mapping['name']}'") if test_mapping['input_actions']: print(f" Input: {test_mapping['input_actions'][0]['action']}") if test_mapping['output_actions']: print(f" Output: {test_mapping['output_actions'][0]['action']}") test_mapping_id = test_mapping['id'] else: print(f" [FAIL] VERIFICATION FAILED: Test mapping not found!") test_mapping_id = None # 4. CREATE - Add another mapping (Pan Right) print("\n2D. CREATE - Adding second test mapping (Camera Pan Right)") created_mapping2 = await create_action_mapping( name="TEST Mapping - Pan Right Camera", input_action="DigitalContactChanged", input_params={"Contact": "6", "State": "closed"}, output_action="CameraPanRight", output_params={"Channel": "101027", "Speed": "30"} ) if created_mapping2: print(f" [OK] Created mapping: {created_mapping2.name}") # 5. READ - Verify both mappings mappings = await get_all_mappings() print_mappings(mappings, "2E. READ - All test mappings (verify both exist)", filter_prefix="TEST") # 6. UPDATE - Modify first mapping if test_mapping_id: print("\n2F. UPDATE - Modifying first test mapping (change to Tilt Up)") updated_mapping = await update_action_mapping( mapping_id=test_mapping_id, name="TEST Mapping - Pan Left UPDATED to Tilt Up", output_action="CameraTiltUp", output_params={"Channel": "101027", "Speed": "75"} ) if updated_mapping: print(f" [OK] Updated mapping #{test_mapping_id}: {updated_mapping.name}") # 7. READ - Verify update mappings = await get_all_mappings() print_mappings(mappings, "2G. READ - Mappings after update (verify changes)", filter_prefix="TEST") test_mapping = next((m for m in mappings if m['id'] == test_mapping_id), None) if test_mapping and "UPDATED" in test_mapping['name'] and test_mapping['output_actions'][0]['action'] == "CameraTiltUp": print(f" [PASS] VERIFIED: Mapping #{test_mapping_id} updated successfully") print(f" New name: {test_mapping['name']}") print(f" New output: {test_mapping['output_actions'][0]['action']}") else: print(f" [FAIL] VERIFICATION FAILED: Mapping #{test_mapping_id} update not reflected!") # 8. DELETE - Remove test mappings (CRITICAL: Delete in REVERSE order!) print("\n2H. DELETE - Removing all test mappings") mappings = await get_all_mappings() test_mappings = [m for m in mappings if "TEST" in m['name']] # Sort by ID descending to delete from highest to lowest # This prevents cascade deletion bug where IDs shift after each delete test_mappings_sorted = sorted(test_mappings, key=lambda x: x['id'], reverse=True) print(f" Deleting {len(test_mappings_sorted)} mappings in reverse order (highest ID first)") for m in test_mappings_sorted: deleted = await delete_action_mapping(m['id']) if deleted: print(f" [OK] Deleted mapping #{m['id']}: {m['name']}") # 9. READ - Verify deletion mappings = await get_all_mappings() print_mappings(mappings, "2I. READ - Mappings after deletion (verify removal)", filter_prefix="TEST") remaining_test = [m for m in mappings if "TEST" in m['name']] if not remaining_test: print(f" [PASS] VERIFIED: All test mappings successfully deleted") else: print(f" [FAIL] VERIFICATION FAILED: {len(remaining_test)} test mappings still exist!") # ========== FINAL SUMMARY ========== print("\n" + "=" * 80) print("TEST SUMMARY") print("=" * 80) servers = await get_all_servers() mappings = await get_all_mappings() print(f"Final server count: {len(servers)}") print(f"Final mapping count: {len(mappings)}") print("\n" + "=" * 80) print("COMPREHENSIVE CRUD TEST COMPLETE") print("=" * 80) if __name__ == "__main__": asyncio.run(main())