Files
GeViSetWEB/backend/app/geviset.py
Docker Config Backup 18084ec9d7 fix: clear PTZ flag on all MBeg keyboard video inputs
Keyboard interfaces under Clients/GeViIO/GeViIO_01 should never have the
PTZ flag set on their video inputs. Previously PTZ cameras (e.g. 101027)
were exported with PTZ=True on every keyboard, unlike fixed cameras.

Adds geviset.disable_keyboard_ptz() and runs it at the end of the video
input pipeline in both /api/set/export and /api/batch/build. Only the
keyboard interfaces are touched; virtual decoders/servers under
GeViIO_Virtual keep their PTZ flags so PTZ control still works.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-05-28 23:33:14 +02:00

1427 lines
51 KiB
Python

import argparse
import copy
import json
import sys
from pathlib import Path
# Type byte found at the start of every serialized node -> symbolic type name.
TYPE_NAMES = {
    0: "folder",
    1: "bool",
    2: "byte",
    3: "int16",
    4: "int32",
    5: "int64",
    7: "string",
}

# Reverse lookup used when writing: symbolic type name -> type byte.
NAME_TO_TYPE = dict(zip(TYPE_NAMES.values(), TYPE_NAMES.keys()))

# Payload width in bytes for the fixed-size value types, keyed by type byte.
INT_SIZES = {
    1: 1,  # bool
    2: 1,  # byte
    3: 2,  # int16
    4: 4,  # int32
    5: 8,  # int64
}

# Non-folder children of GeViGscServer that extract_gsc() reports as
# "globalSettings" instead of generic extras.
GSC_GLOBAL_KEYS = {f"DialUpCPADay{day}" for day in range(7)} | {
    "DialUpCPAResponseWindow",
    "DialUpCPAThreshold",
    "DialUpLimitActionCount",
    "DialUpLimitTimeDepth",
    "DialUpWorkerThreads",
    "FailoverActive",
}

# Child names of a GeViGscServer entry that _parse_gsc_entry() maps onto
# dedicated keys; everything else is carried along as "extras".
GSC_ENTRY_FIELDS = {
    # Identity / credentials
    "Alias",
    "Host",
    "User",
    "Password",
    # Behaviour switches
    "Enabled",
    "DeactivateEcho",
    "DeactivateLiveCheck",
    # Dial-up settings
    "DialUpBroadcastAware",
    "DialUpConnection",
    "DialUpCPAConnection",
    "DialUpCPAConnectionInterval",
    "DialUpCPATimeSettings",
    "DialUpKeepAlive",
    "DialUpKeepAliveRetrigger",
    "DialUpKeepAliveTime",
}
# ---------- Binary read/write ----------
def _read_exact(blob, start, count, what):
    """Return ``(blob[start:start + count], end)``; raise if the data is too short.

    A plain slice silently returns fewer bytes when it runs past the end,
    which would decode truncated input into wrong values instead of failing.
    """
    end = start + count
    if end > len(blob):
        raise ValueError(
            f"Truncated data: need {count} byte(s) for {what} at offset {start}, "
            f"but only {max(len(blob) - start, 0)} available"
        )
    return blob[start:end], end


def _read_node(blob: bytes, offset: int):
    """Decode the node starting at *offset* and return ``(node, next_offset)``.

    Layout as read here: one type byte, one name-length byte, the UTF-8 name,
    then a payload that depends on the type:

    * folder (0): 4-byte little-endian child count followed by the children;
    * string (7): 2-byte little-endian length followed by UTF-8 text;
    * any other known type: unsigned little-endian integer whose width comes
      from ``INT_SIZES`` (bool values are converted to ``bool``).

    Raises:
        ValueError: on an unknown type byte, on data that ends before the
            node is complete, or (as ``UnicodeDecodeError``) on invalid UTF-8.
    """
    raw_type, cursor = _read_exact(blob, offset, 1, "type byte")
    node_type = raw_type[0]
    try:
        type_name = TYPE_NAMES[node_type]
    except KeyError:
        raise ValueError(f"Unsupported type byte {node_type} at offset {offset}") from None

    raw_len, cursor = _read_exact(blob, cursor, 1, "name length")
    raw_name, cursor = _read_exact(blob, cursor, raw_len[0], "node name")
    name = raw_name.decode("utf-8")

    if node_type == 0:
        raw_count, cursor = _read_exact(blob, cursor, 4, f"child count of folder {name!r}")
        children = []
        for _ in range(int.from_bytes(raw_count, "little")):
            child, cursor = _read_node(blob, cursor)
            children.append(child)
        return {"name": name, "type": type_name, "children": children}, cursor

    if node_type == 7:
        raw_len, cursor = _read_exact(blob, cursor, 2, f"string length of {name!r}")
        raw_text, cursor = _read_exact(
            blob, cursor, int.from_bytes(raw_len, "little"), f"string value of {name!r}"
        )
        return {"name": name, "type": type_name, "value": raw_text.decode("utf-8")}, cursor

    raw_value, cursor = _read_exact(blob, cursor, INT_SIZES[node_type], f"value of {name!r}")
    value = int.from_bytes(raw_value, "little", signed=False)
    if node_type == 1:
        value = bool(value)
    return {"name": name, "type": type_name, "value": value}, cursor
def load_set(path: Path):
    """Read the file at *path* and return its decoded node tree."""
    return load_set_bytes(path.read_bytes())
def load_set_bytes(data: bytes):
    """Decode *data* into a node tree.

    Raises:
        ValueError: if decoding the root node does not end exactly at the
            end of *data*.
    """
    root, consumed = _read_node(data, 0)
    total = len(data)
    if consumed == total:
        return root
    raise ValueError(f"Did not consume complete file, stopped at {consumed} of {total} bytes")
def _write_node(node, out: bytearray):
    """Append the binary encoding of *node* (and, for folders, its children) to *out*.

    Raises:
        ValueError: for an unknown type name, a name longer than 255 bytes,
            a string longer than 65535 bytes, a missing numeric value, or a
            numeric value that does not fit the type's width.
    """
    try:
        type_code = NAME_TO_TYPE[node["type"]]
    except KeyError:
        raise ValueError(f"Unknown node type {node.get('type')}")

    encoded_name = node["name"].encode("utf-8")
    if len(encoded_name) > 0xFF:
        raise ValueError(f"Name too long: {node['name']}")

    # Header: type byte, name length byte, name.
    out += bytes((type_code, len(encoded_name)))
    out += encoded_name

    if type_code == 0:
        kids = node.get("children", [])
        out += len(kids).to_bytes(4, "little")
        for kid in kids:
            _write_node(kid, out)
        return

    if type_code == 7:
        payload = str(node.get("value", "")).encode("utf-8")
        if len(payload) > 0xFFFF:
            raise ValueError(f"String too long on node {node['name']}")
        out += len(payload).to_bytes(2, "little")
        out += payload
        return

    # Fixed-width unsigned integer payload.
    width = INT_SIZES[type_code]
    value = node.get("value")
    if value is None:
        raise ValueError(f"Missing value for node {node['name']}")
    if type_code == 1:
        value = 1 if value else 0
    number = int(value)
    upper_bound = (1 << (width * 8)) - 1
    if number < 0 or number > upper_bound:
        raise ValueError(f"Value out of range for node {node['name']}: {value}")
    out += number.to_bytes(width, "little", signed=False)
def save_set(tree, path: Path):
    """Serialize *tree* and write the resulting bytes to *path*."""
    encoded = save_set_bytes(tree)
    path.write_bytes(encoded)
def save_set_bytes(tree) -> bytes:
    """Return the binary encoding of *tree* as an immutable ``bytes`` object."""
    buffer = bytearray()
    _write_node(tree, buffer)
    return bytes(buffer)
