""" Verify configuration changes via gRPC ReadConfigurationTree """ import asyncio import sys sys.path.insert(0, r'C:\DEV\COPILOT\geutebruck-api\src\api\protos') import grpc import configuration_pb2 import configuration_pb2_grpc async def verify_config(): """Verify configuration by reading tree""" channel = grpc.aio.insecure_channel('localhost:50051') stub = configuration_pb2_grpc.ConfigurationServiceStub(channel) print("=" * 80) print("CONFIGURATION VERIFICATION VIA GRPC") print("=" * 80) try: # Read configuration tree request = configuration_pb2.ReadConfigurationTreeRequest() response = await stub.ReadConfigurationTree(request, timeout=10.0) # Parse servers servers = [] for child in response.root.children: if child.name == "GeViGCoreServer": for server in child.children: if server.type == "folder": server_dict = {c.name: c for c in server.children} # Determine enabled value and type enabled_node = server_dict.get("Enabled") if enabled_node: if hasattr(enabled_node, 'bool_value'): enabled_val = enabled_node.bool_value enabled_type = "bool" elif hasattr(enabled_node, 'int_value'): enabled_val = bool(enabled_node.int_value) enabled_type = f"int32 (value={enabled_node.int_value})" else: enabled_val = False enabled_type = f"unknown (type={enabled_node.type})" else: enabled_val = False enabled_type = "missing" servers.append({ 'id': server.name, 'alias': server_dict.get("Alias", type('', (), {'string_value': ''})()).string_value, 'host': server_dict.get("Host", type('', (), {'string_value': ''})()).string_value, 'enabled': enabled_val, 'enabled_type': enabled_type }) break # Parse action mappings mappings_request = configuration_pb2.ReadActionMappingsRequest() mappings_response = await stub.ReadActionMappings(mappings_request, timeout=10.0) mappings = [] for i, mapping in enumerate(mappings_response.mappings, 1): mappings.append({ 'id': i, 'name': mapping.name, 'input_count': len(mapping.input_actions), 'output_count': len(mapping.output_actions) }) # Display results print("\n1. SERVERS:") print("-" * 80) for s in servers: print(f" ID: {s['id']:5s} | Alias: {s['alias']:30s} | " f"Host: {s['host']:15s} | Enabled: {s['enabled']} ({s['enabled_type']})") claude_servers = [s for s in servers if "Claude" in s['alias']] print(f"\n Total servers: {len(servers)}") print(f" Claude servers: {len(claude_servers)}") print("\n2. ACTION MAPPINGS:") print("-" * 80) claude_mappings = [m for m in mappings if "Claude" in m['name']] test_mappings = [m for m in mappings if "TEST" in m['name']] print(f" Total mappings: {len(mappings)}") print(f"\n Claude mappings ({len(claude_mappings)}):") for m in claude_mappings: print(f" #{m['id']:3d}: {m['name']} (In:{m['input_count']}, Out:{m['output_count']})") print(f"\n TEST mappings ({len(test_mappings)}):") if test_mappings: for m in test_mappings: print(f" #{m['id']:3d}: {m['name']} (In:{m['input_count']}, Out:{m['output_count']})") else: print(f" (None - all cleaned up)") print("\n3. BOOL TYPE VERIFICATION:") print("-" * 80) bool_servers = [s for s in servers if s['enabled_type'] == "bool"] if len(bool_servers) == len(servers): print(f" [PASS] All {len(servers)} servers use correct 'bool' type") else: print(f" [WARN] {len(servers) - len(bool_servers)} servers not using 'bool' type:") for s in servers: if s['enabled_type'] != "bool": print(f" Server {s['id']}: {s['enabled_type']}") print("\n" + "=" * 80) print("VERIFICATION SUMMARY") print("=" * 80) print(f" Servers: {len(servers)} total, {len(claude_servers)} Claude servers") print(f" Mappings: {len(mappings)} total, {len(claude_mappings)} Claude mappings") print(f" Bool types: {len(bool_servers)}/{len(servers)} correct") print(f" TEST cleanup: {'PASS' if len(test_mappings) == 0 else f'INCOMPLETE ({len(test_mappings)} remain)'}") await channel.close() return True except Exception as e: print(f"\n[ERROR] Verification failed: {e}") import traceback traceback.print_exc() await channel.close() return False if __name__ == "__main__": result = asyncio.run(verify_config()) sys.exit(0 if result else 1)