"""Parse and build the XLSX workbooks used for configuration import/export.

Parsers accept the raw bytes of a workbook, locate the header row on the
active sheet and return plain dictionaries.  Every parser also reports
``formula_cells_missing``: the number of cells it read that hold a formula
but have no cached value in the file, so their content could not be read.
"""

from io import BytesIO
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple

import openpyxl

SERVER_HEADER_ALIASES = {
    "hostname": "alias",
    "alias": "alias",
    "typ": "type",
    "type": "type",
    "ip server": "host",
    "ip": "host",
    "server": "host",
    "username": "user",
    "user": "user",
    "password": "password",
}

KEYBOARD_HEADER_ALIASES = {
    "description": "description",
    "name": "name",
    "ip address": "host",
    "ip": "host",
    "host": "host",
    "ip host": "host",
    "port": "port",
    "ip port": "port",
}

# Header spellings below are compared after lowercasing and removing spaces.
_VIDEO_OUTPUT_HEADER_ALIASES = {
    "localid": "localId",
    "local_id": "localId",
    "local": "localId",
    "globalid": "globalId",
    "global_id": "globalId",
    "global": "globalId",
    "name": "name",
    "description": "description",
    "desc": "description",
}

_LOGGING_HEADER_ALIASES = {
    "actionname": "actionName",
    "action": "actionName",
    "actionid": "actionName",
    "action_id": "actionName",
    "logintodatabase": "db",
    "logtodatabase": "db",
    "database": "db",
    "db": "db",
    "notifyuser": "user",
    "user": "user",
    "notifygevicom": "com",
    "gevicom": "com",
    "com": "com",
}

# Exclusive upper bound of the rows scanned for a header (rows 1..14).
_HEADER_SCAN_ROW_STOP = 15

_COMPACT_ACTION_HEADERS = (
    "Input Action",
    "Input Category",
    "Input Caption",
    "Output Action",
    "Output Caption",
    "Output Server Alias",
)

_TRUTHY_TEXT = ("x", "yes", "true", "1", "y")


def _clean(value):
    """Return *value* as stripped text; ``None`` becomes the empty string."""
    if value is None:
        return ""
    return str(value).strip()


def _header_key(value) -> str:
    """Normalize a header cell: stripped and lowercased."""
    return _clean(value).lower()


def _squashed_header_key(value) -> str:
    """Normalize a header cell: stripped, lowercased, spaces removed."""
    return _clean(value).lower().replace(" ", "")


def _normalize_type(value: str) -> str:
    """Map a free-text server type to ``"geviscope"`` or ``"gcore"``.

    Anything that does not mention GeViScope ("geviscope" / "gsc") is treated
    as G-Core, including empty or unknown text.
    """
    lowered = value.lower()
    if "geviscope" in lowered or "gsc" in lowered:
        return "geviscope"
    return "gcore"


def _to_int(text: str) -> Optional[int]:
    """Convert cell text such as ``"80"`` or ``"80.0"`` to ``int``.

    Returns ``None`` for empty text and for anything that is not a finite
    number.  ``OverflowError`` is caught as well as ``ValueError`` because
    ``int(float("inf"))`` raises the former.
    """
    if not text:
        return None
    try:
        return int(float(text))
    except (ValueError, OverflowError):
        return None


def _to_bool(value: Any) -> bool:
    """Interpret a checkbox-like cell value (``"X"``, ``"yes"``, ``1``...)."""
    if value is None:
        return False
    if isinstance(value, bool):
        return value
    if isinstance(value, (int, float)):
        return value != 0
    return _clean(value).lower() in _TRUTHY_TEXT


def _load_sheets(contents: bytes):
    """Load the workbook twice and return ``(values_sheet, formulas_sheet)``.

    The first copy is opened with ``data_only=True`` and the second with
    ``data_only=False``; the second is what lets ``_cell_value`` recognize a
    formula cell whose value is missing from the first.
    """
    values_wb = openpyxl.load_workbook(BytesIO(contents), data_only=True)
    formulas_wb = openpyxl.load_workbook(BytesIO(contents), data_only=False)
    return values_wb.active, formulas_wb.active