# ---------- CLI helpers ----------
def dump_command(args):
    """CLI handler: decode ``args.input`` and emit the tree as indented JSON.

    The JSON goes to ``args.output`` when that is set, otherwise to stdout
    followed by a newline.
    """
    tree = load_set(Path(args.input))
    if not args.output:
        json.dump(tree, sys.stdout, indent=2)
        sys.stdout.write("\n")
        return
    Path(args.output).write_text(json.dumps(tree, indent=2))
def build_command(args):
    """CLI handler: read the JSON tree in ``args.input`` and write it as a binary file to ``args.output``.

    The input is handed to ``json.loads`` as raw bytes so the JSON encoding
    (UTF-8, with or without BOM, or UTF-16/32) is detected by the json module
    instead of depending on the platform's default text encoding.
    """
    payload = json.loads(Path(args.input).read_bytes())
    save_set(payload, Path(args.output))
# ---------- Mapping rules helpers ----------
def _find_folder(root, path):
node = root
for name in path:
if not isinstance(node, dict) or node.get("type") != "folder":
return None
node = next((c for c in node.get("children", []) if c.get("name") == name), None)
if node is None:
return None
return node
def _child_by_name(folder, name):
if folder is None or folder.get("type") != "folder":
return None
return next((c for c in folder.get("children", []) if c.get("name") == name), None)
def _parse_output(folder):
    """Convert one output folder of a mapping rule into a plain dict.

    The result carries the folder name as "id", the "@" value as "name", the
    "@!"/"@@" values as flags, the names of the action/server fields that are
    present (Gsc* is preferred over GCore*), their values, the original child
    order and deep copies of all remaining children under "extras".
    """
    children = folder.get("children", [])

    def first_present(*candidates):
        # Name of the first candidate that exists as a child, else None.
        for key in candidates:
            if _child_by_name(folder, key):
                return key
        return None

    def value_of(key, default):
        return (_child_by_name(folder, key) or {}).get("value", default)

    label_node = _child_by_name(folder, "@")
    action_key = first_present("GscAction", "GCoreAction")
    server_key = first_present("GscServer", "GCoreServer")

    parsed = {
        "id": folder.get("name"),
        "name": label_node.get("value") if label_node else "",
        "flags": {
            "primary": value_of("@!", 0),
            "secondary": value_of("@@", 0),
        },
        "actionField": action_key,
        "serverField": server_key,
        "_order": [c.get("name") for c in children],
    }
    if action_key:
        parsed["action"] = value_of(action_key, "")
    if server_key:
        parsed["server"] = value_of(server_key, "")

    consumed = {"@", "@!", "@@", action_key, server_key}
    leftovers = [copy.deepcopy(c) for c in children if c.get("name") not in consumed]
    if leftovers:
        parsed["extras"] = leftovers
    return parsed
def _parse_rule(folder):
    """Convert one mapping-rule folder into a plain dict.

    The result has "id", "input" (name, flags and, when present, the
    VideoInput/Temp values), "outputs" (parsed sub-folders of "Rules") and
    "_order". Children whose name starts with "." are kept as "filters";
    the remaining non-reserved children are kept as "fields".
    """
    children = folder.get("children", [])

    def value_of(key, default):
        return (_child_by_name(folder, key) or {}).get("value", default)

    rule_input = {
        "name": value_of("@", ""),
        "flags": {
            "primary": value_of("@!", 0),
            "secondary": value_of("@@", 0),
        },
    }
    for node_name, key in (("VideoInput", "videoInputId"), ("Temp", "temp")):
        node = _child_by_name(folder, node_name)
        if node is not None:
            rule_input[key] = node.get("value", 0)

    filters = [
        copy.deepcopy(c)
        for c in children
        if isinstance(c.get("name"), str) and c["name"].startswith(".")
    ]

    reserved = {".Temp", ".VideoInput", "@", "@!", "@@", "Rules"}
    fields = []
    for c in children:
        if c.get("name") not in reserved and not c.get("name", "").startswith("."):
            fields.append(copy.deepcopy(c))

    rules_folder = _child_by_name(folder, "Rules")
    outputs = []
    if rules_folder and rules_folder.get("type") == "folder":
        outputs = [
            _parse_output(entry)
            for entry in rules_folder.get("children", [])
            if entry.get("type") == "folder"
        ]

    rule = {
        "id": folder.get("name"),
        "input": rule_input,
        "outputs": outputs,
        "_order": [c.get("name") for c in children],
    }
    if filters:
        rule["filters"] = filters
    if fields:
        rule["fields"] = fields
    return rule
def _make_bool(name, value):
return {"name": name, "type": "bool", "value": bool(value)}
def _make_int32(name, value):
return {"name": name, "type": "int32", "value": int(value)}
def _make_string(name, value):
return {"name": name, "type": "string", "value": str(value)}
def _build_output(output):
    """Rebuild an output folder node from a dict as produced by ``_parse_output``."""
    flags = output.get("flags", {})
    nodes = [
        _make_string("@", output.get("name", "")),
        _make_int32("@!", flags.get("primary", 0)),
        _make_int32("@@", flags.get("secondary", 0)),
    ]
    # The action/server nodes are only emitted when their field name is known.
    for field_key, value_key in (("actionField", "action"), ("serverField", "server")):
        field_name = output.get(field_key)
        if field_name:
            nodes.append(_make_string(field_name, output.get(value_key, "")))
    nodes.extend(copy.deepcopy(extra) for extra in output.get("extras", []))

    order = output.get("_order")
    if order:
        nodes = _apply_order(nodes, order)
    return {"name": output.get("id", ""), "type": "folder", "children": nodes}
def _build_rule(rule):
    """Rebuild a mapping-rule folder node from a dict as produced by ``_parse_rule``."""
    inp = rule.get("input", {})
    flags = inp.get("flags", {})
    nodes = [
        _make_string("@", inp.get("name", "")),
        _make_int32("@!", flags.get("primary", 0)),
        _make_int32("@@", flags.get("secondary", 0)),
    ]
    nodes.extend(copy.deepcopy(filt) for filt in rule.get("filters", []))

    # Outputs live in a dedicated "Rules" sub-folder.
    nodes.append({
        "name": "Rules",
        "type": "folder",
        "children": [_build_output(out) for out in rule.get("outputs", [])],
    })

    # Tail fields; VideoInput/Temp take their value from the input section
    # when the corresponding key is present there.
    overrides = (("VideoInput", "videoInputId"), ("Temp", "temp"))
    for field in rule.get("fields", []):
        node = copy.deepcopy(field)
        for node_name, key in overrides:
            if node.get("name") == node_name and key in inp:
                node["value"] = inp[key]
        nodes.append(node)

    order = rule.get("_order")
    if order:
        nodes = _apply_order(nodes, order)
    return {"name": rule.get("id", ""), "type": "folder", "children": nodes}
def _apply_order(children, order):
if not order:
return children
name_to_child = {c.get("name"): c for c in children}
ordered = []
used = set()
for name in order:
child = name_to_child.get(name)
if child is not None:
ordered.append(child)
used.add(name)
for c in children:
if c.get("name") not in used:
ordered.append(c)
return ordered
def extract_mapping(tree):
    """Return the MappingRules section of *tree* as ``{"rules", "extras", "_order"}``.

    Folder children are parsed as rules; non-folder children are deep-copied
    into "extras".

    Raises:
        ValueError: if *tree* has no MappingRules folder.
    """
    mapping_root = _find_folder(tree, ["MappingRules"])
    if not mapping_root:
        raise ValueError("MappingRules folder not found in the .set file")

    entries = mapping_root.get("children", [])
    rules = [_parse_rule(entry) for entry in entries if entry.get("type") == "folder"]
    extras = [copy.deepcopy(entry) for entry in entries if entry.get("type") != "folder"]
    return {
        "rules": rules,
        "extras": extras,
        "_order": [entry.get("name") for entry in entries],
    }
