Files
GeViSetWEB/backend/app/main.py
2026-02-10 09:58:11 +01:00

354 lines
14 KiB
Python

import os
import re
import copy
import tempfile
import subprocess
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
import json
from fastapi.staticfiles import StaticFiles
from . import geviset
from . import excel
from pathlib import Path
app = FastAPI(title="GeViSet Web")
_LAST_TREE = None
_ROOT_DIR = Path(__file__).resolve().parents[2]
_SCRIPT_PATH = _ROOT_DIR / "scripts" / "build_mapping.js"
_TEMPLATE_RULES = _ROOT_DIR / "frontend" / "src" / "templates" / "templateRules.json"
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/api/health")
async def health():
return {"ok": True}
@app.post("/api/set/parse")
async def parse_set(file: UploadFile = File(...)):
if not file.filename.lower().endswith(".set"):
raise HTTPException(status_code=400, detail="Only .set files are supported")
contents = await file.read()
try:
tree = geviset.load_set_bytes(contents)
mapping = geviset.extract_mapping(tree)
gcore = geviset.extract_gcore(tree)
gsc = geviset.extract_gsc(tree)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc))
global _LAST_TREE
_LAST_TREE = tree
return {
"filename": file.filename,
"token": "last",
"tree": tree,
"mapping": mapping,
"gcore": gcore,
"gsc": gsc,
}
@app.post("/api/set/export")
async def export_set(payload: dict):
try:
tree = payload.get("tree")
token = payload.get("token")
if tree is None:
if token == "last" and _LAST_TREE is not None:
tree = copy.deepcopy(_LAST_TREE)
else:
raise ValueError("Missing tree in payload")
if "mapping" in payload:
geviset.apply_mapping(tree, payload["mapping"])
mapping_for_ids = geviset.extract_mapping(tree)
camera_ids = set()
ptz_by_id = {}
for rule in payload["mapping"].get("rules", []):
inp = rule.get("input", {})
vid = inp.get("videoInputId")
if isinstance(vid, int) and vid > 0 and isinstance(inp.get("ptz"), bool):
ptz_by_id[vid] = inp["ptz"]
else:
name = inp.get("name", "")
match = re.search(r"(\\d+)$", str(name))
if match and isinstance(inp.get("ptz"), bool):
ptz_by_id[int(match.group(1))] = inp["ptz"]
for rule in mapping_for_ids.get("rules", []):
inp = rule.get("input", {})
vid = inp.get("videoInputId")
if isinstance(vid, int) and vid > 0:
camera_ids.add(vid)
else:
name = inp.get("name", "")
match = re.search(r"(\\d+)$", str(name))
if match:
camera_ids.add(int(match.group(1)))
for field in rule.get("fields", []):
if field.get("name") == "VideoInput":
val = field.get("value")
if isinstance(val, int) and val > 0:
camera_ids.add(val)
elif isinstance(val, str) and val.isdigit():
camera_ids.add(int(val))
for out in rule.get("outputs", []):
action = out.get("action", "")
match = re.search(r"Camera:\\s*(\\d+)", action)
if match:
camera_ids.add(int(match.group(1)))
geviset.ensure_video_inputs(tree, camera_ids, ptz_by_id)
geviset.ensure_global_video_inputs(tree, camera_ids, ptz_by_id)
geviset.ensure_vx3_video_inputs(tree, camera_ids, ptz_by_id)
geviset.prune_video_inputs(tree, camera_ids)
print(
f"EXPORT camera_ids={len(camera_ids)} contains_101027={101027 in camera_ids}",
flush=True,
)
if "gcore" in payload:
geviset.apply_gcore(tree, payload["gcore"])
if "gsc" in payload:
geviset.apply_gsc(tree, payload["gsc"])
if "keyboards" in payload:
geviset.apply_keyboards(tree, payload["keyboards"])
if "videoOutputs" in payload:
geviset.apply_video_outputs(tree, payload["videoOutputs"])
if "logging" in payload:
geviset.apply_logging(tree, payload["logging"])
data = geviset.save_set_bytes(tree)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc))
return StreamingResponse(
iter([data]),
media_type="application/octet-stream",
headers={"Content-Disposition": "attachment; filename=updated.set"},
)
@app.post("/api/excel/servers")
async def import_servers(file: UploadFile = File(...)):
if not file.filename.lower().endswith(".xlsx"):
raise HTTPException(status_code=400, detail="Only .xlsx files are supported")
contents = await file.read()
try:
parsed = excel.parse_servers_xlsx(contents)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc))
return parsed
@app.post("/api/excel/action-mappings")
async def import_action_mappings(file: UploadFile = File(...)):
if not file.filename.lower().endswith(".xlsx"):
raise HTTPException(status_code=400, detail="Only .xlsx files are supported")
contents = await file.read()
try:
parsed = excel.parse_actions_xlsx(contents)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc))
return parsed
@app.post("/api/excel/keyboards")
async def import_keyboards(file: UploadFile = File(...)):
if not file.filename.lower().endswith(".xlsx"):
raise HTTPException(status_code=400, detail="Only .xlsx files are supported")
contents = await file.read()
try:
parsed = excel.parse_keyboards_xlsx(contents)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc))
return parsed
@app.post("/api/excel/video-outputs")
async def export_video_outputs(payload: dict):
try:
tree = payload.get("tree")
if tree is None:
raise ValueError("Missing tree in payload")
outputs = geviset.extract_video_outputs(tree)
data = excel.build_video_outputs_xlsx(outputs)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc))
return StreamingResponse(
iter([data]),
media_type="application/octet-stream",
headers={"Content-Disposition": "attachment; filename=video_outputs.xlsx"},
)
@app.post("/api/excel/video-outputs/import")
async def import_video_outputs(file: UploadFile = File(...)):
if not file.filename.lower().endswith(".xlsx"):
raise HTTPException(status_code=400, detail="Only .xlsx files are supported")
contents = await file.read()
try:
parsed = excel.parse_video_outputs_xlsx(contents)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc))
return parsed
@app.post("/api/excel/logging")
async def export_logging(payload: dict):
try:
tree = payload.get("tree")
if tree is None:
raise ValueError("Missing tree in payload")
rows = geviset.extract_logging(tree)
data = excel.build_logging_xlsx(rows)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc))
return StreamingResponse(
iter([data]),
media_type="application/octet-stream",
headers={"Content-Disposition": "attachment; filename=logging.xlsx"},
)
@app.post("/api/excel/logging/import")
async def import_logging(file: UploadFile = File(...)):
if not file.filename.lower().endswith(".xlsx"):
raise HTTPException(status_code=400, detail="Only .xlsx files are supported")
contents = await file.read()
try:
parsed = excel.parse_logging_xlsx(contents)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc))
return parsed
@app.post("/api/batch/build")
async def build_from_excel(
base_set: UploadFile = File(...),
servers: UploadFile = File(None),
actions: UploadFile = File(None),
outputs: UploadFile = File(None),
keyboards: UploadFile = File(None),
logging: UploadFile = File(None),
):
if not base_set.filename.lower().endswith(".set"):
raise HTTPException(status_code=400, detail="Base file must be .set")
try:
tree = geviset.load_set_bytes(await base_set.read())
if actions and actions.filename:
if not actions.filename.lower().endswith(".xlsx"):
raise HTTPException(status_code=400, detail="Actions must be .xlsx")
actions_parsed = excel.parse_actions_xlsx(await actions.read())
mapping = geviset.extract_mapping(tree)
with tempfile.TemporaryDirectory() as tmpdir:
map_in = Path(tmpdir) / "mapping_in.json"
rows_in = Path(tmpdir) / "actions_rows.json"
map_out = Path(tmpdir) / "mapping_out.json"
map_in.write_text(json.dumps(mapping), encoding="utf-8")
rows_in.write_text(json.dumps(actions_parsed["rows"]), encoding="utf-8")
subprocess.check_call(
["node", str(_SCRIPT_PATH), str(map_in), str(rows_in), str(_TEMPLATE_RULES), str(map_out)]
)
mapping_updated = json.loads(map_out.read_text(encoding="utf-8"))
geviset.apply_mapping(tree, mapping_updated)
mapping_for_ids = geviset.extract_mapping(tree)
camera_ids = set()
ptz_by_id = {}
for rule in mapping_updated.get("rules", []):
inp = rule.get("input", {})
vid = inp.get("videoInputId")
if isinstance(vid, int) and vid > 0 and isinstance(inp.get("ptz"), bool):
ptz_by_id[vid] = inp["ptz"]
else:
name = inp.get("name", "")
match = re.search(r"(\\d+)$", str(name))
if match and isinstance(inp.get("ptz"), bool):
ptz_by_id[int(match.group(1))] = inp["ptz"]
for rule in mapping_for_ids.get("rules", []):
inp = rule.get("input", {})
vid = inp.get("videoInputId")
if isinstance(vid, int) and vid > 0:
camera_ids.add(vid)
else:
name = inp.get("name", "")
match = re.search(r"(\\d+)$", str(name))
if match:
camera_ids.add(int(match.group(1)))
for field in rule.get("fields", []):
if field.get("name") == "VideoInput":
val = field.get("value")
if isinstance(val, int) and val > 0:
camera_ids.add(val)
elif isinstance(val, str) and val.isdigit():
camera_ids.add(int(val))
for out in rule.get("outputs", []):
action = out.get("action", "")
match = re.search(r"Camera:\\s*(\\d+)", action)
if match:
camera_ids.add(int(match.group(1)))
geviset.ensure_video_inputs(tree, camera_ids, ptz_by_id)
geviset.ensure_global_video_inputs(tree, camera_ids, ptz_by_id)
geviset.ensure_vx3_video_inputs(tree, camera_ids, ptz_by_id)
geviset.prune_video_inputs(tree, camera_ids)
if servers and servers.filename:
if not servers.filename.lower().endswith(".xlsx"):
raise HTTPException(status_code=400, detail="Servers must be .xlsx")
servers_parsed = excel.parse_servers_xlsx(await servers.read())
gcore_list = [s for s in servers_parsed["servers"] if s.get("type") == "gcore"]
gsc_list = [s for s in servers_parsed["servers"] if s.get("type") == "geviscope"]
for idx, s in enumerate(gcore_list, start=1):
s["id"] = str(idx)
for idx, s in enumerate(gsc_list, start=1):
s["id"] = str(idx)
bundle_gcore = geviset.extract_gcore(tree)
bundle_gsc = geviset.extract_gsc(tree)
bundle_gcore["servers"] = gcore_list
bundle_gsc["servers"] = gsc_list
geviset.apply_gcore(tree, bundle_gcore)
geviset.apply_gsc(tree, bundle_gsc)
if outputs and outputs.filename:
if not outputs.filename.lower().endswith(".xlsx"):
raise HTTPException(status_code=400, detail="Outputs must be .xlsx")
outputs_parsed = excel.parse_video_outputs_xlsx(await outputs.read())
geviset.apply_video_outputs(tree, outputs_parsed["outputs"])
if keyboards and keyboards.filename:
if not keyboards.filename.lower().endswith(".xlsx"):
raise HTTPException(status_code=400, detail="Keyboards must be .xlsx")
keyboards_parsed = excel.parse_keyboards_xlsx(await keyboards.read())
geviset.apply_keyboards(tree, keyboards_parsed["keyboards"])
if logging and logging.filename:
if not logging.filename.lower().endswith(".xlsx"):
raise HTTPException(status_code=400, detail="Logging must be .xlsx")
logging_parsed = excel.parse_logging_xlsx(await logging.read())
geviset.apply_logging(tree, logging_parsed["rows"])
data = geviset.save_set_bytes(tree)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc))
return StreamingResponse(
iter([data]),
media_type="application/octet-stream",
headers={"Content-Disposition": "attachment; filename=combined.set"},
)
static_dir = os.getenv("STATIC_DIR")
if static_dir and os.path.isdir(static_dir):
app.mount("/", StaticFiles(directory=static_dir, html=True), name="static")