def _cell_value(
    values_sheet, formulas_sheet, row: int, col: Optional[int]
) -> Tuple[Any, bool]:
    """Read one cell and report whether it is a formula without a value.

    Args:
        values_sheet: Sheet loaded with ``data_only=True``.
        formulas_sheet: The same sheet loaded with ``data_only=False``.
        row: 1-based row index.
        col: 1-based column index; ``None``, ``0`` or a negative number means
            "column not present" and yields ``(None, False)``.

    Returns:
        ``(value, formula_missing)``.  ``formula_missing`` is ``True`` only
        when the value is ``None`` and the formula sheet marks the cell as a
        formula (``data_type == "f"``).
    """
    if not col or col < 1:
        return None, False
    value = values_sheet.cell(row=row, column=col).value
    if value is None:
        fcell = formulas_sheet.cell(row=row, column=col)
        if fcell.data_type == "f":
            return None, True
    return value, False


class _SheetReader:
    """Read cells from a values/formulas sheet pair, counting lost formulas."""

    def __init__(self, values_sheet, formulas_sheet) -> None:
        self.sheet = values_sheet
        self._formulas_sheet = formulas_sheet
        # Number of cells read so far that were formulas without a value.
        self.formula_missing = 0

    def raw(self, row: int, col: Optional[int]) -> Any:
        """Return the cell's value unchanged, updating the formula counter."""
        value, is_formula = _cell_value(self.sheet, self._formulas_sheet, row, col)
        if is_formula:
            self.formula_missing += 1
        return value

    def text(self, row: int, col: Optional[int]) -> str:
        """Return the cell's value as stripped text (``""`` when empty)."""
        return _clean(self.raw(row, col))

    def data_rows(self, header_row: int) -> range:
        """Row indices following *header_row* up to the sheet's last row."""
        return range(header_row + 1, self.sheet.max_row + 1)

    def columns(self) -> range:
        """All 1-based column indices of the sheet."""
        return range(1, self.sheet.max_column + 1)


def _find_header_row(
    reader: _SheetReader,
    accepted: Sequence[str],
    col_stop: int,
    normalize: Callable[[Any], str] = _header_key,
) -> Optional[int]:
    """Return the first row containing one of the *accepted* header names.

    Scans rows below ``_HEADER_SCAN_ROW_STOP`` and columns below *col_stop*
    (both exclusive), row by row, and stops at the first match.  Cells are
    compared after *normalize* is applied.  Returns ``None`` when no cell in
    the scanned area matches.
    """
    sheet = reader.sheet
    for row in range(1, min(_HEADER_SCAN_ROW_STOP, sheet.max_row + 1)):
        for col in range(1, min(col_stop, sheet.max_column + 1)):
            if normalize(reader.raw(row, col)) in accepted:
                return row
    return None


def _map_headers(
    reader: _SheetReader,
    header_row: int,
    aliases: Dict[str, str],
    normalize: Callable[[Any], str] = _header_key,
) -> Dict[str, int]:
    """Map canonical field names to column indices using *aliases*.

    When several columns resolve to the same field, the right-most one wins.
    """
    header_map: Dict[str, int] = {}
    for col in reader.columns():
        key = normalize(reader.raw(header_row, col))
        if key in aliases:
            header_map[aliases[key]] = col
    return header_map


def _workbook_bytes(workbook) -> bytes:
    """Serialize *workbook* to XLSX bytes in memory."""
    out = BytesIO()
    workbook.save(out)
    return out.getvalue()