def apply_mapping(tree, mapping):
    """Replace the children of the MappingRules folder in *tree* with *mapping*.

    Raises:
        ValueError: if *tree* has no MappingRules folder.
    """
    mapping_root = _find_folder(tree, ["MappingRules"])
    if not mapping_root:
        raise ValueError("MappingRules folder not found in the .set file")

    rebuilt = [_build_rule(rule) for rule in mapping.get("rules", [])]
    rebuilt.extend(copy.deepcopy(extra) for extra in mapping.get("extras", []))

    order = mapping.get("_order")
    if order:
        rebuilt = _apply_order(rebuilt, order)
    mapping_root["children"] = rebuilt
# ---------- GCore server helpers ----------
def extract_gcore(tree):
    """Return the GeViGCoreServer section of *tree* as ``{"servers", "extras", "_order"}``.

    Raises:
        ValueError: if *tree* has no GeViGCoreServer folder.
    """
    gcore_root = _find_folder(tree, ["GeViGCoreServer"])
    if not gcore_root:
        raise ValueError("GeViGCoreServer folder not found in the .set file")

    entries = gcore_root.get("children", [])
    servers = [_parse_gcore_entry(entry) for entry in entries if entry.get("type") == "folder"]
    extras = [copy.deepcopy(entry) for entry in entries if entry.get("type") != "folder"]
    return {
        "servers": servers,
        "extras": extras,
        "_order": [entry.get("name") for entry in entries],
    }
def _parse_gcore_entry(folder):
    """Convert one GeViGCoreServer entry folder into a plain dict.

    Known children become dedicated keys (missing ones default to "" or
    False); all other children are deep-copied into "extras".
    """
    children = folder.get("children", [])

    def value_of(node_name, default):
        return (_child_by_name(folder, node_name) or {}).get("value", default)

    text_fields = (("alias", "Alias"), ("host", "Host"), ("user", "User"), ("password", "Password"))
    flag_fields = (
        ("enabled", "Enabled"),
        ("deactivateEcho", "DeactivateEcho"),
        ("deactivateLiveCheck", "DeactivateLiveCheck"),
    )

    entry = {"id": folder.get("name")}
    for key, node_name in text_fields:
        entry[key] = value_of(node_name, "")
    for key, node_name in flag_fields:
        entry[key] = bool(value_of(node_name, False))
    entry["_order"] = [c.get("name") for c in children]

    known = {"Alias", "Host", "User", "Password", "Enabled", "DeactivateEcho", "DeactivateLiveCheck"}
    leftovers = [copy.deepcopy(c) for c in children if c.get("name") not in known]
    if leftovers:
        entry["extras"] = leftovers
    return entry
def _build_gcore_entry(entry):
    """Rebuild a GeViGCoreServer entry folder from a dict as produced by ``_parse_gcore_entry``."""
    # (node factory, node name, key in *entry*, default) in emission order.
    layout = (
        (_make_string, "Alias", "alias", ""),
        (_make_bool, "DeactivateEcho", "deactivateEcho", False),
        (_make_bool, "DeactivateLiveCheck", "deactivateLiveCheck", False),
        (_make_bool, "Enabled", "enabled", False),
        (_make_string, "Host", "host", ""),
        (_make_string, "Password", "password", ""),
        (_make_string, "User", "user", ""),
    )
    nodes = [make(node_name, entry.get(key, default)) for make, node_name, key, default in layout]
    nodes.extend(copy.deepcopy(extra) for extra in entry.get("extras", []))

    order = entry.get("_order")
    if order:
        nodes = _apply_order(nodes, order)
    return {"name": entry.get("id", ""), "type": "folder", "children": nodes}
def apply_gcore(tree, gcore):
    """Replace the children of the GeViGCoreServer folder in *tree* with *gcore*.

    Raises:
        ValueError: if *tree* has no GeViGCoreServer folder.
    """
    gcore_root = _find_folder(tree, ["GeViGCoreServer"])
    if not gcore_root:
        raise ValueError("GeViGCoreServer folder not found in the .set file")

    rebuilt = [_build_gcore_entry(entry) for entry in gcore.get("servers", [])]
    rebuilt.extend(copy.deepcopy(extra) for extra in gcore.get("extras", []))

    order = gcore.get("_order")
    if order:
        rebuilt = _apply_order(rebuilt, order)
    gcore_root["children"] = rebuilt
# ---------- GeViScope server helpers ----------
def extract_gsc(tree):
    """Return the GeViGscServer section of *tree*.

    Folder children are parsed as servers. Non-folder children listed in
    ``GSC_GLOBAL_KEYS`` are collected by name into "globalSettings"; the
    rest are deep-copied into "extras".

    Raises:
        ValueError: if *tree* has no GeViGscServer folder.
    """
    gsc_root = _find_folder(tree, ["GeViGscServer"])
    if not gsc_root:
        raise ValueError("GeViGscServer folder not found in the .set file")

    entries = gsc_root.get("children", [])
    servers, extras, global_settings = [], [], {}
    for entry in entries:
        if entry.get("type") == "folder":
            servers.append(_parse_gsc_entry(entry))
        elif entry.get("name") in GSC_GLOBAL_KEYS:
            global_settings[entry.get("name")] = entry.get("value")
        else:
            extras.append(copy.deepcopy(entry))

    return {
        "servers": servers,
        "globalSettings": global_settings,
        "extras": extras,
        "_order": [entry.get("name") for entry in entries],
    }
def _parse_gsc_entry(folder):
    """Convert one GeViGscServer entry folder into a plain dict.

    Known children become dedicated keys (with defaults when missing);
    children not listed in ``GSC_ENTRY_FIELDS`` are deep-copied into "extras".
    """
    children = folder.get("children", [])

    # (result key, node name, default, converter or None for "as stored").
    layout = (
        ("alias", "Alias", "", None),
        ("host", "Host", "", None),
        ("user", "User", "", None),
        ("password", "Password", "", None),
        ("enabled", "Enabled", False, bool),
        ("deactivateEcho", "DeactivateEcho", False, bool),
        ("deactivateLiveCheck", "DeactivateLiveCheck", False, bool),
        ("dialUpBroadcastAware", "DialUpBroadcastAware", False, bool),
        ("dialUpConnection", "DialUpConnection", False, bool),
        ("dialUpCPAConnection", "DialUpCPAConnection", False, bool),
        ("dialUpCPAConnectionInterval", "DialUpCPAConnectionInterval", 0, None),
        ("dialUpCPATimeSettings", "DialUpCPATimeSettings", 0, None),
        ("dialUpKeepAlive", "DialUpKeepAlive", False, bool),
        ("dialUpKeepAliveRetrigger", "DialUpKeepAliveRetrigger", False, bool),
        ("dialUpKeepAliveTime", "DialUpKeepAliveTime", 0, None),
    )

    entry = {"id": folder.get("name")}
    for key, node_name, default, convert in layout:
        raw = (_child_by_name(folder, node_name) or {}).get("value", default)
        entry[key] = raw if convert is None else convert(raw)
    entry["_order"] = [c.get("name") for c in children]

    leftovers = [copy.deepcopy(c) for c in children if c.get("name") not in GSC_ENTRY_FIELDS]
    if leftovers:
        entry["extras"] = leftovers
    return entry
