1399 lines
50 KiB
Python
1399 lines
50 KiB
Python
import argparse
|
|
import copy
|
|
import json
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
|
|
# Type byte -> symbolic type name for every node kind this tool understands.
# Code 6 has no entry here, so _read_node reports it as unsupported.
TYPE_NAMES = {
    0: "folder",
    1: "bool",
    2: "byte",
    3: "int16",
    4: "int32",
    5: "int64",
    7: "string",
}

# Reverse lookup used while serialising: symbolic type name -> type byte.
NAME_TO_TYPE = {type_name: type_code for type_code, type_name in TYPE_NAMES.items()}

# Payload width in bytes for each fixed-size (integer-like) type byte.
INT_SIZES = {1: 1, 2: 1, 3: 2, 4: 4, 5: 8}

# Names of the non-folder children of the GeViGscServer folder that
# extract_gsc reports under "globalSettings" instead of "extras".
GSC_GLOBAL_KEYS = {f"DialUpCPADay{day}" for day in range(7)} | {
    "DialUpCPAResponseWindow",
    "DialUpCPAThreshold",
    "DialUpLimitActionCount",
    "DialUpLimitTimeDepth",
    "DialUpWorkerThreads",
    "FailoverActive",
}

# Child names of a GeViScope server entry that _parse_gsc_entry maps to
# dedicated JSON keys; every other child is carried along as an "extra".
GSC_ENTRY_FIELDS = {
    # Connection identity.
    "Alias",
    "Host",
    "User",
    "Password",
    # Basic switches.
    "Enabled",
    "DeactivateEcho",
    "DeactivateLiveCheck",
    # Dial-up related settings.
    "DialUpBroadcastAware",
    "DialUpConnection",
    "DialUpCPAConnection",
    "DialUpCPAConnectionInterval",
    "DialUpCPATimeSettings",
    "DialUpKeepAlive",
    "DialUpKeepAliveRetrigger",
    "DialUpKeepAliveTime",
}
|
|
|
|
|
|
# ---------- Binary read/write ----------
|
|
|
|
def _read_node(blob: bytes, offset: int):
    """Decode one node, and recursively its children, starting at ``offset``.

    Layout consumed here: one type byte, one name-length byte, the UTF-8
    name, then a type-specific payload:

    * folder (type 0): 4-byte little-endian child count followed by that
      many encoded child nodes;
    * string (type 7): 2-byte little-endian length followed by UTF-8 bytes;
    * every other known type: an unsigned little-endian integer whose width
      comes from ``INT_SIZES`` (type 1 is converted to ``bool``).

    Returns:
        ``(node, next_offset)`` where ``node`` is a dict with ``name``,
        ``type`` and either ``children`` or ``value``.

    Raises:
        ValueError: the type byte is unknown, or the data ends before the
            node is complete.
    """
    total = len(blob)

    def take(start: int, length: int, what: str) -> bytes:
        # Slicing past the end silently yields fewer bytes, which would
        # decode to a wrong value; refuse instead of guessing.
        end = start + length
        if end > total:
            raise ValueError(
                f"Truncated data: {what} at offset {start} needs {length} byte(s) "
                f"but the input ends at {total}"
            )
        return blob[start:end]

    node_type = take(offset, 1, "type byte")[0]
    try:
        type_name = TYPE_NAMES[node_type]
    except KeyError:
        raise ValueError(f"Unsupported type byte {node_type} at offset {offset}") from None

    name_len = take(offset + 1, 1, "name length")[0]
    name_start = offset + 2
    name = take(name_start, name_len, "node name").decode("utf-8")
    cursor = name_start + name_len

    if node_type == 0:
        child_count = int.from_bytes(take(cursor, 4, "child count"), "little")
        cursor += 4
        children = []
        for _ in range(child_count):
            child, cursor = _read_node(blob, cursor)
            children.append(child)
        return {"name": name, "type": type_name, "children": children}, cursor

    if node_type == 7:
        str_len = int.from_bytes(take(cursor, 2, "string length"), "little")
        cursor += 2
        value = take(cursor, str_len, "string value").decode("utf-8")
        cursor += str_len
        return {"name": name, "type": type_name, "value": value}, cursor

    size = INT_SIZES[node_type]
    value = int.from_bytes(take(cursor, size, f"{type_name} value"), "little", signed=False)
    cursor += size
    if node_type == 1:
        value = bool(value)
    return {"name": name, "type": type_name, "value": value}, cursor
|
|
|
|
|
|
def load_set(path: Path):
    """Read the file at ``path`` and return the node tree decoded from it."""
    return load_set_bytes(path.read_bytes())
|
|
|
|
|
|
def load_set_bytes(data: bytes):
    """Decode ``data`` into a node tree.

    Raises:
        ValueError: the root node does not account for every byte of
            ``data`` (the decoder stopped before or after the end).
    """
    tree, cursor = _read_node(data, 0)
    if cursor == len(data):
        return tree
    raise ValueError(f"Did not consume complete file, stopped at {cursor} of {len(data)} bytes")
|
|
|
|
|
|
def _write_node(node, out: bytearray):
    """Append the binary encoding of ``node`` (and its children) to ``out``.

    The encoding mirrors what ``_read_node`` consumes: type byte, name
    length byte, UTF-8 name, then the type-specific payload.

    Args:
        node: dict with ``name``, ``type`` and ``children`` (folders) or
            ``value`` (everything else).
        out: buffer that receives the encoded bytes.

    Raises:
        ValueError: unknown type name, name longer than 255 bytes, string
            longer than 65535 bytes, or a missing, non-numeric or
            out-of-range integer value.
    """
    try:
        node_type = NAME_TO_TYPE[node["type"]]
    except KeyError:
        raise ValueError(f"Unknown node type {node.get('type')}") from None

    name_bytes = node["name"].encode("utf-8")
    if len(name_bytes) > 0xFF:
        raise ValueError(f"Name too long: {node['name']}")

    out.append(node_type)
    out.append(len(name_bytes))
    out.extend(name_bytes)

    if node_type == 0:
        # A JSON ``"children": null`` is treated like an absent list.
        children = node.get("children") or []
        out.extend(len(children).to_bytes(4, "little"))
        for child in children:
            _write_node(child, out)
        return

    if node_type == 7:
        raw_value = node.get("value")
        # A null value must not be serialised as the literal text "None";
        # handle it like a missing value, i.e. the empty string.
        text = "" if raw_value is None else str(raw_value)
        value_bytes = text.encode("utf-8")
        if len(value_bytes) > 0xFFFF:
            raise ValueError(f"String too long on node {node['name']}")
        out.extend(len(value_bytes).to_bytes(2, "little"))
        out.extend(value_bytes)
        return

    size = INT_SIZES[node_type]
    value = node.get("value")
    if value is None:
        raise ValueError(f"Missing value for node {node['name']}")
    if node_type == 1:
        value = 1 if value else 0
    try:
        number = int(value)
    except (TypeError, ValueError):
        raise ValueError(f"Invalid integer value for node {node['name']}: {value!r}") from None
    max_val = (1 << (size * 8)) - 1
    if not 0 <= number <= max_val:
        raise ValueError(f"Value out of range for node {node['name']}: {value}")
    out.extend(number.to_bytes(size, "little", signed=False))
|
|
|
|
|
|
def save_set(tree, path: Path):
    """Serialise ``tree`` and write the resulting bytes to ``path``."""
    encoded = save_set_bytes(tree)
    path.write_bytes(encoded)
|
|
|
|
|
|
def save_set_bytes(tree) -> bytes:
    """Return the binary encoding of ``tree`` as an immutable ``bytes`` object."""
    buffer = bytearray()
    _write_node(tree, buffer)
    return bytes(buffer)
|
|
|
|
|
|
# ---------- CLI helpers ----------
|
|
|
|
def dump_command(args):
    """Decode ``args.input`` and emit the tree as indented JSON.

    The JSON goes to ``args.output`` when that is set; otherwise it is
    printed to standard output followed by a newline.
    """
    payload = load_set(Path(args.input))
    if not args.output:
        json.dump(payload, sys.stdout, indent=2)
        sys.stdout.write("\n")
        return
    Path(args.output).write_text(json.dumps(payload, indent=2))
