Simplify UI and add batch build endpoint

This commit is contained in:
klas
2026-02-10 09:58:11 +01:00
commit dee991a51a
83 changed files with 7043 additions and 0 deletions

1
backend/app/__init__.py Normal file
View File

@@ -0,0 +1 @@


467
backend/app/excel.py Normal file
View File

@@ -0,0 +1,467 @@
from io import BytesIO
from typing import Any, Dict, List, Tuple
import openpyxl
SERVER_HEADER_ALIASES = {
"hostname": "alias",
"alias": "alias",
"typ": "type",
"type": "type",
"ip server": "host",
"ip": "host",
"server": "host",
"username": "user",
"user": "user",
"password": "password",
}
KEYBOARD_HEADER_ALIASES = {
"description": "description",
"name": "name",
"ip address": "host",
"ip": "host",
"host": "host",
"ip host": "host",
"port": "port",
"ip port": "port",
}
def _clean(value):
if value is None:
return ""
return str(value).strip()
def _normalize_type(value: str) -> str:
lowered = value.lower()
if "geviscope" in lowered or "gsc" in lowered:
return "geviscope"
if "g-core" in lowered or "gcore" in lowered:
return "gcore"
return "gcore"
def _load_sheets(contents: bytes):
values_wb = openpyxl.load_workbook(BytesIO(contents), data_only=True)
formulas_wb = openpyxl.load_workbook(BytesIO(contents), data_only=False)
return values_wb.active, formulas_wb.active
def _cell_value(values_sheet, formulas_sheet, row: int, col: int) -> Tuple[Any, bool]:
if not col or col < 1:
return None, False
value = values_sheet.cell(row=row, column=col).value
if value is None:
fcell = formulas_sheet.cell(row=row, column=col)
if fcell.data_type == "f":
return None, True
return value, False
def parse_servers_xlsx(contents: bytes) -> Dict[str, Any]:
sheet, formula_sheet = _load_sheets(contents)
formula_missing = 0
header_row = None
for row in range(1, min(15, sheet.max_row + 1)):
for col in range(1, min(10, sheet.max_column + 1)):
value, is_formula = _cell_value(sheet, formula_sheet, row, col)
formula_missing += 1 if is_formula else 0
cell = _clean(value).lower()
if cell == "hostname":
header_row = row
break
if header_row:
break
if not header_row:
raise ValueError("Could not find header row with 'Hostname'")
header_map = {}
for col in range(1, sheet.max_column + 1):
value, is_formula = _cell_value(sheet, formula_sheet, header_row, col)
formula_missing += 1 if is_formula else 0
name = _clean(value).lower()
if name in SERVER_HEADER_ALIASES:
header_map[SERVER_HEADER_ALIASES[name]] = col
servers = []
skipped = 0
for row in range(header_row + 1, sheet.max_row + 1):
alias_value, alias_formula = _cell_value(sheet, formula_sheet, row, header_map.get("alias", 0))
host_value, host_formula = _cell_value(sheet, formula_sheet, row, header_map.get("host", 0))
formula_missing += int(alias_formula) + int(host_formula)
alias = _clean(alias_value)
host = _clean(host_value)
if not alias or not host:
skipped += 1
continue
type_value, type_formula = _cell_value(sheet, formula_sheet, row, header_map.get("type", 0))
user_value, user_formula = _cell_value(sheet, formula_sheet, row, header_map.get("user", 0))
pass_value, pass_formula = _cell_value(sheet, formula_sheet, row, header_map.get("password", 0))
formula_missing += int(type_formula) + int(user_formula) + int(pass_formula)
server_type = _normalize_type(_clean(type_value))
user = _clean(user_value) or "sysadmin"
password = _clean(pass_value)
servers.append({
"alias": alias,
"host": host,
"user": user,
"password": password,
"type": server_type,
"enabled": True,
"deactivateEcho": False,
"deactivateLiveCheck": False,
})
return {"servers": servers, "skipped": skipped, "formula_cells_missing": formula_missing}
def parse_actions_xlsx(contents: bytes) -> Dict[str, Any]:
sheet, formula_sheet = _load_sheets(contents)
formula_missing = 0
header_row = None
for row in range(1, min(15, sheet.max_row + 1)):
for col in range(1, min(20, sheet.max_column + 1)):
value, is_formula = _cell_value(sheet, formula_sheet, row, col)
formula_missing += 1 if is_formula else 0
if _clean(value).lower() == "camera id":
header_row = row
break
if header_row:
break
if not header_row:
raise ValueError("Could not find header row with 'Camera ID'")
headers = {}
for col in range(1, sheet.max_column + 1):
value, is_formula = _cell_value(sheet, formula_sheet, header_row, col)
formula_missing += 1 if is_formula else 0
name = _clean(value)
if name:
headers.setdefault(name, []).append(col)
def col(name, idx=0):
values = headers.get(name, [])
if idx < len(values):
return values[idx]
return None
compact_mode = all(
name in headers
for name in [
"Input Action",
"Input Category",
"Input Caption",
"Output Action",
"Output Caption",
"Output Server Alias",
]
)
rows: List[Dict[str, Any]] = []
for row in range(header_row + 1, sheet.max_row + 1):
camera_value, camera_formula = _cell_value(sheet, formula_sheet, row, col("Camera ID"))
formula_missing += 1 if camera_formula else 0
camera_id = _clean(camera_value)
if not camera_id:
continue
if compact_mode:
input_caption = _clean(_cell_value(sheet, formula_sheet, row, col("Input Caption"))[0])
output_server = _clean(_cell_value(sheet, formula_sheet, row, col("Output Server Alias"))[0])
entry = {
"cameraId": camera_id,
"server": _clean(_cell_value(sheet, formula_sheet, row, col("Server"))[0]) or output_server,
"serverType": _clean(_cell_value(sheet, formula_sheet, row, col("Server Type"))[0]),
"ptz": _clean(_cell_value(sheet, formula_sheet, row, col("PTZ"))[0]),
"caption": input_caption,
"outputs": [
{
"kind": "output1",
"category": _clean(_cell_value(sheet, formula_sheet, row, col("Input Category"))[0]),
"action": _clean(_cell_value(sheet, formula_sheet, row, col("Input Action"))[0]),
"videoInput": camera_id,
},
],
}
server_type = _clean(_cell_value(sheet, formula_sheet, row, col("Server Type"))[0]).lower()
output_action = _clean(_cell_value(sheet, formula_sheet, row, col("Output Action"))[0])
output_caption = _clean(_cell_value(sheet, formula_sheet, row, col("Output Caption"))[0])
output_entry = {
"category": "",
"action": output_action,
"caption": output_caption,
"server": output_server,
"ptzHead": camera_id,
"speed": "",
}
if "g-core" in server_type or "gcore" in server_type or "gng" in server_type:
output_entry["kind"] = "gcore"
else:
output_entry["kind"] = "gsc"
entry["outputs"].append(output_entry)
else:
input_value, input_formula = _cell_value(sheet, formula_sheet, row, col("Caption", 0))
formula_missing += 1 if input_formula else 0
input_caption = _clean(input_value)
entry = {
"cameraId": camera_id,
"server": _clean(_cell_value(sheet, formula_sheet, row, col("Server"))[0]),
"serverType": _clean(_cell_value(sheet, formula_sheet, row, col("Server Type"))[0]),
"ptz": _clean(_cell_value(sheet, formula_sheet, row, col("PTZ"))[0]),
"caption": input_caption,
"outputs": [
{
"kind": "output1",
"category": _clean(_cell_value(sheet, formula_sheet, row, col("Category", 0))[0]),
"action": _clean(_cell_value(sheet, formula_sheet, row, col("Action", 0))[0]),
"videoInput": _clean(_cell_value(sheet, formula_sheet, row, col("VideoInput", 0))[0]),
},
{
"kind": "gsc",
"category": _clean(_cell_value(sheet, formula_sheet, row, col("Category", 1))[0]),
"action": _clean(_cell_value(sheet, formula_sheet, row, col("Action", 1))[0]),
"caption": _clean(_cell_value(sheet, formula_sheet, row, col("Caption", 1))[0]),
"server": _clean(_cell_value(sheet, formula_sheet, row, col("GeviScope alias", 0))[0]),
"ptzHead": _clean(_cell_value(sheet, formula_sheet, row, col("PTZ head", 0))[0]),
"speed": _clean(_cell_value(sheet, formula_sheet, row, col("speed", 0))[0]),
},
{
"kind": "gcore",
"category": _clean(_cell_value(sheet, formula_sheet, row, col("Category", 2))[0]),
"action": _clean(_cell_value(sheet, formula_sheet, row, col("Action", 2))[0]),
"caption": _clean(_cell_value(sheet, formula_sheet, row, col("Caption", 2))[0]),
"server": _clean(_cell_value(sheet, formula_sheet, row, col("G-Core alias", 0))[0]),
"ptzHead": _clean(_cell_value(sheet, formula_sheet, row, col("PTZ head", 1))[0]),
"speed": _clean(_cell_value(sheet, formula_sheet, row, col("speed", 1))[0]),
},
],
}
rows.append(entry)
return {"rows": rows, "formula_cells_missing": formula_missing}
def parse_keyboards_xlsx(contents: bytes) -> Dict[str, Any]:
sheet, formula_sheet = _load_sheets(contents)
formula_missing = 0
header_row = None
for row in range(1, min(15, sheet.max_row + 1)):
for col in range(1, min(10, sheet.max_column + 1)):
value, is_formula = _cell_value(sheet, formula_sheet, row, col)
formula_missing += 1 if is_formula else 0
cell = _clean(value).lower()
if cell in ("description", "name"):
header_row = row
break
if header_row:
break
if not header_row:
raise ValueError("Could not find header row with 'Name' or 'Description'")
header_map = {}
for col in range(1, sheet.max_column + 1):
value, is_formula = _cell_value(sheet, formula_sheet, header_row, col)
formula_missing += 1 if is_formula else 0
name = _clean(value).lower()
if name in KEYBOARD_HEADER_ALIASES:
header_map[KEYBOARD_HEADER_ALIASES[name]] = col
keyboards = []
skipped = 0
for row in range(header_row + 1, sheet.max_row + 1):
name_value, name_formula = _cell_value(sheet, formula_sheet, row, header_map.get("name", 0))
desc_value, desc_formula = _cell_value(sheet, formula_sheet, row, header_map.get("description", 0))
host_value, host_formula = _cell_value(sheet, formula_sheet, row, header_map.get("host", 0))
port_value, port_formula = _cell_value(sheet, formula_sheet, row, header_map.get("port", 0))
formula_missing += int(name_formula) + int(desc_formula) + int(host_formula) + int(port_formula)
name = _clean(name_value)
description = _clean(desc_value)
host = _clean(host_value)
port_raw = _clean(port_value)
if not name and not description and not host and not port_raw:
skipped += 1
continue
if not host or not port_raw:
skipped += 1
continue
try:
port = int(float(port_raw))
except ValueError:
skipped += 1
continue
keyboards.append({"name": name, "description": description, "host": host, "port": port})
return {"keyboards": keyboards, "skipped": skipped, "formula_cells_missing": formula_missing}
def parse_video_outputs_xlsx(contents: bytes) -> Dict[str, Any]:
sheet, formula_sheet = _load_sheets(contents)
formula_missing = 0
header_row = None
for row in range(1, min(15, sheet.max_row + 1)):
for col in range(1, min(10, sheet.max_column + 1)):
value, is_formula = _cell_value(sheet, formula_sheet, row, col)
formula_missing += 1 if is_formula else 0
cell = _clean(value).lower().replace(" ", "")
if cell in ("localid", "local_id", "local"):
header_row = row
break
if header_row:
break
if not header_row:
raise ValueError("Could not find header row with 'Local ID'")
header_map = {}
for col in range(1, sheet.max_column + 1):
value, is_formula = _cell_value(sheet, formula_sheet, header_row, col)
formula_missing += 1 if is_formula else 0
name = _clean(value).lower().replace(" ", "")
if name in ("localid", "local_id", "local"):
header_map["localId"] = col
elif name in ("globalid", "global_id", "global"):
header_map["globalId"] = col
elif name == "name":
header_map["name"] = col
elif name in ("description", "desc"):
header_map["description"] = col
outputs = []
skipped = 0
for row in range(header_row + 1, sheet.max_row + 1):
local_value, local_formula = _cell_value(sheet, formula_sheet, row, header_map.get("localId", 0))
global_value, global_formula = _cell_value(sheet, formula_sheet, row, header_map.get("globalId", 0))
name_value, name_formula = _cell_value(sheet, formula_sheet, row, header_map.get("name", 0))
desc_value, desc_formula = _cell_value(sheet, formula_sheet, row, header_map.get("description", 0))
formula_missing += int(local_formula) + int(global_formula) + int(name_formula) + int(desc_formula)
local_raw = _clean(local_value)
global_raw = _clean(global_value)
if not local_raw and not global_raw and not name_value and not desc_value:
skipped += 1
continue
try:
local_id = int(float(local_raw)) if local_raw else None
except ValueError:
local_id = None
try:
global_id = int(float(global_raw)) if global_raw else None
except ValueError:
global_id = None
outputs.append({
"localId": local_id,
"globalId": global_id,
"name": _clean(name_value),
"description": _clean(desc_value),
})
return {"outputs": outputs, "skipped": skipped, "formula_cells_missing": formula_missing}
def build_video_outputs_xlsx(outputs: List[Dict[str, Any]]) -> bytes:
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "VideoOutputs"
ws.append(["Local ID", "Global ID", "Name", "Description"])
for row in outputs:
ws.append([
row.get("localId"),
row.get("globalId"),
row.get("name", ""),
row.get("description", ""),
])
out = BytesIO()
wb.save(out)
return out.getvalue()
def build_logging_xlsx(rows: List[Dict[str, Any]]) -> bytes:
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "Logging"
ws.append(["Action name", "Log into database", "Notify user", "Notify GeViCom"])
for row in rows:
ws.append([
row.get("actionName", ""),
"X" if row.get("db") else "",
"X" if row.get("user") else "",
"X" if row.get("com") else "",
])
out = BytesIO()
wb.save(out)
return out.getvalue()
def parse_logging_xlsx(contents: bytes) -> Dict[str, Any]:
sheet, formula_sheet = _load_sheets(contents)
formula_missing = 0
header_row = None
for row in range(1, min(15, sheet.max_row + 1)):
for col in range(1, min(10, sheet.max_column + 1)):
value, is_formula = _cell_value(sheet, formula_sheet, row, col)
formula_missing += 1 if is_formula else 0
cell = _clean(value).lower().replace(" ", "")
if cell in ("actionname", "action", "actionid", "action_id"):
header_row = row
break
if header_row:
break
if not header_row:
raise ValueError("Could not find header row with 'Action name'")
header_map = {}
for col in range(1, sheet.max_column + 1):
value, is_formula = _cell_value(sheet, formula_sheet, header_row, col)
formula_missing += 1 if is_formula else 0
name = _clean(value).lower().replace(" ", "")
if name in ("actionname", "action", "actionid", "action_id"):
header_map["actionName"] = col
elif name in ("logintodatabase", "logtodatabase", "database", "db"):
header_map["db"] = col
elif name in ("notifyuser", "user"):
header_map["user"] = col
elif name in ("notifygevicom", "gevicom", "com"):
header_map["com"] = col
def _to_bool(value: Any) -> bool:
if value is None:
return False
if isinstance(value, bool):
return value
if isinstance(value, (int, float)):
return value != 0
text = _clean(value).lower()
return text in ("x", "yes", "true", "1", "y")
rows = []
skipped = 0
for row in range(header_row + 1, sheet.max_row + 1):
action_value, action_formula = _cell_value(sheet, formula_sheet, row, header_map.get("actionName", 0))
db_value, db_formula = _cell_value(sheet, formula_sheet, row, header_map.get("db", 0))
user_value, user_formula = _cell_value(sheet, formula_sheet, row, header_map.get("user", 0))
com_value, com_formula = _cell_value(sheet, formula_sheet, row, header_map.get("com", 0))
formula_missing += int(action_formula) + int(db_formula) + int(user_formula) + int(com_formula)
action_name = _clean(action_value)
if not action_name and not db_value and not user_value and not com_value:
skipped += 1
continue
if not action_name:
skipped += 1
continue
rows.append({
"actionName": action_name,
"db": _to_bool(db_value),
"user": _to_bool(user_value),
"com": _to_bool(com_value),
})
return {"rows": rows, "skipped": skipped, "formula_cells_missing": formula_missing}

