Files
geutebruck/comprehensive_crud_test.py
Administrator 14893e62a5 feat: Geutebruck GeViScope/GeViSoft Action Mapping System - MVP
This MVP release provides a complete full-stack solution for managing action mappings
in Geutebruck's GeViScope and GeViSoft video surveillance systems.

## Features

### Flutter Web Application (Port 8081)
- Modern, responsive UI for managing action mappings
- Action picker dialog with full parameter configuration
- Support for both GSC (GeViScope) and G-Core server actions
- Consistent UI for input and output actions with edit/delete capabilities
- Real-time action mapping creation, editing, and deletion
- Server categorization (GSC: prefix for GeViScope, G-Core: prefix for G-Core servers)

### FastAPI REST Backend (Port 8000)
- RESTful API for action mapping CRUD operations
- Action template service with comprehensive action catalog (247 actions)
- Server management (G-Core and GeViScope servers)
- Configuration tree reading and writing
- JWT authentication with role-based access control
- PostgreSQL database integration

### C# SDK Bridge (gRPC, Port 50051)
- Native integration with GeViSoft SDK (GeViProcAPINET_4_0.dll)
- Action mapping creation with correct binary format
- Support for GSC and G-Core action types
- Proper Camera parameter inclusion in action strings (fixes CrossSwitch bug)
- Action ID lookup table with server-specific action IDs
- Configuration reading/writing via SetupClient

## Bug Fixes
- **CrossSwitch Bug**: GSC and G-Core actions now correctly display camera/PTZ head parameters in GeViSet
- Action strings now include Camera parameter: `@ PanLeft (Comment: "", Camera: 101028)`
- Proper filter flags and VideoInput=0 for action mappings
- Correct action ID assignment (4198 for GSC, 9294 for G-Core PanLeft)

## Technical Stack
- **Frontend**: Flutter Web, Dart, Dio HTTP client
- **Backend**: Python FastAPI, PostgreSQL, Redis
- **SDK Bridge**: C# .NET 8.0, gRPC, GeViSoft SDK
- **Authentication**: JWT tokens
- **Configuration**: GeViSoft .set files (binary format)

## Credentials
- GeViSoft/GeViScope: username=sysadmin, password=masterkey
- Default admin: username=admin, password=admin123

## Deployment
All services run on localhost:
- Flutter Web: http://localhost:8081
- FastAPI: http://localhost:8000
- SDK Bridge gRPC: localhost:50051
- GeViServer: localhost (default port)

