import argparse import copy import json import sys from pathlib import Path TYPE_NAMES = { 0: "folder", 1: "bool", 2: "byte", 3: "int16", 4: "int32", 5: "int64", 7: "string", } NAME_TO_TYPE = {v: k for k, v in TYPE_NAMES.items()} INT_SIZES = {1: 1, 2: 1, 3: 2, 4: 4, 5: 8} GSC_GLOBAL_KEYS = { "DialUpCPADay0", "DialUpCPADay1", "DialUpCPADay2", "DialUpCPADay3", "DialUpCPADay4", "DialUpCPADay5", "DialUpCPADay6", "DialUpCPAResponseWindow", "DialUpCPAThreshold", "DialUpLimitActionCount", "DialUpLimitTimeDepth", "DialUpWorkerThreads", "FailoverActive", } GSC_ENTRY_FIELDS = { "Alias", "Host", "User", "Password", "Enabled", "DeactivateEcho", "DeactivateLiveCheck", "DialUpBroadcastAware", "DialUpConnection", "DialUpCPAConnection", "DialUpCPAConnectionInterval", "DialUpCPATimeSettings", "DialUpKeepAlive", "DialUpKeepAliveRetrigger", "DialUpKeepAliveTime", } # ---------- Binary read/write ---------- def _read_node(blob: bytes, offset: int): node_type = blob[offset] try: type_name = TYPE_NAMES[node_type] except KeyError: raise ValueError(f"Unsupported type byte {node_type} at offset {offset}") name_len = blob[offset + 1] name_start = offset + 2 name_end = name_start + name_len name = blob[name_start:name_end].decode("utf-8") cursor = name_end if node_type == 0: child_count = int.from_bytes(blob[cursor:cursor + 4], "little") cursor += 4 children = [] for _ in range(child_count): child, cursor = _read_node(blob, cursor) children.append(child) return {"name": name, "type": type_name, "children": children}, cursor if node_type == 7: str_len = int.from_bytes(blob[cursor:cursor + 2], "little") cursor += 2 str_end = cursor + str_len value = blob[cursor:str_end].decode("utf-8") cursor = str_end return {"name": name, "type": type_name, "value": value}, cursor size = INT_SIZES[node_type] value = int.from_bytes(blob[cursor:cursor + size], "little", signed=False) cursor += size if node_type == 1: value = bool(value) return {"name": name, "type": type_name, "value": value}, cursor def load_set(path: Path): data = path.read_bytes() return load_set_bytes(data) def load_set_bytes(data: bytes): tree, cursor = _read_node(data, 0) if cursor != len(data): raise ValueError(f"Did not consume complete file, stopped at {cursor} of {len(data)} bytes") return tree def _write_node(node, out: bytearray): try: node_type = NAME_TO_TYPE[node["type"]] except KeyError: raise ValueError(f"Unknown node type {node.get('type')}") name_bytes = node["name"].encode("utf-8") if len(name_bytes) > 0xFF: raise ValueError(f"Name too long: {node['name']}") out.append(node_type) out.append(len(name_bytes)) out.extend(name_bytes) if node_type == 0: children = node.get("children", []) out.extend(len(children).to_bytes(4, "little")) for child in children: _write_node(child, out) return if node_type == 7: value_bytes = str(node.get("value", "")).encode("utf-8") if len(value_bytes) > 0xFFFF: raise ValueError(f"String too long on node {node['name']}") out.extend(len(value_bytes).to_bytes(2, "little")) out.extend(value_bytes) return size = INT_SIZES[node_type] value = node.get("value") if value is None: raise ValueError(f"Missing value for node {node['name']}") if node_type == 1: value = 1 if value else 0 max_val = (1 << (size * 8)) - 1 if not 0 <= int(value) <= max_val: raise ValueError(f"Value out of range for node {node['name']}: {value}") out.extend(int(value).to_bytes(size, "little", signed=False)) def save_set(tree, path: Path): path.write_bytes(save_set_bytes(tree)) def save_set_bytes(tree) -> bytes: out = bytearray() _write_node(tree, out) return bytes(out) # ---------- CLI helpers ---------- def dump_command(args): tree = load_set(Path(args.input)) payload = tree if args.output: Path(args.output).write_text(json.dumps(payload, indent=2)) else: json.dump(payload, sys.stdout, indent=2) sys.stdout.write("\n") def build_command(args): payload = json.loads(Path(args.input).read_text()) save_set(payload, Path(args.output)) # ---------- Mapping rules helpers ---------- def _find_folder(root, path): node = root for name in path: if not isinstance(node, dict) or node.get("type") != "folder": return None node = next((c for c in node.get("children", []) if c.get("name") == name), None) if node is None: return None return node def _child_by_name(folder, name): if folder is None or folder.get("type") != "folder": return None return next((c for c in folder.get("children", []) if c.get("name") == name), None) def _parse_output(folder): children = folder.get("children", []) name_node = _child_by_name(folder, "@") if _child_by_name(folder, "GscAction"): action_key = "GscAction" elif _child_by_name(folder, "GCoreAction"): action_key = "GCoreAction" else: action_key = None if _child_by_name(folder, "GscServer"): server_key = "GscServer" elif _child_by_name(folder, "GCoreServer"): server_key = "GCoreServer" else: server_key = None order = [c.get("name") for c in children] output = { "id": folder.get("name"), "name": name_node.get("value") if name_node else "", "flags": { "primary": (_child_by_name(folder, "@!") or {}).get("value", 0), "secondary": (_child_by_name(folder, "@@") or {}).get("value", 0), }, "actionField": action_key, "serverField": server_key, "_order": order, } if action_key: output["action"] = (_child_by_name(folder, action_key) or {}).get("value", "") if server_key: output["server"] = (_child_by_name(folder, server_key) or {}).get("value", "") used = {"@", "@!", "@@", action_key, server_key} extras = [copy.deepcopy(c) for c in children if c.get("name") not in used] if extras: output["extras"] = extras return output def _parse_rule(folder): children = folder.get("children", []) order = [c.get("name") for c in children] inputs = { "name": (_child_by_name(folder, "@") or {}).get("value", ""), "flags": { "primary": (_child_by_name(folder, "@!") or {}).get("value", 0), "secondary": (_child_by_name(folder, "@@") or {}).get("value", 0), } } video_input_field = _child_by_name(folder, "VideoInput") if video_input_field is not None: inputs["videoInputId"] = video_input_field.get("value", 0) temp_field = _child_by_name(folder, "Temp") if temp_field is not None: inputs["temp"] = temp_field.get("value", 0) filters = [copy.deepcopy(c) for c in children if isinstance(c.get("name"), str) and c["name"].startswith(".")] other_fields = [ copy.deepcopy(c) for c in children if c.get("name") not in {".Temp", ".VideoInput", "@", "@!", "@@", "Rules"} and not c.get("name", "").startswith(".") ] rules_folder = _child_by_name(folder, "Rules") outputs = [] if rules_folder and rules_folder.get("type") == "folder": for child in rules_folder.get("children", []): if child.get("type") == "folder": outputs.append(_parse_output(child)) rule = {"id": folder.get("name"), "input": inputs, "outputs": outputs, "_order": order} if filters: rule["filters"] = filters if other_fields: rule["fields"] = other_fields return rule def _make_bool(name, value): return {"name": name, "type": "bool", "value": bool(value)} def _make_int32(name, value): return {"name": name, "type": "int32", "value": int(value)} def _make_string(name, value): return {"name": name, "type": "string", "value": str(value)} def _build_output(output): action_field = output.get("actionField") server_field = output.get("serverField") children = [ _make_string("@", output.get("name", "")), _make_int32("@!", output.get("flags", {}).get("primary", 0)), _make_int32("@@", output.get("flags", {}).get("secondary", 0)), ] if action_field: children.append(_make_string(action_field, output.get("action", ""))) if server_field: children.append(_make_string(server_field, output.get("server", ""))) for extra in output.get("extras", []): children.append(copy.deepcopy(extra)) order = output.get("_order") if order: children = _apply_order(children, order) return {"name": output.get("id", ""), "type": "folder", "children": children} def _build_rule(rule): inp = rule.get("input", {}) children = [ _make_string("@", inp.get("name", "")), _make_int32("@!", inp.get("flags", {}).get("primary", 0)), _make_int32("@@", inp.get("flags", {}).get("secondary", 0)), ] for filt in rule.get("filters", []): children.append(copy.deepcopy(filt)) # Outputs outputs_children = [] for out in rule.get("outputs", []): outputs_children.append(_build_output(out)) children.append({"name": "Rules", "type": "folder", "children": outputs_children}) # Tail fields for field in rule.get("fields", []): fcopy = copy.deepcopy(field) if fcopy.get("name") == "VideoInput" and "videoInputId" in inp: fcopy["value"] = inp["videoInputId"] if fcopy.get("name") == "Temp" and "temp" in inp: fcopy["value"] = inp["temp"] children.append(fcopy) order = rule.get("_order") if order: children = _apply_order(children, order) return {"name": rule.get("id", ""), "type": "folder", "children": children} def _apply_order(children, order): if not order: return children name_to_child = {c.get("name"): c for c in children} ordered = [] used = set() for name in order: child = name_to_child.get(name) if child is not None: ordered.append(child) used.add(name) for c in children: if c.get("name") not in used: ordered.append(c) return ordered def extract_mapping(tree): mapping_root = _find_folder(tree, ["MappingRules"]) if not mapping_root: raise ValueError("MappingRules folder not found in the .set file") rules = [] extras = [] order = [c.get("name") for c in mapping_root.get("children", [])] for child in mapping_root.get("children", []): if child.get("type") == "folder": rules.append(_parse_rule(child)) else: extras.append(copy.deepcopy(child)) return {"rules": rules, "extras": extras, "_order": order} def apply_mapping(tree, mapping): mapping_root = _find_folder(tree, ["MappingRules"]) if not mapping_root: raise ValueError("MappingRules folder not found in the .set file") new_children = [] for rule in mapping.get("rules", []): new_children.append(_build_rule(rule)) for extra in mapping.get("extras", []): new_children.append(copy.deepcopy(extra)) order = mapping.get("_order") if order: new_children = _apply_order(new_children, order) mapping_root["children"] = new_children # ---------- GCore server helpers ---------- def extract_gcore(tree): gcore_root = _find_folder(tree, ["GeViGCoreServer"]) if not gcore_root: raise ValueError("GeViGCoreServer folder not found in the .set file") order = [c.get("name") for c in gcore_root.get("children", [])] servers = [] extras = [] for child in gcore_root.get("children", []): if child.get("type") == "folder": servers.append(_parse_gcore_entry(child)) else: extras.append(copy.deepcopy(child)) return {"servers": servers, "extras": extras, "_order": order} def _parse_gcore_entry(folder): children = folder.get("children", []) order = [c.get("name") for c in children] entry = { "id": folder.get("name"), "alias": (_child_by_name(folder, "Alias") or {}).get("value", ""), "host": (_child_by_name(folder, "Host") or {}).get("value", ""), "user": (_child_by_name(folder, "User") or {}).get("value", ""), "password": (_child_by_name(folder, "Password") or {}).get("value", ""), "enabled": bool((_child_by_name(folder, "Enabled") or {}).get("value", False)), "deactivateEcho": bool((_child_by_name(folder, "DeactivateEcho") or {}).get("value", False)), "deactivateLiveCheck": bool((_child_by_name(folder, "DeactivateLiveCheck") or {}).get("value", False)), "_order": order, } used = {"Alias", "Host", "User", "Password", "Enabled", "DeactivateEcho", "DeactivateLiveCheck"} extras = [copy.deepcopy(c) for c in children if c.get("name") not in used] if extras: entry["extras"] = extras return entry def _build_gcore_entry(entry): children = [ _make_string("Alias", entry.get("alias", "")), _make_bool("DeactivateEcho", entry.get("deactivateEcho", False)), _make_bool("DeactivateLiveCheck", entry.get("deactivateLiveCheck", False)), _make_bool("Enabled", entry.get("enabled", False)), _make_string("Host", entry.get("host", "")), _make_string("Password", entry.get("password", "")), _make_string("User", entry.get("user", "")), ] for extra in entry.get("extras", []): children.append(copy.deepcopy(extra)) order = entry.get("_order") if order: children = _apply_order(children, order) return {"name": entry.get("id", ""), "type": "folder", "children": children} def apply_gcore(tree, gcore): gcore_root = _find_folder(tree, ["GeViGCoreServer"]) if not gcore_root: raise ValueError("GeViGCoreServer folder not found in the .set file") children = [] for entry in gcore.get("servers", []): children.append(_build_gcore_entry(entry)) for extra in gcore.get("extras", []): children.append(copy.deepcopy(extra)) order = gcore.get("_order") if order: children = _apply_order(children, order) gcore_root["children"] = children # ---------- GeViScope server helpers ---------- def extract_gsc(tree): gsc_root = _find_folder(tree, ["GeViGscServer"]) if not gsc_root: raise ValueError("GeViGscServer folder not found in the .set file") order = [c.get("name") for c in gsc_root.get("children", [])] servers = [] extras = [] global_settings = {} for child in gsc_root.get("children", []): if child.get("type") == "folder": servers.append(_parse_gsc_entry(child)) else: if child.get("name") in GSC_GLOBAL_KEYS: global_settings[child.get("name")] = child.get("value") else: extras.append(copy.deepcopy(child)) return { "servers": servers, "globalSettings": global_settings, "extras": extras, "_order": order, } def _parse_gsc_entry(folder): children = folder.get("children", []) order = [c.get("name") for c in children] entry = { "id": folder.get("name"), "alias": (_child_by_name(folder, "Alias") or {}).get("value", ""), "host": (_child_by_name(folder, "Host") or {}).get("value", ""), "user": (_child_by_name(folder, "User") or {}).get("value", ""), "password": (_child_by_name(folder, "Password") or {}).get("value", ""), "enabled": bool((_child_by_name(folder, "Enabled") or {}).get("value", False)), "deactivateEcho": bool((_child_by_name(folder, "DeactivateEcho") or {}).get("value", False)), "deactivateLiveCheck": bool((_child_by_name(folder, "DeactivateLiveCheck") or {}).get("value", False)), "dialUpBroadcastAware": bool((_child_by_name(folder, "DialUpBroadcastAware") or {}).get("value", False)), "dialUpConnection": bool((_child_by_name(folder, "DialUpConnection") or {}).get("value", False)), "dialUpCPAConnection": bool((_child_by_name(folder, "DialUpCPAConnection") or {}).get("value", False)), "dialUpCPAConnectionInterval": (_child_by_name(folder, "DialUpCPAConnectionInterval") or {}).get("value", 0), "dialUpCPATimeSettings": (_child_by_name(folder, "DialUpCPATimeSettings") or {}).get("value", 0), "dialUpKeepAlive": bool((_child_by_name(folder, "DialUpKeepAlive") or {}).get("value", False)), "dialUpKeepAliveRetrigger": bool((_child_by_name(folder, "DialUpKeepAliveRetrigger") or {}).get("value", False)), "dialUpKeepAliveTime": (_child_by_name(folder, "DialUpKeepAliveTime") or {}).get("value", 0), "_order": order, } extras = [copy.deepcopy(c) for c in children if c.get("name") not in GSC_ENTRY_FIELDS] if extras: entry["extras"] = extras return entry def _build_gsc_entry(entry): children = [ _make_string("Alias", entry.get("alias", "")), _make_bool("DeactivateEcho", entry.get("deactivateEcho", False)), _make_bool("DeactivateLiveCheck", entry.get("deactivateLiveCheck", False)), _make_bool("Enabled", entry.get("enabled", False)), _make_string("Host", entry.get("host", "")), _make_string("Password", entry.get("password", "")), _make_string("User", entry.get("user", "")), _make_bool("DialUpBroadcastAware", entry.get("dialUpBroadcastAware", False)), _make_bool("DialUpConnection", entry.get("dialUpConnection", False)), _make_bool("DialUpCPAConnection", entry.get("dialUpCPAConnection", False)), _make_int32("DialUpCPAConnectionInterval", entry.get("dialUpCPAConnectionInterval", 0)), _make_int32("DialUpCPATimeSettings", entry.get("dialUpCPATimeSettings", 0)), _make_bool("DialUpKeepAlive", entry.get("dialUpKeepAlive", False)), _make_bool("DialUpKeepAliveRetrigger", entry.get("dialUpKeepAliveRetrigger", False)), _make_int32("DialUpKeepAliveTime", entry.get("dialUpKeepAliveTime", 0)), ] for extra in entry.get("extras", []): children.append(copy.deepcopy(extra)) order = entry.get("_order") if order: children = _apply_order(children, order) return {"name": entry.get("id", ""), "type": "folder", "children": children} def apply_gsc(tree, gsc): gsc_root = _find_folder(tree, ["GeViGscServer"]) if not gsc_root: raise ValueError("GeViGscServer folder not found in the .set file") children = [] for entry in gsc.get("servers", []): children.append(_build_gsc_entry(entry)) for key, value in (gsc.get("globalSettings") or {}).items(): if isinstance(value, bool): children.append(_make_bool(key, value)) else: children.append(_make_int32(key, value)) for extra in gsc.get("extras", []): children.append(copy.deepcopy(extra)) order = gsc.get("_order") if order: children = _apply_order(children, order) gsc_root["children"] = children def extract_video_outputs(tree): paths = [ ["Clients", "GeViIO", "GeViIO_Virtual", "VirtualVX3", "VideoOutputs"], ["Clients", "GeViIO", "GeViIO_Virtual", "VirtualCX3", "VideoOutputs"], ["Clients", "GeViIO", "GeViIO_01", "MBeg", "VideoOutputs"], ] folder = None for path in paths: folder = _find_folder(tree, path) if folder: break if not folder: return [] outputs = [] for child in folder.get("children", []): if child.get("type") != "folder": continue gid = _child_by_name(child, "GlobalID") lid = _child_by_name(child, "LocalID") name = _child_by_name(child, "Name") desc = _child_by_name(child, "Description") outputs.append({ "globalId": gid.get("value") if gid else None, "localId": lid.get("value") if lid else None, "name": name.get("value") if name else "", "description": desc.get("value") if desc else "", }) outputs.sort(key=lambda x: (x["localId"] or 0, x["globalId"] or 0)) return outputs def _build_video_output_children(template_children, output): children = [] has_global = False has_local = False has_name = False has_desc = False if template_children: for node in template_children: node_copy = copy.deepcopy(node) if node_copy.get("name") == "GlobalID": node_copy["value"] = output.get("globalId", node_copy.get("value")) has_global = True elif node_copy.get("name") == "LocalID": node_copy["value"] = output.get("localId", node_copy.get("value")) has_local = True elif node_copy.get("name") == "Name": node_copy["value"] = output.get("name", node_copy.get("value", "")) has_name = True elif node_copy.get("name") == "Description": node_copy["value"] = output.get("description", node_copy.get("value", "")) has_desc = True elif node_copy.get("name") == "Enabled": node_copy["value"] = True children.append(node_copy) else: children = [ _make_bool("Enabled", True), _make_int32("GlobalID", output.get("globalId", 0)), _make_int32("LocalID", output.get("localId", 0)), _make_string("Name", output.get("name", "")), _make_string("Description", output.get("description", "")), ] has_global = has_local = has_name = has_desc = True if not has_global: children.append(_make_int32("GlobalID", output.get("globalId", 0))) if not has_local: children.append(_make_int32("LocalID", output.get("localId", 0))) if not has_name and output.get("name"): children.append(_make_string("Name", output.get("name", ""))) if not has_desc and output.get("description"): children.append(_make_string("Description", output.get("description", ""))) return children def _apply_video_outputs_path(tree, outputs, path): folder = _find_folder(tree, path) if not folder: return template_children = None for child in folder.get("children", []): if child.get("type") == "folder": template_children = child.get("children", []) break new_children = [] for idx, output in enumerate(outputs, start=1): name = str(idx) new_children.append({ "name": name, "type": "folder", "children": _build_video_output_children(template_children, output), }) folder["children"] = new_children def apply_video_outputs(tree, outputs): if not isinstance(outputs, list) or not outputs: return cleaned = [] for row in outputs: if not isinstance(row, dict): continue local_id = row.get("localId") global_id = row.get("globalId") if local_id is None and global_id is None: continue cleaned.append({ "localId": local_id, "globalId": global_id, "name": row.get("name", ""), "description": row.get("description", ""), }) if not cleaned: return cleaned.sort(key=lambda x: (x.get("localId") or 0, x.get("globalId") or 0)) _apply_video_outputs_path( tree, cleaned, ["Clients", "GeViIO", "GeViIO_Virtual", "VirtualVX3", "VideoOutputs"], ) _apply_video_outputs_path( tree, cleaned, ["Clients", "GeViIO", "GeViIO_Virtual", "VirtualCX3", "VideoOutputs"], ) geviio = _find_folder(tree, ["Clients", "GeViIO", "GeViIO_01"]) if geviio: for child in geviio.get("children", []): if _is_keyboard_interface(child): _apply_video_outputs_path( tree, cleaned, ["Clients", "GeViIO", "GeViIO_01", child.get("name"), "VideoOutputs"], ) def extract_logging(tree): log_root = _find_folder(tree, ["Globals", "LogData"]) if not log_root: return [] rows = [] for child in log_root.get("children", []): if child.get("type") != "folder": continue action_name = str(child.get("name", "")) db = _child_by_name(child, "DB") user = _child_by_name(child, "User") com = _child_by_name(child, "Com") rows.append({ "actionName": action_name, "db": bool(db.get("value")) if db else False, "user": bool(user.get("value")) if user else False, "com": bool(com.get("value")) if com else False, }) return rows def apply_logging(tree, rows): if not isinstance(rows, list) or not rows: return globals_folder = _find_folder(tree, ["Globals"]) if not globals_folder: return log_root = _child_by_name(globals_folder, "LogData") if not log_root: log_root = {"name": "LogData", "type": "folder", "children": []} globals_folder.setdefault("children", []).append(log_root) existing = {} for child in log_root.get("children", []): if child.get("type") == "folder": existing[str(child.get("name", ""))] = child for row in rows: if not isinstance(row, dict): continue action_name = str(row.get("actionName", "")).strip() if not action_name: continue target = existing.get(action_name) if not target: target = {"name": action_name, "type": "folder", "children": []} log_root.setdefault("children", []).append(target) existing[action_name] = target children = target.setdefault("children", []) _ensure_children_fields( children, [ _make_bool("Com", bool(row.get("com"))), _make_bool("DB", bool(row.get("db"))), _make_bool("User", bool(row.get("user"))), ], ) for child in children: if child.get("name") == "Com": child["value"] = bool(row.get("com")) elif child.get("name") == "DB": child["value"] = bool(row.get("db")) elif child.get("name") == "User": child["value"] = bool(row.get("user")) def _ensure_video_inputs_path(tree, global_ids, ptz_by_id, path): folder = _find_folder(tree, path) if not folder: return ptz_by_id = ptz_by_id or {} target_ids = {int(x) for x in global_ids if isinstance(x, int) and x > 0} existing = {} max_name = 0 max_local_id = 0 template_children = None for child in folder.get("children", []): if child.get("type") != "folder": continue name = child.get("name") try: max_name = max(max_name, int(name)) except (TypeError, ValueError): pass gid = _child_by_name(child, "GlobalID") lid = _child_by_name(child, "LocalID") if gid is not None: gid_val = int(gid.get("value", 0)) existing[gid_val] = child _ensure_children_fields( child.get("children", []), [ _make_bool("Active", True), _make_string("Name", f"Video input{gid_val}"), _make_string("Description", f"Video input{gid_val}"), ], ) if gid_val in ptz_by_id: ptz_value = bool(ptz_by_id[gid_val]) ptz_node = _child_by_name(child, "PTZ") if ptz_node is not None: ptz_node["value"] = ptz_value ptz_link = _child_by_name(child, "- PTZ Linked") if ptz_link is not None: ptz_link["value"] = True enabled_node = _child_by_name(child, "Enabled") if enabled_node is not None: enabled_node["value"] = True active_node = _child_by_name(child, "Active") if active_node is not None: active_node["value"] = True if lid is not None: max_local_id = max(max_local_id, int(lid.get("value", 0))) if template_children is None: template_children = child.get("children", []) if target_ids: kept = [] for child in folder.get("children", []): if child.get("type") != "folder": kept.append(child) continue gid_node = _child_by_name(child, "GlobalID") if gid_node is None: kept.append(child) continue try: gid_val = int(gid_node.get("value", 0)) except (TypeError, ValueError): kept.append(child) continue if gid_val in target_ids: kept.append(child) folder["children"] = kept # Reassign LocalID sequentially for consistency. ordered = [] for child in kept: if child.get("type") != "folder": continue gid_node = _child_by_name(child, "GlobalID") if gid_node is None: continue try: gid_val = int(gid_node.get("value", 0)) except (TypeError, ValueError): continue ordered.append((gid_val, child)) ordered.sort(key=lambda item: item[0]) local_id = 1 for _, child in ordered: lid_node = _child_by_name(child, "LocalID") if lid_node is not None: lid_node["value"] = local_id local_id += 1 for gid in sorted(target_ids): if gid in existing: continue max_name += 1 max_local_id += 1 if template_children: children = [] has_name = False has_desc = False for node in template_children: node_copy = copy.deepcopy(node) if node_copy.get("name") == "GlobalID": node_copy["value"] = gid elif node_copy.get("name") == "LocalID": node_copy["value"] = max_local_id elif node_copy.get("name") == "Name": node_copy["value"] = f"Video input{gid}" has_name = True elif node_copy.get("name") == "Description": node_copy["value"] = f"Video input{gid}" has_desc = True elif node_copy.get("name") in ["PTZ", "Enabled", "- Linked", "- PTZ Linked", "Active"]: if node_copy.get("name") == "PTZ": node_copy["value"] = bool(ptz_by_id.get(gid, True)) else: node_copy["value"] = True children.append(node_copy) if not has_name: children.append(_make_string("Name", f"Video input{gid}")) if not has_desc: children.append(_make_string("Description", f"Video input{gid}")) else: ptz_value = bool(ptz_by_id.get(gid, True)) children = [ _make_bool("- Linked", True), _make_bool("- PTZ Linked", True), _make_bool("Active", True), _make_bool("Enabled", True), _make_int32("GlobalID", gid), _make_int32("LocalID", max_local_id), _make_string("Name", f"Video input{gid}"), _make_string("Description", f"Video input{gid}"), _make_bool("PTZ", ptz_value), ] folder.get("children", []).append( {"name": str(max_name), "type": "folder", "children": children} ) def ensure_video_inputs(tree, global_ids, ptz_by_id=None): ptz_by_id = ptz_by_id or {} _ensure_video_inputs_path( tree, global_ids, ptz_by_id, ["Clients", "GeViIO", "GeViIO_Virtual", "VirtualKDec", "VideoInputs"], ) def _ensure_children_fields(children, required): existing = {c.get("name") for c in children} for node in required: if node.get("name") not in existing: children.append(node) def _vx3_required_fields(gid, local_id, name_prefix, has_ptz=True): return [ _make_int32("- Count Of MBeg Links", 3), _make_bool("- Has PTZ", bool(has_ptz)), _make_bool("Active", True), _make_int32("DefaultTour", 0), _make_string("Description", f"{name_prefix}{gid}"), _make_bool("Enabled", True), _make_int32("GlobalID", gid), _make_int32("LocalID", local_id), _make_string("Name", f"{name_prefix}{gid}"), _make_bool("PictureDetection", True), _make_bool("ReactionRescann", True), _make_bool("ReactionSetup", True), _make_int32("SignalType", 1), _make_int32("StartupPosition", 0), _make_int32("StartupState", 1), _make_int32("StartupTour", 0), _make_bool("SyncDetection", True), _make_int32("ThresholdValue", 0), ] def _ensure_camera_inputs(tree, path, global_ids, name_prefix, ptz_by_id=None): folder = _find_folder(tree, path) if not folder: return ptz_by_id = ptz_by_id or {} target_ids = {int(x) for x in global_ids if isinstance(x, int) and x > 0} existing = {} max_name = 0 max_local_id = 0 template_children = None for child in folder.get("children", []): if child.get("type") != "folder": continue name = child.get("name") try: max_name = max(max_name, int(name)) except (TypeError, ValueError): pass gid = _child_by_name(child, "GlobalID") lid = _child_by_name(child, "LocalID") if gid is not None: existing[int(gid.get("value", 0))] = child if lid is not None: max_local_id = max(max_local_id, int(lid.get("value", 0))) if template_children is None: template_children = child.get("children", []) if target_ids: kept = [] for child in folder.get("children", []): if child.get("type") != "folder": kept.append(child) continue gid_node = _child_by_name(child, "GlobalID") if gid_node is None: kept.append(child) continue try: gid_val = int(gid_node.get("value", 0)) except (TypeError, ValueError): kept.append(child) continue if gid_val in target_ids: kept.append(child) folder["children"] = kept for child in folder.get("children", []): if child.get("type") != "folder": continue gid = _child_by_name(child, "GlobalID") lid = _child_by_name(child, "LocalID") if gid is None or lid is None: continue required = _vx3_required_fields( int(gid.get("value", 0)), int(lid.get("value", 0)), name_prefix, ptz_by_id.get(int(gid.get("value", 0)), True), ) _ensure_children_fields(child.get("children", []), required) ptz_node = _child_by_name(child, "- Has PTZ") if ptz_node is not None: ptz_node["value"] = bool(ptz_by_id.get(int(gid.get("value", 0)), True)) enabled_node = _child_by_name(child, "Enabled") if enabled_node is not None: enabled_node["value"] = True active_node = _child_by_name(child, "Active") if active_node is not None: active_node["value"] = True for gid in sorted(target_ids): if gid in existing: continue max_name += 1 max_local_id += 1 if template_children: children = [] for node in template_children: node_copy = copy.deepcopy(node) if node_copy.get("name") == "GlobalID": node_copy["value"] = gid elif node_copy.get("name") == "LocalID": node_copy["value"] = max_local_id elif node_copy.get("name") in {"Name", "Description"}: node_copy["value"] = f"{name_prefix}{gid}" elif node_copy.get("name") == "- Has PTZ": node_copy["value"] = bool(ptz_by_id.get(gid, True)) elif node_copy.get("name") in {"Enabled", "Active"}: node_copy["value"] = True children.append(node_copy) else: has_ptz = bool(ptz_by_id.get(gid, True)) children = [ _make_bool("- Has PTZ", has_ptz), _make_bool("Active", True), _make_string("Description", f"{name_prefix}{gid}"), _make_bool("Enabled", True), _make_int32("GlobalID", gid), _make_int32("LocalID", max_local_id), _make_string("Name", f"{name_prefix}{gid}"), ] required = _vx3_required_fields(gid, max_local_id, name_prefix, ptz_by_id.get(gid, True)) _ensure_children_fields(children, required) folder.get("children", []).append( {"name": str(max_name), "type": "folder", "children": children} ) def ensure_vx3_video_inputs(tree, global_ids, ptz_by_id=None): _ensure_camera_inputs( tree, ["Clients", "GeViIO", "GeViIO_Virtual", "VirtualVX3", "VideoInputs"], global_ids, "Video input", ptz_by_id, ) _ensure_camera_inputs( tree, ["Clients", "GeViIO", "GeViIO_Virtual", "VirtualCX3", "VideoInputs"], global_ids, "Video input", ptz_by_id, ) def ensure_global_video_inputs(tree, global_ids, ptz_by_id=None): ptz_by_id = ptz_by_id or {} geviio = _find_folder(tree, ["Clients", "GeViIO", "GeViIO_01"]) if geviio: for child in geviio.get("children", []): if child.get("type") != "folder": continue name = str(child.get("name", "")) if not name.startswith("MBeg"): continue _ensure_video_inputs_path( tree, global_ids, ptz_by_id, ["Clients", "GeViIO", "GeViIO_01", name, "VideoInputs"], ) return _ensure_video_inputs_path( tree, global_ids, ptz_by_id, ["Clients", "GeViIO", "GeViIO_01", "MBeg", "VideoInputs"], ) _ensure_video_inputs_path( tree, global_ids, ptz_by_id, ["Clients", "GeViIO", "GeViIO_01", "MBeg2", "VideoInputs"], ) def prune_video_inputs(tree, global_ids): target_ids = {int(x) for x in global_ids if isinstance(x, int) and x > 0} if not target_ids: return def walk(node): if not isinstance(node, dict) or node.get("type") != "folder": return if node.get("name") == "VideoInputs": kept = [] for child in node.get("children", []): if child.get("type") != "folder": kept.append(child) continue gid_node = _child_by_name(child, "GlobalID") if gid_node is None: kept.append(child) continue try: gid_val = int(gid_node.get("value", 0)) except (TypeError, ValueError): kept.append(child) continue if gid_val in target_ids: kept.append(child) node["children"] = kept ordered = [] for child in kept: if child.get("type") != "folder": continue gid_node = _child_by_name(child, "GlobalID") if gid_node is None: continue try: gid_val = int(gid_node.get("value", 0)) except (TypeError, ValueError): continue ordered.append((gid_val, child)) ordered.sort(key=lambda item: item[0]) local_id = 1 for _, child in ordered: lid_node = _child_by_name(child, "LocalID") if lid_node is not None: lid_node["value"] = local_id local_id += 1 for child in node.get("children", []): walk(child) walk(tree) def _set_child_value(folder, name, value): node = _child_by_name(folder, name) if node is not None: node["value"] = value def _replace_named_children(folder, prefix, replacements): children = folder.get("children", []) first_idx = None for idx, child in enumerate(children): if str(child.get("name", "")).startswith(prefix): first_idx = idx break if first_idx is None: first_idx = len(children) kept = [] for idx, child in enumerate(children): if idx < first_idx: kept.append(child) continue if str(child.get("name", "")).startswith(prefix): continue kept.append(child) folder["children"] = kept[:first_idx] + replacements + kept[first_idx:] def _set_mbeg_link_counts(tree, count): def walk(node): if not isinstance(node, dict): return if node.get("name") == "- Count Of MBeg Links": node["value"] = int(count) if node.get("type") == "folder": for child in node.get("children", []): walk(child) walk(tree) def _is_keyboard_interface(node): if not isinstance(node, dict) or node.get("type") != "folder": return False if _child_by_name(node, "SerialParameters") is None: return False if _child_by_name(node, "VideoInputs") is None: return False type_node = _child_by_name(node, "Type") try: return int(type_node.get("value", 0)) == 5 if type_node else False except (TypeError, ValueError): return False def _replace_keyboard_children(folder, new_children): children = folder.get("children", []) first_idx = None kept = [] for idx, child in enumerate(children): if _is_keyboard_interface(child): if first_idx is None: first_idx = idx continue kept.append(child) if first_idx is None: first_idx = len(kept) folder["children"] = kept[:first_idx] + new_children + kept[first_idx:] def _replace_keyboard_users(users, new_children): children = users.get("children", []) first_idx = None kept = [] for idx, child in enumerate(children): if child.get("type") == "folder" and child.get("name") not in { "GeViIO_01", "GeViIO_Virtual", "VirtualKDec", "VirtualVX3" }: if first_idx is None: first_idx = idx continue kept.append(child) if first_idx is None: first_idx = len(kept) users["children"] = kept[:first_idx] + new_children + kept[first_idx:] def apply_keyboards(tree, keyboards): if not isinstance(keyboards, list): return entries = [] for item in keyboards: if not isinstance(item, dict): continue description = str(item.get("description", "")).strip() host = str(item.get("host", "")).strip() port = item.get("port") if not host or port in (None, ""): continue try: port_int = int(port) except (TypeError, ValueError): continue entries.append({ "name": str(item.get("name", "")).strip(), "description": description, "host": host, "port": port_int, }) if not entries: return client = _find_folder(tree, ["Clients", "GeViIO", "GeViIO_01"]) if not client: raise ValueError("Clients/GeViIO/GeViIO_01 not found in the .set file") users = _find_folder(tree, ["Users"]) if not users: raise ValueError("Users folder not found in the .set file") template = _child_by_name(client, "MBeg") if not template: raise ValueError("MBeg interface not found in the .set file") template_user = _child_by_name(users, "MBeg") if not template_user: raise ValueError("Users/MBeg not found in the .set file") base_interface_id = _child_by_name(template, "InterfaceID") try: base_interface_id = int(base_interface_id.get("value", 1)) if base_interface_id else 1 except (TypeError, ValueError): base_interface_id = 1 base_user_id = _child_by_name(template_user, "UserID") try: base_user_id = int(base_user_id.get("value", 1)) if base_user_id else 1 except (TypeError, ValueError): base_user_id = 1 existing_interfaces = [c for c in client.get("children", []) if _is_keyboard_interface(c)] existing_users = [ c for c in users.get("children", []) if c.get("type") == "folder" and c.get("name") not in { "GeViIO_01", "GeViIO_Virtual", "VirtualKDec", "VirtualVX3" } ] user_by_name = {c.get("name"): c for c in existing_users} host_to_iface = {} for iface in existing_interfaces: serial = _child_by_name(iface, "SerialParameters") host_node = _child_by_name(serial, "IpHost") if serial else None host = str(host_node.get("value", "")).strip() if host_node else "" if host: host_to_iface[host] = iface template_iface = existing_interfaces[0] if existing_interfaces else template template_user = user_by_name.get(template_iface.get("name")) or template_user virtual_kdec = _find_folder( tree, ["Clients", "GeViIO", "GeViIO_Virtual", "VirtualKDec"] ) virtual_kdec_id = None if virtual_kdec: kid = _child_by_name(virtual_kdec, "InterfaceID") try: virtual_kdec_id = int(kid.get("value", 0)) if kid else None except (TypeError, ValueError): virtual_kdec_id = None virtual_user = _child_by_name(users, "VirtualKDec") virtual_user_id = None if virtual_user: uid = _child_by_name(virtual_user, "UserID") try: virtual_user_id = int(uid.get("value", 0)) if uid else None except (TypeError, ValueError): virtual_user_id = None start_interface_id = (virtual_kdec_id or base_interface_id) + 1 start_user_id = (virtual_user_id or base_user_id) + 1 new_ifaces = [] new_users = [] for idx, entry in enumerate(entries, start=1): iface = host_to_iface.get(entry["host"]) iface = copy.deepcopy(iface) if iface else copy.deepcopy(template_iface) iface_name = entry.get("name") or f"Mbeg{idx}" iface["name"] = iface_name _set_child_value(iface, "Enabled", True) _set_child_value(iface, "Protocol", 0) _set_child_value(iface, "MBegMode", True) _set_child_value(iface, "InterfaceID", start_interface_id + idx - 1) serial = _child_by_name(iface, "SerialParameters") if serial: _set_child_value(serial, "IpHost", entry["host"]) _set_child_value(serial, "IpPort", entry["port"]) new_ifaces.append(iface) user = user_by_name.get(iface_name) user = copy.deepcopy(user) if user else copy.deepcopy(template_user) user["name"] = iface_name _set_child_value(user, "Username", iface_name) _set_child_value(user, "Description", "") _set_child_value(user, "UserID", start_user_id + idx - 1) new_users.append(user) _replace_keyboard_children(client, new_ifaces) _replace_keyboard_users(users, new_users) if virtual_kdec: serial = _child_by_name(virtual_kdec, "SerialParameters") if serial: _set_child_value(serial, "IpHost", entries[0]["host"]) _set_child_value(serial, "IpPort", entries[0]["port"]) _set_child_value(serial, "ConnectionType", 1) _set_child_value(serial, "PortNr", 0) virtual_user = _child_by_name(users, "VirtualKDec") if virtual_user: _set_child_value(virtual_user, "Description", "") mbeg_entries = [c for c in client.get("children", []) if _is_keyboard_interface(c)] mbeg_entries.sort(key=lambda c: str(c.get("name", ""))) for iface in mbeg_entries: _set_child_value(iface, "MBegMode", True) mbeg_users = [ c for c in users.get("children", []) if c.get("type") == "folder" and c.get("name") not in { "GeViIO_01", "GeViIO_Virtual", "VirtualKDec", "VirtualVX3" } ] mbeg_users.sort(key=lambda c: str(c.get("name", ""))) total_mbeg = len(mbeg_entries) _set_mbeg_link_counts(tree, total_mbeg) def main(argv=None): parser = argparse.ArgumentParser(description="Convert GeViSoft .set files to/from JSON.") sub = parser.add_subparsers(dest="command", required=True) dump = sub.add_parser("dump", help="Parse a .set file into JSON.") dump.add_argument("input", help="Path to .set file") dump.add_argument("output", nargs="?", help="Optional path for JSON output (stdout if omitted)") dump.set_defaults(func=dump_command) build = sub.add_parser("build", help="Create a .set file from JSON produced by dump.") build.add_argument("input", help="Path to JSON file") build.add_argument("output", help="Path to write .set file") build.set_defaults(func=build_command) args = parser.parse_args(argv) args.func(args) if __name__ == "__main__": main()