def _build_gsc_entry(entry):
    """Rebuild a GeViGscServer entry folder from a dict as produced by ``_parse_gsc_entry``."""
    # (node factory, node name, key in *entry*, default) in emission order.
    layout = (
        (_make_string, "Alias", "alias", ""),
        (_make_bool, "DeactivateEcho", "deactivateEcho", False),
        (_make_bool, "DeactivateLiveCheck", "deactivateLiveCheck", False),
        (_make_bool, "Enabled", "enabled", False),
        (_make_string, "Host", "host", ""),
        (_make_string, "Password", "password", ""),
        (_make_string, "User", "user", ""),
        (_make_bool, "DialUpBroadcastAware", "dialUpBroadcastAware", False),
        (_make_bool, "DialUpConnection", "dialUpConnection", False),
        (_make_bool, "DialUpCPAConnection", "dialUpCPAConnection", False),
        (_make_int32, "DialUpCPAConnectionInterval", "dialUpCPAConnectionInterval", 0),
        (_make_int32, "DialUpCPATimeSettings", "dialUpCPATimeSettings", 0),
        (_make_bool, "DialUpKeepAlive", "dialUpKeepAlive", False),
        (_make_bool, "DialUpKeepAliveRetrigger", "dialUpKeepAliveRetrigger", False),
        (_make_int32, "DialUpKeepAliveTime", "dialUpKeepAliveTime", 0),
    )
    nodes = [make(node_name, entry.get(key, default)) for make, node_name, key, default in layout]
    nodes.extend(copy.deepcopy(extra) for extra in entry.get("extras", []))

    order = entry.get("_order")
    if order:
        nodes = _apply_order(nodes, order)
    return {"name": entry.get("id", ""), "type": "folder", "children": nodes}
def apply_gsc(tree, gsc):
    """Replace the children of the GeViGscServer folder in *tree* with *gsc*.

    Global settings are written as bool nodes when the value is a ``bool``
    and as int32 nodes otherwise.

    Raises:
        ValueError: if *tree* has no GeViGscServer folder.
    """
    gsc_root = _find_folder(tree, ["GeViGscServer"])
    if not gsc_root:
        raise ValueError("GeViGscServer folder not found in the .set file")

    rebuilt = [_build_gsc_entry(entry) for entry in gsc.get("servers", [])]
    for key, value in (gsc.get("globalSettings") or {}).items():
        make = _make_bool if isinstance(value, bool) else _make_int32
        rebuilt.append(make(key, value))
    rebuilt.extend(copy.deepcopy(extra) for extra in gsc.get("extras", []))

    order = gsc.get("_order")
    if order:
        rebuilt = _apply_order(rebuilt, order)
    gsc_root["children"] = rebuilt
def extract_video_outputs(tree):
    """List the video outputs of the first VideoOutputs folder found in *tree*.

    The folders are probed in the order VirtualVX3, VirtualCX3, then MBeg
    under GeViIO_01. Returns a list of dicts (globalId, localId, name,
    description) sorted by localId, then globalId; an empty list when none
    of the folders exists.
    """
    candidate_paths = (
        ["Clients", "GeViIO", "GeViIO_Virtual", "VirtualVX3", "VideoOutputs"],
        ["Clients", "GeViIO", "GeViIO_Virtual", "VirtualCX3", "VideoOutputs"],
        ["Clients", "GeViIO", "GeViIO_01", "MBeg", "VideoOutputs"],
    )
    lookups = (_find_folder(tree, path) for path in candidate_paths)
    folder = next((found for found in lookups if found), None)
    if not folder:
        return []

    def value_or(node, fallback):
        return node.get("value") if node else fallback

    rows = []
    for entry in folder.get("children", []):
        if entry.get("type") != "folder":
            continue
        rows.append({
            "globalId": value_or(_child_by_name(entry, "GlobalID"), None),
            "localId": value_or(_child_by_name(entry, "LocalID"), None),
            "name": value_or(_child_by_name(entry, "Name"), ""),
            "description": value_or(_child_by_name(entry, "Description"), ""),
        })
    # Missing ids sort as 0.
    return sorted(rows, key=lambda row: (row["localId"] or 0, row["globalId"] or 0))
def _build_video_output_children(template_children, output):
    """Build the child nodes of one video-output folder.

    With a non-empty *template_children* every template node is deep-copied;
    GlobalID/LocalID/Name/Description take the value from *output* when that
    key is present, and Enabled is forced to True. Id nodes missing from the
    template are appended; Name/Description are appended only when *output*
    has a truthy value for them. Without a template a fixed five-node set is
    returned.
    """
    if not template_children:
        return [
            _make_bool("Enabled", True),
            _make_int32("GlobalID", output.get("globalId", 0)),
            _make_int32("LocalID", output.get("localId", 0)),
            _make_string("Name", output.get("name", "")),
            _make_string("Description", output.get("description", "")),
        ]

    built = []
    seen = set()  # which of the four overridable nodes the template provided
    for template_node in template_children:
        node = copy.deepcopy(template_node)
        node_name = node.get("name")
        if node_name == "GlobalID":
            node["value"] = output.get("globalId", node.get("value"))
            seen.add("GlobalID")
        elif node_name == "LocalID":
            node["value"] = output.get("localId", node.get("value"))
            seen.add("LocalID")
        elif node_name == "Name":
            node["value"] = output.get("name", node.get("value", ""))
            seen.add("Name")
        elif node_name == "Description":
            node["value"] = output.get("description", node.get("value", ""))
            seen.add("Description")
        elif node_name == "Enabled":
            node["value"] = True
        built.append(node)

    if "GlobalID" not in seen:
        built.append(_make_int32("GlobalID", output.get("globalId", 0)))
    if "LocalID" not in seen:
        built.append(_make_int32("LocalID", output.get("localId", 0)))
    if "Name" not in seen and output.get("name"):
        built.append(_make_string("Name", output.get("name", "")))
    if "Description" not in seen and output.get("description"):
        built.append(_make_string("Description", output.get("description", "")))
    return built
def _apply_video_outputs_path(tree, outputs, path):
    """Replace the children of the folder at *path* with one folder per output.

    The children of the first existing sub-folder serve as the template for
    every new entry; entries are named "1", "2", ... in the order of
    *outputs*. Does nothing when *path* does not resolve.
    """
    folder = _find_folder(tree, path)
    if not folder:
        return

    template = next(
        (entry.get("children", []) for entry in folder.get("children", []) if entry.get("type") == "folder"),
        None,
    )
    folder["children"] = [
        {
            "name": str(position),
            "type": "folder",
            "children": _build_video_output_children(template, output),
        }
        for position, output in enumerate(outputs, start=1)
    ]
def apply_video_outputs(tree, outputs):
    """Write *outputs* into every VideoOutputs folder this module manages.

    Rows that are not dicts or that carry neither localId nor globalId are
    dropped; the rest is sorted by localId, then globalId, and applied to
    VirtualVX3, VirtualCX3 and to each child of GeViIO_01 accepted by
    ``_is_keyboard_interface``. Nothing happens when *outputs* is not a
    non-empty list or when no usable row remains.
    """
    if not isinstance(outputs, list) or not outputs:
        return

    cleaned = [
        {
            "localId": row.get("localId"),
            "globalId": row.get("globalId"),
            "name": row.get("name", ""),
            "description": row.get("description", ""),
        }
        for row in outputs
        if isinstance(row, dict)
        and not (row.get("localId") is None and row.get("globalId") is None)
    ]
    if not cleaned:
        return
    cleaned.sort(key=lambda row: (row.get("localId") or 0, row.get("globalId") or 0))

    for device in ("VirtualVX3", "VirtualCX3"):
        _apply_video_outputs_path(
            tree,
            cleaned,
            ["Clients", "GeViIO", "GeViIO_Virtual", device, "VideoOutputs"],
        )

    geviio = _find_folder(tree, ["Clients", "GeViIO", "GeViIO_01"])
    if not geviio:
        return
    for interface in geviio.get("children", []):
        if _is_keyboard_interface(interface):
            _apply_video_outputs_path(
                tree,
                cleaned,
                ["Clients", "GeViIO", "GeViIO_01", interface.get("name"), "VideoOutputs"],
            )