|
|
|
|
|
|
def build_command(args):
    """Read a JSON tree from ``args.input`` and write it as a binary file.

    The JSON text is decoded explicitly as UTF-8 (the encoding JSON
    interchange uses) rather than with the platform's locale encoding, so
    non-ASCII names and values load the same way on every system.
    """
    payload = json.loads(Path(args.input).read_text(encoding="utf-8"))
    save_set(payload, Path(args.output))
|
|
|
|
|
|
# ---------- Mapping rules helpers ----------
|
|
|
|
def _find_folder(root, path):
|
|
node = root
|
|
for name in path:
|
|
if not isinstance(node, dict) or node.get("type") != "folder":
|
|
return None
|
|
node = next((c for c in node.get("children", []) if c.get("name") == name), None)
|
|
if node is None:
|
|
return None
|
|
return node
|
|
|
|
|
|
def _child_by_name(folder, name):
|
|
if folder is None or folder.get("type") != "folder":
|
|
return None
|
|
return next((c for c in folder.get("children", []) if c.get("name") == name), None)
|
|
|
|
|
|
def _parse_output(folder):
    """Convert one output folder of a mapping rule into its JSON form.

    The result carries the folder name as ``id``, the ``@`` value as
    ``name``, the ``@!``/``@@`` values as flags, which action/server field
    variant (``Gsc*`` preferred over ``GCore*``) is present together with
    its value, the original child order, and deep copies of all remaining
    children under ``extras``.
    """

    def lookup(field_name, fallback):
        return (_child_by_name(folder, field_name) or {}).get("value", fallback)

    def first_present(*candidates):
        # The first candidate that exists as a child wins; None if neither does.
        for candidate in candidates:
            if _child_by_name(folder, candidate):
                return candidate
        return None

    members = folder.get("children", [])
    title_node = _child_by_name(folder, "@")
    action_key = first_present("GscAction", "GCoreAction")
    server_key = first_present("GscServer", "GCoreServer")

    output = {
        "id": folder.get("name"),
        "name": title_node.get("value") if title_node else "",
        "flags": {
            "primary": lookup("@!", 0),
            "secondary": lookup("@@", 0),
        },
        "actionField": action_key,
        "serverField": server_key,
        "_order": [member.get("name") for member in members],
    }
    if action_key:
        output["action"] = lookup(action_key, "")
    if server_key:
        output["server"] = lookup(server_key, "")

    consumed = {"@", "@!", "@@", action_key, server_key}
    leftovers = [copy.deepcopy(member) for member in members if member.get("name") not in consumed]
    if leftovers:
        output["extras"] = leftovers
    return output
|
|
|
|
|
|
def _parse_rule(folder):
    """Convert one mapping-rule folder into its JSON form.

    ``input`` holds the ``@`` name, the ``@!``/``@@`` flags and, when the
    children exist, the ``VideoInput``/``Temp`` values. Children whose name
    starts with ``.`` are copied to ``filters``; folders below ``Rules``
    become ``outputs``; every other child except the header fields and
    ``Rules`` is copied to ``fields``. ``_order`` records the child order.
    """

    def lookup(field_name, fallback):
        return (_child_by_name(folder, field_name) or {}).get("value", fallback)

    members = folder.get("children", [])

    inputs = {
        "name": lookup("@", ""),
        "flags": {
            "primary": lookup("@!", 0),
            "secondary": lookup("@@", 0),
        },
    }
    for node_name, json_key in (("VideoInput", "videoInputId"), ("Temp", "temp")):
        node = _child_by_name(folder, node_name)
        if node is not None:
            inputs[json_key] = node.get("value", 0)

    filters = [
        copy.deepcopy(member)
        for member in members
        if isinstance(member.get("name"), str) and member["name"].startswith(".")
    ]
    reserved = {".Temp", ".VideoInput", "@", "@!", "@@", "Rules"}
    other_fields = [
        copy.deepcopy(member)
        for member in members
        if member.get("name") not in reserved and not member.get("name", "").startswith(".")
    ]

    outputs = []
    rules_folder = _child_by_name(folder, "Rules")
    if rules_folder and rules_folder.get("type") == "folder":
        outputs = [
            _parse_output(entry)
            for entry in rules_folder.get("children", [])
            if entry.get("type") == "folder"
        ]

    rule = {
        "id": folder.get("name"),
        "input": inputs,
        "outputs": outputs,
        "_order": [member.get("name") for member in members],
    }
    if filters:
        rule["filters"] = filters
    if other_fields:
        rule["fields"] = other_fields
    return rule
|
|
|
|
|
|
def _make_bool(name, value):
|
|
return {"name": name, "type": "bool", "value": bool(value)}
|
|
|
|
|
|
def _make_int32(name, value):
|
|
return {"name": name, "type": "int32", "value": int(value)}
|
|
|
|
|
|
def _make_string(name, value):
|
|
return {"name": name, "type": "string", "value": str(value)}
|
|
|
|
|
|
def _build_output(output):
    """Rebuild the folder node for one rule output from its JSON form.

    Children are the ``@`` name and ``@!``/``@@`` flags, the action and
    server strings (only when the corresponding field name is given), then
    deep copies of ``extras``. If ``_order`` is present the children are
    rearranged with ``_apply_order``.
    """
    flags = output.get("flags", {})
    children = [
        _make_string("@", output.get("name", "")),
        _make_int32("@!", flags.get("primary", 0)),
        _make_int32("@@", flags.get("secondary", 0)),
    ]

    for field_key, value_key in (("actionField", "action"), ("serverField", "server")):
        field_name = output.get(field_key)
        if field_name:
            children.append(_make_string(field_name, output.get(value_key, "")))

    children.extend(copy.deepcopy(extra) for extra in output.get("extras", []))

    order = output.get("_order")
    if order:
        children = _apply_order(children, order)
    return {"name": output.get("id", ""), "type": "folder", "children": children}
|
|
|
|
|
|
def _build_rule(rule):
    """Rebuild the folder node for one mapping rule from its JSON form.

    Children are emitted as: header (``@``, ``@!``, ``@@``), copies of the
    filters, a ``Rules`` folder holding the rebuilt outputs, then copies of
    the remaining fields. A ``VideoInput``/``Temp`` field takes its value
    from ``input.videoInputId``/``input.temp`` when that key is present.
    ``_order``, if given, rearranges the children.
    """
    inp = rule.get("input", {})
    flags = inp.get("flags", {})

    def with_input_overrides(field):
        # Values edited under "input" win over the copied field value.
        clone = copy.deepcopy(field)
        field_name = clone.get("name")
        if field_name == "VideoInput" and "videoInputId" in inp:
            clone["value"] = inp["videoInputId"]
        if field_name == "Temp" and "temp" in inp:
            clone["value"] = inp["temp"]
        return clone

    children = [
        _make_string("@", inp.get("name", "")),
        _make_int32("@!", flags.get("primary", 0)),
        _make_int32("@@", flags.get("secondary", 0)),
    ]
    children.extend(copy.deepcopy(filt) for filt in rule.get("filters", []))

    rebuilt_outputs = [_build_output(out) for out in rule.get("outputs", [])]
    children.append({"name": "Rules", "type": "folder", "children": rebuilt_outputs})

    children.extend(with_input_overrides(field) for field in rule.get("fields", []))

    order = rule.get("_order")
    if order:
        children = _apply_order(children, order)
    return {"name": rule.get("id", ""), "type": "folder", "children": children}
|
|
|
|
|
|
def _apply_order(children, order):
|
|
if not order:
|
|
return children
|
|
name_to_child = {c.get("name"): c for c in children}
|
|
ordered = []
|
|
used = set()
|
|
for name in order:
|
|
child = name_to_child.get(name)
|
|
if child is not None:
|
|
ordered.append(child)
|
|
used.add(name)
|
|
for c in children:
|
|
if c.get("name") not in used:
|
|
ordered.append(c)
|
|
return ordered
|
|
|
|
|
|
def extract_mapping(tree):
    """Return the ``MappingRules`` folder of ``tree`` in JSON form.

    Folder children are parsed into ``rules``; all other children are
    deep-copied into ``extras``; ``_order`` lists the child names in file
    order.

    Raises:
        ValueError: ``tree`` has no ``MappingRules`` folder.
    """
    mapping_root = _find_folder(tree, ["MappingRules"])
    if not mapping_root:
        raise ValueError("MappingRules folder not found in the .set file")

    members = mapping_root.get("children", [])
    rules, extras = [], []
    for member in members:
        if member.get("type") == "folder":
            rules.append(_parse_rule(member))
        else:
            extras.append(copy.deepcopy(member))
    return {
        "rules": rules,
        "extras": extras,
        "_order": [member.get("name") for member in members],
    }
