Files
GeViSetWEB/backend/app/geviset.py
2026-02-10 09:58:11 +01:00

1399 lines
50 KiB
Python

import argparse
import copy
import json
import sys
from pathlib import Path
TYPE_NAMES = {
0: "folder",
1: "bool",
2: "byte",
3: "int16",
4: "int32",
5: "int64",
7: "string",
}
NAME_TO_TYPE = {v: k for k, v in TYPE_NAMES.items()}
INT_SIZES = {1: 1, 2: 1, 3: 2, 4: 4, 5: 8}
GSC_GLOBAL_KEYS = {
"DialUpCPADay0",
"DialUpCPADay1",
"DialUpCPADay2",
"DialUpCPADay3",
"DialUpCPADay4",
"DialUpCPADay5",
"DialUpCPADay6",
"DialUpCPAResponseWindow",
"DialUpCPAThreshold",
"DialUpLimitActionCount",
"DialUpLimitTimeDepth",
"DialUpWorkerThreads",
"FailoverActive",
}
GSC_ENTRY_FIELDS = {
"Alias",
"Host",
"User",
"Password",
"Enabled",
"DeactivateEcho",
"DeactivateLiveCheck",
"DialUpBroadcastAware",
"DialUpConnection",
"DialUpCPAConnection",
"DialUpCPAConnectionInterval",
"DialUpCPATimeSettings",
"DialUpKeepAlive",
"DialUpKeepAliveRetrigger",
"DialUpKeepAliveTime",
}
# ---------- Binary read/write ----------
def _read_node(blob: bytes, offset: int):
node_type = blob[offset]
try:
type_name = TYPE_NAMES[node_type]
except KeyError:
raise ValueError(f"Unsupported type byte {node_type} at offset {offset}")
name_len = blob[offset + 1]
name_start = offset + 2
name_end = name_start + name_len
name = blob[name_start:name_end].decode("utf-8")
cursor = name_end
if node_type == 0:
child_count = int.from_bytes(blob[cursor:cursor + 4], "little")
cursor += 4
children = []
for _ in range(child_count):
child, cursor = _read_node(blob, cursor)
children.append(child)
return {"name": name, "type": type_name, "children": children}, cursor
if node_type == 7:
str_len = int.from_bytes(blob[cursor:cursor + 2], "little")
cursor += 2
str_end = cursor + str_len
value = blob[cursor:str_end].decode("utf-8")
cursor = str_end
return {"name": name, "type": type_name, "value": value}, cursor
size = INT_SIZES[node_type]
value = int.from_bytes(blob[cursor:cursor + size], "little", signed=False)
cursor += size
if node_type == 1:
value = bool(value)
return {"name": name, "type": type_name, "value": value}, cursor
def load_set(path: Path):
data = path.read_bytes()
return load_set_bytes(data)
def load_set_bytes(data: bytes):
tree, cursor = _read_node(data, 0)
if cursor != len(data):
raise ValueError(f"Did not consume complete file, stopped at {cursor} of {len(data)} bytes")
return tree
def _write_node(node, out: bytearray):
try:
node_type = NAME_TO_TYPE[node["type"]]
except KeyError:
raise ValueError(f"Unknown node type {node.get('type')}")
name_bytes = node["name"].encode("utf-8")
if len(name_bytes) > 0xFF:
raise ValueError(f"Name too long: {node['name']}")
out.append(node_type)
out.append(len(name_bytes))
out.extend(name_bytes)
if node_type == 0:
children = node.get("children", [])
out.extend(len(children).to_bytes(4, "little"))
for child in children:
_write_node(child, out)
return
if node_type == 7:
value_bytes = str(node.get("value", "")).encode("utf-8")
if len(value_bytes) > 0xFFFF:
raise ValueError(f"String too long on node {node['name']}")
out.extend(len(value_bytes).to_bytes(2, "little"))
out.extend(value_bytes)
return
size = INT_SIZES[node_type]
value = node.get("value")
if value is None:
raise ValueError(f"Missing value for node {node['name']}")
if node_type == 1:
value = 1 if value else 0
max_val = (1 << (size * 8)) - 1
if not 0 <= int(value) <= max_val:
raise ValueError(f"Value out of range for node {node['name']}: {value}")
out.extend(int(value).to_bytes(size, "little", signed=False))
def save_set(tree, path: Path):
path.write_bytes(save_set_bytes(tree))
def save_set_bytes(tree) -> bytes:
out = bytearray()
_write_node(tree, out)
return bytes(out)
# ---------- CLI helpers ----------
def dump_command(args):
tree = load_set(Path(args.input))
payload = tree
if args.output:
Path(args.output).write_text(json.dumps(payload, indent=2))
else:
json.dump(payload, sys.stdout, indent=2)
sys.stdout.write("\n")
def build_command(args):
payload = json.loads(Path(args.input).read_text())
save_set(payload, Path(args.output))
# ---------- Mapping rules helpers ----------
def _find_folder(root, path):
node = root
for name in path:
if not isinstance(node, dict) or node.get("type") != "folder":
return None
node = next((c for c in node.get("children", []) if c.get("name") == name), None)
if node is None:
return None
return node
def _child_by_name(folder, name):
if folder is None or folder.get("type") != "folder":
return None
return next((c for c in folder.get("children", []) if c.get("name") == name), None)
def _parse_output(folder):
children = folder.get("children", [])
name_node = _child_by_name(folder, "@")
if _child_by_name(folder, "GscAction"):
action_key = "GscAction"
elif _child_by_name(folder, "GCoreAction"):
action_key = "GCoreAction"
else:
action_key = None
if _child_by_name(folder, "GscServer"):
server_key = "GscServer"
elif _child_by_name(folder, "GCoreServer"):
server_key = "GCoreServer"
else:
server_key = None
order = [c.get("name") for c in children]
output = {
"id": folder.get("name"),
"name": name_node.get("value") if name_node else "",
"flags": {
"primary": (_child_by_name(folder, "@!") or {}).get("value", 0),
"secondary": (_child_by_name(folder, "@@") or {}).get("value", 0),
},
"actionField": action_key,
"serverField": server_key,
"_order": order,
}
if action_key:
output["action"] = (_child_by_name(folder, action_key) or {}).get("value", "")
if server_key:
output["server"] = (_child_by_name(folder, server_key) or {}).get("value", "")
used = {"@", "@!", "@@", action_key, server_key}
extras = [copy.deepcopy(c) for c in children if c.get("name") not in used]
if extras:
output["extras"] = extras
return output
def _parse_rule(folder):
children = folder.get("children", [])
order = [c.get("name") for c in children]
inputs = {
"name": (_child_by_name(folder, "@") or {}).get("value", ""),
"flags": {
"primary": (_child_by_name(folder, "@!") or {}).get("value", 0),
"secondary": (_child_by_name(folder, "@@") or {}).get("value", 0),
}
}
video_input_field = _child_by_name(folder, "VideoInput")
if video_input_field is not None:
inputs["videoInputId"] = video_input_field.get("value", 0)
temp_field = _child_by_name(folder, "Temp")
if temp_field is not None:
inputs["temp"] = temp_field.get("value", 0)
filters = [copy.deepcopy(c) for c in children if isinstance(c.get("name"), str) and c["name"].startswith(".")]
other_fields = [
copy.deepcopy(c)
for c in children
if c.get("name") not in {".Temp", ".VideoInput", "@", "@!", "@@", "Rules"} and not c.get("name", "").startswith(".")
]
rules_folder = _child_by_name(folder, "Rules")
outputs = []
if rules_folder and rules_folder.get("type") == "folder":
for child in rules_folder.get("children", []):
if child.get("type") == "folder":
outputs.append(_parse_output(child))
rule = {"id": folder.get("name"), "input": inputs, "outputs": outputs, "_order": order}
if filters:
rule["filters"] = filters
if other_fields:
rule["fields"] = other_fields
return rule
def _make_bool(name, value):
return {"name": name, "type": "bool", "value": bool(value)}
def _make_int32(name, value):
return {"name": name, "type": "int32", "value": int(value)}
def _make_string(name, value):
return {"name": name, "type": "string", "value": str(value)}
def _build_output(output):
action_field = output.get("actionField")
server_field = output.get("serverField")
children = [
_make_string("@", output.get("name", "")),
_make_int32("@!", output.get("flags", {}).get("primary", 0)),
_make_int32("@@", output.get("flags", {}).get("secondary", 0)),
]
if action_field:
children.append(_make_string(action_field, output.get("action", "")))
if server_field:
children.append(_make_string(server_field, output.get("server", "")))
for extra in output.get("extras", []):
children.append(copy.deepcopy(extra))
order = output.get("_order")
if order:
children = _apply_order(children, order)
return {"name": output.get("id", ""), "type": "folder", "children": children}
def _build_rule(rule):
inp = rule.get("input", {})
children = [
_make_string("@", inp.get("name", "")),
_make_int32("@!", inp.get("flags", {}).get("primary", 0)),
_make_int32("@@", inp.get("flags", {}).get("secondary", 0)),
]
for filt in rule.get("filters", []):
children.append(copy.deepcopy(filt))
# Outputs
outputs_children = []
for out in rule.get("outputs", []):
outputs_children.append(_build_output(out))
children.append({"name": "Rules", "type": "folder", "children": outputs_children})
# Tail fields
for field in rule.get("fields", []):
fcopy = copy.deepcopy(field)
if fcopy.get("name") == "VideoInput" and "videoInputId" in inp:
fcopy["value"] = inp["videoInputId"]
if fcopy.get("name") == "Temp" and "temp" in inp:
fcopy["value"] = inp["temp"]
children.append(fcopy)
order = rule.get("_order")
if order:
children = _apply_order(children, order)
return {"name": rule.get("id", ""), "type": "folder", "children": children}
def _apply_order(children, order):
if not order:
return children
name_to_child = {c.get("name"): c for c in children}
ordered = []
used = set()
for name in order:
child = name_to_child.get(name)
if child is not None:
ordered.append(child)
used.add(name)
for c in children:
if c.get("name") not in used:
ordered.append(c)
return ordered
def extract_mapping(tree):
mapping_root = _find_folder(tree, ["MappingRules"])
if not mapping_root:
raise ValueError("MappingRules folder not found in the .set file")
rules = []
extras = []
order = [c.get("name") for c in mapping_root.get("children", [])]
for child in mapping_root.get("children", []):
if child.get("type") == "folder":
rules.append(_parse_rule(child))
else:
extras.append(copy.deepcopy(child))
return {"rules": rules, "extras": extras, "_order": order}
def apply_mapping(tree, mapping):
mapping_root = _find_folder(tree, ["MappingRules"])
if not mapping_root:
raise ValueError("MappingRules folder not found in the .set file")
new_children = []
for rule in mapping.get("rules", []):
new_children.append(_build_rule(rule))
for extra in mapping.get("extras", []):
new_children.append(copy.deepcopy(extra))
order = mapping.get("_order")
if order:
new_children = _apply_order(new_children, order)
mapping_root["children"] = new_children
# ---------- GCore server helpers ----------
def extract_gcore(tree):
gcore_root = _find_folder(tree, ["GeViGCoreServer"])
if not gcore_root:
raise ValueError("GeViGCoreServer folder not found in the .set file")
order = [c.get("name") for c in gcore_root.get("children", [])]
servers = []
extras = []
for child in gcore_root.get("children", []):
if child.get("type") == "folder":
servers.append(_parse_gcore_entry(child))
else:
extras.append(copy.deepcopy(child))
return {"servers": servers, "extras": extras, "_order": order}
def _parse_gcore_entry(folder):
children = folder.get("children", [])
order = [c.get("name") for c in children]
entry = {
"id": folder.get("name"),
"alias": (_child_by_name(folder, "Alias") or {}).get("value", ""),
"host": (_child_by_name(folder, "Host") or {}).get("value", ""),
"user": (_child_by_name(folder, "User") or {}).get("value", ""),
"password": (_child_by_name(folder, "Password") or {}).get("value", ""),
"enabled": bool((_child_by_name(folder, "Enabled") or {}).get("value", False)),
"deactivateEcho": bool((_child_by_name(folder, "DeactivateEcho") or {}).get("value", False)),
"deactivateLiveCheck": bool((_child_by_name(folder, "DeactivateLiveCheck") or {}).get("value", False)),
"_order": order,
}
used = {"Alias", "Host", "User", "Password", "Enabled", "DeactivateEcho", "DeactivateLiveCheck"}
extras = [copy.deepcopy(c) for c in children if c.get("name") not in used]
if extras:
entry["extras"] = extras
return entry
def _build_gcore_entry(entry):
children = [
_make_string("Alias", entry.get("alias", "")),
_make_bool("DeactivateEcho", entry.get("deactivateEcho", False)),
_make_bool("DeactivateLiveCheck", entry.get("deactivateLiveCheck", False)),
_make_bool("Enabled", entry.get("enabled", False)),
_make_string("Host", entry.get("host", "")),
_make_string("Password", entry.get("password", "")),
_make_string("User", entry.get("user", "")),
]
for extra in entry.get("extras", []):
children.append(copy.deepcopy(extra))
order = entry.get("_order")
if order:
children = _apply_order(children, order)
return {"name": entry.get("id", ""), "type": "folder", "children": children}
def apply_gcore(tree, gcore):
gcore_root = _find_folder(tree, ["GeViGCoreServer"])
if not gcore_root:
raise ValueError("GeViGCoreServer folder not found in the .set file")
children = []
for entry in gcore.get("servers", []):
children.append(_build_gcore_entry(entry))
for extra in gcore.get("extras", []):
children.append(copy.deepcopy(extra))
order = gcore.get("_order")
if order:
children = _apply_order(children, order)
gcore_root["children"] = children
# ---------- GeViScope server helpers ----------
def extract_gsc(tree):
gsc_root = _find_folder(tree, ["GeViGscServer"])
if not gsc_root:
raise ValueError("GeViGscServer folder not found in the .set file")
order = [c.get("name") for c in gsc_root.get("children", [])]
servers = []
extras = []
global_settings = {}
for child in gsc_root.get("children", []):
if child.get("type") == "folder":
servers.append(_parse_gsc_entry(child))
else:
if child.get("name") in GSC_GLOBAL_KEYS:
global_settings[child.get("name")] = child.get("value")
else:
extras.append(copy.deepcopy(child))
return {
"servers": servers,
"globalSettings": global_settings,
"extras": extras,
"_order": order,
}
def _parse_gsc_entry(folder):
children = folder.get("children", [])
order = [c.get("name") for c in children]
entry = {
"id": folder.get("name"),
"alias": (_child_by_name(folder, "Alias") or {}).get("value", ""),
"host": (_child_by_name(folder, "Host") or {}).get("value", ""),
"user": (_child_by_name(folder, "User") or {}).get("value", ""),
"password": (_child_by_name(folder, "Password") or {}).get("value", ""),
"enabled": bool((_child_by_name(folder, "Enabled") or {}).get("value", False)),
"deactivateEcho": bool((_child_by_name(folder, "DeactivateEcho") or {}).get("value", False)),
"deactivateLiveCheck": bool((_child_by_name(folder, "DeactivateLiveCheck") or {}).get("value", False)),
"dialUpBroadcastAware": bool((_child_by_name(folder, "DialUpBroadcastAware") or {}).get("value", False)),
"dialUpConnection": bool((_child_by_name(folder, "DialUpConnection") or {}).get("value", False)),
"dialUpCPAConnection": bool((_child_by_name(folder, "DialUpCPAConnection") or {}).get("value", False)),
"dialUpCPAConnectionInterval": (_child_by_name(folder, "DialUpCPAConnectionInterval") or {}).get("value", 0),
"dialUpCPATimeSettings": (_child_by_name(folder, "DialUpCPATimeSettings") or {}).get("value", 0),
"dialUpKeepAlive": bool((_child_by_name(folder, "DialUpKeepAlive") or {}).get("value", False)),
"dialUpKeepAliveRetrigger": bool((_child_by_name(folder, "DialUpKeepAliveRetrigger") or {}).get("value", False)),
"dialUpKeepAliveTime": (_child_by_name(folder, "DialUpKeepAliveTime") or {}).get("value", 0),
"_order": order,
}
extras = [copy.deepcopy(c) for c in children if c.get("name") not in GSC_ENTRY_FIELDS]
if extras:
entry["extras"] = extras
return entry
def _build_gsc_entry(entry):
children = [
_make_string("Alias", entry.get("alias", "")),
_make_bool("DeactivateEcho", entry.get("deactivateEcho", False)),
_make_bool("DeactivateLiveCheck", entry.get("deactivateLiveCheck", False)),
_make_bool("Enabled", entry.get("enabled", False)),
_make_string("Host", entry.get("host", "")),
_make_string("Password", entry.get("password", "")),
_make_string("User", entry.get("user", "")),
_make_bool("DialUpBroadcastAware", entry.get("dialUpBroadcastAware", False)),
_make_bool("DialUpConnection", entry.get("dialUpConnection", False)),
_make_bool("DialUpCPAConnection", entry.get("dialUpCPAConnection", False)),
_make_int32("DialUpCPAConnectionInterval", entry.get("dialUpCPAConnectionInterval", 0)),
_make_int32("DialUpCPATimeSettings", entry.get("dialUpCPATimeSettings", 0)),
_make_bool("DialUpKeepAlive", entry.get("dialUpKeepAlive", False)),
_make_bool("DialUpKeepAliveRetrigger", entry.get("dialUpKeepAliveRetrigger", False)),
_make_int32("DialUpKeepAliveTime", entry.get("dialUpKeepAliveTime", 0)),
]
for extra in entry.get("extras", []):
children.append(copy.deepcopy(extra))
order = entry.get("_order")
if order:
children = _apply_order(children, order)
return {"name": entry.get("id", ""), "type": "folder", "children": children}
def apply_gsc(tree, gsc):
gsc_root = _find_folder(tree, ["GeViGscServer"])
if not gsc_root:
raise ValueError("GeViGscServer folder not found in the .set file")
children = []
for entry in gsc.get("servers", []):
children.append(_build_gsc_entry(entry))
for key, value in (gsc.get("globalSettings") or {}).items():
if isinstance(value, bool):
children.append(_make_bool(key, value))
else:
children.append(_make_int32(key, value))
for extra in gsc.get("extras", []):
children.append(copy.deepcopy(extra))
order = gsc.get("_order")
if order:
children = _apply_order(children, order)
gsc_root["children"] = children
def extract_video_outputs(tree):
paths = [
["Clients", "GeViIO", "GeViIO_Virtual", "VirtualVX3", "VideoOutputs"],
["Clients", "GeViIO", "GeViIO_Virtual", "VirtualCX3", "VideoOutputs"],
["Clients", "GeViIO", "GeViIO_01", "MBeg", "VideoOutputs"],
]
folder = None
for path in paths:
folder = _find_folder(tree, path)
if folder:
break
if not folder:
return []
outputs = []
for child in folder.get("children", []):
if child.get("type") != "folder":
continue
gid = _child_by_name(child, "GlobalID")
lid = _child_by_name(child, "LocalID")
name = _child_by_name(child, "Name")
desc = _child_by_name(child, "Description")
outputs.append({
"globalId": gid.get("value") if gid else None,
"localId": lid.get("value") if lid else None,
"name": name.get("value") if name else "",
"description": desc.get("value") if desc else "",
})
outputs.sort(key=lambda x: (x["localId"] or 0, x["globalId"] or 0))
return outputs
def _build_video_output_children(template_children, output):
children = []
has_global = False
has_local = False
has_name = False
has_desc = False
if template_children:
for node in template_children:
node_copy = copy.deepcopy(node)
if node_copy.get("name") == "GlobalID":
node_copy["value"] = output.get("globalId", node_copy.get("value"))
has_global = True
elif node_copy.get("name") == "LocalID":
node_copy["value"] = output.get("localId", node_copy.get("value"))
has_local = True
elif node_copy.get("name") == "Name":
node_copy["value"] = output.get("name", node_copy.get("value", ""))
has_name = True
elif node_copy.get("name") == "Description":
node_copy["value"] = output.get("description", node_copy.get("value", ""))
has_desc = True
elif node_copy.get("name") == "Enabled":
node_copy["value"] = True
children.append(node_copy)
else:
children = [
_make_bool("Enabled", True),
_make_int32("GlobalID", output.get("globalId", 0)),
_make_int32("LocalID", output.get("localId", 0)),
_make_string("Name", output.get("name", "")),
_make_string("Description", output.get("description", "")),
]
has_global = has_local = has_name = has_desc = True
if not has_global:
children.append(_make_int32("GlobalID", output.get("globalId", 0)))
if not has_local:
children.append(_make_int32("LocalID", output.get("localId", 0)))
if not has_name and output.get("name"):
children.append(_make_string("Name", output.get("name", "")))
if not has_desc and output.get("description"):
children.append(_make_string("Description", output.get("description", "")))
return children
def _apply_video_outputs_path(tree, outputs, path):
folder = _find_folder(tree, path)
if not folder:
return
template_children = None
for child in folder.get("children", []):
if child.get("type") == "folder":
template_children = child.get("children", [])
break
new_children = []
for idx, output in enumerate(outputs, start=1):
name = str(idx)
new_children.append({
"name": name,
"type": "folder",
"children": _build_video_output_children(template_children, output),
})
folder["children"] = new_children
def apply_video_outputs(tree, outputs):
if not isinstance(outputs, list) or not outputs:
return
cleaned = []
for row in outputs:
if not isinstance(row, dict):
continue
local_id = row.get("localId")
global_id = row.get("globalId")
if local_id is None and global_id is None:
continue
cleaned.append({
"localId": local_id,
"globalId": global_id,
"name": row.get("name", ""),
"description": row.get("description", ""),
})
if not cleaned:
return
cleaned.sort(key=lambda x: (x.get("localId") or 0, x.get("globalId") or 0))
_apply_video_outputs_path(
tree,
cleaned,
["Clients", "GeViIO", "GeViIO_Virtual", "VirtualVX3", "VideoOutputs"],
)
_apply_video_outputs_path(
tree,
cleaned,
["Clients", "GeViIO", "GeViIO_Virtual", "VirtualCX3", "VideoOutputs"],
)
geviio = _find_folder(tree, ["Clients", "GeViIO", "GeViIO_01"])
if geviio:
for child in geviio.get("children", []):
if _is_keyboard_interface(child):
_apply_video_outputs_path(
tree,
cleaned,
["Clients", "GeViIO", "GeViIO_01", child.get("name"), "VideoOutputs"],
)
def extract_logging(tree):
log_root = _find_folder(tree, ["Globals", "LogData"])
if not log_root:
return []
rows = []
for child in log_root.get("children", []):
if child.get("type") != "folder":
continue
action_name = str(child.get("name", ""))
db = _child_by_name(child, "DB")
user = _child_by_name(child, "User")
com = _child_by_name(child, "Com")
rows.append({
"actionName": action_name,
"db": bool(db.get("value")) if db else False,
"user": bool(user.get("value")) if user else False,
"com": bool(com.get("value")) if com else False,
})
return rows
def apply_logging(tree, rows):
if not isinstance(rows, list) or not rows:
return
globals_folder = _find_folder(tree, ["Globals"])
if not globals_folder:
return
log_root = _child_by_name(globals_folder, "LogData")
if not log_root:
log_root = {"name": "LogData", "type": "folder", "children": []}
globals_folder.setdefault("children", []).append(log_root)
existing = {}
for child in log_root.get("children", []):
if child.get("type") == "folder":
existing[str(child.get("name", ""))] = child
for row in rows:
if not isinstance(row, dict):
continue
action_name = str(row.get("actionName", "")).strip()
if not action_name:
continue
target = existing.get(action_name)
if not target:
target = {"name": action_name, "type": "folder", "children": []}
log_root.setdefault("children", []).append(target)
existing[action_name] = target
children = target.setdefault("children", [])
_ensure_children_fields(
children,
[
_make_bool("Com", bool(row.get("com"))),
_make_bool("DB", bool(row.get("db"))),
_make_bool("User", bool(row.get("user"))),
],
)
for child in children:
if child.get("name") == "Com":
child["value"] = bool(row.get("com"))
elif child.get("name") == "DB":
child["value"] = bool(row.get("db"))
elif child.get("name") == "User":
child["value"] = bool(row.get("user"))
def _ensure_video_inputs_path(tree, global_ids, ptz_by_id, path):
folder = _find_folder(tree, path)
if not folder:
return
ptz_by_id = ptz_by_id or {}
target_ids = {int(x) for x in global_ids if isinstance(x, int) and x > 0}
existing = {}
max_name = 0
max_local_id = 0
template_children = None
for child in folder.get("children", []):
if child.get("type") != "folder":
continue
name = child.get("name")
try:
max_name = max(max_name, int(name))
except (TypeError, ValueError):
pass
gid = _child_by_name(child, "GlobalID")
lid = _child_by_name(child, "LocalID")
if gid is not None:
gid_val = int(gid.get("value", 0))
existing[gid_val] = child
_ensure_children_fields(
child.get("children", []),
[
_make_bool("Active", True),
_make_string("Name", f"Video input{gid_val}"),
_make_string("Description", f"Video input{gid_val}"),
],
)
if gid_val in ptz_by_id:
ptz_value = bool(ptz_by_id[gid_val])
ptz_node = _child_by_name(child, "PTZ")
if ptz_node is not None:
ptz_node["value"] = ptz_value
ptz_link = _child_by_name(child, "- PTZ Linked")
if ptz_link is not None:
ptz_link["value"] = True
enabled_node = _child_by_name(child, "Enabled")
if enabled_node is not None:
enabled_node["value"] = True
active_node = _child_by_name(child, "Active")
if active_node is not None:
active_node["value"] = True
if lid is not None:
max_local_id = max(max_local_id, int(lid.get("value", 0)))
if template_children is None:
template_children = child.get("children", [])
if target_ids:
kept = []
for child in folder.get("children", []):
if child.get("type") != "folder":
kept.append(child)
continue
gid_node = _child_by_name(child, "GlobalID")
if gid_node is None:
kept.append(child)
continue
try:
gid_val = int(gid_node.get("value", 0))
except (TypeError, ValueError):
kept.append(child)
continue
if gid_val in target_ids:
kept.append(child)
folder["children"] = kept
# Reassign LocalID sequentially for consistency.
ordered = []
for child in kept:
if child.get("type") != "folder":
continue
gid_node = _child_by_name(child, "GlobalID")
if gid_node is None:
continue
try:
gid_val = int(gid_node.get("value", 0))
except (TypeError, ValueError):
continue
ordered.append((gid_val, child))
ordered.sort(key=lambda item: item[0])
local_id = 1
for _, child in ordered:
lid_node = _child_by_name(child, "LocalID")
if lid_node is not None:
lid_node["value"] = local_id
local_id += 1
for gid in sorted(target_ids):
if gid in existing:
continue
max_name += 1
max_local_id += 1
if template_children:
children = []
has_name = False
has_desc = False
for node in template_children:
node_copy = copy.deepcopy(node)
if node_copy.get("name") == "GlobalID":
node_copy["value"] = gid
elif node_copy.get("name") == "LocalID":
node_copy["value"] = max_local_id
elif node_copy.get("name") == "Name":
node_copy["value"] = f"Video input{gid}"
has_name = True
elif node_copy.get("name") == "Description":
node_copy["value"] = f"Video input{gid}"
has_desc = True
elif node_copy.get("name") in ["PTZ", "Enabled", "- Linked", "- PTZ Linked", "Active"]:
if node_copy.get("name") == "PTZ":
node_copy["value"] = bool(ptz_by_id.get(gid, True))
else:
node_copy["value"] = True
children.append(node_copy)
if not has_name:
children.append(_make_string("Name", f"Video input{gid}"))
if not has_desc:
children.append(_make_string("Description", f"Video input{gid}"))
else:
ptz_value = bool(ptz_by_id.get(gid, True))
children = [
_make_bool("- Linked", True),
_make_bool("- PTZ Linked", True),
_make_bool("Active", True),
_make_bool("Enabled", True),
_make_int32("GlobalID", gid),
_make_int32("LocalID", max_local_id),
_make_string("Name", f"Video input{gid}"),
_make_string("Description", f"Video input{gid}"),
_make_bool("PTZ", ptz_value),
]
folder.get("children", []).append(
{"name": str(max_name), "type": "folder", "children": children}
)
def ensure_video_inputs(tree, global_ids, ptz_by_id=None):
ptz_by_id = ptz_by_id or {}
_ensure_video_inputs_path(
tree,
global_ids,
ptz_by_id,
["Clients", "GeViIO", "GeViIO_Virtual", "VirtualKDec", "VideoInputs"],
)
def _ensure_children_fields(children, required):
existing = {c.get("name") for c in children}
for node in required:
if node.get("name") not in existing:
children.append(node)
def _vx3_required_fields(gid, local_id, name_prefix, has_ptz=True):
return [
_make_int32("- Count Of MBeg Links", 3),
_make_bool("- Has PTZ", bool(has_ptz)),
_make_bool("Active", True),
_make_int32("DefaultTour", 0),
_make_string("Description", f"{name_prefix}{gid}"),
_make_bool("Enabled", True),
_make_int32("GlobalID", gid),
_make_int32("LocalID", local_id),
_make_string("Name", f"{name_prefix}{gid}"),
_make_bool("PictureDetection", True),
_make_bool("ReactionRescann", True),
_make_bool("ReactionSetup", True),
_make_int32("SignalType", 1),
_make_int32("StartupPosition", 0),
_make_int32("StartupState", 1),
_make_int32("StartupTour", 0),
_make_bool("SyncDetection", True),
_make_int32("ThresholdValue", 0),
]
def _ensure_camera_inputs(tree, path, global_ids, name_prefix, ptz_by_id=None):
folder = _find_folder(tree, path)
if not folder:
return
ptz_by_id = ptz_by_id or {}
target_ids = {int(x) for x in global_ids if isinstance(x, int) and x > 0}
existing = {}
max_name = 0
max_local_id = 0
template_children = None
for child in folder.get("children", []):
if child.get("type") != "folder":
continue
name = child.get("name")
try:
max_name = max(max_name, int(name))
except (TypeError, ValueError):
pass
gid = _child_by_name(child, "GlobalID")
lid = _child_by_name(child, "LocalID")
if gid is not None:
existing[int(gid.get("value", 0))] = child
if lid is not None:
max_local_id = max(max_local_id, int(lid.get("value", 0)))
if template_children is None:
template_children = child.get("children", [])
if target_ids:
kept = []
for child in folder.get("children", []):
if child.get("type") != "folder":
kept.append(child)
continue
gid_node = _child_by_name(child, "GlobalID")
if gid_node is None:
kept.append(child)
continue
try:
gid_val = int(gid_node.get("value", 0))
except (TypeError, ValueError):
kept.append(child)
continue
if gid_val in target_ids:
kept.append(child)
folder["children"] = kept
for child in folder.get("children", []):
if child.get("type") != "folder":
continue
gid = _child_by_name(child, "GlobalID")
lid = _child_by_name(child, "LocalID")
if gid is None or lid is None:
continue
required = _vx3_required_fields(
int(gid.get("value", 0)),
int(lid.get("value", 0)),
name_prefix,
ptz_by_id.get(int(gid.get("value", 0)), True),
)
_ensure_children_fields(child.get("children", []), required)
ptz_node = _child_by_name(child, "- Has PTZ")
if ptz_node is not None:
ptz_node["value"] = bool(ptz_by_id.get(int(gid.get("value", 0)), True))
enabled_node = _child_by_name(child, "Enabled")
if enabled_node is not None:
enabled_node["value"] = True
active_node = _child_by_name(child, "Active")
if active_node is not None:
active_node["value"] = True
for gid in sorted(target_ids):
if gid in existing:
continue
max_name += 1
max_local_id += 1
if template_children:
children = []
for node in template_children:
node_copy = copy.deepcopy(node)
if node_copy.get("name") == "GlobalID":
node_copy["value"] = gid
elif node_copy.get("name") == "LocalID":
node_copy["value"] = max_local_id
elif node_copy.get("name") in {"Name", "Description"}:
node_copy["value"] = f"{name_prefix}{gid}"
elif node_copy.get("name") == "- Has PTZ":
node_copy["value"] = bool(ptz_by_id.get(gid, True))
elif node_copy.get("name") in {"Enabled", "Active"}:
node_copy["value"] = True
children.append(node_copy)
else:
has_ptz = bool(ptz_by_id.get(gid, True))
children = [
_make_bool("- Has PTZ", has_ptz),
_make_bool("Active", True),
_make_string("Description", f"{name_prefix}{gid}"),
_make_bool("Enabled", True),
_make_int32("GlobalID", gid),
_make_int32("LocalID", max_local_id),
_make_string("Name", f"{name_prefix}{gid}"),
]
required = _vx3_required_fields(gid, max_local_id, name_prefix, ptz_by_id.get(gid, True))
_ensure_children_fields(children, required)
folder.get("children", []).append(
{"name": str(max_name), "type": "folder", "children": children}
)
def ensure_vx3_video_inputs(tree, global_ids, ptz_by_id=None):
_ensure_camera_inputs(
tree,
["Clients", "GeViIO", "GeViIO_Virtual", "VirtualVX3", "VideoInputs"],
global_ids,
"Video input",
ptz_by_id,
)
_ensure_camera_inputs(
tree,
["Clients", "GeViIO", "GeViIO_Virtual", "VirtualCX3", "VideoInputs"],
global_ids,
"Video input",
ptz_by_id,
)
def ensure_global_video_inputs(tree, global_ids, ptz_by_id=None):
ptz_by_id = ptz_by_id or {}
geviio = _find_folder(tree, ["Clients", "GeViIO", "GeViIO_01"])
if geviio:
for child in geviio.get("children", []):
if child.get("type") != "folder":
continue
name = str(child.get("name", ""))
if not name.startswith("MBeg"):
continue
_ensure_video_inputs_path(
tree,
global_ids,
ptz_by_id,
["Clients", "GeViIO", "GeViIO_01", name, "VideoInputs"],
)
return
_ensure_video_inputs_path(
tree,
global_ids,
ptz_by_id,
["Clients", "GeViIO", "GeViIO_01", "MBeg", "VideoInputs"],
)
_ensure_video_inputs_path(
tree,
global_ids,
ptz_by_id,
["Clients", "GeViIO", "GeViIO_01", "MBeg2", "VideoInputs"],
)
def prune_video_inputs(tree, global_ids):
target_ids = {int(x) for x in global_ids if isinstance(x, int) and x > 0}
if not target_ids:
return
def walk(node):
if not isinstance(node, dict) or node.get("type") != "folder":
return
if node.get("name") == "VideoInputs":
kept = []
for child in node.get("children", []):
if child.get("type") != "folder":
kept.append(child)
continue
gid_node = _child_by_name(child, "GlobalID")
if gid_node is None:
kept.append(child)
continue
try:
gid_val = int(gid_node.get("value", 0))
except (TypeError, ValueError):
kept.append(child)
continue
if gid_val in target_ids:
kept.append(child)
node["children"] = kept
ordered = []
for child in kept:
if child.get("type") != "folder":
continue
gid_node = _child_by_name(child, "GlobalID")
if gid_node is None:
continue
try:
gid_val = int(gid_node.get("value", 0))
except (TypeError, ValueError):
continue
ordered.append((gid_val, child))
ordered.sort(key=lambda item: item[0])
local_id = 1
for _, child in ordered:
lid_node = _child_by_name(child, "LocalID")
if lid_node is not None:
lid_node["value"] = local_id
local_id += 1
for child in node.get("children", []):
walk(child)
walk(tree)
def _set_child_value(folder, name, value):
node = _child_by_name(folder, name)
if node is not None:
node["value"] = value
def _replace_named_children(folder, prefix, replacements):
children = folder.get("children", [])
first_idx = None
for idx, child in enumerate(children):
if str(child.get("name", "")).startswith(prefix):
first_idx = idx
break
if first_idx is None:
first_idx = len(children)
kept = []
for idx, child in enumerate(children):
if idx < first_idx:
kept.append(child)
continue
if str(child.get("name", "")).startswith(prefix):
continue
kept.append(child)
folder["children"] = kept[:first_idx] + replacements + kept[first_idx:]
def _set_mbeg_link_counts(tree, count):
def walk(node):
if not isinstance(node, dict):
return
if node.get("name") == "- Count Of MBeg Links":
node["value"] = int(count)
if node.get("type") == "folder":
for child in node.get("children", []):
walk(child)
walk(tree)
def _is_keyboard_interface(node):
if not isinstance(node, dict) or node.get("type") != "folder":
return False
if _child_by_name(node, "SerialParameters") is None:
return False
if _child_by_name(node, "VideoInputs") is None:
return False
type_node = _child_by_name(node, "Type")
try:
return int(type_node.get("value", 0)) == 5 if type_node else False
except (TypeError, ValueError):
return False
def _replace_keyboard_children(folder, new_children):
children = folder.get("children", [])
first_idx = None
kept = []
for idx, child in enumerate(children):
if _is_keyboard_interface(child):
if first_idx is None:
first_idx = idx
continue
kept.append(child)
if first_idx is None:
first_idx = len(kept)
folder["children"] = kept[:first_idx] + new_children + kept[first_idx:]
def _replace_keyboard_users(users, new_children):
children = users.get("children", [])
first_idx = None
kept = []
for idx, child in enumerate(children):
if child.get("type") == "folder" and child.get("name") not in {
"GeViIO_01", "GeViIO_Virtual", "VirtualKDec", "VirtualVX3"
}:
if first_idx is None:
first_idx = idx
continue
kept.append(child)
if first_idx is None:
first_idx = len(kept)
users["children"] = kept[:first_idx] + new_children + kept[first_idx:]
def apply_keyboards(tree, keyboards):
if not isinstance(keyboards, list):
return
entries = []
for item in keyboards:
if not isinstance(item, dict):
continue
description = str(item.get("description", "")).strip()
host = str(item.get("host", "")).strip()
port = item.get("port")
if not host or port in (None, ""):
continue
try:
port_int = int(port)
except (TypeError, ValueError):
continue
entries.append({
"name": str(item.get("name", "")).strip(),
"description": description,
"host": host,
"port": port_int,
})
if not entries:
return
client = _find_folder(tree, ["Clients", "GeViIO", "GeViIO_01"])
if not client:
raise ValueError("Clients/GeViIO/GeViIO_01 not found in the .set file")
users = _find_folder(tree, ["Users"])
if not users:
raise ValueError("Users folder not found in the .set file")
template = _child_by_name(client, "MBeg")
if not template:
raise ValueError("MBeg interface not found in the .set file")
template_user = _child_by_name(users, "MBeg")
if not template_user:
raise ValueError("Users/MBeg not found in the .set file")
base_interface_id = _child_by_name(template, "InterfaceID")
try:
base_interface_id = int(base_interface_id.get("value", 1)) if base_interface_id else 1
except (TypeError, ValueError):
base_interface_id = 1
base_user_id = _child_by_name(template_user, "UserID")
try:
base_user_id = int(base_user_id.get("value", 1)) if base_user_id else 1
except (TypeError, ValueError):
base_user_id = 1
existing_interfaces = [c for c in client.get("children", []) if _is_keyboard_interface(c)]
existing_users = [
c for c in users.get("children", [])
if c.get("type") == "folder" and c.get("name") not in {
"GeViIO_01", "GeViIO_Virtual", "VirtualKDec", "VirtualVX3"
}
]
user_by_name = {c.get("name"): c for c in existing_users}
host_to_iface = {}
for iface in existing_interfaces:
serial = _child_by_name(iface, "SerialParameters")
host_node = _child_by_name(serial, "IpHost") if serial else None
host = str(host_node.get("value", "")).strip() if host_node else ""
if host:
host_to_iface[host] = iface
template_iface = existing_interfaces[0] if existing_interfaces else template
template_user = user_by_name.get(template_iface.get("name")) or template_user
virtual_kdec = _find_folder(
tree, ["Clients", "GeViIO", "GeViIO_Virtual", "VirtualKDec"]
)
virtual_kdec_id = None
if virtual_kdec:
kid = _child_by_name(virtual_kdec, "InterfaceID")
try:
virtual_kdec_id = int(kid.get("value", 0)) if kid else None
except (TypeError, ValueError):
virtual_kdec_id = None
virtual_user = _child_by_name(users, "VirtualKDec")
virtual_user_id = None
if virtual_user:
uid = _child_by_name(virtual_user, "UserID")
try:
virtual_user_id = int(uid.get("value", 0)) if uid else None
except (TypeError, ValueError):
virtual_user_id = None
start_interface_id = (virtual_kdec_id or base_interface_id) + 1
start_user_id = (virtual_user_id or base_user_id) + 1
new_ifaces = []
new_users = []
for idx, entry in enumerate(entries, start=1):
iface = host_to_iface.get(entry["host"])
iface = copy.deepcopy(iface) if iface else copy.deepcopy(template_iface)
iface_name = entry.get("name") or f"Mbeg{idx}"
iface["name"] = iface_name
_set_child_value(iface, "Enabled", True)
_set_child_value(iface, "Protocol", 0)
_set_child_value(iface, "MBegMode", True)
_set_child_value(iface, "InterfaceID", start_interface_id + idx - 1)
serial = _child_by_name(iface, "SerialParameters")
if serial:
_set_child_value(serial, "IpHost", entry["host"])
_set_child_value(serial, "IpPort", entry["port"])
new_ifaces.append(iface)
user = user_by_name.get(iface_name)
user = copy.deepcopy(user) if user else copy.deepcopy(template_user)
user["name"] = iface_name
_set_child_value(user, "Username", iface_name)
_set_child_value(user, "Description", "")
_set_child_value(user, "UserID", start_user_id + idx - 1)
new_users.append(user)
_replace_keyboard_children(client, new_ifaces)
_replace_keyboard_users(users, new_users)
if virtual_kdec:
serial = _child_by_name(virtual_kdec, "SerialParameters")
if serial:
_set_child_value(serial, "IpHost", entries[0]["host"])
_set_child_value(serial, "IpPort", entries[0]["port"])
_set_child_value(serial, "ConnectionType", 1)
_set_child_value(serial, "PortNr", 0)
virtual_user = _child_by_name(users, "VirtualKDec")
if virtual_user:
_set_child_value(virtual_user, "Description", "")
mbeg_entries = [c for c in client.get("children", []) if _is_keyboard_interface(c)]
mbeg_entries.sort(key=lambda c: str(c.get("name", "")))
for iface in mbeg_entries:
_set_child_value(iface, "MBegMode", True)
mbeg_users = [
c for c in users.get("children", [])
if c.get("type") == "folder" and c.get("name") not in {
"GeViIO_01", "GeViIO_Virtual", "VirtualKDec", "VirtualVX3"
}
]
mbeg_users.sort(key=lambda c: str(c.get("name", "")))
total_mbeg = len(mbeg_entries)
_set_mbeg_link_counts(tree, total_mbeg)
def main(argv=None):
parser = argparse.ArgumentParser(description="Convert GeViSoft .set files to/from JSON.")
sub = parser.add_subparsers(dest="command", required=True)
dump = sub.add_parser("dump", help="Parse a .set file into JSON.")
dump.add_argument("input", help="Path to .set file")
dump.add_argument("output", nargs="?", help="Optional path for JSON output (stdout if omitted)")
dump.set_defaults(func=dump_command)
build = sub.add_parser("build", help="Create a .set file from JSON produced by dump.")
build.add_argument("input", help="Path to JSON file")
build.add_argument("output", help="Path to write .set file")
build.set_defaults(func=build_command)
args = parser.parse_args(argv)
args.func(args)
if __name__ == "__main__":
main()