def extract_logging(tree):
    """Return one row per folder under Globals/LogData.

    Each row holds the folder name as "actionName" plus the boolean flags
    "db", "user" and "com" (False when the corresponding node is missing).
    Returns an empty list when Globals/LogData does not exist.
    """
    log_root = _find_folder(tree, ["Globals", "LogData"])
    if not log_root:
        return []

    def flag(entry, node_name):
        node = _child_by_name(entry, node_name)
        return bool(node.get("value")) if node else False

    return [
        {
            "actionName": str(entry.get("name", "")),
            "db": flag(entry, "DB"),
            "user": flag(entry, "User"),
            "com": flag(entry, "Com"),
        }
        for entry in log_root.get("children", [])
        if entry.get("type") == "folder"
    ]
def apply_logging(tree, rows):
    """Write the Com/DB/User flags from *rows* into Globals/LogData.

    LogData and per-action folders are created when missing; existing
    actions not mentioned in *rows* are left untouched. Rows that are not
    dicts or have a blank "actionName" are skipped. Nothing happens when
    *rows* is not a non-empty list or when *tree* has no Globals folder.
    """
    if not isinstance(rows, list) or not rows:
        return
    globals_folder = _find_folder(tree, ["Globals"])
    if not globals_folder:
        return

    log_root = _child_by_name(globals_folder, "LogData")
    if not log_root:
        log_root = {"name": "LogData", "type": "folder", "children": []}
        globals_folder.setdefault("children", []).append(log_root)

    by_action = {
        str(entry.get("name", "")): entry
        for entry in log_root.get("children", [])
        if entry.get("type") == "folder"
    }

    for row in rows:
        if not isinstance(row, dict):
            continue
        action_name = str(row.get("actionName", "")).strip()
        if not action_name:
            continue

        target = by_action.get(action_name)
        if not target:
            target = {"name": action_name, "type": "folder", "children": []}
            log_root.setdefault("children", []).append(target)
            by_action[action_name] = target

        wanted = (
            ("Com", bool(row.get("com"))),
            ("DB", bool(row.get("db"))),
            ("User", bool(row.get("user"))),
        )
        nodes = target.setdefault("children", [])
        # Create missing flag nodes first, then overwrite every flag value.
        _ensure_children_fields(nodes, [_make_bool(node_name, state) for node_name, state in wanted])
        for node in nodes:
            for node_name, state in wanted:
                if node.get("name") == node_name:
                    node["value"] = state
                    break
def _ensure_video_inputs_path(tree, global_ids, ptz_by_id, path):
    """Make the VideoInputs folder at *path* contain exactly the ids in *global_ids*.

    Steps, in order:

    1. Every existing entry with a GlobalID gets missing Active/Name/
       Description nodes, has PTZ set from *ptz_by_id* (when the id is listed
       there) and is forced to Enabled/Active.
    2. When at least one positive int id was requested, entries whose
       GlobalID is not requested are removed and the LocalIDs of the
       remaining ones are renumbered 1..n in GlobalID order.
    3. Requested ids that had no entry are appended, cloned from the first
       existing entry when there is one, otherwise built from defaults.

    Does nothing when *path* does not resolve. Only positive ``int`` values
    in *global_ids* are considered.

    NOTE(review): new entries continue from the highest LocalID seen *before*
    step 2, so LocalIDs can have gaps after pruning; confirm this is intended.
    """
    folder = _find_folder(tree, path)
    if not folder:
        return
    ptz_by_id = ptz_by_id or {}
    wanted_ids = {int(x) for x in global_ids if isinstance(x, int) and x > 0}

    known = {}  # GlobalID -> existing entry
    highest_name = 0
    highest_local_id = 0
    template = None

    # Step 1: inspect and normalise the existing entries.
    for entry in folder.get("children", []):
        if entry.get("type") != "folder":
            continue
        try:
            highest_name = max(highest_name, int(entry.get("name")))
        except (TypeError, ValueError):
            pass  # non-numeric folder names do not take part in numbering

        gid_node = _child_by_name(entry, "GlobalID")
        lid_node = _child_by_name(entry, "LocalID")
        if gid_node is not None:
            gid_val = int(gid_node.get("value", 0))
            known[gid_val] = entry
            _ensure_children_fields(
                entry.get("children", []),
                [
                    _make_bool("Active", True),
                    _make_string("Name", f"Video input{gid_val}"),
                    _make_string("Description", f"Video input{gid_val}"),
                ],
            )
            if gid_val in ptz_by_id:
                ptz_state = bool(ptz_by_id[gid_val])
                ptz_node = _child_by_name(entry, "PTZ")
                if ptz_node is not None:
                    ptz_node["value"] = ptz_state
                ptz_link = _child_by_name(entry, "- PTZ Linked")
                if ptz_link is not None:
                    ptz_link["value"] = True
            for flag_name in ("Enabled", "Active"):
                flag_node = _child_by_name(entry, flag_name)
                if flag_node is not None:
                    flag_node["value"] = True
        if lid_node is not None:
            highest_local_id = max(highest_local_id, int(lid_node.get("value", 0)))
        if template is None:
            template = entry.get("children", [])

    def parsed_gid(entry):
        # GlobalID of a folder entry as int, or None when it cannot be determined.
        if entry.get("type") != "folder":
            return None
        node = _child_by_name(entry, "GlobalID")
        if node is None:
            return None
        try:
            return int(node.get("value", 0))
        except (TypeError, ValueError):
            return None

    # Step 2: prune unrequested entries and renumber the survivors.
    if wanted_ids:
        kept = []
        numbered = []
        for entry in folder.get("children", []):
            gid_val = parsed_gid(entry)
            if gid_val is None:
                # Entries without a usable GlobalID are never pruned.
                kept.append(entry)
            elif gid_val in wanted_ids:
                kept.append(entry)
                numbered.append((gid_val, entry))
        folder["children"] = kept

        # Reassign LocalID sequentially for consistency.
        numbered.sort(key=lambda pair: pair[0])
        next_local_id = 1
        for _, entry in numbered:
            lid_node = _child_by_name(entry, "LocalID")
            if lid_node is not None:
                lid_node["value"] = next_local_id
                next_local_id += 1

    # Step 3: append entries for requested ids that did not exist.
    for gid in sorted(wanted_ids):
        if gid in known:
            continue
        highest_name += 1
        highest_local_id += 1
        if template:
            new_children = []
            has_name = False
            has_desc = False
            for template_node in template:
                node = copy.deepcopy(template_node)
                node_name = node.get("name")
                if node_name == "GlobalID":
                    node["value"] = gid
                elif node_name == "LocalID":
                    node["value"] = highest_local_id
                elif node_name == "Name":
                    node["value"] = f"Video input{gid}"
                    has_name = True
                elif node_name == "Description":
                    node["value"] = f"Video input{gid}"
                    has_desc = True
                elif node_name == "PTZ":
                    node["value"] = bool(ptz_by_id.get(gid, True))
                elif node_name in ["Enabled", "- Linked", "- PTZ Linked", "Active"]:
                    node["value"] = True
                new_children.append(node)
            if not has_name:
                new_children.append(_make_string("Name", f"Video input{gid}"))
            if not has_desc:
                new_children.append(_make_string("Description", f"Video input{gid}"))
        else:
            new_children = [
                _make_bool("- Linked", True),
                _make_bool("- PTZ Linked", True),
                _make_bool("Active", True),
                _make_bool("Enabled", True),
                _make_int32("GlobalID", gid),
                _make_int32("LocalID", highest_local_id),
                _make_string("Name", f"Video input{gid}"),
                _make_string("Description", f"Video input{gid}"),
                _make_bool("PTZ", bool(ptz_by_id.get(gid, True))),
            ]
        folder.get("children", []).append(
            {"name": str(highest_name), "type": "folder", "children": new_children}
        )