Generated with Claude Code (https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-31 18:10:54 +01:00

475 lines
18 KiB
Python

"""
Comprehensive CRUD Test with Verification
Tests full create → read → update → read → delete → read cycle
"""
import asyncio
import sys
sys.path.insert(0, r'C:\DEV\COPILOT\geutebruck-api\src\api\protos')
import grpc
import configuration_pb2
import configuration_pb2_grpc
async def get_all_servers():
"""Read and return all servers"""
channel = grpc.aio.insecure_channel('localhost:50051')
stub = configuration_pb2_grpc.ConfigurationServiceStub(channel)
try:
request = configuration_pb2.ReadConfigurationTreeRequest()
response = await stub.ReadConfigurationTree(request, timeout=10.0)
servers = []
for child in response.root.children:
if child.name == "GeViGCoreServer":
for server in child.children:
if server.type == "folder":
# Get enabled value - might be bool_value, int_value, or value depending on protobuf definition
enabled_node = next((c for c in server.children if c.name == "Enabled"), None)
if enabled_node:
if hasattr(enabled_node, 'bool_value'):
enabled = enabled_node.bool_value
elif hasattr(enabled_node, 'int_value'):
enabled = bool(enabled_node.int_value)
else:
enabled = False
else:
enabled = False
server_data = {
'id': server.name,
'alias': next((c.string_value for c in server.children if c.name == "Alias"), ""),
'host': next((c.string_value for c in server.children if c.name == "Host"), ""),
'enabled': enabled
}
servers.append(server_data)
break
await channel.close()
return servers
except grpc.RpcError as e:
print(f"Error reading servers: {e.details()}")
await channel.close()
return []
async def get_all_mappings():
"""Read and return all action mappings"""
channel = grpc.aio.insecure_channel('localhost:50051')
stub = configuration_pb2_grpc.ConfigurationServiceStub(channel)
try:
request = configuration_pb2.ReadActionMappingsRequest()
response = await stub.ReadActionMappings(request, timeout=10.0)
mappings = []
for i, mapping in enumerate(response.mappings, 1):
mapping_data = {
'id': i,
'name': mapping.name,
'input_actions': [
{
'action': action.action,
'parameters': {p.name: p.value for p in action.parameters}
}
for action in mapping.input_actions
],
'output_actions': [
{
'action': action.action,
'parameters': {p.name: p.value for p in action.parameters}
}
for action in mapping.output_actions
]
}
mappings.append(mapping_data)
await channel.close()
return mappings
except grpc.RpcError as e:
print(f"Error reading mappings: {e.details()}")
await channel.close()
return []
def get_next_server_id(servers):
"""Find highest numeric ID and increment"""
numeric_ids = []
for server in servers:
try:
numeric_ids.append(int(server['id']))
except ValueError:
pass
return str(max(numeric_ids) + 1) if numeric_ids else "1"
async def create_server(server_id, alias, host):
"""Create a server"""
channel = grpc.aio.insecure_channel('localhost:50051')
stub = configuration_pb2_grpc.ConfigurationServiceStub(channel)
request = configuration_pb2.CreateServerRequest(
server=configuration_pb2.ServerData(
id=server_id,
alias=alias,
host=host,
user="testuser",
password="testpass",
enabled=True,
deactivate_echo=False,
deactivate_live_check=False
)
)
try:
response = await stub.CreateServer(request, timeout=10.0)
await channel.close()
return response.server
except grpc.RpcError as e:
print(f" [ERROR] Create server failed: {e.details()}")
await channel.close()
return None
async def update_server(server_id, alias, host):
"""Update a server"""
channel = grpc.aio.insecure_channel('localhost:50051')
stub = configuration_pb2_grpc.ConfigurationServiceStub(channel)
request = configuration_pb2.UpdateServerRequest(
server=configuration_pb2.ServerData(
id=server_id,
alias=alias,
host=host,
user="updateduser",
password="updatedpass",
enabled=False,
deactivate_echo=True,
deactivate_live_check=True
)
)
try:
response = await stub.UpdateServer(request, timeout=10.0)
await channel.close()
return response.server
except grpc.RpcError as e:
print(f" [ERROR] Update server failed: {e.details()}")
await channel.close()
return None
async def delete_server(server_id):
"""Delete a server"""
channel = grpc.aio.insecure_channel('localhost:50051')
stub = configuration_pb2_grpc.ConfigurationServiceStub(channel)
try:
request = configuration_pb2.DeleteServerRequest(server_id=server_id)
response = await stub.DeleteServer(request, timeout=10.0)
await channel.close()
return response.success
except grpc.RpcError as e:
print(f" [ERROR] Delete server failed: {e.details()}")
await channel.close()
return False
async def create_action_mapping(name, input_action, input_params, output_action, output_params):
"""Create an action mapping"""
channel = grpc.aio.insecure_channel('localhost:50051')
stub = configuration_pb2_grpc.ConfigurationServiceStub(channel)
# Build input action parameters
input_action_params = [
configuration_pb2.ActionParameter(name=k, value=v)
for k, v in input_params.items()
]
# Build output action parameters
output_action_params = [
configuration_pb2.ActionParameter(name=k, value=v)
for k, v in output_params.items()
]
mapping_request = configuration_pb2.CreateActionMappingRequest(
mapping=configuration_pb2.ActionMappingInput(
name=name,
input_actions=[
configuration_pb2.ActionDefinition(
action=input_action,
parameters=input_action_params
)
],
output_actions=[
configuration_pb2.ActionDefinition(
action=output_action,
parameters=output_action_params
)
],
video_input=101027
)
)
try:
response = await stub.CreateActionMapping(mapping_request, timeout=10.0)
await channel.close()
return response.mapping
except grpc.RpcError as e:
print(f" [ERROR] Create mapping failed: {e.details()}")
await channel.close()
return None
async def update_action_mapping(mapping_id, name, output_action, output_params):
"""Update an action mapping"""
channel = grpc.aio.insecure_channel('localhost:50051')
stub = configuration_pb2_grpc.ConfigurationServiceStub(channel)
output_action_params = [
configuration_pb2.ActionParameter(name=k, value=v)
for k, v in output_params.items()
]
mapping_request = configuration_pb2.UpdateActionMappingRequest(
mapping_id=mapping_id,
mapping=configuration_pb2.ActionMappingInput(
name=name,
output_actions=[
configuration_pb2.ActionDefinition(
action=output_action,
parameters=output_action_params
)
]
)
)
try:
response = await stub.UpdateActionMapping(mapping_request, timeout=10.0)
await channel.close()
return response.mapping
except grpc.RpcError as e:
print(f" [ERROR] Update mapping failed: {e.details()}")
await channel.close()
return None
async def delete_action_mapping(mapping_id):
"""Delete an action mapping"""
channel = grpc.aio.insecure_channel('localhost:50051')
stub = configuration_pb2_grpc.ConfigurationServiceStub(channel)
try:
request = configuration_pb2.DeleteActionMappingRequest(mapping_id=mapping_id)
response = await stub.DeleteActionMapping(request, timeout=10.0)
await channel.close()
return response.success
except grpc.RpcError as e:
print(f" [ERROR] Delete mapping failed: {e.details()}")
await channel.close()
return False
def print_servers(servers, title):
"""Print server list"""
print(f"\n{title}")
print("-" * 80)
if not servers:
print(" (No servers)")
for s in servers:
print(f" ID: {s['id']:5s} | Alias: {s['alias']:30s} | Host: {s['host']:15s} | Enabled: {s['enabled']}")
def print_mappings(mappings, title, filter_prefix=None):
"""Print mapping list"""
print(f"\n{title}")
print("-" * 80)
filtered = [m for m in mappings if not filter_prefix or m['name'].startswith(filter_prefix)]
if not filtered:
print(f" (No mappings{' with prefix ' + filter_prefix if filter_prefix else ''})")
for m in filtered:
print(f" #{m['id']:3d}: {m['name']}")
if m['input_actions']:
print(f" Input: {m['input_actions'][0]['action']} {m['input_actions'][0]['parameters']}")
if m['output_actions']:
print(f" Output: {m['output_actions'][0]['action']} {m['output_actions'][0]['parameters']}")
async def main():
print("=" * 80)
print("COMPREHENSIVE CRUD TEST WITH VERIFICATION")
print("=" * 80)
# ========== SERVER CRUD TESTS ==========
print("\n" + "=" * 80)
print("PART 1: SERVER CRUD TEST")
print("=" * 80)
# 1. Initial state
servers = await get_all_servers()
print_servers(servers, "1A. INITIAL STATE - Servers before test")
# 2. CREATE - Add test server
print("\n1B. CREATE - Adding test server")
next_id = get_next_server_id(servers)
created_server = await create_server(next_id, "TEST Server CRUD", "192.168.99.99")
if created_server:
print(f" [OK] Created server ID={created_server.id}, Alias={created_server.alias}")
# 3. READ - Verify creation
servers = await get_all_servers()
print_servers(servers, "1C. READ - Servers after creation (verify new server exists)")
test_server = next((s for s in servers if s['id'] == next_id), None)
if test_server:
print(f" [PASS] VERIFIED: Server {next_id} exists with alias '{test_server['alias']}'")
else:
print(f" [FAIL] VERIFICATION FAILED: Server {next_id} not found!")
# 4. UPDATE - Modify the server
print("\n1D. UPDATE - Modifying test server")
updated_server = await update_server(next_id, "TEST Server CRUD UPDATED", "192.168.99.100")
if updated_server:
print(f" [OK] Updated server ID={updated_server.id}, New Alias={updated_server.alias}")
print(f" Enabled: {updated_server.enabled}, DeactivateEcho: {updated_server.deactivate_echo}")
# 5. READ - Verify update
servers = await get_all_servers()
print_servers(servers, "1E. READ - Servers after update (verify changes)")
test_server = next((s for s in servers if s['id'] == next_id), None)
if test_server and "UPDATED" in test_server['alias']:
print(f" [PASS] VERIFIED: Server {next_id} updated to '{test_server['alias']}'")
print(f" [PASS] VERIFIED: Enabled={test_server['enabled']} (should be False)")
else:
print(f" [FAIL] VERIFICATION FAILED: Server {next_id} update not reflected!")
# 6. DELETE - Remove the server
print("\n1F. DELETE - Removing test server")
deleted = await delete_server(next_id)
if deleted:
print(f" [OK] Deleted server ID={next_id}")
# 7. READ - Verify deletion
servers = await get_all_servers()
print_servers(servers, "1G. READ - Servers after deletion (verify removal)")
test_server = next((s for s in servers if s['id'] == next_id), None)
if not test_server:
print(f" [PASS] VERIFIED: Server {next_id} successfully deleted")
else:
print(f" [FAIL] VERIFICATION FAILED: Server {next_id} still exists!")
# ========== ACTION MAPPING CRUD TESTS ==========
print("\n" + "=" * 80)
print("PART 2: ACTION MAPPING CRUD TEST")
print("=" * 80)
# 1. Initial state
mappings = await get_all_mappings()
print_mappings(mappings, "2A. INITIAL STATE - All action mappings", filter_prefix="TEST")
# 2. CREATE - Add test mapping similar to "GeVi PanLeft_101027"
print("\n2B. CREATE - Adding test mapping (Camera Pan Left)")
created_mapping = await create_action_mapping(
name="TEST Mapping - Pan Left Camera",
input_action="DigitalContactChanged",
input_params={"Contact": "5", "State": "closed"},
output_action="CameraPanLeft",
output_params={"Channel": "101027", "Speed": "50"}
)
if created_mapping:
print(f" [OK] Created mapping: {created_mapping.name}")
# 3. READ - Verify creation
mappings = await get_all_mappings()
print_mappings(mappings, "2C. READ - Mappings after creation (verify new mapping)", filter_prefix="TEST")
test_mapping = next((m for m in mappings if "TEST Mapping - Pan Left" in m['name']), None)
if test_mapping:
print(f" [PASS] VERIFIED: Mapping #{test_mapping['id']} exists: '{test_mapping['name']}'")
if test_mapping['input_actions']:
print(f" Input: {test_mapping['input_actions'][0]['action']}")
if test_mapping['output_actions']:
print(f" Output: {test_mapping['output_actions'][0]['action']}")
test_mapping_id = test_mapping['id']
else:
print(f" [FAIL] VERIFICATION FAILED: Test mapping not found!")
test_mapping_id = None
# 4. CREATE - Add another mapping (Pan Right)
print("\n2D. CREATE - Adding second test mapping (Camera Pan Right)")
created_mapping2 = await create_action_mapping(
name="TEST Mapping - Pan Right Camera",
input_action="DigitalContactChanged",
input_params={"Contact": "6", "State": "closed"},
output_action="CameraPanRight",
output_params={"Channel": "101027", "Speed": "30"}
)
if created_mapping2:
print(f" [OK] Created mapping: {created_mapping2.name}")
# 5. READ - Verify both mappings
mappings = await get_all_mappings()
print_mappings(mappings, "2E. READ - All test mappings (verify both exist)", filter_prefix="TEST")
# 6. UPDATE - Modify first mapping
if test_mapping_id:
print("\n2F. UPDATE - Modifying first test mapping (change to Tilt Up)")
updated_mapping = await update_action_mapping(
mapping_id=test_mapping_id,
name="TEST Mapping - Pan Left UPDATED to Tilt Up",
output_action="CameraTiltUp",
output_params={"Channel": "101027", "Speed": "75"}
)
if updated_mapping:
print(f" [OK] Updated mapping #{test_mapping_id}: {updated_mapping.name}")
# 7. READ - Verify update
mappings = await get_all_mappings()
print_mappings(mappings, "2G. READ - Mappings after update (verify changes)", filter_prefix="TEST")
test_mapping = next((m for m in mappings if m['id'] == test_mapping_id), None)
if test_mapping and "UPDATED" in test_mapping['name'] and test_mapping['output_actions'][0]['action'] == "CameraTiltUp":
print(f" [PASS] VERIFIED: Mapping #{test_mapping_id} updated successfully")
print(f" New name: {test_mapping['name']}")
print(f" New output: {test_mapping['output_actions'][0]['action']}")
else:
print(f" [FAIL] VERIFICATION FAILED: Mapping #{test_mapping_id} update not reflected!")
# 8. DELETE - Remove test mappings (CRITICAL: Delete in REVERSE order!)
print("\n2H. DELETE - Removing all test mappings")
mappings = await get_all_mappings()
test_mappings = [m for m in mappings if "TEST" in m['name']]
# Sort by ID descending to delete from highest to lowest
# This prevents cascade deletion bug where IDs shift after each delete
test_mappings_sorted = sorted(test_mappings, key=lambda x: x['id'], reverse=True)
print(f" Deleting {len(test_mappings_sorted)} mappings in reverse order (highest ID first)")
for m in test_mappings_sorted:
deleted = await delete_action_mapping(m['id'])
if deleted:
print(f" [OK] Deleted mapping #{m['id']}: {m['name']}")
# 9. READ - Verify deletion
mappings = await get_all_mappings()
print_mappings(mappings, "2I. READ - Mappings after deletion (verify removal)", filter_prefix="TEST")
remaining_test = [m for m in mappings if "TEST" in m['name']]
if not remaining_test:
print(f" [PASS] VERIFIED: All test mappings successfully deleted")
else:
print(f" [FAIL] VERIFICATION FAILED: {len(remaining_test)} test mappings still exist!")
# ========== FINAL SUMMARY ==========
print("\n" + "=" * 80)
print("TEST SUMMARY")
print("=" * 80)
servers = await get_all_servers()
mappings = await get_all_mappings()
print(f"Final server count: {len(servers)}")
print(f"Final mapping count: {len(mappings)}")
print("\n" + "=" * 80)
print("COMPREHENSIVE CRUD TEST COMPLETE")
print("=" * 80)
if __name__ == "__main__":
asyncio.run(main())