def parse_servers_xlsx(contents: bytes) -> Dict[str, Any]:
    """Parse a server list workbook.

    The header row is the first row (within the scanned area) that contains a
    ``Hostname`` cell.  Rows lacking an alias or a host are skipped.  A
    missing user defaults to ``"sysadmin"``.

    Returns:
        ``{"servers": [...], "skipped": int, "formula_cells_missing": int}``.

    Raises:
        ValueError: If no header row is found.
    """
    reader = _SheetReader(*_load_sheets(contents))
    header_row = _find_header_row(reader, ("hostname",), 10)
    if not header_row:
        raise ValueError("Could not find header row with 'Hostname'")
    header_map = _map_headers(reader, header_row, SERVER_HEADER_ALIASES)

    servers = []
    skipped = 0
    for row in reader.data_rows(header_row):
        alias = reader.text(row, header_map.get("alias", 0))
        host = reader.text(row, header_map.get("host", 0))
        if not alias or not host:
            skipped += 1
            continue
        servers.append({
            "alias": alias,
            "host": host,
            "user": reader.text(row, header_map.get("user", 0)) or "sysadmin",
            "password": reader.text(row, header_map.get("password", 0)),
            "type": _normalize_type(reader.text(row, header_map.get("type", 0))),
            "enabled": True,
            "deactivateEcho": False,
            "deactivateLiveCheck": False,
        })
    return {
        "servers": servers,
        "skipped": skipped,
        "formula_cells_missing": reader.formula_missing,
    }


def _compact_action_entry(
    text: Callable[..., str], row: int, camera_id: str
) -> Dict[str, Any]:
    """Build one action entry from the compact (input/output) column layout."""
    output_server = text(row, "Output Server Alias")
    server_type = text(row, "Server Type")
    entry = {
        "cameraId": camera_id,
        "server": text(row, "Server") or output_server,
        "serverType": server_type,
        "ptz": text(row, "PTZ"),
        "caption": text(row, "Input Caption"),
        "outputs": [
            {
                "kind": "output1",
                "category": text(row, "Input Category"),
                "action": text(row, "Input Action"),
                "videoInput": camera_id,
            },
        ],
    }
    output_entry = {
        "category": "",
        "action": text(row, "Output Action"),
        "caption": text(row, "Output Caption"),
        "server": output_server,
        "ptzHead": camera_id,
        "speed": "",
    }
    lowered = server_type.lower()
    if "g-core" in lowered or "gcore" in lowered or "gng" in lowered:
        output_entry["kind"] = "gcore"
    else:
        output_entry["kind"] = "gsc"
    entry["outputs"].append(output_entry)
    return entry


def _full_action_entry(
    text: Callable[..., str], row: int, camera_id: str
) -> Dict[str, Any]:
    """Build one action entry from the layout with repeated column groups.

    Repeated headers (``Category``, ``Action``, ``Caption``, ``PTZ head``,
    ``speed``) are addressed by occurrence index, left to right.
    """
    return {
        "cameraId": camera_id,
        "server": text(row, "Server"),
        "serverType": text(row, "Server Type"),
        "ptz": text(row, "PTZ"),
        "caption": text(row, "Caption", 0),
        "outputs": [
            {
                "kind": "output1",
                "category": text(row, "Category", 0),
                "action": text(row, "Action", 0),
                "videoInput": text(row, "VideoInput", 0),
            },
            {
                "kind": "gsc",
                "category": text(row, "Category", 1),
                "action": text(row, "Action", 1),
                "caption": text(row, "Caption", 1),
                "server": text(row, "GeviScope alias", 0),
                "ptzHead": text(row, "PTZ head", 0),
                "speed": text(row, "speed", 0),
            },
            {
                "kind": "gcore",
                "category": text(row, "Category", 2),
                "action": text(row, "Action", 2),
                "caption": text(row, "Caption", 2),
                "server": text(row, "G-Core alias", 0),
                "ptzHead": text(row, "PTZ head", 1),
                "speed": text(row, "speed", 1),
            },
        ],
    }