|
|
|
|
|
|
def apply_mapping(tree, mapping):
    """Replace the children of ``MappingRules`` in ``tree`` from ``mapping``.

    Rebuilt rules come first, followed by deep copies of ``extras``; a
    non-empty ``_order`` then rearranges them. ``tree`` is modified in place.

    Raises:
        ValueError: ``tree`` has no ``MappingRules`` folder.
    """
    mapping_root = _find_folder(tree, ["MappingRules"])
    if not mapping_root:
        raise ValueError("MappingRules folder not found in the .set file")

    rebuilt = [_build_rule(rule) for rule in mapping.get("rules", [])]
    rebuilt.extend(copy.deepcopy(extra) for extra in mapping.get("extras", []))

    order = mapping.get("_order")
    if order:
        rebuilt = _apply_order(rebuilt, order)
    mapping_root["children"] = rebuilt
|
|
|
|
|
|
# ---------- GCore server helpers ----------
|
|
|
|
def extract_gcore(tree):
    """Return the ``GeViGCoreServer`` folder of ``tree`` in JSON form.

    Folder children become ``servers``; other children are deep-copied to
    ``extras``; ``_order`` lists the child names in file order.

    Raises:
        ValueError: ``tree`` has no ``GeViGCoreServer`` folder.
    """
    gcore_root = _find_folder(tree, ["GeViGCoreServer"])
    if not gcore_root:
        raise ValueError("GeViGCoreServer folder not found in the .set file")

    members = gcore_root.get("children", [])
    servers, extras = [], []
    for member in members:
        if member.get("type") == "folder":
            servers.append(_parse_gcore_entry(member))
        else:
            extras.append(copy.deepcopy(member))
    return {
        "servers": servers,
        "extras": extras,
        "_order": [member.get("name") for member in members],
    }
|
|
|
|
|
|
def _parse_gcore_entry(folder):
    """Convert one G-Core server folder into its JSON form.

    String fields default to ``""`` and switches to ``False`` when the
    child is absent. Children that are not mapped to a dedicated key are
    deep-copied to ``extras``; ``_order`` records the child order.
    """

    def lookup(field_name, fallback):
        return (_child_by_name(folder, field_name) or {}).get("value", fallback)

    members = folder.get("children", [])

    entry = {"id": folder.get("name")}
    for json_key, node_name in (
        ("alias", "Alias"),
        ("host", "Host"),
        ("user", "User"),
        ("password", "Password"),
    ):
        entry[json_key] = lookup(node_name, "")
    for json_key, node_name in (
        ("enabled", "Enabled"),
        ("deactivateEcho", "DeactivateEcho"),
        ("deactivateLiveCheck", "DeactivateLiveCheck"),
    ):
        entry[json_key] = bool(lookup(node_name, False))
    entry["_order"] = [member.get("name") for member in members]

    mapped = {"Alias", "Host", "User", "Password", "Enabled", "DeactivateEcho", "DeactivateLiveCheck"}
    leftovers = [copy.deepcopy(member) for member in members if member.get("name") not in mapped]
    if leftovers:
        entry["extras"] = leftovers
    return entry
|
|
|
|
|
|
def _build_gcore_entry(entry):
    """Rebuild the folder node for one G-Core server from its JSON form.

    The mapped fields are emitted in a fixed order, followed by deep copies
    of ``extras``; a non-empty ``_order`` then rearranges the children.
    """
    # (node factory, node name, JSON key, default when the key is absent)
    layout = (
        (_make_string, "Alias", "alias", ""),
        (_make_bool, "DeactivateEcho", "deactivateEcho", False),
        (_make_bool, "DeactivateLiveCheck", "deactivateLiveCheck", False),
        (_make_bool, "Enabled", "enabled", False),
        (_make_string, "Host", "host", ""),
        (_make_string, "Password", "password", ""),
        (_make_string, "User", "user", ""),
    )
    children = [
        make(node_name, entry.get(json_key, default))
        for make, node_name, json_key, default in layout
    ]
    children.extend(copy.deepcopy(extra) for extra in entry.get("extras", []))

    order = entry.get("_order")
    if order:
        children = _apply_order(children, order)
    return {"name": entry.get("id", ""), "type": "folder", "children": children}
|
|
|
|
|
|
def apply_gcore(tree, gcore):
    """Replace the children of ``GeViGCoreServer`` in ``tree`` from ``gcore``.

    Rebuilt server entries come first, then deep copies of ``extras``; a
    non-empty ``_order`` rearranges them. ``tree`` is modified in place.

    Raises:
        ValueError: ``tree`` has no ``GeViGCoreServer`` folder.
    """
    gcore_root = _find_folder(tree, ["GeViGCoreServer"])
    if not gcore_root:
        raise ValueError("GeViGCoreServer folder not found in the .set file")

    rebuilt = [_build_gcore_entry(entry) for entry in gcore.get("servers", [])]
    rebuilt.extend(copy.deepcopy(extra) for extra in gcore.get("extras", []))

    order = gcore.get("_order")
    if order:
        rebuilt = _apply_order(rebuilt, order)
    gcore_root["children"] = rebuilt
|
|
|
|
|
|
# ---------- GeViScope server helpers ----------
|
|
|
|
def extract_gsc(tree):
    """Return the ``GeViGscServer`` folder of ``tree`` in JSON form.

    Folder children become ``servers``. Non-folder children whose name is
    in ``GSC_GLOBAL_KEYS`` are collected as ``globalSettings`` (name ->
    value); the rest are deep-copied to ``extras``. ``_order`` lists the
    child names in file order.

    Raises:
        ValueError: ``tree`` has no ``GeViGscServer`` folder.
    """
    gsc_root = _find_folder(tree, ["GeViGscServer"])
    if not gsc_root:
        raise ValueError("GeViGscServer folder not found in the .set file")

    members = gsc_root.get("children", [])
    servers, extras, global_settings = [], [], {}
    for member in members:
        if member.get("type") == "folder":
            servers.append(_parse_gsc_entry(member))
            continue
        if member.get("name") in GSC_GLOBAL_KEYS:
            global_settings[member.get("name")] = member.get("value")
        else:
            extras.append(copy.deepcopy(member))

    return {
        "servers": servers,
        "globalSettings": global_settings,
        "extras": extras,
        "_order": [member.get("name") for member in members],
    }
|
|
|
|
|
|
def _parse_gsc_entry(folder):
    """Convert one GeViScope server folder into its JSON form.

    Text fields default to ``""``, switches are coerced to ``bool``
    (default ``False``) and numeric fields default to ``0`` when the child
    is absent. Children not named in ``GSC_ENTRY_FIELDS`` are deep-copied
    to ``extras``; ``_order`` records the child order.
    """

    def lookup(field_name, fallback):
        return (_child_by_name(folder, field_name) or {}).get("value", fallback)

    def text(field_name):
        return lookup(field_name, "")

    def flag(field_name):
        return bool(lookup(field_name, False))

    def number(field_name):
        return lookup(field_name, 0)

    # (JSON key, node name, reader) in the order the keys appear in the result.
    layout = (
        ("alias", "Alias", text),
        ("host", "Host", text),
        ("user", "User", text),
        ("password", "Password", text),
        ("enabled", "Enabled", flag),
        ("deactivateEcho", "DeactivateEcho", flag),
        ("deactivateLiveCheck", "DeactivateLiveCheck", flag),
        ("dialUpBroadcastAware", "DialUpBroadcastAware", flag),
        ("dialUpConnection", "DialUpConnection", flag),
        ("dialUpCPAConnection", "DialUpCPAConnection", flag),
        ("dialUpCPAConnectionInterval", "DialUpCPAConnectionInterval", number),
        ("dialUpCPATimeSettings", "DialUpCPATimeSettings", number),
        ("dialUpKeepAlive", "DialUpKeepAlive", flag),
        ("dialUpKeepAliveRetrigger", "DialUpKeepAliveRetrigger", flag),
        ("dialUpKeepAliveTime", "DialUpKeepAliveTime", number),
    )

    members = folder.get("children", [])
    entry = {"id": folder.get("name")}
    for json_key, node_name, read in layout:
        entry[json_key] = read(node_name)
    entry["_order"] = [member.get("name") for member in members]

    leftovers = [
        copy.deepcopy(member)
        for member in members
        if member.get("name") not in GSC_ENTRY_FIELDS
    ]
    if leftovers:
        entry["extras"] = leftovers
    return entry
