This MVP release provides a complete full-stack solution for managing action mappings in Geutebruck's GeViScope and GeViSoft video surveillance systems.

## Features

### Flutter Web Application (Port 8081)
- Modern, responsive UI for managing action mappings
- Action picker dialog with full parameter configuration
- Support for both GSC (GeViScope) and G-Core server actions
- Consistent UI for input and output actions with edit/delete capabilities
- Real-time action mapping creation, editing, and deletion
- Server categorization (GSC: prefix for GeViScope, G-Core: prefix for G-Core servers)

### FastAPI REST Backend (Port 8000)
- RESTful API for action mapping CRUD operations
- Action template service with comprehensive action catalog (247 actions)
- Server management (G-Core and GeViScope servers)
- Configuration tree reading and writing
- JWT authentication with role-based access control
- PostgreSQL database integration

### C# SDK Bridge (gRPC, Port 50051)
- Native integration with GeViSoft SDK (GeViProcAPINET_4_0.dll)
- Action mapping creation with correct binary format
- Support for GSC and G-Core action types
- Proper Camera parameter inclusion in action strings (fixes CrossSwitch bug)
- Action ID lookup table with server-specific action IDs
- Configuration reading/writing via SetupClient

## Bug Fixes
- **CrossSwitch Bug**: GSC and G-Core actions now correctly display camera/PTZ head parameters in GeViSet
- Action strings now include Camera parameter: `@ PanLeft (Comment: "", Camera: 101028)`
- Proper filter flags and VideoInput=0 for action mappings
- Correct action ID assignment (4198 for GSC, 9294 for G-Core PanLeft)

## Technical Stack
- **Frontend**: Flutter Web, Dart, Dio HTTP client
- **Backend**: Python FastAPI, PostgreSQL, Redis
- **SDK Bridge**: C# .NET 8.0, gRPC, GeViSoft SDK
- **Authentication**: JWT tokens
- **Configuration**: GeViSoft .set files (binary format)

## Credentials
- GeViSoft/GeViScope: username=sysadmin, password=masterkey
- Default admin: username=admin, password=admin123

## Deployment
All services run on localhost:
- Flutter Web: http://localhost:8081
- FastAPI: http://localhost:8000
- SDK Bridge gRPC: localhost:50051
- GeViServer: localhost (default port)