def parse_actions_xlsx(contents: bytes) -> Dict[str, Any]:
    """Parse an action-mapping workbook.

    The header row is the first row (within the scanned area) containing a
    ``Camera ID`` cell.  Two layouts are supported: a compact one, selected
    when all of the ``Input ...`` / ``Output ...`` headers are present, and
    one with repeated ``Category`` / ``Action`` / ``Caption`` groups.  Rows
    without a camera id are ignored.

    Header names are matched exactly first; if a name has no exact match, a
    case-insensitive match is used, so a sheet whose header row was detected
    despite different capitalization still resolves its columns.

    Returns:
        ``{"rows": [...], "formula_cells_missing": int}``.  The formula
        counter covers every cell read, not only the camera id and caption.

    Raises:
        ValueError: If no header row is found.
    """
    reader = _SheetReader(*_load_sheets(contents))
    header_row = _find_header_row(reader, ("camera id",), 20)
    if not header_row:
        raise ValueError("Could not find header row with 'Camera ID'")

    exact: Dict[str, List[int]] = {}
    folded: Dict[str, List[int]] = {}
    for col in reader.columns():
        header = reader.text(header_row, col)
        if header:
            exact.setdefault(header, []).append(col)
            folded.setdefault(header.lower(), []).append(col)

    def column(name: str, idx: int = 0) -> Optional[int]:
        # Exact spelling wins so sheets that already parsed keep their
        # column assignment; the folded lookup is only a fallback.
        matches = exact.get(name) or folded.get(name.lower(), [])
        return matches[idx] if idx < len(matches) else None

    def text(row: int, name: str, idx: int = 0) -> str:
        return reader.text(row, column(name, idx))

    compact_mode = all(column(name) is not None for name in _COMPACT_ACTION_HEADERS)
    build_entry = _compact_action_entry if compact_mode else _full_action_entry

    rows: List[Dict[str, Any]] = []
    for row in reader.data_rows(header_row):
        camera_id = text(row, "Camera ID")
        if not camera_id:
            continue
        rows.append(build_entry(text, row, camera_id))
    return {"rows": rows, "formula_cells_missing": reader.formula_missing}


def parse_keyboards_xlsx(contents: bytes) -> Dict[str, Any]:
    """Parse a keyboard list workbook.

    The header row is the first row (within the scanned area) containing a
    ``Name`` or ``Description`` cell.  Rows without a host or port, or whose
    port is not a finite number, are counted as skipped.

    Returns:
        ``{"keyboards": [...], "skipped": int, "formula_cells_missing": int}``.

    Raises:
        ValueError: If no header row is found.
    """
    reader = _SheetReader(*_load_sheets(contents))
    header_row = _find_header_row(reader, ("description", "name"), 10)
    if not header_row:
        raise ValueError("Could not find header row with 'Name' or 'Description'")
    header_map = _map_headers(reader, header_row, KEYBOARD_HEADER_ALIASES)

    keyboards = []
    skipped = 0
    for row in reader.data_rows(header_row):
        name = reader.text(row, header_map.get("name", 0))
        description = reader.text(row, header_map.get("description", 0))
        host = reader.text(row, header_map.get("host", 0))
        port_raw = reader.text(row, header_map.get("port", 0))
        # A fully empty row is covered by this check as well.
        if not host or not port_raw:
            skipped += 1
            continue
        port = _to_int(port_raw)
        if port is None:
            skipped += 1
            continue
        keyboards.append({
            "name": name,
            "description": description,
            "host": host,
            "port": port,
        })
    return {
        "keyboards": keyboards,
        "skipped": skipped,
        "formula_cells_missing": reader.formula_missing,
    }