|
|
|
|
|
|
def _build_gsc_entry(entry):
    """Rebuild the folder node for one GeViScope server from its JSON form.

    The mapped fields are emitted in a fixed order, followed by deep copies
    of ``extras``; a non-empty ``_order`` then rearranges the children.
    """
    # (node factory, node name, JSON key, default when the key is absent)
    layout = (
        (_make_string, "Alias", "alias", ""),
        (_make_bool, "DeactivateEcho", "deactivateEcho", False),
        (_make_bool, "DeactivateLiveCheck", "deactivateLiveCheck", False),
        (_make_bool, "Enabled", "enabled", False),
        (_make_string, "Host", "host", ""),
        (_make_string, "Password", "password", ""),
        (_make_string, "User", "user", ""),
        (_make_bool, "DialUpBroadcastAware", "dialUpBroadcastAware", False),
        (_make_bool, "DialUpConnection", "dialUpConnection", False),
        (_make_bool, "DialUpCPAConnection", "dialUpCPAConnection", False),
        (_make_int32, "DialUpCPAConnectionInterval", "dialUpCPAConnectionInterval", 0),
        (_make_int32, "DialUpCPATimeSettings", "dialUpCPATimeSettings", 0),
        (_make_bool, "DialUpKeepAlive", "dialUpKeepAlive", False),
        (_make_bool, "DialUpKeepAliveRetrigger", "dialUpKeepAliveRetrigger", False),
        (_make_int32, "DialUpKeepAliveTime", "dialUpKeepAliveTime", 0),
    )
    children = [
        make(node_name, entry.get(json_key, default))
        for make, node_name, json_key, default in layout
    ]
    children.extend(copy.deepcopy(extra) for extra in entry.get("extras", []))

    order = entry.get("_order")
    if order:
        children = _apply_order(children, order)
    return {"name": entry.get("id", ""), "type": "folder", "children": children}
|
|
|
|
|
|
def apply_gsc(tree, gsc):
    """Replace the children of ``GeViGscServer`` in ``tree`` from ``gsc``.

    Children are built as: rebuilt server entries, one node per
    ``globalSettings`` item (``bool`` values become bool nodes, everything
    else an int32 node), then deep copies of ``extras``. A non-empty
    ``_order`` rearranges them. ``tree`` is modified in place.

    Raises:
        ValueError: ``tree`` has no ``GeViGscServer`` folder.
    """
    gsc_root = _find_folder(tree, ["GeViGscServer"])
    if not gsc_root:
        raise ValueError("GeViGscServer folder not found in the .set file")

    rebuilt = [_build_gsc_entry(entry) for entry in gsc.get("servers", [])]

    for key, value in (gsc.get("globalSettings") or {}).items():
        make = _make_bool if isinstance(value, bool) else _make_int32
        rebuilt.append(make(key, value))

    rebuilt.extend(copy.deepcopy(extra) for extra in gsc.get("extras", []))

    order = gsc.get("_order")
    if order:
        rebuilt = _apply_order(rebuilt, order)
    gsc_root["children"] = rebuilt
|
|
|
|
|
|
def extract_video_outputs(tree):
    """List the video outputs found in the first available VideoOutputs folder.

    The candidate folders are tried in a fixed order and the first one that
    exists is used. Each folder child yields a row with ``globalId``,
    ``localId`` (``None`` when absent), ``name`` and ``description``
    (``""`` when absent). Rows are sorted by local id, then global id,
    with missing ids counted as 0. Returns ``[]`` if no folder is found.
    """
    candidate_paths = (
        ["Clients", "GeViIO", "GeViIO_Virtual", "VirtualVX3", "VideoOutputs"],
        ["Clients", "GeViIO", "GeViIO_Virtual", "VirtualCX3", "VideoOutputs"],
        ["Clients", "GeViIO", "GeViIO_01", "MBeg", "VideoOutputs"],
    )
    lookups = (_find_folder(tree, candidate) for candidate in candidate_paths)
    folder = next((hit for hit in lookups if hit), None)
    if not folder:
        return []

    def value_or(entry, field_name, fallback):
        node = _child_by_name(entry, field_name)
        return node.get("value") if node else fallback

    rows = [
        {
            "globalId": value_or(entry, "GlobalID", None),
            "localId": value_or(entry, "LocalID", None),
            "name": value_or(entry, "Name", ""),
            "description": value_or(entry, "Description", ""),
        }
        for entry in folder.get("children", [])
        if entry.get("type") == "folder"
    ]
    return sorted(rows, key=lambda row: (row["localId"] or 0, row["globalId"] or 0))
|
|
|
|
|
|
def _build_video_output_children(template_children, output):
|
|
children = []
|
|
has_global = False
|
|
has_local = False
|
|
has_name = False
|
|
has_desc = False
|
|
if template_children:
|
|
for node in template_children:
|
|
node_copy = copy.deepcopy(node)
|
|
if node_copy.get("name") == "GlobalID":
|
|
node_copy["value"] = output.get("globalId", node_copy.get("value"))
|
|
has_global = True
|
|
elif node_copy.get("name") == "LocalID":
|
|
node_copy["value"] = output.get("localId", node_copy.get("value"))
|
|
has_local = True
|
|
elif node_copy.get("name") == "Name":
|
|
node_copy["value"] = output.get("name", node_copy.get("value", ""))
|
|
has_name = True
|
|
elif node_copy.get("name") == "Description":
|
|
node_copy["value"] = output.get("description", node_copy.get("value", ""))
|
|
has_desc = True
|
|
elif node_copy.get("name") == "Enabled":
|
|
node_copy["value"] = True
|
|
children.append(node_copy)
|
|
else:
|
|
children = [
|
|
_make_bool("Enabled", True),
|
|
_make_int32("GlobalID", output.get("globalId", 0)),
|
|
_make_int32("LocalID", output.get("localId", 0)),
|
|
_make_string("Name", output.get("name", "")),
|
|
_make_string("Description", output.get("description", "")),
|
|
]
|
|
has_global = has_local = has_name = has_desc = True
|
|
|
|
if not has_global:
|
|
children.append(_make_int32("GlobalID", output.get("globalId", 0)))
|
|
if not has_local:
|
|
children.append(_make_int32("LocalID", output.get("localId", 0)))
|
|
if not has_name and output.get("name"):
|
|
children.append(_make_string("Name", output.get("name", "")))
|
|
if not has_desc and output.get("description"):
|
|
children.append(_make_string("Description", output.get("description", "")))
|
|
return children
|
|
|
|
|
|
def _apply_video_outputs_path(tree, outputs, path):
    """Replace the children of the folder at ``path`` with one folder per output.

    Does nothing when the folder does not exist. The children of the first
    existing sub-folder serve as the template for every new entry; new
    folders are named ``"1"``, ``"2"``, ... in the order of ``outputs``.
    """
    folder = _find_folder(tree, path)
    if not folder:
        return

    first_entry = next(
        (member for member in folder.get("children", []) if member.get("type") == "folder"),
        None,
    )
    template_children = None if first_entry is None else first_entry.get("children", [])

    folder["children"] = [
        {
            "name": str(position),
            "type": "folder",
            "children": _build_video_output_children(template_children, output),
        }
        for position, output in enumerate(outputs, start=1)
    ]