def ensure_video_inputs(tree, global_ids, ptz_by_id=None):
    """Synchronise the VirtualKDec VideoInputs folder with *global_ids*."""
    kdec_inputs = ["Clients", "GeViIO", "GeViIO_Virtual", "VirtualKDec", "VideoInputs"]
    _ensure_video_inputs_path(tree, global_ids, ptz_by_id or {}, kdec_inputs)
def _ensure_children_fields(children, required):
existing = {c.get("name") for c in children}
for node in required:
if node.get("name") not in existing:
children.append(node)
def _vx3_required_fields(gid, local_id, name_prefix, has_ptz=True):
    """Return the full list of default nodes for one camera input entry.

    Name and Description are ``name_prefix`` followed by *gid*.
    """
    label = f"{name_prefix}{gid}"
    # (node factory, node name, value) in emission order.
    layout = (
        (_make_int32, "- Count Of MBeg Links", 3),
        (_make_bool, "- Has PTZ", bool(has_ptz)),
        (_make_bool, "Active", True),
        (_make_int32, "DefaultTour", 0),
        (_make_string, "Description", label),
        (_make_bool, "Enabled", True),
        (_make_int32, "GlobalID", gid),
        (_make_int32, "LocalID", local_id),
        (_make_string, "Name", label),
        (_make_bool, "PictureDetection", True),
        (_make_bool, "ReactionRescann", True),
        (_make_bool, "ReactionSetup", True),
        (_make_int32, "SignalType", 1),
        (_make_int32, "StartupPosition", 0),
        (_make_int32, "StartupState", 1),
        (_make_int32, "StartupTour", 0),
        (_make_bool, "SyncDetection", True),
        (_make_int32, "ThresholdValue", 0),
    )
    return [make(node_name, value) for make, node_name, value in layout]
def _ensure_camera_inputs(tree, path, global_ids, name_prefix, ptz_by_id=None):
    """Make the camera VideoInputs folder at *path* match *global_ids*.

    Steps, in order:

    1. Record existing entries, the highest numeric folder name and the
       highest LocalID.
    2. When at least one positive int id was requested, remove entries whose
       GlobalID is not requested.
    3. Give every remaining entry that has both GlobalID and LocalID the
       full default field set, set "- Has PTZ" from *ptz_by_id* (default
       True) and force Enabled/Active.
    4. Append entries for requested ids that did not exist, cloned from the
       first existing entry when there is one, otherwise built from defaults.

    Does nothing when *path* does not resolve. Only positive ``int`` values
    in *global_ids* are considered.
    """
    folder = _find_folder(tree, path)
    if not folder:
        return
    ptz_by_id = ptz_by_id or {}
    wanted_ids = {int(x) for x in global_ids if isinstance(x, int) and x > 0}

    known = {}  # GlobalID -> existing entry
    highest_name = 0
    highest_local_id = 0
    template = None

    # Step 1: take stock of what is there.
    for entry in folder.get("children", []):
        if entry.get("type") != "folder":
            continue
        try:
            highest_name = max(highest_name, int(entry.get("name")))
        except (TypeError, ValueError):
            pass  # non-numeric folder names do not take part in numbering
        gid_node = _child_by_name(entry, "GlobalID")
        lid_node = _child_by_name(entry, "LocalID")
        if gid_node is not None:
            known[int(gid_node.get("value", 0))] = entry
        if lid_node is not None:
            highest_local_id = max(highest_local_id, int(lid_node.get("value", 0)))
        if template is None:
            template = entry.get("children", [])

    def is_unrequested(entry):
        # True only for folder entries with a parsable GlobalID outside wanted_ids.
        if entry.get("type") != "folder":
            return False
        node = _child_by_name(entry, "GlobalID")
        if node is None:
            return False
        try:
            return int(node.get("value", 0)) not in wanted_ids
        except (TypeError, ValueError):
            return False

    # Step 2: prune.
    if wanted_ids:
        folder["children"] = [
            entry for entry in folder.get("children", []) if not is_unrequested(entry)
        ]

    # Step 3: normalise the remaining entries.
    for entry in folder.get("children", []):
        if entry.get("type") != "folder":
            continue
        gid_node = _child_by_name(entry, "GlobalID")
        lid_node = _child_by_name(entry, "LocalID")
        if gid_node is None or lid_node is None:
            continue
        gid_val = int(gid_node.get("value", 0))
        has_ptz = ptz_by_id.get(gid_val, True)
        defaults = _vx3_required_fields(
            gid_val,
            int(lid_node.get("value", 0)),
            name_prefix,
            has_ptz,
        )
        _ensure_children_fields(entry.get("children", []), defaults)
        ptz_node = _child_by_name(entry, "- Has PTZ")
        if ptz_node is not None:
            ptz_node["value"] = bool(has_ptz)
        for flag_name in ("Enabled", "Active"):
            flag_node = _child_by_name(entry, flag_name)
            if flag_node is not None:
                flag_node["value"] = True

    # Step 4: append entries for requested ids that did not exist.
    for gid in sorted(wanted_ids):
        if gid in known:
            continue
        highest_name += 1
        highest_local_id += 1
        label = f"{name_prefix}{gid}"
        if template:
            new_children = []
            for template_node in template:
                node = copy.deepcopy(template_node)
                node_name = node.get("name")
                if node_name == "GlobalID":
                    node["value"] = gid
                elif node_name == "LocalID":
                    node["value"] = highest_local_id
                elif node_name in {"Name", "Description"}:
                    node["value"] = label
                elif node_name == "- Has PTZ":
                    node["value"] = bool(ptz_by_id.get(gid, True))
                elif node_name in {"Enabled", "Active"}:
                    node["value"] = True
                new_children.append(node)
        else:
            new_children = [
                _make_bool("- Has PTZ", bool(ptz_by_id.get(gid, True))),
                _make_bool("Active", True),
                _make_string("Description", label),
                _make_bool("Enabled", True),
                _make_int32("GlobalID", gid),
                _make_int32("LocalID", highest_local_id),
                _make_string("Name", label),
            ]
        # Top up with any default field the entry still lacks.
        defaults = _vx3_required_fields(gid, highest_local_id, name_prefix, ptz_by_id.get(gid, True))
        _ensure_children_fields(new_children, defaults)
        folder.get("children", []).append(
            {"name": str(highest_name), "type": "folder", "children": new_children}
        )
def ensure_vx3_video_inputs(tree, global_ids, ptz_by_id=None):
    """Synchronise the VirtualVX3 and VirtualCX3 VideoInputs folders with *global_ids*."""
    for device in ("VirtualVX3", "VirtualCX3"):
        _ensure_camera_inputs(
            tree,
            ["Clients", "GeViIO", "GeViIO_Virtual", device, "VideoInputs"],
            global_ids,
            "Video input",
            ptz_by_id,
        )