1398
backend/app/geviset.py Normal file

File diff suppressed because it is too large Load Diff

353
backend/app/main.py Normal file
View File

@@ -0,0 +1,353 @@
import os
import re
import copy
import tempfile
import subprocess
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
import json
from fastapi.staticfiles import StaticFiles
from . import geviset
from . import excel
from pathlib import Path
app = FastAPI(title="GeViSet Web")
_LAST_TREE = None
_ROOT_DIR = Path(__file__).resolve().parents[2]
_SCRIPT_PATH = _ROOT_DIR / "scripts" / "build_mapping.js"
_TEMPLATE_RULES = _ROOT_DIR / "frontend" / "src" / "templates" / "templateRules.json"
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/api/health")
async def health():
return {"ok": True}
@app.post("/api/set/parse")
async def parse_set(file: UploadFile = File(...)):
if not file.filename.lower().endswith(".set"):
raise HTTPException(status_code=400, detail="Only .set files are supported")
contents = await file.read()
try:
tree = geviset.load_set_bytes(contents)
mapping = geviset.extract_mapping(tree)
gcore = geviset.extract_gcore(tree)
gsc = geviset.extract_gsc(tree)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc))
global _LAST_TREE
_LAST_TREE = tree
return {
"filename": file.filename,
"token": "last",
"tree": tree,
"mapping": mapping,
"gcore": gcore,
"gsc": gsc,
}
@app.post("/api/set/export")
async def export_set(payload: dict):
try:
tree = payload.get("tree")
token = payload.get("token")
if tree is None:
if token == "last" and _LAST_TREE is not None:
tree = copy.deepcopy(_LAST_TREE)
else:
raise ValueError("Missing tree in payload")
if "mapping" in payload:
geviset.apply_mapping(tree, payload["mapping"])
mapping_for_ids = geviset.extract_mapping(tree)
camera_ids = set()
ptz_by_id = {}
for rule in payload["mapping"].get("rules", []):
inp = rule.get("input", {})
vid = inp.get("videoInputId")
if isinstance(vid, int) and vid > 0 and isinstance(inp.get("ptz"), bool):
ptz_by_id[vid] = inp["ptz"]
else:
name = inp.get("name", "")
match = re.search(r"(\\d+)$", str(name))
if match and isinstance(inp.get("ptz"), bool):
ptz_by_id[int(match.group(1))] = inp["ptz"]
for rule in mapping_for_ids.get("rules", []):
inp = rule.get("input", {})
vid = inp.get("videoInputId")
if isinstance(vid, int) and vid > 0:
camera_ids.add(vid)
else:
name = inp.get("name", "")
match = re.search(r"(\\d+)$", str(name))
if match:
camera_ids.add(int(match.group(1)))
for field in rule.get("fields", []):
if field.get("name") == "VideoInput":
val = field.get("value")
if isinstance(val, int) and val > 0:
camera_ids.add(val)
elif isinstance(val, str) and val.isdigit():
camera_ids.add(int(val))
for out in rule.get("outputs", []):
action = out.get("action", "")
match = re.search(r"Camera:\\s*(\\d+)", action)
if match:
camera_ids.add(int(match.group(1)))
geviset.ensure_video_inputs(tree, camera_ids, ptz_by_id)
geviset.ensure_global_video_inputs(tree, camera_ids, ptz_by_id)
geviset.ensure_vx3_video_inputs(tree, camera_ids, ptz_by_id)
geviset.prune_video_inputs(tree, camera_ids)
print(
f"EXPORT camera_ids={len(camera_ids)} contains_101027={101027 in camera_ids}",
flush=True,
)
if "gcore" in payload:
geviset.apply_gcore(tree, payload["gcore"])
if "gsc" in payload:
geviset.apply_gsc(tree, payload["gsc"])
if "keyboards" in payload:
geviset.apply_keyboards(tree, payload["keyboards"])
if "videoOutputs" in payload:
geviset.apply_video_outputs(tree, payload["videoOutputs"])
if "logging" in payload:
geviset.apply_logging(tree, payload["logging"])
data = geviset.save_set_bytes(tree)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc))
return StreamingResponse(
iter([data]),
media_type="application/octet-stream",
headers={"Content-Disposition": "attachment; filename=updated.set"},
)
@app.post("/api/excel/servers")
async def import_servers(file: UploadFile = File(...)):
if not file.filename.lower().endswith(".xlsx"):
raise HTTPException(status_code=400, detail="Only .xlsx files are supported")
contents = await file.read()
try:
parsed = excel.parse_servers_xlsx(contents)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc))
return parsed
@app.post("/api/excel/action-mappings")
async def import_action_mappings(file: UploadFile = File(...)):
if not file.filename.lower().endswith(".xlsx"):
raise HTTPException(status_code=400, detail="Only .xlsx files are supported")
contents = await file.read()
try:
parsed = excel.parse_actions_xlsx(contents)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc))
return parsed
@app.post("/api/excel/keyboards")
async def import_keyboards(file: UploadFile = File(...)):
if not file.filename.lower().endswith(".xlsx"):
raise HTTPException(status_code=400, detail="Only .xlsx files are supported")
contents = await file.read()
try:
parsed = excel.parse_keyboards_xlsx(contents)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc))
return parsed
@app.post("/api/excel/video-outputs")
async def export_video_outputs(payload: dict):
try:
tree = payload.get("tree")
if tree is None:
raise ValueError("Missing tree in payload")
outputs = geviset.extract_video_outputs(tree)
data = excel.build_video_outputs_xlsx(outputs)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc))
return StreamingResponse(
iter([data]),
media_type="application/octet-stream",
headers={"Content-Disposition": "attachment; filename=video_outputs.xlsx"},
)
@app.post("/api/excel/video-outputs/import")
async def import_video_outputs(file: UploadFile = File(...)):
if not file.filename.lower().endswith(".xlsx"):
raise HTTPException(status_code=400, detail="Only .xlsx files are supported")
contents = await file.read()
try:
parsed = excel.parse_video_outputs_xlsx(contents)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc))
return parsed
@app.post("/api/excel/logging")
async def export_logging(payload: dict):
try:
tree = payload.get("tree")
if tree is None:
raise ValueError("Missing tree in payload")
rows = geviset.extract_logging(tree)
data = excel.build_logging_xlsx(rows)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc))
return StreamingResponse(
iter([data]),
media_type="application/octet-stream",
headers={"Content-Disposition": "attachment; filename=logging.xlsx"},
)
@app.post("/api/excel/logging/import")
async def import_logging(file: UploadFile = File(...)):
if not file.filename.lower().endswith(".xlsx"):
raise HTTPException(status_code=400, detail="Only .xlsx files are supported")
contents = await file.read()
try:
parsed = excel.parse_logging_xlsx(contents)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc))
return parsed
@app.post("/api/batch/build")
async def build_from_excel(
base_set: UploadFile = File(...),
servers: UploadFile = File(None),
actions: UploadFile = File(None),
outputs: UploadFile = File(None),
keyboards: UploadFile = File(None),
logging: UploadFile = File(None),
):
if not base_set.filename.lower().endswith(".set"):
raise HTTPException(status_code=400, detail="Base file must be .set")
try:
tree = geviset.load_set_bytes(await base_set.read())
if actions and actions.filename:
if not actions.filename.lower().endswith(".xlsx"):
raise HTTPException(status_code=400, detail="Actions must be .xlsx")
actions_parsed = excel.parse_actions_xlsx(await actions.read())
mapping = geviset.extract_mapping(tree)
with tempfile.TemporaryDirectory() as tmpdir:
map_in = Path(tmpdir) / "mapping_in.json"
rows_in = Path(tmpdir) / "actions_rows.json"
map_out = Path(tmpdir) / "mapping_out.json"
map_in.write_text(json.dumps(mapping), encoding="utf-8")
rows_in.write_text(json.dumps(actions_parsed["rows"]), encoding="utf-8")
subprocess.check_call(
["node", str(_SCRIPT_PATH), str(map_in), str(rows_in), str(_TEMPLATE_RULES), str(map_out)]
)
mapping_updated = json.loads(map_out.read_text(encoding="utf-8"))
geviset.apply_mapping(tree, mapping_updated)
mapping_for_ids = geviset.extract_mapping(tree)
camera_ids = set()
ptz_by_id = {}
for rule in mapping_updated.get("rules", []):
inp = rule.get("input", {})
vid = inp.get("videoInputId")
if isinstance(vid, int) and vid > 0 and isinstance(inp.get("ptz"), bool):
ptz_by_id[vid] = inp["ptz"]
else:
name = inp.get("name", "")
match = re.search(r"(\\d+)$", str(name))
if match and isinstance(inp.get("ptz"), bool):
ptz_by_id[int(match.group(1))] = inp["ptz"]
for rule in mapping_for_ids.get("rules", []):
inp = rule.get("input", {})
vid = inp.get("videoInputId")
if isinstance(vid, int) and vid > 0:
camera_ids.add(vid)
else:
name = inp.get("name", "")
match = re.search(r"(\\d+)$", str(name))
if match:
camera_ids.add(int(match.group(1)))
for field in rule.get("fields", []):
if field.get("name") == "VideoInput":
val = field.get("value")
if isinstance(val, int) and val > 0:
camera_ids.add(val)
elif isinstance(val, str) and val.isdigit():
camera_ids.add(int(val))
for out in rule.get("outputs", []):
action = out.get("action", "")
match = re.search(r"Camera:\\s*(\\d+)", action)
if match:
camera_ids.add(int(match.group(1)))
geviset.ensure_video_inputs(tree, camera_ids, ptz_by_id)
geviset.ensure_global_video_inputs(tree, camera_ids, ptz_by_id)
geviset.ensure_vx3_video_inputs(tree, camera_ids, ptz_by_id)
geviset.prune_video_inputs(tree, camera_ids)
if servers and servers.filename:
if not servers.filename.lower().endswith(".xlsx"):
raise HTTPException(status_code=400, detail="Servers must be .xlsx")
servers_parsed = excel.parse_servers_xlsx(await servers.read())
gcore_list = [s for s in servers_parsed["servers"] if s.get("type") == "gcore"]
gsc_list = [s for s in servers_parsed["servers"] if s.get("type") == "geviscope"]
for idx, s in enumerate(gcore_list, start=1):
s["id"] = str(idx)
for idx, s in enumerate(gsc_list, start=1):
s["id"] = str(idx)
bundle_gcore = geviset.extract_gcore(tree)
bundle_gsc = geviset.extract_gsc(tree)
bundle_gcore["servers"] = gcore_list
bundle_gsc["servers"] = gsc_list
geviset.apply_gcore(tree, bundle_gcore)
geviset.apply_gsc(tree, bundle_gsc)
if outputs and outputs.filename:
if not outputs.filename.lower().endswith(".xlsx"):
raise HTTPException(status_code=400, detail="Outputs must be .xlsx")
outputs_parsed = excel.parse_video_outputs_xlsx(await outputs.read())
geviset.apply_video_outputs(tree, outputs_parsed["outputs"])
if keyboards and keyboards.filename:
if not keyboards.filename.lower().endswith(".xlsx"):
raise HTTPException(status_code=400, detail="Keyboards must be .xlsx")
keyboards_parsed = excel.parse_keyboards_xlsx(await keyboards.read())
geviset.apply_keyboards(tree, keyboards_parsed["keyboards"])
if logging and logging.filename:
if not logging.filename.lower().endswith(".xlsx"):
raise HTTPException(status_code=400, detail="Logging must be .xlsx")
logging_parsed = excel.parse_logging_xlsx(await logging.read())
geviset.apply_logging(tree, logging_parsed["rows"])
data = geviset.save_set_bytes(tree)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc))
return StreamingResponse(
iter([data]),
media_type="application/octet-stream",
headers={"Content-Disposition": "attachment; filename=combined.set"},
)
static_dir = os.getenv("STATIC_DIR")
if static_dir and os.path.isdir(static_dir):
app.mount("/", StaticFiles(directory=static_dir, html=True), name="static")

4
backend/requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
fastapi==0.115.0
uvicorn==0.30.6
python-multipart==0.0.9
openpyxl==3.1.5