|
|
|
|
|
|
def apply_video_outputs(tree, outputs):
    """Write ``outputs`` into every known VideoOutputs folder of ``tree``.

    Rows that are not dicts, or that have neither ``localId`` nor
    ``globalId``, are dropped; if nothing remains (or ``outputs`` is not a
    non-empty list) the tree is left untouched. The remaining rows are
    sorted by local id then global id (missing ids count as 0) and applied
    to the VirtualVX3 and VirtualCX3 folders and to each child of
    ``GeViIO_01`` that ``_is_keyboard_interface`` accepts.
    """
    if not isinstance(outputs, list) or not outputs:
        return

    cleaned = [
        {
            "localId": row.get("localId"),
            "globalId": row.get("globalId"),
            "name": row.get("name", ""),
            "description": row.get("description", ""),
        }
        for row in outputs
        if isinstance(row, dict)
        and not (row.get("localId") is None and row.get("globalId") is None)
    ]
    if not cleaned:
        return

    cleaned.sort(key=lambda row: (row.get("localId") or 0, row.get("globalId") or 0))

    for virtual_device in ("VirtualVX3", "VirtualCX3"):
        _apply_video_outputs_path(
            tree,
            cleaned,
            ["Clients", "GeViIO", "GeViIO_Virtual", virtual_device, "VideoOutputs"],
        )

    geviio = _find_folder(tree, ["Clients", "GeViIO", "GeViIO_01"])
    if not geviio:
        return
    for interface in geviio.get("children", []):
        if not _is_keyboard_interface(interface):
            continue
        _apply_video_outputs_path(
            tree,
            cleaned,
            ["Clients", "GeViIO", "GeViIO_01", interface.get("name"), "VideoOutputs"],
        )
|
|
|
|
|
|
def extract_logging(tree):
    """Return one row per folder below ``Globals/LogData``.

    Each row has ``actionName`` (the folder name as a string) and the
    ``db``/``user``/``com`` switches as booleans (``False`` when the child
    is absent). Returns ``[]`` when the LogData folder does not exist.
    """
    log_root = _find_folder(tree, ["Globals", "LogData"])
    if not log_root:
        return []

    def switch(entry, field_name):
        node = _child_by_name(entry, field_name)
        return bool(node.get("value")) if node else False

    return [
        {
            "actionName": str(entry.get("name", "")),
            "db": switch(entry, "DB"),
            "user": switch(entry, "User"),
            "com": switch(entry, "Com"),
        }
        for entry in log_root.get("children", [])
        if entry.get("type") == "folder"
    ]
|
|
|
|
|
|
def apply_logging(tree, rows):
    """Write the logging switches in ``rows`` into ``Globals/LogData``.

    Nothing happens unless ``rows`` is a non-empty list and ``tree`` has a
    ``Globals`` folder. ``LogData`` and the per-action folders are created
    when missing. For every dict row with a non-blank ``actionName`` the
    ``Com``, ``DB`` and ``User`` children are added if absent and set to
    the boolean value of ``com``/``db``/``user``. ``tree`` is modified in
    place.
    """
    if not isinstance(rows, list) or not rows:
        return
    globals_folder = _find_folder(tree, ["Globals"])
    if not globals_folder:
        return

    log_root = _child_by_name(globals_folder, "LogData")
    if not log_root:
        log_root = {"name": "LogData", "type": "folder", "children": []}
        globals_folder.setdefault("children", []).append(log_root)

    # Existing action folders, keyed by their name as a string.
    existing = {
        str(entry.get("name", "")): entry
        for entry in log_root.get("children", [])
        if entry.get("type") == "folder"
    }

    for row in rows:
        if not isinstance(row, dict):
            continue
        action_name = str(row.get("actionName", "")).strip()
        if not action_name:
            continue

        target = existing.get(action_name)
        if not target:
            target = {"name": action_name, "type": "folder", "children": []}
            log_root.setdefault("children", []).append(target)
            existing[action_name] = target

        children = target.setdefault("children", [])
        wanted = [
            ("Com", bool(row.get("com"))),
            ("DB", bool(row.get("db"))),
            ("User", bool(row.get("user"))),
        ]
        _ensure_children_fields(
            children,
            [_make_bool(node_name, state) for node_name, state in wanted],
        )
        # Nodes that already existed still carry their old value; update them.
        for child in children:
            for node_name, state in wanted:
                if child.get("name") == node_name:
                    child["value"] = state
                    break
|
|
def _ensure_video_inputs_path(tree, global_ids, ptz_by_id, path):
    """Make the video-input folder at ``path`` match ``global_ids``.

    Steps, in order:

    1. Scan the existing entry folders: remember each by its ``GlobalID``,
       add missing ``Active``/``Name``/``Description`` children, apply the
       PTZ switch from ``ptz_by_id`` and track the highest numeric folder
       name and ``LocalID``.
    2. If any target ids were given, drop entry folders whose ``GlobalID``
       is not a target and renumber ``LocalID`` of the remaining entries
       1..n in ascending ``GlobalID`` order.
    3. Append a new entry folder for every target id that had no entry,
       copied from the first existing entry's children when there is one.

    Does nothing when the folder at ``path`` does not exist. ``tree`` is
    modified in place.

    NOTE(review): the nesting below was reconstructed from a flattened
    copy of this file — confirm the indentation against the original.
    """
    folder = _find_folder(tree, path)
    if not folder:
        return
    ptz_by_id = ptz_by_id or {}

    # Only positive ``int`` ids are considered; anything else is ignored.
    target_ids = {int(x) for x in global_ids if isinstance(x, int) and x > 0}
    existing = {}
    max_name = 0
    max_local_id = 0
    template_children = None

    for child in folder.get("children", []):
        if child.get("type") != "folder":
            continue
        name = child.get("name")
        try:
            max_name = max(max_name, int(name))
        except (TypeError, ValueError):
            # Non-numeric folder names do not take part in the numbering.
            pass
        gid = _child_by_name(child, "GlobalID")
        lid = _child_by_name(child, "LocalID")
        if gid is not None:
            gid_val = int(gid.get("value", 0))
            existing[gid_val] = child
            _ensure_children_fields(
                child.get("children", []),
                [
                    _make_bool("Active", True),
                    _make_string("Name", f"Video input{gid_val}"),
                    _make_string("Description", f"Video input{gid_val}"),
                ],
            )
            # NOTE(review): assumed that the "- PTZ Linked" update belongs
            # to this ``if`` — confirm.
            if gid_val in ptz_by_id:
                ptz_value = bool(ptz_by_id[gid_val])
                ptz_node = _child_by_name(child, "PTZ")
                if ptz_node is not None:
                    ptz_node["value"] = ptz_value
                ptz_link = _child_by_name(child, "- PTZ Linked")
                if ptz_link is not None:
                    ptz_link["value"] = True
            # NOTE(review): assumed that Enabled/Active are only forced on
            # entries that have a GlobalID — confirm.
            enabled_node = _child_by_name(child, "Enabled")
            if enabled_node is not None:
                enabled_node["value"] = True
            active_node = _child_by_name(child, "Active")
            if active_node is not None:
                active_node["value"] = True
        if lid is not None:
            max_local_id = max(max_local_id, int(lid.get("value", 0)))
        if template_children is None:
            # The first entry folder seen is the template for new entries.
            template_children = child.get("children", [])

    if target_ids:
        # Keep non-folder children, folders without a usable GlobalID, and
        # folders whose GlobalID is one of the targets.
        kept = []
        for child in folder.get("children", []):
            if child.get("type") != "folder":
                kept.append(child)
                continue
            gid_node = _child_by_name(child, "GlobalID")
            if gid_node is None:
                kept.append(child)
                continue
            try:
                gid_val = int(gid_node.get("value", 0))
            except (TypeError, ValueError):
                kept.append(child)
                continue
            if gid_val in target_ids:
                kept.append(child)
        folder["children"] = kept

        # Reassign LocalID sequentially for consistency.
        ordered = []
        for child in kept:
            if child.get("type") != "folder":
                continue
            gid_node = _child_by_name(child, "GlobalID")
            if gid_node is None:
                continue
            try:
                gid_val = int(gid_node.get("value", 0))
            except (TypeError, ValueError):
                continue
            ordered.append((gid_val, child))
        ordered.sort(key=lambda item: item[0])
        local_id = 1
        for _, child in ordered:
            lid_node = _child_by_name(child, "LocalID")
            if lid_node is not None:
                lid_node["value"] = local_id
                local_id += 1

    # ``max_local_id`` still holds the value from the initial scan, so new
    # entries continue after the highest LocalID seen before any pruning
    # or renumbering.
    for gid in sorted(target_ids):
        if gid in existing:
            continue
        max_name += 1
        max_local_id += 1
        if template_children:
            children = []
            has_name = False
            has_desc = False
            for node in template_children:
                node_copy = copy.deepcopy(node)
                if node_copy.get("name") == "GlobalID":
                    node_copy["value"] = gid
                elif node_copy.get("name") == "LocalID":
                    node_copy["value"] = max_local_id
                elif node_copy.get("name") == "Name":
                    node_copy["value"] = f"Video input{gid}"
                    has_name = True
                elif node_copy.get("name") == "Description":
                    node_copy["value"] = f"Video input{gid}"
                    has_desc = True
                elif node_copy.get("name") in ["PTZ", "Enabled", "- Linked", "- PTZ Linked", "Active"]:
                    if node_copy.get("name") == "PTZ":
                        # Ids without an entry in ``ptz_by_id`` default to PTZ on.
                        node_copy["value"] = bool(ptz_by_id.get(gid, True))
                    else:
                        node_copy["value"] = True
                children.append(node_copy)
            if not has_name:
                children.append(_make_string("Name", f"Video input{gid}"))
            if not has_desc:
                children.append(_make_string("Description", f"Video input{gid}"))
        else:
            # No existing entry to copy from: use a fixed default layout.
            ptz_value = bool(ptz_by_id.get(gid, True))
            children = [
                _make_bool("- Linked", True),
                _make_bool("- PTZ Linked", True),
                _make_bool("Active", True),
                _make_bool("Enabled", True),
                _make_int32("GlobalID", gid),
                _make_int32("LocalID", max_local_id),
                _make_string("Name", f"Video input{gid}"),
                _make_string("Description", f"Video input{gid}"),
                _make_bool("PTZ", ptz_value),
            ]
        folder.get("children", []).append(
            {"name": str(max_name), "type": "folder", "children": children}
        )