Generated with Claude Code (https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
389 lines
13 KiB
Python
389 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
GeViSet Configuration File Parser and Generator
|
|
|
|
Supports reading and writing Geutebruck GeViSet .set configuration files.
|
|
|
|
Binary Format Specification:
|
|
- Type markers:
|
|
0x00 = Section/Container (2-byte LE length + name string)
|
|
0x01 = Boolean (1 byte: 0x00 or 0x01)
|
|
0x02 = 64-bit integer (8 bytes LE)
|
|
0x04 = 32-bit integer (4 bytes LE)
|
|
0x07 = Property name (1-byte length + string)
|
|
0x08 = String value (2-byte LE length + string)
|
|
"""
|
|
|
|
import struct
|
|
import hashlib
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Union
|
|
from collections import OrderedDict
|
|
|
|
|
|
class GeViSetParser:
    """Decoder that turns GeViSet binary data into nested ordered dicts"""

    # One-byte tags that introduce each element of the binary stream
    TYPE_SECTION = 0x00
    TYPE_BOOLEAN = 0x01
    TYPE_INT64 = 0x02
    TYPE_INT32 = 0x04
    TYPE_PROPERTY_NAME = 0x07
    TYPE_STRING = 0x08

    def __init__(self):
        self.config = OrderedDict()
        self.pos = 0
        self.data = None

    def read_byte(self) -> int:
        """Consume one byte and return it as an int (EOFError past the end)"""
        value = self.peek_byte()
        self.pos += 1
        return value

    def read_bytes(self, count: int) -> bytes:
        """Consume ``count`` bytes and return them (EOFError if too few remain)"""
        end = self.pos + count
        if end > len(self.data):
            raise EOFError(f"Unexpected end of file at position {self.pos}")
        chunk = self.data[self.pos:end]
        self.pos = end
        return chunk

    def read_uint16(self) -> int:
        """Consume a little-endian unsigned 16-bit integer"""
        return int.from_bytes(self.read_bytes(2), 'little')

    def read_uint32(self) -> int:
        """Consume a little-endian unsigned 32-bit integer"""
        return int.from_bytes(self.read_bytes(4), 'little')

    def read_uint64(self) -> int:
        """Consume a little-endian unsigned 64-bit integer"""
        return int.from_bytes(self.read_bytes(8), 'little')

    def read_string_with_length(self, length_bytes: int = 2) -> str:
        """Consume a string prefixed by a 1- or 2-byte length field"""
        if length_bytes == 1:
            size = self.read_byte()
        elif length_bytes == 2:
            size = self.read_uint16()
        else:
            raise ValueError(f"Invalid length_bytes: {length_bytes}")

        if size == 0:
            return ""

        raw = self.read_bytes(size)
        try:
            text = raw.decode('utf-8')
        except UnicodeDecodeError:
            # Not valid UTF-8: latin-1 maps every byte, so this cannot fail
            text = raw.decode('latin-1')
        return text

    def peek_byte(self) -> int:
        """Return the byte at the cursor without consuming it"""
        if self.pos >= len(self.data):
            raise EOFError(f"Unexpected end of file at position {self.pos}")
        return self.data[self.pos]

    def read_value(self, type_marker: int) -> Any:
        """Consume the payload that belongs to an already-read type marker"""
        if type_marker == self.TYPE_BOOLEAN:
            return bool(self.read_byte())
        if type_marker == self.TYPE_INT32:
            return self.read_uint32()
        if type_marker == self.TYPE_INT64:
            return self.read_uint64()
        if type_marker == self.TYPE_STRING:
            return self.read_string_with_length(2)
        # The marker itself was consumed by the caller, hence pos - 1
        raise ValueError(f"Unknown value type: 0x{type_marker:02x} at position {self.pos - 1}")

    def read_section(self, section_name: str = None) -> OrderedDict:
        """Read the body of a section whose name was already consumed

        Layout following a section name:
        - 4 bytes: int32 metadata (kept under the '_metadata' key)
        - optional single byte that is neither 0x00 nor 0x07 (skipped)
        - properties and nested sections until an empty-named section
        """
        result = OrderedDict()

        # Named sections carry a 4-byte metadata word right after the name
        if section_name:
            try:
                result['_metadata'] = self.read_uint32()
            except EOFError:
                return result

        # Anything other than a section/property marker here is treated as a
        # flags/type byte and dropped
        if self.pos < len(self.data) and self.peek_byte() not in [0x00, 0x07]:
            self.read_byte()

        while self.pos < len(self.data):
            marker = self.peek_byte()

            if marker == self.TYPE_SECTION:
                self.read_byte()  # Consume type marker
                child_name = self.read_string_with_length(1)

                if not child_name:
                    # Empty name terminates this section; skip the four
                    # trailing bytes when that many are still available
                    if self.pos + 3 < len(self.data):
                        self.pos += 4
                    break

                result[child_name] = self.read_section(child_name)
                continue

            if marker != self.TYPE_PROPERTY_NAME:
                # Neither section nor property: stop instead of guessing
                print(f"Warning: Unknown type marker 0x{marker:02x} at position {self.pos}")
                break

            self.read_byte()  # Consume type marker
            prop_name = self.read_string_with_length(1)

            if not prop_name:
                # Nameless property carries no value; move on
                continue

            try:
                result[prop_name] = self.read_value(self.read_byte())
            except (EOFError, ValueError) as exc:
                print(f"Warning: Error reading property '{prop_name}' at position {self.pos}: {exc}")
                break

        return result

    def parse_file(self, file_path: Union[str, Path]) -> OrderedDict:
        """Load a GeViSet file from disk and return its top-level sections"""
        source = Path(file_path)
        self.data = source.read_bytes()
        self.pos = 0
        self.config = OrderedDict()

        total = len(self.data)
        print(f"Parsing {source} ({total:,} bytes)...")

        # Within the last 10 bytes, unexpected content is treated as trailer
        tail_start = total - 10

        while self.pos < total:
            try:
                marker = self.peek_byte()

                if marker != self.TYPE_SECTION:
                    if self.pos >= tail_start:
                        break
                    print(f"Warning: Skipping unexpected byte 0x{marker:02x} at position {self.pos}")
                    self.read_byte()
                    continue

                self.read_byte()  # Consume type marker
                name = self.read_string_with_length(1)  # 1-byte length prefix

                if name:
                    print(f" Reading section: {name}")
                    self.config[name] = self.read_section(name)
                elif self.pos >= tail_start:
                    # Empty name near the end of the data: end-of-file marker
                    break

            except EOFError:
                break
            except Exception as exc:
                print(f"Error at position {self.pos}: {exc}")
                import traceback
                traceback.print_exc()
                break

        print(f"Parsing complete. Read {len(self.config)} top-level sections.")
        return self.config
|
|
|
|
|
|
class GeViSetGenerator:
    """Generator for GeViSet configuration files

    Serialises a nested mapping (sections -> properties / sub-sections) into
    an in-memory buffer using the type markers defined on GeViSetParser, and
    can write that buffer to disk.
    """

    def __init__(self):
        # Output accumulates here; generate_file() resets it on each call
        self.buffer = bytearray()

    def write_byte(self, value: int):
        """Write a single byte (only the low 8 bits of ``value`` are kept)"""
        self.buffer.append(value & 0xFF)

    def write_bytes(self, data: bytes):
        """Write multiple bytes"""
        self.buffer.extend(data)

    def write_uint16(self, value: int):
        """Write 16-bit unsigned integer (little-endian)"""
        self.buffer.extend(struct.pack('<H', value))

    def write_uint32(self, value: int):
        """Write 32-bit unsigned integer (little-endian)"""
        self.buffer.extend(struct.pack('<I', value))

    def write_uint64(self, value: int):
        """Write 64-bit unsigned integer (little-endian)"""
        self.buffer.extend(struct.pack('<Q', value))

    def write_string_with_length(self, text: str, length_bytes: int = 2):
        """Write a length-prefixed, UTF-8 encoded string

        ``length_bytes`` selects a 1-byte or 2-byte (little-endian) length
        prefix. Raises ValueError if the encoded text does not fit the
        prefix or if ``length_bytes`` is neither 1 nor 2.
        """
        text_bytes = text.encode('utf-8')

        if length_bytes == 1:
            if len(text_bytes) > 255:
                raise ValueError(f"String too long for 1-byte length: {len(text_bytes)} bytes")
            self.write_byte(len(text_bytes))
        elif length_bytes == 2:
            if len(text_bytes) > 65535:
                raise ValueError(f"String too long for 2-byte length: {len(text_bytes)} bytes")
            self.write_uint16(len(text_bytes))
        else:
            raise ValueError(f"Invalid length_bytes: {length_bytes}")

        self.write_bytes(text_bytes)

    def write_value(self, value: Any):
        """Write a value with its type marker

        bool -> TYPE_BOOLEAN, int -> TYPE_INT32 or TYPE_INT64 depending on
        magnitude, str -> TYPE_STRING with a 2-byte length prefix.

        Raises ValueError for unsupported Python types and for integers that
        cannot be represented in 64 bits.
        """
        # bool must be tested first because bool is a subclass of int
        if isinstance(value, bool):
            self.write_byte(GeViSetParser.TYPE_BOOLEAN)
            self.write_byte(1 if value else 0)
        elif isinstance(value, int):
            # Masking below would silently turn an out-of-range number into a
            # different one, so reject it before anything is written
            if not -2**63 <= value < 2**64:
                raise ValueError(f"Integer out of range for 64-bit value: {value}")

            # Determine if 32-bit or 64-bit
            if -2**31 <= value < 2**31:
                self.write_byte(GeViSetParser.TYPE_INT32)
                # Mask yields the two's-complement pattern for negatives
                self.write_uint32(value & 0xFFFFFFFF)
            else:
                self.write_byte(GeViSetParser.TYPE_INT64)
                self.write_uint64(value & 0xFFFFFFFFFFFFFFFF)
        elif isinstance(value, str):
            self.write_byte(GeViSetParser.TYPE_STRING)
            self.write_string_with_length(value, 2)
        else:
            raise ValueError(f"Unsupported value type: {type(value)}")

    def write_property(self, name: str, value: Any):
        """Write a property (name + value)"""
        # Property name marker
        self.write_byte(GeViSetParser.TYPE_PROPERTY_NAME)
        self.write_string_with_length(name, 1)

        # Property value
        self.write_value(value)

    def write_section(self, name: str, content: Dict[str, Any]):
        """Write a section with its properties and nested sections

        ``content`` maps property names to values; dict values are written
        as nested sections. The optional '_metadata' entry is written as the
        4-byte word after the section name (default 0).

        Raises ValueError if ``content`` is not a mapping.
        """
        # Fail early with a clear message instead of an AttributeError
        # halfway through the section
        if not (hasattr(content, 'get') and hasattr(content, 'items')):
            raise ValueError(
                f"Section '{name}' must be a mapping, got {type(content)}"
            )

        # Section marker and name
        self.write_byte(GeViSetParser.TYPE_SECTION)
        self.write_string_with_length(name, 1)  # Use 1-byte length

        # Write section metadata (just int32)
        metadata = content.get('_metadata', 0)
        if isinstance(metadata, dict):
            # Handle old format
            metadata = metadata.get('value', 0)
        self.write_uint32(metadata)

        # Write properties and nested sections
        for key, value in content.items():
            if key == '_metadata':
                # Skip metadata - already written
                continue

            if isinstance(value, dict):
                # Nested section
                self.write_section(key, value)
            else:
                # Property
                self.write_property(key, value)

        # Section end marker (empty section)
        self.write_byte(GeViSetParser.TYPE_SECTION)
        self.write_byte(0)  # Empty name (1-byte length)
        # Followed by zeros
        self.write_bytes(b'\x00\x00\x00\x00')

    def generate_file(self, config: OrderedDict, file_path: Union[str, Path]):
        """Generate a GeViSet configuration file

        Serialises every top-level section of ``config`` into a fresh buffer
        and writes the result to ``file_path``. Nothing is written to disk if
        serialisation raises.
        """
        file_path = Path(file_path)
        self.buffer = bytearray()

        print(f"Generating {file_path}...")

        for section_name, section_content in config.items():
            print(f" Writing section: {section_name}")
            self.write_section(section_name, section_content)

        # Write to file
        file_path.write_bytes(self.buffer)
        print(f"Generated {len(self.buffer):,} bytes.")
|
|
|
|
|
|
def main():
    """Command-line entry point: convert between .set and .json files

    With a ``.set`` input the file is parsed and dumped as JSON (to the
    optional second argument, or next to the input with a ``.json`` suffix).
    With a ``.json`` input a ``.set`` file is generated at the path given as
    second argument. Any other extension is reported as an error.
    """
    import sys
    import json

    if len(sys.argv) < 2:
        print("Usage:")
        print(" Parse: geviset_parser.py <input.set> [output.json]")
        print(" Generate: geviset_parser.py <input.json> <output.set>")
        return

    input_file = Path(sys.argv[1])
    # Compare case-insensitively so that e.g. CONFIG.SET is recognised too
    suffix = input_file.suffix.lower()

    if suffix == '.set':
        # Parse mode
        parser = GeViSetParser()
        config = parser.parse_file(input_file)

        # Output to JSON
        output_file = Path(sys.argv[2]) if len(sys.argv) > 2 else input_file.with_suffix('.json')
        with output_file.open('w', encoding='utf-8') as f:
            json.dump(config, f, indent=2, ensure_ascii=False)

        print(f"\nConfiguration saved to {output_file}")
        print(f"Top-level sections: {list(config.keys())}")

    elif suffix == '.json':
        # Generate mode
        if len(sys.argv) < 3:
            print("Error: Output .set file required for generation mode")
            return

        output_file = Path(sys.argv[2])

        with input_file.open('r', encoding='utf-8') as f:
            # Keep key order: it determines the order of the binary output
            config = json.load(f, object_pairs_hook=OrderedDict)

        generator = GeViSetGenerator()
        generator.generate_file(config, output_file)

        print(f"\nConfiguration file generated: {output_file}")

    else:
        # Previously an unrecognised extension made the tool exit silently
        print(f"Error: Unsupported input file type '{input_file.suffix}' (expected .set or .json)")
|
|
|
|
|
|
# Run the command-line interface only when executed as a script
if __name__ == "__main__":
    main()
|