def parse_video_outputs_xlsx(contents: bytes) -> Dict[str, Any]:
    """Parse a video-output list workbook.

    The header row is the first row (within the scanned area) containing a
    ``Local ID`` style cell.  A row is skipped only when its local id, global
    id, name and description are all empty after stripping.  Ids that are
    not finite numbers are stored as ``None``.

    Returns:
        ``{"outputs": [...], "skipped": int, "formula_cells_missing": int}``.

    Raises:
        ValueError: If no header row is found.
    """
    reader = _SheetReader(*_load_sheets(contents))
    header_row = _find_header_row(
        reader, ("localid", "local_id", "local"), 10, _squashed_header_key
    )
    if not header_row:
        raise ValueError("Could not find header row with 'Local ID'")
    header_map = _map_headers(
        reader, header_row, _VIDEO_OUTPUT_HEADER_ALIASES, _squashed_header_key
    )

    outputs = []
    skipped = 0
    for row in reader.data_rows(header_row):
        local_raw = reader.text(row, header_map.get("localId", 0))
        global_raw = reader.text(row, header_map.get("globalId", 0))
        name = reader.text(row, header_map.get("name", 0))
        description = reader.text(row, header_map.get("description", 0))
        # Emptiness is judged on the cleaned text that is actually stored,
        # so whitespace-only rows are skipped and a name of 0 is kept.
        if not (local_raw or global_raw or name or description):
            skipped += 1
            continue
        outputs.append({
            "localId": _to_int(local_raw),
            "globalId": _to_int(global_raw),
            "name": name,
            "description": description,
        })
    return {
        "outputs": outputs,
        "skipped": skipped,
        "formula_cells_missing": reader.formula_missing,
    }


def build_video_outputs_xlsx(outputs: List[Dict[str, Any]]) -> bytes:
    """Build a ``VideoOutputs`` workbook from output records.

    Each record may provide ``localId``, ``globalId``, ``name`` and
    ``description``; missing names and descriptions are written as empty
    text.  Returns the XLSX file content.
    """
    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = "VideoOutputs"
    ws.append(["Local ID", "Global ID", "Name", "Description"])
    for row in outputs:
        ws.append([
            row.get("localId"),
            row.get("globalId"),
            row.get("name", ""),
            row.get("description", ""),
        ])
    return _workbook_bytes(wb)


def build_logging_xlsx(rows: List[Dict[str, Any]]) -> bytes:
    """Build a ``Logging`` workbook from logging records.

    Truthy ``db`` / ``user`` / ``com`` flags are written as ``"X"``, falsy
    ones as empty text.  Returns the XLSX file content.
    """
    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = "Logging"
    ws.append(["Action name", "Log into database", "Notify user", "Notify GeViCom"])
    for row in rows:
        ws.append([
            row.get("actionName", ""),
            "X" if row.get("db") else "",
            "X" if row.get("user") else "",
            "X" if row.get("com") else "",
        ])
    return _workbook_bytes(wb)


def parse_logging_xlsx(contents: bytes) -> Dict[str, Any]:
    """Parse a logging configuration workbook.

    The header row is the first row (within the scanned area) containing an
    ``Action name`` style cell.  Rows without an action name are counted as
    skipped; the three flag columns are interpreted with ``_to_bool``.

    Returns:
        ``{"rows": [...], "skipped": int, "formula_cells_missing": int}``.

    Raises:
        ValueError: If no header row is found.
    """
    reader = _SheetReader(*_load_sheets(contents))
    header_row = _find_header_row(
        reader,
        ("actionname", "action", "actionid", "action_id"),
        10,
        _squashed_header_key,
    )
    if not header_row:
        raise ValueError("Could not find header row with 'Action name'")
    header_map = _map_headers(
        reader, header_row, _LOGGING_HEADER_ALIASES, _squashed_header_key
    )

    rows = []
    skipped = 0
    for row in reader.data_rows(header_row):
        action_name = reader.text(row, header_map.get("actionName", 0))
        db_value = reader.raw(row, header_map.get("db", 0))
        user_value = reader.raw(row, header_map.get("user", 0))
        com_value = reader.raw(row, header_map.get("com", 0))
        if not action_name:
            skipped += 1
            continue
        rows.append({
            "actionName": action_name,
            "db": _to_bool(db_value),
            "user": _to_bool(user_value),
            "com": _to_bool(com_value),
        })
    return {
        "rows": rows,
        "skipped": skipped,
        "formula_cells_missing": reader.formula_missing,
    }