|
|
|
|
|
|
def ensure_video_inputs(tree, global_ids, ptz_by_id=None):
    """Synchronise the VirtualKDec VideoInputs folder with ``global_ids``.

    ``ptz_by_id`` optionally maps a global id to its PTZ switch; a missing
    or empty mapping is passed on as an empty dict.
    """
    kdec_inputs_path = ["Clients", "GeViIO", "GeViIO_Virtual", "VirtualKDec", "VideoInputs"]
    _ensure_video_inputs_path(tree, global_ids, ptz_by_id or {}, kdec_inputs_path)
|
|
|
|
|
|
def _ensure_children_fields(children, required):
|
|
existing = {c.get("name") for c in children}
|
|
for node in required:
|
|
if node.get("name") not in existing:
|
|
children.append(node)
|
|
|
|
|
|
def _vx3_required_fields(gid, local_id, name_prefix, has_ptz=True):
    """Return the default child nodes for one camera-input entry.

    ``Name`` and ``Description`` are ``name_prefix`` followed by ``gid``;
    ``- Has PTZ`` reflects ``has_ptz``; all other fields carry fixed
    defaults. The nodes are returned in a fixed order.
    """
    label = f"{name_prefix}{gid}"
    # (node factory, node name, value) in output order.
    layout = (
        (_make_int32, "- Count Of MBeg Links", 3),
        (_make_bool, "- Has PTZ", bool(has_ptz)),
        (_make_bool, "Active", True),
        (_make_int32, "DefaultTour", 0),
        (_make_string, "Description", label),
        (_make_bool, "Enabled", True),
        (_make_int32, "GlobalID", gid),
        (_make_int32, "LocalID", local_id),
        (_make_string, "Name", label),
        (_make_bool, "PictureDetection", True),
        (_make_bool, "ReactionRescann", True),
        (_make_bool, "ReactionSetup", True),
        (_make_int32, "SignalType", 1),
        (_make_int32, "StartupPosition", 0),
        (_make_int32, "StartupState", 1),
        (_make_int32, "StartupTour", 0),
        (_make_bool, "SyncDetection", True),
        (_make_int32, "ThresholdValue", 0),
    )
    return [make(node_name, value) for make, node_name, value in layout]
|
|
|
|
|
|
def _ensure_camera_inputs(tree, path, global_ids, name_prefix, ptz_by_id=None):
    """Bring the entry folders found at ``path`` in line with ``global_ids``.

    Work is done in four passes over the folder located by ``path``:

    1. Scan the current entry folders, recording which GlobalIDs exist, the
       highest numeric folder name, the highest LocalID, and the children of
       the first entry folder (used later as a template).
    2. If at least one valid target id was supplied, drop entry folders whose
       GlobalID is not a target.  Non-folder children, folders without a
       GlobalID and folders with an unparsable GlobalID are kept.
    3. For every remaining entry that has both GlobalID and LocalID, add any
       missing required field and force ``- Has PTZ``, ``Enabled`` and
       ``Active``.
    4. Append a new entry folder for each target id that had no entry in
       pass 1.

    Nothing happens when ``path`` cannot be resolved.

    Args:
        tree: Root node passed to ``_find_folder``.
        path: List of folder names leading to the inputs folder.
        global_ids: Iterable of ids; only positive ``int`` items are used.
        name_prefix: Prefix for the ``Name``/``Description`` of entries.
        ptz_by_id: Optional mapping GlobalID -> PTZ flag (default ``True``).
    """
    folder = _find_folder(tree, path)
    if not folder:
        return

    ptz_by_id = ptz_by_id or {}
    target_ids = {int(x) for x in global_ids if isinstance(x, int) and x > 0}

    # Pass 1: take stock of what is already there.
    known = {}
    highest_name = 0
    highest_local = 0
    template_children = None
    for entry in folder.get("children", []):
        if entry.get("type") != "folder":
            continue
        try:
            highest_name = max(highest_name, int(entry.get("name")))
        except (TypeError, ValueError):
            # Folder names that are not numeric do not take part in numbering.
            pass
        gid_node = _child_by_name(entry, "GlobalID")
        lid_node = _child_by_name(entry, "LocalID")
        if gid_node is not None:
            known[int(gid_node.get("value", 0))] = entry
        if lid_node is not None:
            highest_local = max(highest_local, int(lid_node.get("value", 0)))
        if template_children is None:
            template_children = entry.get("children", [])

    # Pass 2: prune entries that are not targeted.
    if target_ids:

        def survives(entry):
            if entry.get("type") != "folder":
                return True
            gid_node = _child_by_name(entry, "GlobalID")
            if gid_node is None:
                return True
            try:
                return int(gid_node.get("value", 0)) in target_ids
            except (TypeError, ValueError):
                return True

        folder["children"] = [
            entry for entry in folder.get("children", []) if survives(entry)
        ]

    # Pass 3: complete and normalise the entries that remain.
    for entry in folder.get("children", []):
        if entry.get("type") != "folder":
            continue
        gid_node = _child_by_name(entry, "GlobalID")
        lid_node = _child_by_name(entry, "LocalID")
        if gid_node is None or lid_node is None:
            continue
        gid_val = int(gid_node.get("value", 0))
        lid_val = int(lid_node.get("value", 0))
        ptz_flag = ptz_by_id.get(gid_val, True)
        required = _vx3_required_fields(gid_val, lid_val, name_prefix, ptz_flag)
        _ensure_children_fields(entry.get("children", []), required)
        forced = (
            ("- Has PTZ", bool(ptz_flag)),
            ("Enabled", True),
            ("Active", True),
        )
        for field_name, forced_value in forced:
            field_node = _child_by_name(entry, field_name)
            if field_node is not None:
                field_node["value"] = forced_value

    # Pass 4: create entries for target ids that were absent in pass 1.
    for gid in sorted(target_ids):
        if gid in known:
            continue
        highest_name += 1
        highest_local += 1
        label = f"{name_prefix}{gid}"
        ptz_flag = ptz_by_id.get(gid, True)
        if template_children:
            # Clone the template entry and overwrite the per-entry fields.
            overrides = {
                "GlobalID": gid,
                "LocalID": highest_local,
                "Name": label,
                "Description": label,
                "- Has PTZ": bool(ptz_flag),
                "Enabled": True,
                "Active": True,
            }
            new_children = []
            for source in template_children:
                clone = copy.deepcopy(source)
                field_name = clone.get("name")
                if field_name in overrides:
                    clone["value"] = overrides[field_name]
                new_children.append(clone)
        else:
            new_children = [
                _make_bool("- Has PTZ", bool(ptz_flag)),
                _make_bool("Active", True),
                _make_string("Description", label),
                _make_bool("Enabled", True),
                _make_int32("GlobalID", gid),
                _make_int32("LocalID", highest_local),
                _make_string("Name", label),
            ]
        required = _vx3_required_fields(gid, highest_local, name_prefix, ptz_flag)
        _ensure_children_fields(new_children, required)
        folder.get("children", []).append(
            {"name": str(highest_name), "type": "folder", "children": new_children}
        )