def ensure_global_video_inputs(tree, global_ids, ptz_by_id=None):
    """Ensure video inputs on the MBeg interfaces below GeViIO_01.

    When ``Clients/GeViIO/GeViIO_01`` is found, ``_ensure_video_inputs_path``
    is called for the ``VideoInputs`` path of every folder child whose name
    starts with ``"MBeg"``. When it is not found, the two fixed paths for
    ``MBeg`` and ``MBeg2`` are used instead. A missing ``ptz_by_id`` is
    replaced by an empty dict before being forwarded.
    """
    ptz_map = ptz_by_id or {}
    client_path = ("Clients", "GeViIO", "GeViIO_01")

    geviio = _find_folder(tree, list(client_path))
    if not geviio:
        # Client folder unavailable: fall back to the two default names.
        for default_name in ("MBeg", "MBeg2"):
            _ensure_video_inputs_path(
                tree, global_ids, ptz_map, [*client_path, default_name, "VideoInputs"]
            )
        return

    for child in geviio.get("children", []):
        if child.get("type") != "folder":
            continue
        name = str(child.get("name", ""))
        if name.startswith("MBeg"):
            _ensure_video_inputs_path(
                tree, global_ids, ptz_map, [*client_path, name, "VideoInputs"]
            )
def disable_keyboard_ptz(tree):
    """Clear the "PTZ" flag on the video inputs below Clients/GeViIO/GeViIO_01.

    Every folder directly under GeViIO_01 that has a "VideoInputs" child is
    visited. Inside each folder entry of that VideoInputs node, a child named
    "PTZ" whose value is not already ``False`` is set to ``False``. Nothing
    outside GeViIO_01 (for example GeViIO_Virtual) is looked at.

    Returns the number of PTZ nodes that were changed; 0 when GeViIO_01 is
    not found.
    """
    geviio = _find_folder(tree, ["Clients", "GeViIO", "GeViIO_01"])
    if not geviio:
        return 0

    changed = 0
    for interface in geviio.get("children", []):
        is_folder = isinstance(interface, dict) and interface.get("type") == "folder"
        video_inputs = _child_by_name(interface, "VideoInputs") if is_folder else None
        if video_inputs is None:
            continue
        for entry in video_inputs.get("children", []):
            if entry.get("type") != "folder":
                continue
            # NOTE(review): the virtual-interface inputs in this file use a
            # field called "- Has PTZ"; presumably keyboard inputs really use
            # plain "PTZ" - confirm against a real .set file.
            flag = _child_by_name(entry, "PTZ")
            if flag is None or flag.get("value") is False:
                continue
            flag["value"] = False
            changed += 1
    return changed
def prune_video_inputs(tree, global_ids):
    """Drop video inputs whose GlobalID is not listed and renumber the rest.

    Only positive ``int`` items of ``global_ids`` are considered; when none
    remain the tree is left untouched. Every folder named "VideoInputs"
    anywhere in the tree is processed:

    * non-folder children, and folder children without a readable integer
      "GlobalID", are always kept;
    * folder children with a GlobalID are kept only if that ID is wanted;
    * the kept entries that have a GlobalID get their "LocalID" rewritten to
      1, 2, 3, ... in ascending GlobalID order (the children list itself is
      not reordered).
    """
    wanted = {int(x) for x in global_ids if isinstance(x, int) and x > 0}
    if not wanted:
        return

    def global_id_of(entry):
        # None means "not a folder entry with a usable integer GlobalID".
        if entry.get("type") != "folder":
            return None
        gid_node = _child_by_name(entry, "GlobalID")
        if gid_node is None:
            return None
        try:
            return int(gid_node.get("value", 0))
        except (TypeError, ValueError):
            return None

    def visit(node):
        if not isinstance(node, dict) or node.get("type") != "folder":
            return
        if node.get("name") == "VideoInputs":
            survivors = []
            numbered = []
            for child in node.get("children", []):
                gid = global_id_of(child)
                if gid is None:
                    survivors.append(child)
                elif gid in wanted:
                    survivors.append(child)
                    numbered.append((gid, child))
            node["children"] = survivors
            # Stable sort: entries sharing a GlobalID keep their tree order.
            numbered.sort(key=lambda pair: pair[0])
            for local_id, (_, child) in enumerate(numbered, start=1):
                lid_node = _child_by_name(child, "LocalID")
                if lid_node is not None:
                    lid_node["value"] = local_id
        for child in node.get("children", []):
            visit(child)

    visit(tree)
def _set_child_value(folder, name, value):
    """Store ``value`` on the child of ``folder`` called ``name``.

    Does nothing when ``_child_by_name`` finds no such child.
    """
    target = _child_by_name(folder, name)
    if target is None:
        return
    target["value"] = value
def _replace_named_children(folder, prefix, replacements):
children = folder.get("children", [])
first_idx = None
for idx, child in enumerate(children):
if str(child.get("name", "")).startswith(prefix):
first_idx = idx
break
if first_idx is None:
first_idx = len(children)
kept = []
for idx, child in enumerate(children):
if idx < first_idx:
kept.append(child)
continue
if str(child.get("name", "")).startswith(prefix):
continue
kept.append(child)
folder["children"] = kept[:first_idx] + replacements + kept[first_idx:]
def _set_mbeg_link_counts(tree, count):
def walk(node):
if not isinstance(node, dict):
return
if node.get("name") == "- Count Of MBeg Links":
node["value"] = int(count)
if node.get("type") == "folder":
for child in node.get("children", []):
walk(child)
walk(tree)
def _is_keyboard_interface(node):
    """Tell whether ``node`` is recognised as a keyboard interface folder.

    True only for a dict of type "folder" that has both a "SerialParameters"
    and a "VideoInputs" child and whose "Type" child holds a value that
    converts to the integer 5. Anything else, including an unreadable Type
    value, yields False.
    """
    if not isinstance(node, dict) or node.get("type") != "folder":
        return False
    for required in ("SerialParameters", "VideoInputs"):
        if _child_by_name(node, required) is None:
            return False
    type_node = _child_by_name(node, "Type")
    if not type_node:
        return False
    try:
        return int(type_node.get("value", 0)) == 5
    except (TypeError, ValueError):
        return False
def _replace_keyboard_children(folder, new_children):
    """Replace the keyboard interfaces among ``folder``'s children.

    Children accepted by ``_is_keyboard_interface`` are removed and
    ``new_children`` is inserted at the position of the first removed child,
    or appended when there was none. The result is stored in
    ``folder["children"]``.
    """
    children = folder.get("children", [])
    keyboard_flags = [_is_keyboard_interface(child) for child in children]
    others = [child for child, is_kb in zip(children, keyboard_flags) if not is_kb]
    # Everything before the first keyboard is kept, so its original index is
    # still the right insertion point in the filtered list.
    insert_at = keyboard_flags.index(True) if True in keyboard_flags else len(others)
    folder["children"] = others[:insert_at] + new_children + others[insert_at:]
def _replace_keyboard_users(users, new_children):
children = users.get("children", [])
first_idx = None
kept = []
for idx, child in enumerate(children):
if child.get("type") == "folder" and child.get("name") not in {
"GeViIO_01", "GeViIO_Virtual", "VirtualKDec", "VirtualVX3"
}:
if first_idx is None:
first_idx = idx
continue
kept.append(child)
if first_idx is None:
first_idx = len(kept)
users["children"] = kept[:first_idx] + new_children + kept[first_idx:]
def _keyboard_text(value):
    """Return ``value`` as stripped text; ``None`` becomes an empty string."""
    return "" if value is None else str(value).strip()