|
|
|
|
|
|
def ensure_vx3_video_inputs(tree, global_ids, ptz_by_id=None):
    """Run ``_ensure_camera_inputs`` on the VirtualVX3 and VirtualCX3 inputs.

    Both ``Clients/GeViIO/GeViIO_Virtual/<interface>/VideoInputs`` folders are
    processed, VirtualVX3 first, with the name prefix ``"Video input"``.
    """
    for interface in ("VirtualVX3", "VirtualCX3"):
        _ensure_camera_inputs(
            tree,
            ["Clients", "GeViIO", "GeViIO_Virtual", interface, "VideoInputs"],
            global_ids,
            "Video input",
            ptz_by_id,
        )
|
|
|
|
|
|
def ensure_global_video_inputs(tree, global_ids, ptz_by_id=None):
    """Apply ``_ensure_video_inputs_path`` to the interfaces under GeViIO_01.

    When ``Clients/GeViIO/GeViIO_01`` is found, every child folder whose name
    starts with ``"MBeg"`` has its ``VideoInputs`` path processed.  When it is
    not found, the fixed ``MBeg`` and ``MBeg2`` paths are tried instead.
    """
    ptz_by_id = ptz_by_id or {}
    prefix = ("Clients", "GeViIO", "GeViIO_01")

    geviio = _find_folder(tree, list(prefix))
    if not geviio:
        # Fallback: address the two conventional interface names directly.
        for interface in ("MBeg", "MBeg2"):
            _ensure_video_inputs_path(
                tree,
                global_ids,
                ptz_by_id,
                [*prefix, interface, "VideoInputs"],
            )
        return

    for child in geviio.get("children", []):
        if child.get("type") != "folder":
            continue
        name = str(child.get("name", ""))
        if name.startswith("MBeg"):
            _ensure_video_inputs_path(
                tree,
                global_ids,
                ptz_by_id,
                [*prefix, name, "VideoInputs"],
            )
|
|
|
|
|
|
def prune_video_inputs(tree, global_ids):
    """Prune every ``VideoInputs`` folder in ``tree`` down to ``global_ids``.

    Only positive ``int`` items of ``global_ids`` count as targets; with no
    valid target the tree is left untouched.  Inside each folder named
    ``VideoInputs`` an entry folder is removed when its GlobalID parses to an
    integer that is not a target.  Non-folder children, folders without a
    GlobalID and folders with an unparsable GlobalID are kept.  The surviving
    matched entries are then ordered by GlobalID and their ``LocalID`` is
    rewritten with the 1-based position in that order.
    """
    target_ids = {int(x) for x in global_ids if isinstance(x, int) and x > 0}
    if not target_ids:
        return

    def prune_inputs(inputs):
        survivors = []
        matched = []
        for child in inputs.get("children", []):
            gid_val = None
            if child.get("type") == "folder":
                gid_node = _child_by_name(child, "GlobalID")
                if gid_node is not None:
                    try:
                        gid_val = int(gid_node.get("value", 0))
                    except (TypeError, ValueError):
                        gid_val = None
            if gid_val is None:
                # Not an identifiable entry: always kept, never renumbered.
                survivors.append(child)
            elif gid_val in target_ids:
                survivors.append(child)
                matched.append((gid_val, child))
        inputs["children"] = survivors

        matched.sort(key=lambda pair: pair[0])
        for local_id, (_, child) in enumerate(matched, start=1):
            lid_node = _child_by_name(child, "LocalID")
            if lid_node is not None:
                lid_node["value"] = local_id

    # Pre-order walk; children are read after pruning so removed entries are
    # not visited.
    pending = [tree]
    while pending:
        node = pending.pop()
        if not isinstance(node, dict) or node.get("type") != "folder":
            continue
        if node.get("name") == "VideoInputs":
            prune_inputs(node)
        pending.extend(reversed(node.get("children", [])))
|
|
|
|
|
|
def _set_child_value(folder, name, value):
    """Store ``value`` on the child of ``folder`` called ``name``.

    The child is looked up with ``_child_by_name``; nothing happens when that
    lookup returns ``None``.
    """
    target = _child_by_name(folder, name)
    if target is None:
        return
    target["value"] = value
|
|
|
|
|
|
def _replace_named_children(folder, prefix, replacements):
|
|
children = folder.get("children", [])
|
|
first_idx = None
|
|
for idx, child in enumerate(children):
|
|
if str(child.get("name", "")).startswith(prefix):
|
|
first_idx = idx
|
|
break
|
|
if first_idx is None:
|
|
first_idx = len(children)
|
|
kept = []
|
|
for idx, child in enumerate(children):
|
|
if idx < first_idx:
|
|
kept.append(child)
|
|
continue
|
|
if str(child.get("name", "")).startswith(prefix):
|
|
continue
|
|
kept.append(child)
|
|
folder["children"] = kept[:first_idx] + replacements + kept[first_idx:]
|
|
|
|
|
|
def _set_mbeg_link_counts(tree, count):
|
|
def walk(node):
|
|
if not isinstance(node, dict):
|
|
return
|
|
if node.get("name") == "- Count Of MBeg Links":
|
|
node["value"] = int(count)
|
|
if node.get("type") == "folder":
|
|
for child in node.get("children", []):
|
|
walk(child)
|
|
walk(tree)
|
|
|
|
|
|
def _is_keyboard_interface(node):
|
|
if not isinstance(node, dict) or node.get("type") != "folder":
|
|
return False
|
|
if _child_by_name(node, "SerialParameters") is None:
|
|
return False
|
|
if _child_by_name(node, "VideoInputs") is None:
|
|
return False
|
|
type_node = _child_by_name(node, "Type")
|
|
try:
|
|
return int(type_node.get("value", 0)) == 5 if type_node else False
|
|
except (TypeError, ValueError):
|
|
return False
|
|
|
|
|
|
def _replace_keyboard_children(folder, new_children):
|
|
children = folder.get("children", [])
|
|
first_idx = None
|
|
kept = []
|
|
for idx, child in enumerate(children):
|
|
if _is_keyboard_interface(child):
|
|
if first_idx is None:
|
|
first_idx = idx
|
|
continue
|
|
kept.append(child)
|
|
if first_idx is None:
|
|
first_idx = len(kept)
|
|
folder["children"] = kept[:first_idx] + new_children + kept[first_idx:]
|
|
|
|
|
|
def _replace_keyboard_users(users, new_children):
|
|
children = users.get("children", [])
|
|
first_idx = None
|
|
kept = []
|
|
for idx, child in enumerate(children):
|
|
if child.get("type") == "folder" and child.get("name") not in {
|
|
"GeViIO_01", "GeViIO_Virtual", "VirtualKDec", "VirtualVX3"
|
|
}:
|
|
if first_idx is None:
|
|
first_idx = idx
|
|
continue
|
|
kept.append(child)
|
|
if first_idx is None:
|
|
first_idx = len(kept)
|
|
users["children"] = kept[:first_idx] + new_children + kept[first_idx:]
|
|
|
|
|
|
def _normalize_keyboard_entries(keyboards):
    """Return the usable entries of the raw ``keyboards`` list.

    Items that are not dicts, have an empty host, or carry a missing or
    non-integer port are skipped.  Text fields are stripped; ``None`` is
    treated as an empty string rather than being turned into ``"None"``.
    """

    def clean(value):
        return "" if value is None else str(value).strip()

    entries = []
    for item in keyboards:
        if not isinstance(item, dict):
            continue
        host = clean(item.get("host", ""))
        port = item.get("port")
        if not host or port in (None, ""):
            continue
        try:
            port_int = int(port)
        except (TypeError, ValueError):
            continue
        entries.append({
            "name": clean(item.get("name", "")),
            "description": clean(item.get("description", "")),
            "host": host,
            "port": port_int,
        })
    return entries


def _keyboard_child_int(folder, name, default):
    """Return the integer value of child ``name`` of ``folder``, or ``default``.

    ``default`` is returned when the child is missing, has no value, or the
    value cannot be converted to ``int``.
    """
    node = _child_by_name(folder, name)
    if not node:
        return default
    try:
        return int(node.get("value", default))
    except (TypeError, ValueError):
        return default


def apply_keyboards(tree, keyboards):
    """Rebuild the keyboard interfaces and their users from ``keyboards``.

    ``keyboards`` must be a list of dicts with ``host`` and ``port`` and
    optionally ``name`` and ``description``.  Unusable items are skipped; if
    nothing usable remains (or ``keyboards`` is not a list) the tree is left
    untouched.

    For each usable entry one interface folder and one user folder are
    produced.  The interface is a deep copy of the existing keyboard interface
    with the same ``IpHost`` when there is one, otherwise of a template
    interface; the user is a deep copy of the existing user with the same
    name, otherwise of a template user.  The existing keyboard interfaces
    under ``Clients/GeViIO/GeViIO_01`` and the keyboard users under ``Users``
    are then replaced by the new ones, the VirtualKDec serial parameters are
    pointed at the first entry, and every ``- Count Of MBeg Links`` node is
    set to the resulting number of keyboard interfaces.

    Args:
        tree: Root node of the parsed configuration; modified in place.
        keyboards: Raw keyboard definitions.

    Raises:
        ValueError: If ``Clients/GeViIO/GeViIO_01``, ``Users``, the ``MBeg``
            interface or the ``MBeg`` user cannot be found.
    """
    if not isinstance(keyboards, list):
        return
    entries = _normalize_keyboard_entries(keyboards)
    if not entries:
        return

    client = _find_folder(tree, ["Clients", "GeViIO", "GeViIO_01"])
    if not client:
        raise ValueError("Clients/GeViIO/GeViIO_01 not found in the .set file")
    users = _find_folder(tree, ["Users"])
    if not users:
        raise ValueError("Users folder not found in the .set file")

    template = _child_by_name(client, "MBeg")
    if not template:
        raise ValueError("MBeg interface not found in the .set file")
    template_user = _child_by_name(users, "MBeg")
    if not template_user:
        raise ValueError("Users/MBeg not found in the .set file")

    base_interface_id = _keyboard_child_int(template, "InterfaceID", 1)
    base_user_id = _keyboard_child_int(template_user, "UserID", 1)

    reserved_users = {"GeViIO_01", "GeViIO_Virtual", "VirtualKDec", "VirtualVX3"}
    existing_interfaces = [
        c for c in client.get("children", []) if _is_keyboard_interface(c)
    ]
    user_by_name = {
        c.get("name"): c
        for c in users.get("children", [])
        if c.get("type") == "folder" and c.get("name") not in reserved_users
    }

    # Index existing interfaces by host so their settings can be carried over.
    host_to_iface = {}
    for iface in existing_interfaces:
        serial = _child_by_name(iface, "SerialParameters")
        host_node = _child_by_name(serial, "IpHost") if serial else None
        host = str(host_node.get("value", "")).strip() if host_node else ""
        if host:
            host_to_iface[host] = iface

    # Prefer a real keyboard interface (and its user) as the cloning template.
    template_iface = existing_interfaces[0] if existing_interfaces else template
    template_user = user_by_name.get(template_iface.get("name")) or template_user

    virtual_kdec = _find_folder(
        tree, ["Clients", "GeViIO", "GeViIO_Virtual", "VirtualKDec"]
    )
    virtual_kdec_id = (
        _keyboard_child_int(virtual_kdec, "InterfaceID", None)
        if virtual_kdec
        else None
    )
    virtual_user = _child_by_name(users, "VirtualKDec")
    virtual_user_id = (
        _keyboard_child_int(virtual_user, "UserID", None) if virtual_user else None
    )

    # New ids start right after VirtualKDec's id, or after the template's id
    # when VirtualKDec has no usable (non-zero) id.
    start_interface_id = (virtual_kdec_id or base_interface_id) + 1
    start_user_id = (virtual_user_id or base_user_id) + 1

    new_ifaces = []
    new_users = []
    for idx, entry in enumerate(entries, start=1):
        # NOTE(review): the default name is spelled "Mbeg" while the template
        # lookup above uses "MBeg" - confirm the casing is intended.
        iface_name = entry.get("name") or f"Mbeg{idx}"

        iface = copy.deepcopy(host_to_iface.get(entry["host"]) or template_iface)
        iface["name"] = iface_name
        _set_child_value(iface, "Enabled", True)
        _set_child_value(iface, "Protocol", 0)
        _set_child_value(iface, "MBegMode", True)
        _set_child_value(iface, "InterfaceID", start_interface_id + idx - 1)
        serial = _child_by_name(iface, "SerialParameters")
        if serial:
            _set_child_value(serial, "IpHost", entry["host"])
            _set_child_value(serial, "IpPort", entry["port"])
        new_ifaces.append(iface)

        user = copy.deepcopy(user_by_name.get(iface_name) or template_user)
        user["name"] = iface_name
        _set_child_value(user, "Username", iface_name)
        _set_child_value(user, "Description", "")
        _set_child_value(user, "UserID", start_user_id + idx - 1)
        new_users.append(user)

    _replace_keyboard_children(client, new_ifaces)
    _replace_keyboard_users(users, new_users)

    if virtual_kdec:
        serial = _child_by_name(virtual_kdec, "SerialParameters")
        if serial:
            _set_child_value(serial, "IpHost", entries[0]["host"])
            _set_child_value(serial, "IpPort", entries[0]["port"])
            _set_child_value(serial, "ConnectionType", 1)
            _set_child_value(serial, "PortNr", 0)

    # Looked up again because the users' child list was just rebuilt.
    virtual_user = _child_by_name(users, "VirtualKDec")
    if virtual_user:
        _set_child_value(virtual_user, "Description", "")

    mbeg_entries = [
        c for c in client.get("children", []) if _is_keyboard_interface(c)
    ]
    for iface in mbeg_entries:
        _set_child_value(iface, "MBegMode", True)

    _set_mbeg_link_counts(tree, len(mbeg_entries))
|
|
|
|
def main(argv=None):
    """Command-line entry point offering the ``dump`` and ``build`` commands.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read the process
            arguments.
    """
    parser = argparse.ArgumentParser(
        description="Convert GeViSoft .set files to/from JSON."
    )
    commands = parser.add_subparsers(dest="command", required=True)

    dump_parser = commands.add_parser("dump", help="Parse a .set file into JSON.")
    dump_parser.set_defaults(func=dump_command)
    dump_parser.add_argument("input", help="Path to .set file")
    dump_parser.add_argument(
        "output",
        nargs="?",
        help="Optional path for JSON output (stdout if omitted)",
    )

    build_parser = commands.add_parser(
        "build", help="Create a .set file from JSON produced by dump."
    )
    build_parser.set_defaults(func=build_command)
    build_parser.add_argument("input", help="Path to JSON file")
    build_parser.add_argument("output", help="Path to write .set file")

    namespace = parser.parse_args(argv)
    # Each sub-command registered its handler as ``func``.
    handler = namespace.func
    handler(namespace)
|
|
|
|
|
|
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|