def _normalize_keyboard_entries(keyboards):
    """Turn raw keyboard items into validated name/description/host/port dicts.

    Items that are not dicts, have an empty host, or have a missing or
    non-integer port are skipped.
    """
    entries = []
    for item in keyboards:
        if not isinstance(item, dict):
            continue
        # None (e.g. a JSON null) must count as "missing", not become "None".
        host = _keyboard_text(item.get("host"))
        port = item.get("port")
        if not host or port in (None, ""):
            continue
        try:
            port_int = int(port)
        except (TypeError, ValueError, OverflowError):
            continue
        entries.append({
            "name": _keyboard_text(item.get("name")),
            # Carried along for completeness; nothing below writes it to the tree.
            "description": _keyboard_text(item.get("description")),
            "host": host,
            "port": port_int,
        })
    return entries


def _keyboard_int_child(folder, name, missing, fallback):
    """Read the integer value of ``folder``'s child ``name``.

    ``missing`` is used when the child has no "value" key; ``fallback`` is
    returned when the child does not exist or its value is not an integer.
    """
    node = _child_by_name(folder, name)
    if not node:
        return fallback
    try:
        return int(node.get("value", missing))
    except (TypeError, ValueError):
        return fallback


def apply_keyboards(tree, keyboards):
    """Rebuild the keyboard interfaces and keyboard users from ``keyboards``.

    ``keyboards`` must be a list of dicts with "host" and "port" and optional
    "name" / "description"; invalid items are skipped, and if nothing valid
    remains (or ``keyboards`` is not a list) the tree is left untouched.

    For every valid entry one interface under ``Clients/GeViIO/GeViIO_01`` and
    one folder under ``Users`` are produced by deep-copying an existing
    interface with the same host (or a template) and an existing user with the
    same name (or a template). All previous keyboard interfaces and keyboard
    users are replaced by the new ones. The VirtualKDec serial parameters are
    pointed at the first entry, and every "- Count Of MBeg Links" node is set
    to the resulting number of keyboard interfaces.

    Raises:
        ValueError: if ``Clients/GeViIO/GeViIO_01``, ``Users``, the ``MBeg``
            interface or the ``Users/MBeg`` folder is missing from the tree.
    """
    if not isinstance(keyboards, list):
        return
    entries = _normalize_keyboard_entries(keyboards)
    if not entries:
        return

    client = _find_folder(tree, ["Clients", "GeViIO", "GeViIO_01"])
    if not client:
        raise ValueError("Clients/GeViIO/GeViIO_01 not found in the .set file")
    users = _find_folder(tree, ["Users"])
    if not users:
        raise ValueError("Users folder not found in the .set file")
    template = _child_by_name(client, "MBeg")
    if not template:
        raise ValueError("MBeg interface not found in the .set file")
    template_user = _child_by_name(users, "MBeg")
    if not template_user:
        raise ValueError("Users/MBeg not found in the .set file")

    # Base IDs come from the MBeg templates and default to 1.
    base_interface_id = _keyboard_int_child(template, "InterfaceID", 1, 1)
    base_user_id = _keyboard_int_child(template_user, "UserID", 1, 1)

    existing_interfaces = [
        c for c in client.get("children", []) if _is_keyboard_interface(c)
    ]
    non_keyboard_users = {"GeViIO_01", "GeViIO_Virtual", "VirtualKDec", "VirtualVX3"}
    user_by_name = {
        c.get("name"): c
        for c in users.get("children", [])
        if c.get("type") == "folder" and c.get("name") not in non_keyboard_users
    }

    # Index the current keyboards by host so their settings can be reused.
    host_to_iface = {}
    for iface in existing_interfaces:
        serial = _child_by_name(iface, "SerialParameters")
        host_node = _child_by_name(serial, "IpHost") if serial else None
        host = str(host_node.get("value", "")).strip() if host_node else ""
        if host:
            host_to_iface[host] = iface

    template_iface = existing_interfaces[0] if existing_interfaces else template
    template_user = user_by_name.get(template_iface.get("name")) or template_user

    virtual_kdec = _find_folder(
        tree, ["Clients", "GeViIO", "GeViIO_Virtual", "VirtualKDec"]
    )
    virtual_kdec_id = (
        _keyboard_int_child(virtual_kdec, "InterfaceID", 0, None)
        if virtual_kdec else None
    )
    virtual_user = _child_by_name(users, "VirtualKDec")
    virtual_user_id = (
        _keyboard_int_child(virtual_user, "UserID", 0, None)
        if virtual_user else None
    )

    # New IDs start right after the VirtualKDec ID when it is known and
    # non-zero, otherwise right after the template's ID.
    start_interface_id = (virtual_kdec_id or base_interface_id) + 1
    start_user_id = (virtual_user_id or base_user_id) + 1

    new_ifaces = []
    new_users = []
    for idx, entry in enumerate(entries, start=1):
        source_iface = host_to_iface.get(entry["host"]) or template_iface
        iface = copy.deepcopy(source_iface)
        # NOTE(review): the default name is spelled "Mbeg" while other code in
        # this file matches interfaces with startswith("MBeg") - confirm the
        # intended casing.
        iface_name = entry.get("name") or f"Mbeg{idx}"
        iface["name"] = iface_name
        _set_child_value(iface, "Enabled", True)
        _set_child_value(iface, "Protocol", 0)
        _set_child_value(iface, "MBegMode", True)
        _set_child_value(iface, "InterfaceID", start_interface_id + idx - 1)
        serial = _child_by_name(iface, "SerialParameters")
        if serial:
            _set_child_value(serial, "IpHost", entry["host"])
            _set_child_value(serial, "IpPort", entry["port"])
        new_ifaces.append(iface)

        user = copy.deepcopy(user_by_name.get(iface_name) or template_user)
        user["name"] = iface_name
        _set_child_value(user, "Username", iface_name)
        _set_child_value(user, "Description", "")
        _set_child_value(user, "UserID", start_user_id + idx - 1)
        new_users.append(user)

    _replace_keyboard_children(client, new_ifaces)
    _replace_keyboard_users(users, new_users)

    if virtual_kdec:
        serial = _child_by_name(virtual_kdec, "SerialParameters")
        if serial:
            _set_child_value(serial, "IpHost", entries[0]["host"])
            _set_child_value(serial, "IpPort", entries[0]["port"])
            _set_child_value(serial, "ConnectionType", 1)
            _set_child_value(serial, "PortNr", 0)
    # Looked up again because the Users children list was just rebuilt.
    virtual_user = _child_by_name(users, "VirtualKDec")
    if virtual_user:
        _set_child_value(virtual_user, "Description", "")

    # Count what is actually recognised as a keyboard after the replacement.
    total_mbeg = sum(
        1 for c in client.get("children", []) if _is_keyboard_interface(c)
    )
    _set_mbeg_link_counts(tree, total_mbeg)
def main(argv=None):
    """Command-line entry point.

    Builds a parser with the two required sub-commands ``dump`` (.set to
    JSON) and ``build`` (JSON to .set), parses ``argv`` (``None`` lets
    argparse use the process arguments) and calls the handler registered for
    the chosen sub-command with the parsed namespace.
    """
    parser = argparse.ArgumentParser(description="Convert GeViSoft .set files to/from JSON.")
    commands = parser.add_subparsers(dest="command", required=True)

    dump_parser = commands.add_parser("dump", help="Parse a .set file into JSON.")
    dump_parser.add_argument("input", help="Path to .set file")
    dump_parser.add_argument(
        "output",
        nargs="?",
        help="Optional path for JSON output (stdout if omitted)",
    )
    dump_parser.set_defaults(func=dump_command)

    build_parser = commands.add_parser("build", help="Create a .set file from JSON produced by dump.")
    build_parser.add_argument("input", help="Path to JSON file")
    build_parser.add_argument("output", help="Path to write .set file")
    build_parser.set_defaults(func=build_command)

    namespace = parser.parse_args(argv)
    handler = namespace.func
    handler(namespace)
# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()